SAMZA-1271; Guarantee predictable, deterministic order for operator initialization...
authorvjagadish1989 <jvenkatr@linkedin.com>
Mon, 5 Jun 2017 17:27:23 +0000 (10:27 -0700)
committervjagadish1989 <jvenkatr@linkedin.com>
Mon, 5 Jun 2017 17:27:23 +0000 (10:27 -0700)
Currently, the order of initialization of operators in the Samza high level API is not deterministic. The non-determinism arises from two primary causes:

- No fixed order of iteration for all subscribed `OperatorSpec`s for a given `MessageStream`
- No fixed order of iteration for all the `OperatorImpl`s in the `OperatorImplGraph`

We aim to provide the following 2 guarantees in this patch:
For any 2 operators A, B in the graph, if B consumes the output of A:
- A is initialized before B is initialized
- A is finalized only after B is finalized

Author: vjagadish1989 <jvenkatr@linkedin.com>

Reviewers: Prateek Maheshwari<pmaheshw@linkedin.com>

Closes #211 from vjagadish1989/deterministic_order

samza-api/src/main/java/org/apache/samza/operators/functions/ClosableFunction.java
samza-api/src/main/java/org/apache/samza/operators/functions/InitableFunction.java
samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java [new file with mode: 0644]

index 2e73652..fe7137f 100644 (file)
@@ -26,6 +26,10 @@ import org.apache.samza.annotation.InterfaceStability;
  *
  * <p> Implement {@link #close()} to free resources used during the execution of the function, clean up state etc.
  *
+ * <p> Order of finalization: {@link ClosableFunction}s are invoked in the reverse topological order of operators in the
+ * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
+ * from operator A, then operator B is guaranteed to be closed before operator A.
+ *
  */
 @InterfaceStability.Unstable
 public interface ClosableFunction {
index 4f9fad7..b08c6cd 100644 (file)
@@ -25,6 +25,11 @@ import org.apache.samza.task.TaskContext;
 
 /**
  * A function that can be initialized before execution.
+ *
+ * <p> Order of initialization: {@link InitableFunction}s are invoked in the topological order of operators in the
+ * {@link org.apache.samza.operators.StreamGraph}. For any two operators A and B in the graph, if operator B consumes results
+ * from operator A, then operator A is guaranteed to be initialized before operator B.
+ *
  */
 @InterfaceStability.Unstable
 public interface InitableFunction {
index 0c84e90..9912f95 100644 (file)
@@ -41,7 +41,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.function.Function;
@@ -61,8 +61,10 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
 
   /**
    * The set of operators that consume the messages in this {@link MessageStream}
+   *
+   * Use a LinkedHashSet since we need deterministic ordering in initializing/closing operators.
    */
-  private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+  private final Set<OperatorSpec> registeredOperatorSpecs = new LinkedHashSet<>();
 
   /**
    * Default constructor
index 1f1d282..fcce5eb 100644 (file)
@@ -31,8 +31,8 @@ import org.apache.samza.system.StreamSpec;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -51,8 +51,9 @@ public class StreamGraphImpl implements StreamGraph {
    */
   private int opId = 0;
 
-  private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
-  private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
+  // Using LHM for deterministic order in initializing and closing operators.
+  private final Map<StreamSpec, InputStreamInternal> inStreams = new LinkedHashMap<>();
+  private final Map<StreamSpec, OutputStreamInternal> outStreams = new LinkedHashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
index 78a6d1e..e99b3ee 100644 (file)
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.operators.impl;
 
+import com.google.common.collect.Lists;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.StreamGraphImpl;
@@ -31,9 +32,12 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 
 
@@ -47,8 +51,10 @@ public class OperatorImplGraph {
    * A mapping from {@link OperatorSpec}s to their {@link OperatorImpl}s in this graph. Used to avoid creating
    * multiple {@link OperatorImpl}s for an {@link OperatorSpec}, e.g., when it's reached from different
    * input {@link MessageStreamImpl}s.
+   *
+   * Using LHM for deterministic ordering in initializing and closing operators.
    */
-  private final Map<OperatorSpec, OperatorImpl> operatorImpls = new HashMap<>();
+  private final Map<OperatorSpec, OperatorImpl> operatorImpls = new LinkedHashMap<>();
 
   /**
    * A mapping from input {@link SystemStream}s to their {@link OperatorImpl} sub-DAG in this graph.
@@ -99,13 +105,10 @@ public class OperatorImplGraph {
     return Collections.unmodifiableCollection(this.rootOperators.values());
   }
 
-  /**
-   * Get all {@link OperatorImpl}s for the graph.
-   *
-   * @return  an unmodifiable view of all {@link OperatorImpl}s for the graph
-   */
-  public Collection<OperatorImpl> getAllOperators() {
-    return Collections.unmodifiableCollection(this.operatorImpls.values());
+  public void close() {
+    List<OperatorImpl> initializationOrder = new ArrayList<>(operatorImpls.values());
+    List<OperatorImpl> finalizationOrder = Lists.reverse(initializationOrder);
+    finalizationOrder.forEach(OperatorImpl::close);
   }
 
   /**
index 0b93bbe..66e2c58 100644 (file)
@@ -71,6 +71,11 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         mapFn.init(config, context);
       }
+
+      @Override
+      public void close() {
+        mapFn.close();
+      }
     }, nextStream, OperatorSpec.OpCode.MAP, opId);
   }
 
@@ -101,6 +106,12 @@ public class OperatorSpecs {
       public void init(Config config, TaskContext context) {
         filterFn.init(config, context);
       }
+
+      @Override
+      public void close() {
+        filterFn.close();
+      }
+
     }, nextStream, OperatorSpec.OpCode.FILTER, opId);
   }
 
index a5f3f85..50ae775 100644 (file)
@@ -22,7 +22,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.ContextManager;
 import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.impl.OperatorImpl;
 import org.apache.samza.operators.impl.OperatorImplGraph;
 import org.apache.samza.operators.impl.RootOperatorImpl;
 import org.apache.samza.operators.stream.InputStreamInternal;
@@ -32,7 +31,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.SystemClock;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -142,8 +140,6 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
     if (this.contextManager != null) {
       this.contextManager.close();
     }
-
-    Collection<OperatorImpl> allOperators = operatorImplGraph.getAllOperators();
-    allOperators.forEach(OperatorImpl::close);
+    operatorImplGraph.close();
   }
 }
index 666bbb8..9d95217 100644 (file)
  */
 package org.apache.samza.operators;
 
+import junit.framework.Assert;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.data.MessageType;
 import org.apache.samza.operators.data.TestInputMessageEnvelope;
 import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.stream.InputStreamInternal;
 import org.apache.samza.operators.stream.InputStreamInternalImpl;
 import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
 import org.apache.samza.operators.stream.OutputStreamInternalImpl;
@@ -30,6 +32,7 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 
@@ -193,4 +196,30 @@ public class TestStreamGraphImpl {
     assertEquals(graph.getNextOpId(), 1);
   }
 
+  @Test
+  public void testGetInputStreamPreservesInsertionOrder() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+
+    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
+    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+    StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
+    when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
+
+    graph.getInputStream("test-stream-1", (k, v) -> v);
+    graph.getInputStream("test-stream-2", (k, v) -> v);
+    graph.getInputStream("test-stream-3", (k, v) -> v);
+
+    ArrayList<InputStreamInternal> inputMessageStreams = new ArrayList<>(graph.getInputStreams().values());
+    Assert.assertEquals(inputMessageStreams.size(), 3);
+    Assert.assertEquals(inputMessageStreams.get(0).getStreamSpec(), testStreamSpec1);
+    Assert.assertEquals(inputMessageStreams.get(1).getStreamSpec(), testStreamSpec2);
+    Assert.assertEquals(inputMessageStreams.get(2).getStreamSpec(), testStreamSpec3);
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
new file mode 100644 (file)
index 0000000..67e5b46
--- /dev/null
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.SystemClock;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static junit.framework.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestOperatorImplGraph {
+
+  @Test
+  public void testOperatorGraphInitAndClose() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
+    when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+    StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
+    when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+    Config mockConfig = mock(Config.class);
+    TaskContext mockContext = createMockContext();
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    List<String> initializationOrder = new ArrayList<>();
+    List<String> finalizationOrder = new ArrayList<>();
+
+    MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", (k, v) -> v);
+    MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", (k, v) -> v);
+
+    inputStream1.map(createMapFunction("1", initializationOrder, finalizationOrder))
+               .map(createMapFunction("2", initializationOrder, finalizationOrder));
+
+    inputStream2.map(createMapFunction("3", initializationOrder, finalizationOrder))
+        .map(createMapFunction("4", initializationOrder, finalizationOrder));
+
+    OperatorImplGraph implGraph = new OperatorImplGraph(SystemClock.instance());
+
+    // Assert that initialization occurs in topological order.
+    implGraph.init(graph, mockConfig, mockContext);
+    assertEquals(initializationOrder.get(0), "1");
+    assertEquals(initializationOrder.get(1), "2");
+    assertEquals(initializationOrder.get(2), "3");
+    assertEquals(initializationOrder.get(3), "4");
+
+    // Assert that finalization occurs in reverse topological order.
+    implGraph.close();
+    assertEquals(finalizationOrder.get(0), "4");
+    assertEquals(finalizationOrder.get(1), "3");
+    assertEquals(finalizationOrder.get(2), "2");
+    assertEquals(finalizationOrder.get(3), "1");
+  }
+
+  private TaskContext createMockContext() {
+    TaskContext mockContext = mock(TaskContext.class);
+    when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+    return mockContext;
+  }
+
+  /**
+   * Creates an identity map function that appends to the provided lists when init/close is invoked.
+   */
+  private MapFunction<Object, Object> createMapFunction(String id, List<String> initializationOrder, List<String> finalizationOrder) {
+    return new MapFunction<Object, Object>() {
+      @Override
+      public void init(Config config, TaskContext context) {
+        initializationOrder.add(id);
+      }
+
+      @Override
+      public void close() {
+        finalizationOrder.add(id);
+      }
+
+      @Override
+      public Object apply(Object message) {
+        return message;
+      }
+    };
+  }
+}
+