SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / java / org / apache / sqoop / mapreduce / hcat / SqoopHCatExportMapper.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.mapreduce.hcat;
20
21 import java.io.IOException;
22 import java.math.BigDecimal;
23 import java.sql.Date;
24 import java.sql.Time;
25 import java.sql.Timestamp;
26 import java.util.List;
27 import java.util.Map;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.io.BytesWritable;
33 import org.apache.hadoop.io.DefaultStringifier;
34 import org.apache.hadoop.io.IntWritable;
35 import org.apache.hadoop.io.MapWritable;
36 import org.apache.hadoop.io.NullWritable;
37 import org.apache.hadoop.io.Text;
38 import org.apache.hadoop.io.WritableComparable;
39 import org.apache.hadoop.util.ReflectionUtils;
40 import org.apache.hcatalog.common.HCatConstants;
41 import org.apache.hcatalog.common.HCatUtil;
42 import org.apache.hcatalog.data.HCatRecord;
43 import org.apache.hcatalog.data.schema.HCatFieldSchema;
44 import org.apache.hcatalog.data.schema.HCatSchema;
45 import org.apache.hcatalog.mapreduce.InputJobInfo;
46 import org.apache.sqoop.lib.SqoopRecord;
47 import org.apache.sqoop.mapreduce.AutoProgressMapper;
48 import org.apache.sqoop.mapreduce.ExportJobBase;
49
50 /**
51 * A mapper that works on combined hcat splits.
52 */
53 public class SqoopHCatExportMapper
54 extends
55 AutoProgressMapper<WritableComparable, HCatRecord,
56 SqoopRecord, WritableComparable> {
57 public static final Log LOG = LogFactory
58 .getLog(SqoopHCatExportMapper.class.getName());
59 private InputJobInfo jobInfo;
60 private HCatSchema hCatFullTableSchema;
61 private List<HCatFieldSchema> hCatSchemaFields;
62
63 private SqoopRecord sqoopRecord;
64 private static final String TIMESTAMP_TYPE = "java.sql.Timestamp";
65 private static final String TIME_TYPE = "java.sql.Time";
66 private static final String DATE_TYPE = "java.sql.Date";
67 private static final String BIG_DECIMAL_TYPE = "java.math.BigDecimal";
68 private static final String FLOAT_TYPE = "Float";
69 private static final String DOUBLE_TYPE = "Double";
70 private static final String BYTE_TYPE = "Byte";
71 private static final String SHORT_TYPE = "Short";
72 private static final String INTEGER_TYPE = "Integer";
73 private static final String LONG_TYPE = "Long";
74 private static final String BOOLEAN_TYPE = "Boolean";
75 private static final String STRING_TYPE = "String";
76 private static final String BYTESWRITABLE =
77 "org.apache.hadoop.io.BytesWritable";
78 private static boolean debugHCatExportMapper = false;
79 private MapWritable colTypesJava;
80 private MapWritable colTypesSql;
81
82 @Override
83 protected void setup(Context context)
84 throws IOException, InterruptedException {
85 super.setup(context);
86
87 Configuration conf = context.getConfiguration();
88
89 colTypesJava = DefaultStringifier.load(conf,
90 SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_JAVA, MapWritable.class);
91 colTypesSql = DefaultStringifier.load(conf,
92 SqoopHCatUtilities.HCAT_DB_OUTPUT_COLTYPES_SQL, MapWritable.class);
93 // Instantiate a copy of the user's class to hold and parse the record.
94
95 String recordClassName = conf.get(
96 ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY);
97 if (null == recordClassName) {
98 throw new IOException("Export table class name ("
99 + ExportJobBase.SQOOP_EXPORT_TABLE_CLASS_KEY
100 + ") is not set!");
101 }
102 debugHCatExportMapper = conf.getBoolean(
103 SqoopHCatUtilities.DEBUG_HCAT_EXPORT_MAPPER_PROP, false);
104 try {
105 Class cls = Class.forName(recordClassName, true,
106 Thread.currentThread().getContextClassLoader());
107 sqoopRecord = (SqoopRecord) ReflectionUtils.newInstance(cls, conf);
108 } catch (ClassNotFoundException cnfe) {
109 throw new IOException(cnfe);
110 }
111
112 if (null == sqoopRecord) {
113 throw new IOException("Could not instantiate object of type "
114 + recordClassName);
115 }
116
117 String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
118 jobInfo =
119 (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
120 HCatSchema tableSchema = jobInfo.getTableInfo().getDataColumns();
121 HCatSchema partitionSchema =
122 jobInfo.getTableInfo().getPartitionColumns();
123 hCatFullTableSchema = new HCatSchema(tableSchema.getFields());
124 for (HCatFieldSchema hfs : partitionSchema.getFields()) {
125 hCatFullTableSchema.append(hfs);
126 }
127 hCatSchemaFields = hCatFullTableSchema.getFields();
128
129 }
130
131 @Override
132 public void map(WritableComparable key, HCatRecord value,
133 Context context)
134 throws IOException, InterruptedException {
135 context.write(convertToSqoopRecord(value), NullWritable.get());
136 }
137
138 private SqoopRecord convertToSqoopRecord(HCatRecord hcr)
139 throws IOException {
140 Text key = new Text();
141 for (Map.Entry<String, Object> e : sqoopRecord.getFieldMap().entrySet()) {
142 String colName = e.getKey();
143 String hfn = colName.toLowerCase();
144 key.set(hfn);
145 String javaColType = colTypesJava.get(key).toString();
146 int sqlType = ((IntWritable) colTypesSql.get(key)).get();
147 HCatFieldSchema field =
148 hCatFullTableSchema.get(hfn);
149 HCatFieldSchema.Type fieldType = field.getType();
150 Object hCatVal =
151 hcr.get(hfn, hCatFullTableSchema);
152 String hCatTypeString = field.getTypeString();
153 Object sqlVal = convertToSqoop(hCatVal, fieldType,
154 javaColType, hCatTypeString);
155 if (debugHCatExportMapper) {
156 LOG.debug("hCatVal " + hCatVal + " of type "
157 + (hCatVal == null ? null : hCatVal.getClass().getName())
158 + ",sqlVal " + sqlVal + " of type "
159 + (sqlVal == null ? null : sqlVal.getClass().getName())
160 + ",java type " + javaColType + ", sql type = "
161 + SqoopHCatUtilities.sqlTypeString(sqlType));
162 }
163 sqoopRecord.setField(colName, sqlVal);
164 }
165 return sqoopRecord;
166 }
167
168 private Object convertToSqoop(Object val,
169 HCatFieldSchema.Type fieldType, String javaColType,
170 String hCatTypeString) throws IOException {
171
172 if (val == null) {
173 return null;
174 }
175
176 switch (fieldType) {
177 case INT:
178 case TINYINT:
179 case SMALLINT:
180 case FLOAT:
181 case DOUBLE:
182 val = convertNumberTypes(val, javaColType);
183 if (val != null) {
184 return val;
185 }
186 break;
187 case BOOLEAN:
188 val = convertBooleanTypes(val, javaColType);
189 if (val != null) {
190 return val;
191 }
192 break;
193 case BIGINT:
194 if (javaColType.equals(DATE_TYPE)) {
195 return new Date((Long) val);
196 } else if (javaColType.equals(TIME_TYPE)) {
197 return new Time((Long) val);
198 } else if (javaColType.equals(TIMESTAMP_TYPE)) {
199 return new Timestamp((Long) val);
200 } else {
201 val = convertNumberTypes(val, javaColType);
202 if (val != null) {
203 return val;
204 }
205 }
206 break;
207 case STRING:
208 val = convertStringTypes(val, javaColType);
209 if (val != null) {
210 return val;
211 }
212 break;
213 case BINARY:
214 val = convertBinaryTypes(val, javaColType);
215 if (val != null) {
216 return val;
217 }
218 break;
219 case ARRAY:
220 case MAP:
221 case STRUCT:
222 default:
223 throw new IOException("Cannot convert HCatalog type "
224 + fieldType);
225 }
226 LOG.error("Cannot convert HCatalog object of "
227 + " type " + hCatTypeString + " to java object type "
228 + javaColType);
229 return null;
230 }
231
232 private Object convertBinaryTypes(Object val, String javaColType) {
233 byte[] bb = (byte[]) val;
234 if (javaColType.equals(BYTESWRITABLE)) {
235 BytesWritable bw = new BytesWritable();
236 bw.set(bb, 0, bb.length);
237 return bw;
238 }
239 return null;
240 }
241
242 private Object convertStringTypes(Object val, String javaColType) {
243 String valStr = val.toString();
244 if (javaColType.equals(BIG_DECIMAL_TYPE)) {
245 return new BigDecimal(valStr);
246 } else if (javaColType.equals(DATE_TYPE)
247 || javaColType.equals(TIME_TYPE)
248 || javaColType.equals(TIMESTAMP_TYPE)) {
249 // Oracle expects timestamps for Date also by default based on version
250 // Just allow all date types to be assignment compatible
251 if (valStr.length() == 10) { // Date in yyyy-mm-dd format
252 Date d = Date.valueOf(valStr);
253 if (javaColType.equals(DATE_TYPE)) {
254 return d;
255 } else if (javaColType.equals(TIME_TYPE)) {
256 return new Time(d.getTime());
257 } else if (javaColType.equals(TIMESTAMP_TYPE)) {
258 return new Timestamp(d.getTime());
259 }
260 } else if (valStr.length() == 8) { // time in hh:mm:ss
261 Time t = Time.valueOf(valStr);
262 if (javaColType.equals(DATE_TYPE)) {
263 return new Date(t.getTime());
264 } else if (javaColType.equals(TIME_TYPE)) {
265 return t;
266 } else if (javaColType.equals(TIMESTAMP_TYPE)) {
267 return new Timestamp(t.getTime());
268 }
269 } else if (valStr.length() == 19) { // timestamp in yyyy-mm-dd hh:ss:mm
270 Timestamp ts = Timestamp.valueOf(valStr);
271 if (javaColType.equals(DATE_TYPE)) {
272 return new Date(ts.getTime());
273 } else if (javaColType.equals(TIME_TYPE)) {
274 return new Time(ts.getTime());
275 } else if (javaColType.equals(TIMESTAMP_TYPE)) {
276 return ts;
277 }
278 } else {
279 return null;
280 }
281 } else if (javaColType.equals(STRING_TYPE)) {
282 return valStr;
283 } else if (javaColType.equals(BOOLEAN_TYPE)) {
284 return Boolean.valueOf(valStr);
285 } else if (javaColType.equals(BYTE_TYPE)) {
286 return Byte.parseByte(valStr);
287 } else if (javaColType.equals(SHORT_TYPE)) {
288 return Short.parseShort(valStr);
289 } else if (javaColType.equals(INTEGER_TYPE)) {
290 return Integer.parseInt(valStr);
291 } else if (javaColType.equals(LONG_TYPE)) {
292 return Long.parseLong(valStr);
293 } else if (javaColType.equals(FLOAT_TYPE)) {
294 return Float.parseFloat(valStr);
295 } else if (javaColType.equals(DOUBLE_TYPE)) {
296 return Double.parseDouble(valStr);
297 }
298 return null;
299 }
300
301 private Object convertBooleanTypes(Object val, String javaColType) {
302 Boolean b = (Boolean) val;
303 if (javaColType.equals(BOOLEAN_TYPE)) {
304 return b;
305 } else if (javaColType.equals(BYTE_TYPE)) {
306 return (byte) (b ? 1 : 0);
307 } else if (javaColType.equals(SHORT_TYPE)) {
308 return (short) (b ? 1 : 0);
309 } else if (javaColType.equals(INTEGER_TYPE)) {
310 return (int) (b ? 1 : 0);
311 } else if (javaColType.equals(LONG_TYPE)) {
312 return (long) (b ? 1 : 0);
313 } else if (javaColType.equals(FLOAT_TYPE)) {
314 return (float) (b ? 1 : 0);
315 } else if (javaColType.equals(DOUBLE_TYPE)) {
316 return (double) (b ? 1 : 0);
317 } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
318 return new BigDecimal(b ? 1 : 0);
319 } else if (javaColType.equals(STRING_TYPE)) {
320 return val.toString();
321 }
322 return null;
323 }
324
325 private Object convertNumberTypes(Object val, String javaColType) {
326 Number n = (Number) val;
327 if (javaColType.equals(BYTE_TYPE)) {
328 return n.byteValue();
329 } else if (javaColType.equals(SHORT_TYPE)) {
330 return n.shortValue();
331 } else if (javaColType.equals(INTEGER_TYPE)) {
332 return n.intValue();
333 } else if (javaColType.equals(LONG_TYPE)) {
334 return n.longValue();
335 } else if (javaColType.equals(FLOAT_TYPE)) {
336 return n.floatValue();
337 } else if (javaColType.equals(DOUBLE_TYPE)) {
338 return n.doubleValue();
339 } else if (javaColType.equals(BIG_DECIMAL_TYPE)) {
340 return new BigDecimal(n.doubleValue());
341 } else if (javaColType.equals(BOOLEAN_TYPE)) {
342 return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
343 } else if (javaColType.equals(STRING_TYPE)) {
344 return n.toString();
345 }
346 return null;
347 }
348
349 }