SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from...
[sqoop.git] / connector / connector-sdk / src / main / java / org / apache / sqoop / connector / common / SqoopAvroUtils.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.connector.common;
19
20 import org.apache.avro.Schema;
21 import org.apache.log4j.Logger;
22 import org.apache.sqoop.classification.InterfaceAudience;
23 import org.apache.sqoop.classification.InterfaceStability;
24 import org.apache.sqoop.common.SqoopException;
25 import org.apache.sqoop.error.code.IntermediateDataFormatError;
26 import org.apache.sqoop.schema.type.AbstractComplexListType;
27 import org.apache.sqoop.schema.type.Column;
28 import org.apache.sqoop.schema.type.FixedPoint;
29 import org.apache.sqoop.schema.type.FloatingPoint;
30
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Set;
35
36 @InterfaceAudience.Public
37 @InterfaceStability.Unstable
38 public class SqoopAvroUtils {
39
40 private static final Logger LOG = Logger.getLogger(SqoopAvroUtils.class);
41
42 public static final String COLUMN_TYPE = "columnType";
43 public static final String SQOOP_SCHEMA_NAMESPACE = "org.apache.sqoop";
44
45 /**
46 * Creates an Avro schema from a Sqoop schema.
47 */
48 public static Schema createAvroSchema(org.apache.sqoop.schema.Schema sqoopSchema) {
49 // avro schema names cannot start with quotes, lets just remove them
50 String name = createAvroName(sqoopSchema.getName());
51 String doc = sqoopSchema.getNote();
52 String namespace = SQOOP_SCHEMA_NAMESPACE;
53 Schema schema = Schema.createRecord(name, doc, namespace, false);
54
55 List<Schema.Field> fields = new ArrayList<Schema.Field>();
56 for (Column column : sqoopSchema.getColumnsArray()) {
57 Schema.Field field = new Schema.Field(createAvroName(column.getName()), createAvroFieldSchema(column), null, null);
58 field.addProp(COLUMN_TYPE, column.getType().toString());
59 fields.add(field);
60 }
61 schema.setFields(fields);
62 return schema;
63 }
64
65 // From the avro docs:
66 // The name portion of a fullname, record field names, and enum symbols must:
67 // start with [A-Za-z_]
68 // subsequently contain only [A-Za-z0-9_]
69 public static String createAvroName(String name) {
70 String avroName = name.replaceFirst("^[0-9]", "").replaceAll("[^a-zA-Z0-9_]", "");
71 LOG.debug("Replacing name: " + name + " with Avro name: " + avroName);
72 return avroName;
73 }
74
75 public static Schema createAvroFieldSchema(Column column) {
76 Schema schema = toAvroFieldType(column);
77 if (!column.isNullable()) {
78 return schema;
79 } else {
80 List<Schema> union = new ArrayList<Schema>();
81 union.add(schema);
82 union.add(Schema.create(Schema.Type.NULL));
83 return Schema.createUnion(union);
84 }
85 }
86
87 public static Schema toAvroFieldType(Column column) throws IllegalArgumentException {
88 switch (column.getType()) {
89 case ARRAY:
90 case SET:
91 assert column instanceof AbstractComplexListType;
92 AbstractComplexListType listColumn = (AbstractComplexListType) column;
93 return Schema.createArray(toAvroFieldType(listColumn.getListType()));
94 case UNKNOWN:
95 case BINARY:
96 return Schema.create(Schema.Type.BYTES);
97 case BIT:
98 return Schema.create(Schema.Type.BOOLEAN);
99 case DATE:
100 case DATE_TIME:
101 case TIME:
102 // avro 1.8 will have date type
103 // https://issues.apache.org/jira/browse/AVRO-739
104 return Schema.create(Schema.Type.LONG);
105 case DECIMAL:
106 // TODO: is string ok, used it since kite code seems to use it
107 return Schema.create(Schema.Type.STRING);
108 case ENUM:
109 return createEnumSchema(column);
110 case FIXED_POINT:
111 if (SqoopIDFUtils.isInteger(column)) {
112 return Schema.create(Schema.Type.INT);
113 } else {
114 return Schema.create(Schema.Type.LONG);
115 }
116 case FLOATING_POINT:
117 assert column instanceof FloatingPoint;
118 Long byteSize = ((FloatingPoint) column).getByteSize();
119 if (byteSize != null && byteSize <= (Float.SIZE/Byte.SIZE)) {
120 return Schema.create(Schema.Type.FLOAT);
121 } else {
122 return Schema.create(Schema.Type.DOUBLE);
123 }
124 case MAP:
125 assert column instanceof org.apache.sqoop.schema.type.Map;
126 org.apache.sqoop.schema.type.Map mapColumn = (org.apache.sqoop.schema.type.Map) column;
127 return Schema.createArray(toAvroFieldType(mapColumn.getValue()));
128 case TEXT:
129 return Schema.create(Schema.Type.STRING);
130 default:
131 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004, column.getType().name());
132 }
133 }
134
135 public static Schema createEnumSchema(Column column) {
136 assert column instanceof org.apache.sqoop.schema.type.Enum;
137 Set<String> options = ((org.apache.sqoop.schema.type.Enum) column).getOptions();
138 List<String> listOptions = new ArrayList<String>(options);
139 return Schema.createEnum(createAvroName(column.getName()), null, SQOOP_SCHEMA_NAMESPACE, listOptions);
140 }
141
142 public static byte[] getBytesFromByteBuffer(Object obj) {
143 ByteBuffer buffer = (ByteBuffer) obj;
144 byte[] bytes = new byte[buffer.remaining()];
145 buffer.duplicate().get(bytes);
146 return bytes;
147 }
148
149 }