SAMZA-1709: Use lazy creation for SystemAdmins in ApplicationRunners
author Cameron Lee <calee@linkedin.com>
Wed, 13 Jun 2018 23:12:44 +0000 (16:12 -0700)
committer xiliu <xiliu@linkedin.com>
Wed, 13 Jun 2018 23:12:44 +0000 (16:12 -0700)
An instance of SystemAdmins is created when instantiating any AbstractApplicationRunner, but the SystemAdmins is only actually needed for some of the methods for some of the runners. For example, LocalApplicationRunner.kill does not need SystemAdmins, and LocalContainerRunner does not need SystemAdmins for anything.
Doing lazy instantiation allows us to more easily manage the SystemAdmins lifecycle, since it removes the need to add lifecycle hooks for the ApplicationRunner.
This also fixes the lifecycle management for SystemAdmins in ApplicationRunners.

Author: Cameron Lee <calee@linkedin.com>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #512 from cameronlee314/runnner_system_admins

samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
samza-test/src/test/java/org/apache/samza/test/operator/StreamApplicationIntegrationTestHarness.java
samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java

index b0473c1..7f60f96 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.execution;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import java.util.Collection;
@@ -49,7 +50,12 @@ public class StreamManager {
 
   private final SystemAdmins systemAdmins;
 
-  public StreamManager(SystemAdmins systemAdmins) {
+  public StreamManager(Config config) {
+    this(new SystemAdmins(config));
+  }
+
+  @VisibleForTesting
+  StreamManager(SystemAdmins systemAdmins) {
     this.systemAdmins = systemAdmins;
   }
 
@@ -70,6 +76,14 @@ public class StreamManager {
     }
   }
 
+  public void start() {
+    this.systemAdmins.start();
+  }
+
+  public void stop() {
+    this.systemAdmins.stop();
+  }
+
   Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
     Map<String, Integer> streamToPartitionCount = new HashMap<>();
 
index 5043977..3716d2b 100644 (file)
  */
 package org.apache.samza.runtime;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -32,17 +39,9 @@ import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemAdmins;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
 
 /**
  * Defines common, core behavior for implementations of the {@link ApplicationRunner} API.
@@ -50,9 +49,6 @@ import java.util.Set;
 public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
-  private final StreamManager streamManager;
-  private final SystemAdmins systemAdmins;
-
   /**
    * The {@link ApplicationRunner} is supposed to run a single {@link StreamApplication} instance in the full life-cycle
    */
@@ -61,8 +57,6 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   public AbstractApplicationRunner(Config config) {
     super(config);
     this.graphSpec = new StreamGraphSpec(this, config);
-    this.systemAdmins = new SystemAdmins(config);
-    this.streamManager = new StreamManager(systemAdmins);
   }
 
   @Override
@@ -72,16 +66,6 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return getStreamSpec(streamId, physicalName);
   }
 
-  @Override
-  public void run(StreamApplication streamApp) {
-    systemAdmins.start();
-  }
-
-  @Override
-  public void kill(StreamApplication streamApp) {
-    systemAdmins.stop();
-  }
-
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
    *
@@ -126,12 +110,12 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return new StreamSpec(streamId, physicalName, system, isBounded, properties);
   }
 
-  public ExecutionPlan getExecutionPlan(StreamApplication app) throws Exception {
-    return getExecutionPlan(app, null);
+  public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager streamManager) throws Exception {
+    return getExecutionPlan(app, null, streamManager);
   }
 
   /* package private */
-  ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
+  ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManager streamManager) throws Exception {
     // build stream graph
     app.init(graphSpec, config);
 
@@ -152,11 +136,6 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return planner.plan(specGraph);
   }
 
-  /* package private for testing */
-  StreamManager getStreamManager() {
-    return streamManager;
-  }
-
   /**
    * Write the execution plan JSON to a file
    * @param planJson JSON representation of the plan
@@ -178,4 +157,10 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     }
   }
 
+  @VisibleForTesting
+  StreamManager buildAndStartStreamManager() {
+    StreamManager streamManager = new StreamManager(this.config);
+    streamManager.start();
+    return streamManager;
+  }
 }
index d3df741..0dcb4bf 100644 (file)
@@ -42,6 +42,7 @@ import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
+import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.operators.StreamGraphSpec;
@@ -157,10 +158,12 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      super.run(app);
+      streamManager = buildAndStartStreamManager();
+
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app);
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
       String executionPlanJson = plan.getPlanAsJson();
       writePlanJsonFile(executionPlanJson);
@@ -169,7 +172,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       // 2. create the necessary streams
       // TODO: System generated intermediate streams should have robust naming scheme. See SAMZA-1391
       String planId = String.valueOf(executionPlanJson.hashCode());
-      createStreams(planId, plan.getIntermediateStreams());
+      createStreams(planId, plan.getIntermediateStreams(), streamManager);
 
       // 3. create the StreamProcessors
       if (plan.getJobConfigs().isEmpty()) {
@@ -190,13 +193,16 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
       shutdownLatch.countDown();
       throw new SamzaException(String.format("Failed to start application: %s.", app), throwable);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public void kill(StreamApplication streamApp) {
     processors.forEach(StreamProcessor::stop);
-    super.kill(streamApp);
   }
 
   @Override
@@ -252,7 +258,9 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
    * @param intStreams list of intermediate {@link StreamSpec}s
    * @throws TimeoutException exception for latch timeout
    */
-  /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
+  private void createStreams(String planId,
+      List<StreamSpec> intStreams,
+      StreamManager streamManager) throws TimeoutException {
     if (intStreams.isEmpty()) {
       LOG.info("Set of intermediate streams is empty. Nothing to create.");
       return;
@@ -268,7 +276,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
       // each application process will try creating the streams, which
       // requires stream creation to be idempotent
-      getStreamManager().createStreams(intStreams);
+      streamManager.createStreams(intStreams);
       return;
     }
 
@@ -277,7 +285,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
       // check if the processor needs to go through leader election and stream creation
       if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
         LOG.info("lock acquired for streams creation by " + uid);
-        getStreamManager().createStreams(intStreams);
+        streamManager.createStreams(intStreams);
         lockWithState.unlockAndSet();
       } else {
         LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
index 202fa76..99fdc51 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.runtime;
 
 import java.time.Duration;
+import java.util.UUID;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -27,14 +28,13 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.execution.ExecutionPlan;
+import org.apache.samza.execution.StreamManager;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.UUID;
-
 import static org.apache.samza.job.ApplicationStatus.*;
 
 
@@ -61,22 +61,23 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
    */
   @Override
   public void run(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      super.run(app);
+      streamManager = buildAndStartStreamManager();
       // TODO: run.id needs to be set for standalone: SAMZA-1531
       // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
       String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
       LOG.info("The run id for this run is {}", runId);
 
       // 1. initialize and plan
-      ExecutionPlan plan = getExecutionPlan(app, runId);
+      ExecutionPlan plan = getExecutionPlan(app, runId, streamManager);
       writePlanJsonFile(plan.getPlanAsJson());
 
       // 2. create the necessary streams
       if (plan.getApplicationConfig().getAppMode() == ApplicationConfig.ApplicationMode.BATCH) {
-        getStreamManager().clearStreamsFromPreviousRun(getConfigFromPrevRun());
+        streamManager.clearStreamsFromPreviousRun(getConfigFromPrevRun());
       }
-      getStreamManager().createStreams(plan.getIntermediateStreams());
+      streamManager.createStreams(plan.getIntermediateStreams());
 
       // 3. submit jobs for remote execution
       plan.getJobConfigs().forEach(jobConfig -> {
@@ -86,33 +87,44 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
         });
     } catch (Throwable t) {
       throw new SamzaException("Failed to run application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public void kill(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
-      ExecutionPlan plan = getExecutionPlan(app);
+      streamManager = buildAndStartStreamManager();
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
 
       plan.getJobConfigs().forEach(jobConfig -> {
           LOG.info("Killing job {}", jobConfig.getName());
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
-      super.kill(app);
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
   @Override
   public ApplicationStatus status(StreamApplication app) {
+    StreamManager streamManager = null;
     try {
       boolean hasNewJobs = false;
       boolean hasRunningJobs = false;
       ApplicationStatus unsuccessfulFinishStatus = null;
 
-      ExecutionPlan plan = getExecutionPlan(app);
+      streamManager = buildAndStartStreamManager();
+      ExecutionPlan plan = getExecutionPlan(app, streamManager);
       for (JobConfig jobConfig : plan.getJobConfigs()) {
         ApplicationStatus status = getApplicationStatus(jobConfig);
 
@@ -148,6 +160,10 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
       }
     } catch (Throwable t) {
       throw new SamzaException("Failed to get status for application", t);
+    } finally {
+      if (streamManager != null) {
+        streamManager.stop();
+      }
     }
   }
 
index 870d586..eb0ebe9 100644 (file)
@@ -93,13 +93,11 @@ public class TestApplicationRunnerMain {
 
     @Override
     public void run(StreamApplication streamApp) {
-      super.run(streamApp);
       runCount++;
     }
 
     @Override
     public void kill(StreamApplication streamApp) {
-      super.kill(streamApp);
       killCount++;
     }
 
index 595dda2..5eb139b 100644 (file)
@@ -79,12 +79,12 @@ public class TestLocalApplicationRunner {
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).getStreamManager();
+    doReturn(streamManager).when(runner).buildAndStartStreamManager();
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
     JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
@@ -103,6 +103,7 @@ public class TestLocalApplicationRunner {
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
     assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
   }
 
   @Test
@@ -115,12 +116,12 @@ public class TestLocalApplicationRunner {
     StreamApplication app = mock(StreamApplication.class);
 
     StreamManager streamManager = mock(StreamManager.class);
-    doReturn(streamManager).when(runner).getStreamManager();
+    doReturn(streamManager).when(runner).buildAndStartStreamManager();
 
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
     when(plan.getPlanAsJson()).thenReturn("");
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
@@ -147,6 +148,7 @@ public class TestLocalApplicationRunner {
     List<StreamSpec> streamSpecs = captor.getValue();
     assertEquals(streamSpecs.size(), 1);
     assertEquals(streamSpecs.get(0).getId(), "test-stream");
+    verify(streamManager).stop();
   }
 
   @Test
@@ -186,11 +188,14 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
 
+    // buildAndStartStreamManager already includes start, so not going to verify it gets called
+    StreamManager streamManager = mock(StreamManager.class);
+    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -210,6 +215,7 @@ public class TestLocalApplicationRunner {
     runner.waitForFinish();
 
     assertEquals(runner.status(app), ApplicationStatus.SuccessfulFinish);
+    verify(streamManager).stop();
   }
 
   @Test
@@ -220,11 +226,14 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner runner = spy(new LocalApplicationRunner(new MapConfig(config)));
     StreamApplication app = mock(StreamApplication.class);
 
+    // buildAndStartStreamManager already includes start, so not going to verify it gets called
+    StreamManager streamManager = mock(StreamManager.class);
+    when(runner.buildAndStartStreamManager()).thenReturn(streamManager);
     ExecutionPlan plan = mock(ExecutionPlan.class);
     when(plan.getIntermediateStreams()).thenReturn(Collections.emptyList());
     when(plan.getPlanAsJson()).thenReturn("");
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
-    doReturn(plan).when(runner).getExecutionPlan(any());
+    doReturn(plan).when(runner).getExecutionPlan(any(), eq(streamManager));
 
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
@@ -245,6 +254,7 @@ public class TestLocalApplicationRunner {
     }
 
     assertEquals(runner.status(app), ApplicationStatus.UnsuccessfulFinish);
+    verify(streamManager).stop();
   }
 
   public static Set<StreamProcessor> getProcessors(LocalApplicationRunner runner) {
index f3093a7..044c7cf 100644 (file)
@@ -120,7 +120,6 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
-    super.run(streamApp);
     Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
     appRunner.run(streamApp);
   }
@@ -128,7 +127,6 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     appRunner.kill(streamApp);
-    super.kill(streamApp);
   }
 
   @Override
index 1595347..3c11533 100644 (file)
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.execution.TestStreamManager;
@@ -101,8 +102,6 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
   private KafkaProducer producer;
   private KafkaConsumer consumer;
   protected KafkaSystemAdmin systemAdmin;
-  private StreamApplication app;
-  protected AbstractApplicationRunner runner;
 
   private int numEmptyPolls = 3;
   private static final Duration POLL_TIMEOUT_MS = Duration.ofSeconds(20);
@@ -218,25 +217,29 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
    * @param streamApplication the application to run
    * @param appName the name of the application
    * @param overriddenConfigs configs to override
+   * @return RunApplicationContext which contains objects created within runApplication, to be used for verification
+   * if necessary
    */
-  public void runApplication(StreamApplication streamApplication, String appName, Map<String, String> overriddenConfigs) {
-    Map<String, String> configs = new HashMap<>();
-    configs.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
-    configs.put("job.name", appName);
-    configs.put("app.class", streamApplication.getClass().getCanonicalName());
-    configs.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
-    configs.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
-    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
-    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
-    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
-    configs.put("systems.kafka.samza.key.serde", "string");
-    configs.put("systems.kafka.samza.msg.serde", "string");
-    configs.put("systems.kafka.samza.offset.default", "oldest");
-    configs.put("job.coordinator.system", "kafka");
-    configs.put("job.default.system", "kafka");
-    configs.put("job.coordinator.replication.factor", "1");
-    configs.put("task.window.ms", "1000");
-    configs.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName());
+  protected RunApplicationContext runApplication(StreamApplication streamApplication,
+      String appName,
+      Map<String, String> overriddenConfigs) {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("job.factory.class", "org.apache.samza.job.local.ThreadJobFactory");
+    configMap.put("job.name", appName);
+    configMap.put("app.class", streamApplication.getClass().getCanonicalName());
+    configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
+    configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
+    configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    configMap.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configMap.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configMap.put("systems.kafka.samza.key.serde", "string");
+    configMap.put("systems.kafka.samza.msg.serde", "string");
+    configMap.put("systems.kafka.samza.offset.default", "oldest");
+    configMap.put("job.coordinator.system", "kafka");
+    configMap.put("job.default.system", "kafka");
+    configMap.put("job.coordinator.replication.factor", "1");
+    configMap.put("task.window.ms", "1000");
+    configMap.put("task.checkpoint.factory", TestStreamManager.MockCheckpointManagerFactory.class.getName());
 
     // This is to prevent tests from taking a long time to stop after they're done. The issue is that
     // tearDown currently doesn't call runner.kill(app), and shuts down the Kafka and ZK servers immediately.
@@ -247,17 +250,18 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
     // changelog streams. Hence we just force an unclean shutdown here to. This _should be_ OK
     // since the test method has already executed by the time the shutdown hook is called. The side effect is
     // that buffered state (e.g. changelog contents) might not be flushed correctly after the test run.
-    configs.put("task.shutdown.ms", "1");
+    configMap.put("task.shutdown.ms", "1");
 
     if (overriddenConfigs != null) {
-      configs.putAll(overriddenConfigs);
+      configMap.putAll(overriddenConfigs);
     }
 
-    app = streamApplication;
-    runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(new MapConfig(configs));
+    Config config = new MapConfig(configMap);
+    AbstractApplicationRunner runner = (AbstractApplicationRunner) ApplicationRunner.fromConfig(config);
     runner.run(streamApplication);
 
     StreamAssert.waitForComplete();
+    return new RunApplicationContext(runner, config);
   }
 
   public void setNumEmptyPolls(int numEmptyPolls) {
@@ -274,4 +278,26 @@ public class StreamApplicationIntegrationTestHarness extends AbstractIntegration
     consumer.close();
     super.tearDown();
   }
+
+  /**
+   * Container for any necessary context created during runApplication. Allows tests to access objects created within
+   * runApplication in order to do verification.
+   */
+  protected static class RunApplicationContext {
+    private final AbstractApplicationRunner runner;
+    private final Config config;
+
+    private RunApplicationContext(AbstractApplicationRunner runner, Config config) {
+      this.runner = runner;
+      this.config = config;
+    }
+
+    public AbstractApplicationRunner getRunner() {
+      return this.runner;
+    }
+
+    public Config getConfig() {
+      return this.config;
+    }
+  }
 }
index 5424888..a2adb70 100644 (file)
@@ -107,7 +107,7 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     configs.put(RepartitionJoinWindowApp.INPUT_TOPIC_NAME_2_PROP, inputTopicName2);
     configs.put(RepartitionJoinWindowApp.OUTPUT_TOPIC_NAME_PROP, outputTopicName);
 
-    runApplication(app, appName, configs);
+    RunApplicationContext runApplicationContext = runApplication(app, appName, configs);
 
     // consume and validate result
     List<ConsumerRecord<String, String>> messages = consumeMessages(Collections.singletonList(outputTopicName), 2);
@@ -138,8 +138,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
       }
       Assert.assertEquals(0, remainingMessageNum);
     }
-
-
   }
 
   @Test