End to end union test case (#916)
authorSrinivasulu Punuru <srinipunuru@users.noreply.github.com>
Wed, 13 Feb 2019 19:18:48 +0000 (11:18 -0800)
committerGitHub <noreply@github.com>
Wed, 13 Feb 2019 19:18:48 +0000 (11:18 -0800)
samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
samza-sql/src/test/java/org/apache/samza/sql/avro/TestAvroRelConversion.java
samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.avsc
samza-sql/src/test/java/org/apache/samza/sql/avro/schemas/ComplexRecord.java
samza-sql/src/test/java/org/apache/samza/sql/system/TestAvroSystemFactory.java
samza-test/src/test/java/org/apache/samza/test/samzasql/TestSamzaSqlEndToEnd.java

index 0d452dd..ba79e48 100644 (file)
@@ -207,7 +207,11 @@ public class AvroRelConverter implements SamzaRelConverter {
             .collect(Collectors.toMap(Map.Entry::getKey, e -> convertToAvroObject(e.getValue(),
                 getNonNullUnionSchema(schema).getValueType())));
       case UNION:
-        return convertToAvroObject(relObj, getNonNullUnionSchema(schema));
+        for (Schema unionSchema : schema.getTypes()) {
+          if (isSchemaCompatibleWithRelObj(relObj, unionSchema)) {
+            return convertToAvroObject(relObj, unionSchema);
+          }
+        }
       case ENUM:
         return new GenericData.EnumSymbol(schema, (String) relObj);
       case FIXED:
@@ -219,19 +223,6 @@ public class AvroRelConverter implements SamzaRelConverter {
     }
   }
 
-  // Two non-nullable types in a union is not yet supported.
-  static public Schema getNonNullUnionSchema(Schema schema) {
-    if (schema.getType().equals(Schema.Type.UNION)) {
-      if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(0);
-      }
-      if (schema.getTypes().get(1).getType() != Schema.Type.NULL) {
-        return schema.getTypes().get(1);
-      }
-    }
-    return schema;
-  }
-
   // Not doing any validations of data types with Avro schema considering the resource cost per message.
   // Casting would fail if the data types are not in sync with the schema.
   public Object convertToJavaObject(Object avroObj, Schema schema) {
@@ -267,7 +258,11 @@ public class AvroRelConverter implements SamzaRelConverter {
         return retVal;
       }
       case UNION:
-        return convertToJavaObject(avroObj, getNonNullUnionSchema(schema));
+        for (Schema unionSchema : schema.getTypes()) {
+          if (isSchemaCompatible(avroObj, unionSchema)) {
+            return convertToJavaObject(avroObj, unionSchema);
+          }
+        }
       case ENUM:
         return avroObj.toString();
       case FIXED:
@@ -283,4 +278,66 @@ public class AvroRelConverter implements SamzaRelConverter {
         return avroObj;
     }
   }
+
+  private boolean isSchemaCompatible(Object avroObj, Schema unionSchema) {
+    if (unionSchema.getType() == Schema.Type.NULL && avroObj == null) {
+      return true;
+    }
+    switch (unionSchema.getType()) {
+      case RECORD:
+        return avroObj instanceof IndexedRecord;
+      case ARRAY:
+        return avroObj instanceof GenericData.Array || avroObj instanceof List;
+      case MAP:
+        return avroObj instanceof Map;
+      case FIXED:
+        return avroObj instanceof GenericData.Fixed;
+      case BYTES:
+        return avroObj instanceof ByteBuffer;
+      case FLOAT:
+        return avroObj instanceof Float;
+      default:
+        return true;
+    }
+  }
+
+  private static boolean isSchemaCompatibleWithRelObj(Object relObj, Schema unionSchema) {
+    if (unionSchema.getType() == Schema.Type.NULL && relObj == null) {
+      return true;
+    }
+    switch (unionSchema.getType()) {
+      case RECORD:
+        return relObj instanceof SamzaSqlRelRecord;
+      case ARRAY:
+        return relObj instanceof List;
+      case MAP:
+        return relObj instanceof Map;
+      case FIXED:
+        return relObj instanceof ByteString;
+      case BYTES:
+        return relObj instanceof ByteString;
+      case FLOAT:
+        return relObj instanceof Float || relObj instanceof Double;
+      default:
+        return true;
+    }
+  }
+
+  // Two non-nullable types in a union is not yet supported.
+  public static Schema getNonNullUnionSchema(Schema schema) {
+    if (schema.getType().equals(Schema.Type.UNION)) {
+      List<Schema> types = schema.getTypes();
+      // Typically a nullable field's schema is configured as an union of Null and a Type.
+      // This is to check whether the Union is a Nullable field
+      if (types.size() == 2) {
+        if (types.get(0).getType() == Schema.Type.NULL) {
+          return types.get(1);
+        } else if ((types.get(1).getType() == Schema.Type.NULL)) {
+          return types.get(0);
+        }
+      }
+    }
+
+    return schema;
+  }
 }
index 47f76e8..c3f1caf 100644 (file)
@@ -56,6 +56,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.planner.RelSchemaConverter;
 import org.apache.samza.sql.schema.SqlSchema;
@@ -194,6 +195,7 @@ public class TestAvroRelConversion {
   @Test
   public void testComplexRecordConversion() throws IOException {
     GenericData.Record record = new GenericData.Record(ComplexRecord.SCHEMA$);
+
     record.put("id", id);
     record.put("bool_value", boolValue);
     record.put("double_value", doubleValue);
@@ -204,7 +206,7 @@ public class TestAvroRelConversion {
     record.put("long_value", longValue);
     record.put("array_values", arrayValue);
     record.put("map_values", mapValue);
-    record.put("union_value", id);
+    record.put("union_value", testStrValue);
 
     ComplexRecord complexRecord = new ComplexRecord();
     complexRecord.id = id;
@@ -219,18 +221,10 @@ public class TestAvroRelConversion {
     complexRecord.array_values.addAll(arrayValue);
     complexRecord.map_values = new HashMap<>();
     complexRecord.map_values.putAll(mapValue);
-    complexRecord.union_value = id;
-
-    byte[] serializedData = bytesFromGenericRecord(record);
-    validateAvroSerializedData(serializedData, id);
-
-    serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
-    validateAvroSerializedData(serializedData, id);
-
-    record.put("union_value", testStrValue);
     complexRecord.union_value = testStrValue;
 
-    serializedData = bytesFromGenericRecord(record);
+
+    byte[] serializedData = bytesFromGenericRecord(record);
     validateAvroSerializedData(serializedData, testStrValue);
 
     serializedData = encodeAvroSpecificRecord(ComplexRecord.class, complexRecord);
index 5106bc2..5e78bd9 100644 (file)
           ]
         },
         {
-          "name": "union_value",
-          "doc": "union Value.",
-          "type": ["null", "int", "string"],
-          "default":null
-        },
-        {
           "name" : "empty_record",
           "type" : [ "null", {
             "type" : "record",
                 ]
               }
             ]
+        },
+        {
+          "name": "union_value",
+          "doc": "union Value.",
+          "type": ["null", "SubRecord", "string"],
+          "default":null
         }
     ]
 }
index ccc7929..7796004 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.samza.sql.avro.schemas;
 
 @SuppressWarnings("all")
 public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"union_value\",\"type\":[\"null\",\"int\",\"string\"],\"doc\":\"union Value.\",\"default\":null},{\"name\":\"empty_record\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"emptySubRecord\",\"fields\":[]}]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]}]}");
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"ComplexRecord\",\"namespace\":\"org.apache.samza.sql.avro.schemas\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"Record id.\",\"default\":null},{\"name\":\"bool_value\",\"type\":[\"null\",\"boolean\"],\"doc\":\"Boolean Value.\",\"default\":null},{\"name\":\"double_value\",\"type\":[\"null\",\"double\"],\"doc\":\"double Value.\",\"default\":null},{\"name\":\"float_value\",\"type\":[\"null\",\"float\"],\"doc\":\"float Value.\",\"default\":null},{\"name\":\"string_value\",\"type\":[\"null\",\"string\"],\"doc\":\"string Value.\",\"default\":null},{\"name\":\"bytes_value\",\"type\":[\"null\",\"bytes\"],\"doc\":\"bytes Value.\",\"default\":null},{\"name\":\"long_value\",\"type\":[\"null\",\"long\"],\"doc\":\"long Value.\",\"default\":null},{\"name\":\"fixed_value\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"MyFixed\",\"size\":16}],\"doc\":\"fixed Value.\"},{\"name\":\"array_values\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"doc\":\"array values in the record.\",\"default\":[]},{\"name\":\"map_values\",\"type\":[\"null\",{\"type\":\"map\",\"values\":\"string\"}],\"doc\":\"map values in the record.\",\"default\":[]},{\"name\":\"enum_value\",\"type\":[\"null\",{\"type\":\"enum\",\"name\":\"TestEnumType\",\"symbols\":[\"foo\",\"bar\"]}],\"doc\":\"enum value.\",\"default\":[]},{\"name\":\"empty_record\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"emptySubRecord\",\"fields\":[]}]},{\"name\":\"array_records\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SubRecord\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"doc\":\"sub record id\"},{\"name\":\"sub_values\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"doc\":\"Sub record \"}]}],\"doc\":\"array of records.\",\"default\":[]},{\"name\":\"union_value\",\"type\":[\"null\",\"SubRecord\",\"string\"],\"doc\":\"union Value.\",\"default\":null}]}");
   /** Record id. */
   public java.lang.Integer id;
   /** Boolean Value. */
@@ -43,11 +43,11 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
   public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> map_values;
   /** enum value. */
   public org.apache.samza.sql.avro.schemas.TestEnumType enum_value;
-  /** union Value. */
-  public java.lang.Object union_value;
   public org.apache.samza.sql.avro.schemas.emptySubRecord empty_record;
   /** array of records. */
   public org.apache.samza.sql.avro.schemas.SubRecord array_records;
+  /** union Value. */
+  public java.lang.Object union_value;
   public org.apache.avro.Schema getSchema() { return SCHEMA$; }
   // Used by DatumWriter.  Applications should not call.
   public java.lang.Object get(int field$) {
@@ -63,9 +63,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 8: return array_values;
     case 9: return map_values;
     case 10: return enum_value;
-    case 11: return union_value;
-    case 12: return empty_record;
-    case 13: return array_records;
+    case 11: return empty_record;
+    case 12: return array_records;
+    case 13: return union_value;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
@@ -84,9 +84,9 @@ public class ComplexRecord extends org.apache.avro.specific.SpecificRecordBase i
     case 8: array_values = (java.util.List<java.lang.CharSequence>)value$; break;
     case 9: map_values = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
     case 10: enum_value = (org.apache.samza.sql.avro.schemas.TestEnumType)value$; break;
-    case 11: union_value = (java.lang.Object)value$; break;
-    case 12: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
-    case 13: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+    case 11: empty_record = (org.apache.samza.sql.avro.schemas.emptySubRecord)value$; break;
+    case 12: array_records = (org.apache.samza.sql.avro.schemas.SubRecord)value$; break;
+    case 13: union_value = (java.lang.Object)value$; break;
     default: throw new org.apache.avro.AvroRuntimeException("Bad index");
     }
   }
index ec25c9d..8304598 100644 (file)
@@ -42,6 +42,7 @@ import org.apache.samza.sql.avro.schemas.PhoneNumber;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.avro.schemas.SimpleRecord;
 import org.apache.samza.sql.avro.schemas.StreetNumRecord;
+import org.apache.samza.sql.avro.schemas.SubRecord;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
@@ -319,6 +320,7 @@ public class TestAvroSystemFactory implements SystemFactory {
     }
 
     private Object createComplexRecord(int index) {
+
       GenericRecord record = new GenericData.Record(ComplexRecord.SCHEMA$);
       record.put("id", index);
       record.put("string_value", "Name" + index);
@@ -330,6 +332,11 @@ public class TestAvroSystemFactory implements SystemFactory {
           new GenericData.Array<>(index, ComplexRecord.SCHEMA$.getField("array_values").schema().getTypes().get(1));
       arrayValues.addAll(IntStream.range(0, index).mapToObj(String::valueOf).collect(Collectors.toList()));
       record.put("array_values", arrayValues);
+//      record.put("union_value", "unionStrValue");
+      GenericRecord subRecord = new GenericData.Record(SubRecord.SCHEMA$);
+      subRecord.put("id", index);
+      subRecord.put("sub_values", arrayValues);
+      record.put("union_value", subRecord);
       Map<String, String> mapValues = new HashMap<>();
       mapValues.put("key0", "value0");
       record.put("map_values", mapValues);
index 9deb561..35c83e9 100644 (file)
@@ -292,7 +292,7 @@ public class TestSamzaSqlEndToEnd extends SamzaSqlIntegrationTestHarness {
 
     String sql1 =
         "Insert into testavro.outputTopic"
-            + " select map_values['key0'] as string_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
+            + " select map_values['key0'] as string_value, union_value, array_values[0] as string_value, map_values, id, bytes_value, fixed_value, float_value "
             + " from testavro.COMPLEX1";
     List<String> sqlStmts = Collections.singletonList(sql1);
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_SQL_STMTS_JSON, JsonUtil.toJson(sqlStmts));