SAMZA-1283: Expose the buffered-message-size metric
authorXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 10 May 2017 23:50:49 +0000 (16:50 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Wed, 10 May 2017 23:50:49 +0000 (16:50 -0700)
Regardless of whether we enable size limit for the consumer buffer, this metric helps to see what's the buffer size and make configuring size limit easier.

Author: Xinyu Liu <xiliu@xiliu-ld.linkedin.biz>

Reviewers: Jagadish Venkatraman <vjagadish1989@gmail.com>

Closes #184 from xinyuiscool/SAMZA-1283

samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala

index 8238d2e..0205a44 100644 (file)
@@ -68,7 +68,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;  // size in bytes per SystemStreamPartition
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
   private final Clock clock;
-  protected final boolean fetchLimitByBytesEnabled;
 
   public BlockingEnvelopeMap() {
     this(new NoOpMetricsRegistry());
@@ -83,17 +82,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this(metricsRegistry, clock, null, false);
+    this(metricsRegistry, clock, null);
   }
 
-  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
     metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
     this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
     this.clock = clock;
-    this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
-    // Created when size is disabled for code simplification, and as the overhead is negligible.
     this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
   }
 
@@ -103,7 +100,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
-    // Created when size is disabled for code simplification, and the overhead is negligible.
     bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
   }
 
@@ -155,9 +151,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
       if (outgoingList.size() > 0) {
         messagesToReturn.put(systemStreamPartition, outgoingList);
-        if (fetchLimitByBytesEnabled) {
-          subtractSizeOnQDrain(systemStreamPartition, outgoingList);
-        }
+        subtractSizeOnQDrain(systemStreamPartition, outgoingList);
       }
     }
 
@@ -183,9 +177,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
    */
   protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
     bufferedMessages.get(systemStreamPartition).put(envelope);
-    if (fetchLimitByBytesEnabled) {
-      bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
-    }
+    bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
   }
 
   /**
@@ -262,9 +254,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
       this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
 
       metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
-      if (fetchLimitByBytesEnabled) {
-        metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
-      }
+      metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
index afdae16..f5394c0 100644 (file)
@@ -213,7 +213,7 @@ public class TestBlockingEnvelopeMap {
     }
 
     public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
-      super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
+      super(new NoOpMetricsRegistry(), CLOCK, null);
       injectedQueue = new MockQueue();
     }
 
index f25bb68..aa13fd8 100644 (file)
@@ -127,8 +127,7 @@ private[kafka] class KafkaSystemConsumer(
     new Clock {
       def currentTimeMillis = clock()
     },
-    classOf[KafkaSystemConsumerMetrics].getName,
-    fetchLimitByBytesEnabled) with Toss with Logging {
+    classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()