SQOOP-2849: Sqoop2: Job failure when writing parquet in hdfs with data coming from...
[sqoop.git] / connector / connector-sdk / src / main / java / org / apache / sqoop / connector / idf / AVROIntermediateDataFormat.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.sqoop.connector.idf;
20
21 import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
22 import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;
23
24 import org.apache.avro.Schema;
25 import org.apache.avro.generic.GenericData;
26 import org.apache.avro.generic.GenericDatumReader;
27 import org.apache.avro.generic.GenericDatumWriter;
28 import org.apache.avro.generic.GenericRecord;
29 import org.apache.avro.io.BinaryEncoder;
30 import org.apache.avro.io.DatumReader;
31 import org.apache.avro.io.DatumWriter;
32 import org.apache.avro.io.Decoder;
33 import org.apache.avro.io.DecoderFactory;
34 import org.apache.avro.io.EncoderFactory;
35 import org.apache.avro.util.Utf8;
36 import org.apache.sqoop.classification.InterfaceAudience;
37 import org.apache.sqoop.classification.InterfaceStability;
38 import org.apache.sqoop.common.SqoopException;
39 import org.apache.sqoop.connector.common.SqoopAvroUtils;
40 import org.apache.sqoop.error.code.IntermediateDataFormatError;
41 import org.apache.sqoop.schema.type.Column;
42 import org.apache.sqoop.utils.ClassUtils;
43 import org.joda.time.LocalDate;
44 import org.joda.time.LocalTime;
45
46 import java.io.DataInput;
47 import java.io.DataOutput;
48 import java.io.DataOutputStream;
49 import java.io.IOException;
50 import java.io.InputStream;
51 import java.math.BigDecimal;
52 import java.nio.ByteBuffer;
53 import java.util.Arrays;
54 import java.util.List;
55 import java.util.Map;
56 import java.util.Set;
57
/**
 * IDF (Intermediate Data Format) that represents each record as an Avro
 * {@link GenericRecord}. Converts between the three Sqoop record
 * representations: Sqoop CSV text, Java object arrays, and the native Avro
 * form. The Avro {@link Schema} is derived once from the Sqoop schema at
 * construction time and reused for all conversions and (de)serialization.
 */
@InterfaceAudience.Public
@InterfaceStability.Unstable
@edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {

  // Avro schema translated from the Sqoop schema; null until a schema-taking
  // constructor runs (the no-arg constructor leaves it unset).
  private Schema avroSchema;

  // need this default constructor for reflection magic used in execution engine
  public AVROIntermediateDataFormat() {
  }

  // We need schema at all times
  public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
    setSchema(schema);
    avroSchema = createAvroSchema(schema);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setCSVTextData(String text) {
    super.validateSchema(schema);
    // convert the CSV text to avro
    this.data = toAVRO(text);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String getCSVTextData() {
    super.validateSchema(schema);
    // convert avro to sqoop CSV
    return toCSV(data);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public void setObjectData(Object[] data) {
    super.validateSchema(schema);
    // convert the object array to avro
    this.data = toAVRO(data);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public Object[] getObjectData() {
    super.validateSchema(schema);
    // convert avro to object array
    return toObject(data);
  }

  /**
   * {@inheritDoc}
   *
   * Serializes the current record as raw Avro binary (no schema embedded in
   * the stream; both sides must share {@code avroSchema}).
   * NOTE(review): assumes {@code out} is a DataOutputStream — the assert is a
   * no-op when assertions are disabled, and the cast will throw
   * ClassCastException otherwise; callers apparently always pass one.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    // do we need to write the schema?
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
    assert out instanceof DataOutputStream;
    // directBinaryEncoder writes straight through without internal buffering,
    // so no explicit flush is performed here.
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
    writer.write(data, encoder);
  }

  /**
   * {@inheritDoc}
   *
   * Deserializes one record from raw Avro binary produced by {@link #write}.
   * NOTE(review): like write(), this assumes {@code in} is actually an
   * InputStream; the assert does not enforce it at runtime.
   */
  @Override
  public void read(DataInput in) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
    assert in instanceof InputStream;
    Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
    // Pass null so the reader allocates a fresh record rather than reusing one.
    data = reader.read(null, decoder);
  }

  /**
   * {@inheritDoc}
   *
   * Adds the Avro jar (located via the GenericRecord class) to the jars the
   * base IDF already requires, so remote executors can load this format.
   */
  @Override
  public Set<String> getJars() {

    Set<String> jars = super.getJars();
    jars.add(ClassUtils.jarForClass(GenericRecord.class));
    return jars;
  }

  /**
   * Converts one Sqoop CSV record into an Avro record.
   *
   * @param csv a full CSV-encoded record (field encoding per SqoopIDFUtils)
   * @return the populated Avro record, or null when the CSV parses to null
   * @throws SqoopException if the field count does not match the schema
   *         (INTERMEDIATE_DATA_FORMAT_0001) or a null value is supplied for a
   *         non-nullable column (INTERMEDIATE_DATA_FORMAT_0005)
   */
  public GenericRecord toAVRO(String csv) {

    String[] csvStringArray = parseCSVString(csv);

    if (csvStringArray == null) {
      return null;
    }
    Column[] columns = schema.getColumnsArray();
    if (csvStringArray.length != columns.length) {
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
          "The data " + csv + " has the wrong number of fields.");
    }
    GenericRecord avroObject = new GenericData.Record(avroSchema);
    for (int i = 0; i < csvStringArray.length; i++) {
      // Null marker on a non-nullable column is a hard error.
      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
            columns[i].getName() + " does not support null values");
      }
      // Avro field names are sanitized versions of the Sqoop column names.
      String name = SqoopAvroUtils.createAvroName(columns[i].getName());
      if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
        avroObject.put(name, null);
        continue;
      }
      avroObject.put(name, toAVRO(csvStringArray[i], columns[i]));
    }
    return avroObject;
  }

  /**
   * Converts a single CSV-encoded field into the Java value Avro expects for
   * the given column type (e.g. TEXT becomes {@link Utf8}, BINARY becomes a
   * {@link ByteBuffer}, DATE/TIME/DATE_TIME become epoch milliseconds as long
   * because Avro has no date/time logical types before 1.8).
   *
   * @param csvString one CSV field, still quoted/escaped per the IDF encoding
   * @param column    the Sqoop column describing the field's type
   * @return the Avro-compatible value
   * @throws SqoopException for unrecognized column types
   *         (INTERMEDIATE_DATA_FORMAT_0004)
   */
  public Object toAVRO(String csvString, Column column) {
    Object returnValue = null;

    switch (column.getType()) {
    case ARRAY:
    case SET:
      Object[] list = toList(csvString);
      // store as a java collection
      returnValue = Arrays.asList(list);
      break;
    case MAP:
      // store as a map
      returnValue = toMap(csvString);
      break;
    case ENUM:
      // EnumSymbol needs its own enum schema, rebuilt here from the column.
      returnValue = new GenericData.EnumSymbol(createEnumSchema(column), (removeQuotes(csvString)));
      break;
    case TEXT:
      returnValue = new Utf8(removeQuotes(csvString));
      break;
    case BINARY:
    case UNKNOWN:
      // avro accepts byte buffer for binary data
      returnValue = ByteBuffer.wrap(toByteArray(csvString));
      break;
    case FIXED_POINT:
      returnValue = toFixedPoint(csvString, column);
      break;
    case FLOATING_POINT:
      returnValue = toFloatingPoint(csvString, column);
      break;
    case DECIMAL:
      // TODO: store as FIXED in SQOOP-16161
      // For now decimals travel as their unquoted string form.
      returnValue = removeQuotes(csvString);
      break;
    case DATE:
      // until 1.8 avro store as long (epoch millis of the local date)
      returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
      break;
    case TIME:
      // until 1.8 avro store as long (millis within today's date —
      // NOTE(review): toDateTimeToday() anchors to the current day/zone)
      returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
      break;
    case DATE_TIME:
      // until 1.8 avro store as long
      returnValue = toDateTimeInMillis(csvString, column);
      break;
    case BIT:
      returnValue = Boolean.valueOf(removeQuotes(csvString));
      break;
    default:
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
          "Column type from schema was not recognized for " + column.getType());
    }
    return returnValue;
  }

  /**
   * Converts a Sqoop object array (one element per schema column, in schema
   * order) into an Avro record, applying the same per-type mapping as the CSV
   * path: TEXT to Utf8, BINARY to ByteBuffer, DECIMAL to plain string,
   * date/time types to epoch milliseconds.
   *
   * @param objectArray the record as typed Java objects; may be null
   * @return the populated Avro record, or null for null input
   * @throws SqoopException on field-count mismatch
   *         (INTERMEDIATE_DATA_FORMAT_0001) or null in a non-nullable column
   *         (INTERMEDIATE_DATA_FORMAT_0005)
   */
  public GenericRecord toAVRO(Object[] objectArray) {

    if (objectArray == null) {
      return null;
    }
    Column[] columns = schema.getColumnsArray();

    if (objectArray.length != columns.length) {
      throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
          "The data " + Arrays.toString(objectArray) + " has the wrong number of fields.");
    }
    // get avro schema from sqoop schema
    GenericRecord avroObject = new GenericData.Record(avroSchema);
    for (int i = 0; i < objectArray.length; i++) {
      if (objectArray[i] == null && !columns[i].isNullable()) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
            columns[i].getName() + " does not support null values");
      }

      String name = SqoopAvroUtils.createAvroName(columns[i].getName());

      if (objectArray[i] == null) {
        avroObject.put(name, null);
        continue;
      }

      switch (columns[i].getType()) {
      case ARRAY:
      case SET:
        avroObject.put(name, toList((Object[]) objectArray[i]));
        break;
      case ENUM:
        GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
            (String) objectArray[i]);
        avroObject.put(name, enumValue);
        break;
      case TEXT:
        avroObject.put(name, new Utf8((String) objectArray[i]));
        break;
      case BINARY:
      case UNKNOWN:
        avroObject.put(name, ByteBuffer.wrap((byte[]) objectArray[i]));
        break;
      case MAP:
      case FIXED_POINT:
      case FLOATING_POINT:
        // These are already stored as plain java objects in avro.
        avroObject.put(name, objectArray[i]);
        break;
      case DECIMAL:
        // TODO: store as FIXED in SQOOP-16161
        avroObject.put(name, ((BigDecimal) objectArray[i]).toPlainString());
        break;
      case DATE_TIME:
        // Accept either joda DateTime (zoned) or LocalDateTime; both collapse
        // to epoch millis. NOTE(review): any other runtime type silently
        // leaves the field unset.
        if (objectArray[i] instanceof org.joda.time.DateTime) {
          avroObject.put(name, ((org.joda.time.DateTime) objectArray[i]).toDate()
              .getTime());
        } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
          avroObject.put(name, ((org.joda.time.LocalDateTime) objectArray[i])
              .toDate().getTime());
        }
        break;
      case TIME:
        // Millis anchored to today's date (see toDateTimeToday).
        avroObject.put(name, ((org.joda.time.LocalTime) objectArray[i])
            .toDateTimeToday().getMillis());
        break;
      case DATE:
        avroObject.put(name, ((org.joda.time.LocalDate) objectArray[i]).toDate()
            .getTime());
        break;
      case BIT:
        avroObject.put(name, Boolean.valueOf(objectArray[i].toString()));
        break;
      default:
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
            "Column type from schema was not recognized for " + columns[i].getType());
      }
    }

    return avroObject;
  }

  /**
   * Renders an Avro record as a Sqoop CSV line, one encoded field per schema
   * column joined by {@code CSV_SEPARATOR_CHARACTER}. Inverse of
   * {@link #toAVRO(String)} for the stored long/string/ByteBuffer encodings.
   *
   * @param record the Avro record to encode (fields looked up by Avro-safe name)
   * @return the CSV-encoded record
   * @throws SqoopException on null in a non-nullable column
   *         (INTERMEDIATE_DATA_FORMAT_0005) or an unrecognized column type
   *         (INTERMEDIATE_DATA_FORMAT_0001)
   */
  @SuppressWarnings("unchecked")
  public String toCSV(GenericRecord record) {
    Column[] columns = this.schema.getColumnsArray();

    StringBuilder csvString = new StringBuilder();
    for (int i = 0; i < columns.length; i++) {

      Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
      if (obj == null && !columns[i].isNullable()) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
            columns[i].getName() + " does not support null values");
      }
      if (obj == null) {
        csvString.append(DEFAULT_NULL_VALUE);
      } else {

        switch (columns[i].getType()) {
        case ARRAY:
        case SET:
          List<Object> objList = (List<Object>) obj;
          csvString.append(toCSVList(toObjectArray(objList), columns[i]));
          break;
        case MAP:
          Map<Object, Object> objMap = (Map<Object, Object>) obj;
          csvString.append(toCSVMap(objMap, columns[i]));
          break;
        case ENUM:
        case TEXT:
          // EnumSymbol and Utf8 both render via toString().
          csvString.append(toCSVString(obj.toString()));
          break;
        case BINARY:
        case UNKNOWN:
          csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
          break;
        case FIXED_POINT:
          csvString.append(toCSVFixedPoint(obj, columns[i]));
          break;
        case FLOATING_POINT:
          csvString.append(toCSVFloatingPoint(obj, columns[i]));
          break;
        case DECIMAL:
          // stored as string
          csvString.append(toCSVDecimal(new BigDecimal(obj.toString())));
          break;
        case DATE:
          // stored as long
          Long dateInMillis = (Long) obj;
          csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
          break;
        case TIME:
          // stored as long
          Long timeInMillis = (Long) obj;
          csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), columns[i]));
          break;
        case DATE_TIME:
          // stored as long
          Long dateTimeInMillis = (Long) obj;
          csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), columns[i]));
          break;
        case BIT:
          csvString.append(toCSVBit(obj));
          break;
        default:
          throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
              "Column type from schema was not recognized for " + columns[i].getType());
        }
      }
      // Separator between fields only; no trailing separator after the last.
      if (i < columns.length - 1) {
        csvString.append(CSV_SEPARATOR_CHARACTER);
      }

    }

    return csvString.toString();
  }

  /**
   * Converts an Avro record back into a Sqoop object array. Values are placed
   * at the index the Sqoop schema assigns to each column name
   * (via getColumnNameIndex of the Avro-safe name), not necessarily the loop
   * index — presumably these coincide when Avro names match column names;
   * TODO(review): confirm behavior for columns whose names are rewritten by
   * createAvroName.
   *
   * @param record the Avro record; may be null
   * @return the typed object array, or null for null input
   * @throws SqoopException on null in a non-nullable column
   *         (INTERMEDIATE_DATA_FORMAT_0005) or an unrecognized column type
   *         (INTERMEDIATE_DATA_FORMAT_0001)
   */
  @SuppressWarnings("unchecked")
  public Object[] toObject(GenericRecord record) {

    if (record == null) {
      return null;
    }
    Column[] columns = schema.getColumnsArray();
    Object[] object = new Object[columns.length];

    for (int i = 0; i < columns.length; i++) {
      Object obj = record.get(SqoopAvroUtils.createAvroName(columns[i].getName()));
      Integer nameIndex = schema.getColumnNameIndex(SqoopAvroUtils.createAvroName(columns[i].getName()));
      Column column = columns[nameIndex];
      // null is a possible value
      if (obj == null && !column.isNullable()) {
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
            column.getName() + " does not support null values");
      }
      if (obj == null) {
        object[nameIndex] = null;
        continue;
      }
      switch (column.getType()) {
      case ARRAY:
      case SET:
        object[nameIndex] = toObjectArray((List<Object>) obj);
        break;
      case ENUM:
        // stored as enum symbol
      case TEXT:
        // stored as UTF8
        object[nameIndex] = obj.toString();
        break;
      case DECIMAL:
        // stored as string
        object[nameIndex] = new BigDecimal(obj.toString());
        break;
      case BINARY:
      case UNKNOWN:
        // stored as byte buffer
        object[nameIndex] = getBytesFromByteBuffer(obj);
        break;
      case MAP:
      case FIXED_POINT:
      case FLOATING_POINT:
        // stored as java objects in avro as well
        object[nameIndex] = obj;
        break;
      case DATE:
        Long dateInMillis = (Long) obj;
        object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
        break;
      case TIME:
        Long timeInMillis = (Long) obj;
        object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
        break;
      case DATE_TIME:
        Long dateTimeInMillis = (Long) obj;
        // Zoned columns come back as DateTime, zone-less as LocalDateTime.
        if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
          object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
        } else {
          object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
        }
        break;
      case BIT:
        object[nameIndex] = toBit(obj.toString());
        break;
      default:
        throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
            "Column type from schema was not recognized for " + column.getType());
      }

    }
    return object;
  }

  /**
   * @return the Avro schema derived from the Sqoop schema at construction
   *         time (null when the no-arg constructor was used)
   */
  public Schema getAvroSchema() {
    return avroSchema;
  }
}