SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / java / org / apache / sqoop / mapreduce / JobBase.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.mapreduce;
20
21 import java.io.File;
22 import java.io.IOException;
23 import java.util.HashSet;
24 import java.util.List;
25 import java.util.Set;
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.mapreduce.InputFormat;
32 import org.apache.hadoop.mapreduce.Job;
33 import org.apache.hadoop.mapreduce.Mapper;
34 import org.apache.hadoop.mapreduce.OutputFormat;
35 import org.apache.hadoop.util.StringUtils;
36 import com.cloudera.sqoop.SqoopOptions;
37 import com.cloudera.sqoop.config.ConfigurationHelper;
38 import com.cloudera.sqoop.manager.ConnManager;
39 import com.cloudera.sqoop.tool.SqoopTool;
40 import com.cloudera.sqoop.util.ClassLoaderStack;
41 import com.cloudera.sqoop.util.Jars;
42
43 /**
44 * Base class for configuring and running a MapReduce job.
45 * Allows dependency injection, etc, for easy customization of import job types.
46 */
47 public class JobBase {
48
49 public static final Log LOG = LogFactory.getLog(JobBase.class.getName());
50
51 protected SqoopOptions options;
52 protected Class<? extends Mapper> mapperClass;
53 protected Class<? extends InputFormat> inputFormatClass;
54 protected Class<? extends OutputFormat> outputFormatClass;
55
56 private Job mrJob;
57
58 private ClassLoader prevClassLoader = null;
59 protected final boolean isHCatJob;
60
61 public static final String PROPERTY_VERBOSE = "sqoop.verbose";
62
63 public JobBase() {
64 this(null);
65 }
66
67 public JobBase(final SqoopOptions opts) {
68 this(opts, null, null, null);
69 }
70
71 public JobBase(final SqoopOptions opts,
72 final Class<? extends Mapper> mapperClass,
73 final Class<? extends InputFormat> inputFormatClass,
74 final Class<? extends OutputFormat> outputFormatClass) {
75
76 this.options = opts;
77 this.mapperClass = mapperClass;
78 this.inputFormatClass = inputFormatClass;
79 this.outputFormatClass = outputFormatClass;
80 isHCatJob = options.getHCatTableName() != null;
81 }
82
83 /**
84 * @return the mapper class to use for the job.
85 */
86 protected Class<? extends Mapper> getMapperClass()
87 throws ClassNotFoundException {
88 return this.mapperClass;
89 }
90
91 /**
92 * @return the inputformat class to use for the job.
93 */
94 protected Class<? extends InputFormat> getInputFormatClass()
95 throws ClassNotFoundException {
96 return this.inputFormatClass;
97 }
98
99 /**
100 * @return the outputformat class to use for the job.
101 */
102 protected Class<? extends OutputFormat> getOutputFormatClass()
103 throws ClassNotFoundException {
104 return this.outputFormatClass;
105 }
106
107 /** Set the OutputFormat class to use for this job. */
108 public void setOutputFormatClass(Class<? extends OutputFormat> cls) {
109 this.outputFormatClass = cls;
110 }
111
112 /** Set the InputFormat class to use for this job. */
113 public void setInputFormatClass(Class<? extends InputFormat> cls) {
114 this.inputFormatClass = cls;
115 }
116
117 /** Set the Mapper class to use for this job. */
118 public void setMapperClass(Class<? extends Mapper> cls) {
119 this.mapperClass = cls;
120 }
121
122 /**
123 * Set the SqoopOptions configuring this job.
124 */
125 public void setOptions(SqoopOptions opts) {
126 this.options = opts;
127 }
128
129 /**
130 * Put jar files required by Sqoop into the DistributedCache.
131 * @param job the Job being submitted.
132 * @param mgr the ConnManager to use.
133 */
134 protected void cacheJars(Job job, ConnManager mgr)
135 throws IOException {
136
137 Configuration conf = job.getConfiguration();
138 FileSystem fs = FileSystem.getLocal(conf);
139 Set<String> localUrls = new HashSet<String>();
140
141 addToCache(Jars.getSqoopJarPath(), fs, localUrls);
142 if (null != mgr) {
143 addToCache(Jars.getDriverClassJar(mgr), fs, localUrls);
144 addToCache(Jars.getJarPathForClass(mgr.getClass()), fs, localUrls);
145 }
146
147 SqoopTool tool = this.options.getActiveSqoopTool();
148 if (null != tool) {
149 // Make sure the jar for the tool itself is on the classpath. (In case
150 // this is a third-party plugin tool.)
151 addToCache(Jars.getJarPathForClass(tool.getClass()), fs, localUrls);
152 List<String> toolDeps = tool.getDependencyJars();
153 if (null != toolDeps) {
154 for (String depFile : toolDeps) {
155 addToCache(depFile, fs, localUrls);
156 }
157 }
158 }
159
160 // If the user specified a particular jar file name,
161
162 // Add anything in $SQOOP_HOME/lib, if this is set.
163 String sqoopHome = System.getenv("SQOOP_HOME");
164 if (null != sqoopHome) {
165 File sqoopHomeFile = new File(sqoopHome);
166 File sqoopLibFile = new File(sqoopHomeFile, "lib");
167 if (sqoopLibFile.exists()) {
168 addDirToCache(sqoopLibFile, fs, localUrls);
169 }
170 } else {
171 LOG.warn("SQOOP_HOME is unset. May not be able to find "
172 + "all job dependencies.");
173 }
174
175 // If we didn't put anything in our set, then there's nothing to cache.
176 if (localUrls.isEmpty()) {
177 return;
178 }
179
180 // Add these to the 'tmpjars' array, which the MR JobSubmitter
181 // will upload to HDFS and put in the DistributedCache libjars.
182 String tmpjars = conf.get("tmpjars");
183 StringBuilder sb = new StringBuilder();
184 if (null != tmpjars) {
185 sb.append(tmpjars);
186 sb.append(",");
187 }
188 sb.append(StringUtils.arrayToString(localUrls.toArray(new String[0])));
189 conf.set("tmpjars", sb.toString());
190 }
191
192 private void addToCache(String file, FileSystem fs, Set<String> localUrls) {
193 if (null == file) {
194 return;
195 }
196
197 Path p = new Path(file);
198 String qualified = p.makeQualified(fs).toString();
199 LOG.debug("Adding to job classpath: " + qualified);
200 localUrls.add(qualified);
201 }
202
203 /**
204 * Add the .jar elements of a directory to the DCache classpath,
205 * nonrecursively.
206 */
207 private void addDirToCache(File dir, FileSystem fs, Set<String> localUrls) {
208 if (null == dir) {
209 return;
210 }
211
212 for (File libfile : dir.listFiles()) {
213 if (libfile.exists() && !libfile.isDirectory()
214 && libfile.getName().endsWith("jar")) {
215 addToCache(libfile.toString(), fs, localUrls);
216 }
217 }
218 }
219
220 /**
221 * If jars must be loaded into the local environment, do so here.
222 */
223 protected void loadJars(Configuration conf, String ormJarFile,
224 String tableClassName) throws IOException {
225
226 boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
227 || "local".equals(conf.get("mapred.job.tracker"));
228 if (isLocal) {
229 // If we're using the LocalJobRunner, then instead of using the compiled
230 // jar file as the job source, we're running in the current thread. Push
231 // on another classloader that loads from that jar in addition to
232 // everything currently on the classpath.
233 this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
234 tableClassName);
235 }
236 }
237
238 /**
239 * If any classloader was invoked by loadJars, free it here.
240 */
241 protected void unloadJars() {
242 if (null != this.prevClassLoader) {
243 // unload the special classloader for this jar.
244 ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
245 }
246 }
247
248 /**
249 * Configure the inputformat to use for the job.
250 */
251 protected void configureInputFormat(Job job, String tableName,
252 String tableClassName, String splitByCol)
253 throws ClassNotFoundException, IOException {
254 //TODO: 'splitByCol' is import-job specific; lift it out of this API.
255 Class<? extends InputFormat> ifClass = getInputFormatClass();
256 LOG.debug("Using InputFormat: " + ifClass);
257 job.setInputFormatClass(ifClass);
258 }
259
260 /**
261 * Configure the output format to use for the job.
262 */
263 protected void configureOutputFormat(Job job, String tableName,
264 String tableClassName) throws ClassNotFoundException, IOException {
265 Class<? extends OutputFormat> ofClass = getOutputFormatClass();
266 LOG.debug("Using OutputFormat: " + ofClass);
267 job.setOutputFormatClass(ofClass);
268 }
269
270 /**
271 * Set the mapper class implementation to use in the job,
272 * as well as any related configuration (e.g., map output types).
273 */
274 protected void configureMapper(Job job, String tableName,
275 String tableClassName) throws ClassNotFoundException, IOException {
276 job.setMapperClass(getMapperClass());
277 }
278
279 /**
280 * Configure the number of map/reduce tasks to use in the job.
281 */
282 protected int configureNumTasks(Job job) throws IOException {
283 int numMapTasks = options.getNumMappers();
284 if (numMapTasks < 1) {
285 numMapTasks = SqoopOptions.DEFAULT_NUM_MAPPERS;
286 LOG.warn("Invalid mapper count; using " + numMapTasks + " mappers.");
287 }
288
289 ConfigurationHelper.setJobNumMaps(job, numMapTasks);
290 job.setNumReduceTasks(0);
291 return numMapTasks;
292 }
293
294 /** Set the main job that will be run. */
295 protected void setJob(Job job) {
296 mrJob = job;
297 }
298
299 /**
300 * @return the main MapReduce job that is being run, or null if no
301 * job has started.
302 */
303 public Job getJob() {
304 return mrJob;
305 }
306
307 /**
308 * Actually run the MapReduce job.
309 */
310 protected boolean runJob(Job job) throws ClassNotFoundException, IOException,
311 InterruptedException {
312 return job.waitForCompletion(true);
313 }
314
315 /**
316 * Display a notice on the log that the current MapReduce job has
317 * been retired, and thus Counters are unavailable.
318 * @param log the Log to display the info to.
319 */
320 protected void displayRetiredJobNotice(Log log) {
321 log.info("The MapReduce job has already been retired. Performance");
322 log.info("counters are unavailable. To get this information, ");
323 log.info("you will need to enable the completed job store on ");
324 log.info("the jobtracker with:");
325 log.info("mapreduce.jobtracker.persist.jobstatus.active = true");
326 log.info("mapreduce.jobtracker.persist.jobstatus.hours = 1");
327 log.info("A jobtracker restart is required for these settings");
328 log.info("to take effect.");
329 }
330
331 /**
332 * Save interesting options to constructed job. Goal here is to propagate some
333 * of them to the job itself, so that they can be easily accessed. We're
334 * propagating only interesting global options (like verbose flag).
335 *
336 * @param job Destination job to save options
337 */
338 protected void propagateOptionsToJob(Job job) {
339 Configuration configuration = job.getConfiguration();
340
341 // So far, propagate only verbose flag
342 configuration.setBoolean(PROPERTY_VERBOSE, options.getVerbose());
343 }
344 }