SAMZA-570: Enabling auto-discovery of regex input topics
authorRay Matharu <rmatharu@linkedin.com>
Sat, 1 Dec 2018 01:06:30 +0000 (17:06 -0800)
committerJagadish <jvenkatraman@linkedin.com>
Sat, 1 Dec 2018 01:06:30 +0000 (17:06 -0800)
This PR makes the following changes

* Enriches StreamPartitionCountMonitor to periodically monitor input-regexes to match to actual inputs and stop the job when a new input stream is discovered.

* Add a new API to SysAdmin to allow listing of all streams, e.g., Kafka-topics. KafkaSysAdmin implementation of this uses KafkaConsumer's listTopics API. (Even if listTopics had 1 million topics with 100 bytes per topic total, temporary memory overhead will be 100 MB).

* Added config job.coordinator.monitor-input-regex.frequency.ms for the monitoring frequency, and job.coordinator.monitor-input-regex.%s for each input system. Users can then choose desired regex for each input system, e.g., job.coordinator.monitor-input-regex.kafka=test-.*.

* We can later enrich RegexTopicGen rewriter to add a monitor-input-regex config to allow periodic jonitoring

* Tested: Unit test for SPCM and tested with test jobs on local grid.

Author: Ray Matharu <rmatharu@linkedin.com>

Reviewers: Jagadish<jagadish@apache.org>

Closes #796 from rmatharu/newtopic-test

13 files changed:
samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java
samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/PartitionChangeException.java [moved from samza-core/src/main/java/org/apache/samza/PartitionChangeException.java with 93% similarity]
samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala

index 6ee7df2..8201b3d 100644 (file)
@@ -156,4 +156,12 @@ public interface SystemAdmin {
     return getSystemStreamMetadata(streamNames);
   }
 
+  /**
+   * Fetch the set of all available streams
+   * @return The set of all available SystemStreams.
+   */
+  default Set<SystemStream> getAllSystemStreams() {
+    throw new UnsupportedOperationException();
+  }
+
 }
index ff70df0..4c5a34b 100644 (file)
 package org.apache.samza.clustermanager;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
 import org.apache.samza.SamzaException;
-import org.apache.samza.PartitionChangeException;
 import org.apache.samza.checkpoint.CheckpointManager;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
@@ -33,8 +36,11 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.InputStreamsDiscoveredException;
 import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.PartitionChangeException;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.StreamRegexMonitor;
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
@@ -49,9 +55,9 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.SystemClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
 
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Implements a JobCoordinator that is completely independent of the underlying cluster
@@ -134,7 +140,12 @@ public class ClusterBasedJobCoordinator {
   /**
    * The input topic partition count monitor
    */
-  private final StreamPartitionCountMonitor partitionMonitor;
+  private final Optional<StreamPartitionCountMonitor> partitionMonitor;
+
+  /**
+   * The input stream regex monitor
+   */
+  private final Optional<StreamRegexMonitor> inputStreamRegexMonitor;
 
   /**
    * Metrics to track stats around container failures, needed containers etc.
@@ -174,7 +185,8 @@ public class ClusterBasedJobCoordinator {
 
     // build a JobModelManager and ChangelogStreamManager and perform partition assignments.
     changelogStreamManager = new ChangelogStreamManager(coordinatorStreamManager);
-    jobModelManager = JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
+    jobModelManager =
+        JobModelManager.apply(coordinatorStreamManager.getConfig(), changelogStreamManager.readPartitionMapping());
 
     config = jobModelManager.jobModel().getConfig();
     hasDurableStores = new StorageConfig(config).hasDurableStores();
@@ -182,6 +194,7 @@ public class ClusterBasedJobCoordinator {
     // The systemAdmins should be started before partitionMonitor can be used. And it should be stopped when this coordinator is stopped.
     systemAdmins = new SystemAdmins(config);
     partitionMonitor = getPartitionCountMonitor(config, systemAdmins);
+    inputStreamRegexMonitor = getInputRegexMonitor(config, systemAdmins);
     clusterManagerConfig = new ClusterManagerConfig(config);
     isJmxEnabled = clusterManagerConfig.getJmxEnabledOnJobCoordinator();
 
@@ -226,7 +239,7 @@ public class ClusterBasedJobCoordinator {
 
       Map<TaskName, Integer> taskPartitionMappings = new HashMap<>();
       Map<String, ContainerModel> containers = jobModel.getContainers();
-      for (ContainerModel containerModel: containers.values()) {
+      for (ContainerModel containerModel : containers.values()) {
         for (TaskModel taskModel : containerModel.getTasks().values()) {
           taskPartitionMappings.put(taskModel.getTaskName(), taskModel.getChangelogPartition().getPartitionId());
         }
@@ -236,7 +249,8 @@ public class ClusterBasedJobCoordinator {
 
       containerProcessManager.start();
       systemAdmins.start();
-      partitionMonitor.start();
+      partitionMonitor.ifPresent(monitor -> monitor.start());
+      inputStreamRegexMonitor.ifPresent(monitor -> monitor.start());
 
       boolean isInterrupted = false;
 
@@ -270,7 +284,8 @@ public class ClusterBasedJobCoordinator {
   private void onShutDown() {
 
     try {
-      partitionMonitor.stop();
+      partitionMonitor.ifPresent(monitor -> monitor.stop());
+      inputStreamRegexMonitor.ifPresent(monitor -> monitor.stop());
       systemAdmins.stop();
       containerProcessManager.stop();
       coordinatorStreamManager.stop();
@@ -289,26 +304,63 @@ public class ClusterBasedJobCoordinator {
     }
   }
 
-  private StreamPartitionCountMonitor getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
+  private Optional<StreamPartitionCountMonitor> getPartitionCountMonitor(Config config, SystemAdmins systemAdmins) {
     StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
     Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
     if (inputStreamsToMonitor.isEmpty()) {
       throw new SamzaException("Input streams to a job can not be empty.");
     }
 
-    return new StreamPartitionCountMonitor(
-        inputStreamsToMonitor,
-        streamMetadata,
-        metrics,
-        new JobConfig(config).getMonitorPartitionChangeFrequency(),
-        streamsChanged -> {
-        // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
+    return Optional.of(new StreamPartitionCountMonitor(inputStreamsToMonitor, streamMetadata, metrics,
+        new JobConfig(config).getMonitorPartitionChangeFrequency(), streamsChanged -> {
+      // Fail the jobs with durable state store. Otherwise, application state.status remains UNDEFINED s.t. YARN job will be restarted
         if (hasDurableStores) {
           log.error("Input topic partition count changed in a job with durable state. Failing the job.");
           state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
         }
         coordinatorException = new PartitionChangeException("Input topic partition count changes detected.");
-      });
+      }));
+  }
+
+  private Optional<StreamRegexMonitor> getInputRegexMonitor(Config config, SystemAdmins systemAdmins) {
+
+    // if input regex monitor is not enabled return empty
+    if (new JobConfig(config).getMonitorRegexEnabled()) {
+      log.info("StreamRegexMonitor is disabled.");
+      return Optional.empty();
+    }
+
+    StreamMetadataCache streamMetadata = new StreamMetadataCache(systemAdmins, 0, SystemClock.instance());
+    Set<SystemStream> inputStreamsToMonitor = new TaskConfigJava(config).getAllInputStreams();
+    if (inputStreamsToMonitor.isEmpty()) {
+      throw new SamzaException("Input streams to a job can not be empty.");
+    }
+
+    // First list all rewriters
+    Option<String> rewritersList = new JobConfig(config).getConfigRewriters();
+
+    // if no rewriter is defined, there is nothing to monitor
+    if (!rewritersList.isDefined()) {
+      log.warn("No config rewriters are defined. No StreamRegexMonitor created.");
+      return Optional.empty();
+    }
+
+    // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+    Map<String, Pattern> inputRegexesToMonitor =
+        JavaConverters.mapAsJavaMapConverter(new JobConfig(config).getMonitorRegexPatternMap(rewritersList.get()))
+            .asJava();
+
+    return Optional.of(new StreamRegexMonitor(inputStreamsToMonitor, inputRegexesToMonitor, streamMetadata, metrics,
+        new JobConfig(config).getMonitorRegexFrequency(), new StreamRegexMonitor.Callback() {
+          @Override
+          public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+              Map<String, Pattern> regexesMonitored) {
+            log.error("New input system-streams discovered. Failing the job. New input streams: {}", newInputStreams,
+                " Existing input streams:", inputStreamsToMonitor);
+            state.status = SamzaApplicationState.SamzaAppStatus.FAILED;
+            coordinatorException = new InputStreamsDiscoveredException("New input streams added: " + newInputStreams);
+          }
+        }));
   }
 
   // The following two methods are package-private and for testing only
@@ -321,10 +373,9 @@ public class ClusterBasedJobCoordinator {
 
   @VisibleForTesting
   StreamPartitionCountMonitor getPartitionMonitor() {
-    return partitionMonitor;
+    return partitionMonitor.get();
   }
 
-
   /**
    * The entry point for the {@link ClusterBasedJobCoordinator}
    * @param args args
@@ -335,12 +386,14 @@ public class ClusterBasedJobCoordinator {
     try {
       //Read and parse the coordinator system config.
       log.info("Parsing coordinator system config {}", coordinatorSystemEnv);
-      coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
+      coordinatorSystemConfig =
+          new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(coordinatorSystemEnv, Config.class));
     } catch (IOException e) {
       log.error("Exception while reading coordinator stream config {}", e);
       throw new SamzaException(e);
     }
     ClusterBasedJobCoordinator jc = new ClusterBasedJobCoordinator(coordinatorSystemConfig);
     jc.run();
+    log.info("Finished ClusterBasedJobCoordinator run");
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java b/samza-core/src/main/java/org/apache/samza/coordinator/InputStreamsDiscoveredException.java
new file mode 100644 (file)
index 0000000..6e85f43
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.SamzaException;
+
+
+/**
+ * Exception to indicate that the new input streams have been added.
+ */
+public class InputStreamsDiscoveredException extends SamzaException {
+
+  public InputStreamsDiscoveredException(String message) {
+    super(message);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamRegexMonitor.java
new file mode 100644 (file)
index 0000000..3c86bfb
--- /dev/null
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * A single-thread based monitor that periodically monitors the given set of stream regexes, and matches them to
+ * the given set of streams. If a stream matching a given regex that is not in the corresponding stream set is detected,
+ * it invokes a {@link StreamRegexMonitor.Callback} with the initial input set, the new input stream set, and the regexes
+ * being monitored.
+ */
+public class StreamRegexMonitor {
+  private static final Logger log = LoggerFactory.getLogger(StreamRegexMonitor.class);
+
+  // Factory of daemon-threads to create the single threaded executor pool
+  private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setDaemon(true)
+      .setNameFormat("Samza-" + StreamRegexMonitor.class.getSimpleName())
+      .build();
+
+  // Enum to describe the state of the regexMonitor
+  private enum State {
+    INIT, RUNNING, STOPPED
+  }
+
+  private final Set<SystemStream> streamsToMonitor;
+  private final Map<String, Pattern> systemRegexesToMonitor;
+  private final StreamMetadataCache metadataCache;
+  private final int inputRegexMonitorPeriodMs;
+
+  // Map of gauges (one per system), emitted when new input stream for that system is detected
+  private final Map<String, Gauge<Integer>> gauges;
+
+  private final Callback callbackMethod;
+
+  // Used to guard write access to state.
+  private final Object lock = new Object();
+
+  private final ScheduledExecutorService schedulerService = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+
+  private volatile State state = State.INIT;
+
+  /**
+   * A callback that is invoked when the {@link StreamRegexMonitor} detects a new input stream matching given regex.
+   */
+  public interface Callback {
+    /**
+     * Method to be called when new input streams are detected.
+     * @param initialInputSet The initial set of input streams
+     * @param newInputStreams The set of new input streams discovered
+     * @param regexesMonitored The set of regexes being monitored
+     */
+    void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+        Map<String, Pattern> regexesMonitored);
+  }
+
+  /**
+   * Default constructor.
+   *
+   *  @param streamsToMonitor  a set of SystemStreams to monitor
+   * @param systemRegexesToMonitor  map of regexes for each input system
+   * @param metadataCache     the metadata cache which will be used to fetch metadata for partition counts.
+   * @param metrics           the metrics registry to which the metrics should be added.
+   * @param inputRegexMonitorPeriodMs the period at which the monitor will check each input-regex
+   * @param monitorCallback   the callback method to be invoked when new input stream matching regex is detected
+   */
+  public StreamRegexMonitor(Set<SystemStream> streamsToMonitor, Map<String, Pattern> systemRegexesToMonitor,
+      StreamMetadataCache metadataCache, MetricsRegistry metrics, int inputRegexMonitorPeriodMs,
+      Callback monitorCallback) {
+    this.streamsToMonitor = streamsToMonitor;
+    this.systemRegexesToMonitor = systemRegexesToMonitor;
+    this.metadataCache = metadataCache;
+    this.callbackMethod = monitorCallback;
+    this.inputRegexMonitorPeriodMs = inputRegexMonitorPeriodMs;
+
+    // Pre-populate the gauges
+    Map<String, Gauge<Integer>> mutableGauges = new HashMap<>();
+    for (String systemToMonitor : systemRegexesToMonitor.keySet()) {
+      Gauge gauge = metrics.newGauge("job-coordinator", String.format("%s-new-input-streams", systemToMonitor), 0);
+      mutableGauges.put(systemToMonitor, gauge);
+    }
+    gauges = Collections.unmodifiableMap(mutableGauges);
+
+    log.info("Created {} with inputRegexMonitorPeriodMs: {} and systemRegexesToMonitor: {}", this.getClass().getName(),
+        this.inputRegexMonitorPeriodMs, this.systemRegexesToMonitor);
+  }
+
+  /**
+   * Starts the monitor.
+   */
+  public void start() {
+    synchronized (lock) {
+      switch (state) {
+        case INIT:
+          if (inputRegexMonitorPeriodMs > 0) {
+            schedulerService.scheduleAtFixedRate(new Runnable() {
+              @Override
+              public void run() {
+                monitorInputRegexes();
+              }
+            }, 0, inputRegexMonitorPeriodMs, TimeUnit.MILLISECONDS);
+          }
+          state = State.RUNNING;
+          break;
+
+        case RUNNING:
+          // start is idempotent
+          return;
+
+        case STOPPED:
+          throw new IllegalStateException("StreamRegexMonitor was stopped and cannot be restarted.");
+      }
+    }
+  }
+
+  /**
+   * Stops the monitor. Once it stops, it cannot be restarted.
+   */
+  public void stop() {
+    synchronized (lock) {
+      // We could also wait for full termination of the scheduler service, but it is overkill for
+      // our use case.
+      schedulerService.shutdownNow();
+
+      state = State.STOPPED;
+    }
+  }
+
+  private void monitorInputRegexes() {
+    log.debug("Running monitorInputRegexes");
+
+    try {
+      // obtain the list of SysStreams that match given patterns for all systems
+      Set<SystemStream> inputStreamsMatchingPattern = new HashSet<>();
+
+      // For each input system, for which we have a regex to monitor
+      for (String systemName : this.systemRegexesToMonitor.keySet()) {
+
+        try {
+          // obtain the list of SysStreams that match the regex for this system
+          // using the systemAdmin in the metadataCache
+          inputStreamsMatchingPattern.addAll(
+              JavaConverters.setAsJavaSetConverter(this.metadataCache.getAllSystemStreams(systemName))
+                  .asJava()
+                  .stream()
+                  .filter(x -> x.getStream().matches(this.systemRegexesToMonitor.get(systemName).pattern()))
+                  .collect(Collectors.toSet()));
+        } catch (UnsupportedOperationException e) {
+          log.error("UnsupportedOperationException while monitoring input regexes for system {}", systemName, e);
+        }
+      }
+
+      // if there is a stream that is in the input-Set but not in the streamsToMonitor
+      // since streamsToMonitor = task.inputs
+      if (!streamsToMonitor.containsAll(inputStreamsMatchingPattern)) {
+        log.info("New input system-streams discovered. InputStreamsMatchingPattern: {} but streamsToMonitor: {} ",
+            inputStreamsMatchingPattern, streamsToMonitor);
+
+        // invoke notify callback with new input streams
+        this.callbackMethod.onInputStreamsChanged(streamsToMonitor,
+            Sets.difference(inputStreamsMatchingPattern, streamsToMonitor), systemRegexesToMonitor);
+      } else {
+        log.info("No new input system-Streams discovered streamsToMonitor {} inputStreamsMatchingPattern {}",
+            streamsToMonitor, inputStreamsMatchingPattern);
+      }
+    } catch (Exception e) {
+      log.error("Exception while monitoring input regexes.", e);
+    }
+  }
+
+  @VisibleForTesting
+  boolean isRunning() {
+    return state == State.RUNNING;
+  }
+
+  /**
+   * Wait until this service has shutdown. Returns true if shutdown occurred within the timeout
+   * and false otherwise.
+   * <p>
+   * This is currently exposed at the package private level for tests only.
+   */
+  @VisibleForTesting
+  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+    return schedulerService.awaitTermination(timeout, unit);
+  }
+}
index 2bc6420..5363e72 100644 (file)
@@ -21,12 +21,16 @@ package org.apache.samza.config
 
 
 import java.io.File
+import java.util.regex.Pattern
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
 
+import scala.collection.mutable
+
+
 object JobConfig {
   // job config constants
   val STREAM_JOB_FACTORY_CLASS = "job.factory.class" // streaming.job_factory_class
@@ -77,6 +81,15 @@ object JobConfig {
   val JOB_FAIL_CHECKPOINT_VALIDATION = "job.checkpoint.validation.enabled"
   val MONITOR_PARTITION_CHANGE = "job.coordinator.monitor-partition-change"
   val MONITOR_PARTITION_CHANGE_FREQUENCY_MS = "job.coordinator.monitor-partition-change.frequency.ms"
+
+  val MONITOR_INPUT_REGEX_FREQUENCY_MS = "job.coordinator.monitor-input-regex.frequency.ms"
+  val DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS = 300000
+
+  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
+  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
+  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
+
+
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
@@ -127,7 +140,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getCoordinatorSystemName = {
     val system = getCoordinatorSystemNameOrNull
     if (system == null) {
-      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution.")
+      throw new ConfigException("Missing job.coordinator.system configuration. Cannot proceed with job execution." + config)
     }
     system
   }
@@ -158,10 +171,44 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
+  // StreamRegexMonitor is disabled if the MonitorRegexFRequency is <= 0
+  def getMonitorRegexEnabled = (getMonitorRegexFrequency <= 0)
+
   def getMonitorPartitionChangeFrequency = getInt(
     JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS,
     JobConfig.DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS)
 
+  def getMonitorRegexFrequency = getInt(
+    JobConfig.MONITOR_INPUT_REGEX_FREQUENCY_MS,
+    JobConfig.DEFAULT_MONITOR_INPUT_REGEX_FREQUENCY_MS)
+
+  def getMonitorRegexPatternMap(rewritersList : String) : mutable.HashMap[String, Pattern] = {
+    // Compile a map of each input-system to its corresponding input-monitor-regex patterns
+    val inputRegexesToMonitor: mutable.HashMap[String, Pattern] = mutable.HashMap[String, Pattern]()
+    val rewriters: Array[String] = rewritersList.split(",")
+    // iterate over each rewriter and obtain the system and regex for it
+    for (rewriterName <- rewriters) {
+      val rewriterSystem: Option[String] = new JobConfig(config).getRegexResolvedSystem(rewriterName)
+      val rewriterRegex: Option[String] = new JobConfig(config).getRegexResolvedStreams(rewriterName)
+      if (rewriterSystem.isDefined && rewriterRegex.isDefined) {
+        var patternForSystem: Option[Pattern] = inputRegexesToMonitor.get(rewriterSystem.get)
+        patternForSystem =
+          if (patternForSystem == None) Some(Pattern.compile(rewriterRegex.get))
+          else
+            Some(Pattern.compile(String.join("|", patternForSystem.get.pattern(), rewriterRegex.get)))
+        inputRegexesToMonitor.put(rewriterSystem.get, patternForSystem.get)
+      }
+    }
+    inputRegexesToMonitor
+  }
+
+  // regex-related config methods duplicated from KafkaConfig to avoid module dependency
+  def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+
+  def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+
+
+
   def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
 
   def getJobId = getOption(JobConfig.JOB_ID).getOrElse("1")
index edffac7..abd6942 100644 (file)
 
 package org.apache.samza.system
 
-import org.apache.samza.util.{Logging, Clock, SystemClock}
+import org.apache.samza.util.{Clock, Logging, SystemClock}
 import org.apache.samza.SamzaException
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
@@ -84,6 +86,16 @@ class StreamMetadataCache (
   }
 
   /**
+    * Returns the list of System Streams for this system.
+    * @param systemName
+    * @param pattern
+    */
+  def getAllSystemStreams(systemName: String): mutable.Set[SystemStream] = {
+    val systemAdmin = systemAdmins.getSystemAdmin(systemName)
+    systemAdmin.getAllSystemStreams().asScala
+  }
+
+  /**
    * Returns metadata about the given streams. If the metadata isn't in the cache, it is retrieved from the systems
    * using the given SystemAdmins.
    *
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java b/samza-core/src/test/scala/org/apache/samza/coordinator/TestInputRegexMonitor.java
new file mode 100644 (file)
index 0000000..84b026b
--- /dev/null
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.collection.JavaConversions$;
+
+
+public class TestInputRegexMonitor {
+
+  private StreamRegexMonitor streamRegexMonitor;
+  private CountDownLatch callbackCount;
+
+  private int inputRegexMs = 10;
+  private String systemName = "kafka";
+  private int expectedNumberOfCallbacks = 1;
+  private Set<SystemStream> inputStreamsDiscovered;
+  private final SystemStream sampleStream = new SystemStream(systemName, "test-1");
+
+  @Before
+  public void setUp() {
+
+    inputStreamsDiscovered = new HashSet<>();
+    Map<String, Pattern> patternMap = new HashMap<>();
+    patternMap.put(systemName, Pattern.compile("test-.*"));
+
+    StreamMetadataCache mockStreamMetadataCache = new MockStreamMetadataCache(null, 1, null);
+
+    MetricsRegistry metrics = Mockito.mock(MetricsRegistry.class);
+    this.callbackCount = new CountDownLatch(expectedNumberOfCallbacks);
+
+    // Creating an streamRegexMonitor with empty-input set and test-.* regex input
+    this.streamRegexMonitor =
+        new StreamRegexMonitor(new HashSet<>(), patternMap, mockStreamMetadataCache, metrics, inputRegexMs,
+            new StreamRegexMonitor.Callback() {
+        @Override
+        public void onInputStreamsChanged(Set<SystemStream> initialInputSet, Set<SystemStream> newInputStreams,
+            Map<String, Pattern> regexesMonitored) {
+          callbackCount.countDown();
+          inputStreamsDiscovered.addAll(newInputStreams);
+
+          // Check that the newInputStream discovered is "kafka" "Test-1"
+          Assert.assertTrue(inputStreamsDiscovered.size() == 1);
+          Assert.assertTrue(inputStreamsDiscovered.contains(sampleStream));
+        }
+      });
+  }
+
+  @Test
+  public void testStartStop() throws InterruptedException {
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    // Normal start
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Start ought to be idempotent
+    streamRegexMonitor.start();
+    Assert.assertTrue(streamRegexMonitor.isRunning());
+
+    // Normal stop
+    streamRegexMonitor.stop();
+    Assert.assertTrue(streamRegexMonitor.awaitTermination(1, TimeUnit.SECONDS));
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+
+    try {
+      streamRegexMonitor.start();
+    } catch (Exception e) {
+      Assert.assertTrue(e.getClass().equals(IllegalStateException.class));
+    }
+
+    // Stop ought to be idempotent
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+    streamRegexMonitor.stop();
+    Assert.assertFalse(streamRegexMonitor.isRunning());
+  }
+
+  @Test
+  public void testSchedulingAndInputAddition() throws Exception {
+    this.streamRegexMonitor.start();
+    try {
+      if (!callbackCount.await(1, TimeUnit.SECONDS)) {
+        throw new Exception(
+            "Did not see " + expectedNumberOfCallbacks + " callbacks after waiting. " + callbackCount.toString());
+      }
+    } finally {
+      System.out.println("CallbackCount is " + callbackCount.getCount());
+      this.streamRegexMonitor.stop();
+    }
+  }
+
+  private class MockStreamMetadataCache extends StreamMetadataCache {
+
+    public MockStreamMetadataCache(SystemAdmins systemAdmins, int cacheTTLms, Clock clock) {
+      super(systemAdmins, cacheTTLms, clock);
+    }
+
+    @Override
+    public scala.collection.mutable.Set getAllSystemStreams(String systemName) {
+      Set<SystemStream> s = new HashSet<>();
+      return JavaConversions$.MODULE$.asScalaSet(new HashSet<SystemStream>(Arrays.asList(sampleStream)));
+    }
+  }
+}
index 2aafab1..0cc7a90 100644 (file)
@@ -20,7 +20,8 @@
 package org.apache.samza.coordinator
 
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.HashMap
+import java.util.regex.Pattern
+
 import org.apache.samza.Partition
 import org.apache.samza.metrics.{Gauge, MetricsRegistryMap}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,6 +35,7 @@ import org.scalatest.junit.AssertionsForJUnit
 import org.scalatest.mockito.MockitoSugar
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable.HashMap
 
 
 class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@@ -43,6 +45,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val mockMetadataCache = mock[StreamMetadataCache]
     val inputSystemStream = new SystemStream("test-system", "test-stream")
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
+    val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
 
     val initialPartitionMetadata = new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
       {
@@ -209,6 +212,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
   def testScheduler(): Unit = {
     val mockMetadataCache = new MockStreamMetadataCache
     val inputSystemStream = new SystemStream("test-system", "test-stream")
+    val inputRegexMap : java.util.Map[String, Pattern] = HashMap("test-system"-> Pattern.compile(".*")).asJava
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
     val sampleCount = new CountDownLatch(2); // Verify 2 invocations
 
index 596b07a..c3c66c7 100644 (file)
@@ -54,6 +54,7 @@ import org.apache.samza.config.SystemConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.ExponentialSleepStrategy;
@@ -679,6 +680,13 @@ public class KafkaSystemAdmin implements SystemAdmin {
     return () -> ZkUtils.apply(zkConnect, 6000, 6000, false);
   }
 
+  @Override
+  public Set<SystemStream> getAllSystemStreams() {
+    return ((Set<String>) this.metadataConsumer.listTopics().keySet()).stream()
+        .map(x -> new SystemStream(systemName, x))
+        .collect(Collectors.toSet());
+  }
+
   /**
    * Container for metadata about offsets.
    */
index 1954ac7..607feb0 100644 (file)
@@ -40,9 +40,6 @@ object KafkaConfig {
   val TOPIC_REPLICATION_FACTOR = "replication.factor"
   val TOPIC_DEFAULT_REPLICATION_FACTOR = "2"
 
-  val REGEX_RESOLVED_STREAMS = "job.config.rewriter.%s.regex"
-  val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
-  val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
 
   val SEGMENT_BYTES = "segment.bytes"
 
@@ -206,11 +203,11 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   }
 
   // regex resolver
-  def getRegexResolvedStreams(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_STREAMS format rewriterName)
+  def getRegexResolvedStreams(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_STREAMS format rewriterName)
 
-  def getRegexResolvedSystem(rewriterName: String) = getOption(KafkaConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
+  def getRegexResolvedSystem(rewriterName: String) = getOption(JobConfig.REGEX_RESOLVED_SYSTEM format rewriterName)
 
-  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
+  def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((JobConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true)
 
   /**
     * Gets the replication factor for the changelog topics. Uses the following precedence.
index e6068b0..654354b 100644 (file)
@@ -21,7 +21,8 @@ package org.apache.samza.config
 
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig.{Config2Kafka, REGEX_RESOLVED_STREAMS}
+import org.apache.samza.config.KafkaConfig.{Config2Kafka}
+import org.apache.samza.config.JobConfig.{REGEX_RESOLVED_STREAMS}
 import org.apache.samza.SamzaException
 import org.apache.samza.util.{Logging, StreamUtil}
 
index 69d7da6..8b19292 100644 (file)
@@ -24,7 +24,7 @@ import collection.JavaConverters._
 import org.junit.Assert._
 import org.junit.Test
 
-import KafkaConfig._
+import JobConfig._
 
 class TestRegExTopicGenerator {