SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from...
authorJarek Jarcec Cecho <jarcec@apache.org>
Mon, 29 Feb 2016 19:37:06 +0000 (11:37 -0800)
committerJarek Jarcec Cecho <jarcec@apache.org>
Mon, 29 Feb 2016 19:37:06 +0000 (11:37 -0800)
(Abraham Fine via Jarek Jarcec Cecho)

connector/connector-sdk/src/main/java/org/apache/sqoop/connector/common/SqoopAvroUtils.java
connector/connector-sdk/src/main/java/org/apache/sqoop/connector/idf/AVROIntermediateDataFormat.java
connector/connector-sdk/src/test/java/org/apache/sqoop/connector/idf/TestAVROIntermediateDataFormat.java

index 89bc0f2..f34521c 100644 (file)
@@ -18,6 +18,7 @@
 package org.apache.sqoop.connector.common;
 
 import org.apache.avro.Schema;
+import org.apache.log4j.Logger;
 import org.apache.sqoop.classification.InterfaceAudience;
 import org.apache.sqoop.classification.InterfaceStability;
 import org.apache.sqoop.common.SqoopException;
@@ -36,6 +37,8 @@ import java.util.Set;
 @InterfaceStability.Unstable
 public class SqoopAvroUtils {
 
+  private static final Logger LOG = Logger.getLogger(SqoopAvroUtils.class);
+
   public static final String COLUMN_TYPE = "columnType";
   public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
 
@@ -44,14 +47,14 @@ public class SqoopAvroUtils {
    */
   public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
     // avro schema names cannot start with quotes, lets just remove them
-    String name = sqoopSchema.getName().replace("\"", "");
+    String name = createAvroName(sqoopSchema.getName());
     String doc = sqoopSchema.getNote();
     String namespace = SQOOP_SCHEMA_NAMESPACE;
     Schema schema = Schema.createRecord(name, doc, namespace, false);
 
     List<Schema.Field> fields = new ArrayList<Schema.Field>();
     for (Column column : sqoopSchema.getColumnsArray()) {
-      Schema.Field field = new Schema.Field(column.getName(), createAvroFieldSchema(column), null, null);
+      Schema.Field field = new Schema.Field(createAvroName(column.getName()), createAvroFieldSchema(column), null, null);
       field.addProp(COLUMN_TYPE, column.getType().toString());
       fields.add(field);
     }
@@ -59,6 +62,16 @@ public class SqoopAvroUtils {
     return schema;
   }
 
+  // From the avro docs:
+  // The name portion of a fullname, record field names, and enum symbols must:
+  // start with [A-Za-z_]
+  // subsequently contain only [A-Za-z0-9_]
+  public static String createAvroName(String name) {
+    String avroName = name.replaceFirst("^[0-9]", "").replaceAll("[^a-zA-Z0-9_]", "");
+    LOG.debug("Replacing name: " + name + " with Avro name: " + avroName);
+    return avroName;
+  }
+
   public static Schema createAvroFieldSchema(Column column) {
     Schema schema = toAvroFieldType(column);
     if (!column.isNullable()) {
@@ -123,7 +136,7 @@ public class SqoopAvroUtils {
     assert column instanceof org.apache.sqoop.schema.type.Enum;
     Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
     List<String> listOptions = new ArrayList<String>(options);
-    return Schema.createEnum(column.getName(), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
+    return Schema.createEnum(createAvroName(column.getName()), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
   }
 
   public static byte[] getBytesFromByteBuffer(Object obj) {
index b55f7a0..650e24c 100644 (file)
@@ -36,6 +36,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.sqoop.classification.InterfaceAudience;
 import org.apache.sqoop.classification.InterfaceStability;
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.common.SqoopAvroUtils;
 import org.apache.sqoop.error.code.IntermediateDataFormatError;
 import org.apache.sqoop.schema.type.Column;
 import org.apache.sqoop.utils.ClassUtils;
@@ -166,11 +167,12 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
       }
+      String name = SqoopAvroUtils.createAvroName(columns[i].getName());
       if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
-        avroObject.put(columns[i].getName(), null);
+        avroObject.put(name, null);
         continue;
       }
-      avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i]));
+      avroObject.put(name, toAVRO(csvStringArray[i], columns[i]));
     }
     return avroObject;
   }
@@ -250,56 +252,59 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
       }
+
+      String name = SqoopAvroUtils.createAvroName(columns[i].getName());
+
       if (objectArray[i] == null) {
-        avroObject.put(columns[i].getName(), null);
+        avroObject.put(name, null);
         continue;
       }
 
       switch (columns[i].getType()) {
       case ARRAY:
       case SET:
-        avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i]));
+        avroObject.put(name, toList((Object[]) objectArray[i]));
         break;
       case ENUM:
         GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
             (String) objectArray[i]);
-        avroObject.put(columns[i].getName(), enumValue);
+        avroObject.put(name, enumValue);
         break;
       case TEXT:
-        avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i]));
+        avroObject.put(name, new Utf8((String) objectArray[i]));
         break;
       case BINARY:
       case UNKNOWN:
-        avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i]));
+        avroObject.put(name, ByteBuffer.wrap((byte[]) objectArray[i]));
         break;
       case MAP:
       case FIXED_POINT:
       case FLOATING_POINT:
-        avroObject.put(columns[i].getName(), objectArray[i]);
+        avroObject.put(name, objectArray[i]);
         break;
       case DECIMAL:
         // TODO: store as FIXED in SQOOP-16161
-        avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString());
+        avroObject.put(name, ((BigDecimal) objectArray[i]).toPlainString());
         break;
       case DATE_TIME:
         if (objectArray[i] instanceof org.joda.time.DateTime) {
-          avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate()
+          avroObject.put(name, ((org.joda.time.DateTime) objectArray[i]).toDate()
               .getTime());
         } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
-          avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i])
+          avroObject.put(name, ((org.joda.time.LocalDateTime) objectArray[i])
               .toDate().getTime());
         }
         break;
       case TIME:
-        avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i])
+        avroObject.put(name, ((org.joda.time.LocalTime) objectArray[i])
             .toDateTimeToday().getMillis());
         break;
       case DATE:
-        avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate()
+        avroObject.put(name, ((org.joda.time.LocalDate) objectArray[i]).toDate()
             .getTime());
         break;
       case BIT:
-        avroObject.put(columns[i].getName(), Boolean.valueOf(objectArray[i].toString()));
+        avroObject.put(name, Boolean.valueOf(objectArray[i].toString()));
         break;
       default:
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
@@ -317,7 +322,7 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     StringBuilder csvString = new StringBuilder();
     for (int i = 0; i < columns.length; i++) {
 
-      Object obj = record.get(columns[i].getName());
+      Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
       if (obj == null && !columns[i].isNullable()) {
         throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
             columns[i].getName() + " does not support null values");
@@ -396,8 +401,8 @@ public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRe
     Object[] object = new Object[columns.length];
 
     for (int i = 0; i < columns.length; i++) {
-      Object obj = record.get(columns[i].getName());
-      Integer nameIndex = schema.getColumnNameIndex(columns[i].getName());
+      Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
+      Integer nameIndex = schema.getColumnNameIndex(SqoopAvroUtils.createAvroName(columns[i].getName()));
       Column column = columns[nameIndex];
       // null is a possible value
       if (obj == null && !column.isNullable()) {
index 3c4d7de..cd4445d 100644 (file)
@@ -39,6 +39,7 @@ import org.apache.sqoop.schema.type.Decimal;
 import org.apache.sqoop.schema.type.FixedPoint;
 import org.apache.sqoop.schema.type.Text;
 import org.joda.time.LocalDateTime;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -545,4 +546,10 @@ public class TestAVROIntermediateDataFormat {
     dataFormat.getData();
   }
 
+  @Test
+  public void testSchemaWithBadCharacters() {
+    Schema schema = new Schema("9`\" blah`^&*(^&*(%$^&").addColumn(new Text("one").setNullable(false));
+    AVROIntermediateDataFormat dataFormat = new AVROIntermediateDataFormat(schema);
+    Assert.assertEquals(dataFormat.getAvroSchema().getName(), "blah");
+  }
 }