SQOOP-931: Integrate HCatalog with Sqoop
src/java/org/apache/sqoop/mapreduce/DataDrivenImportJob.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.mapreduce;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.avro.Schema;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.manager.ConnManager;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.mapreduce.ImportJobBase;
import com.cloudera.sqoop.mapreduce.db.DBConfiguration;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
import com.cloudera.sqoop.orm.AvroSchemaGenerator;

/**
 * Actually runs a JDBC import job using the ORM files generated by the
 * sqoop.orm package. Uses DataDrivenDBInputFormat.
 */
public class DataDrivenImportJob extends ImportJobBase {

  public static final Log LOG = LogFactory.getLog(
      DataDrivenImportJob.class.getName());

  @SuppressWarnings("unchecked")
  public DataDrivenImportJob(final SqoopOptions opts) {
    super(opts, null, DataDrivenDBInputFormat.class, null, null);
  }

  public DataDrivenImportJob(final SqoopOptions opts,
      final Class<? extends InputFormat> inputFormatClass,
      ImportJobContext context) {
    super(opts, null, inputFormatClass, null, context);
  }

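  /**
   * Configure the map output key/value classes and the mapper for this job.
   * HCatalog imports delegate these choices to SqoopHCatUtilities; other
   * imports derive them from the requested file layout.
   */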
  @Override
  protected void configureMapper(Job job, String tableName,
      String tableClassName) throws IOException {
    if (isHCatJob) {
      LOG.info("Configuring mapper for HCatalog import job");
      job.setOutputKeyClass(LongWritable.class);
      job.setOutputValueClass(SqoopHCatUtilities.getImportValueClass());
      job.setMapperClass(SqoopHCatUtilities.getImportMapperClass());
      return;
    }
    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
      // For text files, specify these as the output types; for
      // other types, we just use the defaults.
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.AvroDataFile) {
      ConnManager connManager = getContext().getConnManager();
      AvroSchemaGenerator generator = new AvroSchemaGenerator(options,
          connManager, tableName);
      Schema schema = generator.generate();
      AvroJob.setMapOutputSchema(job.getConfiguration(), schema);
    }

    job.setMapperClass(getMapperClass());
  }

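  /**
   * @return the mapper to use: the HCatalog import mapper when an HCatalog
   * table is the target, otherwise the mapper matching the requested file
   * layout, or null for an unrecognized layout.
   */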
  @Override
  protected Class<? extends Mapper> getMapperClass() {
    if (options.getHCatTableName() != null) {
      return SqoopHCatUtilities.getImportMapperClass();
    }
    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
      return TextImportMapper.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.SequenceFile) {
      return SequenceFileImportMapper.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.AvroDataFile) {
      return AvroImportMapper.class;
    }

    return null;
  }

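  /**
   * @return the output format to use: HCatalog's output format for HCatalog
   * imports, otherwise the format matching the requested file layout, or
   * null for an unrecognized layout.
   */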
  @Override
  protected Class<? extends OutputFormat> getOutputFormatClass()
      throws ClassNotFoundException {
    if (isHCatJob) {
      LOG.debug("Returning HCatOutputFormat for output format");
      return SqoopHCatUtilities.getOutputFormatClass();
    }
    if (options.getFileLayout() == SqoopOptions.FileLayout.TextFile) {
      return RawKeyTextOutputFormat.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.SequenceFile) {
      return SequenceFileOutputFormat.class;
    } else if (options.getFileLayout()
        == SqoopOptions.FileLayout.AvroDataFile) {
      return AvroOutputFormat.class;
    }

    return null;
  }

  /**
   * Build the boundary query for the column of the result set created by
   * the given query.
   * @param col column name whose boundaries we're interested in.
   * @param query sub-query used to create the result set.
   * @return input boundary query as a string
   */
  private String buildBoundaryQuery(String col, String query) {
    if (col == null || options.getNumMappers() == 1) {
      return "";
    }

    // Replace table name with alias 't1' if column name is a fully
    // qualified name. This is needed because "tableName"."columnName"
    // in the input boundary query causes a SQL syntax error in most dbs
    // including Oracle and MySQL.
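    // For example (illustrative only), splitting on "foo.id" over the query
    // "SELECT * FROM foo WHERE (1 = 1)" yields the default boundary query:
    //   SELECT MIN(t1.id), MAX(t1.id)
    //     FROM (SELECT * FROM foo WHERE (1 = 1)) AS t1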
    String alias = "t1";
    int dot = col.lastIndexOf('.');
    String qualifiedName = (dot == -1) ? col : alias + col.substring(dot);

    ConnManager mgr = getContext().getConnManager();
    String ret = mgr.getInputBoundsQuery(qualifiedName, query);
    if (ret != null) {
      return ret;
    }

    return "SELECT MIN(" + qualifiedName + "), MAX(" + qualifiedName + ") "
        + "FROM (" + query + ") AS " + alias;
  }

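  /**
   * Configure the job's input: set up the JDBC connection settings, then
   * point DataDrivenDBInputFormat at either the table being imported (with
   * optional WHERE clause and boundary query) or the free-form query.
   */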
  @Override
  protected void configureInputFormat(Job job, String tableName,
      String tableClassName, String splitByCol) throws IOException {
    ConnManager mgr = getContext().getConnManager();
    try {
      String username = options.getUsername();
      if (null == username || username.length() == 0) {
        DBConfiguration.configureDB(job.getConfiguration(),
            mgr.getDriverClass(), options.getConnectString(),
            options.getFetchSize(), options.getConnectionParams());
      } else {
        DBConfiguration.configureDB(job.getConfiguration(),
            mgr.getDriverClass(), options.getConnectString(),
            username, options.getPassword(), options.getFetchSize(),
            options.getConnectionParams());
      }

      if (null != tableName) {
        // Import a table.
        String [] colNames = options.getColumns();
        if (null == colNames) {
          colNames = mgr.getColumnNames(tableName);
        }

        String [] sqlColNames = null;
        if (null != colNames) {
          sqlColNames = new String[colNames.length];
          for (int i = 0; i < colNames.length; i++) {
            sqlColNames[i] = mgr.escapeColName(colNames[i]);
          }
        }

        // It's ok if the where clause is null in DBInputFormat.setInput.
        String whereClause = options.getWhereClause();

        // We can't set the class properly in here, because we may not have the
        // jar loaded in this JVM. So we start by calling setInput() with
        // DBWritable and then overriding the string manually.
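        // (The generated table class name is injected below via
        // ConfigurationHelper.getDbInputClassProperty().)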
        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
            mgr.escapeTableName(tableName), whereClause,
            mgr.escapeColName(splitByCol), sqlColNames);

        // If the user specified a boundary query on the command line,
        // propagate it to the job.
        if (options.getBoundaryQuery() != null) {
          DataDrivenDBInputFormat.setBoundingQuery(job.getConfiguration(),
              options.getBoundaryQuery());
        }
      } else {
        // Import a free-form query.
        String inputQuery = options.getSqlQuery();
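        // The split-condition placeholder in the user's query
        // (DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, i.e. $CONDITIONS) is
        // replaced with an always-true predicate so the boundary query
        // below can run over the full result set.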
        String sanitizedQuery = inputQuery.replace(
            DataDrivenDBInputFormat.SUBSTITUTE_TOKEN, " (1 = 1) ");

        String inputBoundingQuery = options.getBoundaryQuery();
        if (inputBoundingQuery == null) {
          inputBoundingQuery = buildBoundaryQuery(splitByCol, sanitizedQuery);
        }
        DataDrivenDBInputFormat.setInput(job, DBWritable.class,
            inputQuery, inputBoundingQuery);
        new DBConfiguration(job.getConfiguration()).setInputOrderBy(
            splitByCol);
      }

      LOG.debug("Using table class: " + tableClassName);
      job.getConfiguration().set(ConfigurationHelper.getDbInputClassProperty(),
          tableClassName);

      job.getConfiguration().setLong(LargeObjectLoader.MAX_INLINE_LOB_LEN_KEY,
          options.getInlineLobLimit());

      LOG.debug("Using InputFormat: " + inputFormatClass);
      job.setInputFormatClass(inputFormatClass);
    } finally {
      try {
        mgr.close();
      } catch (SQLException sqlE) {
        LOG.warn("Error closing connection: " + sqlE);
      }
    }
  }
}