SAMZA-2097: Allow table parts configuration generation
authorWei Song <wsong@linkedin.com>
Wed, 6 Feb 2019 01:19:56 +0000 (17:19 -0800)
committerWei Song <wsong@linkedin.com>
Wed, 6 Feb 2019 01:19:56 +0000 (17:19 -0800)
Sometimes a table part may use some 3rd party libraries that require certain configuration, these can't be handled from within a table descriptor itself. We should allow table parts to participate in the configuration generation in toConfig() method.

Author: Wei Song <wsong@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes #906 from weisong44/SAMZA-2097

samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/remote/TableRateLimiter.java
samza-api/src/main/java/org/apache/samza/table/remote/TableReadFunction.java
samza-api/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
samza-api/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java
samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java

index 4b15c47..f8effdc 100644 (file)
@@ -26,6 +26,8 @@ import java.util.Map;
 
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.table.remote.TablePart;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -225,13 +227,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
 
     Map<String, String> tableConfig = new HashMap<>(super.toConfig(jobConfig));
 
-    // Serialize and store reader/writer functions
-    addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
-
-    if (writeFn != null) {
-      addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
-    }
-
+    // Handle rate limiter
     if (!tagCreditsMap.isEmpty()) {
       RateLimiter defaultRateLimiter;
       try {
@@ -243,29 +239,52 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
         throw new SamzaException("Failed to create default rate limiter", ex);
       }
       addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", defaultRateLimiter), tableConfig);
+      if (defaultRateLimiter instanceof TablePart) {
+        addTablePartConfig((TablePart) defaultRateLimiter, jobConfig, tableConfig);
+      }
     } else if (rateLimiter != null) {
       addTableConfig(RATE_LIMITER, SerdeUtils.serialize("rate limiter", rateLimiter), tableConfig);
+      if (rateLimiter instanceof TablePart) {
+        addTablePartConfig((TablePart) rateLimiter, jobConfig, tableConfig);
+      }
     }
 
-    // Serialize the readCredit functions
+    // Handle readCredit functions
     if (readCreditFn != null) {
       addTableConfig(READ_CREDIT_FN, SerdeUtils.serialize("read credit function", readCreditFn), tableConfig);
+      addTablePartConfig(readCreditFn, jobConfig, tableConfig);
     }
-    // Serialize the writeCredit functions
+
+    // Handle writeCredit functions
     if (writeCreditFn != null) {
       addTableConfig(WRITE_CREDIT_FN, SerdeUtils.serialize("write credit function", writeCreditFn), tableConfig);
+      addTablePartConfig(writeCreditFn, jobConfig, tableConfig);
     }
 
+    // Handle read retry policy
     if (readRetryPolicy != null) {
       addTableConfig(READ_RETRY_POLICY, SerdeUtils.serialize("read retry policy", readRetryPolicy), tableConfig);
+      addTablePartConfig(readRetryPolicy, jobConfig, tableConfig);
     }
 
+    // Handle write retry policy
     if (writeRetryPolicy != null) {
       addTableConfig(WRITE_RETRY_POLICY, SerdeUtils.serialize("write retry policy", writeRetryPolicy), tableConfig);
+      addTablePartConfig(writeRetryPolicy, jobConfig, tableConfig);
     }
 
     addTableConfig(ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize), tableConfig);
 
+    // Handle table reader function
+    addTableConfig(READ_FN, SerdeUtils.serialize("read function", readFn), tableConfig);
+    addTablePartConfig(readFn, jobConfig, tableConfig);
+
+    // Handle table write function
+    if (writeFn != null) {
+      addTableConfig(WRITE_FN, SerdeUtils.serialize("write function", writeFn), tableConfig);
+      addTablePartConfig(writeFn, jobConfig, tableConfig);
+    }
+
     return Collections.unmodifiableMap(tableConfig);
   }
 
@@ -278,4 +297,15 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
     Preconditions.checkArgument(asyncCallbackPoolSize <= 20,
         "too many threads for async callback executor.");
   }
+
+  /**
+   * Helper method to add table part config items to table configuration
+   * @param tablePart table part
+   * @param jobConfig job configuration
+   * @param tableConfig table configuration
+   */
+  protected void addTablePartConfig(TablePart tablePart, Config jobConfig, Map<String, String> tableConfig) {
+    tableConfig.putAll(tablePart.toConfig(jobConfig, new MapConfig(tableConfig)));
+  }
+
 }
diff --git a/samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java b/samza-api/src/main/java/org/apache/samza/table/remote/TablePart.java
new file mode 100644 (file)
index 0000000..d54f248
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.table.remote;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+
+
+/**
+ * A building block of a remote table
+ */
+@InterfaceStability.Unstable
+public interface TablePart {
+
+  /**
+   * Generate configuration for this building block. There are situations where this object
+   * or its external dependencies may require certain configuration, this method allows
+   * generation and inclusion of them in the job configuration.
+   *
+   * @param jobConfig job configuration
+   * @param tableConfig so far generated configuration for this table
+   * @return configuration for this build block
+   */
+  default Map<String, String> toConfig(Config jobConfig, Config tableConfig) {
+    return Collections.emptyMap();
+  }
+
+}
index 0758dd2..f52bcbe 100644 (file)
@@ -58,7 +58,7 @@ public class TableRateLimiter<K, V> {
    * @param <V> the type of the value
    */
   @InterfaceStability.Unstable
-  public interface CreditFunction<K, V> extends Serializable {
+  public interface CreditFunction<K, V> extends TablePart, Serializable {
     /**
      * Get the number of credits required for the {@code key} and {@code value} pair.
      * @param key table key
index d54f83d..04fc918 100644 (file)
@@ -46,7 +46,7 @@ import com.google.common.collect.Iterables;
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface TableReadFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+public interface TableReadFunction<K, V> extends TablePart, InitableFunction, ClosableFunction, Serializable {
   /**
    * Fetch single table record for a specified {@code key}. This method must be thread-safe.
    * The default implementation calls getAsync and blocks on the completion afterwards.
index 1e3dc4c..3b06664 100644 (file)
@@ -47,7 +47,7 @@ import com.google.common.collect.Iterables;
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
+public interface TableWriteFunction<K, V> extends TablePart, InitableFunction, ClosableFunction, Serializable {
   /**
    * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
    * The default implementation calls putAsync and blocks on the completion afterwards.
index 162eb07..078bce6 100644 (file)
@@ -24,6 +24,7 @@ import java.time.Duration;
 import java.util.function.Predicate;
 
 import com.google.common.base.Preconditions;
+import org.apache.samza.table.remote.TablePart;
 
 
 /**
@@ -35,7 +36,8 @@ import com.google.common.base.Preconditions;
  *
  * Retry libraries can implement a subset or all features as described by this common policy.
  */
-public class TableRetryPolicy implements Serializable {
+public class TableRetryPolicy implements TablePart, Serializable {
+
   enum BackoffType {
     /**
      * No backoff in between two retry attempts.
index 4703752..ab87b89 100644 (file)
@@ -20,6 +20,8 @@
 package org.apache.samza.table.remote.descriptors;
 
 import com.google.common.collect.ImmutableMap;
+
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.TaskName;
@@ -41,6 +43,7 @@ import org.apache.samza.table.ratelimit.AsyncRateLimitedTable;
 import org.apache.samza.table.remote.AsyncRemoteTable;
 import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
+import org.apache.samza.table.remote.TablePart;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -57,12 +60,21 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import static org.mockito.Mockito.*;
+import static org.apache.samza.table.remote.TableRateLimiter.CreditFunction;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.withSettings;
 
 
 public class TestRemoteTableDescriptor {
-  private void doTestSerialize(RateLimiter rateLimiter, TableRateLimiter.CreditFunction readCredFn,
-      TableRateLimiter.CreditFunction writeCredFn) {
+
+  private void doTestSerialize(RateLimiter rateLimiter, CreditFunction readCredFn, CreditFunction writeCredFn) {
     String tableId = "1";
     RemoteTableDescriptor desc = new RemoteTableDescriptor(tableId)
         .withReadFunction(createMockTableReadFunction())
@@ -132,6 +144,65 @@ public class TestRemoteTableDescriptor {
     desc.toConfig(new MapConfig());
   }
 
+  @Test
+  public void testTablePartToConfigDefault() {
+    TableReadFunction readFn = createMockTableReadFunction();
+    when(readFn.toConfig(any(), any())).thenReturn(createConfigPair(1));
+    Map<String, String> tableConfig = new RemoteTableDescriptor("1")
+        .withReadFunction(readFn)
+        .withReadRateLimit(100)
+        .toConfig(new MapConfig());
+    verify(readFn, times(1)).toConfig(any(), any());
+    Assert.assertEquals("v1", tableConfig.get("k1"));
+  }
+
+  @Test
+  public void testTablePartToConfig() {
+
+    int keys = 0;
+
+    TableReadFunction readFn = createMockTableReadFunction();
+    when(readFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    TableWriteFunction writeFn = createMockTableWriteFunction();
+    when(writeFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    RateLimiter rateLimiter = createMockRateLimiter();
+    when(((TablePart) rateLimiter).toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    CreditFunction readCredFn = createMockCreditFunction();
+    when(readCredFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    CreditFunction writeCredFn = createMockCreditFunction();
+    when(writeCredFn.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    TableRetryPolicy readRetryPolicy = createMockTableRetryPolicy();
+    when(readRetryPolicy.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    TableRetryPolicy writeRetryPolicy = createMockTableRetryPolicy();
+    when(writeRetryPolicy.toConfig(any(), any())).thenReturn(createConfigPair(keys++));
+
+    Map<String, String> tableConfig = new RemoteTableDescriptor("1")
+        .withReadFunction(readFn)
+        .withWriteFunction(writeFn)
+        .withRateLimiter(rateLimiter, readCredFn, writeCredFn)
+        .withReadRetryPolicy(readRetryPolicy)
+        .withWriteRetryPolicy(writeRetryPolicy)
+        .toConfig(new MapConfig());
+
+    verify(readFn, times(1)).toConfig(any(), any());
+    verify(writeFn, times(1)).toConfig(any(), any());
+    verify((TablePart) rateLimiter, times(1)).toConfig(any(), any());
+    verify(readCredFn, times(1)).toConfig(any(), any());
+    verify(writeCredFn, times(1)).toConfig(any(), any());
+    verify(readRetryPolicy, times(1)).toConfig(any(), any());
+    verify(writeRetryPolicy, times(1)).toConfig(any(), any());
+
+    for (int n = 0; n < keys; n++) {
+      Assert.assertEquals("v" + n, tableConfig.get("k" + n));
+    }
+  }
+
   private Context createMockContext(TableDescriptor tableDescriptor) {
     Context context = mock(Context.class);
 
@@ -161,13 +232,14 @@ public class TestRemoteTableDescriptor {
     when(jobModel.getContainers()).thenReturn(ImmutableMap.of(containerId, containerModel));
 
     JobContext jobContext = mock(JobContext.class);
-    when(jobContext.getConfig()).thenReturn(new MapConfig(tableDescriptor.toConfig(new MapConfig())));
+    Config jobConfig = new MapConfig(tableDescriptor.toConfig(new MapConfig()));
+    when(jobContext.getConfig()).thenReturn(jobConfig);
     when(context.getJobContext()).thenReturn(jobContext);
 
     return context;
   }
 
-  static class CountingCreditFunction<K, V> implements TableRateLimiter.CreditFunction<K, V> {
+  static class CountingCreditFunction<K, V> implements CreditFunction<K, V> {
     int numCalls = 0;
     @Override
     public int getCredits(K key, V value) {
@@ -270,7 +342,11 @@ public class TestRemoteTableDescriptor {
   }
 
   private RateLimiter createMockRateLimiter() {
-    return mock(RateLimiter.class, withSettings().serializable());
+    return mock(RateLimiter.class, withSettings().serializable().extraInterfaces(TablePart.class));
+  }
+
+  private CreditFunction createMockCreditFunction() {
+    return mock(CreditFunction.class, withSettings().serializable());
   }
 
   private TableReadFunction createMockTableReadFunction() {
@@ -281,6 +357,16 @@ public class TestRemoteTableDescriptor {
     return mock(TableWriteFunction.class, withSettings().serializable());
   }
 
+  private TableRetryPolicy createMockTableRetryPolicy() {
+    return mock(TableRetryPolicy.class, withSettings().serializable());
+  }
+
+  private Map<String, String> createConfigPair(int n) {
+    Map<String, String> config = new HashMap<>();
+    config.put("k" + n, "v" + n);
+    return config;
+  }
+
   private void assertExists(String key, String tableId, Map<String, String> config) {
     String realKey = JavaTableConfig.buildKey(tableId, key);
     Assert.assertTrue(config.containsKey(realKey));