y
authorbharathkk <codin.martial@gmail.com>
Sat, 4 Aug 2018 00:21:33 +0000 (17:21 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Sat, 4 Aug 2018 00:21:33 +0000 (17:21 -0700)
- Fixed serde issues during persisted offsets locally
- Added unit tests for TaskSideInputStorageManager
- Added integration tests for table integration

Author: bharathkk <codin.martial@gmail.com>
Author: Bharath Kumarasubramanian <bkumaras@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apache.org>

Closes #594 from bharathkk/side-input-tests

samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java
samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java

index 66c9106..3982623 100644 (file)
@@ -19,6 +19,7 @@
 
 package org.apache.samza.storage;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 
 import java.io.File;
@@ -38,6 +39,7 @@ import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -50,6 +52,8 @@ import org.apache.samza.util.Clock;
 import org.apache.samza.util.FileUtil;
 
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectWriter;
+import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
@@ -63,7 +67,10 @@ public class TaskSideInputStorageManager {
   private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
   private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
   private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
+  private static final TypeReference<HashMap<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
+      new TypeReference<HashMap<SystemStreamPartition, String>>() { };
+  private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
 
   private final Clock clock;
   private final Map<String, SideInputsProcessor> storeToProcessor;
@@ -185,6 +192,14 @@ public class TaskSideInputStorageManager {
   }
 
   /**
+   * For unit testing only
+   */
+  @VisibleForTesting
+  void updateLastProcessedOffset(SystemStreamPartition ssp, String offset) {
+    lastProcessedOffsets.put(ssp, offset);
+  }
+
+  /**
    * Processes the incoming side input message envelope and updates the last processed offset for its SSP.
    *
    * @param message incoming message to be processed
@@ -221,7 +236,7 @@ public class TaskSideInputStorageManager {
           FileUtil.rm(storeLocation);
         }
 
-        if (!storeLocation.exists()) {
+        if (isPersistedStore(storeName) && !storeLocation.exists()) {
           LOG.info("Creating {} as the store directory for the side input store {}", storePath, storeName);
           storeLocation.mkdirs();
         }
@@ -232,7 +247,8 @@ public class TaskSideInputStorageManager {
    * Writes the offset files for all side input stores one by one. There is one offset file per store.
    * Its contents are a JSON encoded mapping from each side input SSP to its last processed offset, and a checksum.
    */
-  private void writeOffsetFiles() {
+  @VisibleForTesting
+  void writeOffsetFiles() {
     storeToSSps.entrySet().stream()
         .filter(entry -> isPersistedStore(entry.getKey())) // filter out in-memory side input stores
         .forEach((entry) -> {
@@ -242,7 +258,7 @@ public class TaskSideInputStorageManager {
               .collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
 
             try {
-              String fileContents = OBJECT_MAPPER.writeValueAsString(offsets);
+              String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
               File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
               FileUtil.writeWithChecksum(offsetFile, fileContents);
             } catch (Exception e) {
@@ -257,7 +273,8 @@ public class TaskSideInputStorageManager {
    * @return a {@link Map} of {@link SystemStreamPartition} to offset in the offset files.
    */
   @SuppressWarnings("unchecked")
-  private Map<SystemStreamPartition, String> getFileOffsets() {
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getFileOffsets() {
     LOG.info("Loading initial offsets from the file for side input stores.");
     Map<SystemStreamPartition, String> fileOffsets = new HashMap<>();
 
@@ -268,7 +285,7 @@ public class TaskSideInputStorageManager {
         if (isValidSideInputStore(storeName, storeLocation)) {
           try {
             String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
-            Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, Map.class);
+            Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
             fileOffsets.putAll(offsets);
           } catch (Exception e) {
             LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
@@ -279,7 +296,8 @@ public class TaskSideInputStorageManager {
     return fileOffsets;
   }
 
-  private File getStoreLocation(String storeName) {
+  @VisibleForTesting
+  File getStoreLocation(String storeName) {
     return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
   }
 
@@ -292,7 +310,8 @@ public class TaskSideInputStorageManager {
    * @param oldestOffsets oldest offsets from the source
    * @return a {@link Map} of {@link SystemStreamPartition} to offset
    */
-  private Map<SystemStreamPartition, String> getStartingOffsets(
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getStartingOffsets(
       Map<SystemStreamPartition, String> fileOffsets, Map<SystemStreamPartition, String> oldestOffsets) {
     Map<SystemStreamPartition, String> startingOffsets = new HashMap<>();
 
@@ -317,7 +336,8 @@ public class TaskSideInputStorageManager {
    *
    * @return a {@link Map} of {@link SystemStreamPartition} to their oldest offset.
    */
-  private Map<SystemStreamPartition, String> getOldestOffsets() {
+  @VisibleForTesting
+  Map<SystemStreamPartition, String> getOldestOffsets() {
     Map<SystemStreamPartition, String> oldestOffsets = new HashMap<>();
 
     // Step 1
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTaskSideInputStorageManager.java
new file mode 100644 (file)
index 0000000..2d60f7b
--- /dev/null
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.collect.ImmutableSet;
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ScalaJavaUtil;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
+public class TestTaskSideInputStorageManager {
+  private static final String LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "logged-store";
+  private static final String NON_LOGGED_STORE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "non-logged-store";
+
+  @Test
+  public void testInit() {
+    final String storeName = "test-init-store";
+    final String taskName = "test-init-task";
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ImmutableSet.of())
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+
+    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
+    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
+  }
+
+  @Test
+  public void testFlush() {
+    final String storeName = "test-flush-store";
+    final String taskName = "test-flush-task";
+    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    final String offset = "123";
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ImmutableSet.of(ssp))
+        .build();
+    Map<String, StorageEngine> stores = new HashMap<>();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
+    testSideInputStorageManager.flush();
+
+    for (StorageEngine storageEngine : stores.values()) {
+      verify(storageEngine).flush();
+    }
+
+    verify(testSideInputStorageManager).writeOffsetFiles();
+
+    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
+    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
+
+    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
+    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
+    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
+  }
+
+  @Test
+  public void testStop() {
+    final String storeName = "test-stop-store";
+    final String taskName = "test-stop-task";
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
+        .addInMemoryStore(storeName, ImmutableSet.of())
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    testSideInputStorageManager.stop();
+
+    verify(testSideInputStorageManager.getStore(storeName)).stop();
+    verify(testSideInputStorageManager).writeOffsetFiles();
+  }
+
+  @Test
+  public void testWriteOffsetFilesForNonPersistedStore() {
+    final String storeName = "test-write-offset-non-persisted-store";
+    final String taskName = "test-write-offset-for-non-persisted-task";
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, NON_LOGGED_STORE_DIR)
+        .addInMemoryStore(storeName, ImmutableSet.of())
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    testSideInputStorageManager.writeOffsetFiles(); // should be no-op
+    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
+
+    assertFalse("Store directory: " + storeDir.getPath() + " should not be created for non-persisted store", storeDir.exists());
+  }
+
+  @Test
+  public void testWriteOffsetFilesForPersistedStore() {
+    final String storeName = "test-write-offset-persisted-store";
+    final String storeName2 = "test-write-offset-persisted-store-2";
+
+    final String taskName = "test-write-offset-for-persisted-task";
+    final String offset = "123";
+    final SystemStreamPartition ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0));
+    final SystemStreamPartition ssp2 = new SystemStreamPartition("test-system2", "test-stream2", new Partition(0));
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ImmutableSet.of(ssp))
+        .addLoggedStore(storeName2, ImmutableSet.of(ssp2))
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    testSideInputStorageManager.updateLastProcessedOffset(ssp, offset);
+    testSideInputStorageManager.updateLastProcessedOffset(ssp2, offset);
+    testSideInputStorageManager.writeOffsetFiles();
+    File storeDir = testSideInputStorageManager.getStoreLocation(storeName);
+
+    assertTrue("Store directory: " + storeDir.getPath() + " is missing.", storeDir.exists());
+
+    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
+
+    assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
+    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
+
+    assertTrue("Failed to get offset for ssp: " + ssp2.toString() + " from file.", fileOffsets.containsKey(ssp2));
+    assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp2), offset);
+  }
+
+  @Test
+  public void testGetFileOffsets() {
+    final String storeName = "test-get-file-offsets-store";
+    final String taskName = "test-get-file-offsets-task";
+    final String offset = "123";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
+        .collect(Collectors.toSet());
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ssps)
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    ssps.forEach(ssp -> testSideInputStorageManager.updateLastProcessedOffset(ssp, offset));
+    testSideInputStorageManager.writeOffsetFiles();
+
+    Map<SystemStreamPartition, String> fileOffsets = testSideInputStorageManager.getFileOffsets();
+
+    ssps.forEach(ssp -> {
+        assertTrue("Failed to get offset for ssp: " + ssp.toString() + " from file.", fileOffsets.containsKey(ssp));
+        assertEquals("Mismatch between last processed offset and file offset.", fileOffsets.get(ssp), offset);
+      });
+  }
+
+  @Test
+  public void testGetStartingOffsets() {
+    final String storeName = "test-get-starting-offset-store";
+    final String taskName = "test-get-starting-offset-task";
+
+    Set<SystemStreamPartition> ssps = IntStream.range(1, 6)
+        .mapToObj(idx -> new SystemStreamPartition("test-system", "test-stream", new Partition(idx)))
+        .collect(Collectors.toSet());
+
+
+    TaskSideInputStorageManager testSideInputStorageManager = new MockTaskSideInputStorageManagerBuilder(taskName, LOGGED_STORE_DIR)
+        .addLoggedStore(storeName, ssps)
+        .build();
+
+    initializeSideInputStorageManager(testSideInputStorageManager);
+    Map<SystemStreamPartition, String> fileOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId + 10 : partitionId;
+            return String.valueOf(offset);
+          }));
+
+    Map<SystemStreamPartition, String> oldestOffsets = ssps.stream()
+        .collect(Collectors.toMap(Function.identity(), ssp -> {
+            int partitionId = ssp.getPartition().getPartitionId();
+            int offset = partitionId % 2 == 0 ? partitionId : partitionId + 10;
+
+            return String.valueOf(offset);
+          }));
+
+    doCallRealMethod().when(testSideInputStorageManager).getStartingOffsets(fileOffsets, oldestOffsets);
+
+    Map<SystemStreamPartition, String> startingOffsets =
+        testSideInputStorageManager.getStartingOffsets(fileOffsets, oldestOffsets);
+
+    assertTrue("Failed to get starting offsets for all ssps", startingOffsets.size() == 5);
+  }
+
+  private void initializeSideInputStorageManager(TaskSideInputStorageManager testSideInputStorageManager) {
+    doReturn(new HashMap<>()).when(testSideInputStorageManager).getStartingOffsets(any(), any());
+    testSideInputStorageManager.init();
+  }
+
+  private static final class MockTaskSideInputStorageManagerBuilder {
+    private final TaskName taskName;
+    private final String storeBaseDir;
+
+    private Clock clock = mock(Clock.class);
+    private Map<String, SideInputsProcessor> storeToProcessor = new HashMap<>();
+    private Map<String, StorageEngine> stores = new HashMap<>();
+    private Map<String, Set<SystemStreamPartition>> storeToSSps = new HashMap<>();
+    private StreamMetadataCache streamMetadataCache = mock(StreamMetadataCache.class);
+    private SystemAdmins systemAdmins = mock(SystemAdmins.class);
+
+    public MockTaskSideInputStorageManagerBuilder(String taskName, String storeBaseDir) {
+      this.taskName = new TaskName(taskName);
+      this.storeBaseDir = storeBaseDir;
+
+      initializeMocks();
+    }
+
+    private void initializeMocks() {
+      SystemAdmin admin = mock(SystemAdmin.class);
+      doAnswer(invocation -> {
+          String offset1 = invocation.getArgumentAt(0, String.class);
+          String offset2 = invocation.getArgumentAt(1, String.class);
+
+          return Long.compare(Long.parseLong(offset1), Long.parseLong(offset2));
+        }).when(admin).offsetComparator(any(), any());
+      doAnswer(invocation -> {
+          Map<SystemStreamPartition, String> sspToOffsets = invocation.getArgumentAt(0, Map.class);
+
+          return sspToOffsets.entrySet()
+              .stream()
+              .collect(Collectors.toMap(Map.Entry::getKey,
+                  entry -> String.valueOf(Long.parseLong(entry.getValue()) + 1)));
+        }).when(admin).getOffsetsAfter(any());
+      doReturn(admin).when(systemAdmins).getSystemAdmin("test-system");
+
+      doReturn(ScalaJavaUtil.toScalaMap(new HashMap<>())).when(streamMetadataCache).getStreamMetadata(any(), anyBoolean());
+    }
+
+    MockTaskSideInputStorageManagerBuilder addInMemoryStore(String storeName, Set<SystemStreamPartition> ssps) {
+      StorageEngine storageEngine = mock(StorageEngine.class);
+      when(storageEngine.getStoreProperties()).thenReturn(
+          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(false).build());
+
+      stores.put(storeName, storageEngine);
+      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
+      storeToSSps.put(storeName, ssps);
+
+      return this;
+    }
+
+    MockTaskSideInputStorageManagerBuilder addLoggedStore(String storeName, Set<SystemStreamPartition> ssps) {
+      StorageEngine storageEngine = mock(StorageEngine.class);
+      when(storageEngine.getStoreProperties()).thenReturn(
+          new StoreProperties.StorePropertiesBuilder().setLoggedStore(false).setPersistedToDisk(true).build());
+
+      stores.put(storeName, storageEngine);
+      storeToProcessor.put(storeName, mock(SideInputsProcessor.class));
+      storeToSSps.put(storeName, ssps);
+
+      return this;
+    }
+
+    TaskSideInputStorageManager build() {
+      return spy(new TaskSideInputStorageManager(taskName, streamMetadataCache, storeBaseDir, stores,
+          storeToProcessor, storeToSSps, systemAdmins, mock(Config.class), clock));
+    }
+  }
+}
\ No newline at end of file
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
new file mode 100644 (file)
index 0000000..d9016b1
--- /dev/null
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.RocksDbTableDescriptor;
+import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.table.Table;
+import org.apache.samza.test.framework.TestRunner;
+import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness {
+  private static final String PAGEVIEW_STREAM = "pageview";
+  private static final String PROFILE_STREAM = "profile";
+  private static final String ENRICHED_PAGEVIEW_STREAM = "enrichedpageview";
+
+  @Test
+  public void testJoinWithSideInputsTable() {
+    runTest(
+        "side-input-join",
+        new PageViewProfileJoin(),
+        Arrays.asList(TestTableData.generatePageViews(10)),
+        Arrays.asList(TestTableData.generateProfiles(10)));
+  }
+
+  @Test
+  public void testJoinWithDurableSideInputTable() {
+    runTest(
+        "durable-side-input",
+        new DurablePageViewProfileJoin(),
+        Arrays.asList(TestTableData.generatePageViews(5)),
+        Arrays.asList(TestTableData.generateProfiles(5)));
+  }
+
+  private void runTest(String systemName, StreamApplication app, List<TestTableData.PageView> pageViews,
+      List<TestTableData.Profile> profiles) {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
+    configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
+    configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
+
+    CollectionStream<TestTableData.PageView> pageViewStream =
+        CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews);
+    CollectionStream<TestTableData.Profile> profileStream =
+        CollectionStream.of(systemName, PROFILE_STREAM, profiles);
+
+    CollectionStream<TestTableData.EnrichedPageView> outputStream =
+        CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM);
+
+    TestRunner
+        .of(app)
+        .addInputStream(pageViewStream)
+        .addInputStream(profileStream)
+        .addOutputStream(outputStream)
+        .addConfigs(new MapConfig(configs))
+        .run(Duration.ofMillis(100000));
+
+    try {
+      Map<Integer, List<TestTableData.EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000));
+      List<TestTableData.EnrichedPageView> results = result.values().stream()
+          .flatMap(List::stream)
+          .collect(Collectors.toList());
+
+      List<TestTableData.EnrichedPageView> expectedEnrichedPageviews = pageViews.stream()
+          .flatMap(pv -> profiles.stream()
+              .filter(profile -> pv.memberId == profile.memberId)
+              .map(profile -> new TestTableData.EnrichedPageView(pv.pageKey, profile.memberId, profile.company)))
+          .collect(Collectors.toList());
+
+      boolean successfulJoin = results.stream().allMatch(expectedEnrichedPageviews::contains);
+      assertEquals("Mismatch between the expected and actual join count", results.size(),
+          expectedEnrichedPageviews.size());
+      assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  static class PageViewProfileJoin implements StreamApplication {
+    static final String PROFILE_TABLE = "profile-table";
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      Table<KV<Integer, TestTableData.Profile>> table = graph.getTable(getTableDescriptor());
+
+      graph.getInputStream(PAGEVIEW_STREAM, new NoOpSerde<TestTableData.PageView>())
+          .partitionBy(TestTableData.PageView::getMemberId, v -> v, "partition-page-view")
+          .join(table, new TestLocalTable.PageViewToProfileJoinFunction())
+          .sendTo(graph.getOutputStream(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<>()));
+    }
+
+    protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() {
+      return new InMemoryTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE)
+          .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde()))
+          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((msg, store) -> {
+              TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
+              int key = profile.getMemberId();
+
+              return ImmutableList.of(new Entry<>(key, profile));
+            });
+    }
+  }
+
+  static class DurablePageViewProfileJoin extends PageViewProfileJoin {
+    @Override
+    protected TableDescriptor<Integer, TestTableData.Profile, ?> getTableDescriptor() {
+      return new RocksDbTableDescriptor<Integer, TestTableData.Profile>(PROFILE_TABLE)
+          .withSerde(KVSerde.of(new IntegerSerde(), new TestTableData.ProfileJsonSerde()))
+          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+          .withSideInputsProcessor((msg, store) -> {
+              TestTableData.Profile profile = (TestTableData.Profile) msg.getMessage();
+              int key = profile.getMemberId();
+
+              return ImmutableList.of(new Entry<>(key, profile));
+            });
+    }
+  }
+}
index dfd0d1b..ed73961 100644 (file)
@@ -20,8 +20,8 @@
 package org.apache.samza.test.table;
 
 import java.io.Serializable;
+import java.util.Objects;
 import java.util.Random;
-
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
@@ -112,6 +112,26 @@ public class TestTableData {
       super(pageKey, memberId);
       this.company = company;
     }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(company, memberId, pageKey);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      EnrichedPageView that = (EnrichedPageView) o;
+      return Objects.equals(company, that.company) && Objects.equals(memberId, that.memberId) && Objects.equals(pageKey,
+          that.pageKey);
+    }
   }
 
   public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> {