SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / java / org / apache / sqoop / mapreduce / hcat / SqoopHCatRecordReader.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.mapreduce.hcat;
20
21 import java.io.IOException;
22
23 import org.apache.commons.logging.Log;
24 import org.apache.commons.logging.LogFactory;
25 import org.apache.hadoop.io.WritableComparable;
26 import org.apache.hadoop.mapreduce.InputSplit;
27 import org.apache.hadoop.mapreduce.RecordReader;
28 import org.apache.hadoop.mapreduce.TaskAttemptContext;
29 import org.apache.hcatalog.data.HCatRecord;
30
31 /**
32 * A Record Reader that can combine underlying splits.
33 */
34 public class SqoopHCatRecordReader extends
35 RecordReader<WritableComparable, HCatRecord> {
36 private final SqoopHCatExportFormat hCatExportFormat;
37 private SqoopHCatInputSplit hCatSplit;
38 private TaskAttemptContext context;
39 private int subIndex;
40 private long progress;
41
42 private RecordReader<WritableComparable, HCatRecord> curReader;
43
44 public static final Log LOG = LogFactory
45 .getLog(SqoopHCatRecordReader.class.getName());
46
47 public SqoopHCatRecordReader(final InputSplit split,
48 final TaskAttemptContext context, final SqoopHCatExportFormat inputFormat)
49 throws IOException {
50 this.hCatSplit = (SqoopHCatInputSplit) split;
51 this.context = context;
52 this.subIndex = 0;
53 this.curReader = null;
54 this.progress = 0L;
55 this.hCatExportFormat = inputFormat;
56
57 initNextRecordReader();
58 }
59
60 @Override
61 public void initialize(final InputSplit split,
62 final TaskAttemptContext ctxt)
63 throws IOException, InterruptedException {
64 this.hCatSplit = (SqoopHCatInputSplit) split;
65 this.context = ctxt;
66
67 if (null != this.curReader) {
68 this.curReader.initialize(((SqoopHCatInputSplit) split)
69 .get(0), context);
70 }
71 }
72
73 @Override
74 public boolean nextKeyValue() throws IOException, InterruptedException {
75 while (this.curReader == null || !this.curReader.nextKeyValue()) {
76 if (!initNextRecordReader()) {
77 return false;
78 }
79 }
80 return true;
81 }
82
83 @Override
84 public WritableComparable getCurrentKey() throws IOException,
85 InterruptedException {
86 return this.curReader.getCurrentKey();
87 }
88
89 @Override
90 public HCatRecord getCurrentValue() throws IOException, InterruptedException {
91 return this.curReader.getCurrentValue();
92 }
93
94 @Override
95 public void close() throws IOException {
96 if (this.curReader != null) {
97 this.curReader.close();
98 this.curReader = null;
99 }
100 }
101
102 @Override
103 public float getProgress() throws IOException, InterruptedException {
104 long subprogress = 0L;
105 if (null != this.curReader) {
106 subprogress = (long) (this.curReader.getProgress()
107 * this.hCatSplit.get(this.subIndex - 1).getLength());
108 }
109 // Indicate the total processed count.
110 return Math.min(1.0F, (this.progress + subprogress)
111 / (float) this.hCatSplit.getLength());
112 }
113
114 protected boolean initNextRecordReader() throws IOException {
115 if (this.curReader != null) {
116 // close current record reader if open
117 this.curReader.close();
118 this.curReader = null;
119 if (this.subIndex > 0) {
120 this.progress +=
121 this.hCatSplit.get(this.subIndex - 1).getLength();
122 }
123 LOG.debug("Closed current reader. Current progress = " + progress);
124 }
125
126 if (this.subIndex == this.hCatSplit.length()) {
127 LOG.debug("Done with all splits");
128 return false;
129 }
130
131 try {
132 // get a record reader for the subsplit-index chunk
133
134 this.curReader = this.hCatExportFormat.createHCatRecordReader(
135 this.hCatSplit.get(this.subIndex), this.context);
136
137 LOG.debug("Created a HCatRecordReader for split " + subIndex);
138 // initialize() for the first RecordReader will be called by MapTask;
139 // we're responsible for initializing subsequent RecordReaders.
140 if (this.subIndex > 0) {
141 this.curReader.initialize(this.hCatSplit.get(this.subIndex),
142 this.context);
143 LOG.info("Initialized reader with current split");
144 }
145 } catch (Exception e) {
146 throw new IOException("Error initializing HCat record reader", e);
147 }
148 LOG.debug("Created record reader for subsplit " + subIndex);
149 ++this.subIndex;
150 return true;
151 }
152 }
153