SAMZA-1233: Create SystemAdmin only for JobCoordinator System in SamzaTaskProxy
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Wed, 26 Apr 2017 16:05:41 +0000 (09:05 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 26 Apr 2017 16:05:41 +0000 (09:05 -0700)
Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>, Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #138 from shanthoosh/fix_samza_task_proxy

samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java

index c40c168..a412c08 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
@@ -49,6 +50,7 @@ import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.job.JobInstance;
 import org.apache.samza.storage.ChangelogPartitionManager;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
@@ -155,8 +157,18 @@ public class SamzaTaskProxy implements TaskProxy {
       changelogManager.start();
       LocalityManager localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer);
       localityManager.start();
-      return JobModelManager.readJobModel(config, changelogManager.readChangeLogPartitionMapping(), localityManager,
-                                          new StreamMetadataCache(JobModelManager.getSystemAdmins(config), 0, SystemClock.instance()), null);
+
+      String jobCoordinatorSystemName = config.get(JobConfig.JOB_COORDINATOR_SYSTEM());
+
+      /**
+       * Select job coordinator system properties from config and instantiate SystemAdmin for it alone.
+       * Instantiating SystemAdmin's for other input/output systems defined in config is unnecessary.
+       */
+      Config systemAdminConfig = config.subset(String.format("systems.%s", jobCoordinatorSystemName), false);
+      scala.collection.immutable.Map<String, SystemAdmin> systemAdmins = JobModelManager.getSystemAdmins(systemAdminConfig);
+      StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+      Map<TaskName, Integer> changeLogPartitionMapping = changelogManager.readChangeLogPartitionMapping();
+      return JobModelManager.readJobModel(config, changeLogPartitionMapping, localityManager, streamMetadataCache, null);
     } finally {
       if (coordinatorSystemConsumer != null) {
         coordinatorSystemConsumer.stop();