SAMZA-1275: Kafka throws when users configure replication.factor for …
authorJacob Maes <jmaes@linkedin.com>
Tue, 9 May 2017 22:58:14 +0000 (15:58 -0700)
committerJacob Maes <jmaes@linkedin.com>
Tue, 9 May 2017 22:58:14 +0000 (15:58 -0700)
…Kafka default stream

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>, Xinyu Liu <xiliu@linkedin.com>

Closes #176 from jmakes/samza-1275

samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaStreamSpec.java
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java [new file with mode: 0644]

index 3255f70..8cf4923 100644 (file)
@@ -22,14 +22,19 @@ package org.apache.samza.system.kafka;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import kafka.log.LogConfig;
 import org.apache.samza.config.KafkaConfig;
 import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
  * Extends StreamSpec with the ability to easily get the topic replication factor.
  */
 public class KafkaStreamSpec extends StreamSpec {
+  private static Logger LOG = LoggerFactory.getLogger(KafkaStreamSpec.class);
+
   private static final int DEFAULT_REPLICATION_FACTOR = 2;
 
   /**
@@ -62,6 +67,30 @@ public class KafkaStreamSpec extends StreamSpec {
   }
 
   /**
+   * Filter out properties from the original config that are not supported by Kafka.
+   * For example, we allow users to set replication.factor as a property of the streams
+   * and then parse it out so we can pass it separately as Kafka requires. But Kafka
+   * will also throw if replication.factor is passed as a property on a new topic.
+   *
+   * @param originalConfig  The original config to filter
+   * @return                The filtered config
+   */
+  private static Map<String, String> filterUnsupportedProperties(Map<String, String> originalConfig) {
+    Map<String, String> filteredConfig = new HashMap<>();
+    for (Map.Entry<String, String> entry: originalConfig.entrySet()) {
+      // Kafka requires replication factor, but not as a property, so we have to filter it out.
+      if (!KafkaConfig.TOPIC_REPLICATION_FACTOR().equals(entry.getKey())) {
+        if (LogConfig.configNames().contains(entry.getKey())) {
+          filteredConfig.put(entry.getKey(), entry.getValue());
+        } else {
+          LOG.warn("Property '{}' is not a valid Kafka topic config. It will be ignored.");
+        }
+      }
+    }
+    return filteredConfig;
+  }
+
+  /**
    * Converts any StreamSpec to a KafkaStreamSpec.
    * If the original spec already is a KafkaStreamSpec, it is simply returned.
    *
@@ -81,7 +110,7 @@ public class KafkaStreamSpec extends StreamSpec {
                                 originalSpec.getSystemName(),
                                 originalSpec.getPartitionCount(),
                                 replicationFactor,
-                                mapToProperties(originalSpec.getConfig()));
+                                mapToProperties(filterUnsupportedProperties(originalSpec.getConfig())));
   }
 
   /**
@@ -109,7 +138,7 @@ public class KafkaStreamSpec extends StreamSpec {
    * @param systemName        The System name on which this stream will exist. Corresponds to a named implementation of the
    *                          Samza System abstraction. See {@link org.apache.samza.system.SystemFactory}
    *
-   * @param partitionCount    The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
+   * @param partitionCount    The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
    *
    * @param replicationFactor The number of topic replicas in the Kafka cluster for durability.
    *
diff --git a/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java b/samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaStreamSpec.java
new file mode 100644 (file)
index 0000000..69345a3
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.system.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * See also the general StreamSpec tests in {@link org.apache.samza.runtime.TestAbstractApplicationRunner}
+ */
+public class TestKafkaStreamSpec {
+
+  @Test
+  public void testUnsupportedConfigStrippedFromProperties() {
+    StreamSpec original = new StreamSpec("dummyId","dummyPhysicalName", "dummySystemName", ImmutableMap.of("segment.bytes", "4", "replication.factor", "7"));
+
+    // First verify the original
+    assertEquals("7", original.get("replication.factor"));
+    assertEquals("4", original.get("segment.bytes"));
+
+    Map<String, String> config = original.getConfig();
+    assertEquals("7", config.get("replication.factor"));
+    assertEquals("4", config.get("segment.bytes"));
+
+
+    // Now verify the Kafka spec
+    KafkaStreamSpec spec = KafkaStreamSpec.fromSpec(original);
+    assertNull(spec.get("replication.factor"));
+    assertEquals("4", spec.get("segment.bytes"));
+
+    Properties kafkaProperties = spec.getProperties();
+    Map<String, String> kafkaConfig = spec.getConfig();
+    assertNull(kafkaProperties.get("replication.factor"));
+    assertEquals("4", kafkaProperties.get("segment.bytes"));
+
+    assertNull(kafkaConfig.get("replication.factor"));
+    assertEquals("4", kafkaConfig.get("segment.bytes"));
+  }
+}