SAMZA-1726: Isolate InMemorySystemFactory to run separately per job
authorsanil15 <sanil.jain15@gmail.com>
Fri, 22 Jun 2018 19:44:57 +0000 (12:44 -0700)
committerxiliu <xiliu@linkedin.com>
Fri, 22 Jun 2018 19:44:57 +0000 (12:44 -0700)
Tested by running the corresponding integration and unit tests

Author: sanil15 <sanil.jain15@gmail.com>

Reviewers: Xinyu Liu <xinyuliu.us@gmail.com>

Closes #532 from Sanil15/SAMZA-1726

samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/system/inmemory/InMemorySystemFactory.java

diff --git a/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java b/samza-core/src/main/java/org/apache/samza/config/InMemorySystemConfig.java
new file mode 100644 (file)
index 0000000..b46ac41
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/**
+ * A convenience class for fetching configs related to the {@link org.apache.samza.system.inmemory.InMemorySystemFactory}
+ */
+public class InMemorySystemConfig extends MapConfig {
+  /**
+   * <p>This Config determines Runtime behaviour of {@link org.apache.samza.system.inmemory.InMemorySystemFactory} </p>
+   * <p>
+   * If {@code INMEMORY_SCOPE} key is configured with a non null value in the configs, it creates an isolated
+   * InMemorySystem identified by the value of {@code INMEMORY_SCOPE} in {@link org.apache.samza.system.inmemory.InMemorySystemFactory}
+   * for the app while runtime. All the in memory streams (input/output/intermediate) are created using this isolated
+   * InMemorySystem.
+   * </p>
+   * <p>
+   * If {@code INMEMORY_SCOPE} key is not configured or is null for an app, it shares a default InMemorySystem
+   * identified by {@code DEFAULT_INMEMORY_SCOPE} value of {@code INMEMORY_SCOPE}
+   * This system is shared between all the applications missing {@code INMEMORY_SCOPE} key in their configs running
+   * in the same JVM using {@link org.apache.samza.system.inmemory.InMemorySystemFactory}
+   * </p>
+   */
+  public static final String INMEMORY_SCOPE = "inmemory.scope";
+
+  public static final String DEFAULT_INMEMORY_SCOPE = "SAME_DEFAULT_SCOPE";
+
+  public InMemorySystemConfig(Config config) {
+    super(config);
+  }
+
+  public String getInMemoryScope() {
+    return this.get(INMEMORY_SCOPE) == null ? DEFAULT_INMEMORY_SCOPE : this.get(INMEMORY_SCOPE);
+  }
+}
index f78b7f4..d534ca9 100644 (file)
@@ -19,7 +19,9 @@
 
 package org.apache.samza.system.inmemory;
 
+import java.util.concurrent.ConcurrentHashMap;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.InMemorySystemConfig;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemConsumer;
@@ -31,20 +33,25 @@ import org.apache.samza.system.SystemProducer;
  * Initial draft of in-memory {@link SystemFactory}. It is test only and not meant for production use right now.
  */
 public class InMemorySystemFactory implements SystemFactory {
-  private static final InMemoryManager MEMORY_MANAGER = new InMemoryManager();
+  private static final ConcurrentHashMap<String, InMemoryManager> IN_MEMORY_MANAGERS = new ConcurrentHashMap<>();
 
   @Override
   public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
-    return new InMemorySystemConsumer(MEMORY_MANAGER);
+    return new InMemorySystemConsumer(getOrDefaultInMemoryManagerByTestId(config));
   }
 
   @Override
   public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
-    return new InMemorySystemProducer(systemName, MEMORY_MANAGER);
+    return new InMemorySystemProducer(systemName, getOrDefaultInMemoryManagerByTestId(config));
   }
 
   @Override
   public SystemAdmin getAdmin(String systemName, Config config) {
-    return new InMemorySystemAdmin(MEMORY_MANAGER);
+    return new InMemorySystemAdmin(getOrDefaultInMemoryManagerByTestId(config));
+  }
+
+  private InMemoryManager getOrDefaultInMemoryManagerByTestId(Config config) {
+    InMemorySystemConfig inMemorySystemConfig = new InMemorySystemConfig(config);
+    return IN_MEMORY_MANAGERS.computeIfAbsent(inMemorySystemConfig.getInMemoryScope(), key -> new InMemoryManager());
   }
 }