SAMZA-1796: PassthroughJobCoordinator doesn't create changelog streams
authorxinyuiscool <xiliu@linkedin.com>
Wed, 1 Aug 2018 23:12:06 +0000 (16:12 -0700)
committerxiliu <xiliu@linkedin.com>
Wed, 1 Aug 2018 23:12:06 +0000 (16:12 -0700)
Currently only the ClusterBasedJobCoordinator and ZkJobCoordinator are creating changelog streams. The Passthrough one should also do it.

Author: xinyuiscool <xiliu@linkedin.com>

Reviewers: Bharath K <bharathkk@gmail.com>

Closes #595 from xinyuiscool/SAMZA-1796

samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java

index 228617a..737ac3e 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.*;
@@ -81,6 +82,8 @@ public class PassthroughJobCoordinator implements JobCoordinator {
       if (checkpointManager != null) {
         checkpointManager.createResources();
       }
+
+      ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
     } catch (Exception e) {
       LOGGER.error("Exception while trying to getJobModel.", e);
       if (coordinatorListener != null) {