SAMZA-1907: Add metrics to monitor watermarks
author: xinyuiscool <xiliu@linkedin.com>
Tue, 25 Sep 2018 18:20:14 +0000 (11:20 -0700)
committer: xiliu <xiliu@linkedin.com>
Tue, 25 Sep 2018 18:20:14 +0000 (11:20 -0700)
Add initial metric to monitor the aggregated watermark time.

Author: xinyuiscool <xiliu@linkedin.com>

Reviewers: Boris Shkolnik <boris@apache.org>

Closes #658 from xinyuiscool/SAMZA-1907

samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestWatermarkStates.java

index f0c0997..eedf45d 100644 (file)
@@ -338,6 +338,9 @@ public abstract class OperatorImpl<M, RM> {
       }
       // populate the watermark through the dag
       onWatermark(watermark, collector, coordinator);
+
+      // update metrics
+      watermarkStates.updateAggregateMetric(ssp, watermark);
     }
   }
 
index 7f62e00..367576a 100644 (file)
@@ -23,8 +23,10 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.JoinFunction;
@@ -114,7 +116,7 @@ public class OperatorImplGraph {
         new EndOfStreamStates(context.getSystemStreamPartitions(), producerTaskCounts));
     // set states for watermark
     taskContext.registerObject(WatermarkStates.class.getName(),
-        new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));
+        new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts, getMetricsRegistry(context)));
 
     specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
         SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
@@ -403,4 +405,9 @@ public class OperatorImplGraph {
   private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
     return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
   }
+
+  private static MetricsRegistry getMetricsRegistry(TaskContext context) {
+    final SamzaContainerContext containerContext = context.getSamzaContainerContext();
+    return containerContext != null ? containerContext.metricsRegistry : context.getMetricsRegistry();
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkMetrics.java
new file mode 100644 (file)
index 0000000..657ba2a
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+class WatermarkMetrics extends MetricsBase {
+  private final Map<SystemStreamPartition, Gauge<Long>> aggregates = new ConcurrentHashMap<>();
+
+  WatermarkMetrics(MetricsRegistry registry) {
+    super("watermark-", registry);
+  }
+
+  void setAggregateTime(SystemStreamPartition systemStreamPartition, long time) {
+    final Gauge<Long> aggregate = aggregates.computeIfAbsent(systemStreamPartition,
+        ssp -> newGauge(String.format("%s-%s-aggr-watermark",
+        ssp.getStream(), ssp.getPartition().getPartitionId()), 0L));
+    aggregate.set(time);
+  }
+}
index 5cc66e2..b363b2c 100644 (file)
 
 package org.apache.samza.operators.impl;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.WatermarkMessage;
@@ -80,13 +83,26 @@ class WatermarkStates {
   }
 
   private final Map<SystemStreamPartition, WatermarkState> watermarkStates;
+  private final List<SystemStreamPartition> intermediateSsps;
+  private final WatermarkMetrics watermarkMetrics;
+
+  WatermarkStates(
+      Set<SystemStreamPartition> ssps,
+      Map<SystemStream, Integer> producerTaskCounts,
+      MetricsRegistry metricsRegistry) {
+    final Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
+    final List<SystemStreamPartition> intSsps = new ArrayList<>();
 
-  WatermarkStates(Set<SystemStreamPartition> ssps, Map<SystemStream, Integer> producerTaskCounts) {
-    Map<SystemStreamPartition, WatermarkState> states = new HashMap<>();
     ssps.forEach(ssp -> {
-        states.put(ssp, new WatermarkState(producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0)));
+        final int producerCount = producerTaskCounts.getOrDefault(ssp.getSystemStream(), 0);
+        states.put(ssp, new WatermarkState(producerCount));
+        if (producerCount != 0) {
+          intSsps.add(ssp);
+        }
       });
     this.watermarkStates = Collections.unmodifiableMap(states);
+    this.watermarkMetrics = new WatermarkMetrics(metricsRegistry);
+    this.intermediateSsps = Collections.unmodifiableList(intSsps);
   }
 
   /**
@@ -116,4 +132,12 @@ class WatermarkStates {
   long getWatermarkPerSSP(SystemStreamPartition ssp) {
     return watermarkStates.get(ssp).getWatermarkTime();
   }
+
+  void updateAggregateMetric(SystemStreamPartition ssp, long time) {
+    if (intermediateSsps.contains(ssp)) {
+      // Only report the aggregate watermarks for intermediate streams
+      // to reduce the number of metrics
+      watermarkMetrics.setAggregateTime(ssp, time);
+    }
+  }
 }
index a726069..2b41ed2 100644 (file)
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.samza.Partition;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
@@ -53,7 +54,7 @@ public class TestWatermarkStates {
     producerCounts.put(intermediate, 2);
 
     // advance watermark on input to 5
-    WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts);
+    WatermarkStates watermarkStates = new WatermarkStates(ssps, producerCounts, new MetricsRegistryMap());
     IncomingMessageEnvelope envelope = IncomingMessageEnvelope.buildWatermarkEnvelope(inputPartition0, 5L);
     watermarkStates.update((WatermarkMessage) envelope.getMessage(),
         envelope.getSystemStreamPartition());