SAMZA-2047: Fixing udfs that return samzasqlrelrecord type and other such types to...
authorAditya Toomula <atoomula@linkedin.com>
Wed, 2 Jan 2019 21:35:48 +0000 (13:35 -0800)
committerAditya Toomula <atoomula@linkedin.com>
Wed, 2 Jan 2019 21:35:48 +0000 (13:35 -0800)
Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: vjagadish1989,weiqingy

Closes #866 from atoomula/joinudf

samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java [new file with mode: 0644]
samza-sql/src/main/java/org/apache/samza/sql/translator/JoinTranslator.java
samza-sql/src/test/java/org/apache/samza/sql/fn/TestBuildOutputRecordUdf.java [new file with mode: 0644]
samza-sql/src/test/java/org/apache/samza/sql/testutil/SampleRelTableKeyConverter.java
samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlRemoteTable.java

diff --git a/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java b/samza-sql/src/main/java/org/apache/samza/sql/fn/BuildOutputRecordUdf.java
new file mode 100644 (file)
index 0000000..e752e6a
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.fn;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.Validate;
+import org.apache.samza.config.Config;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.apache.samza.sql.udfs.ScalarUdf;
+
+
+/**
+ * BuildOutputRecordUdf builds a SamzaSqlRelRecord with given list of key value pairs.
+ * Useful if you need to populate fields for a Kafka message.
+ *
+ * For example, given args = {k1, v1, k2, v2},
+ * it returns a SamzaSqlRelRecord with fieldNames={k1, k2} and fieldValues={v1, v2},
+ * where v1 or v2 can be any Object, including SamzaSqlRelRecord when you want to set nested field.
+ *
+ * Consider the below nested output schema:
+ * {
+ *   field1,
+ *   field2:{
+ *     field21,
+ *     field22
+ *   },
+ *   field3:{
+ *     field31:{
+ *       field311,
+ *       field312
+ *     },
+ *     field32
+ *   }
+ * };
+ * It could be built in the select statement as:
+ * select obj1 as field1,
+ *        BuildSamzaSqlRelRecord("field21", obj21, "field22", obj22) as field2,
+ *        BuildSamzaSqlRelRecord("field31", BuildSamzaSqlRelRecord("field311", obj311, "field312", obj312),
+ *                               "field32", obj32) as field3
+ *
+ * If no args is provided, it returns an empty SamzaSqlRelRecord (with empty field names and values list).
+ */
+public class BuildOutputRecordUdf implements ScalarUdf<SamzaSqlRelRecord> {
+  @Override
+  public void init(Config udfConfig) {
+  }
+
+  @Override
+  public SamzaSqlRelRecord execute(Object... args) {
+    int numOfArgs = args.length;
+    Validate.isTrue(numOfArgs % 2 == 0, "numOfArgs should be an even number");
+
+    List<String> fieldNames = new ArrayList<>();
+    List<Object> fieldValues = new ArrayList<>();
+
+    for (int i = 0; i < numOfArgs - 1; i += 2) {
+      fieldNames.add((String) args[i]);
+      // value can be instanceof SamzaSqlRelRecord, or any Object(string, int, long most likely)
+      fieldValues.add(args[i + 1]);
+    }
+
+    return new SamzaSqlRelRecord(fieldNames, fieldValues);
+  }
+}
index db0349d..0d625bc 100644 (file)
@@ -280,7 +280,7 @@ class JoinTranslator {
     if (sqlTypeName != SqlTypeName.BOOLEAN && sqlTypeName != SqlTypeName.TINYINT && sqlTypeName != SqlTypeName.SMALLINT
         && sqlTypeName != SqlTypeName.INTEGER && sqlTypeName != SqlTypeName.CHAR && sqlTypeName != SqlTypeName.BIGINT
         && sqlTypeName != SqlTypeName.VARCHAR && sqlTypeName != SqlTypeName.DOUBLE && sqlTypeName != SqlTypeName.FLOAT
-        && sqlTypeName != SqlTypeName.ANY) {
+        && sqlTypeName != SqlTypeName.ANY && sqlTypeName != SqlTypeName.OTHER) {
       log.error("Unsupported key type " + sqlTypeName + " used in join condition.");
       throw new SamzaException("Unsupported key type used in join condition.");
     }
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/fn/TestBuildOutputRecordUdf.java b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestBuildOutputRecordUdf.java
new file mode 100644 (file)
index 0000000..9747c65
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.sql.fn;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.apache.samza.sql.SamzaSqlRelRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestBuildOutputRecordUdf {
+
+  @Test
+  public void testNoArgs() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+
+    SamzaSqlRelRecord actualRecord = buildOutputRecordUdf.execute();
+    SamzaSqlRelRecord expectedRecord =
+        new SamzaSqlRelRecord(new ArrayList<>(), new ArrayList<>());
+
+    Assert.assertEquals(actualRecord.getFieldNames(), expectedRecord.getFieldNames());
+    Assert.assertEquals(actualRecord.getFieldValues(), expectedRecord.getFieldValues());
+  }
+
+  @Test
+  public void testSinglePair() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+
+    SamzaSqlRelRecord actualRecord = buildOutputRecordUdf.execute("key", "value");
+    SamzaSqlRelRecord expectedRecord =
+        new SamzaSqlRelRecord(Arrays.asList("key"), Arrays.asList("value"));
+
+    Assert.assertEquals(actualRecord.getFieldNames(), expectedRecord.getFieldNames());
+    Assert.assertEquals(actualRecord.getFieldValues(), expectedRecord.getFieldValues());
+  }
+
+  @Test
+  public void testMultiPairs() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+
+    SamzaSqlRelRecord actualRecord = buildOutputRecordUdf.execute("k1", "v1", "k2", "v2");
+    SamzaSqlRelRecord expectedRecord =
+        new SamzaSqlRelRecord(Arrays.asList("k1", "k2"), Arrays.asList("v1", "v2"));
+
+    Assert.assertEquals(actualRecord.getFieldNames(), expectedRecord.getFieldNames());
+    Assert.assertEquals(actualRecord.getFieldValues(), expectedRecord.getFieldValues());
+  }
+
+  @Test
+  public void testNestedRecord() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+    SamzaSqlRelRecord nestedSamzaSqlRelRecord =
+        new SamzaSqlRelRecord(Arrays.asList("k3"), Arrays.asList("v3"));
+
+    SamzaSqlRelRecord actualRecord = buildOutputRecordUdf.execute("k1", "v1", "k2", nestedSamzaSqlRelRecord);
+    SamzaSqlRelRecord expectedRecord =
+        new SamzaSqlRelRecord(Arrays.asList("k1", "k2"),
+            Arrays.asList("v1", nestedSamzaSqlRelRecord));
+
+    Assert.assertEquals(actualRecord.getFieldNames(), expectedRecord.getFieldNames());
+    Assert.assertEquals(actualRecord.getFieldValues(), expectedRecord.getFieldValues());
+  }
+
+  @Test(expected = NullPointerException.class)
+  public void testNullArgs() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+    buildOutputRecordUdf.execute(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testOddNumOfArgs() {
+    BuildOutputRecordUdf buildOutputRecordUdf = new BuildOutputRecordUdf();
+    buildOutputRecordUdf.execute("k1");
+  }
+}
index 9f3f8a0..ba40ee5 100644 (file)
@@ -31,6 +31,9 @@ public class SampleRelTableKeyConverter implements SamzaRelTableKeyConverter {
 
   @Override
   public Object convertToTableKeyFormat(SamzaSqlRelRecord relRecord) {
+    if (relRecord.getFieldValues().get(0) instanceof SamzaSqlRelRecord) {
+      relRecord = (SamzaSqlRelRecord) relRecord.getFieldValues().get(0);
+    }
     return relRecord.getFieldValues().stream().map(Object::toString).collect(Collectors.toList()).get(0);
   }
 }
index 3540a39..82e57d5 100644 (file)
@@ -35,6 +35,7 @@ import org.apache.samza.sql.avro.schemas.PageView;
 import org.apache.samza.sql.avro.schemas.PageViewCount;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
+import org.apache.samza.sql.fn.BuildOutputRecordUdf;
 import org.apache.samza.sql.fn.FlattenUdf;
 import org.apache.samza.sql.fn.RegexMatchUdf;
 import org.apache.samza.sql.impl.ConfigBasedIOResolverFactory;
@@ -95,7 +96,7 @@ public class SamzaSqlTestConfig {
         ConfigBasedUdfResolver.class.getName());
     staticConfigs.put(configUdfResolverDomain + ConfigBasedUdfResolver.CFG_UDF_CLASSES, Joiner.on(",")
         .join(MyTestUdf.class.getName(), RegexMatchUdf.class.getName(), FlattenUdf.class.getName(),
-            MyTestArrayUdf.class.getName()));
+            MyTestArrayUdf.class.getName(), BuildOutputRecordUdf.class.getName()));
 
     String avroSystemConfigPrefix =
         String.format(ConfigBasedIOResolverFactory.CFG_FMT_SAMZA_PREFIX, SAMZA_SYSTEM_TEST_AVRO);
index 439b8f7..c075677 100644 (file)
@@ -118,6 +118,37 @@ public class TestSamzaSqlRemoteTable extends SamzaSqlIntegrationTestHarness {
   }
 
   @Test
+  public void testSourceEndToEndWithKeyAndUdf() {
+    int numMessages = 20;
+
+    TestAvroSystemFactory.messages.clear();
+    RemoteStoreIOResolverTestFactory.records.clear();
+    Map<String, String> staticConfigs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(numMessages);
+    populateProfileTable(staticConfigs, numMessages);
+
+    String sql =
+        "Insert into testavro.enrichedPageViewTopic "
+            + "select pv.pageKey as __key__, pv.pageKey as pageKey, coalesce(null, 'N/A') as companyName,"
+            + "       p.name as profileName, p.address as profileAddress "
+            + "from testRemoteStore.Profile.`$table` as p "
+            + "join testavro.PAGEVIEW as pv "
+            + " on p.__key__ = BuildOutputRecord('id', pv.profileId)";
+
+    List<String> sqlStmts = Arrays.asList(sql);
+    staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));
+    runApplication(new MapConfig(staticConfigs));
+
+    List<String> outMessages = TestAvroSystemFactory.messages.stream()
+        .map(x -> ((GenericRecord) x.getMessage()).get("pageKey").toString() + ","
+            + (((GenericRecord) x.getMessage()).get("profileName") == null ? "null" :
+            ((GenericRecord) x.getMessage()).get("profileName").toString()))
+        .collect(Collectors.toList());
+    Assert.assertEquals(numMessages, outMessages.size());
+    List<String> expectedOutMessages = TestAvroSystemFactory.getPageKeyProfileNameJoin(numMessages);
+    Assert.assertEquals(expectedOutMessages, outMessages);
+  }
+
+  @Test
   public void testSourceEndToEndWithKeyWithNullForeignKeys() {
     int numMessages = 20;