SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / java / org / apache / sqoop / mapreduce / hcat / SqoopHCatExportFormat.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.mapreduce.hcat;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Collections;
24 import java.util.Comparator;
25 import java.util.List;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.io.WritableComparable;
30 import org.apache.hadoop.mapreduce.InputSplit;
31 import org.apache.hadoop.mapreduce.JobContext;
32 import org.apache.hadoop.mapreduce.RecordReader;
33 import org.apache.hadoop.mapreduce.TaskAttemptContext;
34 import org.apache.hcatalog.data.HCatRecord;
35 import org.apache.hcatalog.mapreduce.HCatInputFormat;
36 import org.apache.sqoop.mapreduce.ExportInputFormat;
37
38 /**
39  * A combined HCatInputFormat equivalent that allows us to limit the number
40  * of splits to the number of map tasks.
41 *
42 * The logic is simple. We get the list of splits for HCatInputFormat. If it is
43 * less than the number of mappers, all is good. Else, we sort the splits by
44 * size and assign them to each of the mappers in a simple scheme. After
45 * assigning the splits to each of the mapper, for the next round we start with
46 * the mapper that got the last split. That way, the size of the split is
47 * distributed in a more uniform fashion than a simple round-robin assignment.
48 */
49 public class SqoopHCatExportFormat extends HCatInputFormat {
50 public static final Log LOG = LogFactory
51 .getLog(SqoopHCatExportFormat.class.getName());
52
53 @Override
54 public List<InputSplit> getSplits(JobContext job)
55 throws IOException, InterruptedException {
56 List<InputSplit> hCatSplits = super.getSplits(job);
57 int hCatSplitCount = hCatSplits.size();
58 int expectedSplitCount = ExportInputFormat.getNumMapTasks(job);
59 if (expectedSplitCount == 0) {
60 expectedSplitCount = hCatSplitCount;
61 }
62 LOG.debug("Expected split count " + expectedSplitCount);
63 LOG.debug("HCatInputFormat provided split count " + hCatSplitCount);
64 // Sort the splits by length descending.
65
66 Collections.sort(hCatSplits, new Comparator<InputSplit>() {
67 @Override
68 public int compare(InputSplit is1, InputSplit is2) {
69 try {
70 return (int) (is2.getLength() - is1.getLength());
71 } catch (Exception e) {
72 LOG.warn("Exception caught while sorting Input splits " + e);
73 }
74 return 0;
75 }
76 });
77 List<InputSplit> combinedSplits = new ArrayList<InputSplit>();
78
79 // The number of splits generated by HCatInputFormat is within
80 // our limits
81
82 if (hCatSplitCount <= expectedSplitCount) {
83 for (InputSplit split : hCatSplits) {
84 List<InputSplit> hcSplitList = new ArrayList<InputSplit>();
85 hcSplitList.add(split);
86 combinedSplits.add(new SqoopHCatInputSplit(hcSplitList));
87 }
88 return combinedSplits;
89 }
90 List<List<InputSplit>> combinedSplitList =
91 new ArrayList<List<InputSplit>>();
92 for (int i = 0; i < expectedSplitCount; i++) {
93 combinedSplitList.add(new ArrayList<InputSplit>());
94 }
95 boolean ascendingAssigment = true;
96
97 int lastSet = 0;
98 for (int i = 0; i < hCatSplitCount; ++i) {
99 int splitNum = i % expectedSplitCount;
100 int currentSet = i / expectedSplitCount;
101 if (currentSet != lastSet) {
102 ascendingAssigment = !ascendingAssigment;
103 }
104 if (ascendingAssigment) {
105 combinedSplitList.get(splitNum).add(hCatSplits.get(i));
106 } else {
107 combinedSplitList.
108 get(expectedSplitCount - 1 - splitNum).add(hCatSplits.get(i));
109 }
110 lastSet = currentSet;
111 }
112 for (int i = 0; i < expectedSplitCount; i++) {
113 SqoopHCatInputSplit sqoopSplit =
114 new SqoopHCatInputSplit(combinedSplitList.get(i));
115 combinedSplits.add(sqoopSplit);
116 }
117
118 return combinedSplits;
119
120 }
121
122 @Override
123 public RecordReader<WritableComparable, HCatRecord>
124 createRecordReader(InputSplit split,
125 TaskAttemptContext taskContext)
126 throws IOException, InterruptedException {
127 LOG.debug("Creating a SqoopHCatRecordReader");
128 return new SqoopHCatRecordReader(split, taskContext, this);
129 }
130
131 public RecordReader<WritableComparable, HCatRecord>
132 createHCatRecordReader(InputSplit split,
133 TaskAttemptContext taskContext)
134 throws IOException, InterruptedException {
135 LOG.debug("Creating a base HCatRecordReader");
136 return super.createRecordReader(split, taskContext);
137 }
138 }