SAMZA-1740: Moving SamzaSqlRelRecord to samza-api as it is needed to be used in UDFs
authorAditya Toomula <atoomula@linkedin.com>
Mon, 11 Jun 2018 17:49:45 +0000 (10:49 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Mon, 11 Jun 2018 17:49:45 +0000 (10:49 -0700)
Please see description in the ticket. Also, implementing equals and hashCode methods for SamzaSqlRelRecord and SamzaSqlRelMessage.

Author: Aditya Toomula <atoomula@linkedin.com>

Reviewers: Srini P<spunuru@linkedin.com>, Jagadish <jagadish@apache.org>

Closes #545 from atoomula/sql

samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java [new file with mode: 0644]
samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java [new file with mode: 0644]
samza-sql/src/main/java/org/apache/samza/sql/avro/AvroRelConverter.java
samza-sql/src/main/java/org/apache/samza/sql/data/SamzaSqlRelMessage.java
samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelMessageSerdeFactory.java
samza-sql/src/main/java/org/apache/samza/sql/serializers/SamzaSqlRelRecordSerdeFactory.java
samza-sql/src/test/java/org/apache/samza/sql/data/TestSamzaSqlRelMessage.java
samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelMessageSerde.java [moved from samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelMessageSerde.java with 99% similarity]
samza-sql/src/test/java/org/apache/samza/sql/serializers/TestSamzaSqlRelRecordSerde.java [moved from samza-sql/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecordSerde.java with 95% similarity]

diff --git a/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java b/samza-api/src/main/java/org/apache/samza/sql/SamzaSqlRelRecord.java
new file mode 100644 (file)
index 0000000..e17a273
--- /dev/null
@@ -0,0 +1,113 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package org.apache.samza.sql;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.samza.annotation.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+
+/**
+ * Samza sql relational record. A record consists of list of column values and the associated column names.
+ * A column value could be nested, meaning, it could be another SamzaSqlRelRecord.
+ * Right now we do not store any metadata (like nullability, etc) other than the column name in the SamzaSqlRelRecord.
+ */
+@InterfaceStability.Unstable
+public class SamzaSqlRelRecord implements Serializable {
+
+  @JsonProperty("fieldNames")
+  private final ArrayList<String> fieldNames;
+  @JsonProperty("fieldValues")
+  private final ArrayList<Object> fieldValues;
+
+  /**
+   * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values.
+   * @param fieldNames Ordered list of field names in the row.
+   * @param fieldValues  Ordered list of all the values in the row. Some of the fields can be null. This could be
+   *                     result of delete change capture event in the stream or because of the result of the outer
+   *                     join or the fields themselves are null in the original stream.
+   */
+  public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames,
+      @JsonProperty("fieldValues") List<Object> fieldValues) {
+    if (fieldNames.size() != fieldValues.size()) {
+      throw new IllegalArgumentException("Field Names and values are not of same length.");
+    }
+
+    this.fieldNames = new ArrayList<>();
+    this.fieldValues = new ArrayList<>();
+
+    this.fieldNames.addAll(fieldNames);
+    this.fieldValues.addAll(fieldValues);
+  }
+
+  /**
+   * Get the field names of all the columns in the relational message.
+   * @return the field names of all columns.
+   */
+  @JsonProperty("fieldNames")
+  public List<String> getFieldNames() {
+    return this.fieldNames;
+  }
+
+  /**
+   * Get the field values of all the columns in the relational message.
+   * @return the field values of all columns.
+   */
+  @JsonProperty("fieldValues")
+  public List<Object> getFieldValues() {
+    return this.fieldValues;
+  }
+
+  /**
+   * Get the value of the field corresponding to the field name.
+   * @param name Name of the field.
+   * @return returns the value of the field.
+   */
+  public Optional<Object> getField(String name) {
+    for (int index = 0; index < fieldNames.size(); index++) {
+      if (fieldNames.get(index).equals(name)) {
+        return Optional.ofNullable(fieldValues.get(index));
+      }
+    }
+
+    return Optional.empty();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fieldNames, fieldValues);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SamzaSqlRelRecord other = (SamzaSqlRelRecord) obj;
+    return Objects.equals(fieldNames, other.fieldNames) && Objects.equals(fieldValues, other.fieldValues);
+  }
+}
diff --git a/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java b/samza-api/src/test/java/org/apache/samza/sql/TestSamzaSqlRelRecord.java
new file mode 100644 (file)
index 0000000..ac27991
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.sql;
+
+import java.util.Arrays;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestSamzaSqlRelRecord {
+  @Test
+  public void testEquality() {
+    SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object"));
+    SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object"));
+    assertEquals(relRecord1, relRecord2);
+    assertEquals(relRecord1.hashCode(), relRecord2.hashCode());
+  }
+
+  @Test
+  public void testInEquality() {
+    SamzaSqlRelRecord relRecord1 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, "object"));
+    SamzaSqlRelRecord relRecord2 = new SamzaSqlRelRecord(Arrays.asList("id", "name"), Arrays.asList(1L, null));
+    assertNotEquals(relRecord1, relRecord2);
+    assertNotEquals(relRecord1.hashCode(), relRecord2.hashCode());
+  }
+}
index c9c30cc..ed22cc5 100644 (file)
@@ -32,14 +32,13 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.KV;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
 import org.apache.samza.sql.interfaces.SamzaRelConverter;
 import org.apache.samza.system.SystemStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
-
 
 /**
  * This class converts a Samza Avro messages to Relational messages and vice versa.
index 9bf1870..3ebbb23 100644 (file)
@@ -22,14 +22,9 @@ package org.apache.samza.sql.data;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
+import java.util.Objects;
 import org.apache.commons.lang.Validate;
-import org.apache.samza.SamzaException;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 
@@ -96,6 +91,7 @@ public class SamzaSqlRelMessage implements Serializable {
 
   /**
    * Creates the SamzaSqlRelMessage from {@link SamzaSqlRelRecord}.
+   * @param samzaSqlRelRecord represents the rel record.
    */
   public SamzaSqlRelMessage(@JsonProperty("samzaSqlRelRecord") SamzaSqlRelRecord samzaSqlRelRecord) {
     this(samzaSqlRelRecord.getFieldNames(), samzaSqlRelRecord.getFieldValues());
@@ -110,67 +106,20 @@ public class SamzaSqlRelMessage implements Serializable {
     return key;
   }
 
-  /**
-   * Samza sql relational record. A record consists of list of column values and the associated column names.
-   * A column value could be nested, meaning, it could be another SamzaSqlRelRecord.
-   * Right now we do not store any metadata (like nullability, etc) other than the column name in the SamzaSqlRelRecord.
-   */
-  public static class SamzaSqlRelRecord implements Serializable {
-
-    @JsonProperty("fieldNames")
-    private final List<String> fieldNames;
-    @JsonProperty("fieldValues")
-    private final List<Object> fieldValues;
-
-    /**
-     * Creates a {@link SamzaSqlRelRecord} from the list of relational fields and values.
-     * @param fieldNames Ordered list of field names in the row.
-     * @param fieldValues  Ordered list of all the values in the row. Some of the fields can be null. This could be
-     *                     result of delete change capture event in the stream or because of the result of the outer
-     *                     join or the fields themselves are null in the original stream.
-     */
-    public SamzaSqlRelRecord(@JsonProperty("fieldNames") List<String> fieldNames,
-        @JsonProperty("fieldValues") List<Object> fieldValues) {
-      Validate.isTrue(fieldNames.size() == fieldValues.size(), "Field Names and values are not of same length.");
-
-      this.fieldNames = new ArrayList<>();
-      this.fieldValues = new ArrayList<>();
-
-      this.fieldNames.addAll(fieldNames);
-      this.fieldValues.addAll(fieldValues);
-    }
-
-    /**
-     * Get the field names of all the columns in the relational message.
-     * @return the field names of all columns.
-     */
-    @JsonProperty("fieldNames")
-    public List<String> getFieldNames() {
-      return this.fieldNames;
-    }
-
-    /**
-     * Get the field values of all the columns in the relational message.
-     * @return the field values of all columns.
-     */
-    @JsonProperty("fieldValues")
-    public List<Object> getFieldValues() {
-      return this.fieldValues;
-    }
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, samzaSqlRelRecord);
+  }
 
-    /**
-     * Get the value of the field corresponding to the field name.
-     * @param name Name of the field.
-     * @return returns the value of the field.
-     */
-    public Optional<Object> getField(String name) {
-      for (int index = 0; index < fieldNames.size(); index++) {
-        if (fieldNames.get(index).equals(name)) {
-          return Optional.ofNullable(fieldValues.get(index));
-        }
-      }
-
-      return Optional.empty();
-    }
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    SamzaSqlRelMessage other = (SamzaSqlRelMessage) obj;
+    return Objects.equals(key, other.key) && Objects.equals(samzaSqlRelRecord, other.samzaSqlRelRecord);
   }
 }
index 45542ca..c3906bd 100644 (file)
@@ -30,7 +30,7 @@ import org.codehaus.jackson.type.TypeReference;
 
 /**
  * A serializer for {@link SamzaSqlRelMessage}. This serializer preserves the type information as
- * {@link SamzaSqlRelMessage} contains nested {@link org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * {@link SamzaSqlRelMessage} contains nested {@link org.apache.samza.sql.SamzaSqlRelRecord}
  * records.
  */
 public final class SamzaSqlRelMessageSerdeFactory implements SerdeFactory<SamzaSqlRelMessage> {
index 8a22047..a78bcda 100644 (file)
@@ -23,37 +23,37 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerdeFactory;
-import org.apache.samza.sql.data.SamzaSqlRelMessage;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 
 
 /**
- * A serializer for {@link SamzaSqlRelMessage.SamzaSqlRelRecord}. This serializer preserves the type information as
- * {@link SamzaSqlRelMessage.SamzaSqlRelRecord} and contains nested {@link SamzaSqlRelMessage.SamzaSqlRelRecord}
+ * A serializer for {@link SamzaSqlRelRecord}. This serializer preserves the type information as
+ * {@link SamzaSqlRelRecord} and contains nested {@link SamzaSqlRelRecord}
  * records.
  */
-public final class SamzaSqlRelRecordSerdeFactory implements SerdeFactory<SamzaSqlRelMessage.SamzaSqlRelRecord> {
-  public Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> getSerde(String name, Config config) {
+public final class SamzaSqlRelRecordSerdeFactory implements SerdeFactory<SamzaSqlRelRecord> {
+  public Serde<SamzaSqlRelRecord> getSerde(String name, Config config) {
     return new SamzaSqlRelRecordSerde();
   }
 
-  public final static class SamzaSqlRelRecordSerde implements Serde<SamzaSqlRelMessage.SamzaSqlRelRecord> {
+  public final static class SamzaSqlRelRecordSerde implements Serde<SamzaSqlRelRecord> {
 
     @Override
-    public SamzaSqlRelMessage.SamzaSqlRelRecord fromBytes(byte[] bytes) {
+    public SamzaSqlRelRecord fromBytes(byte[] bytes) {
       try {
         ObjectMapper mapper = new ObjectMapper();
         // Enable object typing to handle nested records
         mapper.enableDefaultTyping();
-        return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelMessage.SamzaSqlRelRecord>() {});
+        return mapper.readValue(new String(bytes, "UTF-8"), new TypeReference<SamzaSqlRelRecord>() {});
       } catch (Exception e) {
         throw new SamzaException(e);
       }
     }
 
     @Override
-    public byte[] toBytes(SamzaSqlRelMessage.SamzaSqlRelRecord p) {
+    public byte[] toBytes(SamzaSqlRelRecord p) {
       try {
         ObjectMapper mapper = new ObjectMapper();
         // Enable object typing to handle nested records
index 93e6223..d0a2f59 100644 (file)
@@ -43,4 +43,22 @@ public class TestSamzaSqlRelMessage {
     SamzaSqlRelMessage message = new SamzaSqlRelMessage(names, values);
     Assert.assertFalse(message.getSamzaSqlRelRecord().getField("field3").isPresent());
   }
+
+  @Test
+  public void testEquality() {
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message2 =
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value1", "value2"));
+    Assert.assertEquals(message1, message2);
+    Assert.assertEquals(message1.hashCode(), message2.hashCode());
+  }
+
+  @Test
+  public void testInEquality() {
+    SamzaSqlRelMessage message1 = new SamzaSqlRelMessage(names, values);
+    SamzaSqlRelMessage message2 =
+        new SamzaSqlRelMessage(Arrays.asList("field1", "field2"), Arrays.asList("value2", "value2"));
+    Assert.assertNotEquals(message1, message2);
+    Assert.assertNotEquals(message1.hashCode(), message2.hashCode());
+  }
 }
@@ -17,7 +17,7 @@
 * under the License.
 */
 
-package org.apache.samza.sql;
+package org.apache.samza.sql.serializers;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -28,18 +28,17 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.samza.config.MapConfig;
+import org.apache.samza.sql.SamzaSqlRelRecord;
 import org.apache.samza.sql.avro.AvroRelConverter;
 import org.apache.samza.sql.avro.AvroRelSchemaProvider;
 import org.apache.samza.sql.avro.ConfigBasedAvroRelSchemaProviderFactory;
 import org.apache.samza.sql.avro.schemas.Profile;
 import org.apache.samza.sql.data.SamzaSqlRelMessage;
-import org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory;
 import org.apache.samza.system.SystemStream;
 import org.junit.Assert;
 import org.junit.Test;
 
 import static org.apache.samza.sql.serializers.SamzaSqlRelRecordSerdeFactory.SamzaSqlRelRecordSerde;
-import static org.apache.samza.sql.data.SamzaSqlRelMessage.SamzaSqlRelRecord;
 
 
 public class TestSamzaSqlRelRecordSerde {