SAMZA-2043: Consolidate ReadableTable and ReadWriteTable
authorWei Song <wsong@linkedin.com>
Mon, 17 Dec 2018 23:11:27 +0000 (15:11 -0800)
committerWei Song <wsong@linkedin.com>
Mon, 17 Dec 2018 23:11:27 +0000 (15:11 -0800)
So far we've not seen a lot of use in maintaining separate implementation for ReadableTable and ReadWriteTable, which adds quite a bit complexity. Hence consolidating them.

Author: Wei Song <wsong@linkedin.com>

Reviewers: Xinyu Liu <xiliu@linkedin.com>

Closes #861 from weisong44/SAMZA-2043

42 files changed:
samza-api/src/main/java/org/apache/samza/context/TaskContext.java
samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
samza-api/src/main/java/org/apache/samza/table/ReadableTable.java [deleted file]
samza-api/src/main/java/org/apache/samza/table/Table.java
samza-api/src/main/java/org/apache/samza/table/TableProvider.java
samza-api/src/main/java/org/apache/samza/table/descriptors/BaseTableDescriptor.java
samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
samza-api/src/main/java/org/apache/samza/table/descriptors/RemoteTableDescriptor.java
samza-api/src/main/java/org/apache/samza/table/utils/SerdeUtils.java
samza-api/src/test/java/org/apache/samza/table/remote/TestTableRateLimiter.java
samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
samza-core/src/main/java/org/apache/samza/table/BaseReadWriteTable.java [moved from samza-core/src/main/java/org/apache/samza/table/BaseReadableTable.java with 78% similarity]
samza-core/src/main/java/org/apache/samza/table/TableManager.java
samza-core/src/main/java/org/apache/samza/table/caching/CachingTable.java
samza-core/src/main/java/org/apache/samza/table/caching/CachingTableProvider.java
samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTableProvider.java
samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java [deleted file]
samza-core/src/main/java/org/apache/samza/table/remote/RemoteTable.java [moved from samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java with 50% similarity]
samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java
samza-core/src/main/java/org/apache/samza/table/utils/TableMetrics.java [moved from samza-core/src/main/java/org/apache/samza/table/utils/TableWriteMetrics.java with 73% similarity]
samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java [deleted file]
samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
samza-core/src/test/java/org/apache/samza/table/caching/TestCachingTable.java
samza-core/src/test/java/org/apache/samza/table/descriptors/TestLocalTableDescriptor.java
samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java [moved from samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteReadWriteTable.java with 91% similarity]
samza-core/src/test/java/org/apache/samza/table/remote/descriptors/TestRemoteTableDescriptor.java
samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java [deleted file]
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTable.java [moved from samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadWriteTable.java with 59% similarity]
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalTableProvider.java
samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableProvider.java
samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableRead.java [moved from samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadableTable.java with 92% similarity]
samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalTableWrite.java [moved from samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalReadWriteTable.java with 97% similarity]
samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableEndToEnd.java [moved from samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java with 97% similarity]
samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTableEndToEnd.java [moved from samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java with 90% similarity]

index cdf7404..8adfcea 100644 (file)
@@ -25,8 +25,6 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 
 
 /**
@@ -63,17 +61,15 @@ public interface TaskContext {
   KeyValueStore<?, ?> getStore(String storeName);
 
   /**
-   * Gets the {@link Table} corresponding to the {@code tableId} for this task.
+   * Gets the {@link ReadWriteTable} corresponding to the {@code tableId} for this task.
    *
-   * The returned table should be cast with the concrete type parameters based on the configured table serdes, and
-   * whether it is {@link ReadWriteTable} or {@link ReadableTable}. E.g., if using string key and integer value
-   * serde for a writable table, it should be cast to a {@code ReadWriteTable<String, Integer>}.
-   *
-   * @param tableId id of the {@link Table} to get
-   * @return the {@link Table} associated with {@code tableId} for this task
+   * @param tableId id of the {@link ReadWriteTable} to get
+   * @param <K> the type of the key in this table
+   * @param <V> the type of the value in this table
+   * @return the {@link ReadWriteTable} associated with {@code tableId} for this task
    * @throws IllegalArgumentException if there is no table associated with {@code tableId}
    */
-  Table<?> getTable(String tableId);
+  <K, V> ReadWriteTable<K, V> getTable(String tableId);
 
   /**
    * Gets the {@link CallbackScheduler} for this task, which can be used to schedule a callback to be executed
index 083a1b5..ffb87a4 100644 (file)
 package org.apache.samza.table;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
 
 /**
@@ -32,7 +34,51 @@ import org.apache.samza.storage.kv.Entry;
  * @param <V> the type of the value in this table
  */
 @InterfaceStability.Unstable
-public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+public interface ReadWriteTable<K, V> extends Table {
+
+  /**
+   * Initializes the table during container initialization.
+   * Guaranteed to be invoked as the first operation on the table.
+   * @param context {@link Context} corresponding to this table
+   */
+  default void init(Context context) {
+  }
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Asynchronously gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return completableFuture for the requested value
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  CompletableFuture<V> getAsync(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Asynchronously gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return completableFuture for the requested entries
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
 
   /**
    * Updates the mapping of the specified key-value pair;
@@ -114,4 +160,9 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
    * Flushes the underlying store of this table, if applicable.
    */
   void flush();
+
+  /**
+   * Close the table and release any resources acquired
+   */
+  void close();
 }
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
deleted file mode 100644 (file)
index 6c88fd3..0000000
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.context.Context;
-import org.apache.samza.operators.KV;
-
-
-/**
- *
- * A table that supports get by one or more keys
- *
- * @param <K> the type of the record key in this table
- * @param <V> the type of the record value in this table
- */
-@InterfaceStability.Unstable
-public interface ReadableTable<K, V> extends Table<KV<K, V>> {
-  /**
-   * Initializes the table during container initialization.
-   * Guaranteed to be invoked as the first operation on the table.
-   * @param context {@link Context} corresponding to this table
-   */
-  default void init(Context context) {
-  }
-
-  /**
-   * Gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  V get(K key);
-
-  /**
-   * Asynchronously gets the value associated with the specified {@code key}.
-   *
-   * @param key the key with which the associated value is to be fetched.
-   * @return completableFuture for the requested value
-   * @throws NullPointerException if the specified {@code key} is {@code null}.
-   */
-  CompletableFuture<V> getAsync(K key);
-
-  /**
-   * Gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return a map of the keys that were found and their respective values.
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  Map<K, V> getAll(List<K> keys);
-
-  /**
-   * Asynchronously gets the values with which the specified {@code keys} are associated.
-   *
-   * @param keys the keys with which the associated values are to be fetched.
-   * @return completableFuture for the requested entries
-   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
-   */
-  CompletableFuture<Map<K, V>> getAllAsync(List<K> keys);
-
-  /**
-   * Close the table and release any resources acquired
-   */
-  void close();
-}
index 234d15b..c454012 100644 (file)
@@ -36,8 +36,6 @@ import org.apache.samza.task.InitableTask;
  * hybrid tables. For remote data sources, a {@code RemoteTable} provides optimized access with caching, rate-limiting,
  * and retry support.
  * <p>
- * Depending on the implementation, a {@link Table} can be a {@link ReadableTable} or a {@link ReadWriteTable}.
- * <p>
  * Use a {@link TableDescriptor} to specify the properties of a {@link Table}. For High Level API
  * {@link StreamApplication}s, use {@link StreamApplicationDescriptor#getTable} to obtain the {@link Table} instance for
  * the descriptor that can be used with the {@link MessageStream} operators like {@link MessageStream#sendTo(Table)}.
index 2dec989..36cad2e 100644 (file)
@@ -34,10 +34,10 @@ public interface TableProvider {
   void init(Context context);
 
   /**
-   * Get an instance of the table for read/write operations
+   * Get an instance of the {@link ReadWriteTable}
    * @return the underlying table
    */
-  Table getTable();
+  ReadWriteTable getTable();
 
   /**
    * Shutdown the underlying table
index 26c2ae3..52eca5f 100644 (file)
@@ -56,6 +56,7 @@ abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K,
    * @param value the value
    * @return this table descriptor instance
    */
+  @SuppressWarnings("unchecked")
   public D withConfig(String key, String value) {
     config.put(key, value);
     return (D) this;
index 1ebb580..1623710 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.samza.table.utils.SerdeUtils;
  * @param <V> the type of the value in this table
  * @param <D> the type of the concrete table descriptor
  */
+@SuppressWarnings("unchecked")
 abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<K, V, D>>
     extends BaseTableDescriptor<K, V, D> {
 
index 7286004..4b15c47 100644 (file)
@@ -235,6 +235,7 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
     if (!tagCreditsMap.isEmpty()) {
       RateLimiter defaultRateLimiter;
       try {
+        @SuppressWarnings("unchecked")
         Class<? extends RateLimiter> clazz = (Class<? extends RateLimiter>) Class.forName(DEFAULT_RATE_LIMITER_CLASS_NAME);
         Constructor<? extends RateLimiter> ctor = clazz.getConstructor(Map.class);
         defaultRateLimiter = ctor.newInstance(tagCreditsMap);
index a7b66e5..338baf4 100644 (file)
@@ -54,6 +54,7 @@ public final class SerdeUtils {
    * @return deserialized object instance
    * @param <T> type of the object
    */
+  @SuppressWarnings("unchecked")
   public static <T> T deserialize(String name, String strObject) {
     try {
       byte [] bytes = Base64.getDecoder().decode(strObject);
index ea9acbd..3235d5a 100644 (file)
@@ -25,9 +25,9 @@ import java.util.Collections;
 import org.apache.samza.metrics.Timer;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.util.RateLimiter;
+import org.junit.Assert;
 import org.junit.Test;
 
-import junit.framework.Assert;
 
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyMap;
index ec52f8a..a29c2b3 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.scheduler.CallbackScheduler;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableManager;
 
 import java.util.HashMap;
@@ -83,7 +83,7 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public Table getTable(String tableId) {
+  public <K, V> ReadWriteTable<K, V> getTable(String tableId) {
     return this.tableManager.getTable(tableId);
   }
 
index 0d39c1b..6d84b17 100644 (file)
@@ -44,7 +44,7 @@ public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void>
 
   SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Context context) {
     this.sendToTableOpSpec = sendToTableOpSpec;
-    this.table = (ReadWriteTable) context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
+    this.table = context.getTaskContext().getTable(sendToTableOpSpec.getTableId());
   }
 
   @Override
index 96f07d1..e3fc266 100644 (file)
@@ -22,7 +22,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 
@@ -42,11 +42,11 @@ import java.util.Collections;
 class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> {
 
   private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
-  private final ReadableTable<K, ?> table;
+  private final ReadWriteTable<K, ?> table;
 
   StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec, Context context) {
     this.joinOpSpec = joinOpSpec;
-    this.table = (ReadableTable) context.getTaskContext().getTable(joinOpSpec.getTableId());
+    this.table = context.getTaskContext().getTable(joinOpSpec.getTableId());
   }
 
   @Override
@@ -21,27 +21,25 @@ package org.apache.samza.table;
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
-import org.apache.samza.table.utils.TableReadMetrics;
-import org.apache.samza.table.utils.TableWriteMetrics;
+import org.apache.samza.table.utils.TableMetrics;
 import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Base class for all readable tables
+ * Base class for a concrete table implementation
  *
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
+abstract public class BaseReadWriteTable<K, V> implements ReadWriteTable<K, V> {
 
   protected final Logger logger;
 
   protected final String tableId;
 
-  protected TableReadMetrics readMetrics;
-  protected TableWriteMetrics writeMetrics;
+  protected TableMetrics metrics;
 
   protected HighResolutionClock clock;
 
@@ -49,7 +47,7 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
    * Construct an instance
    * @param tableId Id of the table
    */
-  public BaseReadableTable(String tableId) {
+  public BaseReadWriteTable(String tableId) {
     Preconditions.checkArgument(tableId != null & !tableId.isEmpty(),
         String.format("Invalid table Id: %s", tableId));
     this.tableId = tableId;
@@ -62,11 +60,7 @@ abstract public class BaseReadableTable<K, V> implements ReadableTable<K, V> {
     clock = metricsConfig.getMetricsTimerEnabled()
         ? () -> System.nanoTime()
         : () -> 0L;
-
-    readMetrics = new TableReadMetrics(context, this, tableId);
-    if (this instanceof ReadWriteTable) {
-      writeMetrics = new TableWriteMetrics(context, this, tableId);
-    }
+    metrics = new TableMetrics(context, this, tableId);
   }
 
   public String getTableId() {
index d3ba771..5a3777e 100644 (file)
@@ -54,7 +54,7 @@ public class TableManager {
 
   static class TableCtx {
     private TableProvider tableProvider;
-    private Table table;
+    private ReadWriteTable table;
   }
 
   private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
@@ -110,7 +110,7 @@ public class TableManager {
    * @param tableId Id of the table
    * @return table instance
    */
-  public Table getTable(String tableId) {
+  public ReadWriteTable getTable(String tableId) {
     Preconditions.checkState(initialized, "TableManager has not been initialized.");
 
     TableCtx ctx = tableContexts.get(tableId);
index e63bf61..2fde79a 100644 (file)
@@ -23,9 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
 import java.util.ArrayList;
@@ -41,34 +40,32 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
 
 /**
- * A composite table incorporating a cache with a Samza table. The cache is
+ * A hybrid table incorporating a cache with a Samza table. The cache is
  * represented as a {@link ReadWriteTable}.
  *
- * The intented use case is to optimize the latency of accessing the actual table, eg.
+ * The intented use case is to optimize the latency of accessing the actual table, e.g.
  * remote tables, when eventual consistency between cache and table is acceptable.
  * The cache is expected to support TTL such that the values can be refreshed at some
  * point.
  *
- * If the actual table is read-write table, CachingTable supports both write-through
- * and write-around (writes bypassing cache) policies. For write-through policy, it
- * supports read-after-write semantics because the value is cached after written to
- * the table.
+ * {@link CachingTable} supports write-through and write-around (writes bypassing cache) policies.
+ * For write-through policy, it supports read-after-write semantics because the value is
+ * cached after written to the table.
  *
- * Note that there is no synchronization in CachingTable because it is impossible to
+ * Note that there is no synchronization in {@link CachingTable} because it is impossible to
  * implement a critical section between table read/write and cache update in the async
  * code paths without serializing all async operations for the same keys. Given stale
- * data is a presumed trade off for using a cache for table, it should be acceptable
- * for the data in table and cache are out-of-sync. Moreover, unsynchronized operations
- * in CachingTable also deliver higher performance when there is contention.
+ * data is a presumed trade-off for using a cache with table, it should be acceptable
+ * for the data in table and cache to be temporarily out-of-sync. Moreover, unsynchronized
+ * operations in {@link CachingTable} also deliver higher performance when there is contention.
  *
  * @param <K> type of the table key
  * @param <V> type of the table value
  */
-public class CachingTable<K, V> extends BaseReadableTable<K, V>
+public class CachingTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
-  private final ReadableTable<K, V> rdTable;
-  private final ReadWriteTable<K, V> rwTable;
+  private final ReadWriteTable<K, V> table;
   private final ReadWriteTable<K, V> cache;
   private final boolean isWriteAround;
 
@@ -76,10 +73,9 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   private AtomicLong hitCount = new AtomicLong();
   private AtomicLong missCount = new AtomicLong();
 
-  public CachingTable(String tableId, ReadableTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
+  public CachingTable(String tableId, ReadWriteTable<K, V> table, ReadWriteTable<K, V> cache, boolean isWriteAround) {
     super(tableId);
-    this.rdTable = table;
-    this.rwTable = table instanceof ReadWriteTable ? (ReadWriteTable) table : null;
+    this.table = table;
     this.cache = cache;
     this.isWriteAround = isWriteAround;
   }
@@ -114,16 +110,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public V get(K key) {
     try {
       return getAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<V> getAsync(K key) {
-    incCounter(readMetrics.numGets);
+    incCounter(metrics.numGets);
     V value = cache.get(key);
     if (value != null) {
       hitCount.incrementAndGet();
@@ -133,14 +127,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
     long startNs = clock.nanoTime();
     missCount.incrementAndGet();
 
-    return rdTable.getAsync(key).handle((result, e) -> {
+    return table.getAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get the record for " + key, e);
         } else {
           if (result != null) {
             cache.put(key, result);
           }
-          updateTimer(readMetrics.getNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getNs, clock.nanoTime() - startNs);
           return result;
         }
       });
@@ -150,16 +144,14 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public Map<K, V> getAll(List<K> keys) {
     try {
       return getAllAsync(keys).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    incCounter(readMetrics.numGetAlls);
+    incCounter(metrics.numGetAlls);
     // Make a copy of entries which might be immutable
     Map<K, V> getAllResult = new HashMap<>();
     List<K> missingKeys = lookupCache(keys, getAllResult);
@@ -169,7 +161,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
     }
 
     long startNs = clock.nanoTime();
-    return rdTable.getAllAsync(missingKeys).handle((records, e) -> {
+    return table.getAllAsync(missingKeys).handle((records, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to get records for " + keys, e);
         } else {
@@ -179,7 +171,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
                 .collect(Collectors.toList()));
             getAllResult.putAll(records);
           }
-          updateTimer(readMetrics.getAllNs, clock.nanoTime() - startNs);
+          updateTimer(metrics.getAllNs, clock.nanoTime() - startNs);
           return getAllResult;
         }
       });
@@ -189,20 +181,18 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void put(K key, V value) {
     try {
       putAsync(key, value).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAsync(K key, V value) {
-    incCounter(writeMetrics.numPuts);
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
+    incCounter(metrics.numPuts);
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
 
     long startNs = clock.nanoTime();
-    return rwTable.putAsync(key, value).handle((result, e) -> {
+    return table.putAsync(key, value).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException(String.format("Failed to put a record, key=%s, value=%s", key, value), e);
         } else if (!isWriteAround) {
@@ -212,7 +202,7 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
             cache.put(key, value);
           }
         }
-        updateTimer(writeMetrics.putNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -221,26 +211,24 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void putAll(List<Entry<K, V>> records) {
     try {
       putAllAsync(records).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    incCounter(writeMetrics.numPutAlls);
+    incCounter(metrics.numPutAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot write to a read-only table: " + rdTable);
-    return rwTable.putAllAsync(records).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot write to a read-only table: " + table);
+    return table.putAllAsync(records).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to put records " + records, e);
         } else if (!isWriteAround) {
           cache.putAll(records);
         }
 
-        updateTimer(writeMetrics.putAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.putAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -249,25 +237,23 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
   public void delete(K key) {
     try {
       deleteAsync(key).get();
-    } catch (InterruptedException e) {
-      throw new SamzaException(e);
     } catch (Exception e) {
-      throw (SamzaException) e.getCause();
+      throw new SamzaException(e);
     }
   }
 
   @Override
   public CompletableFuture<Void> deleteAsync(K key) {
-    incCounter(writeMetrics.numDeletes);
+    incCounter(metrics.numDeletes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
-    return rwTable.deleteAsync(key).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+    return table.deleteAsync(key).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + key, e);
         } else if (!isWriteAround) {
           cache.delete(key);
         }
-        updateTimer(writeMetrics.deleteNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteNs, clock.nanoTime() - startNs);
         return result;
       });
   }
@@ -283,33 +269,33 @@ public class CachingTable<K, V> extends BaseReadableTable<K, V>
 
   @Override
   public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    incCounter(writeMetrics.numDeleteAlls);
+    incCounter(metrics.numDeleteAlls);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot delete from a read-only table: " + rdTable);
-    return rwTable.deleteAllAsync(keys).handle((result, e) -> {
+    Preconditions.checkNotNull(table, "Cannot delete from a read-only table: " + table);
+    return table.deleteAllAsync(keys).handle((result, e) -> {
         if (e != null) {
           throw new SamzaException("Failed to delete the record for " + keys, e);
         } else if (!isWriteAround) {
           cache.deleteAll(keys);
         }
-        updateTimer(writeMetrics.deleteAllNs, clock.nanoTime() - startNs);
+        updateTimer(metrics.deleteAllNs, clock.nanoTime() - startNs);
         return result;
       });
   }
 
   @Override
   public synchronized void flush() {
-    incCounter(writeMetrics.numFlushes);
+    incCounter(metrics.numFlushes);
     long startNs = clock.nanoTime();
-    Preconditions.checkNotNull(rwTable, "Cannot flush a read-only table: " + rdTable);
-    rwTable.flush();
-    updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
+    Preconditions.checkNotNull(table, "Cannot flush a read-only table: " + table);
+    table.flush();
+    updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
   }
 
   @Override
   public void close() {
-    this.cache.close();
-    this.rdTable.close();
+    cache.close();
+    table.close();
   }
 
   double hitRate() {
index d835809..e533cf4 100644 (file)
@@ -25,8 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.BaseTableProvider;
@@ -47,18 +45,18 @@ public class CachingTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
 
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     String realTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.REAL_TABLE_ID);
-    ReadableTable table = (ReadableTable) this.context.getTaskContext().getTable(realTableId);
+    ReadWriteTable table = this.context.getTaskContext().getTable(realTableId);
 
     String cacheTableId = tableConfig.getForTable(tableId, CachingTableDescriptor.CACHE_TABLE_ID);
     ReadWriteTable cache;
 
     if (cacheTableId != null) {
-      cache = (ReadWriteTable) this.context.getTaskContext().getTable(cacheTableId);
+      cache = this.context.getTaskContext().getTable(cacheTableId);
     } else {
       cache = createDefaultCacheTable(realTableId, tableConfig);
       defaultCaches.add(cache);
index b75a0bc..d8a5d9c 100644 (file)
@@ -23,7 +23,7 @@ import com.google.common.cache.Cache;
 import org.apache.samza.SamzaException;
 import org.apache.samza.context.Context;
 import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.table.BaseReadWriteTable;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
@@ -40,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  * @param <K> type of the key in the cache
  * @param <V> type of the value in the cache
  */
-public class GuavaCacheTable<K, V> extends BaseReadableTable<K, V>
+public class GuavaCacheTable<K, V> extends BaseReadWriteTable<K, V>
     implements ReadWriteTable<K, V> {
 
   private final Cache<K, V> cache;
index e45719e..042d3c7 100644 (file)
@@ -24,7 +24,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
 import org.apache.samza.table.utils.SerdeUtils;
@@ -44,7 +44,7 @@ public class GuavaCacheTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
     Cache guavaCache = SerdeUtils.deserialize(GuavaCacheTableDescriptor.GUAVA_CACHE,
diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
deleted file mode 100644 (file)
index 80c2cac..0000000
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.table.remote;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import org.apache.samza.SamzaException;
-import org.apache.samza.config.MetricsConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.utils.TableMetricsUtil;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.stream.Collectors;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-
-/**
- * Remote store backed read writable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
-
-  protected final TableWriteFunction<K, V> writeFn;
-  protected final TableRateLimiter writeRateLimiter;
-
-  public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
-      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
-      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
-    super(tableId, readFn, readRateLimiter, tableExecutor, callbackExecutor);
-    Preconditions.checkNotNull(writeFn, "null write function");
-    this.writeFn = writeFn;
-    this.writeRateLimiter = writeRateLimiter;
-  }
-
-  @Override
-  public void init(Context context) {
-    super.init(context);
-    MetricsConfig metricsConfig = new MetricsConfig(context.getJobContext().getConfig());
-    if (metricsConfig.getMetricsTimerEnabled()) {
-      TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
-      writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
-    }
-  }
-
-  @Override
-  public void put(K key, V value) {
-    try {
-      putAsync(key, value).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAsync(K key, V value) {
-    Preconditions.checkNotNull(key);
-    if (value == null) {
-      return deleteAsync(key);
-    }
-
-    return execute(writeRateLimiter, key, value, writeFn::putAsync, writeMetrics.numPuts, writeMetrics.putNs)
-        .exceptionally(e -> {
-            throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
-          });
-  }
-
-  @Override
-  public void putAll(List<Entry<K, V>> entries) {
-    try {
-      putAllAsync(entries).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
-    Preconditions.checkNotNull(records);
-    if (records.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    List<K> deleteKeys = records.stream()
-        .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
-
-    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
-        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
-
-    List<Entry<K, V>> putRecords = records.stream()
-        .filter(e -> e.getValue() != null).collect(Collectors.toList());
-
-    // Return the combined future
-    return CompletableFuture.allOf(
-        deleteFuture,
-        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, writeMetrics.numPutAlls, writeMetrics.putAllNs))
-        .exceptionally(e -> {
-            String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
-            throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
-          });
-  }
-
-  @Override
-  public void delete(K key) {
-    try {
-      deleteAsync(key).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAsync(K key) {
-    Preconditions.checkNotNull(key);
-    return execute(writeRateLimiter, key, writeFn::deleteAsync, writeMetrics.numDeletes, writeMetrics.deleteNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void deleteAll(List<K> keys) {
-    try {
-      deleteAllAsync(keys).get();
-    } catch (Exception e) {
-      throw new SamzaException(e);
-    }
-  }
-
-  @Override
-  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
-    Preconditions.checkNotNull(keys);
-    if (keys.isEmpty()) {
-      return CompletableFuture.completedFuture(null);
-    }
-
-    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs)
-        .exceptionally(e -> {
-            throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
-          });
-  }
-
-  @Override
-  public void flush() {
-    try {
-      incCounter(writeMetrics.numFlushes);
-      long startNs = clock.nanoTime();
-      writeFn.flush();
-      updateTimer(writeMetrics.flushNs, clock.nanoTime() - startNs);
-    } catch (Exception e) {
-      String errMsg = "Failed to flush remote store";
-      logger.error(errMsg, e);
-      throw new SamzaException(errMsg, e);
-    }
-  }
-
-  @Override
-  public void close() {
-    writeFn.close();
-    super.close();
-  }
-
-  /**
-   * Execute an async request given a table record (key+value)
-   * @param rateLimiter helper for rate limiting
-   * @param key key of the table record
-   * @param value value of the table record
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
-      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
-            .thenCompose((r) -> method.apply(key, value))
-        : method.apply(key, value);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  /**
-   * Execute an async request given a collection of table records
-   * @param rateLimiter helper for rate limiting
-   * @param records list of records
-   * @param method method to be executed
-   * @param timer latency metric to be updated
-   * @return CompletableFuture of the operation
-   */
-  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
-      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
-      Counter counter, Timer timer) {
-    incCounter(counter);
-    final long startNs = clock.nanoTime();
-    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
-        ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
-            .thenCompose((r) -> method.apply(records))
-        : method.apply(records);
-    return completeExecution(ioFuture, startNs, timer);
-  }
-
-  @VisibleForTesting
-  public TableWriteFunction<K, V> getWriteFn() {
-    return writeFn;
-  }
-
-  @VisibleForTesting
-  public TableRateLimiter getWriteRateLimiter() {
-    return writeRateLimiter;
-  }
-}
@@ -21,32 +21,37 @@ package org.apache.samza.table.remote;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.context.Context;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.table.BaseReadWriteTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
 import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
 
+
 /**
- * A Samza {@link org.apache.samza.table.Table} backed by a remote data-store or service.
+ * A Samza {@link ReadWriteTable} backed by a remote data-store or service.
  * <p>
  * Many stream-processing applications require to look-up data from remote data sources eg: databases,
  * web-services, RPC systems to process messages in the stream. Such access to adjunct datasets can be
- * naturally modeled as a join between the incoming stream and a {@link RemoteReadableTable}.
+ * naturally modeled as a join between the incoming stream and a table.
  * <p>
  * Example use-cases include:
  * <ul>
@@ -55,11 +60,9 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
  *  <li> A notifications-system that sends out emails may require a query to an external database to process its message. </li>
  * </ul>
  * <p>
- * A {@link RemoteReadableTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
+ * A {@link RemoteTable} is meant to be used with a {@link TableReadFunction} and a {@link TableWriteFunction}
  * which encapsulate the functionality of reading and writing data to the remote service. These provide a
- * pluggable means to specify I/O operations on the table. While the base implementation merely delegates to
- * these reader and writer functions, sub-classes of {@link RemoteReadableTable} may provide rich functionality like
- * caching or throttling on top of them.
+ * pluggable means to specify I/O operations on the table.
  *
  * For async IO methods, requests are dispatched by a single-threaded executor after invoking the rateLimiter.
  * Optionally, an executor can be specified for invoking the future callbacks which otherwise are
@@ -70,29 +73,37 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
+public class RemoteTable<K, V> extends BaseReadWriteTable<K, V>
+    implements ReadWriteTable<K, V> {
 
   protected final ExecutorService callbackExecutor;
   protected final ExecutorService tableExecutor;
   protected final TableReadFunction<K, V> readFn;
+  protected final TableWriteFunction<K, V> writeFn;
   protected final TableRateLimiter<K, V> readRateLimiter;
+  protected final TableRateLimiter writeRateLimiter;
 
   /**
-   * Construct a RemoteReadableTable instance
+   * Construct a RemoteTable instance
    * @param tableId table id
    * @param readFn {@link TableReadFunction} for read operations
-   * @param rateLimiter helper for rate limiting
+   * @param writeFn {@link TableWriteFunction} for read operations
+   * @param readRateLimiter helper for read rate limiting
+   * @param writeRateLimiter helper for write rate limiting
    * @param tableExecutor executor for issuing async requests
    * @param callbackExecutor executor for invoking async callbacks
    */
-  public RemoteReadableTable(String tableId, TableReadFunction<K, V> readFn,
-      TableRateLimiter<K, V> rateLimiter, ExecutorService tableExecutor, ExecutorService callbackExecutor) {
+  public RemoteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
+      TableRateLimiter<K, V> readRateLimiter, TableRateLimiter<K, V> writeRateLimiter,
+      ExecutorService tableExecutor, ExecutorService callbackExecutor) {
     super(tableId);
     Preconditions.checkNotNull(readFn, "null read function");
     this.readFn = readFn;
-    this.readRateLimiter = rateLimiter;
-    this.callbackExecutor = callbackExecutor;
+    this.writeFn = writeFn;
+    this.readRateLimiter = readRateLimiter;
+    this.writeRateLimiter = writeRateLimiter;
     this.tableExecutor = tableExecutor;
+    this.callbackExecutor = callbackExecutor;
   }
 
   @Override
@@ -102,6 +113,9 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
     if (metricsConfig.getMetricsTimerEnabled()) {
       TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, this, tableId);
       readRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("get-throttle-ns"));
+      if (writeRateLimiter != null) {
+        writeRateLimiter.setTimerMetric(tableMetricsUtil.newTimer("put-throttle-ns"));
+      }
     }
   }
 
@@ -117,13 +131,13 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
   @Override
   public CompletableFuture<V> getAsync(K key) {
     Preconditions.checkNotNull(key);
-    return execute(readRateLimiter, key, readFn::getAsync, readMetrics.numGets, readMetrics.getNs)
+    return execute(readRateLimiter, key, readFn::getAsync, metrics.numGets, metrics.getNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + key, e);
             }
             if (result == null) {
-              incCounter(readMetrics.numMissedLookups);
+              incCounter(metrics.numMissedLookups);
             }
             return result;
           });
@@ -144,21 +158,147 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
     if (keys.isEmpty()) {
       return CompletableFuture.completedFuture(Collections.EMPTY_MAP);
     }
-    return execute(readRateLimiter, keys, readFn::getAllAsync, readMetrics.numGetAlls, readMetrics.getAllNs)
+    return execute(readRateLimiter, keys, readFn::getAllAsync, metrics.numGetAlls, metrics.getAllNs)
         .handle((result, e) -> {
             if (e != null) {
               throw new SamzaException("Failed to get the records for " + keys, e);
             }
-            result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
+            result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
             return result;
           });
   }
 
+  @Override
+  public void put(K key, V value) {
+    try {
+      putAsync(key, value).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAsync(K key, V value) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    if (value == null) {
+      return deleteAsync(key);
+    }
+
+    return execute(writeRateLimiter, key, value, writeFn::putAsync, metrics.numPuts, metrics.putNs)
+        .exceptionally(e -> {
+            throw new SamzaException("Failed to put a record with key=" + key, (Throwable) e);
+          });
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    try {
+      putAllAsync(entries).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> putAllAsync(List<Entry<K, V>> records) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(records);
+    if (records.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    List<K> deleteKeys = records.stream()
+        .filter(e -> e.getValue() == null).map(Entry::getKey).collect(Collectors.toList());
+
+    CompletableFuture<Void> deleteFuture = deleteKeys.isEmpty()
+        ? CompletableFuture.completedFuture(null) : deleteAllAsync(deleteKeys);
+
+    List<Entry<K, V>> putRecords = records.stream()
+        .filter(e -> e.getValue() != null).collect(Collectors.toList());
+
+    // Return the combined future
+    return CompletableFuture.allOf(
+        deleteFuture,
+        executeRecords(writeRateLimiter, putRecords, writeFn::putAllAsync, metrics.numPutAlls, metrics.putAllNs))
+        .exceptionally(e -> {
+            String strKeys = records.stream().map(r -> r.getKey().toString()).collect(Collectors.joining(","));
+            throw new SamzaException(String.format("Failed to put records with keys=" + strKeys), e);
+          });
+  }
+
+  @Override
+  public void delete(K key) {
+    try {
+      deleteAsync(key).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAsync(K key) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(key);
+    return execute(writeRateLimiter, key, writeFn::deleteAsync, metrics.numDeletes, metrics.deleteNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete the record for " + key), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    try {
+      deleteAllAsync(keys).get();
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> deleteAllAsync(List<K> keys) {
+    Preconditions.checkNotNull(writeFn, "null write function");
+    Preconditions.checkNotNull(keys);
+    if (keys.isEmpty()) {
+      return CompletableFuture.completedFuture(null);
+    }
+
+    return execute(writeRateLimiter, keys, writeFn::deleteAllAsync, metrics.numDeleteAlls, metrics.deleteAllNs)
+        .exceptionally(e -> {
+            throw new SamzaException(String.format("Failed to delete records for " + keys), (Throwable) e);
+          });
+  }
+
+  @Override
+  public void flush() {
+    if (writeFn != null) {
+      try {
+        incCounter(metrics.numFlushes);
+        long startNs = clock.nanoTime();
+        writeFn.flush();
+        updateTimer(metrics.flushNs, clock.nanoTime() - startNs);
+      } catch (Exception e) {
+        String errMsg = "Failed to flush remote store";
+        logger.error(errMsg, e);
+        throw new SamzaException(errMsg, e);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    readFn.close();
+    if (writeFn != null) {
+      writeFn.close();
+    }
+  }
+
   /**
    * Execute an async request given a table key
    * @param rateLimiter helper for rate limiting
    * @param key key of the table record
    * @param method method to be executed
+   * @param counter count metric to be updated
    * @param timer latency metric to be updated
    * @param <T> return type
    * @return CompletableFuture of the operation
@@ -169,8 +309,8 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
     final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
-            .thenCompose((r) -> method.apply(key))
+        .runAsync(() -> rateLimiter.throttle(key), tableExecutor)
+        .thenCompose((r) -> method.apply(key))
         : method.apply(key);
     return completeExecution(ioFuture, startNs, timer);
   }
@@ -180,6 +320,7 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
    * @param rateLimiter helper for rate limiting
    * @param keys collection of keys
    * @param method method to be executed
+   * @param counter count metric to be updated
    * @param timer latency metric to be updated
    * @param <T> return type
    * @return CompletableFuture of the operation
@@ -190,8 +331,8 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
     final long startNs = clock.nanoTime();
     CompletableFuture<T> ioFuture = rateLimiter.isRateLimited()
         ? CompletableFuture
-            .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
-            .thenCompose((r) -> method.apply(keys))
+        .runAsync(() -> rateLimiter.throttle(keys), tableExecutor)
+        .thenCompose((r) -> method.apply(keys))
         : method.apply(keys);
     return completeExecution(ioFuture, startNs, timer);
   }
@@ -219,9 +360,48 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
     return ioFuture;
   }
 
-  @Override
-  public void close() {
-    readFn.close();
+  /**
+   * Execute an async request given a table record (key+value)
+   * @param rateLimiter helper for rate limiting
+   * @param key key of the table record
+   * @param value value of the table record
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> execute(TableRateLimiter<K, V> rateLimiter,
+      K key, V value, BiFunction<K, V, CompletableFuture<Void>> method, Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttle(key, value), tableExecutor)
+            .thenCompose((r) -> method.apply(key, value))
+        : method.apply(key, value);
+    return completeExecution(ioFuture, startNs, timer);
+  }
+
+  /**
+   * Execute an async request given a collection of table records
+   * @param rateLimiter helper for rate limiting
+   * @param records list of records
+   * @param method method to be executed
+   * @param counter count metric to be updated
+   * @param timer latency metric to be updated
+   * @return CompletableFuture of the operation
+   */
+  protected CompletableFuture<Void> executeRecords(TableRateLimiter<K, V> rateLimiter,
+      Collection<Entry<K, V>> records, Function<Collection<Entry<K, V>>, CompletableFuture<Void>> method,
+      Counter counter, Timer timer) {
+    incCounter(counter);
+    final long startNs = clock.nanoTime();
+    CompletableFuture<Void> ioFuture = rateLimiter.isRateLimited()
+        ? CompletableFuture
+            .runAsync(() -> rateLimiter.throttleRecords(records), tableExecutor)
+            .thenCompose((r) -> method.apply(records))
+        : method.apply(records);
+    return completeExecution(ioFuture, startNs, timer);
   }
 
   @VisibleForTesting
@@ -243,4 +423,14 @@ public class RemoteReadableTable<K, V> extends BaseReadableTable<K, V> {
   public TableRateLimiter<K, V> getReadRateLimiter() {
     return readRateLimiter;
   }
+
+  @VisibleForTesting
+  public TableWriteFunction<K, V> getWriteFn() {
+    return writeFn;
+  }
+
+  @VisibleForTesting
+  public TableRateLimiter getWriteRateLimiter() {
+    return writeRateLimiter;
+  }
 }
index 93a0521..8b6bc1a 100644 (file)
@@ -21,8 +21,7 @@ package org.apache.samza.table.remote;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.config.JavaTableConfig;
-import org.apache.samza.context.Context;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
@@ -45,9 +44,7 @@ import java.util.concurrent.ScheduledExecutorService;
  */
 public class RemoteTableProvider extends BaseTableProvider {
 
-  private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
-
-  private boolean readOnly;
+  private final List<RemoteTable<?, ?>> tables = new ArrayList<>();
 
   /**
    * Map of tableId -> executor service for async table IO and callbacks. The same executors
@@ -63,21 +60,13 @@ public class RemoteTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public void init(Context context) {
-    super.init(context);
-    JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
-    this.readOnly = tableConfig.getForTable(tableId, RemoteTableDescriptor.WRITE_FN) == null;
-  }
-
-  @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
 
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
 
-    RemoteReadableTable table;
-
     JavaTableConfig tableConfig = new JavaTableConfig(context.getJobContext().getConfig());
 
+    // Read part
     TableReadFunction readFn = getReadFn(tableConfig);
     RateLimiter rateLimiter = deserializeObject(tableConfig, RemoteTableDescriptor.RATE_LIMITER);
     if (rateLimiter != null) {
@@ -86,34 +75,29 @@ public class RemoteTableProvider extends BaseTableProvider {
     TableRateLimiter.CreditFunction<?, ?> readCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.READ_CREDIT_FN);
     TableRateLimiter readRateLimiter = new TableRateLimiter(tableId, rateLimiter, readCreditFn, RemoteTableDescriptor.RL_READ_TAG);
 
-    TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
-    TableRateLimiter writeRateLimiter = null;
-
     TableRetryPolicy readRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.READ_RETRY_POLICY);
-    TableRetryPolicy writeRetryPolicy = null;
-
-    if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
-      retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
-          Thread thread = new Thread(runnable);
-          thread.setName("table-retry-executor");
-          thread.setDaemon(true);
-          return thread;
-        });
-    }
-
     if (readRetryPolicy != null) {
+      if (retryExecutor == null) {
+        retryExecutor = createRetryExecutor();
+      }
       readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
     }
 
-    TableWriteFunction writeFn = getWriteFn(tableConfig);
-
     boolean isRateLimited = readRateLimiter.isRateLimited();
-    if (!readOnly) {
-      writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
+
+    // Write part
+    TableWriteFunction writeFn = getWriteFn(tableConfig);
+    TableRateLimiter writeRateLimiter = null;
+    TableRetryPolicy writeRetryPolicy = null;
+    if (writeFn != null) {
+      TableRateLimiter.CreditFunction<?, ?> writeCreditFn = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_CREDIT_FN);
       writeRateLimiter = new TableRateLimiter(tableId, rateLimiter, writeCreditFn, RemoteTableDescriptor.RL_WRITE_TAG);
       isRateLimited |= writeRateLimiter.isRateLimited();
       writeRetryPolicy = deserializeObject(tableConfig, RemoteTableDescriptor.WRITE_RETRY_POLICY);
       if (writeRetryPolicy != null) {
+        if (retryExecutor == null) {
+          retryExecutor = createRetryExecutor();
+        }
         writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
       }
     }
@@ -140,13 +124,8 @@ public class RemoteTableProvider extends BaseTableProvider {
             }));
     }
 
-    if (readOnly) {
-      table = new RemoteReadableTable(tableId, readFn, readRateLimiter,
-          tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    } else {
-      table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter,
-          writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
-    }
+    RemoteTable table = new RemoteTable(tableId, readFn, writeFn, readRateLimiter,
+        writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
 
     TableMetricsUtil metricsUtil = new TableMetricsUtil(this.context, table, tableId);
     if (readRetryPolicy != null) {
@@ -192,5 +171,14 @@ public class RemoteTableProvider extends BaseTableProvider {
     }
     return writeFn;
   }
+
+  private ScheduledExecutorService createRetryExecutor() {
+    return Executors.newSingleThreadScheduledExecutor(runnable -> {
+        Thread thread = new Thread(runnable);
+        thread.setName("table-retry-executor");
+        thread.setDaemon(true);
+        return thread;
+      });
+  }
 }
 
@@ -24,8 +24,18 @@ import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 
 
-public class TableWriteMetrics {
+/**
+ * Utility class that contains the default set of read metrics.
+ */
+public class TableMetrics {
 
+  // Read metrics
+  public final Timer getNs;
+  public final Timer getAllNs;
+  public final Counter numGets;
+  public final Counter numGetAlls;
+  public final Counter numMissedLookups;
+  // Write metrics
   public final Counter numPuts;
   public final Timer putNs;
   public final Counter numPutAlls;
@@ -38,14 +48,21 @@ public class TableWriteMetrics {
   public final Timer flushNs;
 
   /**
-   * Utility class that contains the default set of write metrics.
+   * Constructor based on container and task container context
    *
    * @param context {@link Context} for this task
    * @param table underlying table
    * @param tableId table Id
    */
-  public TableWriteMetrics(Context context, Table table, String tableId) {
+  public TableMetrics(Context context, Table table, String tableId) {
     TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
+    // Read metrics
+    numGets = tableMetricsUtil.newCounter("num-gets");
+    getNs = tableMetricsUtil.newTimer("get-ns");
+    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
+    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
+    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
+    // Write metrics
     numPuts = tableMetricsUtil.newCounter("num-puts");
     putNs = tableMetricsUtil.newTimer("put-ns");
     numPutAlls = tableMetricsUtil.newCounter("num-putAlls");
diff --git a/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java b/samza-core/src/main/java/org/apache/samza/table/utils/TableReadMetrics.java
deleted file mode 100644 (file)
index e77fcfd..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.table.utils;
-
-import org.apache.samza.context.Context;
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.Table;
-
-
-/**
- * Utility class that contains the default set of read metrics.
- */
-public class TableReadMetrics {
-
-  public final Timer getNs;
-  public final Timer getAllNs;
-  public final Counter numGets;
-  public final Counter numGetAlls;
-  public final Counter numMissedLookups;
-
-  /**
-   * Constructor based on container and task container context
-   *
-   * @param context {@link Context} for this task
-   * @param table underlying table
-   * @param tableId table Id
-   */
-  public TableReadMetrics(Context context, Table table, String tableId) {
-    TableMetricsUtil tableMetricsUtil = new TableMetricsUtil(context, table, tableId);
-    numGets = tableMetricsUtil.newCounter("num-gets");
-    getNs = tableMetricsUtil.newTimer("get-ns");
-    numGetAlls = tableMetricsUtil.newCounter("num-getAlls");
-    getAllNs = tableMetricsUtil.newTimer("getAll-ns");
-    numMissedLookups = tableMetricsUtil.newCounter("num-missed-lookups");
-  }
-
-}
index 4112c8b..8fd161b 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
-import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.TaskCoordinator;
 import org.junit.Test;
@@ -71,7 +71,7 @@ public class TestStreamTableJoinOperatorImpl {
             return record.getKey();
           }
         });
-    ReadableTable table = mock(ReadableTable.class);
+    ReadWriteTable table = mock(ReadWriteTable.class);
     when(table.get("1")).thenReturn("r1");
     when(table.get("2")).thenReturn(null);
     Context context = new MockContext();
index a3b1963..e60b6ff 100644 (file)
@@ -49,12 +49,12 @@ public class TestTableManager {
 
   public static class DummyTableProviderFactory implements TableProviderFactory {
 
-    static ReadableTable table;
+    static ReadWriteTable table;
     static TableProvider tableProvider;
 
     @Override
     public TableProvider getTableProvider(String tableId) {
-      table = mock(ReadableTable.class);
+      table = mock(ReadWriteTable.class);
       tableProvider = mock(TableProvider.class);
       when(tableProvider.getTable()).thenReturn(table);
       return tableProvider;
index 5a19767..c304bfd 100644 (file)
@@ -35,11 +35,10 @@ import org.apache.samza.table.descriptors.BaseTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.caching.guava.GuavaCacheTable;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
@@ -138,11 +137,11 @@ public class TestCachingTable {
     return Pair.of(cacheTable, cacheStore);
   }
 
-  private void initTables(ReadableTable ... tables) {
+  private void initTables(ReadWriteTable ... tables) {
     initTables(false, tables);
   }
 
-  private void initTables(boolean isTimerMetricsDisabled, ReadableTable ... tables) {
+  private void initTables(boolean isTimerMetricsDisabled, ReadWriteTable ... tables) {
     Map<String, String> config = new HashMap<>();
     if (isTimerMetricsDisabled) {
       config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -242,7 +241,7 @@ public class TestCachingTable {
 
   @Test
   public void testNonexistentKeyInTable() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture(null)).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = getMockCache().getLeft();
     CachingTable<String, String> cachingTable = new CachingTable<>("myTable", table, cache, false);
@@ -255,7 +254,7 @@ public class TestCachingTable {
 
   @Test
   public void testKeyEviction() {
-    ReadableTable<String, String> table = mock(ReadableTable.class);
+    ReadWriteTable<String, String> table = mock(ReadWriteTable.class);
     doReturn(CompletableFuture.completedFuture("3")).when(table).getAsync(any());
     ReadWriteTable<String, String> cache = mock(ReadWriteTable.class);
 
@@ -283,7 +282,7 @@ public class TestCachingTable {
     TableRateLimiter<String, String> rateLimitHelper = mock(TableRateLimiter.class);
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId + "-remote", readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 
@@ -402,7 +401,7 @@ public class TestCachingTable {
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAsync(any(), any());
     doReturn(CompletableFuture.completedFuture(null)).when(writeFn).deleteAsync(any());
 
-    final RemoteReadWriteTable<String, String> remoteTable = new RemoteReadWriteTable<>(
+    final RemoteTable<String, String> remoteTable = new RemoteTable<>(
         tableId, readFn, writeFn, rateLimitHelper, rateLimitHelper,
         Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor());
 
index 486295a..a764a8b 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.context.Context;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.table.TableProviderFactory;
 import org.junit.Test;
@@ -143,7 +143,7 @@ public class TestLocalTableDescriptor {
     }
 
     @Override
-    public Table getTable() {
+    public ReadWriteTable getTable() {
       throw new SamzaException("Not implemented");
     }
 
@@ -19,7 +19,6 @@
 
 package org.apache.samza.table.remote;
 
-import junit.framework.Assert;
 import org.apache.samza.context.Context;
 import org.apache.samza.context.MockContext;
 import org.apache.samza.metrics.Counter;
@@ -30,7 +29,10 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
+
+import org.junit.Assert;
 import org.junit.Test;
+
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
@@ -53,7 +55,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 
-public class TestRemoteReadWriteTable {
+public class TestRemoteTable {
   private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor();
 
   public static Context getMockContext() {
@@ -66,14 +68,14 @@ public class TestRemoteReadWriteTable {
     return context;
   }
 
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
       TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn) {
     return getTable(tableId, readFn, writeFn, null);
   }
 
-  private <K, V, T extends RemoteReadableTable<K, V>> T getTable(String tableId,
+  private <K, V, T extends RemoteTable<K, V>> T getTable(String tableId,
       TableReadFunction<K, V> readFn, TableWriteFunction<K, V> writeFn, ExecutorService cbExecutor) {
-    RemoteReadableTable<K, V> table;
+    RemoteTable<K, V> table;
 
     TableRateLimiter<K, V> readRateLimiter = mock(TableRateLimiter.class);
     TableRateLimiter<K, V> writeRateLimiter = mock(TableRateLimiter.class);
@@ -82,11 +84,7 @@ public class TestRemoteReadWriteTable {
 
     ExecutorService tableExecutor = Executors.newSingleThreadExecutor();
 
-    if (writeFn == null) {
-      table = new RemoteReadableTable<K, V>(tableId, readFn, readRateLimiter, tableExecutor, cbExecutor);
-    } else {
-      table = new RemoteReadWriteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
-    }
+    table = new RemoteTable<K, V>(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor);
 
     Context context = getMockContext();
 
@@ -119,7 +117,7 @@ public class TestRemoteReadWriteTable {
       TableRetryPolicy policy = new TableRetryPolicy();
       readFn = new RetriableReadFunction<>(policy, readFn, schedExec);
     }
-    RemoteReadableTable<String, String> table = getTable(tableId, readFn, null);
+    RemoteTable<String, String> table = getTable(tableId, readFn, null);
     Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get());
     verify(table.readRateLimiter, times(1)).throttle(anyString());
   }
@@ -153,8 +151,8 @@ public class TestRemoteReadWriteTable {
     doReturn(CompletableFuture.completedFuture("bar1")).when(readFn1).getAsync(anyString());
     doReturn(CompletableFuture.completedFuture("bar2")).when(readFn1).getAsync(anyString());
 
-    RemoteReadableTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
-    RemoteReadableTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
+    RemoteTable<String, String> table1 = getTable("testGetMultipleTables-1", readFn1, null);
+    RemoteTable<String, String> table2 = getTable("testGetMultipleTables-2", readFn2, null);
 
     CompletableFuture<String> future1 = table1.getAsync("foo1");
     CompletableFuture<String> future2 = table2.getAsync("foo2");
@@ -195,7 +193,7 @@ public class TestRemoteReadWriteTable {
       }
       writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec);
     }
-    RemoteReadWriteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
+    RemoteTable<String, String> table = getTable(tableId, mock(TableReadFunction.class), writeFn);
     if (sync) {
       table.put("foo", isDelete ? null : "bar");
     } else {
@@ -249,7 +247,7 @@ public class TestRemoteReadWriteTable {
 
   private void doTestDelete(boolean sync, boolean error) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDelete-" + sync + error,
+    RemoteTable<String, String> table = getTable("testDelete-" + sync + error,
         mock(TableReadFunction.class), writeFn);
     CompletableFuture<Void> future;
     if (error) {
@@ -302,7 +300,7 @@ public class TestRemoteReadWriteTable {
     }
     // Sync is backed by async so needs to mock the async method
     doReturn(future).when(readFn).getAllAsync(any());
-    RemoteReadableTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
+    RemoteTable<String, String> table = getTable("testGetAll-" + sync + error + partial, readFn, null);
     Assert.assertEquals(res, sync ? table.getAll(Arrays.asList("foo1", "foo2"))
         : table.getAllAsync(Arrays.asList("foo1", "foo2")).get());
     verify(table.readRateLimiter, times(1)).throttle(anyCollection());
@@ -331,7 +329,7 @@ public class TestRemoteReadWriteTable {
 
   public void doTestPutAll(boolean sync, boolean error, boolean hasDelete) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
+    RemoteTable<String, String> table = getTable("testPutAll-" + sync + error + hasDelete,
         mock(TableReadFunction.class), writeFn);
     CompletableFuture<Void> future;
     if (error) {
@@ -394,7 +392,7 @@ public class TestRemoteReadWriteTable {
 
   public void doTestDeleteAll(boolean sync, boolean error) throws Exception {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
+    RemoteTable<String, String> table = getTable("testDeleteAll-" + sync + error,
         mock(TableReadFunction.class), writeFn);
     CompletableFuture<Void> future;
     if (error) {
@@ -435,7 +433,7 @@ public class TestRemoteReadWriteTable {
   @Test
   public void testFlush() {
     TableWriteFunction<String, String> writeFn = mock(TableWriteFunction.class);
-    RemoteReadWriteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
+    RemoteTable<String, String> table = getTable("testFlush", mock(TableReadFunction.class), writeFn);
     table.flush();
     verify(writeFn, times(1)).flush();
   }
@@ -445,7 +443,7 @@ public class TestRemoteReadWriteTable {
     TableReadFunction<String, String> readFn = mock(TableReadFunction.class);
     // Sync is backed by async so needs to mock the async method
     doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString());
-    RemoteReadableTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
+    RemoteTable<String, String> table = getTable("testGetWithCallbackExecutor", readFn, null,
         Executors.newSingleThreadExecutor());
     Thread testThread = Thread.currentThread();
 
index 3d8e36f..907242f 100644 (file)
@@ -36,17 +36,17 @@ import org.apache.samza.metrics.Timer;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.remote.RemoteTableProvider;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-
 import org.apache.samza.table.retry.RetriableReadFunction;
 import org.apache.samza.table.retry.RetriableWriteFunction;
 import org.apache.samza.table.retry.TableRetryPolicy;
 import org.apache.samza.util.EmbeddedTaggedRateLimiter;
 import org.apache.samza.util.RateLimiter;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -207,8 +207,8 @@ public class TestRemoteTableDescriptor {
     RemoteTableProvider provider = new RemoteTableProvider(desc.getTableId());
     provider.init(createMockContext(desc));
     Table table = provider.getTable();
-    Assert.assertTrue(table instanceof RemoteReadWriteTable);
-    RemoteReadWriteTable rwTable = (RemoteReadWriteTable) table;
+    Assert.assertTrue(table instanceof RemoteTable);
+    RemoteTable rwTable = (RemoteTable) table;
     if (numRateLimitOps > 0) {
       Assert.assertTrue(!rlGets || rwTable.getReadRateLimiter() != null);
       Assert.assertTrue(!rlPuts || rwTable.getWriteRateLimiter() != null);
index 5116cab..050ea55 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.remote.TableReadFunction;
 import org.apache.samza.table.remote.TableWriteFunction;
-import org.apache.samza.table.remote.TestRemoteReadWriteTable;
+import org.apache.samza.table.remote.TestRemoteTable;
 import org.apache.samza.table.utils.TableMetricsUtil;
 import org.junit.Test;
 
@@ -54,7 +54,7 @@ public class TestRetriableTableFunctions {
 
   public TableMetricsUtil getMetricsUtil(String tableId) {
     Table table = mock(Table.class);
-    Context context = TestRemoteReadWriteTable.getMockContext();
+    Context context = TestRemoteTable.getMockContext();
     return new TableMetricsUtil(context, table, tableId);
   }
 
index ddb79ba..fc9ce76 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.inmemory;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -28,7 +28,9 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 
 public class TestInMemoryTableDescriptor {
index 62fb3da..319fb0f 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.storage.kv.descriptors;
 
 import java.util.Map;
-import junit.framework.Assert;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.MapConfig;
@@ -30,7 +30,9 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerde;
 import org.apache.samza.storage.kv.LocalTableProviderFactory;
 import org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory;
+
 import org.junit.Test;
+import org.junit.Assert;
 
 public class TestRocksDbTableDescriptor {
 
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalReadableTable.java
deleted file mode 100644 (file)
index 29ddb15..0000000
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.storage.kv;
-
-import com.google.common.base.Preconditions;
-
-import com.google.common.base.Supplier;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.samza.metrics.Counter;
-import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.BaseReadableTable;
-
-import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
-import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
-
-/**
- * A store backed readable table
- *
- * @param <K> the type of the key in this table
- * @param <V> the type of the value in this table
- */
-public class LocalReadableTable<K, V> extends BaseReadableTable<K, V> {
-
-  protected final KeyValueStore<K, V> kvStore;
-
-  /**
-   * Constructs an instance of {@link LocalReadableTable}
-   * @param tableId the table Id
-   * @param kvStore the backing store
-   */
-  public LocalReadableTable(String tableId, KeyValueStore<K, V> kvStore) {
-    super(tableId);
-    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
-    this.kvStore = kvStore;
-  }
-
-  @Override
-  public V get(K key) {
-    V result = instrument(readMetrics.numGets, readMetrics.getNs, () -> kvStore.get(key));
-    if (result == null) {
-      incCounter(readMetrics.numMissedLookups);
-    }
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<V> getAsync(K key) {
-    CompletableFuture<V> future = new CompletableFuture();
-    try {
-      future.complete(get(key));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public Map<K, V> getAll(List<K> keys) {
-    Map<K, V> result = instrument(readMetrics.numGetAlls, readMetrics.getAllNs, () -> kvStore.getAll(keys));
-    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(readMetrics.numMissedLookups));
-    return result;
-  }
-
-  @Override
-  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
-    CompletableFuture<Map<K, V>> future = new CompletableFuture();
-    try {
-      future.complete(getAll(keys));
-    } catch (Exception e) {
-      future.completeExceptionally(e);
-    }
-    return future;
-  }
-
-  @Override
-  public void close() {
-    // The KV store is not closed here as it may still be needed by downstream operators,
-    // it will be closed by the SamzaContainer
-  }
-
-  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
-    incCounter(counter);
-    long startNs = clock.nanoTime();
-    T result = func.get();
-    updateTimer(timer, clock.nanoTime() - startNs);
-    return result;
-  }
-}
  */
 package org.apache.samza.storage.kv;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.table.BaseReadWriteTable;
 
 import static org.apache.samza.table.utils.TableMetricsUtil.incCounter;
 import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
@@ -35,22 +39,63 @@ import static org.apache.samza.table.utils.TableMetricsUtil.updateTimer;
  * @param <K> the type of the key in this table
  * @param <V> the type of the value in this table
  */
-public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
-    implements ReadWriteTable<K, V> {
+public class LocalTable<K, V> extends BaseReadWriteTable<K, V> {
+
+  protected final KeyValueStore<K, V> kvStore;
 
   /**
-   * Constructs an instance of {@link LocalReadWriteTable}
+   * Constructs an instance of {@link LocalTable}
    * @param tableId the table Id
    * @param kvStore the backing store
    */
-  public LocalReadWriteTable(String tableId, KeyValueStore kvStore) {
-    super(tableId, kvStore);
+  public LocalTable(String tableId, KeyValueStore kvStore) {
+    super(tableId);
+    Preconditions.checkNotNull(kvStore, "null KeyValueStore");
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public V get(K key) {
+    V result = instrument(metrics.numGets, metrics.getNs, () -> kvStore.get(key));
+    if (result == null) {
+      incCounter(metrics.numMissedLookups);
+    }
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<V> getAsync(K key) {
+    CompletableFuture<V> future = new CompletableFuture();
+    try {
+      future.complete(get(key));
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    Map<K, V> result = instrument(metrics.numGetAlls, metrics.getAllNs, () -> kvStore.getAll(keys));
+    result.values().stream().filter(Objects::isNull).forEach(v -> incCounter(metrics.numMissedLookups));
+    return result;
+  }
+
+  @Override
+  public CompletableFuture<Map<K, V>> getAllAsync(List<K> keys) {
+    CompletableFuture<Map<K, V>> future = new CompletableFuture();
+    try {
+      future.complete(getAll(keys));
+    } catch (Exception e) {
+      future.completeExceptionally(e);
+    }
+    return future;
   }
 
   @Override
   public void put(K key, V value) {
     if (value != null) {
-      instrument(writeMetrics.numPuts, writeMetrics.putNs, () -> kvStore.put(key, value));
+      instrument(metrics.numPuts, metrics.putNs, () -> kvStore.put(key, value));
     } else {
       delete(key);
     }
@@ -81,7 +126,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
       });
 
     if (!toPut.isEmpty()) {
-      instrument(writeMetrics.numPutAlls, writeMetrics.putAllNs, () -> kvStore.putAll(toPut));
+      instrument(metrics.numPutAlls, metrics.putAllNs, () -> kvStore.putAll(toPut));
     }
 
     if (!toDelete.isEmpty()) {
@@ -103,7 +148,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void delete(K key) {
-    instrument(writeMetrics.numDeletes, writeMetrics.deleteNs, () -> kvStore.delete(key));
+    instrument(metrics.numDeletes, metrics.deleteNs, () -> kvStore.delete(key));
   }
 
   @Override
@@ -120,7 +165,7 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void deleteAll(List<K> keys) {
-    instrument(writeMetrics.numDeleteAlls, writeMetrics.deleteAllNs, () -> kvStore.deleteAll(keys));
+    instrument(metrics.numDeleteAlls, metrics.deleteAllNs, () -> kvStore.deleteAll(keys));
   }
 
   @Override
@@ -137,7 +182,21 @@ public class LocalReadWriteTable<K, V> extends LocalReadableTable<K, V>
 
   @Override
   public void flush() {
-    instrument(writeMetrics.numFlushes, writeMetrics.flushNs, () -> kvStore.flush());
+    instrument(metrics.numFlushes, metrics.flushNs, () -> kvStore.flush());
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+
+  private <T> T instrument(Counter counter, Timer timer, Supplier<T> func) {
+    incCounter(counter);
+    long startNs = clock.nanoTime();
+    T result = func.get();
+    updateTimer(timer, clock.nanoTime() - startNs);
+    return result;
   }
 
   private interface Func0 {
index 3be61d0..5099a7e 100644 (file)
@@ -20,8 +20,7 @@ package org.apache.samza.storage.kv;
 
 import com.google.common.base.Preconditions;
 import org.apache.samza.context.Context;
-import org.apache.samza.table.ReadableTable;
-import org.apache.samza.table.Table;
+import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.BaseTableProvider;
 
 /**
@@ -53,10 +52,10 @@ public class LocalTableProvider extends BaseTableProvider {
   }
 
   @Override
-  public Table getTable() {
+  public ReadWriteTable getTable() {
     Preconditions.checkNotNull(context, String.format("Table %s not initialized", tableId));
     Preconditions.checkNotNull(kvStore, "Store not initialized for table " + tableId);
-    ReadableTable table = new LocalReadWriteTable(tableId, kvStore);
+    ReadWriteTable table = new LocalTable(tableId, kvStore);
     table.init(this.context);
     return table;
   }
index 5367931..263ab56 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.samza.storage.kv;
 
-import junit.framework.Assert;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ContainerContext;
 import org.apache.samza.context.Context;
@@ -28,6 +27,7 @@ import org.apache.samza.context.TaskContext;
 import org.apache.samza.table.TableProvider;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.Test;
+import org.junit.Assert;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.*;
@@ -32,15 +32,15 @@ import org.apache.samza.context.JobContext;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.table.ReadableTable;
 
+import org.apache.samza.table.ReadWriteTable;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.mockito.Mockito.*;
 
-public class TestLocalReadableTable {
+public class TestLocalTableRead {
 
   public static final String TABLE_ID = "t1";
 
@@ -80,7 +80,7 @@ public class TestLocalReadableTable {
     numMissedLookups = new Counter("");
 
     metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadableTable.class.getSimpleName();
+    String groupName = LocalTable.class.getSimpleName();
     when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-gets")).thenReturn(numGets);
     when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-getAlls")).thenReturn(numGetAlls);
     when(metricsRegistry.newCounter(groupName, TABLE_ID + "-num-missed-lookups")).thenReturn(numMissedLookups);
@@ -91,7 +91,7 @@ public class TestLocalReadableTable {
 
   @Test
   public void testGet() throws Exception {
-    ReadableTable table = createTable(false);
+    ReadWriteTable table = createTable(false);
     Assert.assertEquals("v1", table.get("k1"));
     Assert.assertEquals("v2", table.getAsync("k2").get());
     Assert.assertNull(table.get("k3"));
@@ -106,7 +106,7 @@ public class TestLocalReadableTable {
 
   @Test
   public void testGetAll() throws Exception {
-    ReadableTable table = createTable(false);
+    ReadWriteTable table = createTable(false);
     Assert.assertEquals(values, table.getAll(keys));
     Assert.assertEquals(values, table.getAllAsync(keys).get());
     verify(kvStore, times(2)).getAll(any());
@@ -121,7 +121,7 @@ public class TestLocalReadableTable {
 
   @Test
   public void testTimerDisabled() throws Exception {
-    ReadableTable table = createTable(true);
+    ReadWriteTable table = createTable(true);
     table.get("");
     table.getAsync("").get();
     table.getAll(keys);
@@ -134,7 +134,7 @@ public class TestLocalReadableTable {
     Assert.assertEquals(0, getCallbackNs.getSnapshot().getAverage(), 0.001);
   }
 
-  private LocalReadableTable createTable(boolean isTimerDisabled) {
+  private LocalTable createTable(boolean isTimerDisabled) {
     Map<String, String> config = new HashMap<>();
     if (isTimerDisabled) {
       config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -147,7 +147,7 @@ public class TestLocalReadableTable {
     when(context.getContainerContext()).thenReturn(containerContext);
     when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
 
-    LocalReadableTable table =  new LocalReadableTable("t1", kvStore);
+    LocalTable table =  new LocalTable("t1", kvStore);
     table.init(context);
 
     return table;
@@ -41,7 +41,7 @@ import org.junit.Test;
 import static org.mockito.Mockito.*;
 
 
-public class TestLocalReadWriteTable {
+public class TestLocalTableWrite {
 
   public static final String TABLE_ID = "t1";
 
@@ -79,7 +79,7 @@ public class TestLocalReadWriteTable {
     deleteCallbackNs = new Timer("");
 
     metricsRegistry = mock(MetricsRegistry.class);
-    String groupName = LocalReadWriteTable.class.getSimpleName();
+    String groupName = LocalTable.class.getSimpleName();
     when(metricsRegistry.newTimer(groupName, TABLE_ID + "-put-ns")).thenReturn(putNs);
     when(metricsRegistry.newTimer(groupName, TABLE_ID + "-putAll-ns")).thenReturn(putAllNs);
     when(metricsRegistry.newTimer(groupName, TABLE_ID + "-delete-ns")).thenReturn(deleteNs);
@@ -226,7 +226,7 @@ public class TestLocalReadWriteTable {
     Assert.assertEquals(0, deleteCallbackNs.getSnapshot().getAverage(), 0.001);
   }
 
-  private LocalReadWriteTable createTable(boolean isTimerDisabled) {
+  private LocalTable createTable(boolean isTimerDisabled) {
     Map<String, String> config = new HashMap<>();
     if (isTimerDisabled) {
       config.put(MetricsConfig.METRICS_TIMER_ENABLED(), "false");
@@ -239,7 +239,7 @@ public class TestLocalReadWriteTable {
     when(context.getContainerContext()).thenReturn(containerContext);
     when(containerContext.getContainerMetricsRegistry()).thenReturn(metricsRegistry);
 
-    LocalReadWriteTable table =  new LocalReadWriteTable("t1", kvStore);
+    LocalTable table =  new LocalTable("t1", kvStore);
     table.init(context);
 
     return table;
index 2137a46..ab650f2 100644 (file)
@@ -229,7 +229,7 @@ public class StreamTaskIntegrationTest {
 
     @Override
     public void init(Context context) throws Exception {
-      profileViewTable = (ReadWriteTable<Integer, Profile>) context.getTaskContext().getTable("profile-view-store");
+      profileViewTable = context.getTaskContext().getTable("profile-view-store");
     }
 
     @Override
@@ -49,7 +49,6 @@ import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.storage.kv.inmemory.descriptors.InMemoryTableDescriptor;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.table.ReadWriteTable;
-import org.apache.samza.table.ReadableTable;
 import org.apache.samza.table.Table;
 import org.apache.samza.task.InitableTask;
 import org.apache.samza.task.MessageCollector;
@@ -77,7 +76,7 @@ import static org.junit.Assert.assertTrue;
 /**
  * This test class tests sendTo() and join() for local tables
  */
-public class TestLocalTable extends AbstractIntegrationTestHarness {
+public class TestLocalTableEndToEnd extends AbstractIntegrationTestHarness {
 
   @Test
   public void testSendTo() throws Exception {
@@ -296,11 +295,11 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     private static Map<String, MyMapFunction> taskToMapFunctionMap = new HashMap<>();
 
     private transient List<Profile> received;
-    private transient ReadableTable table;
+    private transient ReadWriteTable table;
 
     @Override
     public void init(Context context) {
-      table = (ReadableTable) context.getTaskContext().getTable("t1");
+      table = context.getTaskContext().getTable("t1");
       this.received = new ArrayList<>();
 
       taskToMapFunctionMap.put(context.getTaskContext().getTaskModel().getTaskName().getTaskName(), this);
@@ -348,7 +347,7 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
     private ReadWriteTable<Integer, PageView> pageViewTable;
     @Override
     public void init(Context context) throws Exception {
-      pageViewTable = (ReadWriteTable<Integer, PageView>) context.getTaskContext().getTable("t1");
+      pageViewTable = context.getTaskContext().getTable("t1");
     }
     @Override
     public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
@@ -54,8 +54,7 @@ import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.GuavaCacheTableDescriptor;
-import org.apache.samza.table.remote.RemoteReadWriteTable;
-import org.apache.samza.table.remote.RemoteReadableTable;
+import org.apache.samza.table.remote.RemoteTable;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.remote.TableRateLimiter;
 import org.apache.samza.table.remote.TableReadFunction;
@@ -80,7 +79,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.withSettings;
 
 
-public class TestRemoteTable extends AbstractIntegrationTestHarness {
+public class TestRemoteTableEndToEnd extends AbstractIntegrationTestHarness {
 
   static Map<String, List<EnrichedPageView>> writtenRecords = new HashMap<>();
 
@@ -184,7 +183,7 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     String profiles = Base64Serializer.serialize(generateProfiles(count));
 
     int partitionCount = 4;
-    Map<String, String> configs = TestLocalTable.getBaseJobConfig(bootstrapUrl(), zkConnect());
+    Map<String, String> configs = TestLocalTableEndToEnd.getBaseJobConfig(bootstrapUrl(), zkConnect());
 
     configs.put("streams.PageView.samza.system", "test");
     configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
@@ -266,8 +265,8 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     future.completeExceptionally(new RuntimeException("Expected test exception"));
     doReturn(future).when(reader).getAsync(anyString());
     TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadableTable<String, ?> table = new RemoteReadableTable<>(
-        "table1", reader, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
+    RemoteTable<String, String> table = new RemoteTable<>("table1", reader, null,
+        rateLimitHelper, null, Executors.newSingleThreadExecutor(), null);
     table.init(createMockContext());
     table.get("abc");
   }
@@ -280,9 +279,32 @@ public class TestRemoteTable extends AbstractIntegrationTestHarness {
     future.completeExceptionally(new RuntimeException("Expected test exception"));
     doReturn(future).when(writer).putAsync(anyString(), any());
     TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
-    RemoteReadWriteTable<String, String> table = new RemoteReadWriteTable<String, String>(
+    RemoteTable<String, String> table = new RemoteTable<String, String>(
         "table1", reader, writer, rateLimitHelper, rateLimitHelper, Executors.newSingleThreadExecutor(), null);
     table.init(createMockContext());
     table.put("abc", "efg");
   }
+
+  @Test
+  public void testUninitializedWriter() {
+    TableReadFunction<String, String> reader = mock(TableReadFunction.class);
+    TableRateLimiter rateLimitHelper = mock(TableRateLimiter.class);
+    RemoteTable<String, String> table = new RemoteTable<String, String>(
+        "table1", reader, null, rateLimitHelper, null, Executors.newSingleThreadExecutor(), null);
+    table.init(createMockContext());
+    int failureCount = 0;
+    try {
+      table.put("abc", "efg");
+    } catch (SamzaException ex) {
+      ++failureCount;
+    }
+    try {
+      table.delete("abc");
+    } catch (SamzaException ex) {
+      ++failureCount;
+    }
+    table.flush();
+    table.close();
+    Assert.assertEquals(2, failureCount);
+  }
 }