SAMZA-1742: Add metrics reporter parameter to LocalApplicationRunner constructor.
authorDaniel Nishimura <dnishimura@linkedin.com>
Fri, 8 Jun 2018 21:02:53 +0000 (14:02 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 8 Jun 2018 21:02:53 +0000 (14:02 -0700)
vjagadish1989 this has already been reviewed and approved by you and cameronlee314 internally. Please approve here. Thanks!

Author: Daniel Nishimura <dnishimura@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #550 from dnishimura/samza-1742-localapplicationrunner-custom-metrics

samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java

index d64e57a..d3df741 100644 (file)
@@ -23,6 +23,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -42,6 +43,7 @@ import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.processor.StreamProcessorLifecycleListener;
@@ -65,6 +67,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
+  private final Map<String, MetricsReporter> customMetricsReporters;
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
@@ -124,8 +127,13 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   }
 
   public LocalApplicationRunner(Config config) {
+    this(config, new HashMap<>());
+  }
+
+  public LocalApplicationRunner(Config config, Map<String, MetricsReporter> customMetricsReporters) {
     super(config);
-    uid = UUID.randomUUID().toString();
+    this.uid = UUID.randomUUID().toString();
+    this.customMetricsReporters = customMetricsReporters;
   }
 
   @Override
@@ -313,10 +321,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
   private StreamProcessor getStreamProcessorInstance(Config config, Object taskFactory, StreamProcessorLifecycleListener listener) {
     if (taskFactory instanceof StreamTaskFactory) {
       return new StreamProcessor(
-          config, new HashMap<>(), (StreamTaskFactory) taskFactory, listener);
+          config, customMetricsReporters, (StreamTaskFactory) taskFactory, listener);
     } else if (taskFactory instanceof AsyncStreamTaskFactory) {
       return new StreamProcessor(
-          config, new HashMap<>(), (AsyncStreamTaskFactory) taskFactory, listener);
+          config, customMetricsReporters, (AsyncStreamTaskFactory) taskFactory, listener);
     } else {
       throw new SamzaException(String.format("%s is not a valid task factory",
           taskFactory.getClass().getCanonicalName()));