SAMZA-1548; Add start() and stop() to SystemAdmin
authorDong Lin <lindong28@gmail.com>
Fri, 26 Jan 2018 01:28:13 +0000 (17:28 -0800)
committerPrateek Maheshwari <pmaheshw@linkedin.com>
Fri, 26 Jan 2018 01:28:13 +0000 (17:28 -0800)
This patch adds start() and stop() to SystemAdmin interface. This can be useful for e.g. kafka.admin.AdminClient which needs to be started before it can be used.

Since we add this method in interface and expect AdminClient to be stateful and probably has its own thread, there will be higher cost to instantiate a new SystemAdmin. Thus we probably want to re-use the SystemAdmin instances instead of creating SystemAdmin on demand when needed. Therefore, this patch also adds SystemAdmins class to help manage a map from system to SystemAdmin, similar to the existing SystemProducers class in Samza.

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes #397 from lindong28/SAMZA-1548

48 files changed:
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala [deleted file]
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
samza-core/src/main/scala/org/apache/samza/system/chooser/DefaultChooser.scala
samza-core/src/main/scala/org/apache/samza/util/Util.scala
samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
samza-core/src/test/java/org/apache/samza/execution/TestStreamManager.java
samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java

index e765540..dce7030 100644 (file)
@@ -28,6 +28,17 @@ import java.util.Set;
  * utility methods that Samza needs in order to interact with a system.
  */
 public interface SystemAdmin {
+
+  /*
+   * Start this SystemAdmin
+   */
+  default void start() {};
+
+  /*
+   * Stop this SystemAdmin
+   */
+  default void stop() {};
+
   /**
    * Fetches the offsets for the messages immediately after the supplied offsets
    * for a group of SystemStreamPartitions.
index d1b532f..d709254 100644 (file)
@@ -29,7 +29,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.JobRunner;
 import org.apache.samza.job.model.ContainerModel;
@@ -111,8 +110,7 @@ public class ConfigManager {
     }
 
     this.config = config;
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
-    this.coordinatorStreamConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
+    this.coordinatorStreamConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     this.yarnUtil = new YarnUtil(rmAddress, rmPort);
   }
 
index ca3384d..2b65ae0 100644 (file)
@@ -42,12 +42,14 @@ import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlobUtils;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.LeaseBlobManager;
+import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.TableUtils;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -91,6 +93,7 @@ public class AzureJobCoordinator implements JobCoordinator {
   private RenewLeaseScheduler renewLease;
   private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
   private StreamMetadataCache streamMetadataCache = null;
+  private SystemAdmins systemAdmins = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel jobModel = null;
 
@@ -124,9 +127,12 @@ public class AzureJobCoordinator implements JobCoordinator {
 
   @Override
   public void start() {
-
     LOG.info("Starting Azure job coordinator.");
-    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+
+    // The systemAdmins should be started before streamMetadataCache can be used. And it should be stopped when this coordinator is stopped.
+    systemAdmins = new SystemAdmins(config);
+    systemAdmins.start();
+    streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
     table.addProcessorEntity(INITIAL_STATE, processorId, false);
 
     // Start scheduler for heartbeating
@@ -164,6 +170,8 @@ public class AzureJobCoordinator implements JobCoordinator {
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
+
+    systemAdmins.stop();
   }
 
   @Override
index 3d67cae..91b94f4 100644 (file)
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
-import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.PartitionChangeException;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
@@ -37,10 +35,9 @@ import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SystemClock;
-import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +134,8 @@ public class ClusterBasedJobCoordinator {
    */
   volatile private Exception coordinatorException = null;
 
+  private SystemAdmins systemAdmins = null;
+
   /**
    * Creates a new ClusterBasedJobCoordinator instance from a config. Invoke run() to actually
    * run the jobcoordinator.
@@ -153,7 +152,9 @@ public class ClusterBasedJobCoordinator {
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
     state = new SamzaApplicationState(jobModelManager);
-    partitionMonitor = getPartitionCountMonitor(config);
+    // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
+    systemAdmins = new SystemAdmins(config);
+    partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabled();
 
@@ -186,6 +187,7 @@ public class ClusterBasedJobCoordinator {
       log.info("Starting Cluster Based Job Coordinator");
 
       containerProcessManager.start();
+      systemAdmins.start();
       partitionMonitor.start();
 
       boolean isInterrupted = false;
@@ -221,6 +223,7 @@ public class ClusterBasedJobCoordinator {
 
     try {
       partitionMonitor.stop();
+      systemAdmins.stop();
       containerProcessManager.stop();
     } catch (Throwable e) {
       log.error("Exception while stopping task manager {}", e);
@@ -242,9 +245,8 @@ public class ClusterBasedJobCoordinator {
     return jobModelManager;
   }
 
-  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config) {
-    Map<String, SystemAdmin> systemAdmins = new JavaSystemConfig(config).getSystemAdmins();
-    StreamMetadataCache streamMetadata = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 0, SystemClock.instance());
+  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");
index 16e8221..65b266e 100644 (file)
@@ -139,7 +139,6 @@ public class StreamPartitionCountMonitor {
               }
             }, monitorPeriodMs, monitorPeriodMs, TimeUnit.MILLISECONDS);
           }
-
           state = State.RUNNING;
           break;
 
index c343865..4bbf452 100644 (file)
@@ -32,16 +32,19 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,18 +66,30 @@ public class CoordinatorStreamSystemConsumer {
   private final Object bootstrapLock = new Object();
   private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();
 
-  public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+  public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
+    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
+    SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry);
+
     this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
     this.systemConsumer = systemConsumer;
     this.systemAdmin = systemAdmin;
-    this.configMap = new HashMap();
+    this.configMap = new HashMap<>();
     this.isBootstrapped = false;
-    this.keySerde = keySerde;
-    this.messageSerde = messageSerde;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
+  // Used only for test
   public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
-    this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+    this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+    this.systemConsumer = systemConsumer;
+    this.systemAdmin = systemAdmin;
+    this.configMap = new HashMap<>();
+    this.isBootstrapped = false;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
   /**
@@ -124,6 +139,7 @@ public class CoordinatorStreamSystemConsumer {
     }
     log.info("Starting coordinator stream system consumer.");
     systemConsumer.start();
+    systemAdmin.start();
     isStarted = true;
   }
 
@@ -133,6 +149,7 @@ public class CoordinatorStreamSystemConsumer {
   public void stop() {
     log.info("Stopping coordinator stream system consumer.");
     systemConsumer.stop();
+    systemAdmin.stop();
     isStarted = false;
   }
 
index 36cf759..b984e73 100644 (file)
@@ -27,12 +27,15 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,21 +53,31 @@ public class CoordinatorStreamSystemProducer {
   private final SystemAdmin systemAdmin;
   private boolean isStarted;
 
-  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
-    this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+
+  public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
+    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
+    SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry);
+    this.systemStream = coordinatorSystemStream;
+    this.systemProducer = systemProducer;
+    this.systemAdmin = systemAdmin;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
-  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+  // Used only for test
+  public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
     this.systemStream = systemStream;
     this.systemProducer = systemProducer;
     this.systemAdmin = systemAdmin;
-    this.keySerde = keySerde;
-    this.messageSerde = messageSerde;
+    this.keySerde = new JsonSerde<>();
+    this.messageSerde = new JsonSerde<>();
   }
 
   /**
    * Registers a source with the underlying SystemProducer.
-   * 
+   *
    * @param source
    *          The source to register.
    */
@@ -82,6 +95,7 @@ public class CoordinatorStreamSystemProducer {
     }
     log.info("Starting coordinator stream producer.");
     systemProducer.start();
+    systemAdmin.start();
     isStarted = true;
   }
 
@@ -91,12 +105,13 @@ public class CoordinatorStreamSystemProducer {
   public void stop() {
     log.info("Stopping coordinator stream producer.");
     systemProducer.stop();
+    systemAdmin.stop();
     isStarted = false;
   }
 
   /**
    * Serialize and send a coordinator stream message.
-   * 
+   *
    * @param message
    *          The message to send.
    */
@@ -119,7 +134,7 @@ public class CoordinatorStreamSystemProducer {
   /**
    * Helper method that sends a series of SetConfig messages to the coordinator
    * stream.
-   * 
+   *
    * @param source
    *          An identifier to denote which source is sending a message. This
    *          can be any arbitrary string.
index 77594dc..daca6a0 100644 (file)
@@ -47,7 +47,7 @@ public class CoordinatorStreamWriter {
 
 
   public CoordinatorStreamWriter(Config config) {
-    coordinatorStreamSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
+    coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap());
   }
 
   /**
index 3028e5f..0a6eb83 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.samza.config.*;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.util.Util;
@@ -46,10 +47,10 @@ import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 public class StreamManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
 
-  private final Map<String, SystemAdmin> sysAdmins;
+  private final SystemAdmins systemAdmins;
 
-  public StreamManager(Map<String, SystemAdmin> sysAdmins) {
-    this.sysAdmins = sysAdmins;
+  public StreamManager(SystemAdmins systemAdmins) {
+    this.systemAdmins = systemAdmins;
   }
 
   public void createStreams(List<StreamSpec> streams) {
@@ -59,7 +60,7 @@ public class StreamManager {
 
     for (Map.Entry<String, Collection<StreamSpec>> entry : streamsGroupedBySystem.asMap().entrySet()) {
       String systemName = entry.getKey();
-      SystemAdmin systemAdmin = sysAdmins.get(systemName);
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
 
       for (StreamSpec stream : entry.getValue()) {
         LOGGER.info("Creating stream {} with partitions {} on system {}",
@@ -72,7 +73,7 @@ public class StreamManager {
   Map<String, Integer> getStreamPartitionCounts(String systemName, Set<String> streamNames) {
     Map<String, Integer> streamToPartitionCount = new HashMap<>();
 
-    SystemAdmin systemAdmin = sysAdmins.get(systemName);
+    SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemName);
     if (systemAdmin == null) {
       throw new SamzaException(String.format("System %s does not exist.", systemName));
     }
@@ -106,7 +107,7 @@ public class StreamManager {
           .collect(Collectors.toSet());
       intStreams.forEach(stream -> {
           LOGGER.info("Clear intermediate stream {} in system {}", stream.getPhysicalName(), stream.getSystemName());
-          sysAdmins.get(stream.getSystemName()).clearStream(stream);
+          systemAdmins.getSystemAdmin(stream.getSystemName()).clearStream(stream);
         });
 
       //Find checkpoint stream and clean up
@@ -126,7 +127,7 @@ public class StreamManager {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
           SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
           StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
-          sysAdmins.get(spec.getSystemName()).clearStream(spec);
+          systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
         }
       }
     } catch (Exception e) {
index b8a8ca1..609b0ec 100644 (file)
@@ -23,7 +23,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ApplicationConfig.ApplicationMode;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StreamConfig;
@@ -32,6 +31,7 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmins;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +50,12 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   private static final Logger log = LoggerFactory.getLogger(AbstractApplicationRunner.class);
 
   private final StreamManager streamManager;
+  private final SystemAdmins systemAdmins;
 
   public AbstractApplicationRunner(Config config) {
     super(config);
-    this.streamManager = new StreamManager(new JavaSystemConfig(config).getSystemAdmins());
+    this.systemAdmins = new SystemAdmins(config);
+    this.streamManager = new StreamManager(systemAdmins);
   }
 
   @Override
@@ -63,6 +65,16 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
     return getStreamSpec(streamId, physicalName);
   }
 
+  @Override
+  public void run(StreamApplication streamApp) {
+    systemAdmins.start();
+  }
+
+  @Override
+  public void kill(StreamApplication streamApp) {
+    systemAdmins.stop();
+  }
+
   /**
    * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
    *
index e9b6bc8..5c5ee84 100644 (file)
@@ -147,6 +147,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
+      super.run(app);
       // 1. initialize and plan
       ExecutionPlan plan = getExecutionPlan(app);
 
@@ -181,6 +182,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     processors.forEach(StreamProcessor::stop);
+    super.kill(streamApp);
   }
 
   @Override
index 6750ccd..998df8b 100644 (file)
@@ -72,6 +72,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
+    super.run(streamApp);
     Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
     container = SamzaContainer$.MODULE$.apply(
index 1ead841..ea218d0 100644 (file)
@@ -25,7 +25,6 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.JobRunner;
@@ -59,6 +58,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void run(StreamApplication app) {
     try {
+      super.run(app);
       // TODO: run.id needs to be set for standalone: SAMZA-1531
       // run.id is based on current system time with the most significant bits in UUID (8 digits) to avoid collision
       String runId = String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
@@ -95,6 +95,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
           JobRunner runner = new JobRunner(jobConfig);
           runner.kill();
         });
+      super.kill(app);
     } catch (Throwable t) {
       throw new SamzaException("Failed to kill application", t);
     }
@@ -149,9 +150,7 @@ public class RemoteApplicationRunner extends AbstractApplicationRunner {
   }
 
   private Config getConfigFromPrevRun() {
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(
-        config, new MetricsRegistryMap());
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap());
     consumer.register();
     consumer.start();
     consumer.bootstrap();
index 5147169..e45b778 100644 (file)
  */
 package org.apache.samza.standalone;
 
-import org.apache.samza.SamzaException;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobModelManager;
@@ -30,15 +28,12 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Standalone Job Coordinator does not implement any leader elector module or cluster manager
@@ -111,21 +106,9 @@ public class PassthroughJobCoordinator implements JobCoordinator {
 
   @Override
   public JobModel getJobModel() {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
-    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
-    for (String systemName: systemConfig.getSystemNames()) {
-      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
-      if (systemFactoryClassName == null) {
-        LOGGER.error(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-        throw new SamzaException(String.format("A stream uses system %s, which is missing from the configuration.", systemName));
-      }
-      SystemFactory systemFactory = Util.<SystemFactory>getObj(systemFactoryClassName);
-      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
-    }
-
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(
-        Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
-
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
+    systemAdmins.start();
     String containerId = Integer.toString(config.getInt(JobConfig.PROCESSOR_ID()));
 
     /** TODO:
@@ -134,8 +117,10 @@ public class PassthroughJobCoordinator implements JobCoordinator {
      TaskNameGrouper with the LocalityManager! Hence, groupers should be a property of the jobcoordinator
      (job.coordinator.task.grouper, instead of task.systemstreampartition.grouper)
      */
-    return JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
+    JobModel jobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(), null, streamMetadataCache,
         Collections.singletonList(containerId));
+    systemAdmins.stop();
+    return jobModel;
   }
 
   @Override
index a47183e..c55f21f 100644 (file)
@@ -40,7 +40,7 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.serializers.ByteSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
@@ -61,13 +61,12 @@ public class StorageRecovery extends CommandLine {
   private Config jobConfig;
   private int maxPartitionNumber = 0;
   private File storeBaseDir = null;
-  private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<String, SystemStream>();
-  private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<String, StorageEngineFactory<?, ?>>();
-  private HashMap<String, SystemFactory> systemFactories = new HashMap<String, SystemFactory>();
-  private HashMap<String, SystemAdmin> systemAdmins = new HashMap<String, SystemAdmin>();
-  private Map<String, ContainerModel> containers = new HashMap<String, ContainerModel>();
-  private List<TaskStorageManager> taskStorageManagers = new ArrayList<TaskStorageManager>();
+  private HashMap<String, SystemStream> changeLogSystemStreams = new HashMap<>();
+  private HashMap<String, StorageEngineFactory<?, ?>> storageEngineFactories = new HashMap<>();
+  private Map<String, ContainerModel> containers = new HashMap<>();
+  private List<TaskStorageManager> taskStorageManagers = new ArrayList<>();
   private Logger log = LoggerFactory.getLogger(StorageRecovery.class);
+  private SystemAdmins systemAdmins = null;
 
   /**
    * Construct the StorageRecovery
@@ -80,6 +79,7 @@ public class StorageRecovery extends CommandLine {
   StorageRecovery(Config config, String path) {
     jobConfig = config;
     storeBaseDir = new File(path, "state");
+    systemAdmins = new SystemAdmins(config);
   }
 
   /**
@@ -90,7 +90,6 @@ public class StorageRecovery extends CommandLine {
     log.info("setting up the recovery...");
 
     getContainerModels();
-    getSystemFactoriesAndAdmins();
     getChangeLogSystemStreamsAndStorageFactories();
     getChangeLogMaxPartitionNumber();
     getTaskStorageManagers();
@@ -104,11 +103,13 @@ public class StorageRecovery extends CommandLine {
 
     log.info("start recovering...");
 
+    systemAdmins.start();
     for (TaskStorageManager taskStorageManager : taskStorageManagers) {
       taskStorageManager.init();
       taskStorageManager.stopStores();
       log.debug("restored " + taskStorageManager.toString());
     }
+    systemAdmins.stop();
 
     log.info("successfully recovered in " + storeBaseDir.toString());
   }
@@ -123,27 +124,6 @@ public class StorageRecovery extends CommandLine {
   }
 
   /**
-   * get the SystemFactories and SystemAdmins specified in the config file and
-   * put them into the maps
-   */
-  private void getSystemFactoriesAndAdmins() {
-    JavaSystemConfig systemConfig = new JavaSystemConfig(jobConfig);
-    List<String> systems = systemConfig.getSystemNames();
-
-    for (String system : systems) {
-      String systemFactory = systemConfig.getSystemFactory(system);
-      if (systemFactory == null) {
-        throw new SamzaException("A stream uses system " + system + " which is missing from the configuration.");
-      }
-      systemFactories.put(system, Util.<SystemFactory>getObj(systemFactory));
-      systemAdmins.put(system, Util.<SystemFactory>getObj(systemFactory).getAdmin(system, jobConfig));
-    }
-
-    log.info("Got system factories: " + systemFactories.keySet().toString());
-    log.info("Got system admins: " + systemAdmins.keySet().toString());
-  }
-
-  /**
    * get the changelog streams and the storage factories from the config file
    * and put them into the maps
    */
@@ -175,7 +155,8 @@ public class StorageRecovery extends CommandLine {
    * get the SystemConsumers for the stores
    */
   private HashMap<String, SystemConsumer> getStoreConsumers() {
-    HashMap<String, SystemConsumer> storeConsumers = new HashMap<String, SystemConsumer>();
+    HashMap<String, SystemConsumer> storeConsumers = new HashMap<>();
+    Map<String, SystemFactory> systemFactories = new JavaSystemConfig(jobConfig).getSystemFactories();
 
     for (Entry<String, SystemStream> entry : changeLogSystemStreams.entrySet()) {
       String storeSystem = entry.getValue().getSystem();
@@ -207,7 +188,7 @@ public class StorageRecovery extends CommandLine {
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
   private void getTaskStorageManagers() {
-    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+    StreamMetadataCache streamMetadataCache = new StreamMetadataCache(systemAdmins, 5000, SystemClock.instance());
 
     for (ContainerModel containerModel : containers.values()) {
       HashMap<String, StorageEngine> taskStores = new HashMap<String, StorageEngine>();
@@ -249,7 +230,7 @@ public class StorageRecovery extends CommandLine {
             storeBaseDir,
             storeBaseDir,
             taskModel.getChangelogPartition(),
-            Util.javaMapAsScalaMap(systemAdmins),
+            systemAdmins,
             new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(),
             new SystemClock());
 
diff --git a/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java b/samza-core/src/main/java/org/apache/samza/system/SystemAdmins.java
new file mode 100644 (file)
index 0000000..ae96b2d
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system;
+
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+
+
+public class SystemAdmins {
+  private final Map<String, SystemAdmin> systemAdminMap;
+
+  public Map<String, SystemAdmin> getSystemAdminsMap() {
+    return systemAdminMap;
+  }
+
+  public SystemAdmins(Config config) {
+    JavaSystemConfig systemConfig = new JavaSystemConfig(config);
+    this.systemAdminMap = systemConfig.getSystemAdmins();
+  }
+
+  // Used only for test
+  public SystemAdmins(Map<String, SystemAdmin> systemAdminMap) {
+    this.systemAdminMap = systemAdminMap;
+  }
+
+  public void start() {
+    for (SystemAdmin systemAdmin: systemAdminMap.values()) {
+      systemAdmin.start();
+    }
+  }
+
+  public void stop() {
+    for (SystemAdmin systemAdmin: systemAdminMap.values()) {
+      systemAdmin.stop();
+    }
+  }
+
+  public SystemAdmin getSystemAdmin(String systemName) {
+    if (!systemAdminMap.containsKey(systemName)) {
+      throw new SamzaException("Cannot get systemAdmin for system " + systemName);
+    }
+    return systemAdminMap.get(systemName);
+  }
+
+}
index f0c2ec7..801033d 100644 (file)
@@ -48,8 +48,10 @@ import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.MetricsReporterLoader;
+import org.apache.samza.util.SystemClock;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,6 +90,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final Map<String, MetricsReporter> reporters;
 
   private StreamMetadataCache streamMetadataCache = null;
+  private SystemAdmins systemAdmins = null;
   private ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
@@ -120,13 +123,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         LOG.error("Received exception from in JobCoordinator Processing!", throwable);
         stop();
       });
+    systemAdmins = new SystemAdmins(config);
+    streamMetadataCache = new StreamMetadataCache(systemAdmins, METADATA_CACHE_TTL_MS, SystemClock.instance());
   }
 
   @Override
   public void start() {
     startMetrics();
-    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
-
+    systemAdmins.start();
     zkController.register();
   }
 
@@ -144,6 +148,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
+    systemAdmins.stop();
   }
 
   private void startMetrics() {
@@ -196,7 +201,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
     if (!hasCreatedChangeLogStreams) {
-      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions);
+      JobModelManager.createChangeLogStreams(new StorageConfig(config), jobModel.maxChangeLogStreamPartitions, systemAdmins);
       hasCreatedChangeLogStreams = true;
     }
     // Assign the next version of JobModel
index 1b2ce80..4959974 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemAdmins
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -75,7 +76,7 @@ object OffsetManager extends Logging {
     systemStreamMetadata: Map[SystemStream, SystemStreamMetadata],
     config: Config,
     checkpointManager: CheckpointManager = null,
-    systemAdmins: Map[String, SystemAdmin] = Map(),
+    systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
     checkpointListeners: Map[String, CheckpointListener] = Map(),
     offsetManagerMetrics: OffsetManagerMetrics = new OffsetManagerMetrics) = {
     debug("Building offset manager for %s." format systemStreamMetadata)
@@ -141,7 +142,7 @@ class OffsetManager(
    * SystemAdmins that are used to get next offsets from last checkpointed
    * offsets. Map is from system name to SystemAdmin class for the system.
    */
-  val systemAdmins: Map[String, SystemAdmin] = Map(),
+  val systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin]),
 
   /**
    * Map of checkpointListeners for the systems that chose to provide one.
@@ -396,10 +397,7 @@ class OffsetManager(
         taskName -> {
           sspToOffsets.asScala.groupBy(_._1.getSystem).flatMap {
             case (systemName, systemStreamPartitionOffsets) =>
-              systemAdmins
-                .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
-                .getOffsetsAfter(systemStreamPartitionOffsets.asJava)
-                .asScala
+              systemAdmins.getSystemAdmin(systemName).getOffsetsAfter(systemStreamPartitionOffsets.asJava).asScala
           }
         }
       }
index f465bfc..5664754 100644 (file)
@@ -38,7 +38,7 @@ import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -60,11 +60,7 @@ object SamzaContainer extends Logging {
 
   def getLocalityManager(containerName: String, config: Config): LocalityManager = {
     val registryMap = new MetricsRegistryMap(containerName)
-    val coordinatorSystemProducer =
-      new CoordinatorStreamSystemFactory()
-        .getCoordinatorStreamSystemProducer(
-          config,
-          new SamzaContainerMetrics(containerName, registryMap).registry)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry)
     new LocalityManager(coordinatorSystemProducer)
   }
 
@@ -151,13 +147,11 @@ object SamzaContainer extends Logging {
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
       (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
     }).toMap
-
-    val systemAdmins = systemNames
-      .map(systemName => (systemName, systemFactories(systemName).getAdmin(systemName, config)))
-      .toMap
-
     info("Got system factories: %s" format systemFactories.keys)
 
+    val systemAdmins = new SystemAdmins(config)
+    info("Got system admins: %s" format systemAdmins.getSystemAdminsMap().keySet())
+
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
     val inputStreamMetadata = streamMetadataCache.getStreamMetadata(inputSystemStreams)
 
@@ -360,12 +354,9 @@ object SamzaContainer extends Logging {
     // create a map of consumers with callbacks to pass to the OffsetManager
     val checkpointListeners = consumers.filter(_._2.isInstanceOf[CheckpointListener])
       .map { case (system, consumer) => (system, consumer.asInstanceOf[CheckpointListener])}
-
     info("Got checkpointListeners : %s" format checkpointListeners)
 
-    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager,
-      systemAdmins, checkpointListeners, offsetManagerMetrics)
-
+    val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
     val dropDeserializationError = config.getDropDeserialization match {
@@ -629,6 +620,7 @@ object SamzaContainer extends Logging {
       containerContext = containerContext,
       taskInstances = taskInstances,
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
@@ -647,6 +639,7 @@ class SamzaContainer(
   containerContext: SamzaContainerContext,
   taskInstances: Map[TaskName, TaskInstance],
   runLoop: Runnable,
+  systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
@@ -686,6 +679,7 @@ class SamzaContainer(
       jmxServer = new JmxServer()
 
       startMetrics
+      startAdmins
       startOffsetManager
       startLocalityManager
       startStores
@@ -733,6 +727,7 @@ class SamzaContainer(
       shutdownOffsetManager
       shutdownMetrics
       shutdownSecurityManger
+      shutdownAdmins
 
       if (!status.equals(SamzaContainerStatus.FAILED)) {
         status = SamzaContainerStatus.STOPPED
@@ -891,6 +886,13 @@ class SamzaContainer(
     taskInstances.values.foreach(_.initTask)
   }
 
+  def startAdmins {
+    info("Starting admin multiplexer.")
+
+    systemAdmins.start
+  }
+
+
   def startProducers {
     info("Registering task instances with producers.")
 
@@ -959,6 +961,13 @@ class SamzaContainer(
     consumerMultiplexer.stop
   }
 
+  def shutdownAdmins {
+    info("Shutting down admin multiplexer.")
+
+    systemAdmins.stop
+  }
+
+
   def shutdownProducers {
     info("Shutting down producer multiplexer.")
 
index f2a5074..c7d76c2 100644 (file)
@@ -39,7 +39,7 @@ class TaskInstance(
   val taskName: TaskName,
   config: Config,
   val metrics: TaskInstanceMetrics,
-  systemAdmins: Map[String, SystemAdmin],
+  systemAdmins: SystemAdmins,
   consumerMultiplexer: SystemConsumers,
   collector: TaskInstanceCollector,
   containerContext: SamzaContainerContext,
@@ -57,7 +57,7 @@ class TaskInstance(
   val isClosableTask = task.isInstanceOf[ClosableTask]
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
-  val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
+  val context = new TaskContextImpl(taskName, metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
                                     storageManager, tableManager, jobModel, streamMetadataCache)
 
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
@@ -258,7 +258,7 @@ class TaskInstance(
           val startingOffset = offsetManager.getStartingOffset(taskName, envelope.getSystemStreamPartition)
               .getOrElse(throw new SamzaException("No offset defined for SystemStreamPartition: %s" format envelope.getSystemStreamPartition))
           val system = envelope.getSystemStreamPartition.getSystem
-          others(system).offsetComparator(envelope.getOffset, startingOffset) match {
+          others.getSystemAdmin(system).offsetComparator(envelope.getOffset, startingOffset) match {
             case null => {
               info("offsets in " + system + " is not comparable. Set all SystemStreamPartitions to catched-up")
               ssp2CaughtupMapping(envelope.getSystemStreamPartition) = true // not comparable
index 99b1abe..4a804dd 100644 (file)
@@ -23,14 +23,10 @@ package org.apache.samza.coordinator
 import java.util
 import java.util.concurrent.atomic.AtomicReference
 
-import org.apache.samza.config.ClusterManagerConfig
-import org.apache.samza.config.JobConfig
+import org.apache.samza.config._
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.MapConfig
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.Config
-import org.apache.samza.config.StorageConfig
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
 import org.apache.samza.container.grouper.task.BalancingTaskNameGrouper
 import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
@@ -39,7 +35,6 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
@@ -79,9 +74,8 @@ object JobModelManager extends Logging {
    *                                from the coordinator stream, and instantiate a JobModelManager.
    */
   def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobModelManager = {
-    val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
-    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
-    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemConsumer: CoordinatorStreamSystemConsumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+    val coordinatorSystemProducer: CoordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
     info("Registering coordinator system stream consumer.")
     coordinatorSystemConsumer.register
     debug("Starting coordinator system stream consumer.")
@@ -103,9 +97,8 @@ object JobModelManager extends Logging {
     localityManager.start()
 
     // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = getSystemAdmins(config)
-
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins = systemAdmins, cacheTTLms = 0)
+    val systemAdmins = new SystemAdmins(config)
+    val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
     val previousChangelogPartitionMapping = changelogManager.readChangeLogPartitionMapping()
 
     val processorList = new ListBuffer[String]()
@@ -113,9 +106,8 @@ object JobModelManager extends Logging {
     for (i <- 0 until containerCount) {
       processorList += i.toString
     }
-
-    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager,
-      streamMetadataCache, processorList.toList.asJava)
+    systemAdmins.start()
+    val jobModelManager = getJobModelManager(config, previousChangelogPartitionMapping, localityManager, streamMetadataCache, processorList.toList.asJava)
     val jobModel = jobModelManager.jobModel
     // Save the changelog mapping back to the ChangelogPartitionmanager
     // newChangelogPartitionMapping is the merging of all current task:changelog
@@ -130,9 +122,10 @@ object JobModelManager extends Logging {
     info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
     changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
 
-    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
-    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+    createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
+    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions, systemAdmins)
 
+    systemAdmins.stop()
     jobModelManager
   }
 
@@ -275,25 +268,7 @@ object JobModelManager extends Logging {
     }
   }
 
-  /**
-   * Instantiates the system admins based upon the system factory class available in {@param config}.
-   * @param config contains adequate information to instantiate the SystemAdmin.
-   * @return a map of SystemName(String) to the instantiated SystemAdmin.
-   */
-  def getSystemAdmins(config: Config) : Map[String, SystemAdmin] = {
-    val systemNames = getSystemNames(config)
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-    systemAdmins
-  }
-
-  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int) {
+  def createChangeLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins) {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
@@ -301,10 +276,7 @@ object JobModelManager extends Logging {
       .mapValues(Util.getSystemStreamFromNames(_))
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val systemAdmin = Util.getObj[SystemFactory](config
-        .getSystemFactory(systemStream.getSystem)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
-        ).getAdmin(systemStream.getSystem, config)
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
 
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogPartitions)
       if (systemAdmin.createStream(changelogSpec)) {
@@ -316,7 +288,7 @@ object JobModelManager extends Logging {
     }
   }
 
-  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = {
+  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int, systemAdmins: SystemAdmins): Unit = {
     val changeLogSystemStreams = config
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
@@ -326,11 +298,7 @@ object JobModelManager extends Logging {
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val accessLog = config.getAccessLogEnabled(storeName)
       if (accessLog) {
-        val systemAdmin = Util.getObj[SystemFactory](config
-          .getSystemFactory(systemStream.getSystem)
-          .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemStream.getSystem))
-        ).getAdmin(systemStream.getSystem, config)
-
+        val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
         val accessLogSpec = new StreamSpec(config.getAccessLogStream(systemStream.getStream),
           config.getAccessLogStream(systemStream.getStream), systemStream.getSystem, changeLogPartitions)
         systemAdmin.createStream(accessLogSpec)
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
deleted file mode 100644 (file)
index 9283812..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.coordinator.stream
-
-import org.apache.samza.SamzaException
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.{Config, SystemConfig}
-import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.{SystemFactory, SystemStream}
-import org.apache.samza.util.Util
-
-/**
- * A helper class that does wiring for CoordinatorStreamSystemConsumer and
- * CoordinatorStreamSystemProducer. This factory should only be used in
- * situations where the underlying SystemConsumer/SystemProducer does not
- * exist.
- */
-class CoordinatorStreamSystemFactory {
-  def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
-    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
-    val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
-    new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
-  }
-
-  def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
-    val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
-    val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
-    new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
-  }
-}
\ No newline at end of file
index 0e973e9..7a250b2 100644 (file)
@@ -23,7 +23,7 @@ package org.apache.samza.job
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish}
 import org.apache.samza.metrics.MetricsRegistryMap
@@ -78,21 +78,23 @@ class JobRunner(config: Config) extends Logging {
   def run(resetJobConfig: Boolean = true) = {
     debug("config: %s" format (config))
     val jobFactory: StreamJobFactory = getJobFactory
-    val factory = new CoordinatorStreamSystemFactory
-    val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-    val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+    val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
 
     // Create the coordinator stream if it doesn't exist
     info("Creating coordinator stream")
-    val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+    val coordinatorSystemStream = Util.getCoordinatorSystemStream(config)
+    val systemFactory = Util.getCoordinatorSystemFactory(config)
     val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
     val streamName = coordinatorSystemStream.getStream
     val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+    systemAdmin.start()
     if (systemAdmin.createStream(coordinatorSpec)) {
       info("Created coordinator stream %s." format streamName)
     } else {
       info("Coordinator stream %s already exists." format streamName)
     }
+    systemAdmin.stop()
 
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
index 62dcdb0..476e215 100644 (file)
@@ -56,7 +56,7 @@ class TaskStorageManager(
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   partition: Partition,
-  systemAdmins: Map[String, SystemAdmin],
+  systemAdmins: SystemAdmins,
   changeLogDeleteRetentionsInMs: Map[String, Long],
   clock: Clock) extends Logging {
 
@@ -210,9 +210,7 @@ class TaskStorageManager(
     info("Validating change log streams: " + changeLogSystemStreams)
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
-      val systemAdmin = systemAdmins
-        .getOrElse(systemStream.getSystem,
-                   throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
       val changelogSpec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream, systemStream.getSystem, changeLogStreamPartitions)
 
       systemAdmin.validateStream(changelogSpec)
@@ -230,8 +228,7 @@ class TaskStorageManager(
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
       val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-      val admin = systemAdmins.getOrElse(systemStream.getSystem,
-        throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val admin = systemAdmins.getSystemAdmin(systemStream.getSystem)
       val consumer = storeConsumers(storeName)
 
       val offset = getStartingOffset(systemStreamPartition, admin)
@@ -334,9 +331,7 @@ class TaskStorageManager(
     debug("Persisting logged key value stores")
 
     for ((storeName, systemStream) <- changeLogSystemStreams.filterKeys(storeName => persistedStores.contains(storeName))) {
-      val systemAdmin = systemAdmins
-              .getOrElse(systemStream.getSystem,
-                         throw new SamzaException("Unable to get system admin for store " + storeName + " and system stream " + systemStream))
+      val systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem)
 
       debug("Fetching newest offset for store %s" format(storeName))
       try {
@@ -345,7 +340,7 @@ class TaskStorageManager(
           // rather than newest and oldest offsets for all SSPs. Use it if we can.
           systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3)
         } else {
-          val streamToMetadata = systemAdmins(systemStream.getSystem)
+          val streamToMetadata = systemAdmins.getSystemAdmin(systemStream.getSystem)
                   .getSystemStreamMetadata(Set(systemStream.getStream).asJava)
           val sspMetadata = streamToMetadata
                   .get(systemStream.getStream)
index 271279f..637858b 100644 (file)
 
 package org.apache.samza.system
 
-import org.apache.samza.config.Config
-import org.apache.samza.util.{Util, Logging, Clock, SystemClock}
+import org.apache.samza.util.{Logging, Clock, SystemClock}
 import org.apache.samza.SamzaException
 import scala.collection.JavaConverters._
-import org.apache.samza.config.SystemConfig.Config2System
 
-object StreamMetadataCache {
-  def apply(cacheTtlMs: Int = 5000, config: Config): StreamMetadataCache = {
-    val systemNames = config.getSystemNames.toSet
-    // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
-      val systemFactoryClassName = config
-        .getSystemFactory(systemName)
-        .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-      systemName -> systemFactory.getAdmin(systemName, config)
-    }).toMap
-
-    new StreamMetadataCache(systemAdmins, cacheTtlMs, SystemClock.instance)
-  }
-}
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
  * 5 seconds), so that we can make many metadata requests in quick succession without
@@ -48,7 +31,7 @@ object StreamMetadataCache {
  */
 class StreamMetadataCache (
     /** System implementations from which the actual metadata is loaded on cache miss */
-    systemAdmins: Map[String, SystemAdmin],
+    systemAdmins: SystemAdmins,
 
     /** Maximum age (in milliseconds) of a cache entry */
     val cacheTTLms: Int = 5000,
@@ -59,6 +42,7 @@ class StreamMetadataCache (
   private case class CacheEntry(metadata: SystemStreamMetadata, lastRefreshMs: Long)
   private var cache = Map[SystemStream, CacheEntry]()
   private val lock = new Object
+
   /**
    * Returns metadata about each of the given streams (such as first offset, newest
    * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
@@ -77,8 +61,7 @@ class StreamMetadataCache (
       .groupBy[String](_.getSystem)
       .flatMap {
         case (systemName, systemStreams) =>
-          val systemAdmin = systemAdmins
-            .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
+          val systemAdmin = systemAdmins.getSystemAdmin(systemName)
           val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
             systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
           } else {
index b39439d..212ec05 100644 (file)
@@ -19,7 +19,7 @@
 
 package org.apache.samza.system.chooser
 
-import org.apache.samza.SamzaException
+import java.util.HashMap
 import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsRegistry
@@ -72,7 +72,7 @@ class BootstrappingChooser(
    * A map from system stream name to SystemAdmin that is used for
    * offset comparisons.
    */
-  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
 
   /**
    * The number of lagging partitions for each SystemStream that's behind.
@@ -135,7 +135,7 @@ class BootstrappingChooser(
     wrapped.register(systemStreamPartition, offset)
 
     val system = systemStreamPartition.getSystem
-    val systemAdmin = systemAdmins.getOrElse(system, throw new SamzaException("SystemAdmin is undefined for System: %s" format system))
+    val systemAdmin = systemAdmins.getSystemAdmin(system)
     /**
      * SAMZA-1100: When a input SystemStream is consumed as both bootstrap and broadcast
      * BootstrappingChooser should record the lowest offset for each registered SystemStreamPartition.
@@ -198,8 +198,8 @@ class BootstrappingChooser(
           updatedSystemStreams += systemStream -> (updatedSystemStreams.getOrElse(systemStream, 0) - 1)
         }
 
-        // If the offset we just read is the same as the offset for the last 
-        // message (newest) in this system stream partition, then we have read 
+        // If the offset we just read is the same as the offset for the last
+        // message (newest) in this system stream partition, then we have read
         // all messages, and can mark this SSP as bootstrapped.
         checkOffset(systemStreamPartition, offset, OffsetType.NEWEST)
       }
@@ -246,7 +246,7 @@ class BootstrappingChooser(
   private def checkOffset(systemStreamPartition: SystemStreamPartition, offset: String, offsetType: OffsetType) {
     val systemStream = systemStreamPartition.getSystemStream
     val systemStreamMetadata = bootstrapStreamMetadata.getOrElse(systemStreamPartition.getSystemStream, null)
-    // Metadata for system/stream, and system/stream/partition are allowed to 
+    // Metadata for system/stream, and system/stream/partition are allowed to
     // be null since not all streams are bootstrap streams.
     val systemStreamPartitionMetadata = if (systemStreamMetadata != null) {
       systemStreamMetadata
@@ -256,8 +256,8 @@ class BootstrappingChooser(
       null
     }
     val offsetToCheck = if (systemStreamPartitionMetadata == null) {
-      // Use null for offsetToCheck in cases where the partition metadata was 
-      // null. A null partition metadata implies that the stream is not a 
+      // Use null for offsetToCheck in cases where the partition metadata was
+      // null. A null partition metadata implies that the stream is not a
       // bootstrap stream, and therefore, there is no need to check its offset.
       null
     } else {
@@ -266,8 +266,8 @@ class BootstrappingChooser(
 
     trace("Check %s offset %s against %s for %s." format (offsetType, offset, offsetToCheck, systemStreamPartition))
 
-    // The SSP is no longer lagging if the envelope's offset equals the 
-    // latest offset. 
+    // The SSP is no longer lagging if the envelope's offset equals the
+    // latest offset.
     if (offset != null && offset.equals(offsetToCheck)) {
       laggingSystemStreamPartitions -= systemStreamPartition
       systemStreamLagCounts += systemStream -> (systemStreamLagCounts(systemStream) - 1)
@@ -277,7 +277,7 @@ class BootstrappingChooser(
       if (systemStreamLagCounts(systemStream) == 0) {
         info("Bootstrap stream is fully caught up: %s" format systemStream)
 
-        // If the lag count is 0, then no partition for this stream is lagging 
+        // If the lag count is 0, then no partition for this stream is lagging
         // (the stream has been fully caught up).
         systemStreamLagCounts -= systemStream
       }
index c0805c4..35c68c2 100644 (file)
@@ -22,14 +22,18 @@ package org.apache.samza.system.chooser
 import org.apache.samza.SamzaException
 import org.apache.samza.config.{Config, DefaultChooserConfig, TaskConfigJava}
 import org.apache.samza.metrics.{MetricsRegistry, MetricsRegistryMap}
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemAdmin, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system._
 import org.apache.samza.util.Logging
-
+import java.util.HashMap
 import scala.collection.JavaConverters._
 
 
 object DefaultChooser extends Logging {
-  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata], chooserFactory: MessageChooserFactory, config: Config, registry: MetricsRegistry, systemAdmins: Map[String, SystemAdmin]) = {
+  def apply(inputStreamMetadata: Map[SystemStream, SystemStreamMetadata],
+            chooserFactory: MessageChooserFactory,
+            config: Config,
+            registry: MetricsRegistry,
+            systemAdmins: SystemAdmins) = {
     val chooserConfig = new DefaultChooserConfig(config)
     val batchSize = if (chooserConfig.getChooserBatchSize > 0) Some(chooserConfig.getChooserBatchSize) else None
 
@@ -251,7 +255,7 @@ class DefaultChooser(
    * Defines a mapping from SystemStream name to SystemAdmin.
    * This is useful for determining if a bootstrap SystemStream is caught up.
    */
-  systemAdmins: Map[String, SystemAdmin] = Map()) extends MessageChooser with Logging {
+  systemAdmins: SystemAdmins = new SystemAdmins(new HashMap[String, SystemAdmin])) extends MessageChooser with Logging {
 
   val chooser = {
     val useBatching = batchSize.isDefined
index cc2a097..ea23760 100644 (file)
@@ -223,20 +223,28 @@ object Util extends Logging {
   }
 
   /**
-   * Get the coordinator system and system factory from the configuration
+   * Get the coordinator system stream from the configuration
    * @param config
    * @return
    */
-  def getCoordinatorSystemStreamAndFactory(config: Config) = {
+  def getCoordinatorSystemStream(config: Config) = {
     val systemName = config.getCoordinatorSystemName
     val (jobName, jobId) = Util.getJobNameAndId(config)
     val streamName = Util.getCoordinatorStreamName(jobName, jobId)
-    val coordinatorSystemStream = new SystemStream(systemName, streamName)
+    new SystemStream(systemName, streamName)
+  }
+
+  /**
+    * Get the coordinator system factory from the configuration
+    * @param config
+    * @return
+    */
+  def getCoordinatorSystemFactory(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
     val systemFactoryClassName = config
       .getSystemFactory(systemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
-    (coordinatorSystemStream, systemFactory)
+    Util.getObj[SystemFactory](systemFactoryClassName)
   }
 
   /**
index 264966d..877adc5 100644 (file)
@@ -27,7 +27,6 @@ import org.apache.samza.Partition;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.MockSystemFactory;
@@ -72,8 +71,7 @@ public class TestClusterBasedJobCoordinator {
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
-    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
     ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
@@ -91,8 +89,7 @@ public class TestClusterBasedJobCoordinator {
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
-    CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
-    CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
+    CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
     producer.writeConfig("test-job", config);
 
     ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
index 0a3e9c8..db8ab19 100644 (file)
@@ -34,6 +34,7 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.junit.Before;
@@ -233,7 +234,7 @@ public class TestExecutionPlanner {
     systemAdmins = new HashMap<>();
     systemAdmins.put("system1", systemAdmin1);
     systemAdmins.put("system2", systemAdmin2);
-    streamManager = new StreamManager(systemAdmins);
+    streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     runner = mock(ApplicationRunner.class);
     when(runner.getStreamSpec("input1")).thenReturn(input1);
index b48c82d..3c2ba70 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Test;
 
@@ -113,7 +114,7 @@ public class TestJobGraphJsonGenerator {
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
     systemAdmins.put("system1", systemAdmin1);
     systemAdmins.put("system2", systemAdmin2);
-    StreamManager streamManager = new StreamManager(systemAdmins);
+    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
@@ -186,7 +187,7 @@ public class TestJobGraphJsonGenerator {
     SystemAdmin systemAdmin2 = createSystemAdmin(system2Map);
     systemAdmins.put("hdfs", systemAdmin1);
     systemAdmins.put("kafka", systemAdmin2);
-    StreamManager streamManager = new StreamManager(systemAdmins);
+    StreamManager streamManager = new StreamManager(new SystemAdmins(systemAdmins));
 
     StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
     MessageStream<KV<String, PageViewEvent>> inputStream = streamGraph.getInputStream("PageView");
index dc36df8..ed28067 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
@@ -58,7 +59,7 @@ public class TestStreamManager {
     sysAdmins.put(SYSTEM1, admin1);
     sysAdmins.put(SYSTEM2, admin2);
 
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     manager.createStreams(specList);
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
@@ -95,7 +96,7 @@ public class TestStreamManager {
     Set<String> streams = new HashSet<>();
     streams.add(STREAM1);
     streams.add(STREAM2);
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     Map<String, Integer> counts = manager.getStreamPartitionCounts(SYSTEM1, streams);
 
     assertTrue(counts.get(STREAM1).equals(1));
@@ -131,7 +132,7 @@ public class TestStreamManager {
     config.put("stores.test-store.factory", "dummyfactory");
     config.put("stores.test-store.changelog", SYSTEM2 + "." + STREAM2);
 
-    StreamManager manager = new StreamManager(sysAdmins);
+    StreamManager manager = new StreamManager(new SystemAdmins(sysAdmins));
     manager.clearStreamsFromPreviousRun(new MapConfig(config));
 
     ArgumentCaptor<StreamSpec> captor = ArgumentCaptor.forClass(StreamSpec.class);
index eb0ebe9..870d586 100644 (file)
@@ -93,11 +93,13 @@ public class TestApplicationRunnerMain {
 
     @Override
     public void run(StreamApplication streamApp) {
+      super.run(streamApp);
       runCount++;
     }
 
     @Override
     public void kill(StreamApplication streamApp) {
+      super.kill(streamApp);
       killCount++;
     }
 
index a4f2328..64c0088 100644 (file)
 package org.apache.samza.checkpoint
 
 import java.util
-import java.util.Collections
-import java.util.Collections.EmptyMap
-
 import org.apache.samza.container.TaskName
 import org.apache.samza.Partition
 import org.apache.samza.system._
 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
 import org.junit.Assert._
-import org.junit.{Ignore, Test}
+import org.junit.Test
 import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.scalatest.Assertions.intercept
@@ -64,7 +61,7 @@ class TestOffsetManager {
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
     assertTrue(checkpointManager.isStarted)
@@ -97,7 +94,7 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val systemAdmins = Map("test-system" -> getSystemAdmin)
+    val systemAdmins = new SystemAdmins(Map("test-system" -> getSystemAdmin).asJava)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
@@ -155,7 +152,7 @@ class TestOffsetManager {
     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
     val config = new MapConfig
-    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+    val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, new SystemAdmins(systemAdmins.asJava))
     // Register both partitions. Partition 2 shouldn't have a checkpoint.
     offsetManager.register(taskName1, Set(systemStreamPartition1))
     offsetManager.register(taskName2, Set(systemStreamPartition2))
@@ -264,7 +261,7 @@ class TestOffsetManager {
       Map()
 
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager,
-                                      systemAdmins, checkpointListeners, new OffsetManagerMetrics)
+                                      new SystemAdmins(systemAdmins.asJava), checkpointListeners, new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2))
 
     offsetManager.start
@@ -310,7 +307,7 @@ class TestOffsetManager {
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
+    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, new SystemAdmins(systemAdmins.asJava), Map(), new OffsetManagerMetrics)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
 
@@ -376,7 +373,7 @@ class TestOffsetManager {
     }
   }
 
-  private def getSystemAdmin = {
+  private def getSystemAdmin: SystemAdmin = {
     new SystemAdmin {
       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
         offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
index 8ff6e88..63d58c9 100644 (file)
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.{SamzaContainerStatus, Partition}
+import org.apache.samza.{Partition, SamzaContainerStatus}
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.config.{Config, MapConfig}
 import org.apache.samza.coordinator.JobModelManager
@@ -33,8 +33,8 @@ import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.storage.TaskStorageManager
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemConsumer, SystemConsumers, SystemProducer, SystemProducers, SystemStream, SystemStreamPartition}
-import org.apache.samza.task.{ClosableTask, InitableTask, MessageCollector, StreamTask, TaskContext, TaskCoordinator, TaskInstanceCollector}
+import org.apache.samza.system._
+import org.apache.samza.task._
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.junit.Assert._
 import org.junit.Test
@@ -45,6 +45,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
 import org.mockito.Mockito.when
+
 import scala.collection.JavaConversions._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
@@ -128,8 +129,8 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new SystemStreamPartition("test", "stream1", new Partition(1)),
       new SystemStreamPartition("test", "stream2", new Partition(0)),
       new SystemStreamPartition("test", "stream2", new Partition(1)))
-    val systemAdmins = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
-    val metadata = new StreamMetadataCache(systemAdmins).getStreamMetadata(inputStreams.map(_.getSystemStream).toSet)
+    val systemAdminMap = Map("test" -> new SinglePartitionWithoutOffsetsSystemAdmin)
+    val metadata = new StreamMetadataCache(new SystemAdmins(systemAdminMap)).getStreamMetadata(inputStreams.map(_.getSystemStream))
     assertNotNull(metadata)
     assertEquals(2, metadata.size)
     val stream1Metadata = metadata(new SystemStream("test", "stream1"))
@@ -158,6 +159,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -190,6 +192,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -239,6 +242,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -270,6 +274,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -319,6 +324,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -351,6 +357,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = runLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -398,6 +405,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -433,6 +441,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -474,6 +483,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -509,6 +519,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunLoop,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
@@ -542,6 +553,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     }
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val systemAdmins = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -576,6 +588,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = null,
+      systemAdmins = systemAdmins,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = containerMetrics)
index c45c5a1..de1647f 100644 (file)
@@ -49,6 +49,7 @@ import org.scalatest.Assertions.intercept
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
+import scala.collection.JavaConverters._
 
 class TestTaskInstance {
   @Test
@@ -322,7 +323,8 @@ class TestTaskInstance {
     val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
-    val systemAdmins = Map("system" -> new MockSystemAdmin)
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val systemAdmins = new SystemAdmins(Map("system" -> systemAdmin).asJava)
     var result = new ListBuffer[IncomingMessageEnvelope]
 
     val task = new StreamTask {
index f092d75..3b65a62 100644 (file)
@@ -240,7 +240,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val systemNames = Set("test")
 
     // Map the name of each system to the corresponding SystemAdmin
-    val systemAdmins = systemNames.map(systemName => {
+    val systemAdminMap = systemNames.map(systemName => {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
@@ -248,7 +248,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       systemName -> systemFactory.getAdmin(systemName, config)
     }).toMap
 
-    val streamMetadataCache = new StreamMetadataCache(systemAdmins)
+    val streamMetadataCache = new StreamMetadataCache(new SystemAdmins(systemAdminMap.asJava))
     val getInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getInputStreamPartitions)
     val getMatchedInputStreamPartitions = PrivateMethod[immutable.Set[Any]]('getMatchedInputStreamPartitions)
 
index c7eab3b..b66cd64 100644 (file)
 package org.apache.samza.coordinator
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-
+import java.util.HashMap
 import org.apache.samza.Partition
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{StreamMetadataCache, SystemAdmin, SystemStream, SystemStreamMetadata}
+import org.apache.samza.system._
 import org.junit.Assert._
 import org.junit.Test
 import org.mockito.Matchers
@@ -234,7 +234,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     }
   }
 
-  class MockStreamMetadataCache extends StreamMetadataCache(Map[String, SystemAdmin]()) {
+  class MockStreamMetadataCache extends StreamMetadataCache(new SystemAdmins(new HashMap[String, SystemAdmin])) {
     /**
      * Returns metadata about each of the given streams (such as first offset, newest
      * offset, etc). If the metadata isn't in the cache, it is retrieved from the systems
index b239119..0b951f4 100644 (file)
@@ -21,11 +21,11 @@ package org.apache.samza.processor
 import java.util.Collections
 
 import org.apache.samza.config.MapConfig
-import org.apache.samza.container.{SamzaContainerListener, RunLoop, SamzaContainer, SamzaContainerContext, SamzaContainerMetrics, TaskInstance, TaskInstanceMetrics, TaskName}
+import org.apache.samza.container._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.system.chooser.RoundRobinChooser
-import org.apache.samza.system.{SystemConsumer, SystemConsumers, SystemProducer, SystemProducers}
+import org.apache.samza.system._
 import org.apache.samza.task.{StreamTask, TaskInstanceCollector}
 
 
@@ -33,6 +33,7 @@ object StreamProcessorTestUtils {
   def getDummyContainer(mockRunloop: RunLoop, streamTask: StreamTask) = {
     val config = new MapConfig
     val taskName = new TaskName("taskName")
+    val adminMultiplexer = new SystemAdmins(config)
     val consumerMultiplexer = new SystemConsumers(
       new RoundRobinChooser,
       Map[String, SystemConsumer]())
@@ -56,6 +57,7 @@ object StreamProcessorTestUtils {
       containerContext = containerContext,
       taskInstances = Map(taskName -> taskInstance),
       runLoop = mockRunloop,
+      systemAdmins = adminMultiplexer,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics)
index 29f6eb7..90a4c01 100644 (file)
@@ -718,7 +718,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       storeBaseDir = storeBaseDir,
       loggedStoreBaseDir = loggedStoreBaseDir,
       partition = partition,
-      systemAdmins = systemAdmins,
+      systemAdmins = new SystemAdmins(systemAdmins.asJava),
       new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs,
       SystemClock.instance
     )
index f55e4bf..e48764b 100644 (file)
@@ -42,7 +42,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
     val systemAdmins = Map("foo" -> mock[SystemAdmin])
     when(systemAdmins("foo").getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava)
     val streams = Set(new SystemStream("foo", "bar"))
-    val cache = new StreamMetadataCache(systemAdmins)
+    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava))
 
     val result = cache.getStreamMetadata(streams)
     streams shouldEqual result.keySet
@@ -56,7 +56,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
     val systemAdmins = Map("system" -> mock[SystemAdmin])
     when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava)
     val streams = Set(new SystemStream("system", "stream"))
-    val cache = new StreamMetadataCache(systemAdmins = systemAdmins, clock = clock)
+    val cache = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava), clock = clock)
 
     when(clock.currentTimeMillis).thenReturn(0)
     cache.getStreamMetadata(streams)
@@ -84,7 +84,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"),
       new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b")
     )
-    val result = new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+    val result = new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     result.keySet shouldEqual streams
     streams.foreach(stream => {
       val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
@@ -101,7 +101,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       .thenReturn(makeMetadata(Set("stream1")).asJava) // metadata doesn't include stream2
     val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+      new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown streams")
   }
@@ -113,7 +113,7 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       .thenReturn(Map[String, SystemStreamMetadata]("stream" -> null).asJava)
     val streams = Set(new SystemStream("system", "stream"))
     val exception = intercept[SamzaException] {
-      new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
+      new StreamMetadataCache(new SystemAdmins(systemAdmins.asJava)).getStreamMetadata(streams)
     }
     exception.getMessage should startWith ("Cannot get metadata for unknown streams")
   }
index 3c07545..e56206a 100644 (file)
@@ -21,13 +21,10 @@ package org.apache.samza.system.chooser
 
 import java.util.Arrays
 
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system._
 import org.apache.samza.Partition
 import org.apache.samza.container.MockSystemAdmin
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.junit.Assert._
 import org.junit.Test
@@ -187,7 +184,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val mock = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
 
     chooser.register(envelope1.getSystemStreamPartition, "1")
     chooser.register(envelope2.getSystemStreamPartition, "1")
@@ -205,7 +204,9 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
     val mock = new MockMessageChooser
     val metadata1 = getMetadata(envelope1, "123")
     val metadata2 = getMetadata(envelope2, "321")
-    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1, envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val chooser = new BootstrappingChooser(mock, Map(envelope1.getSystemStreamPartition.getSystemStream -> metadata1,
+      envelope2.getSystemStreamPartition.getSystemStream -> metadata2), new BootstrappingChooserMetrics(), new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
 
     // Envelope1 is registered by multiple tasks, each one of them having different offsets.
     chooser.register(envelope1.getSystemStreamPartition, "1")
@@ -234,7 +235,13 @@ object TestBootstrappingChooser {
   // just batch size defined should behave just like plain vanilla batching
   // chooser.
   @Parameters
-  def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = Arrays.asList(
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), Map("kafka" -> new MockSystemAdmin))),
-    Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) => new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = Map("kafka" -> new MockSystemAdmin))))
+  def parameters: java.util.Collection[Array[(MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser]] = {
+    val systemAdmin: SystemAdmin = new MockSystemAdmin
+    val systemAdmins = new SystemAdmins(Map("kafka" -> systemAdmin).asJava)
+    Arrays.asList(
+      Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) =>
+        new BootstrappingChooser(wrapped, bootstrapStreamMetadata, new BootstrappingChooserMetrics(), systemAdmins)),
+      Array((wrapped: MessageChooser, bootstrapStreamMetadata: Map[SystemStream, SystemStreamMetadata]) =>
+        new DefaultChooser(wrapped, bootstrapStreamMetadata = bootstrapStreamMetadata, registry = new MetricsRegistryMap(), systemAdmins = systemAdmins)))
+  }
 }
index b873762..df5282c 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.config.{DefaultChooserConfig, MapConfig}
 import org.apache.samza.container.MockSystemAdmin
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{IncomingMessageEnvelope, SystemStream, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.system._
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.junit.Assert._
 import org.junit.Test
@@ -56,6 +56,7 @@ class TestDefaultChooser {
       envelope5.getSystemStreamPartition().getPartition() -> env5Metadata).asJava)
     val stream3Metadata = new SystemStreamMetadata("stream3", Map(
       envelope8.getSystemStreamPartition().getPartition() -> env8Metadata).asJava)
+    val systemAdmin: SystemAdmin = new MockSystemAdmin()
     val chooser = new DefaultChooser(
       mock0,
       Some(2),
@@ -70,7 +71,7 @@ class TestDefaultChooser {
         envelope1.getSystemStreamPartition.getSystemStream -> streamMetadata,
         envelope8.getSystemStreamPartition.getSystemStream -> stream3Metadata),
       new MetricsRegistryMap(),
-      Map("kafka" -> new MockSystemAdmin()))
+      new SystemAdmins(Map("kafka" -> systemAdmin).asJava))
 
     chooser.register(envelope1.getSystemStreamPartition, null)
     chooser.register(envelope2.getSystemStreamPartition, null)
index ca138c7..50d22b1 100644 (file)
@@ -77,6 +77,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     Preconditions.checkNotNull(systemConsumer)
     Preconditions.checkNotNull(systemAdmin)
 
+    systemAdmin.start()
+
     info(s"Creating checkpoint stream: ${checkpointSpec.getPhysicalName} with " +
       s"partition count: ${checkpointSpec.getPartitionCount}")
     systemAdmin.createStream(checkpointSpec)
@@ -171,6 +173,8 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   }
 
   override def stop = {
+    systemAdmin.stop()
+
     if (systemProducer != null) {
       systemProducer.stop
     } else {
index a9a9bd7..1f4672d 100644 (file)
@@ -148,6 +148,17 @@ class KafkaSystemAdmin(
 
   import KafkaSystemAdmin._
 
+  @volatile var running = false
+
+  override def start() = {
+    running = true
+  }
+
+  override def stop() = {
+    running = false
+  }
+
+
   override def getSystemStreamPartitionCounts(streams: util.Set[String], cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
     getSystemStreamPartitionCounts(streams, new ExponentialSleepStrategy(initialDelayMs = 500), cacheTTL)
   }
index 3530713..3a1ffe9 100644 (file)
@@ -144,6 +144,7 @@ private[kafka] class KafkaSystemConsumer(
       }
     }
 
+    systemAdmin.start()
     refreshBrokers
   }
 
@@ -161,6 +162,7 @@ private[kafka] class KafkaSystemConsumer(
   }
 
   def stop() {
+    systemAdmin.stop()
     brokerProxies.values.foreach(_.stop)
   }
 
index da7b907..4d52877 100644 (file)
@@ -35,7 +35,6 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.rest.model.Task;
@@ -93,10 +92,9 @@ public class SamzaTaskProxy implements TaskProxy {
    * @return built and initialized CoordinatorStreamSystemConsumer.
    */
   protected CoordinatorStreamSystemConsumer initializeCoordinatorStreamConsumer(JobInstance jobInstance) {
-    CoordinatorStreamSystemFactory coordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory();
     Config coordinatorSystemConfig = getCoordinatorSystemConfig(jobInstance);
     LOG.debug("Using config: {} to create coordinatorStream consumer.", coordinatorSystemConfig);
-    CoordinatorStreamSystemConsumer consumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(coordinatorSystemConfig, METRICS_REGISTRY);
     LOG.debug("Registering coordinator system stream consumer.");
     consumer.register();
     LOG.debug("Starting coordinator system stream consumer.");
index 5215f7e..d432be7 100644 (file)
@@ -120,6 +120,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
+    super.run(streamApp);
     Validate.isInstanceOf(SamzaSqlApplication.class, streamApp);
     appRunner.run(streamApp);
   }
@@ -127,6 +128,7 @@ public class SamzaSqlApplicationRunner extends AbstractApplicationRunner {
   @Override
   public void kill(StreamApplication streamApp) {
     appRunner.kill(streamApp);
+    super.kill(streamApp);
   }
 
   @Override