Convert a put to to a delete operation in ReadWriteTable and TableWriteFunction when...
authorWei Song <wsong@wsong-mn2.linkedin.biz>
Thu, 7 Jun 2018 23:51:30 +0000 (16:51 -0700)
committerPrateek Maheshwari <pmaheshwari@linkedin.com>
Thu, 7 Jun 2018 23:51:30 +0000 (16:51 -0700)
Currently, the behavior of putting a null value is inconsistent: it is a delete for RocksDB, and not supported in in-memory store, and on a case-by-case basis for remote tables. It is desirable to unify the behavior. Furthermore, it eases the writing of a change captured stream to a table. A change captured stream contains typically 3 types of events: INSERT, UPDATE and DELETE, and they need to be applied properly when written to a table to produce a correct snapshot. In a change captured stream the payload of a DELETE event is typically is null, and this would result in a delete operation to a table in sendTo() operator.

Author: Wei Song <wsong@wsong-mn2.linkedin.biz>

Closes #547 from weisong44/table-fix

samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
samza-core/src/main/java/org/apache/samza/table/caching/guava/GuavaCacheTable.java
samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java
samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
samza-sql/src/test/java/org/apache/samza/sql/testutil/TestIOResolverFactory.java

index d617153..def5afb 100644 (file)
@@ -19,7 +19,6 @@
 package org.apache.samza.table;
 
 import java.util.List;
-
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.storage.kv.Entry;
 
@@ -36,17 +35,21 @@ public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
   /**
    * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
    *
+   * The key is deleted from the table if value is {@code null}.
+   *
    * @param key the key with which the specified {@code value} is to be associated.
    * @param value the value with which the specified {@code key} is to be associated.
-   * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
    */
   void put(K key, V value);
 
   /**
    * Updates the mappings of the specified key-value {@code entries}.
    *
+   * A key is deleted from the table if its corresponding value is {@code null}.
+   *
    * @param entries the updated mappings to put into this table.
-   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key.
    */
   void putAll(List<Entry<K, V>> entries);
 
index 27bf971..3f8ab51 100644 (file)
@@ -66,7 +66,11 @@ public class GuavaCacheTable<K, V> implements ReadWriteTable<K, V> {
 
   @Override
   public void put(K key, V value) {
-    cache.put(key, value);
+    if (value != null) {
+      cache.put(key, value);
+    } else {
+      delete(key);
+    }
   }
 
   @Override
index a47e349..a640efb 100644 (file)
@@ -84,6 +84,12 @@ public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implem
    */
   @Override
   public void put(K key, V value) {
+
+    if (value == null) {
+      delete(key);
+      return;
+    }
+
     try {
       numPuts.inc();
       if (rateLimitWrites) {
index 3fb8fda..df54878 100644 (file)
@@ -22,7 +22,6 @@ package org.apache.samza.table.remote;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
-
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.ClosableFunction;
 import org.apache.samza.operators.functions.InitableFunction;
@@ -44,6 +43,9 @@ import org.apache.samza.storage.kv.Entry;
 public interface TableWriteFunction<K, V> extends Serializable, InitableFunction, ClosableFunction {
   /**
    * Store single table {@code record} with specified {@code key}. This method must be thread-safe.
+   *
+   * The key is deleted if record is {@code null}.
+   *
    * @param key key for the table record
    * @param record table record to be written
    */
@@ -51,6 +53,9 @@ public interface TableWriteFunction<K, V> extends Serializable, InitableFunction
 
   /**
    * Store the table {@code records} with specified {@code keys}. This method must be thread-safe.
+   *
+   * A key is deleted if its corresponding record is {@code null}.
+   *
    * @param records table records to be written
    */
   default void putAll(List<Entry<K, V>> records) {
index 4037f60..906ee1d 100644 (file)
@@ -42,7 +42,11 @@ public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadab
 
   @Override
   public void put(K key, V value) {
-    kvStore.put(key, value);
+    if (value != null) {
+      kvStore.put(key, value);
+    } else {
+      delete(key);
+    }
   }
 
   @Override
index bbe2a7e..8a20239 100644 (file)
@@ -23,7 +23,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.SamzaContainerContext;
@@ -34,9 +33,9 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.sql.data.SamzaSqlCompositeKey;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.interfaces.SqlIOResolver;
 import org.apache.samza.sql.interfaces.SqlIOResolverFactory;
-import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.storage.kv.RocksDbTableDescriptor;
 import org.apache.samza.table.ReadWriteTable;
 import org.apache.samza.table.Table;
@@ -91,8 +90,10 @@ public class TestIOResolverFactory implements SqlIOResolverFactory {
     public void put(Object key, Object value) {
       if (key == null) {
         records.put(System.nanoTime(), value);
-      } else {
+      } else if (value != null) {
         records.put(key, value);
+      } else {
+        delete(key);
       }
     }