SAMZA-1439: Address late review feedback from SAMZA-1434
author: Jacob Maes <jmaes@linkedin.com>
Wed, 4 Oct 2017 21:02:26 +0000 (14:02 -0700)
committer: Jacob Maes <jmaes@linkedin.com>
Wed, 4 Oct 2017 21:02:26 +0000 (14:02 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes #313 from jmakes/samza-1439

samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java

diff --git a/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java b/samza-api/src/test/java/org/apache/samza/system/TestStreamSpec.java
new file mode 100644 (file)
index 0000000..8974877
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestStreamSpec {
+  @Test
+  public void testBasicConstructor() {
+    StreamSpec streamSpec = new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", 1);
+
+    assertEquals("dummyId", streamSpec.getId());
+    assertEquals("dummyPhysicalName", streamSpec.getPhysicalName());
+    assertEquals("dummySystemName", streamSpec.getSystemName());
+    assertEquals(1, streamSpec.getPartitionCount());
+
+    // SystemStream should use the physical name, not the streamId.
+    SystemStream systemStream = new SystemStream("dummySystemName", "dummyPhysicalName");
+    assertEquals(systemStream, streamSpec.toSystemStream());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidPartitionCount() {
+    new StreamSpec("dummyId", "dummyPhysicalName", "dummySystemName", -1);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidStreamId() {
+    new StreamSpec("dummy.Id", "dummyPhysicalName", "dummySystemName", 0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidSystemName() {
+    new StreamSpec("dummyId", "dummyPhysicalName", "dummy.System.Name", 0);
+  }
+}
index 998ea1e..468aab9 100644 (file)
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 public class ExecutionPlanner {
   private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class);
 
-  private static final int MAX_INFERRED_PARTITIONS = 256;
+  static final int MAX_INFERRED_PARTITIONS = 256;
 
   private final Config config;
   private final StreamManager streamManager;
@@ -255,10 +255,17 @@ public class ExecutionPlanner {
     if (partitions < 0) {
       // use the following simple algo to figure out the partitions
       // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions))
-      // partition will be further bounded by MAX_INFERRED_PARTITIONS. This is important when running in hadoop.
+      // partition will be further bounded by MAX_INFERRED_PARTITIONS.
+      // This is important when running in hadoop where an HDFS input can have lots of files (partitions).
       int maxInPartitions = maxPartition(jobGraph.getSources());
       int maxOutPartitions = maxPartition(jobGraph.getSinks());
-      partitions = Math.min(Math.max(maxInPartitions, maxOutPartitions), MAX_INFERRED_PARTITIONS);
+      partitions = Math.max(maxInPartitions, maxOutPartitions);
+
+      if (partitions > MAX_INFERRED_PARTITIONS) {
+        log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.",
+            partitions, MAX_INFERRED_PARTITIONS));
+        partitions = MAX_INFERRED_PARTITIONS; // clamp only after logging, so the warning reports the inferred count
+      }
     }
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
index 792fde5..62d85f1 100644 (file)
@@ -82,8 +82,7 @@ public class StreamEdge {
   }
 
   SystemStream getSystemStream() {
-    StreamSpec spec = getStreamSpec();
-    return new SystemStream(spec.getSystemName(), spec.getPhysicalName());
+    return getStreamSpec().toSystemStream();
   }
 
   String getFormattedSystemStream() {
index eb5ca7b..50b0a13 100644 (file)
@@ -67,6 +67,7 @@ public class TestExecutionPlanner {
   private StreamSpec input1;
   private StreamSpec input2;
   private StreamSpec input3;
+  private StreamSpec input4;
   private StreamSpec output1;
   private StreamSpec output2;
 
@@ -194,6 +195,7 @@ public class TestExecutionPlanner {
     input1 = new StreamSpec("input1", "input1", "system1");
     input2 = new StreamSpec("input2", "input2", "system2");
     input3 = new StreamSpec("input3", "input3", "system2");
+    input4 = new StreamSpec("input4", "input4", "system1");
 
     output1 = new StreamSpec("output1", "output1", "system1");
     output2 = new StreamSpec("output2", "output2", "system2");
@@ -202,6 +204,7 @@ public class TestExecutionPlanner {
     Map<String, Integer> system1Map = new HashMap<>();
     system1Map.put("input1", 64);
     system1Map.put("output1", 8);
+    system1Map.put("input4", ExecutionPlanner.MAX_INFERRED_PARTITIONS * 2);
     Map<String, Integer> system2Map = new HashMap<>();
     system2Map.put("input2", 16);
     system2Map.put("input3", 32);
@@ -218,6 +221,7 @@ public class TestExecutionPlanner {
     when(runner.getStreamSpec("input1")).thenReturn(input1);
     when(runner.getStreamSpec("input2")).thenReturn(input2);
     when(runner.getStreamSpec("input3")).thenReturn(input3);
+    when(runner.getStreamSpec("input4")).thenReturn(input4);
     when(runner.getStreamSpec("output1")).thenReturn(output1);
     when(runner.getStreamSpec("output2")).thenReturn(output2);
 
@@ -316,10 +320,10 @@ public class TestExecutionPlanner {
     StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
     ExecutionPlan plan = planner.plan(streamGraph);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
-    assertEquals(jobConfigs.size(), 1);
+    assertEquals(1, jobConfigs.size());
 
     // GCD of 8, 16, 1600 and 252 is 4
-    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
   }
 
   @Test
@@ -333,10 +337,10 @@ public class TestExecutionPlanner {
     StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
     ExecutionPlan plan = planner.plan(streamGraph);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
-    assertEquals(jobConfigs.size(), 1);
+    assertEquals(1, jobConfigs.size());
 
     // GCD of 8, 16, 1600 and 252 is 4
-    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "4");
+    assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
   }
 
 
@@ -350,7 +354,7 @@ public class TestExecutionPlanner {
     StreamGraphImpl streamGraph = createSimpleGraph();
     ExecutionPlan plan = planner.plan(streamGraph);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
-    assertEquals(jobConfigs.size(), 1);
+    assertEquals(1, jobConfigs.size());
     assertFalse(jobConfigs.get(0).containsKey(TaskConfig.WINDOW_MS()));
   }
 
@@ -365,8 +369,8 @@ public class TestExecutionPlanner {
     StreamGraphImpl streamGraph = createSimpleGraph();
     ExecutionPlan plan = planner.plan(streamGraph);
     List<JobConfig> jobConfigs = plan.getJobConfigs();
-    assertEquals(jobConfigs.size(), 1);
-    assertEquals(jobConfigs.get(0).get(TaskConfig.WINDOW_MS()), "2000");
+    assertEquals(1, jobConfigs.size());
+    assertEquals("2000", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
   }
 
   @Test
@@ -377,7 +381,7 @@ public class TestExecutionPlanner {
 
     // the partitions should be the same as input1
     jobGraph.getIntermediateStreams().forEach(edge -> {
-        assertTrue(edge.getPartitionCount() == 64); // max of input1 and output1
+        assertEquals(64, edge.getPartitionCount()); // max of input1 and output1
       });
   }
 
@@ -394,9 +398,27 @@ public class TestExecutionPlanner {
     edge.setPartitionCount(16);
     edges.add(edge);
 
-    assertEquals(ExecutionPlanner.maxPartition(edges), 32);
+    assertEquals(32, ExecutionPlanner.maxPartition(edges));
 
     edges = Collections.emptyList();
-    assertEquals(ExecutionPlanner.maxPartition(edges), StreamEdge.PARTITIONS_UNKNOWN);
+    assertEquals(StreamEdge.PARTITIONS_UNKNOWN, ExecutionPlanner.maxPartition(edges));
+  }
+
+  @Test
+  public void testMaxPartitionLimit() throws Exception {
+    int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
+
+    ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
+    StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
+
+    MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input4");
+    OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+    input1.partitionBy(m -> m.key, m -> m.value).map(kv -> kv).sendTo(output1);
+    JobGraph jobGraph = (JobGraph) planner.plan(streamGraph);
+
+    // input4 is registered with twice MAX_INFERRED_PARTITIONS partitions, so the inferred count should be capped at the limit
+    jobGraph.getIntermediateStreams().forEach(edge -> {
+        assertEquals(partitionLimit, edge.getPartitionCount()); // capped at MAX_INFERRED_PARTITIONS, not max of input4 and output1
+      });
   }
 }
index 21afcb9..396b06b 100644 (file)
@@ -107,11 +107,11 @@ public class TestHdfsSystemConsumer {
     // verify events read from consumer
     int eventsReceived = 0;
     int totalEvents = (NUM_EVENTS + 1) * NUM_FILES; // one "End of Stream" event in the end
-    int remainingRetires = 100;
+    int remainingRetries = 100;
     Map<SystemStreamPartition, List<IncomingMessageEnvelope>> overallResults = new HashMap<>();
-    while (eventsReceived < totalEvents && remainingRetires > 0) {
-      remainingRetires--;
-      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 200);
+    while (eventsReceived < totalEvents && remainingRetries > 0) {
+      remainingRetries--;
+      Map<SystemStreamPartition, List<IncomingMessageEnvelope>> result = systemConsumer.poll(systemStreamPartitionSet, 1000);
       for(SystemStreamPartition ssp : result.keySet()) {
         List<IncomingMessageEnvelope> messageEnvelopeList = result.get(ssp);
         overallResults.putIfAbsent(ssp, new ArrayList<>());
@@ -122,7 +122,7 @@ public class TestHdfsSystemConsumer {
         eventsReceived += messageEnvelopeList.size();
       }
     }
-    Assert.assertEquals(eventsReceived, totalEvents);
+    Assert.assertEquals("Did not receive all the events. Retry counter = " + remainingRetries, totalEvents, eventsReceived);
     Assert.assertEquals(NUM_FILES, overallResults.size());
     overallResults.values().forEach(messages -> {
       Assert.assertEquals(NUM_EVENTS + 1, messages.size());
index fd53a45..a49c022 100644 (file)
@@ -151,6 +151,10 @@ public class KafkaStreamSpec extends StreamSpec {
       Properties properties) {
     super(id, topicName, systemName, partitionCount, false, propertiesToMap(properties));
 
+    if (partitionCount < 1) {
+      throw new IllegalArgumentException("Parameter 'partitionCount' must be > 0");
+    }
+
     if (replicationFactor <= 0) {
       throw new IllegalArgumentException(
           String.format("Replication factor %d must be greater than 0.", replicationFactor));
index 5612704..1758bf0 100644 (file)
@@ -57,4 +57,9 @@ public class TestKafkaStreamSpec {
     assertNull(kafkaConfig.get("replication.factor"));
     assertEquals("4", kafkaConfig.get("segment.bytes"));
   }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testInvalidPartitionCount() {
+    new KafkaStreamSpec("dummyId","dummyPhysicalName", "dummySystemName", 0);
+  }
 }