SQOOP-931: Integrate HCatalog with Sqoop
src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportMapper.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce.hcat;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatFieldSchema;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.StorerInfo;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.mapreduce.ImportJobBase;
import org.apache.sqoop.mapreduce.SqoopMapper;

import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.LargeObjectLoader;

/**
 * A mapper for HCatalog import. Converts the {@link SqoopRecord} instances
 * produced by the import into {@link HCatRecord} instances that can be
 * written to an HCatalog-managed table.
 */
public class SqoopHCatImportMapper extends
    SqoopMapper<WritableComparable, SqoopRecord,
    WritableComparable, HCatRecord> {
  public static final Log LOG = LogFactory
    .getLog(SqoopHCatImportMapper.class.getName());

  private static boolean debugHCatImportMapper = false;

  private InputJobInfo jobInfo;
  private HCatSchema hCatFullTableSchema;
  private int fieldCount;
  private boolean bigDecimalFormatString;
  private LargeObjectLoader lobLoader;
  private HCatSchema partitionSchema = null;
  private HCatSchema dataColsSchema = null;
  private String hiveDelimsReplacement;
  private boolean doHiveDelimsReplacement = false;
  private DelimiterSet hiveDelimiters;
  private String staticPartitionKey;
  private int[] hCatFieldPositions;
  // Lazily initialized on the first map() call; -1 marks it as unset.
  private int colCount = -1;

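  /**
   * Reads the HCatalog job metadata that job setup serialized into the
   * configuration: the table's data and partition column schemas, the Hive
   * delimiter-replacement settings, and the static partition key, if any.
   */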
  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    String inputJobInfoStr = conf.get(HCatConstants.HCAT_KEY_JOB_INFO);
    jobInfo =
      (InputJobInfo) HCatUtil.deserialize(inputJobInfoStr);
    dataColsSchema = jobInfo.getTableInfo().getDataColumns();
    partitionSchema =
      jobInfo.getTableInfo().getPartitionColumns();
    StringBuilder storerInfoStr = new StringBuilder(1024);
    StorerInfo storerInfo = jobInfo.getTableInfo().getStorerInfo();
    storerInfoStr.append("HCatalog Storer Info : ")
      .append("\n\tHandler = ").append(storerInfo.getStorageHandlerClass())
      .append("\n\tInput format class = ").append(storerInfo.getIfClass())
      .append("\n\tOutput format class = ").append(storerInfo.getOfClass())
      .append("\n\tSerde class = ").append(storerInfo.getSerdeClass());
    Properties storerProperties = storerInfo.getProperties();
    if (!storerProperties.isEmpty()) {
      storerInfoStr.append("\nStorer properties ");
      for (Map.Entry<Object, Object> entry : storerProperties.entrySet()) {
        String key = (String) entry.getKey();
        Object val = entry.getValue();
        storerInfoStr.append("\n\t").append(key).append('=').append(val);
      }
    }
    storerInfoStr.append("\n");
    LOG.info(storerInfoStr);

    // The full table schema is the data columns followed by the
    // partition columns.
    hCatFullTableSchema = new HCatSchema(dataColsSchema.getFields());
    for (HCatFieldSchema hfs : partitionSchema.getFields()) {
      hCatFullTableSchema.append(hfs);
    }
    fieldCount = hCatFullTableSchema.size();
    lobLoader = new LargeObjectLoader(conf,
      new Path(jobInfo.getTableInfo().getTableLocation()));
    bigDecimalFormatString = conf.getBoolean(
      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
      ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
    debugHCatImportMapper = conf.getBoolean(
      SqoopHCatUtilities.DEBUG_HCAT_IMPORT_MAPPER_PROP, false);
    // The delimiter set (field, record, enclosed-by and escaped-by
    // characters plus the encloses-required flag) was serialized into the
    // configuration as an IntWritable array by job setup.
    IntWritable[] delimChars = DefaultStringifier.loadArray(conf,
      SqoopHCatUtilities.HIVE_DELIMITERS_TO_REPLACE_PROP, IntWritable.class);
    hiveDelimiters = new DelimiterSet(
      (char) delimChars[0].get(), (char) delimChars[1].get(),
      (char) delimChars[2].get(), (char) delimChars[3].get(),
      delimChars[4].get() == 1);
    hiveDelimsReplacement =
      conf.get(SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_PROP);
    if (hiveDelimsReplacement == null) {
      hiveDelimsReplacement = "";
    }
    doHiveDelimsReplacement = Boolean.valueOf(conf.get(
      SqoopHCatUtilities.HIVE_DELIMITERS_REPLACEMENT_ENABLED_PROP));

    IntWritable[] fPos = DefaultStringifier.loadArray(conf,
      SqoopHCatUtilities.HCAT_FIELD_POSITIONS_PROP, IntWritable.class);
    hCatFieldPositions = new int[fPos.length];
    for (int i = 0; i < fPos.length; ++i) {
      hCatFieldPositions[i] = fPos[i].get();
    }

    LOG.debug("Hive delims replacement enabled : " + doHiveDelimsReplacement);
    LOG.debug("Hive Delimiters : " + hiveDelimiters.toString());
    LOG.debug("Hive delimiters replacement : " + hiveDelimsReplacement);
    staticPartitionKey =
      conf.get(SqoopHCatUtilities.HCAT_STATIC_PARTITION_KEY_PROP);
    LOG.debug("Static partition key used : " + staticPartitionKey);
  }

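  /**
   * Loads any large objects (BLOB/CLOB columns) for the record, then emits
   * the record converted to an HCatRecord.
   */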
  @Override
  public void map(WritableComparable key, SqoopRecord value,
      Context context)
      throws IOException, InterruptedException {

    try {
      // Loading of LOBs was delayed until we have a Context.
      value.loadLargeObjects(lobLoader);
    } catch (SQLException sqlE) {
      throw new IOException(sqlE);
    }
    if (colCount == -1) {
      colCount = value.getFieldMap().size();
    }
    context.write(key, convertToHCatRecord(value));
  }

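  /**
   * Releases resources held by the large-object loader.
   */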
  @Override
  protected void cleanup(Context context) throws IOException {
    if (null != lobLoader) {
      lobLoader.close();
    }
  }

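  /**
   * Builds an HCatRecord from the record's field map, skipping the static
   * partition key (if one was supplied) and converting each value to the
   * corresponding HCatalog column type. HCatalog field names are the
   * lower-cased database column names.
   */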
  private HCatRecord convertToHCatRecord(SqoopRecord sqr)
      throws IOException {
    Map<String, Object> fieldMap = sqr.getFieldMap();
    HCatRecord result = new DefaultHCatRecord(fieldCount);

    for (Map.Entry<String, Object> entry : fieldMap.entrySet()) {
      String key = entry.getKey();
      Object val = entry.getValue();
      String hfn = key.toLowerCase();
      if (staticPartitionKey != null && staticPartitionKey.equals(hfn)) {
        continue;
      }
      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
      if (debugHCatImportMapper) {
        LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
          + " of type " + (val == null ? null : val.getClass().getName())
          + ", hcattype " + hfs.getTypeString());
      }
      Object hCatVal = toHCat(val, hfs.getType(), hfs.getTypeString());

      result.set(hfn, hCatFullTableSchema, hCatVal);
    }

    return result;
  }

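  /**
   * Converts a single field value to its HCatalog representation,
   * dispatching on the Java type of the value. Returns null (and logs an
   * error) when no conversion to the target HCatalog type exists.
   */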
  private Object toHCat(Object val, HCatFieldSchema.Type hfsType,
      String hCatTypeString) {

    if (val == null) {
      return null;
    }

    Object retVal = null;

    if (val instanceof Number) {
      retVal = convertNumberTypes(val, hfsType);
    } else if (val instanceof Boolean) {
      retVal = convertBooleanTypes(val, hfsType);
    } else if (val instanceof String) {
      if (hfsType == HCatFieldSchema.Type.STRING) {
        String str = (String) val;
        if (doHiveDelimsReplacement) {
          retVal = FieldFormatter
            .hiveStringReplaceDelims(str, hiveDelimsReplacement,
              hiveDelimiters);
        } else {
          retVal = str;
        }
      }
    } else if (val instanceof java.util.Date) {
      retVal = convertDateTypes(val, hfsType);
    } else if (val instanceof BytesWritable) {
      if (hfsType == HCatFieldSchema.Type.BINARY) {
        BytesWritable bw = (BytesWritable) val;
        retVal = bw.getBytes();
      }
    } else if (val instanceof BlobRef) {
      if (hfsType == HCatFieldSchema.Type.BINARY) {
        BlobRef br = (BlobRef) val;
        // External BLOBs are stored as their reference string; inline
        // BLOBs carry their data directly.
        byte[] bytes = br.isExternal() ? br.toString().getBytes()
          : br.getData();
        retVal = bytes;
      }
    } else if (val instanceof ClobRef) {
      if (hfsType == HCatFieldSchema.Type.STRING) {
        ClobRef cr = (ClobRef) val;
        String s = cr.isExternal() ? cr.toString() : cr.getData();
        retVal = s;
      }
    } else {
      throw new UnsupportedOperationException("Objects of type "
        + val.getClass().getName() + " are not supported");
    }
    if (retVal == null) {
      LOG.error("Objects of type "
        + val.getClass().getName() + " cannot be mapped to HCatalog type "
        + hCatTypeString);
    }
    return retVal;
  }

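  /**
   * Converts java.sql.Date/Time/Timestamp values to an HCatalog BIGINT
   * (milliseconds since the epoch) or STRING column.
   */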
  private Object convertDateTypes(Object val,
      HCatFieldSchema.Type hfsType) {
    if (val instanceof java.sql.Date) {
      if (hfsType == HCatFieldSchema.Type.BIGINT) {
        return ((Date) val).getTime();
      } else if (hfsType == HCatFieldSchema.Type.STRING) {
        return val.toString();
      }
    } else if (val instanceof java.sql.Time) {
      if (hfsType == HCatFieldSchema.Type.BIGINT) {
        return ((Time) val).getTime();
      } else if (hfsType == HCatFieldSchema.Type.STRING) {
        return val.toString();
      }
    } else if (val instanceof java.sql.Timestamp) {
      if (hfsType == HCatFieldSchema.Type.BIGINT) {
        return ((Timestamp) val).getTime();
      } else if (hfsType == HCatFieldSchema.Type.STRING) {
        return val.toString();
      }
    }
    return null;
  }

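  /**
   * Converts a Boolean to the target HCatalog type: BOOLEAN as-is, numeric
   * types as 1/0, or STRING as "true"/"false".
   */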
  private Object convertBooleanTypes(Object val,
      HCatFieldSchema.Type hfsType) {
    Boolean b = (Boolean) val;
    if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
      return b;
    } else if (hfsType == HCatFieldSchema.Type.TINYINT) {
      return (byte) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
      return (short) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.INT) {
      return (int) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
      return (long) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
      return (float) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
      return (double) (b ? 1 : 0);
    } else if (hfsType == HCatFieldSchema.Type.STRING) {
      return val.toString();
    }
    return null;
  }

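  /**
   * Converts a Number to the target HCatalog type via the appropriate
   * narrowing/widening accessor. BigDecimal values destined for STRING
   * columns honor the ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT setting,
   * which selects plain (non-scientific) formatting.
   */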
  private Object convertNumberTypes(Object val,
      HCatFieldSchema.Type hfsType) {
    if (!(val instanceof Number)) {
      return null;
    }
    if (val instanceof BigDecimal && hfsType == HCatFieldSchema.Type.STRING) {
      BigDecimal bd = (BigDecimal) val;
      if (bigDecimalFormatString) {
        return bd.toPlainString();
      } else {
        return bd.toString();
      }
    }
    Number n = (Number) val;
    if (hfsType == HCatFieldSchema.Type.TINYINT) {
      return n.byteValue();
    } else if (hfsType == HCatFieldSchema.Type.SMALLINT) {
      return n.shortValue();
    } else if (hfsType == HCatFieldSchema.Type.INT) {
      return n.intValue();
    } else if (hfsType == HCatFieldSchema.Type.BIGINT) {
      return n.longValue();
    } else if (hfsType == HCatFieldSchema.Type.FLOAT) {
      return n.floatValue();
    } else if (hfsType == HCatFieldSchema.Type.DOUBLE) {
      return n.doubleValue();
    } else if (hfsType == HCatFieldSchema.Type.BOOLEAN) {
      return n.byteValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
    } else if (hfsType == HCatFieldSchema.Type.STRING) {
      return n.toString();
    }
    return null;
  }
}