SAMZA-913: fix CoordinatorSystemConsumer bootstrap missing messages issue
authorJacob Maes <jacob.maes@gmail.com>
Fri, 25 Mar 2016 18:52:19 +0000 (11:52 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Fri, 25 Mar 2016 18:52:19 +0000 (11:52 -0700)
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java

index e1a7626..8e1057b 100644 (file)
@@ -152,6 +152,10 @@ public class CoordinatorStreamSystemConsumer {
         }
         CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
         log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+        // Remove any existing entry. Set.add() does not add if the element already exists.
+        if (bootstrappedStreamSet.remove(coordinatorStreamMessage)) {
+          log.debug("Removed duplicate message: {}", coordinatorStreamMessage);
+        }
         bootstrappedStreamSet.add(coordinatorStreamMessage);
         if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
           String configKey = coordinatorStreamMessage.getKey();
@@ -182,7 +186,9 @@ public class CoordinatorStreamSystemConsumer {
     bootstrap();
     LinkedHashSet<CoordinatorStreamMessage> bootstrappedStream = new LinkedHashSet<CoordinatorStreamMessage>();
     for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+      log.trace("Considering message: {}", coordinatorStreamMessage);
       if (type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+        log.trace("Adding message: {}", coordinatorStreamMessage);
         bootstrappedStream.add(coordinatorStreamMessage);
       }
     }
index 0e73e18..417772c 100644 (file)
 
 package org.apache.samza.coordinator.stream;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
@@ -44,6 +40,15 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anySet;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class TestCoordinatorStreamSystemConsumer {
   @Test
   public void testCoordinatorStreamSystemConsumer() {
@@ -110,6 +115,42 @@ public class TestCoordinatorStreamSystemConsumer {
     return true;
   }
 
+  /**
+   * Verify that if a particular key-value is written, then another, then the original again,
+   * that the original occurs last in the set.
+   */
+  @Test
+  public void testOrderKeyRewrite() throws InterruptedException {
+    final SystemStream systemStream = new SystemStream("system", "stream");
+    final SystemStreamPartition ssp = new SystemStreamPartition(systemStream, new Partition(0));
+    final SystemConsumer systemConsumer = mock(SystemConsumer.class);
+
+    final List<IncomingMessageEnvelope> list = new ArrayList<>();
+    SetConfig setConfig1 = new SetConfig("source", "key1", "value1");
+    SetConfig setConfig2 = new SetConfig("source", "key1", "value2");
+    SetConfig setConfig3 = new SetConfig("source", "key1", "value1");
+    list.add(createIncomingMessageEnvelope(setConfig1, ssp));
+    list.add(createIncomingMessageEnvelope(setConfig2, ssp));
+    list.add(createIncomingMessageEnvelope(setConfig3, ssp));
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> messages = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>() {
+      {
+        put(ssp, list);
+      }
+    };
+    when(systemConsumer.poll(anySet(), anyLong())).thenReturn(messages, Collections.<SystemStreamPartition, List<IncomingMessageEnvelope>>emptyMap());
+
+    CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+
+    consumer.bootstrap();
+
+    Set<CoordinatorStreamMessage> bootstrappedMessages = consumer.getBoostrappedStream();
+
+    assertEquals(2, bootstrappedMessages.size()); // First message should have been removed as a duplicate
+    CoordinatorStreamMessage[] coordinatorStreamMessages = bootstrappedMessages.toArray(new CoordinatorStreamMessage[2]);
+    assertEquals(setConfig2, coordinatorStreamMessages[0]);
+    assertEquals(setConfig3, coordinatorStreamMessages[1]); //Config 3 MUST be the last message, not config 2
+  }
+
   private static class MockSystemConsumer implements SystemConsumer {
     private boolean started = false;
     private boolean stopped = false;
@@ -172,4 +213,14 @@ public class TestCoordinatorStreamSystemConsumer {
       return stopped;
     }
   }
+
+  private IncomingMessageEnvelope createIncomingMessageEnvelope(CoordinatorStreamMessage message, SystemStreamPartition ssp) {
+    try {
+      byte[] key = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getKeyArray()).getBytes("UTF-8");
+      byte[] value = SamzaObjectMapper.getObjectMapper().writeValueAsString(message.getMessageMap()).getBytes("UTF-8");
+      return new IncomingMessageEnvelope(ssp, null, key, value);
+    } catch (Exception e) {
+      return null;
+    }
+  }
 }