SAMZA-1802: Enable host affinity when RocksDB is present
authorWei Song <wsong@linkedin.com>
Tue, 7 Aug 2018 17:13:01 +0000 (10:13 -0700)
committerWei Song <wsong@linkedin.com>
Tue, 7 Aug 2018 17:13:01 +0000 (10:13 -0700)
We should enable host affinity when RocksDB table is present, this should be done in RocksDB table provider.

Author: Wei Song <wsong@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@linkedin.com>

Closes #600 from weisong44/add-host-affinity and squashes the following commits:

78e1b84a [Wei Song] Enable host affinity in RocksDB table provider
a15a7c9a [Wei Song] Merge remote-tracking branch 'upstream/master'
5cbf9af9 [Wei Song] Merge remote-tracking branch 'upstream/master'
3f7ed71f [Wei Song] Added self to committer list

samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableProvider.java
samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java

index eb8188f..dce7cc0 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.storage.kv;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.JavaTableConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.table.TableSpec;
@@ -56,6 +57,9 @@ public class RocksDbTableProvider extends BaseLocalStoreBackedTableProvider {
       tableConfig.put(realKey, v);
     });
 
+    // Enable host affinity
+    tableConfig.put(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, "true");
+
     logger.info("Generated configuration for table " + tableSpec.getId());
 
     return tableConfig;
index 3ed29ca..817fb9f 100644 (file)
@@ -81,7 +81,7 @@ public class TestTableDescriptorsProvider {
     String tableRewriterName = "tableRewriter";
     configs.put("tables.descriptors.provider.class", MySampleTableDescriptorsProvider.class.getName());
     Config resultConfig = new MySampleTableConfigRewriter().rewrite(tableRewriterName, new MapConfig(configs));
-    Assert.assertTrue(resultConfig.size() == 17);
+    Assert.assertTrue(resultConfig.size() == 18);
 
     String localTableId = "local-table-1";
     String remoteTableId = "remote-table-1";