SAMZA-1803: Make InMemoryManager system aware
authorBharath Kumarasubramanian <bkumaras@linkedin.com>
Thu, 9 Aug 2018 17:49:51 +0000 (10:49 -0700)
committerBoris S <boryas@apache.org>
Thu, 9 Aug 2018 17:49:51 +0000 (10:49 -0700)
- Fix getSystemStreamMetadata in InMemoryManager to filter based on the system name on top of stream names

Author: Bharath Kumarasubramanian <bkumaras@linkedin.com>

Reviewers: Xinyu Liu <xinyuiscool@github.com>

Closes #601 from bharathkk/in-memory-fix

samza-core/src/main/java/org/apache/samza/system/inmemory/InMemoryManager.java
samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemAdmin.java
samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java

index 03bc8d7..a704bc4 100644 (file)
@@ -116,15 +116,17 @@ class InMemoryManager {
   /**
    * Fetch system stream metadata for the given streams.
    *
+   * @param systemName system name
    * @param streamNames set of input streams
    *
    * @return a {@link Map} of stream to {@link SystemStreamMetadata}
    */
-  Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
+  Map<String, SystemStreamMetadata> getSystemStreamMetadata(String systemName, Set<String> streamNames) {
     Map<String, Map<SystemStreamPartition, List<IncomingMessageEnvelope>>> result =
         bufferedMessages.entrySet()
             .stream()
-            .filter(map -> streamNames.contains(map.getKey().getStream()))
+            .filter(entry -> systemName.equals(entry.getKey().getSystem()) 
+                && streamNames.contains(entry.getKey().getStream()))
             .collect(Collectors.groupingBy(entry -> entry.getKey().getStream(),
                 Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
 
@@ -147,7 +149,7 @@ class InMemoryManager {
   }
 
   private SystemStreamMetadata constructSystemStreamMetadata(
-      String systemName,
+      String streamName,
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> sspToMessagesForSystem) {
 
     Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
@@ -163,7 +165,7 @@ class InMemoryManager {
 
               }));
 
-    return new SystemStreamMetadata(systemName, partitionMetadata);
+    return new SystemStreamMetadata(streamName, partitionMetadata);
   }
 
   private List<IncomingMessageEnvelope> poll(SystemStreamPartition ssp, String offset) {
index 327615e..65f45a0 100644 (file)
@@ -34,9 +34,11 @@ import org.apache.samza.system.SystemStreamPartition;
  */
 public class InMemorySystemAdmin implements SystemAdmin {
   private final InMemoryManager inMemoryManager;
+  private final String systemName;
 
-  public InMemorySystemAdmin(InMemoryManager manager) {
+  public InMemorySystemAdmin(String systemName, InMemoryManager manager) {
     inMemoryManager = manager;
+    this.systemName = systemName;
   }
 
   @Override
@@ -78,7 +80,7 @@ public class InMemorySystemAdmin implements SystemAdmin {
    */
   @Override
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
-    return inMemoryManager.getSystemStreamMetadata(streamNames);
+    return inMemoryManager.getSystemStreamMetadata(systemName, streamNames);
   }
 
   /**
index d534ca9..bb41a4e 100644 (file)
@@ -47,7 +47,7 @@ public class InMemorySystemFactory implements SystemFactory {
 
   @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
-    return new InMemorySystemAdmin(getOrDefaultInMemoryManagerByTestId(config));
+    return new InMemorySystemAdmin(systemName, getOrDefaultInMemoryManagerByTestId(config));
   }
 
   private InMemoryManager getOrDefaultInMemoryManagerByTestId(Config config) {