SAMZA-1755: Fix JsonRelConverter to recursively convert relRecords.
authorAditya Toomula <atoomula@linkedin.com>
Fri, 13 Jul 2018 22:26:34 +0000 (15:26 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 13 Jul 2018 22:26:34 +0000 (15:26 -0700)
Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: Srini P<spunuru@linkedin.com>

Closes #558 from atoomula/sql1

samza-tools/src/main/java/org/apache/samza/tools/json/JsonRelConverterFactory.java

index ce257f1..4db066a 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.RelSchemaProvider;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
@@ -55,12 +56,16 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
 
     @Override
     public KV<Object, Object> convertToSamzaMessage(SamzaSqlRelMessage relMessage) {
+      String jsonValue = convertToSamzaMessage(relMessage.getSamzaSqlRelRecord());
+      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+    }
 
+    private String convertToSamzaMessage(SamzaSqlRelRecord relRecord) {
       String jsonValue;
       ObjectNode node = mapper.createObjectNode();
 
-      List<String> fieldNames = relMessage.getSamzaSqlRelRecord().getFieldNames();
-      List<Object> values = relMessage.getSamzaSqlRelRecord().getFieldValues();
+      List<String> fieldNames = relRecord.getFieldNames();
+      List<Object> values = relRecord.getFieldValues();
 
       for (int index = 0; index < fieldNames.size(); index++) {
         Object value = values.get(index);
@@ -77,6 +82,9 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
           node.put(fieldNames.get(index), (Double) value);
         } else if (String.class.isAssignableFrom(value.getClass())) {
           node.put(fieldNames.get(index), (String) value);
+        } else if (SamzaSqlRelRecord.class.isAssignableFrom(value.getClass())) {
+          // If the value is a SamzaSqlRelRecord, call convertToSamzaMessage to convert the record to json string.
+          node.put(fieldNames.get(index), convertToSamzaMessage((SamzaSqlRelRecord) value));
         } else {
           node.put(fieldNames.get(index), value.toString());
         }
@@ -87,7 +95,7 @@ public class JsonRelConverterFactory implements SamzaRelConverterFactory {
         throw new SamzaException("Error json serializing object", e);
       }
 
-      return new KV<>(relMessage.getKey(), jsonValue.getBytes());
+      return jsonValue;
     }
   }
 }