SAMZA-1700: Clean up SystemAdmins instance creation flows
authorCameron Lee <calee@linkedin.com>
Thu, 10 May 2018 17:45:41 +0000 (10:45 -0700)
committerPrateek Maheshwari <pmaheshw@linkedin.com>
Thu, 10 May 2018 17:45:41 +0000 (10:45 -0700)
The SystemAdmins class has a special "test" constructor for building SystemAdmins instances for unit tests. However, that test constructor has leaked into non-test code.
This makes it harder to manage all flows which use SystemAdmins. An upcoming change is to fix lifecycle management for SystemAdmins in the ApplicationRunner classes, so doing some clean up will help that future change.

Author: Cameron Lee <calee@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes #509 from cameronlee314/systemadmins_cleanup

17 files changed:
samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala

index 7e8f527..242ac67 100644 (file)
 package org.apache.samza.system;
 
 import java.util.Map;
+import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.MapConfig;
 
 
+/**
+ * Provides a mapping from system name to a {@link SystemAdmin}. Needs to be started before use and stopped after use.
+ */
 public class SystemAdmins {
   private final Map<String, SystemAdmin> systemAdminMap;
 
-  public Map<String, SystemAdmin> getSystemAdminsMap() {
-    return systemAdminMap;
-  }
-
   public SystemAdmins(Config config) {
     JavaSystemConfig systemConfig = new JavaSystemConfig(config);
     this.systemAdminMap = systemConfig.getSystemAdmins();
   }
 
-  // Used only for test
-  public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) {
-    this.systemAdminMap = systemAdminMap;
+  /**
+   * Creates a new instance of {@link SystemAdmins} with an empty admin mapping.
+   * @return New empty instance of {@link SystemAdmins}
+   */
+  public static SystemAdmins empty() {
+    return new SystemAdmins(new MapConfig());
   }
 
   public void start() {
@@ -60,4 +64,8 @@ public class SystemAdmins {
     }
     return systemAdminMap.get(systemName);
   }
+
+  public Set<String> getSystemNames() {
+    return systemAdminMap.keySet();
+  }
 }
index 4959974..ba2dfd9 100644 (file)
@@ -76,7 +76,7 @@ object OffsetManager extends Logging {
     systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
     config: Config,
     checkpointManager: CheckpointManager = null,
-    systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
+    systemAdmins: SystemAdmins = SystemAdmins.empty(),
     checkpointListeners: Map[String, CheckpointListener] = Map(),
     offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
@@ -142,7 +142,7 @@ class OffsetManager(
    * SystemAdmins that are used to get next offsets from last checkpointed
    * offsets. Map is from system name to SystemAdmin class for the system.
    */
-  val systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
+  val systemAdmins: SystemAdmins = SystemAdmins.empty(),
 
   /**
    * Map of checkpointListeners for the systems that chose to provide one.
index ad5cb9a..5380fc9 100644 (file)
@@ -192,7 +192,7 @@ object SamzaContainer extends Logging {
     info("Got system factories: %s" format systemFactories.keys)
 
     val systemAdmins = new SystemAdmins(config)
-    info("Got system admins: %s" format systemAdmins.getSystemAdminsMap().keySet())
+    info("Got system admins: %s" format systemAdmins.getSystemNames)
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
     val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams)
index ffc0c90..38e2cfa 100644 (file)
@@ -72,7 +72,7 @@ class BootstrappingChooser(
    * A map from system stream name to SystemAdmin that is used for
    * offset comparisons.
    */
-  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = SystemAdmins.empty()) extends MessageChooser with Logging {
 
   /**
    * The number of lagging partitions for each SystemStream that's behind.
index 35c68c2..9b77a8c 100644 (file)
@@ -255,7 +255,7 @@ class DefaultChooser(
    * Defines a mapping from SystemStream name to SystemAdmin.
    * This is useful for determining if a bootstrap SystemStream is caught up.
    */
-  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = SystemAdmins.empty()) extends MessageChooser with Logging {
 
   val chooser = {
     val useBatching = batchSize.isDefined
index db8ab19..664f3b1 100644 (file)
 
 package org.apache.samza.execution;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -40,20 +48,8 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestExecutionPlanner {
@@ -61,7 +57,7 @@ public class TestExecutionPlanner {
   private static final String DEFAULT_SYSTEM = "test-system";
   private static final int DEFAULT_PARTITIONS = 10;
 
-  private Map<String, SystemAdmin> systemAdmins;
+  private SystemAdmins systemAdmins;
   private StreamManager streamManager;
   private ApplicationRunner runner;
   private Config config;
@@ -231,10 +227,10 @@ public class TestExecutionPlanner {
 
     SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
-    systemAdmins = new HashMap<>();
-    systemAdmins.put("system1", systemAdmin1);
-    systemAdmins.put("system2", systemAdmin2);
-    streamManager = new StreamManager(new SystemAdmins(systemAdmins));
+    systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin("system1")).thenReturn(systemAdmin1);
+    when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
+    streamManager = new StreamManager(systemAdmins);
 
     runner = mock(ApplicationRunner.class);
     when(runner.getStreamSpec("input1")).thenReturn(input1);
index 3c2ba70..f218e89 100644 (file)
@@ -19,6 +19,9 @@
 
 package org.apache.samza.execution;
 
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
@@ -40,14 +43,9 @@ import org.apache.samza.system.SystemAdmins;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.apache.samza.execution.TestExecutionPlanner.*;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 
 public class TestJobGraphJsonGenerator {
@@ -109,12 +107,12 @@ public class TestJobGraphJsonGenerator {
     system2Map.put("input3", 32);
     system2Map.put("output2", 16);
 
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
     SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
-    systemAdmins.put("system1", systemAdmin1);
-    systemAdmins.put("system2", systemAdmin2);
-    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin("system1")).thenReturn(systemAdmin1);
+    when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
+    StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
@@ -182,12 +180,12 @@ public class TestJobGraphJsonGenerator {
     Map<String, Integer> system2Map = new HashMap<>();
     system2Map.put("PageViewCount", 16);
 
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
     SystemAdmin systemAdmin1 = createSystemAdmin(system1Map);
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
-    systemAdmins.put("hdfs", systemAdmin1);
-    systemAdmins.put("kafka", systemAdmin2);
-    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin("hdfs")).thenReturn(systemAdmin1);
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
+    StreamManager streamManager = new StreamManager(systemAdmins);
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView");
index ed28067..f5fa12a 100644 (file)
@@ -55,11 +55,10 @@ public class TestStreamManager {
 
     SystemAdmin admin1 = mock(SystemAdmin.class);
     SystemAdmin admin2 = mock(SystemAdmin.class);
-    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
-    sysAdmins.put(SYSTEM1, admin1);
-    sysAdmins.put(SYSTEM2, admin2);
-
-    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
+    when(systemAdmins.getSystemAdmin(SYSTEM2)).thenReturn(admin2);
+    StreamManager manager = new StreamManager(systemAdmins);
     manager.createStreams(specList);
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
@@ -74,8 +73,8 @@ public class TestStreamManager {
   @Test
   public void testGetStreamPartitionCounts() {
     SystemAdmin admin1 = mock(SystemAdmin.class);
-    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
-    sysAdmins.put(SYSTEM1, admin1);
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
 
     Map<String, SystemStreamMetadata> map = new HashMap<>();
     SystemStreamMetadata meta1 = mock(SystemStreamMetadata.class);
@@ -96,7 +95,7 @@ public class TestStreamManager {
     Set<String> streams = new HashSet<>();
     streams.add(STREAM1);
     streams.add(STREAM2);
-    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
+    StreamManager manager = new StreamManager(systemAdmins);
     Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams);
 
     assertTrue(counts.get(STREAM1).equals(1));
@@ -115,9 +114,9 @@ public class TestStreamManager {
   public void testClearStreamsFromPreviousRun() {
     SystemAdmin admin1 = mock(SystemAdmin.class);
     SystemAdmin admin2 = mock(SystemAdmin.class);
-    Map<String, SystemAdmin> sysAdmins = new HashMap<>();
-    sysAdmins.put(SYSTEM1, admin1);
-    sysAdmins.put(SYSTEM2, admin2);
+    SystemAdmins systemAdmins = mock(SystemAdmins.class);
+    when(systemAdmins.getSystemAdmin(SYSTEM1)).thenReturn(admin1);
+    when(systemAdmins.getSystemAdmin(SYSTEM2)).thenReturn(admin2);
 
     String runId = "123";
     Map<String, String> config = new HashMap<>();
@@ -132,7 +131,7 @@ public class TestStreamManager {
     config.put("stores.test-store.factory", "dummyfactory");
     config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2);
 
-    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
+    StreamManager manager = new StreamManager(systemAdmins);
     manager.clearStreamsFromPreviousRun(new MapConfig(config));
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
index a4dcc46..7a16099 100644 (file)
@@ -28,6 +28,7 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
@@ -60,8 +61,9 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava.asInstanceOf[java.util.Map[String, SystemAdmin]]), Map(), new OffsetManagerMetrics)
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -94,7 +96,8 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val systemAdmins = new SystemAdmins(Map("test-system" -> getSystemAdmin).asJava.asInstanceOf[java.util.Map[String, SystemAdmin]])
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
@@ -150,9 +153,10 @@ class TestOffsetManager {
     val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45").asJava)
     // Checkpoint manager only has partition 1.
     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
     val config = new MapConfig
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava.asInstanceOf[java.util.Map[String, SystemAdmin]]))
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
     offsetManager.register(taskName1, Set(systemStreamPartition1))
     offsetManager.register(taskName2, Set(systemStreamPartition2))
@@ -252,16 +256,18 @@ class TestOffsetManager {
     val checkpointManager = getCheckpointManager1(systemStreamPartition,
                                                  new Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> "100").asJava),
                                                  taskName)
-    val systemAdmins = Map(systemName -> getSystemAdmin, systemName2->getSystemAdmin)
     val consumer = new SystemConsumerWithCheckpointCallback
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin(systemName)).thenReturn(getSystemAdmin)
+    when(systemAdmins.getSystemAdmin(systemName2)).thenReturn(getSystemAdmin)
 
     val checkpointListeners: Map[String, CheckpointListener] = if(consumer.isInstanceOf[CheckpointListener])
       Map(systemName -> consumer.asInstanceOf[CheckpointListener])
     else
       Map()
 
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager,
-                                      new SystemAdmins(systemAdmins.asJava.asInstanceOf[java.util.Map[String, SystemAdmin]]), checkpointListeners, new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins,
+      checkpointListeners, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2))
 
     offsetManager.start
@@ -306,8 +312,9 @@ class TestOffsetManager {
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, new SystemAdmins(systemAdmins.asJava.asInstanceOf[java.util.Map[String, SystemAdmin]]), Map(), new OffsetManagerMetrics)
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
 
index 63d58c9..c002f76 100644 (file)
@@ -23,30 +23,29 @@ import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.server.{HttpServer, JobServlet}
 import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.system._
+import org.apache.samza.system.chooser.RoundRobinChooser
 import org.apache.samza.task._
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
+import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito.when
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
-import scala.collection.JavaConverters._
-import org.mockito.Mockito.when
-
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Test
@@ -129,8 +128,9 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new SystemStreamPartition("test", "stream1", new Partition(1)),
       new SystemStreamPartition("test", "stream2", new Partition(0)),
       new SystemStreamPartition("test", "stream2", new Partition(1)))
-    val systemAdminMap = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
-    val metadata = new StreamMetadataCache(new SystemAdmins(systemAdminMap)).getStreamMetadata(inputStreams.map(_.getSystemStream))
+    val systemAdmins = mock[SystemAdmins]
+    when(systemAdmins.getSystemAdmin("test")).thenReturn(new SinglePartitionWithoutOffsetsSystemAdmin)
+    val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream))
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
     val stream1Metadata = metadata(new SystemStream("test", "stream1"))
index de1647f..4ff7848 100644 (file)
@@ -323,8 +323,8 @@ class TestTaskInstance {
     val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
-    val systemAdmin: SystemAdmin = new MockSystemAdmin
-    val systemAdmins = new SystemAdmins(Map("system" -> systemAdmin).asJava)
+    val systemAdmins = Mockito.mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("system")).thenReturn(new MockSystemAdmin)
     var result = new ListBuffer[IncomingMessageEnvelope]
 
     val task = new StreamTask {
index 95a0a11..42610ae 100644 (file)
@@ -21,33 +21,26 @@ package org.apache.samza.coordinator
 
 import java.util
 
-import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
-import org.apache.samza.job.local.ProcessJobFactory
-import org.apache.samza.job.local.ThreadJobFactory
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.util.{HttpUtil, Util}
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-
-import scala.collection.JavaConverters._
-import org.apache.samza.config.MapConfig
-import org.apache.samza.config.TaskConfig
-import org.apache.samza.config.SystemConfig
-import org.apache.samza.container.SamzaContainer
-import org.apache.samza.container.TaskName
-import org.apache.samza.system._
 import org.apache.samza.Partition
-import org.apache.samza.SamzaException
-import org.apache.samza.job.model.JobModel
-import org.apache.samza.job.model.ContainerModel
-import org.apache.samza.job.model.TaskModel
-import org.apache.samza.config.JobConfig
+import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFactory
+import org.apache.samza.config.{JobConfig, MapConfig, SystemConfig, TaskConfig}
+import org.apache.samza.container.{SamzaContainer, TaskName}
 import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, MockCoordinatorStreamSystemFactory, MockCoordinatorStreamWrappedConsumer}
 import org.apache.samza.job.MockJobFactory
+import org.apache.samza.job.local.{ProcessJobFactory, ThreadJobFactory}
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.serializers.model.SamzaObjectMapper
 import org.apache.samza.storage.ChangelogStreamManager
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system._
+import org.apache.samza.util.HttpUtil
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.{FlatSpec, PrivateMethodTester}
 
+import scala.collection.JavaConverters._
 import scala.collection.immutable
 
 
@@ -238,19 +231,14 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
   @Test
   def testWithPartitionAssignmentWithMockJobFactory {
     val config = new SystemConfig(getTestConfig(classOf[MockJobFactory]))
-
-    val systemNames = Set("test")
-
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdminMap = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-
-    val streamMetadataCache = new StreamMetadataCache(new SystemAdmins(systemAdminMap.asJava))
+    val systemStream = new SystemStream("test", "stream1")
+    val streamMetadataCache = mock(classOf[StreamMetadataCache])
+    when(streamMetadataCache.getStreamMetadata(Set(systemStream), true)).thenReturn(
+      Map(systemStream -> new SystemStreamMetadata(systemStream.getStream,
+        Map(new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
+          new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
+          new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
+        ).asJava)))
     val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
     val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
 
index 399543c..2aafab1 100644 (file)
@@ -234,7 +234,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     }
   }
 
-  class MockStreamMetadataCache extends StreamMetadataCache(new SystemAdmins(new HashMap[String, SystemAdmin])) {
+  class MockStreamMetadataCache extends StreamMetadataCache(SystemAdmins.empty()) {
     /**
      * Returns metadata about each of the given streams (such as first offset, newest
      * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
index 31d3ef6..d092577 100644 (file)
@@ -662,7 +662,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
   var changeLogSystemStreams: Map[String, SystemStream] = Map()
   var streamMetadataCache = mock[StreamMetadataCache]
   var partition: Partition = new Partition(0)
-  var systemAdmins: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin])
+  var systemAdminsMap: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin])
   var taskName: TaskName = new TaskName("testTask")
   var storeBaseDir: File = TaskStorageManagerBuilder.defaultStoreBaseDir
   var loggedStoreBaseDir: File =  TaskStorageManagerBuilder.defaultLoggedStoreBaseDir
@@ -693,7 +693,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
   }
 
   def setSystemAdmin(system: String, systemAdmin: SystemAdmin) = {
-    systemAdmins = systemAdmins ++ Map(system -> systemAdmin)
+    systemAdminsMap = systemAdminsMap ++ Map(system -> systemAdmin)
     this
   }
 
@@ -718,9 +718,17 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       nonLoggedStoreBaseDir = storeBaseDir,
       loggedStoreBaseDir = loggedStoreBaseDir,
       partition = partition,
-      systemAdmins = new SystemAdmins(systemAdmins.asJava),
+      systemAdmins = buildSystemAdmins(systemAdminsMap),
       new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs,
       SystemClock.instance
     )
   }
+
+  private def buildSystemAdmins(systemAdminsMap: Map[String, SystemAdmin]): SystemAdmins = {
+    val systemAdmins = mock[SystemAdmins]
+    systemAdminsMap.foreach { case (system, systemAdmin) =>
+      when(systemAdmins.getSystemAdmin(system)).thenReturn(systemAdmin)
+    }
+    systemAdmins
+  }
 }
index e48764b..646c883 100644 (file)
 
 package org.apache.samza.system
 
-import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.util.Clock
-import org.junit.Test
+import org.apache.samza.{Partition, SamzaException}
+import org.junit.{Before, Test}
 import org.mockito.Mockito._
-import org.scalatest.{Matchers => ScalaTestMatchers}
+import org.mockito.{Mock, MockitoAnnotations}
 import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
+import org.scalatest.{Matchers => ScalaTestMatchers}
+
 import scala.collection.JavaConverters._
 
 class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with ScalaTestMatchers {
-  def makeMetadata(streamNames: Set[String] = Set("stream"), numPartitions: Int = 4) = {
+  private val SYSTEM = "system"
+  private val OTHER_SYSTEM = "otherSystem"
+  private val cacheTTL = 500
+
+  @Mock
+  var systemAdmin: SystemAdmin = _
+  @Mock
+  var otherSystemAdmin: SystemAdmin = _
+  @Mock
+  var systemAdmins: SystemAdmins = _
+  @Mock
+  var clock: Clock = _
+  var cache: StreamMetadataCache = _
+
+  @Before
+  def setup(): Unit = {
+    MockitoAnnotations.initMocks(this)
+    when(systemAdmins.getSystemAdmin(SYSTEM)).thenReturn(systemAdmin)
+    when(systemAdmins.getSystemAdmin(OTHER_SYSTEM)).thenReturn(otherSystemAdmin)
+    cache = new StreamMetadataCache(systemAdmins, cacheTTL, clock)
+  }
+
+  private def makeMetadata(streamNames: Set[String] = Set("stream"), numPartitions: Int = 4): Map[String, SystemStreamMetadata] = {
     val partitions = (0 until numPartitions).map(partition => {
       new Partition(partition) -> new SystemStreamPartitionMetadata("oldest", "newest", "upcoming")
     }).toMap
@@ -38,82 +62,73 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
   }
 
   @Test
-  def testFetchUncachedMetadataFromSystemAdmin {
-    val systemAdmins = Map("foo" -> mock[SystemAdmin])
-    when(systemAdmins("foo").getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava)
-    val streams = Set(new SystemStream("foo", "bar"))
-    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava))
-
+  def testFetchUncachedMetadataFromSystemAdmin() {
+    when(systemAdmin.getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava)
+    val streams = Set(new SystemStream(SYSTEM, "bar"))
     val result = cache.getStreamMetadata(streams)
     streams shouldEqual result.keySet
-    result(new SystemStream("foo", "bar")).getSystemStreamPartitionMetadata.size should equal(4)
-    verify(systemAdmins("foo"), times(1)).getSystemStreamMetadata(Set("bar").asJava)
+    result(new SystemStream(SYSTEM, "bar")).getSystemStreamPartitionMetadata.size should equal(4)
+    verify(systemAdmin).getSystemStreamMetadata(Set("bar").asJava)
   }
 
   @Test
-  def testCacheExpiry {
-    val clock = mock[Clock]
-    val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava)
-    val streams = Set(new SystemStream("system", "stream"))
-    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava), clock = clock)
+  def testCacheExpiry() {
+    when(systemAdmin.getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava)
+    val streams = Set(new SystemStream(SYSTEM, "stream"))
 
     when(clock.currentTimeMillis).thenReturn(0)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream").asJava)
+    verify(systemAdmin).getSystemStreamMetadata(Set("stream").asJava)
 
-    when(clock.currentTimeMillis).thenReturn(cache.cacheTTLms / 2)
+    when(clock.currentTimeMillis).thenReturn(cacheTTL / 2)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream").asJava)
+    verify(systemAdmin).getSystemStreamMetadata(Set("stream").asJava)
 
-    when(clock.currentTimeMillis).thenReturn(2 * cache.cacheTTLms)
+    when(clock.currentTimeMillis).thenReturn(2 * cacheTTL)
     cache.getStreamMetadata(streams)
     cache.getStreamMetadata(streams)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(2)).getSystemStreamMetadata(Set("stream").asJava)
+    verify(systemAdmin, times(2)).getSystemStreamMetadata(Set("stream").asJava)
   }
 
   @Test
-  def testGroupingRequestsBySystem {
-    val systemAdmins = Map("sys1" -> mock[SystemAdmin], "sys2" -> mock[SystemAdmin])
-    when(systemAdmins("sys1").getSystemStreamMetadata(Set("stream1a", "stream1b").asJava))
+  def testGroupingRequestsBySystem() {
+    when(systemAdmin.getSystemStreamMetadata(Set("stream1a", "stream1b").asJava))
       .thenReturn(makeMetadata(Set("stream1a", "stream1b"), numPartitions = 3).asJava)
-    when(systemAdmins("sys2").getSystemStreamMetadata(Set("stream2a", "stream2b").asJava))
+    when(otherSystemAdmin.getSystemStreamMetadata(Set("stream2a", "stream2b").asJava))
       .thenReturn(makeMetadata(Set("stream2a", "stream2b"), numPartitions = 5).asJava)
     val streams = Set(
-      new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"),
-      new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b")
+      new SystemStream(SYSTEM, "stream1a"), new SystemStream(SYSTEM, "stream1b"),
+      new SystemStream(OTHER_SYSTEM, "stream2a"), new SystemStream(OTHER_SYSTEM, "stream2b")
     )
-    val result = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
+    val result = cache.getStreamMetadata(streams)
     result.keySet shouldEqual streams
     streams.foreach(stream => {
-      val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
+      val expectedPartitions = if (stream.getSystem == SYSTEM) 3 else 5
       result(stream).getSystemStreamPartitionMetadata.size shouldEqual expectedPartitions
     })
-    verify(systemAdmins("sys1"), times(1)).getSystemStreamMetadata(Set("stream1a", "stream1b").asJava)
-    verify(systemAdmins("sys2"), times(1)).getSystemStreamMetadata(Set("stream2a", "stream2b").asJava)
+    verify(systemAdmin).getSystemStreamMetadata(Set("stream1a", "stream1b").asJava)
+    verify(otherSystemAdmin).getSystemStreamMetadata(Set("stream2a", "stream2b").asJava)
   }
 
   @Test
-  def testSystemOmitsStreamFromResult {
-    val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream1", "stream2").asJava))
+  def testSystemOmitsStreamFromResult() {
+    when(systemAdmin.getSystemStreamMetadata(Set("stream1", "stream2").asJava))
       .thenReturn(makeMetadata(Set("stream1")).asJava) // metadata doesn't include stream2
-    val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2"))
+    val streams = Set(new SystemStream(SYSTEM, "stream1"), new SystemStream(SYSTEM, "stream2"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
+      cache.getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown streams")
   }
 
   @Test
-  def testSystemReturnsNullMetadata {
-    val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava))
+  def testSystemReturnsNullMetadata() {
+    when(systemAdmin.getSystemStreamMetadata(Set("stream").asJava))
       .thenReturn(Map[String, SystemStreamMetadata]("stream" -> null).asJava)
-    val streams = Set(new SystemStream("system", "stream"))
+    val streams = Set(new SystemStream(SYSTEM, "stream"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
+      cache.getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown streams")
   }
index 02791bb..a017518 100644 (file)
@@ -31,6 +31,7 @@ import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
+import org.mockito.Mockito.{mock, when}
 
 import scala.collection.JavaConverters._
 
@@ -181,12 +182,13 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
 
   @Test
   def testChooserRegisteredCorrectSsps {
-    val mock = new MockMessageChooser
+    val mockMessageChooser = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val systemAdmin: SystemAdmin = new MockSystemAdmin
-    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
-      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
+    val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), systemAdmins)
 
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, "1")
@@ -201,12 +203,13 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
 
   @Test
   def testChooserRegisterWithStreamUsedAsBootstrapAndBroadcast: Unit = {
-    val mock = new MockMessageChooser
+    val mockMessageChooser = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val systemAdmin: SystemAdmin = new MockSystemAdmin
-    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
-      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
+    val chooser = new BootstrappingChooser(mockMessageChooser, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), systemAdmins)
 
     // Envelope1 is registered by multiple tasks, each one of them having different offsets.
     chooser.register(envelope1.getSystemStreamPartition, "1")
@@ -236,8 +239,8 @@ object TestBootstrappingChooser {
   // chooser.
   @Parameters
   def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = {
-    val systemAdmin: SystemAdmin = new MockSystemAdmin
-    val systemAdmins = new SystemAdmins(Map("kafka" -> systemAdmin).asJava)
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
     Arrays.asList(
       Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) =>
         new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), systemAdmins)),
index e21bd9c..c4c702d 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.samza.system._
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito.{mock, when}
 
 import scala.collection.JavaConverters._
 
@@ -56,7 +57,8 @@ class TestDefaultChooser {
       envelope5.getSystemStreamPartition().getPartition() -> env5Metadata).asJava)
     val stream3Metadata = new SystemStreamMetadata("stream3", Map(
       envelope8.getSystemStreamPartition().getPartition() -> env8Metadata).asJava)
-    val systemAdmin: SystemAdmin = new MockSystemAdmin()
+    val systemAdmins = mock(classOf[SystemAdmins])
+    when(systemAdmins.getSystemAdmin("kafka")).thenReturn(new MockSystemAdmin)
     val chooser = new DefaultChooser(
       mock0,
       Some(2),
@@ -71,7 +73,7 @@ class TestDefaultChooser {
         envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata,
         envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata),
       new MetricsRegistryMap(),
-      new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
+      systemAdmins)
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.register(envelope2.getSystemStreamPartition, null)