b55f7a00f5ae6156f75c7a166b1fd5303b3976a6
[sqoop.git] / connector / connector-sdk / src / main / java / org / apache / sqoop / connector / idf / AVROIntermediateDataFormat.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.sqoop.connector.idf;
20
21 import static org.apache.sqoop.connector.common.SqoopIDFUtils.*;
22 import static org.apache.sqoop.connector.common.SqoopAvroUtils.*;
23
24 import org.apache.avro.Schema;
25 import org.apache.avro.generic.GenericData;
26 import org.apache.avro.generic.GenericDatumReader;
27 import org.apache.avro.generic.GenericDatumWriter;
28 import org.apache.avro.generic.GenericRecord;
29 import org.apache.avro.io.BinaryEncoder;
30 import org.apache.avro.io.DatumReader;
31 import org.apache.avro.io.DatumWriter;
32 import org.apache.avro.io.Decoder;
33 import org.apache.avro.io.DecoderFactory;
34 import org.apache.avro.io.EncoderFactory;
35 import org.apache.avro.util.Utf8;
36 import org.apache.sqoop.classification.InterfaceAudience;
37 import org.apache.sqoop.classification.InterfaceStability;
38 import org.apache.sqoop.common.SqoopException;
39 import org.apache.sqoop.error.code.IntermediateDataFormatError;
40 import org.apache.sqoop.schema.type.Column;
41 import org.apache.sqoop.utils.ClassUtils;
42 import org.joda.time.LocalDate;
43 import org.joda.time.LocalTime;
44
45 import java.io.DataInput;
46 import java.io.DataOutput;
47 import java.io.DataOutputStream;
48 import java.io.IOException;
49 import java.io.InputStream;
50 import java.math.BigDecimal;
51 import java.nio.ByteBuffer;
52 import java.util.Arrays;
53 import java.util.List;
54 import java.util.Map;
55 import java.util.Set;
56
57 /**
58 * IDF representing the intermediate format in Avro object
59 */
60 @InterfaceAudience.Public
61 @InterfaceStability.Unstable
62 @edu.umd.cs.findbugs.annotations.SuppressWarnings("PZLA_PREFER_ZERO_LENGTH_ARRAYS")
63 public class AVROIntermediateDataFormat extends IntermediateDataFormat<GenericRecord> {
64
65 private Schema avroSchema;
66
/**
 * Creates an instance with no schema attached.
 *
 * <p>This no-argument constructor is needed because the execution engine
 * instantiates the class reflectively. Note that it leaves the Avro schema
 * field unset.
 */
public AVROIntermediateDataFormat() {
  super();
}
70
/**
 * Creates an instance bound to the given Sqoop schema. The schema is needed
 * at all times, so the matching Avro schema is derived from it right away.
 *
 * @param schema Sqoop schema describing the columns of each record
 */
public AVROIntermediateDataFormat(org.apache.sqoop.schema.Schema schema) {
  super();
  this.setSchema(schema);
  this.avroSchema = createAvroSchema(schema);
}
76
77 /**
78 * {@inheritDoc}
79 */
80 @Override
81 public void setCSVTextData(String text) {
82 super.validateSchema(schema);
83 // convert the CSV text to avro
84 this.data = toAVRO(text);
85 }
86
87 /**
88 * {@inheritDoc}
89 */
90 @Override
91 public String getCSVTextData() {
92 super.validateSchema(schema);
93 // convert avro to sqoop CSV
94 return toCSV(data);
95 }
96
97 /**
98 * {@inheritDoc}
99 */
100 @Override
101 public void setObjectData(Object[] data) {
102 super.validateSchema(schema);
103 // convert the object array to avro
104 this.data = toAVRO(data);
105 }
106
107 /**
108 * {@inheritDoc}
109 */
110 @Override
111 public Object[] getObjectData() {
112 super.validateSchema(schema);
113 // convert avro to object array
114 return toObject(data);
115 }
116
117 /**
118 * {@inheritDoc}
119 */
120 @Override
121 public void write(DataOutput out) throws IOException {
122 // do we need to write the schema?
123 DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(avroSchema);
124 assert out instanceof DataOutputStream;
125 BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((DataOutputStream) out, null);
126 writer.write(data, encoder);
127 }
128
129 /**
130 * {@inheritDoc}
131 */
132 @Override
133 public void read(DataInput in) throws IOException {
134 DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(avroSchema);
135 assert in instanceof InputStream;
136 Decoder decoder = DecoderFactory.get().binaryDecoder((InputStream) in, null);
137 data = reader.read(null, decoder);
138 }
139
140 /**
141 * {@inheritDoc}
142 */
143 @Override
144 public Set<String> getJars() {
145
146 Set<String> jars = super.getJars();
147 jars.add(ClassUtils.jarForClass(GenericRecord.class));
148 return jars;
149 }
150
151 public GenericRecord toAVRO(String csv) {
152
153 String[] csvStringArray = parseCSVString(csv);
154
155 if (csvStringArray == null) {
156 return null;
157 }
158 Column[] columns = schema.getColumnsArray();
159 if (csvStringArray.length != columns.length) {
160 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
161 "The data " + csv + " has the wrong number of fields.");
162 }
163 GenericRecord avroObject = new GenericData.Record(avroSchema);
164 for (int i = 0; i < csvStringArray.length; i++) {
165 if (csvStringArray[i].equals(DEFAULT_NULL_VALUE) && !columns[i].isNullable()) {
166 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
167 columns[i].getName() + " does not support null values");
168 }
169 if (csvStringArray[i].equals(DEFAULT_NULL_VALUE)) {
170 avroObject.put(columns[i].getName(), null);
171 continue;
172 }
173 avroObject.put(columns[i].getName(), toAVRO(csvStringArray[i], columns[i]));
174 }
175 return avroObject;
176 }
177
178 public Object toAVRO(String csvString, Column column) {
179 Object returnValue = null;
180
181 switch (column.getType()) {
182 case ARRAY:
183 case SET:
184 Object[] list = toList(csvString);
185 // store as a java collection
186 returnValue = Arrays.asList(list);
187 break;
188 case MAP:
189 // store as a map
190 returnValue = toMap(csvString);
191 break;
192 case ENUM:
193 returnValue = new GenericData.EnumSymbol(createEnumSchema(column), (removeQuotes(csvString)));
194 break;
195 case TEXT:
196 returnValue = new Utf8(removeQuotes(csvString));
197 break;
198 case BINARY:
199 case UNKNOWN:
200 // avro accepts byte buffer for binary data
201 returnValue = ByteBuffer.wrap(toByteArray(csvString));
202 break;
203 case FIXED_POINT:
204 returnValue = toFixedPoint(csvString, column);
205 break;
206 case FLOATING_POINT:
207 returnValue = toFloatingPoint(csvString, column);
208 break;
209 case DECIMAL:
210 // TODO: store as FIXED in SQOOP-16161
211 returnValue = removeQuotes(csvString);
212 break;
213 case DATE:
214 // until 1.8 avro store as long
215 returnValue = ((LocalDate) toDate(csvString, column)).toDate().getTime();
216 break;
217 case TIME:
218 // until 1.8 avro store as long
219 returnValue = ((LocalTime) toTime(csvString, column)).toDateTimeToday().getMillis();
220 break;
221 case DATE_TIME:
222 // until 1.8 avro store as long
223 returnValue = toDateTimeInMillis(csvString, column);
224 break;
225 case BIT:
226 returnValue = Boolean.valueOf(removeQuotes(csvString));
227 break;
228 default:
229 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0004,
230 "Column type from schema was not recognized for " + column.getType());
231 }
232 return returnValue;
233 }
234
235 public GenericRecord toAVRO(Object[] objectArray) {
236
237 if (objectArray == null) {
238 return null;
239 }
240 Column[] columns = schema.getColumnsArray();
241
242 if (objectArray.length != columns.length) {
243 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
244 "The data " + Arrays.toString(objectArray) + " has the wrong number of fields.");
245 }
246 // get avro schema from sqoop schema
247 GenericRecord avroObject = new GenericData.Record(avroSchema);
248 for (int i = 0; i < objectArray.length; i++) {
249 if (objectArray[i] == null && !columns[i].isNullable()) {
250 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
251 columns[i].getName() + " does not support null values");
252 }
253 if (objectArray[i] == null) {
254 avroObject.put(columns[i].getName(), null);
255 continue;
256 }
257
258 switch (columns[i].getType()) {
259 case ARRAY:
260 case SET:
261 avroObject.put(columns[i].getName(), toList((Object[]) objectArray[i]));
262 break;
263 case ENUM:
264 GenericData.EnumSymbol enumValue = new GenericData.EnumSymbol(createEnumSchema(columns[i]),
265 (String) objectArray[i]);
266 avroObject.put(columns[i].getName(), enumValue);
267 break;
268 case TEXT:
269 avroObject.put(columns[i].getName(), new Utf8((String) objectArray[i]));
270 break;
271 case BINARY:
272 case UNKNOWN:
273 avroObject.put(columns[i].getName(), ByteBuffer.wrap((byte[]) objectArray[i]));
274 break;
275 case MAP:
276 case FIXED_POINT:
277 case FLOATING_POINT:
278 avroObject.put(columns[i].getName(), objectArray[i]);
279 break;
280 case DECIMAL:
281 // TODO: store as FIXED in SQOOP-16161
282 avroObject.put(columns[i].getName(), ((BigDecimal) objectArray[i]).toPlainString());
283 break;
284 case DATE_TIME:
285 if (objectArray[i] instanceof org.joda.time.DateTime) {
286 avroObject.put(columns[i].getName(), ((org.joda.time.DateTime) objectArray[i]).toDate()
287 .getTime());
288 } else if (objectArray[i] instanceof org.joda.time.LocalDateTime) {
289 avroObject.put(columns[i].getName(), ((org.joda.time.LocalDateTime) objectArray[i])
290 .toDate().getTime());
291 }
292 break;
293 case TIME:
294 avroObject.put(columns[i].getName(), ((org.joda.time.LocalTime) objectArray[i])
295 .toDateTimeToday().getMillis());
296 break;
297 case DATE:
298 avroObject.put(columns[i].getName(), ((org.joda.time.LocalDate) objectArray[i]).toDate()
299 .getTime());
300 break;
301 case BIT:
302 avroObject.put(columns[i].getName(), Boolean.valueOf(objectArray[i].toString()));
303 break;
304 default:
305 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
306 "Column type from schema was not recognized for " + columns[i].getType());
307 }
308 }
309
310 return avroObject;
311 }
312
313 @SuppressWarnings("unchecked")
314 public String toCSV(GenericRecord record) {
315 Column[] columns = this.schema.getColumnsArray();
316
317 StringBuilder csvString = new StringBuilder();
318 for (int i = 0; i < columns.length; i++) {
319
320 Object obj = record.get(columns[i].getName());
321 if (obj == null && !columns[i].isNullable()) {
322 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
323 columns[i].getName() + " does not support null values");
324 }
325 if (obj == null) {
326 csvString.append(DEFAULT_NULL_VALUE);
327 } else {
328
329 switch (columns[i].getType()) {
330 case ARRAY:
331 case SET:
332 List<Object> objList = (List<Object>) obj;
333 csvString.append(toCSVList(toObjectArray(objList), columns[i]));
334 break;
335 case MAP:
336 Map<Object, Object> objMap = (Map<Object, Object>) obj;
337 csvString.append(toCSVMap(objMap, columns[i]));
338 break;
339 case ENUM:
340 case TEXT:
341 csvString.append(toCSVString(obj.toString()));
342 break;
343 case BINARY:
344 case UNKNOWN:
345 csvString.append(toCSVByteArray(getBytesFromByteBuffer(obj)));
346 break;
347 case FIXED_POINT:
348 csvString.append(toCSVFixedPoint(obj, columns[i]));
349 break;
350 case FLOATING_POINT:
351 csvString.append(toCSVFloatingPoint(obj, columns[i]));
352 break;
353 case DECIMAL:
354 // stored as string
355 csvString.append(toCSVDecimal(new BigDecimal(obj.toString())));
356 break;
357 case DATE:
358 // stored as long
359 Long dateInMillis = (Long) obj;
360 csvString.append(toCSVDate(new org.joda.time.LocalDate(dateInMillis)));
361 break;
362 case TIME:
363 // stored as long
364 Long timeInMillis = (Long) obj;
365 csvString.append(toCSVTime(new org.joda.time.LocalTime(timeInMillis), columns[i]));
366 break;
367 case DATE_TIME:
368 // stored as long
369 Long dateTimeInMillis = (Long) obj;
370 csvString.append(toCSVDateTime(new org.joda.time.DateTime(dateTimeInMillis), columns[i]));
371 break;
372 case BIT:
373 csvString.append(toCSVBit(obj));
374 break;
375 default:
376 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
377 "Column type from schema was not recognized for " + columns[i].getType());
378 }
379 }
380 if (i < columns.length - 1) {
381 csvString.append(CSV_SEPARATOR_CHARACTER);
382 }
383
384 }
385
386 return csvString.toString();
387 }
388
389 @SuppressWarnings("unchecked")
390 public Object[] toObject(GenericRecord record) {
391
392 if (record == null) {
393 return null;
394 }
395 Column[] columns = schema.getColumnsArray();
396 Object[] object = new Object[columns.length];
397
398 for (int i = 0; i < columns.length; i++) {
399 Object obj = record.get(columns[i].getName());
400 Integer nameIndex = schema.getColumnNameIndex(columns[i].getName());
401 Column column = columns[nameIndex];
402 // null is a possible value
403 if (obj == null && !column.isNullable()) {
404 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0005,
405 column.getName() + " does not support null values");
406 }
407 if (obj == null) {
408 object[nameIndex] = null;
409 continue;
410 }
411 switch (column.getType()) {
412 case ARRAY:
413 case SET:
414 object[nameIndex] = toObjectArray((List<Object>) obj);
415 break;
416 case ENUM:
417 // stored as enum symbol
418 case TEXT:
419 // stored as UTF8
420 object[nameIndex] = obj.toString();
421 break;
422 case DECIMAL:
423 // stored as string
424 object[nameIndex] = new BigDecimal(obj.toString());
425 break;
426 case BINARY:
427 case UNKNOWN:
428 // stored as byte buffer
429 object[nameIndex] = getBytesFromByteBuffer(obj);
430 break;
431 case MAP:
432 case FIXED_POINT:
433 case FLOATING_POINT:
434 // stored as java objects in avro as well
435 object[nameIndex] = obj;
436 break;
437 case DATE:
438 Long dateInMillis = (Long) obj;
439 object[nameIndex] = new org.joda.time.LocalDate(dateInMillis);
440 break;
441 case TIME:
442 Long timeInMillis = (Long) obj;
443 object[nameIndex] = new org.joda.time.LocalTime(timeInMillis);
444 break;
445 case DATE_TIME:
446 Long dateTimeInMillis = (Long) obj;
447 if (((org.apache.sqoop.schema.type.DateTime) column).hasTimezone()) {
448 object[nameIndex] = new org.joda.time.DateTime(dateTimeInMillis);
449 } else {
450 object[nameIndex] = new org.joda.time.LocalDateTime(dateTimeInMillis);
451 }
452 break;
453 case BIT:
454 object[nameIndex] = toBit(obj.toString());
455 break;
456 default:
457 throw new SqoopException(IntermediateDataFormatError.INTERMEDIATE_DATA_FORMAT_0001,
458 "Column type from schema was not recognized for " + column.getType());
459 }
460
461 }
462 return object;
463 }
464
465 public Schema getAvroSchema() {
466 return avroSchema;
467 }
468 }