SAMZA-1329: Switch SamzaTaskProxy to use LocalityManager.
[samza.git] / samza-rest / src / test / java / org / apache / samza / rest / resources / mock / MockTaskProxy.java
index de741ba..afebf1d 100644 (file)
  */
 package org.apache.samza.rest.resources.mock;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import java.util.Set;
-import org.apache.samza.Partition;
+import com.google.common.collect.ImmutableList;
+import java.util.List;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.job.model.ContainerModel;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.rest.model.Partition;
+import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.job.JobInstance;
 import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
 import org.apache.samza.rest.proxy.task.TaskResourceConfig;
-import org.apache.samza.system.SystemStreamPartition;
+import org.mockito.Mockito;
 
 
 public class MockTaskProxy extends SamzaTaskProxy {
-  public static final String SYSTEM_NAME = "testSystem";
-  public static final String STREAM_NAME = "testStream";
-  public static final Integer PARTITION_ID = 1;
-  public static final Set<SystemStreamPartition> SYSTEM_STREAM_PARTITIONS = ImmutableSet.of(
-      new SystemStreamPartition(SYSTEM_NAME, STREAM_NAME, new Partition(PARTITION_ID)));
+  public static final List<Partition> PARTITIONS = ImmutableList.of();
 
   public static final String TASK_1_NAME = "Task1";
   public static final String TASK_1_CONTAINER_ID = "1";
-  public static final Partition CHANGE_LOG_PARTITION = new Partition(0);
+  public static final String TASK_1_PREFERRED_HOST = "TASK_1_PREFERRED_HOST";
+  public static final List<String> TASK_1_STORE_NAMES = ImmutableList.of("Task1Store1", "Task1Store2");
 
   public static final String TASK_2_NAME = "Task2";
   public static final String TASK_2_CONTAINER_ID = "2";
+  public static final String TASK_2_PREFERRED_HOST = "TASK_1_PREFERRED_HOST";
+  public static final List<String> TASK_2_STORE_NAMES = ImmutableList.of("Task2Store1", "Task2Store2", "Task2Store3");
 
   public MockTaskProxy() {
     super(new TaskResourceConfig(new MapConfig()),
@@ -53,20 +49,13 @@ public class MockTaskProxy extends SamzaTaskProxy {
   }
 
   @Override
-  protected JobModel getJobModel(JobInstance jobInstance) {
-    if (jobInstance.getJobId().contains("Bad")
-        || jobInstance.getJobName().contains("Bad")) {
-      throw new IllegalArgumentException("No tasks found.");
-    }
-    TaskModel task1Model = new TaskModel(new TaskName(TASK_1_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
-    TaskModel task2Model = new TaskModel(new TaskName(TASK_2_NAME), SYSTEM_STREAM_PARTITIONS, CHANGE_LOG_PARTITION);
-    ContainerModel task1ContainerModel = new ContainerModel(TASK_1_CONTAINER_ID, 1,
-                                                            ImmutableMap.of(new TaskName(TASK_1_NAME),
-                                                                            task1Model));
-    ContainerModel task2ContainerModel = new ContainerModel(TASK_2_CONTAINER_ID, 2,
-                                                            ImmutableMap.of(new TaskName(TASK_2_NAME),
-                                                                            task2Model));
-    return new JobModel(new MapConfig(), ImmutableMap.of(TASK_1_CONTAINER_ID, task1ContainerModel,
-                                                         TASK_2_CONTAINER_ID, task2ContainerModel));
+  protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
+    return Mockito.mock(CoordinatorStreamSystemConsumer.class);
+  }
+
+  @Override
+  protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
+    return ImmutableList.of(new Task(TASK_1_PREFERRED_HOST, TASK_1_NAME, TASK_1_CONTAINER_ID, PARTITIONS, TASK_1_STORE_NAMES),
+                            new Task(TASK_2_PREFERRED_HOST, TASK_2_NAME, TASK_2_CONTAINER_ID, PARTITIONS, TASK_2_STORE_NAMES));
   }
 }