SAMZA-2035: Detect CachingTableDescriptor as remote table.
authorAditya Toomula <atoomula@linkedin.com>
Tue, 11 Dec 2018 19:16:49 +0000 (11:16 -0800)
committerAditya Toomula <atoomula@linkedin.com>
Tue, 11 Dec 2018 19:16:49 +0000 (11:16 -0800)
Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: shenodaguirguis,weiqingy

Closes #854 from atoomula/cache and squashes the following commits:

ecc3b93f [Aditya Toomula] SAMZA-2035: Detect CachingTableDescriptor as remote table.
ecc3bdba [Aditya Toomula] Detect CachingTableDescriptor as remote table.

samza-sql/src/main/java/org/apache/samza/sql/interfaces/SqlIOConfig.java
samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
samza-sql/src/main/java/org/apache/samza/sql/translator/ScanTranslator.java

index 5fa30e7..d92faae 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.commons.lang.Validate;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.apache.samza.table.descriptors.TableDescriptor;
 import org.apache.samza.system.SystemStream;
@@ -155,6 +156,7 @@ public class SqlIOConfig {
   }
 
   public boolean isRemoteTable() {
-    return tableDescriptor.isPresent() && tableDescriptor.get() instanceof RemoteTableDescriptor;
+    return tableDescriptor.isPresent() && (tableDescriptor.get() instanceof RemoteTableDescriptor ||
+        tableDescriptor.get() instanceof CachingTableDescriptor);
   }
 }
index c99551e..db0349d 100644 (file)
@@ -49,6 +49,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.serializers.SamzaSqlRelMessageSerdeFactory;
 import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.table.Table;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -322,7 +323,8 @@ class JoinTranslator {
       SqlIOConfig sourceTableConfig = resolveSourceConfigForTable(relNode, context);
       if (sourceTableConfig == null || !sourceTableConfig.getTableDescriptor().isPresent()) {
         return JoinInputNode.InputType.STREAM;
-      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor) {
+      } else if (sourceTableConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+          sourceTableConfig.getTableDescriptor().get() instanceof CachingTableDescriptor) {
         return JoinInputNode.InputType.REMOTE_TABLE;
       } else {
         return JoinInputNode.InputType.LOCAL_TABLE;
index aa73f94..7f1ff39 100644 (file)
@@ -43,6 +43,7 @@ import org.apache.samza.sql.interfaces.SqlIOConfig;
 import org.apache.samza.sql.runner.SamzaSqlApplicationContext;
 import org.apache.samza.system.descriptors.DelegatingSystemDescriptor;
 import org.apache.samza.system.descriptors.GenericInputDescriptor;
+import org.apache.samza.table.descriptors.CachingTableDescriptor;
 import org.apache.samza.table.descriptors.RemoteTableDescriptor;
 
 
@@ -157,7 +158,8 @@ class ScanTranslator {
     final String source = sqlIOConfig.getSource();
 
     final boolean isRemoteTable = sqlIOConfig.getTableDescriptor().isPresent() &&
-        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor);
+        (sqlIOConfig.getTableDescriptor().get() instanceof RemoteTableDescriptor ||
+            sqlIOConfig.getTableDescriptor().get() instanceof CachingTableDescriptor);
 
     // For remote table, we don't have an input stream descriptor. The table descriptor is already defined by the
     // SqlIOResolverFactory.