SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
[samza.git] / samza-rest / src / main / java / org / apache / samza / rest / proxy / task / SamzaTaskProxy.java
index a412c08..da7b907 100644 (file)
@@ -34,25 +34,15 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.rest.model.Partition;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.job.JobInstance;
-import org.apache.samza.storage.ChangelogPartitionManager;
-import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.util.ClassLoaderHelper;
-import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,8 +58,6 @@ public class SamzaTaskProxy implements TaskProxy {
 
   private static final MetricsRegistryMap METRICS_REGISTRY = new MetricsRegistryMap();
 
-  private static final String SOURCE = "SamzaTaskProxy";
-
   private final TaskResourceConfig taskResourceConfig;
 
   private final InstallationFinder installFinder;
@@ -85,28 +73,37 @@ public class SamzaTaskProxy implements TaskProxy {
    * {@inheritDoc}
    */
   @Override
-  public List<Task> getTasks(JobInstance jobInstance)
-      throws IOException, InterruptedException {
+  public List<Task> getTasks(JobInstance jobInstance) throws IOException, InterruptedException {
     Preconditions.checkArgument(installFinder.isInstalled(jobInstance),
                                 String.format("Invalid job instance : %s", jobInstance));
-    JobModel jobModel = getJobModel(jobInstance);
-    StorageConfig storageConfig = new StorageConfig(jobModel.getConfig());
-
-    List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
-    Map<String, String> containerLocality = jobModel.getAllContainerLocality();
-    List<Task> tasks = new ArrayList<>();
-    for (ContainerModel containerModel : jobModel.getContainers().values()) {
-      String containerId = containerModel.getProcessorId();
-      String host = containerLocality.get(containerId);
-      for (TaskModel taskModel : containerModel.getTasks().values()) {
-        String taskName = taskModel.getTaskName().getTaskName();
-        List<Partition> partitions = taskModel.getSystemStreamPartitions()
-                                              .stream()
-                                              .map(Partition::new).collect(Collectors.toList());
-        tasks.add(new Task(host, taskName, containerId, partitions, storeNames));
+    CoordinatorStreamSystemConsumer coordinatorStreamSystemConsumer = null;
+    try {
+      coordinatorStreamSystemConsumer = initializeCoordinatorStreamConsumer(jobInstance);
+      return readTasksFromCoordinatorStream(coordinatorStreamSystemConsumer);
+    } finally {
+      if (coordinatorStreamSystemConsumer != null) {
+        coordinatorStreamSystemConsumer.stop();
       }
     }
-    return tasks;
+  }
+
+  /**
+   * Initialize {@link CoordinatorStreamSystemConsumer} based upon {@link JobInstance} parameter.
+   * @param jobInstance the job instance to get CoordinatorStreamSystemConsumer for.
+   * @return built and initialized CoordinatorStreamSystemConsumer.
+   */
+  protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
+    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
+    Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
+    LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig);
+    CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
+    LOG.debug("Registering coordinator system stream consumer.");
+    consumer.register();
+    LOG.debug("Starting coordinator system stream consumer.");
+    consumer.start();
+    LOG.debug("Bootstrapping coordinator system stream consumer.");
+    consumer.bootstrap();
+    return consumer;
   }
 
   /**
@@ -129,53 +126,21 @@ public class SamzaTaskProxy implements TaskProxy {
   }
 
   /**
-   * Retrieves the jobModel from the jobCoordinator.
-   * @param jobInstance the job instance (jobId, jobName).
-   * @return the JobModel fetched from the coordinator stream.
+   * Builds list of {@link Task} from job model in coordinator stream.
+   * @param consumer system consumer associated with a job's coordinator stream.
+   * @return list of {@link Task} constructed from job model in coordinator stream.
    */
-  protected JobModel getJobModel(JobInstance jobInstance) {
-    CoordinatorStreamSystemConsumer coordinatorSystemConsumer = null;
-    CoordinatorStreamSystemProducer coordinatorSystemProducer = null;
-    try {
-      CoordinatorStreamSystemFactory coordinatorStreamSystemFactory  = new CoordinatorStreamSystemFactory();
-      Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
-      LOG.info("Using config: {} to create coordinatorStream producer and consumer.", coordinatorSystemConfig);
-      coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
-      coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, METRICS_REGISTRY);
-      LOG.info("Registering coordinator system stream consumer.");
-      coordinatorSystemConsumer.register();
-      LOG.debug("Starting coordinator system stream consumer.");
-      coordinatorSystemConsumer.start();
-      LOG.debug("Bootstrapping coordinator system stream consumer.");
-      coordinatorSystemConsumer.bootstrap();
-      LOG.info("Registering coordinator system stream producer.");
-      coordinatorSystemProducer.register(SOURCE);
-
-      Config config = coordinatorSystemConsumer.getConfig();
-      LOG.info("Got config from coordinatorSystemConsumer: {}.", config);
-      ChangelogPartitionManager changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, SOURCE);
-      changelogManager.start();
-      LocalityManager localityManager = new LocalityManager(coordinatorSystemProducer, coordinatorSystemConsumer);
-      localityManager.start();
-
-      String jobCoordinatorSystemName = config.get(JobConfig.JOB_COORDINATOR_SYSTEM());
-
-      /**
-       * Select job coordinator system properties from config and instantiate SystemAdmin for it alone.
-       * Instantiating SystemAdmin's for other input/output systems defined in config is unnecessary.
-       */
-      Config systemAdminConfig = config.subset(String.format("systems.%s", jobCoordinatorSystemName), false);
-      scala.collection.immutable.Map<String, SystemAdmin> systemAdmins = JobModelManager.getSystemAdmins(systemAdminConfig);
-      StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
-      Map<TaskName, Integer> changeLogPartitionMapping = changelogManager.readChangeLogPartitionMapping();
-      return JobModelManager.readJobModel(config, changeLogPartitionMapping, localityManager, streamMetadataCache, null);
-    } finally {
-      if (coordinatorSystemConsumer != null) {
-        coordinatorSystemConsumer.stop();
-      }
-      if (coordinatorSystemProducer != null) {
-        coordinatorSystemProducer.stop();
-      }
-    }
+  protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
+    LocalityManager localityManager = new LocalityManager(null, consumer);
+    Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
+    Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
+    StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
+    List<String> storeNames = JavaConverters.seqAsJavaListConverter(storageConfig.getStoreNames()).asJava();
+    return taskNameToContainerIdMapping.entrySet()
+                                       .stream()
+                                       .map(entry -> {
+        String hostName = containerIdToHostMapping.get(entry.getValue()).get(SetContainerHostMapping.HOST_KEY);
+        return new Task(hostName, entry.getKey(), entry.getValue(), new ArrayList<>(), storeNames);
+                                       }).collect(Collectors.toList());
   }
 }