Initial version of Table API
authorWei Song <wsong@linkedin.com>
Tue, 5 Dec 2017 21:23:44 +0000 (13:23 -0800)
committerPrateek Maheshwari <pmaheshw@linkedin.com>
Tue, 5 Dec 2017 21:23:44 +0000 (13:23 -0800)
Initial version of table API, it includes
 - Core table API (Table, TableDescriptor, TableSpec)
 - Local table implementation for in-memory and RocksDb
 - The writeTo() and stream-table join operators

nickpan47 xinyuiscool prateekm could you help review?

Author: Wei Song <wsong@linkedin.com>

Reviewers: Yi Pan <nickpan47@gmail.com>, Christopher Pettitt <cpettitt@linkedin.com>

Closes #349 from weisong44/table-api-14

59 files changed:
build.gradle
samza-api/src/main/java/org/apache/samza/operators/KV.java
samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/ReadableTable.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/Table.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/TableProvider.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/table/TableSpec.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
samza-core/src/main/java/org/apache/samza/operators/TableImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/table/TableManager.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/table/TestTableManager.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java [new file with mode: 0644]
samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java [new file with mode: 0644]
samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java [new file with mode: 0644]
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java [new file with mode: 0644]
samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java [new file with mode: 0644]
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java [new file with mode: 0644]
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java [new file with mode: 0644]
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java [new file with mode: 0644]
samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java [new file with mode: 0644]
samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java [new file with mode: 0644]
samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java [new file with mode: 0644]
samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java [new file with mode: 0644]
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java [new file with mode: 0644]
samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java [new file with mode: 0644]
samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java [new file with mode: 0644]
samza-test/src/test/java/org/apache/samza/test/util/ArraySystemConsumer.java
samza-test/src/test/java/org/apache/samza/test/util/SimpleSystemAdmin.java

index 50cc5e0..be1baf7 100644 (file)
@@ -565,6 +565,15 @@ project(":samza-kv_$scalaVersion") {
 project(":samza-kv-inmemory_$scalaVersion") {
   apply plugin: 'scala'
 
+  // Force scala joint compilation
+  sourceSets.main.scala.srcDir "src/main/java"
+  sourceSets.test.scala.srcDir "src/test/java"
+
+  // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+  // tasks.compileTestJava.enabled = false
+  sourceSets.main.java.srcDirs = []
+  sourceSets.test.java.srcDirs = []
+
   dependencies {
     compile project(':samza-api')
     compile project(":samza-core_$scalaVersion")
index 0bed3b9..824bcb4 100644 (file)
@@ -25,7 +25,7 @@ package org.apache.samza.operators;
  * @param <K> type of the key
  * @param <V> type of the value
  */
-public class KV<K, V> {
+public final class KV<K, V> {
   public final K key;
   public final V value;
 
index acb2f33..f0a5526 100644 (file)
  */
 package org.apache.samza.operators;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
+import org.apache.samza.table.Table;
 
 
 /**
@@ -154,6 +156,34 @@ public interface MessageStream<M> {
       Duration ttl, String id);
 
   /**
+   * Joins this {@link MessageStream} with another {@link Table} using the provided
+   * pairwise {@link StreamTableJoinFunction}.
+   * <p>
+   * The type of input message is expected to be {@link KV}.
+   * <p>
+   * Records are looked up from the joined table using the join key, join function
+   * is applied and join results are emitted as matches are found.
+   * <p>
+   * The join function allows implementation of both inner and left outer join. A null will be
+   * passed to the join function, if no record matching the join key is found in the table.
+   * The join function can choose to return an instance of JM (outer left join) or null
+   * (inner join); if null is returned, it won't be processed further.
+   * <p>
+   * Both the input stream and table being joined must have the same number of partitions,
+   * and should be partitioned by the same join key.
+   * <p>
+   *
+   * @param table the table being joined
+   * @param joinFn the join function
+   * @param <K> the type of join key
+   * @param <R> the type of table record
+   * @param <JM> the type of messages resulting from the {@code joinFn}
+   * @return the joined {@link MessageStream}
+   */
+  <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn);
+
+  /**
    * Merges all {@code otherStreams} with this {@link MessageStream}.
    * <p>
    * The merged stream contains messages from all streams in the order they arrive.
@@ -235,4 +265,15 @@ public interface MessageStream<M> {
    */
   <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
       Function<? super M, ? extends V> valueExtractor, String id);
+
+  /**
+   * Sends messages in this {@link MessageStream} to a {@link Table}. The type of input message is expected
+   * to be {@link KV}, otherwise a {@link ClassCastException} will be thrown.
+   *
+   * @param table the table to write messages to
+   * @param <K> the type of key in the table
+   * @param <V> the type of record value in the table
+   */
+  <K, V> void sendTo(Table<KV<K, V>> table);
+
 }
index 4930631..6871bc7 100644 (file)
@@ -20,6 +20,7 @@ package org.apache.samza.operators;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.Table;
 
 
 /**
@@ -121,6 +122,20 @@ public interface StreamGraph {
   <M> OutputStream<M> getOutputStream(String streamId);
 
   /**
+   * Gets the {@link Table} corresponding to the {@link TableDescriptor}.
+   * <p>
+   * Multiple invocations of this method with the same {@link TableDescriptor} will throw an
+   * {@link IllegalStateException}.
+   *
+   * @param tableDesc the {@link TableDescriptor}
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the {@link Table} corresponding to the {@code tableDesc}
+   * @throws IllegalStateException when invoked multiple times with the same {@link TableDescriptor}
+   */
+  <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc);
+
+  /**
    * Sets the {@link ContextManager} for this {@link StreamGraph}.
    * <p>
    * The provided {@link ContextManager} can be used to setup shared context between the operator functions
diff --git a/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java b/samza-api/src/main/java/org/apache/samza/operators/TableDescriptor.java
new file mode 100644 (file)
index 0000000..a60b6a9
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.KVSerde;
+
+/**
+ * User facing class to collect metadata that fully describes a
+ * Samza table. This interface should be implemented by concrete table implementations.
+ * <p>
+ * Typical user code should look like the following, notice <code>withConfig()</code>
+ * is defined in this class and the rest in subclasses.
+ *
+ * <pre>
+ * {@code
+ * TableDescriptor<Integer, String, ?> tableDesc = new RocksDbTableDescriptor("tbl")
+ *     .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde("UTF-8")))
+ *     .withBlockSize(1024)
+ *     .withConfig("some-key", "some-value");
+ * }
+ * </pre>
+
+ * Once constructed, a table descriptor can be registered with the system. Internally,
+ * the table descriptor is then converted to a {@link org.apache.samza.table.TableSpec},
+ * which is used to track tables internally.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+@InterfaceStability.Unstable
+public interface TableDescriptor<K, V, D extends TableDescriptor<K, V, D>> {
+
+  /**
+   * Get the Id of the table
+   * @return Id of the table
+   */
+  String getTableId();
+
+  /**
+   * Set the Serde for this table
+   * @param serde the serde
+   * @return this table descriptor instance
+   * @throws IllegalArgumentException if null is provided
+   */
+  D withSerde(KVSerde<K, V> serde);
+
+  /**
+   * Add a configuration entry for the table
+   * @param key the key
+   * @param value the value
+   * @return this table descriptor instance
+   */
+  D withConfig(String key, String value);
+
+}
\ No newline at end of file
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/StreamTableJoinFunction.java
new file mode 100644 (file)
index 0000000..6afcf67
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Joins incoming messages with records from a table by the join key.
+ *
+ * @param <K> the type of join key
+ * @param <M>  type of input message
+ * @param <R>  type of the table record
+ * @param <JM> type of join results
+ */
+@InterfaceStability.Unstable
+public interface StreamTableJoinFunction<K, M, R, JM> extends InitableFunction, ClosableFunction {
+
+  /**
+   * Joins the provided messages and table record, returns the joined message.
+   *
+   * @param message  the input message
+   * @param record  the table record value
+   * @return  the join result
+   */
+  JM apply(M message, R record);
+
+  /**
+   * Retrieve the join key from incoming messages
+   *
+   * @param message incoming message
+   * @return the join key
+   */
+  K getMessageKey(M message);
+
+  /**
+   * Retrieve the join key from table record
+   *
+   * @param record table record
+   * @return the join key
+   */
+  K getRecordKey(R record);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java b/samza-api/src/main/java/org/apache/samza/table/LocalStoreBackedTableProvider.java
new file mode 100644 (file)
index 0000000..21630ab
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.storage.StorageEngine;
+
+
+/**
+ * Interface for tables backed by Samza local stores. The backing stores are
+ * injected during initialization of the table. Since the lifecycle
+ * of the underlying stores are already managed by Samza container,
+ * the table provider will not manage the lifecycle of the backing
+ * stores.
+ */
+public interface LocalStoreBackedTableProvider extends TableProvider {
+  /**
+   * Initializes the table provider with the backing store
+   * @param store the backing store
+   */
+  void init(StorageEngine store);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadWriteTable.java
new file mode 100644 (file)
index 0000000..d617153
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.storage.kv.Entry;
+
+/**
+ *
+ * A table that supports get, put and delete by one or more keys
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+@InterfaceStability.Unstable
+public interface ReadWriteTable<K, V> extends ReadableTable<K, V> {
+
+  /**
+   * Updates the mapping of the specified key-value pair; Associates the specified {@code key} with the specified {@code value}.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param value the value with which the specified {@code key} is to be associated.
+   * @throws NullPointerException if the specified {@code key} or {@code value} is {@code null}.
+   */
+  void put(K key, V value);
+
+  /**
+   * Updates the mappings of the specified key-value {@code entries}.
+   *
+   * @param entries the updated mappings to put into this table.
+   * @throws NullPointerException if any of the specified {@code entries} has {@code null} as key or value.
+   */
+  void putAll(List<Entry<K, V>> entries);
+
+  /**
+   * Deletes the mapping for the specified {@code key} from this table (if such mapping exists).
+   *
+   * @param key the key for which the mapping is to be deleted.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  void delete(K key);
+
+  /**
+   * Deletes the mappings for the specified {@code keys} from this table.
+   *
+   * @param keys the keys for which the mappings are to be deleted.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  void deleteAll(List<K> keys);
+
+
+  /**
+   * Flushes the underlying store of this table, if applicable.
+   */
+  void flush();
+
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java b/samza-api/src/main/java/org/apache/samza/table/ReadableTable.java
new file mode 100644 (file)
index 0000000..5ad6e0f
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.KV;
+
+
+/**
+ *
+ * A table that supports get by one or more keys
+ *
+ * @param <K> the type of the record key in this table
+ * @param <V> the type of the record value in this table
+ */
+@InterfaceStability.Unstable
+public interface ReadableTable<K, V> extends Table<KV<K, V>> {
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+   * @throws NullPointerException if the specified {@code key} is {@code null}.
+   */
+  V get(K key);
+
+  /**
+   * Gets the values with which the specified {@code keys} are associated.
+   *
+   * @param keys the keys with which the associated values are to be fetched.
+   * @return a map of the keys that were found and their respective values.
+   * @throws NullPointerException if the specified {@code keys} list, or any of the keys, is {@code null}.
+   */
+  Map<K, V> getAll(List<K> keys);
+
+  /**
+   * Close the table and release any resources acquired
+   */
+  void close();
+
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/Table.java b/samza-api/src/main/java/org/apache/samza/table/Table.java
new file mode 100644 (file)
index 0000000..767e176
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ *
+ * Marker interface for a table.
+ *
+ * @param <R> the type of records in the table
+ */
+@InterfaceStability.Unstable
+public interface Table<R> {
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProvider.java b/samza-api/src/main/java/org/apache/samza/table/TableProvider.java
new file mode 100644 (file)
index 0000000..54c6f5d
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * A table provider provides the implementation for a table. It ensures a table is
+ * properly constructed and also manages its lifecycle.
+ */
+@InterfaceStability.Unstable
+public interface TableProvider {
+  /**
+   * Get an instance of the table for read/write operations
+   * @return the underlying table
+   */
+  Table getTable();
+
+  /**
+   * Generate any configuration for this table, the generated configuration
+   * is used by Samza container to construct this table and any components
+   * necessary
+   * .
+   * @param config the current configuration
+   * @return configuration for this table
+   */
+  Map<String, String> generateConfig(Map<String, String> config);
+
+  /**
+   * Start the underlying table
+   */
+  void start();
+
+  /**
+   * Stop the underlying table
+   */
+  void stop();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java b/samza-api/src/main/java/org/apache/samza/table/TableProviderFactory.java
new file mode 100644 (file)
index 0000000..1bb0196
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Factory of a table provider object
+ */
+@InterfaceStability.Unstable
+public interface TableProviderFactory {
+  /**
+   * Constructs an instances of the table provider based on a given table spec
+   * @param tableSpec the table spec
+   * @return the table provider
+   */
+  TableProvider getTableProvider(TableSpec tableSpec);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/table/TableSpec.java b/samza-api/src/main/java/org/apache/samza/table/TableSpec.java
new file mode 100644 (file)
index 0000000..68043f9
--- /dev/null
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.KVSerde;
+
+
+/**
+ * TableSpec is a blueprint for creating, validating, or simply describing a table in the runtime environment.
+ *
+ * It is typically created indirectly by constructing an instance of {@link org.apache.samza.operators.TableDescriptor},
+ * and then invoke <code>BaseTableDescriptor.getTableSpec()</code>.
+ *
+ * It has specific attributes for common behaviors that Samza uses.
+ *
+ * It has the table provider factory, which provides the actual table implementation.
+ *
+ * It also includes a map of configurations which may be implementation-specific.
+ *
+ * It is immutable by design.
+ */
+@InterfaceStability.Unstable
+public class TableSpec {
+
+  private final String id;
+  private final KVSerde serde;
+  private final String tableProviderFactoryClassName;
+  private final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Default constructor
+   */
+  public TableSpec() {
+    this.id = null;
+    this.serde = null;
+    this.tableProviderFactoryClassName = null;
+  }
+
+  /**
+   * Constructs a {@link TableSpec}
+   *
+   * @param tableId Id of the table
+   * @param tableProviderFactoryClassName table provider factory
+   * @param serde the serde
+   * @param config implementation specific configuration
+   */
+  public TableSpec(String tableId, KVSerde serde, String tableProviderFactoryClassName,
+      Map<String, String> config) {
+    this.id = tableId;
+    this.serde = serde;
+    this.tableProviderFactoryClassName = tableProviderFactoryClassName;
+    this.config.putAll(config);
+  }
+
+  /**
+   * Get the Id of the table
+   * @return Id of the table
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the serde
+   * @param <K> the type of the key
+   * @param <V> the type of the value
+   * @return the key serde
+   */
+  public <K, V> KVSerde<K, V> getSerde() {
+    return serde;
+  }
+
+  /**
+   * Get the class name of the table provider factory
+   * @return class name of the table provider factory
+   */
+  public String getTableProviderFactoryClassName() {
+    return tableProviderFactoryClassName;
+  }
+
+  /**
+   * Get implementation configuration for the table
+   * @return configuration for the table
+   */
+  public Map<String, String> getConfig() {
+    return Collections.unmodifiableMap(config);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || !getClass().equals(o.getClass())) {
+      return false;
+    }
+    return id.equals(((TableSpec) o).id);
+  }
+
+  @Override
+  public int hashCode() {
+    return id.hashCode();
+  }
+}
index 4ef3d30..11ffacc 100644 (file)
 
 package org.apache.samza.task;
 
+import java.util.Set;
+
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
 
-import java.util.Set;
 
 /**
  * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
@@ -37,6 +39,8 @@ public interface TaskContext {
 
   Object getStore(String name);
 
+  Table getTable(String tableId);
+
   TaskName getTaskName();
 
   SamzaContainerContext getSamzaContainerContext();
diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaTableConfig.java
new file mode 100644 (file)
index 0000000..6cc3986
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class for handling table configuration
+ */
+public class JavaTableConfig extends MapConfig {
+
+  // Prefix
+  public static final String TABLES_PREFIX = "tables.";
+  public static final String TABLE_ID_PREFIX = TABLES_PREFIX + "%s";
+
+  // Suffix
+  public static final String TABLE_PROVIDER_FACTORY_SUFFIX = ".provider.factory";
+
+  // Config keys
+  public static final String TABLE_PROVIDER_FACTORY = String.format("%s.provider.factory", TABLE_ID_PREFIX);
+  public static final String TABLE_KEY_SERDE = String.format("%s.key.serde", TABLE_ID_PREFIX);
+  public static final String TABLE_VALUE_SERDE = String.format("%s.value.serde", TABLE_ID_PREFIX);
+
+
+  public JavaTableConfig(Config config) {
+    super(config);
+  }
+
+  /**
+   * Get Id's of all tables
+   * @return list of table Id's
+   */
+  public List<String> getTableIds() {
+    Config subConfig = subset(TABLES_PREFIX, true);
+    Set<String> tableNames = subConfig.keySet().stream()
+        .filter(k -> k.endsWith(TABLE_PROVIDER_FACTORY_SUFFIX))
+        .map(k -> k.substring(0, k.indexOf(".")))
+        .collect(Collectors.toSet());
+    return new LinkedList<>(tableNames);
+  }
+
+  /**
+   * Get the {@link org.apache.samza.table.TableProviderFactory} class for a table
+   * @param tableId Id of the table
+   * @return the {@link org.apache.samza.table.TableProviderFactory} class name
+   */
+  public String getTableProviderFactory(String tableId) {
+    return get(String.format(TABLE_PROVIDER_FACTORY, tableId), null);
+  }
+
+  /**
+   * Get registry keys of key serde for this table
+   * @param tableId Id of the table
+   * @return serde retistry key
+   */
+  public String getKeySerde(String tableId) {
+    return get(String.format(TABLE_KEY_SERDE, tableId), null);
+  }
+
+  /**
+   * Get registry keys of value serde for this table
+   * @param tableId Id of the table
+   * @return serde retistry key
+   */
+  public String getValueSerde(String tableId) {
+    return get(String.format(TABLE_VALUE_SERDE, tableId), null);
+  }
+}
index aa622a3..0248486 100644 (file)
 
 package org.apache.samza.container;
 
-import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
@@ -27,13 +30,13 @@ import org.apache.samza.storage.TaskStorageManager;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableManager;
 import org.apache.samza.task.TaskContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+import com.google.common.collect.ImmutableSet;
 
 public class TaskContextImpl implements TaskContext {
   private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class);
@@ -44,6 +47,7 @@ public class TaskContextImpl implements TaskContext {
   private final Set<SystemStreamPartition> systemStreamPartitions;
   private final OffsetManager offsetManager;
   private final TaskStorageManager storageManager;
+  private final TableManager tableManager;
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
   private final Map<String, Object> objectRegistry = new HashMap<>();
@@ -56,6 +60,7 @@ public class TaskContextImpl implements TaskContext {
                          Set<SystemStreamPartition> systemStreamPartitions,
                          OffsetManager offsetManager,
                          TaskStorageManager storageManager,
+                         TableManager tableManager,
                          JobModel jobModel,
                          StreamMetadataCache streamMetadataCache) {
     this.taskName = taskName;
@@ -64,6 +69,7 @@ public class TaskContextImpl implements TaskContext {
     this.systemStreamPartitions = ImmutableSet.copyOf(systemStreamPartitions);
     this.offsetManager = offsetManager;
     this.storageManager = storageManager;
+    this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
   }
@@ -89,6 +95,16 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
+  public Table getTable(String tableId) {
+    if (tableManager != null) {
+      return tableManager.getTable(tableId);
+    } else {
+      LOG.warn("No table manager found");
+      return null;
+    }
+  }
+
+  @Override
   public TaskName getTaskName() {
     return taskName;
   }
index 468aab9..e2c122a 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -95,6 +96,7 @@ public class ExecutionPlanner {
     Set<StreamSpec> sourceStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
     Set<StreamSpec> sinkStreams = new HashSet<>(streamGraph.getOutputStreams().keySet());
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
+    Set<TableSpec> tables = new HashSet<>(streamGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
     sourceStreams.removeAll(intStreams);
     sinkStreams.removeAll(intStreams);
@@ -113,6 +115,9 @@ public class ExecutionPlanner {
     // add intermediate streams
     intStreams.forEach(spec -> jobGraph.addIntermediateStream(spec, node, node));
 
+    // add tables
+    tables.forEach(spec -> jobGraph.addTable(spec, node));
+
     jobGraph.validate();
 
     return jobGraph;
index 2a09e90..4a09260 100644 (file)
@@ -30,11 +30,13 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,6 +57,7 @@ import org.slf4j.LoggerFactory;
   private final Set<StreamEdge> sources = new HashSet<>();
   private final Set<StreamEdge> sinks = new HashSet<>();
   private final Set<StreamEdge> intermediateStreams = new HashSet<>();
+  private final Set<TableSpec> tables = new HashSet<>();
   private final Config config;
   private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator();
 
@@ -86,6 +89,11 @@ import org.slf4j.LoggerFactory;
         .collect(Collectors.toList());
   }
 
+  void addTable(TableSpec tableSpec, JobNode node) {
+    tables.add(tableSpec);
+    node.addTable(tableSpec);
+  }
+
   @Override
   public String getPlanAsJson() throws Exception {
     return jsonGenerator.toJson(this);
@@ -211,6 +219,14 @@ import org.slf4j.LoggerFactory;
   }
 
   /**
+   * Return the tables in the graph
+   * @return unmodifiable set of {@link TableSpec}
+   */
+  Set<TableSpec> getTables() {
+    return Collections.unmodifiableSet(tables);
+  }
+
+  /**
    * Return the intermediate streams in the graph
    * @return unmodifiable set of {@link StreamEdge}
    */
index 03845e3..2729fa3 100644 (file)
@@ -28,12 +28,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.TableSpec;
 import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.ObjectMapper;
 
@@ -54,6 +57,15 @@ import org.codehaus.jackson.map.ObjectMapper;
     int partitionCount;
   }
 
+  static final class TableSpecJson {
+    @JsonProperty("id")
+    String id;
+    @JsonProperty("tableProviderFactory")
+    String tableProviderFactory;
+    @JsonProperty("config")
+    Map<String, String> config;
+  }
+
   static final class StreamEdgeJson {
     @JsonProperty("streamSpec")
     StreamSpecJson streamSpec;
@@ -97,6 +109,8 @@ import org.codehaus.jackson.map.ObjectMapper;
     Map<String, StreamEdgeJson> sinkStreams;
     @JsonProperty("intermediateStreams")
     Map<String, StreamEdgeJson> intermediateStreams;
+    @JsonProperty("tables")
+    Map<String, TableSpecJson> tables;
     @JsonProperty("applicationName")
     String applicationName;
     @JsonProperty("applicationId")
@@ -119,9 +133,11 @@ import org.codehaus.jackson.map.ObjectMapper;
     jobGraphJson.sourceStreams = new HashMap<>();
     jobGraphJson.sinkStreams = new HashMap<>();
     jobGraphJson.intermediateStreams = new HashMap<>();
+    jobGraphJson.tables = new HashMap<>();
     jobGraph.getSources().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sourceStreams));
     jobGraph.getSinks().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.sinkStreams));
     jobGraph.getIntermediateStreamEdges().forEach(e -> buildStreamEdgeJson(e, jobGraphJson.intermediateStreams));
+    jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables));
 
     jobGraphJson.jobs = jobGraph.getJobNodes().stream()
         .map(jobNode -> buildJobNodeJson(jobNode))
@@ -206,6 +222,11 @@ import org.codehaus.jackson.map.ObjectMapper;
       map.put("outputStreamId", outputStream.getStreamSpec().getId());
     }
 
+    if (spec instanceof StreamTableJoinOperatorSpec) {
+      TableSpec tableSpec = ((StreamTableJoinOperatorSpec) spec).getTableSpec();
+      map.put("tableId", tableSpec.getId());
+    }
+
     if (spec instanceof JoinOperatorSpec) {
       map.put("ttlMs", ((JoinOperatorSpec) spec).getTtlMs());
     }
@@ -247,4 +268,33 @@ import org.codehaus.jackson.map.ObjectMapper;
     }
     return edgeJson;
   }
+
+  /**
+   * Get or create the JSON POJO for a {@link TableSpec}
+   * @param tableSpec the {@link TableSpec}
+   * @param tableSpecs a map of tableId to {@link TableSpecJson}
+   * @return JSON representation of the {@link TableSpec}
+   */
+  private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
+    String tableId = tableSpec.getId();
+    TableSpecJson tableSpecJson = tableSpecs.get(tableId);
+    if (tableSpecJson == null) {
+      tableSpecJson = buildTableJson(tableSpec);
+      tableSpecs.put(tableId, tableSpecJson);
+    }
+    return tableSpecJson;
+  }
+
+  /**
+   * Create the JSON POJO for a {@link TableSpec}
+   * @param tableSpec the {@link TableSpec}
+   * @return JSON representation of the {@link TableSpec}
+   */
+  private TableSpecJson buildTableJson(TableSpec tableSpec) {
+    TableSpecJson tableSpecJson = new TableSpecJson();
+    tableSpecJson.id = tableSpec.getId();
+    tableSpecJson.tableProviderFactory = tableSpec.getTableProviderFactoryClassName();
+    tableSpecJson.config = tableSpec.getConfig();
+    return tableSpecJson;
+  }
 }
index 2e89292..4e337d9 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.samza.execution;
 
-import com.google.common.base.Joiner;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
@@ -29,7 +28,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.SerializerConfig;
@@ -47,10 +48,16 @@ import org.apache.samza.operators.util.MathUtils;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Joiner;
+
+
 /**
  * A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
  * to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
@@ -67,6 +74,7 @@ public class JobNode {
   private final StreamGraphImpl streamGraph;
   private final List<StreamEdge> inEdges = new ArrayList<>();
   private final List<StreamEdge> outEdges = new ArrayList<>();
+  private final List<TableSpec> tables = new ArrayList<>();
   private final Config config;
 
   JobNode(String jobName, String jobId, StreamGraphImpl streamGraph, Config config) {
@@ -109,6 +117,10 @@ public class JobNode {
     return outEdges;
   }
 
+  void addTable(TableSpec tableSpec) {
+    tables.add(tableSpec);
+  }
+
   /**
    * Generate the configs for a job
    * @param executionPlanJson JSON representation of the execution plan
@@ -147,6 +159,19 @@ public class JobNode {
     // write serialized serde instances and stream serde configs to configs
     addSerdeConfigs(configs);
 
+    tables.forEach(tableSpec -> {
+        // Table provider factory
+        configs.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableSpec.getId()),
+            tableSpec.getTableProviderFactoryClassName());
+
+        // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs()
+
+        // Generate additional configuration
+        TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName());
+        TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
+        configs.putAll(tableProvider.generateConfig(configs));
+      });
+
     log.info("Job {} has generated configs {}", jobName, configs);
 
     String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
@@ -209,11 +234,21 @@ public class JobNode {
         }
       });
 
+    // collect all key and msg serde instances for tables
+    Map<String, Serde> tableKeySerdes = new HashMap<>();
+    Map<String, Serde> tableValueSerdes = new HashMap<>();
+    tables.forEach(tableSpec -> {
+        tableKeySerdes.put(tableSpec.getId(), tableSpec.getSerde().getKeySerde());
+        tableValueSerdes.put(tableSpec.getId(), tableSpec.getSerde().getValueSerde());
+      });
+
     // for each unique stream or store serde instance, generate a unique name and serialize to config
     HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
     serdes.addAll(streamMsgSerdes.values());
     serdes.addAll(storeKeySerdes.values());
     serdes.addAll(storeMsgSerdes.values());
+    serdes.addAll(tableKeySerdes.values());
+    serdes.addAll(tableValueSerdes.values());
     SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
     Base64.Encoder base64Encoder = Base64.getEncoder();
     Map<Serde, String> serdeUUIDs = new HashMap<>();
@@ -247,6 +282,17 @@ public class JobNode {
         String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
         configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
       });
+
+    // set key and msg serdes for tables to the serde names generated above
+    tableKeySerdes.forEach((tableId, serde) -> {
+        String keySerdeConfigKey = String.format(JavaTableConfig.TABLE_KEY_SERDE, tableId);
+        configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
+      });
+
+    tableValueSerdes.forEach((tableId, serde) -> {
+        String valueSerdeConfigKey = String.format(JavaTableConfig.TABLE_VALUE_SERDE, tableId);
+        configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
+      });
   }
 
   /**
@@ -264,10 +310,14 @@ public class JobNode {
 
     // Filter out the join operators, and obtain a list of their ttl values
     List<Long> joinTtlIntervals = operatorSpecs.stream()
-        .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.JOIN)
+        .filter(spec -> spec instanceof JoinOperatorSpec)
         .map(spec -> ((JoinOperatorSpec) spec).getTtlMs())
         .collect(Collectors.toList());
 
+    if (joinTtlIntervals.isEmpty()) {
+      return -1;
+    }
+
     // Combine both the above lists
     List<Long> candidateTimerIntervals = new ArrayList<>(joinTtlIntervals);
     candidateTimerIntervals.addAll(windowTimerIntervals);
diff --git a/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/operators/BaseTableDescriptor.java
new file mode 100644 (file)
index 0000000..b875c2e
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Base class for all table descriptor implementations.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseTableDescriptor<K, V, D extends BaseTableDescriptor<K, V, D>>
+    implements TableDescriptor<K, V, D> {
+
+  protected final String tableId;
+
+  protected KVSerde<K, V> serde = KVSerde.of(new NoOpSerde(), new NoOpSerde());
+
+  protected final Map<String, String> config = new HashMap<>();
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table
+   */
+  protected BaseTableDescriptor(String tableId) {
+    this.tableId = tableId;
+  }
+
+  @Override
+  public D withConfig(String key, String value) {
+    config.put(key, value);
+    return (D) this;
+  }
+
+  @Override
+  public D withSerde(KVSerde<K, V> serde) {
+    if (serde == null) {
+      throw new IllegalArgumentException("Serde cannot be null");
+    }
+    this.serde = serde;
+    return (D) this;
+  }
+
+  @Override
+  public String getTableId() {
+    return tableId;
+  }
+
+  /**
+   * Generate config for {@link TableSpec}; this method is used internally.
+   * @param tableSpecConfig configuration for the {@link TableSpec}
+   */
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    tableSpecConfig.putAll(config);
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly; this method is used internally.
+   */
+  protected void validate() {
+  }
+
+  /**
+   * Create a {@link TableSpec} from this table descriptor; this method is used internally.
+   *
+   * @return the {@link TableSpec}
+   */
+  abstract public TableSpec getTableSpec();
+}
index 3f4e40d..07af54f 100644 (file)
 
 package org.apache.samza.operators;
 
+import java.time.Duration;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
@@ -32,18 +37,18 @@ import org.apache.samza.operators.spec.OperatorSpecs;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
-
-import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
 
 
 /**
@@ -138,6 +143,16 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   }
 
   @Override
+  public <K, R extends KV, JM> MessageStream<JM> join(Table<R> table,
+      StreamTableJoinFunction<? extends K, ? super M, ? super R, ? extends JM> joinFn) {
+    TableSpec tableSpec = ((TableImpl) table).getTableSpec();
+    StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec = OperatorSpecs.createStreamTableJoinOperatorSpec(
+        tableSpec, (StreamTableJoinFunction<K, M, R, JM>) joinFn, this.graph.getNextOpId(OpCode.JOIN));
+    this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
+    return new MessageStreamImpl<>(this.graph, joinOpSpec);
+  }
+
+  @Override
   public MessageStream<M> merge(Collection<? extends MessageStream<? extends M>> otherStreams) {
     if (otherStreams.isEmpty()) return this;
     String opId = this.graph.getNextOpId(OpCode.MERGE);
@@ -176,4 +191,12 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
   protected OperatorSpec<?, M> getOperatorSpec() {
     return this.operatorSpec;
   }
+
+  @Override
+  public <K, V> void sendTo(Table<KV<K, V>> table) {
+    SendToTableOperatorSpec<K, V> op = OperatorSpecs.createSendToTableOperatorSpec(
+        this.operatorSpec, ((TableImpl) table).getTableSpec(), this.graph.getNextOpId(OpCode.SEND_TO));
+    this.operatorSpec.registerNextOperatorSpec(op);
+  }
+
 }
index d014cb9..b607c62 100644 (file)
  */
 package org.apache.samza.operators;
 
-import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
@@ -34,17 +42,12 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
+import com.google.common.base.Preconditions;
 
 /**
  * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
@@ -57,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
   // We use a LHM for deterministic order in initializing and closing operators.
   private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
   private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
   private final ApplicationRunner runner;
   private final Config config;
 
@@ -146,6 +150,18 @@ public class StreamGraphImpl implements StreamGraph {
   }
 
   @Override
+  public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) {
+    TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec();
+    if (tables.containsKey(tableSpec)) {
+      throw new IllegalStateException(String.format(
+          "getTable() invoked multiple times with the same tableId: %s",
+          tableDesc.getTableId()));
+    }
+    tables.put(tableSpec, new TableImpl(tableSpec));
+    return tables.get(tableSpec);
+  }
+
+  @Override
   public StreamGraph withContextManager(ContextManager contextManager) {
     this.contextManager = contextManager;
     return this;
@@ -163,7 +179,7 @@ public class StreamGraphImpl implements StreamGraph {
    */
   <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
     StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    
+
     Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
         "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
 
@@ -190,6 +206,10 @@ public class StreamGraphImpl implements StreamGraph {
     return Collections.unmodifiableMap(outputStreams);
   }
 
+  public Map<TableSpec, TableImpl> getTables() {
+    return Collections.unmodifiableMap(tables);
+  }
+
   public ContextManager getContextManager() {
     return this.contextManager;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java
new file mode 100644 (file)
index 0000000..e671534
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * This class is the holder of a {@link TableSpec}
+ */
+public class TableImpl implements Table {
+
+  private final TableSpec tableSpec;
+
+  public TableImpl(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+}
index 0bb12d2..ea278c1 100644 (file)
  */
 package org.apache.samza.operators.impl;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
@@ -29,6 +34,7 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.StreamGraphImpl;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.impl.store.TimestampedValue;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -36,8 +42,9 @@ import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.impl.store.TimestampedValue;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.task.TaskContext;
@@ -45,14 +52,9 @@ import org.apache.samza.util.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 
 /**
  * The DAG of {@link OperatorImpl}s corresponding to the DAG of {@link OperatorSpec}s.
@@ -212,6 +214,10 @@ public class OperatorImplGraph {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
       return createPartialJoinOperatorImpl(prevOperatorSpec, (JoinOperatorSpec) operatorSpec, config, context, clock);
+    } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) {
+      return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, config, context);
+    } else if (operatorSpec instanceof SendToTableOperatorSpec) {
+      return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/SendToTableOperatorImpl.java
new file mode 100644 (file)
index 0000000..5ce1328
--- /dev/null
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
+import org.apache.samza.table.ReadWriteTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a send-stream-to-table operator that stores the record
+ * in the table.
+ *
+ * @param <K> the type of the record key
+ * @param <V> the type of the record value
+ */
+public class SendToTableOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Void> {
+
+  private final SendToTableOperatorSpec<K, V> sendToTableOpSpec;
+  private final ReadWriteTable<K, V> table;
+
+  SendToTableOperatorImpl(SendToTableOperatorSpec<K, V> sendToTableOpSpec, Config config, TaskContext context) {
+    this.sendToTableOpSpec = sendToTableOpSpec;
+    this.table = (ReadWriteTable) context.getTable(sendToTableOpSpec.getTableSpec().getId());
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+  }
+
+  @Override
+  protected Collection<Void> handleMessage(KV<K, V> message, MessageCollector collector, TaskCoordinator coordinator) {
+    table.put(message.getKey(), message.getValue());
+    // there should be no further chained operators since this is a terminal operator.
+    return Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+    table.close();
+  }
+
+  @Override
+  protected OperatorSpec<KV<K, V>, Void> getOperatorSpec() {
+    return sendToTableOpSpec;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamTableJoinOperatorImpl.java
new file mode 100644 (file)
index 0000000..54a5770
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+
+/**
+ * Implementation of a stream-table join operator that first retrieve the value of
+ * the message key from incoming message, and then apply the join function.
+ *
+ * @param <K> type of the join key
+ * @param <M> type of input messages
+ * @param <R> type of the table record
+ * @param <JM> type of the join result
+ */
+class StreamTableJoinOperatorImpl<K, M, R extends KV, JM> extends OperatorImpl<M, JM> {
+
+  private final StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec;
+  private final ReadableTable<K, ?> table;
+
+  StreamTableJoinOperatorImpl(StreamTableJoinOperatorSpec<K, M, R, JM> joinOpSpec,
+      Config config, TaskContext context) {
+    this.joinOpSpec = joinOpSpec;
+    this.table = (ReadableTable) context.getTable(joinOpSpec.getTableSpec().getId());
+  }
+
+  @Override
+  protected void handleInit(Config config, TaskContext context) {
+    this.joinOpSpec.getJoinFn().init(config, context);
+  }
+
+  @Override
+  public Collection<JM> handleMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
+    K key = joinOpSpec.getJoinFn().getMessageKey(message);
+    Object recordValue = table.get(key);
+    R record = recordValue != null ? (R) KV.of(key, recordValue) : null;
+    JM output = joinOpSpec.getJoinFn().apply(message, record);
+
+    // The support for inner and outer join will be provided in the jonFn. For inner join, the joinFn might
+    // return null, when the corresponding record is absent in the table.
+    return output != null ?
+        Collections.singletonList(output)
+      : Collections.emptyList();
+  }
+
+  @Override
+  protected void handleClose() {
+    this.joinOpSpec.getJoinFn().close();
+  }
+
+  protected OperatorSpec<M, JM> getOperatorSpec() {
+    return joinOpSpec;
+  }
+
+}
index 17f1b49..2a5991c 100644 (file)
  */
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.operators.functions.WatermarkFunction;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreamImpl;
-
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.functions.WatermarkFunction;
+
 /**
- * A stream operator specification that holds all the information required to transform 
+ * A stream operator specification that holds all the information required to transform
  * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output
  * {@link org.apache.samza.operators.MessageStreamImpl}.
  *
index 1b3b8aa..c752fe2 100644 (file)
 
 package org.apache.samza.operators.spec;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.Function;
+
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -26,15 +30,13 @@ import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.apache.samza.task.TaskContext;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
-
 
 /**
  * Factory methods for creating {@link OperatorSpec} instances.
@@ -242,4 +244,38 @@ public class OperatorSpecs {
         },
         OperatorSpec.OpCode.MERGE, opId);
   }
+
+  /**
+   * Creates a {@link StreamTableJoinOperatorSpec} with a join function.
+   *
+   * @param tableSpec the table spec for the table on the right side of the join
+   * @param joinFn the user-defined join function to get join keys and results
+   * @param opId the unique ID of the operator
+   * @param <K> the type of join key
+   * @param <M> the type of input messages
+   * @param <R> the type of table record
+   * @param <JM> the type of the join result
+   * @return the {@link StreamTableJoinOperatorSpec}
+   */
+  public static <K, M, R, JM> StreamTableJoinOperatorSpec<K, M, R, JM> createStreamTableJoinOperatorSpec(
+      TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+    return new StreamTableJoinOperatorSpec(tableSpec, joinFn, opId);
+  }
+
+  /**
+   * Creates a {@link SendToTableOperatorSpec} with a key extractor and a value extractor function,
+   * the type of incoming message is expected to be KV&#60;K, V&#62;.
+   *
+   * @param inputOpSpec the operator spec for the input stream
+   * @param tableSpec the table spec for the underlying table
+   * @param opId the unique ID of the operator
+   * @param <K> the type of the table record key
+   * @param <V> the type of the table record value
+   * @return the {@link SendToTableOperatorSpec}
+   */
+  public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec(
+      OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) {
+    return new SendToTableOperatorSpec(inputOpSpec, tableSpec, opId);
+  }
+
 }
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
new file mode 100644 (file)
index 0000000..9084be2
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * The spec for operator that writes a stream to a table by extracting keys and values
+ * from the incoming messages.
+ *
+ * @param <K> the type of the table record key
+ * @param <V> the type of the table record value
+ */
+@InterfaceStability.Unstable
+public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> {
+
+  private final OperatorSpec<?, KV<K, V>> inputOpSpec;
+  private final TableSpec tableSpec;
+
+  /**
+   * Constructor for a {@link SendToTableOperatorSpec}.
+   *
+   * @param inputOpSpec  the operator spec of the input stream
+   * @param tableSpec  the table spec of the table written to
+   * @param opId  the unique ID for this operator
+   */
+  SendToTableOperatorSpec(OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) {
+    super(OpCode.SEND_TO, opId);
+    this.inputOpSpec = inputOpSpec;
+    this.tableSpec = tableSpec;
+  }
+
+  public OperatorSpec<?, KV<K, V>> getInputOpSpec() {
+    return inputOpSpec;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return null;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
new file mode 100644 (file)
index 0000000..730913a
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.functions.WatermarkFunction;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * The spec for stream-table join operator that retrieves a record from the table using key
+ * derived from the incoming message and joins with the incoming message.
+ *
+ * @param <M>  the type of input messages
+ * @param <R>  the type of table record
+ * @param <JM>  the type of join result
+ */
+@InterfaceStability.Unstable
+public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM> {
+
+  private final TableSpec tableSpec;
+  private final StreamTableJoinFunction<K, M, R, JM> joinFn;
+
+  /**
+   * Constructor for {@link StreamTableJoinOperatorSpec}.
+   *
+   * @param tableSpec  the table spec for the table on the right side of the join
+   * @param joinFn  the user-defined join function to get join keys and results
+   * @param opId  the unique ID for this operator
+   */
+  StreamTableJoinOperatorSpec(TableSpec tableSpec, StreamTableJoinFunction<K, M, R, JM> joinFn, String opId) {
+    super(OpCode.JOIN, opId);
+    this.tableSpec = tableSpec;
+    this.joinFn = joinFn;
+  }
+
+  public TableSpec getTableSpec() {
+    return tableSpec;
+  }
+
+  public StreamTableJoinFunction<K, M, R, JM> getJoinFn() {
+    return this.joinFn;
+  }
+
+  @Override
+  public WatermarkFunction getWatermarkFn() {
+    return joinFn instanceof WatermarkFunction ? (WatermarkFunction) joinFn : null;
+  }
+
+}
diff --git a/samza-core/src/main/java/org/apache/samza/table/TableManager.java b/samza-core/src/main/java/org/apache/samza/table/TableManager.java
new file mode 100644 (file)
index 0000000..c3555f3
--- /dev/null
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TableManager} manages tables within a Samza task. For each table, it maintains
+ * the {@link TableSpec} and the {@link TableProvider}. It is used at execution for
+ * {@link org.apache.samza.container.TaskInstance} to retrieve table instances for
+ * read/write operations.
+ *
+ * A {@link TableManager} is constructed from job configuration, the {@link TableSpec}
+ * and {@link TableProvider} are constructed by processing the job configuration.
+ *
+ * After a {@link TableManager} is constructed, local tables are associated with
+ * local store instances created during {@link org.apache.samza.container.SamzaContainer}
+ * initialization.
+ *
+ * Method {@link TableManager#getTable(String)} will throw {@link IllegalStateException},
+ * if it's called before initialization.
+ *
+ * For store backed tables, the list of stores must be injected into the constructor.
+ */
+public class TableManager {
+
+  static public class TableCtx {
+    private TableSpec tableSpec;
+    private TableProvider tableProvider;
+  }
+
+  private final Logger logger = LoggerFactory.getLogger(TableManager.class.getName());
+
+  // tableId -> TableCtx
+  private final Map<String, TableCtx> tables = new HashMap<>();
+
+  private boolean localTablesInitialized;
+
+  /**
+   * Construct a table manager instance
+   * @param config the job configuration
+   * @param serdes Serde instances for tables
+   */
+  public TableManager(Config config, Map<String, Serde<Object>> serdes) {
+
+    new JavaTableConfig(config).getTableIds().forEach(tableId -> {
+
+        // Construct the table provider
+        String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));
+
+        // Construct the KVSerde
+        JavaTableConfig tableConfig = new JavaTableConfig(config);
+        KVSerde serde = KVSerde.of(
+            serdes.get(tableConfig.getKeySerde(tableId)),
+            serdes.get(tableConfig.getValueSerde(tableId)));
+
+        TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
+            config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));
+
+        addTable(tableSpec);
+
+        logger.info("Added table " + tableSpec.getId());
+      });
+  }
+
+  /**
+   * Initialize all local table
+   * @param stores stores created locally
+   */
+  public void initLocalTables(Map<String, StorageEngine> stores) {
+    tables.values().forEach(ctx -> {
+        if (ctx.tableProvider instanceof LocalStoreBackedTableProvider) {
+          StorageEngine store = stores.get(ctx.tableSpec.getId());
+          if (store == null) {
+            throw new SamzaException(String.format(
+                "Backing store for table %s was not injected by SamzaContainer",
+                ctx.tableSpec.getId()));
+          }
+          ((LocalStoreBackedTableProvider) ctx.tableProvider).init(store);
+        }
+      });
+
+    localTablesInitialized = true;
+  }
+
+  /**
+   * Add a table to the table manager
+   * @param tableSpec the table spec
+   */
+  private void addTable(TableSpec tableSpec) {
+    if (tables.containsKey(tableSpec.getId())) {
+      throw new SamzaException("Table " + tableSpec.getId() + " already exists");
+    }
+    TableCtx ctx = new TableCtx();
+    TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName());
+    ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
+    ctx.tableSpec = tableSpec;
+    tables.put(tableSpec.getId(), ctx);
+  }
+
+  /**
+   * Start the table manager, internally it starts all tables
+   */
+  public void start() {
+    tables.values().forEach(ctx -> ctx.tableProvider.start());
+  }
+
+  /**
+   * Shutdown the table manager, internally it shuts down all tables
+   */
+  public void shutdown() {
+    tables.values().forEach(ctx -> ctx.tableProvider.stop());
+  }
+
+  /**
+   * Get a table instance
+   * @param tableId Id of the table
+   * @return table instance
+   */
+  public Table getTable(String tableId) {
+    if (!localTablesInitialized) {
+      throw new IllegalStateException("Local tables in TableManager not initialized.");
+    }
+    return tables.get(tableId).tableProvider.getTable();
+  }
+}
index 412e9dc..f465bfc 100644 (file)
 package org.apache.samza.container
 
 import java.io.File
+import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.util
-import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.net.{URL, UnknownHostException}
 import java.util.Base64
+import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
 
-import org.apache.samza.{SamzaContainerStatus, SamzaException}
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config._
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.container.disk.DiskQuotaPolicyFactory
-import org.apache.samza.container.disk.DiskSpaceMonitor
+import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
-import org.apache.samza.container.disk.NoThrottlingDiskQuotaPolicyFactory
-import org.apache.samza.container.disk.PollingScanDiskSpaceMonitor
+import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-import org.apache.samza.job.model.ContainerModel
 import org.apache.samza.job.model.JobModel
-import org.apache.samza.metrics.JmxServer
-import org.apache.samza.metrics.JvmMetrics
-import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.metrics.MetricsReporter
-import org.apache.samza.serializers.IntermediateMessageSerde
-import org.apache.samza.serializers.NoOpSerde
-import org.apache.samza.serializers.SerializableSerde
-import org.apache.samza.serializers.Serde
-import org.apache.samza.serializers.SerdeFactory
-import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.serializers.StringSerde
+import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
+import org.apache.samza.serializers._
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.storage.StorageEngineFactory
-import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemConsumersMetrics
-import org.apache.samza.system.SystemFactory
-import org.apache.samza.system.SystemProducers
-import org.apache.samza.system.SystemProducersMetrics
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.chooser.DefaultChooser
-import org.apache.samza.system.chooser.MessageChooserFactory
-import org.apache.samza.system.chooser.RoundRobinChooserFactory
+import org.apache.samza.storage.{StorageEngineFactory, TaskStorageManager}
+import org.apache.samza.system._
+import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
+import org.apache.samza.table.TableManager
 import org.apache.samza.task._
-import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Throttleable
-import org.apache.samza.util.MetricsReporterLoader
-import org.apache.samza.util.SystemClock
-import org.apache.samza.util.Util
 import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util._
+import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
 import scala.collection.JavaConverters._
 
@@ -568,6 +540,11 @@ object SamzaContainer extends Logging {
         new StorageConfig(config).getChangeLogDeleteRetentionsInMs,
         new SystemClock)
 
+      val tableManager = new TableManager(config, serdes.asJava)
+      tableManager.initLocalTables(taskStores.asJava)
+
+      info("Got table manager");
+
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions
         .asScala
@@ -586,6 +563,7 @@ object SamzaContainer extends Logging {
           containerContext = containerContext,
           offsetManager = offsetManager,
           storageManager = storageManager,
+          tableManager = tableManager,
           reporters = reporters,
           systemStreamPartitions = systemStreamPartitions,
           exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config),
@@ -711,6 +689,7 @@ class SamzaContainer(
       startOffsetManager
       startLocalityManager
       startStores
+      startTableManager
       startDiskSpaceMonitor
       startHostStatisticsMonitor
       startProducers
@@ -745,6 +724,7 @@ class SamzaContainer(
 
       shutdownConsumers
       shutdownTask
+      shutdownTableManager
       shutdownStores
       shutdownDiskSpaceMonitor
       shutdownHostStatisticsMonitor
@@ -885,9 +865,9 @@ class SamzaContainer(
   }
 
   def startStores {
-    info("Starting task instance stores.")
     taskInstances.values.foreach(taskInstance => {
       val startTime = System.currentTimeMillis()
+      info("Starting stores in task instance %s" format taskInstance.taskName)
       taskInstance.startStores
       // Measuring the time to restore the stores
       val timeToRestore = System.currentTimeMillis() - startTime
@@ -898,6 +878,13 @@ class SamzaContainer(
     })
   }
 
+  def startTableManager: Unit = {
+    taskInstances.values.foreach(taskInstance => {
+      info("Starting table manager in task instance %s" format taskInstance.taskName)
+      taskInstance.startTableManager
+    })
+  }
+
   def startTask {
     info("Initializing stream tasks.")
 
@@ -1003,6 +990,12 @@ class SamzaContainer(
     taskInstances.values.foreach(_.shutdownStores)
   }
 
+  def shutdownTableManager: Unit = {
+    info("Shutting down task instance table manager.")
+
+    taskInstances.values.foreach(_.shutdownTableManager)
+  }
+
   def shutdownLocalityManager {
     if(localityManager != null) {
       info("Shutting down locality manager.")
index acec365..f2a5074 100644 (file)
@@ -27,20 +27,9 @@ import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.storage.TaskStorageManager
-import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.system.StreamMetadataCache
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemConsumers
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.task.AsyncStreamTask
-import org.apache.samza.task.ClosableTask
-import org.apache.samza.task.EndOfStreamListenerTask
-import org.apache.samza.task.InitableTask
-import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.task.StreamTask
-import org.apache.samza.task.TaskCallbackFactory
-import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.task.WindowableTask
+import org.apache.samza.system._
+import org.apache.samza.table.TableManager
+import org.apache.samza.task._
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
@@ -56,6 +45,7 @@ class TaskInstance(
   containerContext: SamzaContainerContext,
   val offsetManager: OffsetManager = new OffsetManager,
   storageManager: TaskStorageManager = null,
+  tableManager: TableManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
   val systemStreamPartitions: Set[SystemStreamPartition] = Set(),
   val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler,
@@ -68,7 +58,7 @@ class TaskInstance(
   val isAsyncTask = task.isInstanceOf[AsyncStreamTask]
 
   val context = new TaskContextImpl(taskName,metrics, containerContext, systemStreamPartitions.asJava, offsetManager,
-                                    storageManager, jobModel, streamMetadataCache)
+                                    storageManager, tableManager, jobModel, streamMetadataCache)
 
   // store the (ssp -> if this ssp is catched up) mapping. "catched up"
   // means the same ssp in other taskInstances have the same offset as
@@ -101,6 +91,16 @@ class TaskInstance(
     }
   }
 
+  def startTableManager {
+    if (tableManager != null) {
+      debug("Starting table manager for taskName: %s" format taskName)
+
+      tableManager.start
+    } else {
+      debug("Skipping table manager initialization for taskName: %s" format taskName)
+    }
+  }
+
   def initTask {
     if (isInitableTask) {
       debug("Initializing task for taskName: %s" format taskName)
@@ -225,6 +225,16 @@ class TaskInstance(
     }
   }
 
+  def shutdownTableManager {
+    if (tableManager != null) {
+      debug("Shutting down table manager for taskName: %s" format taskName)
+
+      tableManager.shutdown
+    } else {
+      debug("Skipping table manager shutdown for taskName: %s" format taskName)
+    }
+  }
+
   override def toString() = "TaskInstance for class %s and taskName %s." format (task.getClass.getName, taskName)
 
   def toDetailedString() = "TaskInstance [taskName = %s, windowable=%s, closable=%s endofstreamlistener=%s]" format
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJavaTableConfig.java
new file mode 100644 (file)
index 0000000..2775ca7
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+import static junit.framework.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+
+public class TestJavaTableConfig {
+  @Test
+  public void testGetTableIds() {
+    Set<String> ids = Sets.newHashSet("t1", "t2");
+    Map<String, String> map = ids.stream()
+        .map(id -> String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, id))
+        .collect(Collectors.toMap(key -> key, key -> key + "-provider-factory"));
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+
+    assertEquals(2, tableConfig.getTableIds().size());
+
+    ids.removeAll(tableConfig.getTableIds());
+    assertTrue(ids.isEmpty());
+  }
+
+  @Test
+  public void testGetTableProperties() {
+    Map<String, String> map = new HashMap<>();
+    map.put("tables.t1.spec", "t1-spec");
+    map.put("tables.t1.provider.factory", "t1-provider-factory");
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(map));
+    assertEquals("t1-provider-factory", tableConfig.getTableProviderFactory("t1"));
+  }
+
+}
index d97d494..96e234e 100644 (file)
  */
 package org.apache.samza.operators;
 
-import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -27,14 +32,17 @@ import org.apache.samza.operators.functions.FoldLeftFunction;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec.OpCode;
 import org.apache.samza.operators.spec.OutputOperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.operators.spec.SendToTableOperatorSpec;
 import org.apache.samza.operators.spec.SinkOperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
 import org.apache.samza.operators.windows.Window;
@@ -42,14 +50,11 @@ import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.Windows;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.time.Duration;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.function.Function;
-import java.util.function.Supplier;
+import com.google.common.collect.ImmutableList;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -313,6 +318,57 @@ public class TestMessageStreamImpl {
   }
 
   @Test
+  public void testSendToTable() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec inputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source = new MessageStreamImpl<>(mockGraph, inputOpSpec);
+
+    TableSpec tableSpec = new TableSpec();
+    TableImpl table = new TableImpl(tableSpec);
+
+    source.sendTo(table);
+
+    ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(inputOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+    assertTrue(registeredOpSpec instanceof SendToTableOperatorSpec);
+    SendToTableOperatorSpec sendToTableOperatorSpec = (SendToTableOperatorSpec) registeredOpSpec;
+
+    assertEquals(OpCode.SEND_TO, sendToTableOperatorSpec.getOpCode());
+    assertEquals(inputOpSpec, sendToTableOperatorSpec.getInputOpSpec());
+    assertEquals(tableSpec, sendToTableOperatorSpec.getTableSpec());
+  }
+
+  @Test
+  public void testStreamTableJoin() {
+    StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+    OperatorSpec leftInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<KV<String, TestMessageEnvelope>> source1 = new MessageStreamImpl<>(mockGraph, leftInputOpSpec);
+    OperatorSpec rightInputOpSpec = mock(OperatorSpec.class);
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>(mockGraph, rightInputOpSpec);
+
+    TableSpec tableSpec = new TableSpec();
+    TableImpl table = new TableImpl(tableSpec);
+
+    source2.sendTo(table);
+
+    StreamTableJoinFunction<String, KV<String, TestMessageEnvelope>, KV<String, TestMessageEnvelope>, TestOutputMessageEnvelope>
+        mockJoinFn = mock(StreamTableJoinFunction.class);
+    source1.join(table, mockJoinFn);
+
+    ArgumentCaptor<OperatorSpec> leftRegisteredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+    verify(leftInputOpSpec).registerNextOperatorSpec(leftRegisteredOpCaptor.capture());
+    OperatorSpec<?, TestMessageEnvelope> leftRegisteredOpSpec = leftRegisteredOpCaptor.getValue();
+
+    assertTrue(leftRegisteredOpSpec instanceof StreamTableJoinOperatorSpec);
+    StreamTableJoinOperatorSpec joinOpSpec = (StreamTableJoinOperatorSpec) leftRegisteredOpSpec;
+    assertEquals(OpCode.JOIN, joinOpSpec.getOpCode());
+    assertEquals(mockJoinFn, joinOpSpec.getJoinFn());
+    assertEquals(tableSpec, joinOpSpec.getTableSpec());
+  }
+
+  @Test
   public void testMerge() {
     StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
     OperatorSpec mockOpSpec1 = mock(OperatorSpec.class);
index cf0a198..3bb44b5 100644 (file)
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
+ * KIND, either express or implied.  See the License for THE
  * specific language governing permissions and limitations
  * under the License.
  */
 package org.apache.samza.operators;
 
-import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
@@ -33,10 +35,11 @@ import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.collect.ImmutableList;
+import junit.framework.Assert;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -583,4 +586,16 @@ public class TestStreamGraphImpl {
     Assert.assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
     Assert.assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
   }
+
+  @Test
+  public void testGetTable() {
+    ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+    Config mockConfig = mock(Config.class);
+    StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+    BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+    when(mockTableDescriptor.getTableSpec()).thenReturn(
+        new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>()));
+    Assert.assertNotNull(graph.getTable(mockTableDescriptor));
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamTableJoinOperatorImpl.java
new file mode 100644 (file)
index 0000000..d8b2e8d
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.Collection;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.operators.spec.StreamTableJoinOperatorSpec;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.TableSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestStreamTableJoinOperatorImpl {
+  @Test
+  public void testHandleMessage() {
+
+    String tableId = "t1";
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn(tableId);
+
+    StreamTableJoinOperatorSpec mockJoinOpSpec = mock(StreamTableJoinOperatorSpec.class);
+    when(mockJoinOpSpec.getTableSpec()).thenReturn(tableSpec);
+    when(mockJoinOpSpec.getJoinFn()).thenReturn(
+        new StreamTableJoinFunction<String, KV<String, String>, KV<String, String>, String>() {
+          @Override
+          public String apply(KV<String, String> message, KV<String, String> record) {
+            if ("1".equals(message.getKey())) {
+              Assert.assertEquals("m1", message.getValue());
+              Assert.assertEquals("r1", record.getValue());
+              return "m1r1";
+            } else if ("2".equals(message.getKey())) {
+              Assert.assertEquals("m2", message.getValue());
+              Assert.assertNull(record);
+              return null;
+            }
+            throw new SamzaException("Should never reach here!");
+          }
+
+          @Override
+          public String getMessageKey(KV<String, String> message) {
+            return message.getKey();
+          }
+
+          @Override
+          public String getRecordKey(KV<String, String> record) {
+            return record.getKey();
+          }
+        });
+    Config config = mock(Config.class);
+    ReadableTable table = mock(ReadableTable.class);
+    when(table.get("1")).thenReturn("r1");
+    when(table.get("2")).thenReturn(null);
+    TaskContext mockTaskContext = mock(TaskContext.class);
+    when(mockTaskContext.getTable(tableId)).thenReturn(table);
+
+    MessageCollector mockMessageCollector = mock(MessageCollector.class);
+    TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
+
+    StreamTableJoinOperatorImpl streamTableJoinOperator = new StreamTableJoinOperatorImpl(
+        mockJoinOpSpec, config, mockTaskContext);
+
+    // Table has the key
+    Collection<TestMessageEnvelope> result;
+    result = streamTableJoinOperator.handleMessage(KV.of("1", "m1"), mockMessageCollector, mockTaskCoordinator);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("m1r1", result.iterator().next());
+    // Table doesn't have the key
+    result = streamTableJoinOperator.handleMessage(KV.of("2", "m2"), mockMessageCollector, mockTaskCoordinator);
+    Assert.assertEquals(0, result.size());
+  }
+
+}
diff --git a/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java b/samza-core/src/test/java/org/apache/samza/table/TestTableManager.java
new file mode 100644 (file)
index 0000000..df5b9e5
--- /dev/null
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.table;
+
+import java.lang.reflect.Field;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.storage.StorageEngine;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestTableManager {
+
+  private static final String TABLE_ID = "t1";
+
+  public static class DummyTableProviderFactory implements TableProviderFactory {
+
+    static Table table;
+    static LocalStoreBackedTableProvider tableProvider;
+
+    @Override
+    public TableProvider getTableProvider(TableSpec tableSpec) {
+      table = mock(Table.class);
+      tableProvider = mock(LocalStoreBackedTableProvider.class);
+      when(tableProvider.getTable()).thenReturn(table);
+      return tableProvider;
+    }
+  }
+
+  @Test
+  public void testInitByConfig() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    map.put(String.format("tables.%s.some.config", TABLE_ID), "xyz");
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutProviderFactory() {
+    Map<String, String> map = new HashMap<>();
+    addKeySerde(map);
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutKeySerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = Exception.class)
+  public void testInitFailsWithoutValueSerde() {
+    Map<String, String> map = new HashMap<>();
+    map.put(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, TABLE_ID), DummyTableProviderFactory.class.getName());
+    addValueSerde(map);
+    doTestInit(map);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testInitFailsWithoutInitializingLocalTables() {
+    TableManager tableManager = new TableManager(new MapConfig(new HashMap<>()), new HashMap<>());
+    tableManager.getTable("dummy");
+  }
+
+  private void doTestInit(Map<String, String> map) {
+    Map<String, StorageEngine> storageEngines = new HashMap<>();
+    storageEngines.put(TABLE_ID, mock(StorageEngine.class));
+
+    Map<String, Serde<Object>> serdeMap = new HashMap<>();
+    SerializableSerde<Serde> serializableSerde = new SerializableSerde();
+    map.keySet().stream()
+        .filter(k -> k.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX()))
+        .forEach(k -> {
+            String serdeName = k
+                .replace(String.format(SerializerConfig.SERIALIZER_PREFIX(), ""), "")
+                .replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), "");
+            String serializedSerde = map.get(k);
+            byte[] bytes = Base64.getDecoder().decode(serializedSerde);
+            Serde serde = serializableSerde.fromBytes(bytes);
+            serdeMap.put(serdeName, serde);
+          });
+
+    TableManager tableManager = new TableManager(new MapConfig(map), serdeMap);
+    tableManager.initLocalTables(storageEngines);
+
+    Table table = tableManager.getTable(TABLE_ID);
+    verify(DummyTableProviderFactory.tableProvider, times(1)).init(anyObject());
+    Assert.assertEquals(DummyTableProviderFactory.table, table);
+
+    Map<String, TableManager.TableCtx> ctxMap = getFieldValue(tableManager, "tables");
+    TableManager.TableCtx ctx = ctxMap.get(TABLE_ID);
+
+    TableSpec tableSpec = getFieldValue(ctx, "tableSpec");
+    Assert.assertEquals(TABLE_ID, tableSpec.getId());
+    Assert.assertEquals(DummyTableProviderFactory.class.getName(), tableSpec.getTableProviderFactoryClassName());
+    Assert.assertEquals(IntegerSerde.class, tableSpec.getSerde().getKeySerde().getClass());
+    Assert.assertEquals(StringSerde.class, tableSpec.getSerde().getValueSerde().getClass());
+    Assert.assertEquals("xyz", tableSpec.getConfig().get("some.config"));
+
+    TableProvider tableProvider = getFieldValue(ctx, "tableProvider");
+    Assert.assertNotNull(tableProvider);
+  }
+
+  private void addKeySerde(Map<String, String> map) {
+    String serdeId = "key-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
+        serializeSerde(new IntegerSerde()));
+    map.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, TABLE_ID), serdeId);
+  }
+
+  private void addValueSerde(Map<String, String> map) {
+    String serdeId = "value-serde";
+    map.put(String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeId),
+            serializeSerde(new StringSerde("UTF-8")));
+    map.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, TABLE_ID), serdeId);
+  }
+
+  private String serializeSerde(Serde serde) {
+    return Base64.getEncoder().encodeToString(new SerializableSerde().toBytes(serde));
+  }
+
+  private <T> T getFieldValue(Object object, String fieldName) {
+    Field field = null;
+    try {
+      field = object.getClass().getDeclaredField(fieldName);
+      field.setAccessible(true);
+      return (T) field.get(object);
+    } catch (NoSuchFieldException | IllegalAccessException ex) {
+      throw new SamzaException(ex);
+    } finally {
+      if (field != null) {
+        field.setAccessible(false);
+      }
+    }
+  }
+
+}
index b399f5f..28a4f8b 100644 (file)
@@ -30,6 +30,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
@@ -47,16 +48,15 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
 import org.junit.Test;
+
 import scala.Option;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.*;
 
 // TODO(spvenkat) SAMZA-1183: Fix all commented out tests.
 public class TestAsyncRunLoop {
@@ -86,7 +86,7 @@ public class TestAsyncRunLoop {
     scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
     return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
-        manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null);
+        manager, null, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()), null, null);
   }
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) {
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java
new file mode 100644 (file)
index 0000000..2681fb3
--- /dev/null
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableDescriptor;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for in-memory tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class InMemoryTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, InMemoryTableDescriptor<K, V>> {
+
+  public InMemoryTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+  }
+
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, InMemoryTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  private void addInMemoryConfig(Map<String, String> map, String key, String value) {
+    map.put("inmemory." + key, value);
+  }
+}
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProvider.java
new file mode 100644 (file)
index 0000000..c1c2f1c
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.kv.BaseLocalStoreBackedTableProvider;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider of an in-memory table
+ */
+public class InMemoryTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public InMemoryTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Store factory configuration
+    tableConfig.put(String.format(
+        StorageConfig.FACTORY(), tableSpec.getId()),
+        InMemoryKeyValueStorageEngineFactory.class.getName());
+
+    // Common store configuration
+    tableConfig.putAll(generateCommonStoreConfig(config));
+
+    // Rest of the configuration
+    tableSpec.getConfig().forEach((k, v) -> {
+      String realKey = k.startsWith("inmemory.") ?
+          String.format("stores.%s", tableSpec.getId()) + "." + k.substring("inmemory.".length())
+        : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+      tableConfig.put(realKey, v);
+    });
+
+    logger.info("Generated configuration for table " + tableSpec.getId());
+
+    return tableConfig;
+  }
+
+}
diff --git a/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java b/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableProviderFactory.java
new file mode 100644 (file)
index 0000000..f05982a
--- /dev/null
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+/**
+ * Factory class for an in-memory table provider
+ */
+public class InMemoryTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    return new InMemoryTableProvider(tableSpec);
+  }
+}
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableDescriptor.java
new file mode 100644 (file)
index 0000000..840fb70
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestInMemoryTableDescriptor {
+  @Test
+  public void testTableSpec() {
+
+    TableSpec tableSpec = new InMemoryTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withConfig("inmemory.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+  }
+
+  private String getConfig(TableSpec tableSpec, String key) {
+    return tableSpec.getConfig().get("inmemory." + key);
+  }
+}
diff --git a/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java b/samza-kv-inmemory/src/test/java/org/apache/samza/storage/kv/inmemory/TestInMemoryTableProvider.java
new file mode 100644 (file)
index 0000000..76b7a66
--- /dev/null
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv.inmemory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestInMemoryTableProvider {
+  @Test
+  public void testGenerateConfig() {
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("inmemory.c1", "c1-value");
+    tableSpecConfig.put("inmemory.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value");
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new InMemoryTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        InMemoryKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java
new file mode 100644 (file)
index 0000000..2c62159
--- /dev/null
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table descriptor for RocksDb backed tables
+ *
+ * @param <K> the type of the key
+ * @param <V> the type of the value
+ */
+public class RocksDbTableDescriptor<K, V> extends BaseLocalStoreBackedTableDescriptor<K, V, RocksDbTableDescriptor<K, V>> {
+
+  static final public String WRITE_BATCH_SIZE = "write.batch.size";
+  static final public String OBJECT_CACHE_SIZE = "object.cache.size";
+  static final public String CONTAINER_CACHE_SIZE_BYTES = "container.cache.size.bytes";
+  static final public String CONTAINER_WRITE_BUFFER_SIZE_BYTES = "container.write.buffer.size.bytes";
+  static final public String ROCKSDB_COMPRESSION = "rocksdb.compression";
+  static final public String ROCKSDB_BLOCK_SIZE_BYTES = "rocksdb.block.size.bytes";
+  static final public String ROCKSDB_TTL_MS = "rocksdb.ttl.ms";
+  static final public String ROCKSDB_COMPACTION_STYLE = "rocksdb.compaction.style";
+  static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
+  static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
+  static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+
+  protected Integer writeBatchSize;
+  protected Integer objectCacheSize;
+  private Integer cacheSize;
+  private Integer writeBufferSize;
+  private Integer blockSize;
+  private Integer ttl;
+  private Integer numWriteBuffers;
+  private Integer maxLogFileSize;
+  private Integer numLogFilesToKeep;
+  private String compressionType;
+  private String compactionStyle;
+
+  public RocksDbTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  /**
+   * Refer to <code>stores.store-name.write.batch.size</code> in Samza configuration guide
+   * @param writeBatchSize write batch size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor withWriteBatchSize(int writeBatchSize) {
+    this.writeBatchSize = writeBatchSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.object.cache.size</code> in Samza configuration guide
+   * @param objectCacheSize the object cache size
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor withObjectCacheSize(int objectCacheSize) {
+    this.objectCacheSize = objectCacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.cache.size.bytes</code> in Samza configuration guide
+   * @param cacheSize the cache size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCacheSize(int cacheSize) {
+    this.cacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.container.write.buffer.size.bytes</code> in Samza configuration guide
+   * @param writeBufferSize the write buffer size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withWriteBufferSize(int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compression</code> in Samza configuration guide
+   * @param compressionType the compression type
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompressionType(String compressionType) {
+    this.compressionType = compressionType;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.block.size.bytes</code> in Samza configuration guide
+   * @param blockSize the block size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withBlockSize(int blockSize) {
+    this.blockSize = blockSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.ttl.ms</code> in Samza configuration guide
+   * @param ttl the time to live in milliseconds
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withTtl(int ttl) {
+    this.ttl = ttl;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.compaction.style</code> in Samza configuration guide
+   * @param compactionStyle the compaction style
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withCompactionStyle(String compactionStyle) {
+    this.compactionStyle = compactionStyle;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
+   * @param numWriteBuffers the number of write buffers
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumWriteBuffers(int numWriteBuffers) {
+    this.numWriteBuffers = numWriteBuffers;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.max.log.file.size.bytes</code> in Samza configuration guide
+   * @param maxLogFileSize the maximal log file size in bytes
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxLogFileSize(int maxLogFileSize) {
+    this.maxLogFileSize = maxLogFileSize;
+    return this;
+  }
+
+  /**
+   * Refer to <code>stores.store-name.rocksdb.num.write.buffers</code> in Samza configuration guide
+   * @param numLogFilesToKeep the number of log files to keep
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withNumLogFilesToKeep(int numLogFilesToKeep) {
+    this.numLogFilesToKeep = numLogFilesToKeep;
+    return this;
+  }
+
+  /**
+   * Create a table spec based on this table description
+   * @return the table spec
+   */
+  @Override
+  public TableSpec getTableSpec() {
+
+    validate();
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    generateTableSpecConfig(tableSpecConfig);
+
+    return new TableSpec(tableId, serde, RocksDbTableProviderFactory.class.getName(), tableSpecConfig);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+
+    super.generateTableSpecConfig(tableSpecConfig);
+
+    if (writeBatchSize != null) {
+      addRocksDbConfig(tableSpecConfig, WRITE_BATCH_SIZE, writeBatchSize.toString());
+    }
+    if (objectCacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, OBJECT_CACHE_SIZE, objectCacheSize.toString());
+    }
+    if (cacheSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_CACHE_SIZE_BYTES, cacheSize.toString());
+    }
+    if (writeBufferSize != null) {
+      addRocksDbConfig(tableSpecConfig, CONTAINER_WRITE_BUFFER_SIZE_BYTES, writeBufferSize.toString());
+    }
+    if (compressionType != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPRESSION, compressionType);
+    }
+    if (blockSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_BLOCK_SIZE_BYTES, blockSize.toString());
+    }
+    if (ttl != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_TTL_MS, ttl.toString());
+    }
+    if (compactionStyle != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_COMPACTION_STYLE, compactionStyle);
+    }
+    if (numWriteBuffers != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_NUM_WRITE_BUFFERS, numWriteBuffers.toString());
+    }
+    if (maxLogFileSize != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, maxLogFileSize.toString());
+    }
+    if (numLogFilesToKeep != null) {
+      addRocksDbConfig(tableSpecConfig, ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString());
+    }
+  }
+
+  private void addRocksDbConfig(Map<String, String> map, String key, String value) {
+    map.put("rocksdb." + key, value);
+  }
+
+}
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
new file mode 100644 (file)
index 0000000..eb8188f
--- /dev/null
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.table.TableSpec;
+
+
+/**
+ * Table provider for tables backed by RocksDb.
+ */
+public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
+
+  public RocksDbTableProvider(TableSpec tableSpec) {
+    super(tableSpec);
+  }
+
+  @Override
+  public Map<String, String> generateConfig(Map<String, String> config) {
+
+    Map<String, String> tableConfig = new HashMap<>();
+
+    // Store factory configuration
+    tableConfig.put(String.format(
+        StorageConfig.FACTORY(), tableSpec.getId()),
+        RocksDbKeyValueStorageEngineFactory.class.getName());
+
+    // Common store configuration
+    tableConfig.putAll(generateCommonStoreConfig(config));
+
+    // Rest of the configuration
+    tableSpec.getConfig().forEach((k, v) -> {
+      String realKey = k.startsWith("rocksdb.") ?
+          String.format("stores.%s", tableSpec.getId()) + "." + k.substring("rocksdb.".length())
+        : String.format(JavaTableConfig.TABLE_ID_PREFIX, tableSpec.getId()) + "." + k;
+      tableConfig.put(realKey, v);
+    });
+
+    logger.info("Generated configuration for table " + tableSpec.getId());
+
+    return tableConfig;
+  }
+
+}
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProviderFactory.java
new file mode 100644 (file)
index 0000000..dbe0f97
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableProviderFactory;
+import org.apache.samza.table.TableSpec;
+
+
+public class RocksDbTableProviderFactory implements TableProviderFactory {
+  @Override
+  public TableProvider getTableProvider(TableSpec tableSpec) {
+    return new RocksDbTableProvider(tableSpec);
+  }
+}
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableDescriptor.java
new file mode 100644 (file)
index 0000000..49fe6eb
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableDescriptor {
+
+  @Test
+  public void testMinimal() {
+    new RocksDbTableDescriptor<Integer, String>("1")
+        .validate();
+  }
+
+  @Test
+  public void testSerde() {
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .getTableSpec();
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertEquals(tableSpec.getSerde().getKeySerde().getClass(), IntegerSerde.class);
+    Assert.assertEquals(tableSpec.getSerde().getValueSerde().getClass(), StringSerde.class);
+  }
+
+  @Test
+  public void testTableSpec() {
+
+    TableSpec tableSpec = new RocksDbTableDescriptor<Integer, String>("1")
+        .withSerde(KVSerde.of(new IntegerSerde(), new StringSerde()))
+        .withBlockSize(1)
+        .withCacheSize(2)
+        .withCompactionStyle("fifo")
+        .withCompressionType("snappy")
+        .withMaxLogFileSize(3)
+        .withNumLogFilesToKeep(4)
+        .withNumWriteBuffers(5)
+        .withObjectCacheSize(6)
+        .withTtl(7)
+        .withWriteBatchSize(8)
+        .withWriteBufferSize(9)
+        .withConfig("rocksdb.abc", "xyz")
+        .getTableSpec();
+
+    Assert.assertNotNull(tableSpec.getSerde());
+    Assert.assertNotNull(tableSpec.getSerde().getKeySerde());
+    Assert.assertNotNull(tableSpec.getSerde().getValueSerde());
+    Assert.assertEquals("1", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES));
+    Assert.assertEquals("2", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES));
+    Assert.assertEquals("3", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES));
+    Assert.assertEquals("4", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_KEEP_LOG_FILE_NUM));
+    Assert.assertEquals("5", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_NUM_WRITE_BUFFERS));
+    Assert.assertEquals("6", getConfig(tableSpec, RocksDbTableDescriptor.OBJECT_CACHE_SIZE));
+    Assert.assertEquals("7", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_TTL_MS));
+    Assert.assertEquals("8", getConfig(tableSpec, RocksDbTableDescriptor.WRITE_BATCH_SIZE));
+    Assert.assertEquals("9", getConfig(tableSpec, RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES));
+    Assert.assertEquals("snappy", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPRESSION));
+    Assert.assertEquals("fifo", getConfig(tableSpec, RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE));
+    Assert.assertEquals("xyz", getConfig(tableSpec, "abc"));
+  }
+
+  private String getConfig(TableSpec tableSpec, String key) {
+    return tableSpec.getConfig().get("rocksdb." + key);
+  }
+}
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/TestRocksDbTableProvider.java
new file mode 100644 (file)
index 0000000..beda5da
--- /dev/null
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.table.TableProvider;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+
+public class TestRocksDbTableProvider {
+  @Test
+  public void testGenerateConfig() {
+
+    Map<String, String> tableSpecConfig = new HashMap<>();
+    tableSpecConfig.put("rocksdb.c1", "c1-value");
+    tableSpecConfig.put("rocksdb.c2", "c2-value");
+    tableSpecConfig.put("c3", "c3-value");
+    tableSpecConfig.put("c4", "c4-value");
+
+    TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new IntegerSerde(), new IntegerSerde()),
+        "my-table-provider-factory", tableSpecConfig);
+
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    TableProvider tableProvider = new RocksDbTableProvider(tableSpec);
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+    Assert.assertEquals(
+        RocksDbKeyValueStorageEngineFactory.class.getName(),
+        tableConfig.get(String.format(StorageConfig.FACTORY(), "t1")));
+    Assert.assertEquals("c1-value", tableConfig.get("stores.t1.c1"));
+    Assert.assertEquals("c2-value", tableConfig.get("stores.t1.c2"));
+    Assert.assertEquals("c3-value", tableConfig.get("tables.t1.c3"));
+    Assert.assertEquals("c4-value", tableConfig.get("tables.t1.c4"));
+  }
+}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableDescriptor.java
new file mode 100644 (file)
index 0000000..1f9b57b
--- /dev/null
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.Map;
+
+import org.apache.samza.operators.BaseTableDescriptor;
+
+
+/**
+ * Table descriptor for store backed tables.
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ * @param <D> the type of the concrete table descriptor
+ */
+abstract public class BaseLocalStoreBackedTableDescriptor<K, V, D extends BaseLocalStoreBackedTableDescriptor<K, V, D>>
+    extends BaseTableDescriptor<K, V, D> {
+
+  /**
+   * Constructs a table descriptor instance
+   * @param tableId Id of the table
+   */
+  public BaseLocalStoreBackedTableDescriptor(String tableId) {
+    super(tableId);
+  }
+
+  @Override
+  protected void generateTableSpecConfig(Map<String, String> tableSpecConfig) {
+    super.generateTableSpecConfig(tableSpecConfig);
+  }
+
+  /**
+   * Validate that this table descriptor is constructed properly
+   */
+  protected void validate() {
+    super.validate();
+  }
+
+}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/BaseLocalStoreBackedTableProvider.java
new file mode 100644 (file)
index 0000000..4af0f1d
--- /dev/null
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.table.LocalStoreBackedTableProvider;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Base class for tables backed by Samza stores, see {@link LocalStoreBackedTableProvider}.
+ */
+abstract public class BaseLocalStoreBackedTableProvider implements LocalStoreBackedTableProvider {
+
+  protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+  protected final TableSpec tableSpec;
+
+  protected KeyValueStore kvStore;
+
+  public BaseLocalStoreBackedTableProvider(TableSpec tableSpec) {
+    this.tableSpec = tableSpec;
+  }
+
+  @Override
+  public void init(StorageEngine store) {
+    kvStore = (KeyValueStore) store;
+    logger.info("Initialized backing store for table " + tableSpec.getId());
+  }
+
+  @Override
+  public Table getTable() {
+    if (kvStore == null) {
+      throw new SamzaException("Store not initialized for table " + tableSpec.getId());
+    }
+    return new LocalStoreBackedReadWriteTable(kvStore);
+  }
+
+  @Override
+  public void start() {
+    logger.info("Starting table provider for table " + tableSpec.getId());
+  }
+
+  @Override
+  public void stop() {
+    logger.info("Stopping table provider for table " + tableSpec.getId());
+  }
+
+  protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) {
+
+    Map<String, String> storeConfig = new HashMap<>();
+
+    // We assume the configuration for serde are already generated for this table,
+    // so we simply carry them over to store configuration.
+    //
+    JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config));
+
+    String keySerde = tableConfig.getKeySerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
+
+    String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
+    storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
+
+    return storeConfig;
+  }
+}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadWriteTable.java
new file mode 100644 (file)
index 0000000..3149c86
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.List;
+
+import org.apache.samza.table.ReadWriteTable;
+
+
+/**
+ * A store backed readable and writable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalStoreBackedReadWriteTable<K, V> extends LocalStoreBackedReadableTable<K, V>
+    implements ReadWriteTable<K, V> {
+
+  /**
+   * Constructs an instance of {@link LocalStoreBackedReadWriteTable}
+   * @param kvStore the backing store
+   */
+  public LocalStoreBackedReadWriteTable(KeyValueStore kvStore) {
+    super(kvStore);
+  }
+
+  @Override
+  public void put(K key, V value) {
+    kvStore.put(key, value);
+  }
+
+  @Override
+  public void putAll(List<Entry<K, V>> entries) {
+    entries.forEach(e -> kvStore.put(e.getKey(), e.getValue()));
+  }
+
+  @Override
+  public void delete(K key) {
+    kvStore.delete(key);
+  }
+
+  @Override
+  public void deleteAll(List<K> keys) {
+    keys.forEach(k -> kvStore.delete(k));
+  }
+
+  @Override
+  public void flush() {
+    kvStore.flush();
+  }
+
+}
diff --git a/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java b/samza-kv/src/main/java/org/apache/samza/storage/kv/LocalStoreBackedReadableTable.java
new file mode 100644 (file)
index 0000000..fead086
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage.kv;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.samza.table.ReadableTable;
+
+
+/**
+ * A store backed readable table
+ *
+ * @param <K> the type of the key in this table
+ * @param <V> the type of the value in this table
+ */
+public class LocalStoreBackedReadableTable<K, V> implements ReadableTable<K, V> {
+
+  protected KeyValueStore<K, V> kvStore;
+
+  /**
+   * Constructs an instance of {@link LocalStoreBackedReadableTable}
+   * @param kvStore the backing store
+   */
+  public LocalStoreBackedReadableTable(KeyValueStore<K, V> kvStore) {
+    this.kvStore = kvStore;
+  }
+
+  @Override
+  public V get(K key) {
+    return kvStore.get(key);
+  }
+
+  @Override
+  public Map<K, V> getAll(List<K> keys) {
+    return keys.stream().collect(Collectors.toMap(k -> k, k -> kvStore.get(k)));
+  }
+
+  @Override
+  public void close() {
+    // The KV store is not closed here as it may still be needed by downstream operators,
+    // it will be closed by the SamzaContainer
+  }
+}
diff --git a/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java b/samza-kv/src/test/java/org/apache/samza/storage/kv/TestLocalBaseStoreBackedTableProvider.java
new file mode 100644 (file)
index 0000000..9c95637
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.JavaTableConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.storage.StorageEngine;
+import org.apache.samza.table.TableSpec;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestLocalBaseStoreBackedTableProvider {
+
+  private BaseLocalStoreBackedTableProvider tableProvider;
+
+  @Before
+  public void prepare() {
+    TableSpec tableSpec = mock(TableSpec.class);
+    when(tableSpec.getId()).thenReturn("t1");
+    tableProvider = new BaseLocalStoreBackedTableProvider(tableSpec) {
+      @Override
+      public Map<String, String> generateConfig(Map<String, String> config) {
+        return generateCommonStoreConfig(config);
+      }
+    };
+  }
+
+  @Test
+  public void testInit() {
+    StorageEngine store = mock(KeyValueStorageEngine.class);
+    tableProvider.init(store);
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test(expected = SamzaException.class)
+  public void testInitFail() {
+    Assert.assertNotNull(tableProvider.getTable());
+  }
+
+  @Test
+  public void testGenerateCommonStoreConfig() {
+    Map<String, String> config = new HashMap<>();
+    config.put(String.format(JavaTableConfig.TABLE_KEY_SERDE, "t1"), "ks1");
+    config.put(String.format(JavaTableConfig.TABLE_VALUE_SERDE, "t1"), "vs1");
+
+    Map<String, String> tableConfig = tableProvider.generateConfig(config);
+    Assert.assertEquals("ks1", tableConfig.get(String.format(StorageConfig.KEY_SERDE(), "t1")));
+    Assert.assertEquals("vs1", tableConfig.get(String.format(StorageConfig.MSG_SERDE(), "t1")));
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
new file mode 100644 (file)
index 0000000..8f7eb5d
--- /dev/null
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.StreamTableJoinFunction;
+import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
+import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
+import org.apache.samza.table.ReadableTable;
+import org.apache.samza.table.Table;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
+import org.apache.samza.test.table.TestTableData.EnrichedPageView;
+import org.apache.samza.test.table.TestTableData.PageView;
+import org.apache.samza.test.table.TestTableData.PageViewJsonSerde;
+import org.apache.samza.test.table.TestTableData.PageViewJsonSerdeFactory;
+import org.apache.samza.test.table.TestTableData.Profile;
+import org.apache.samza.test.table.TestTableData.ProfileJsonSerde;
+import org.apache.samza.test.util.ArraySystemFactory;
+import org.apache.samza.test.util.Base64Serializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test class tests sendTo() and join() for local tables
+ */
+public class TestLocalTable extends AbstractIntegrationTestHarness {
+
+  @Test
+  public void testSendTo() throws  Exception {
+
+    int count = 10;
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig();
+
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    MyMapFunction mapFn = new MyMapFunction();
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1")
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+
+      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+          .map(mapFn)
+          .sendTo(table);
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, mapFn.received.size());
+    assertEquals(count, new HashSet(mapFn.received).size());
+    mapFn.received.forEach(p -> Assert.assertTrue(mapFn.table.get(p.getMemberId()) != null));
+  }
+
+  @Test
+  public void testStreamTableJoin() throws Exception {
+
+    List<PageView> received = new LinkedList<>();
+    List<EnrichedPageView> joined = new LinkedList<>();
+
+    int count = 10;
+    PageView[] pageViews = TestTableData.generatePageViews(count);
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig();
+
+    configs.put("streams.PageView.samza.system", "test");
+    configs.put("streams.PageView.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.Profile.samza.system", "test");
+    configs.put("streams.Profile.samza.bootstrap", "true");
+    configs.put("streams.Profile.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile.partitionCount", String.valueOf(partitionCount));
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      Table<KV<Integer, Profile>> table = streamGraph.getTable(new InMemoryTableDescriptor("t1")
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+
+      streamGraph.getInputStream("Profile", new NoOpSerde<Profile>())
+          .map(m -> new KV(m.getMemberId(), m))
+          .sendTo(table);
+
+      streamGraph.getInputStream("PageView", new NoOpSerde<PageView>())
+          .map(pv -> {
+              received.add(pv);
+              return pv;
+            })
+          .partitionBy(PageView::getMemberId, v -> v, "p1")
+          .join(table, new PageViewToProfileJoinFunction())
+          .sink((m, collector, coordinator) -> joined.add(m));
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, received.size());
+    assertEquals(count * partitionCount, joined.size());
+    assertTrue(joined.get(0) instanceof EnrichedPageView);
+  }
+
+  @Test
+  public void testDualStreamTableJoin() throws Exception {
+
+    List<Profile> sentToProfileTable1 = new LinkedList<>();
+    List<Profile> sentToProfileTable2 = new LinkedList<>();
+    List<EnrichedPageView> joinedPageViews1 = new LinkedList<>();
+    List<EnrichedPageView> joinedPageViews2 = new LinkedList<>();
+
+    KVSerde<Integer, Profile> profileKVSerde = KVSerde.of(new IntegerSerde(), new ProfileJsonSerde());
+    KVSerde<Integer, PageView> pageViewKVSerde = KVSerde.of(new IntegerSerde(), new PageViewJsonSerde());
+
+    PageViewToProfileJoinFunction joinFn1 = new PageViewToProfileJoinFunction();
+    PageViewToProfileJoinFunction joinFn2 = new PageViewToProfileJoinFunction();
+
+    int count = 10;
+    PageView[] pageViews = TestTableData.generatePageViews(count);
+    Profile[] profiles = TestTableData.generateProfiles(count);
+
+    int partitionCount = 4;
+    Map<String, String> configs = getBaseJobConfig();
+
+    configs.put("streams.Profile1.samza.system", "test");
+    configs.put("streams.Profile1.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile1.samza.bootstrap", "true");
+    configs.put("streams.Profile1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.Profile2.samza.system", "test");
+    configs.put("streams.Profile2.source", Base64Serializer.serialize(profiles));
+    configs.put("streams.Profile2.samza.bootstrap", "true");
+    configs.put("streams.Profile2.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView1.samza.system", "test");
+    configs.put("streams.PageView1.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView1.partitionCount", String.valueOf(partitionCount));
+
+    configs.put("streams.PageView2.samza.system", "test");
+    configs.put("streams.PageView2.source", Base64Serializer.serialize(pageViews));
+    configs.put("streams.PageView2.partitionCount", String.valueOf(partitionCount));
+
+    final LocalApplicationRunner runner = new LocalApplicationRunner(new MapConfig(configs));
+    final StreamApplication app = (streamGraph, cfg) -> {
+
+      Table<KV<Integer, Profile>> profileTable = streamGraph.getTable(new InMemoryTableDescriptor("t1")
+          .withSerde(KVSerde.of(new IntegerSerde(), new ProfileJsonSerde())));
+
+      MessageStream<Profile> profileStream1 = streamGraph.getInputStream("Profile1", new NoOpSerde<Profile>());
+      MessageStream<Profile> profileStream2 = streamGraph.getInputStream("Profile2", new NoOpSerde<Profile>());
+
+      profileStream1
+          .map(m -> {
+              sentToProfileTable1.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+      profileStream2
+          .map(m -> {
+              sentToProfileTable2.add(m);
+              return new KV(m.getMemberId(), m);
+            })
+          .sendTo(profileTable);
+
+      MessageStream<PageView> pageViewStream1 = streamGraph.getInputStream("PageView1", new NoOpSerde<PageView>());
+      MessageStream<PageView> pageViewStream2 = streamGraph.getInputStream("PageView2", new NoOpSerde<PageView>());
+
+      pageViewStream1
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p1")
+          .join(profileTable, joinFn1)
+          .sink((m, collector, coordinator) -> joinedPageViews1.add(m));
+
+      pageViewStream2
+          .partitionBy(PageView::getMemberId, v -> v, pageViewKVSerde, "p2")
+          .join(profileTable, joinFn2)
+          .sink((m, collector, coordinator) -> joinedPageViews2.add(m));
+    };
+
+    runner.run(app);
+    runner.waitForFinish();
+
+    assertEquals(count * partitionCount, sentToProfileTable1.size());
+    assertEquals(count * partitionCount, sentToProfileTable2.size());
+    assertEquals(count * partitionCount, joinFn1.count);
+    assertEquals(count * partitionCount, joinFn2.count);
+    assertEquals(count * partitionCount, joinedPageViews1.size());
+    assertEquals(count * partitionCount, joinedPageViews2.size());
+    assertTrue(joinedPageViews1.get(0) instanceof EnrichedPageView);
+    assertTrue(joinedPageViews2.get(0) instanceof EnrichedPageView);
+  }
+
+  private Map<String, String> getBaseJobConfig() {
+    Map<String, String> configs = new HashMap<>();
+    configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
+
+    configs.put(JobConfig.JOB_NAME(), "test-table-job");
+    configs.put(JobConfig.PROCESSOR_ID(), "1");
+    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
+    configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+    configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+
+    // For intermediate streams
+    configs.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
+    configs.put("systems.kafka.producer.bootstrap.servers", bootstrapUrl());
+    configs.put("systems.kafka.consumer.zookeeper.connect", zkConnect());
+    configs.put("systems.kafka.samza.key.serde", "int");
+    configs.put("systems.kafka.samza.msg.serde", "json");
+    configs.put("systems.kafka.default.stream.replication.factor", "1");
+    configs.put("job.default.system", "kafka");
+
+    configs.put("serializers.registry.int.class", "org.apache.samza.serializers.IntegerSerdeFactory");
+    configs.put("serializers.registry.json.class", PageViewJsonSerdeFactory.class.getName());
+
+    return configs;
+  }
+
+  private class MyMapFunction implements MapFunction<Profile, KV<Integer, Profile>> {
+
+    private List<Profile> received = new ArrayList<>();
+    private ReadableTable table;
+
+    @Override
+    public void init(Config config, TaskContext context) {
+      table = (ReadableTable) context.getTable("t1");
+    }
+
+    @Override
+    public KV<Integer, Profile> apply(Profile profile) {
+      received.add(profile);
+      return new KV(profile.getMemberId(), profile);
+    }
+  }
+
+  private class PageViewToProfileJoinFunction implements StreamTableJoinFunction
+      <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+    private int count;
+    @Override
+    public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) {
+      ++count;
+      return r == null ? null :
+          new EnrichedPageView(m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany());
+    }
+
+    @Override
+    public Integer getMessageKey(KV<Integer, PageView> message) {
+      return message.getKey();
+    }
+
+    @Override
+    public Integer getRecordKey(KV<Integer, Profile> record) {
+      return record.getKey();
+    }
+  }
+}
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableData.java
new file mode 100644 (file)
index 0000000..dfd0d1b
--- /dev/null
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.table;
+
+import java.io.Serializable;
+import java.util.Random;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerdeFactory;
+import org.codehaus.jackson.annotate.JsonCreator;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+
+public class TestTableData {
+
+  public static class PageView implements Serializable {
+    @JsonProperty("pageKey")
+    final String pageKey;
+    @JsonProperty("memberId")
+    final int memberId;
+
+    @JsonProperty("pageKey")
+    public String getPageKey() {
+      return pageKey;
+    }
+
+    @JsonProperty("memberId")
+    public int getMemberId() {
+      return memberId;
+    }
+
+    @JsonCreator
+    public PageView(@JsonProperty("pageKey") String pageKey, @JsonProperty("memberId") int memberId) {
+      this.pageKey = pageKey;
+      this.memberId = memberId;
+    }
+  }
+
+  public static class Profile implements Serializable {
+    @JsonProperty("memberId")
+    final int memberId;
+
+    @JsonProperty("company")
+    final String company;
+
+    @JsonProperty("memberId")
+    public int getMemberId() {
+      return memberId;
+    }
+
+    @JsonProperty("company")
+    public String getCompany() {
+      return company;
+    }
+
+    @JsonCreator
+    public Profile(@JsonProperty("memberId") int memberId, @JsonProperty("company") String company) {
+      this.memberId = memberId;
+      this.company = company;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null || !(o instanceof Profile)) {
+        return false;
+      }
+      return ((Profile) o).getMemberId() == memberId;
+    }
+
+    @Override
+    public int hashCode() {
+      return memberId;
+    }
+  }
+
+  public static class EnrichedPageView extends PageView {
+
+    @JsonProperty("company")
+    final String company;
+
+    @JsonProperty("company")
+    public String getCompany() {
+      return company;
+    }
+
+    @JsonCreator
+    public EnrichedPageView(
+        @JsonProperty("pageKey") String pageKey,
+        @JsonProperty("memberId") int memberId,
+        @JsonProperty("company") String company) {
+      super(pageKey, memberId);
+      this.company = company;
+    }
+  }
+
+  public static class PageViewJsonSerdeFactory implements SerdeFactory<PageView> {
+    @Override public Serde<PageView> getSerde(String name, Config config) {
+      return new PageViewJsonSerde();
+    }
+  }
+
+  public static class ProfileJsonSerdeFactory implements SerdeFactory<Profile> {
+    @Override public Serde<Profile> getSerde(String name, Config config) {
+      return new ProfileJsonSerde();
+    }
+  }
+
+  public static class PageViewJsonSerde implements Serde<PageView> {
+
+    @Override
+    public PageView fromBytes(byte[] bytes) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<PageView>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(PageView pv) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.writeValueAsString(pv).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  public static class ProfileJsonSerde implements Serde<Profile> {
+
+    @Override
+    public Profile fromBytes(byte[] bytes) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<Profile>() { });
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+
+    @Override
+    public byte[] toBytes(Profile p) {
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.writeValueAsString(p).getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException(e);
+      }
+    }
+  }
+
+  private static final String[] PAGEKEYS = {"inbox", "home", "search", "pymk", "group", "job"};
+
+  static public PageView[] generatePageViews(int count) {
+    Random random = new Random();
+    PageView[] pageviews = new PageView[count];
+    for (int i = 0; i < count; i++) {
+      String pagekey = PAGEKEYS[random.nextInt(PAGEKEYS.length - 1)];
+      int memberId = random.nextInt(10);
+      pageviews[i] = new PageView(pagekey, memberId);
+    }
+    return pageviews;
+  }
+
+  private static final String[] COMPANIES = {"MSFT", "LKND", "GOOG", "FB", "AMZN", "CSCO"};
+
+  static public Profile[] generateProfiles(int count) {
+    Random random = new Random();
+    Profile[] profiles = new Profile[count];
+    for (int i = 0; i < count; i++) {
+      String company = COMPANIES[random.nextInt(COMPANIES.length - 1)];
+      profiles[i] = new Profile(i, company);
+    }
+    return profiles;
+  }
+
+}
index 832457b..6ba28ae 100644 (file)
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -58,9 +59,10 @@ public class ArraySystemConsumer implements SystemConsumer {
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long l) throws InterruptedException {
     if (!done) {
       Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopeMap = new HashMap<>();
+      final AtomicInteger offset = new AtomicInteger(0);
       set.forEach(ssp -> {
           List<IncomingMessageEnvelope> envelopes = Arrays.stream(getArrayObjects(ssp.getSystemStream().getStream(), config))
-              .map(object -> new IncomingMessageEnvelope(ssp, null, null, object)).collect(Collectors.toList());
+              .map(object -> new IncomingMessageEnvelope(ssp, String.valueOf(offset.incrementAndGet()), null, object)).collect(Collectors.toList());
           envelopes.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
           envelopeMap.put(ssp, envelopes);
         });
index 8890a2f..c735c74 100644 (file)
@@ -24,7 +24,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+
 import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStreamMetadata;
@@ -50,13 +52,17 @@ public class SimpleSystemAdmin implements SystemAdmin {
   @Override
   public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
     return streamNames.stream()
-        .collect(Collectors.toMap(
-            Function.<String>identity(),
-            streamName -> {
+        .collect(Collectors.toMap(Function.identity(), streamName -> {
+            int messageCount = isBootstrapStream(streamName) ? getMessageCount(streamName) : -1;
+            String oldestOffset = messageCount < 0 ? null : "0";
+            String newestOffset = messageCount < 0 ? null : String.valueOf(messageCount - 1);
+            String upcomingOffset = messageCount < 0 ? null : String.valueOf(messageCount);
             Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> metadataMap = new HashMap<>();
             int partitionCount = config.getInt("streams." + streamName + ".partitionCount", 1);
             for (int i = 0; i < partitionCount; i++) {
-              metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(null, null, null));
+              metadataMap.put(new Partition(i), new SystemStreamMetadata.SystemStreamPartitionMetadata(
+                  oldestOffset, newestOffset, upcomingOffset
+              ));
             }
             return new SystemStreamMetadata(streamName, metadataMap);
           }));
@@ -71,5 +77,17 @@ public class SimpleSystemAdmin implements SystemAdmin {
     }
     return offset1.compareTo(offset2);
   }
+
+  private int getMessageCount(String streamName) {
+    try {
+      return Base64Serializer.deserialize(config.get("streams." + streamName + ".source"), Object[].class).length;
+    } catch (Exception e) {
+      throw new SamzaException(e);
+    }
+  }
+
+  private boolean isBootstrapStream(String streamName) {
+    return "true".equalsIgnoreCase(config.get("streams." + streamName + ".samza.bootstrap", "false"));
+  }
 }