SAMZA-1256: Improve trace logging for troubleshooting the fluent API
authorJacob Maes <jmaes@linkedin.com>
Wed, 3 May 2017 20:54:35 +0000 (13:54 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 3 May 2017 20:54:35 +0000 (13:54 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Jagadish <jvenkatr@linkedin.com>, Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #155 from jmakes/operator-trace-logging

samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
samza-core/src/main/java/org/apache/samza/operators/WindowState.java
samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java

index a1e7774..6c66654 100644 (file)
@@ -52,7 +52,7 @@ public class WindowKey<K> {
   @Override
   public String toString() {
     String wndKey = "";
-    if (!(key instanceof Void)) {
+    if (!(key instanceof Void) && key != null) {
       wndKey = String.format("%s:", key.toString());
     }
     return String.format("%s%s", wndKey, paneId);
index 4e80862..801044b 100644 (file)
@@ -41,4 +41,9 @@ public class WindowState<WV> {
   public long getEarliestTimestamp() {
     return earliestRecvTime;
   }
+
+  @Override
+  public String toString() {
+    return String.format("WindowState: {time=%d, value=%s}", earliestRecvTime, wv);
+  }
 }
index 49fefc0..c089737 100644 (file)
@@ -70,4 +70,9 @@ public class TriggerKey<WK> {
   public FiringType getType() {
     return type;
   }
+
+  @Override
+  public String toString() {
+    return String.format("TriggerKey: {type=%s, key=%s}", type, key);
+  }
 }
\ No newline at end of file
index b99f719..a297aba 100644 (file)
@@ -109,6 +109,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
 
     WindowKey<WK> storeKey =  getStoreKey(message);
     WindowState<WV> existingState = store.get(storeKey);
+    LOG.trace("Store key ({}) has existing state ({})", storeKey, existingState);
     WindowState<WV> newState = applyFoldFunction(existingState, message);
 
     LOG.trace("New window value: {}, earliest timestamp: {}",
@@ -185,7 +186,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
     long earliestTimestamp;
 
     if (existingState == null) {
-      LOG.trace("No existing state found for key");
+      LOG.trace("No existing state found for key. Invoking initializer.");
       wv = window.getInitializer().get();
       earliestTimestamp = clock.currentTimeMillis();
     } else {