1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.tool;
20
21 import java.io.IOException;
22 import java.sql.Connection;
23 import java.sql.ResultSet;
24 import java.sql.ResultSetMetaData;
25 import java.sql.SQLException;
26 import java.sql.Statement;
27 import java.sql.Types;
28 import java.util.List;
29 import java.util.Map;
30
31 import org.apache.commons.cli.CommandLine;
32 import org.apache.commons.cli.OptionBuilder;
33 import org.apache.commons.logging.Log;
34 import org.apache.commons.logging.LogFactory;
35 import org.apache.hadoop.conf.Configuration;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.util.StringUtils;
39 import org.apache.sqoop.avro.AvroSchemaMismatchException;
40
41 import org.apache.sqoop.SqoopOptions;
42 import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
43 import org.apache.sqoop.cli.RelatedOptions;
44 import org.apache.sqoop.cli.ToolOptions;
45 import org.apache.sqoop.hive.HiveImport;
46 import org.apache.sqoop.manager.ImportJobContext;
47 import org.apache.sqoop.mapreduce.MergeJob;
48 import org.apache.sqoop.metastore.JobData;
49 import org.apache.sqoop.metastore.JobStorage;
50 import org.apache.sqoop.metastore.JobStorageFactory;
51 import org.apache.sqoop.orm.ClassWriter;
52 import org.apache.sqoop.orm.TableClassName;
53 import org.apache.sqoop.util.AppendUtils;
54 import org.apache.sqoop.util.ClassLoaderStack;
55 import org.apache.sqoop.util.ImportException;
56
57 import static org.apache.sqoop.manager.SupportedManagers.MYSQL;
58
59 /**
60 * Tool that performs database imports to HDFS.
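 *
 * A typical invocation (illustrative values only) looks like
 * "sqoop import --connect jdbc:mysql://db.example.com/corp --table employees
 * --target-dir /user/hadoop/employees"; an incremental run additionally passes
 * "--incremental append --check-column id --last-value 100".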
61 */
62 public class ImportTool extends BaseSqoopTool {
63
64 public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());
65
66 private static final String IMPORT_FAILED_ERROR_MSG = "Import failed: ";
67
68 private CodeGenTool codeGenerator;
69
70 // true if this is an all-tables import. Set by a subclass which
71 // overrides the run() method of this tool (which can only do
72 // a single table).
73 private boolean allTables;
74
75 // store check column type for incremental option
76 private int checkColumnType;
77
79 // Classloader in effect before loadJars(); saved so unloadJars() can restore it after a local job run
79 private ClassLoader prevClassLoader = null;
80
81 public ImportTool() {
82 this("import", false);
83 }
84
85 public ImportTool(String toolName, boolean allTables) {
86 this(toolName, new CodeGenTool(), allTables);
87 }
88
89 public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables) {
90 super(toolName);
91 this.codeGenerator = codeGenerator;
92 this.allTables = allTables;
93 }
94
95 @Override
96 protected boolean init(SqoopOptions sqoopOpts) {
97 boolean ret = super.init(sqoopOpts);
98 codeGenerator.setManager(manager);
99 return ret;
100 }
101
102 /**
103 * @return a list of jar files generated as part of this import process
104 */
105 public List<String> getGeneratedJarFiles() {
106 return this.codeGenerator.getGeneratedJarFiles();
107 }
108
109 /**
110 * If jars must be loaded into the local environment, do so here.
111 */
112 private void loadJars(Configuration conf, String ormJarFile,
113 String tableClassName) throws IOException {
114
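// Local-mode detection checks both the current Hadoop property name
// (mapreduce.jobtracker.address) and its deprecated predecessor
// (mapred.job.tracker).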
115 boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
116 || "local".equals(conf.get("mapred.job.tracker"));
117 if (isLocal) {
118 // If we're using the LocalJobRunner, then instead of using the compiled
119 // jar file as the job source, we're running in the current thread. Push
120 // on another classloader that loads from that jar in addition to
121 // everything currently on the classpath.
122 this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
123 tableClassName);
124 }
125 }
126
127 /**
128 * If any classloader was invoked by loadJars, free it here.
129 */
130 private void unloadJars() {
131 if (null != this.prevClassLoader) {
132 // unload the special classloader for this jar.
133 ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
134 }
135 }
136
137 /**
138 * @return true if the supplied options specify an incremental import.
139 */
140 private boolean isIncremental(SqoopOptions options) {
141 return !options.getIncrementalMode().equals(
142 SqoopOptions.IncrementalMode.None);
143 }
144
145 /**
146 * If this is an incremental import, then we should save the
147 * user's state back to the metastore (if this job was run
148 * from the metastore). Otherwise, log to the user what data
149 * they need to supply next time.
150 */
151 private void saveIncrementalState(SqoopOptions options)
152 throws IOException {
153 if (!isIncremental(options)) {
154 return;
155 }
156
157 Map<String, String> descriptor = options.getStorageDescriptor();
158 String jobName = options.getJobName();
159
160 if (null != jobName && null != descriptor) {
161 // Actually save it back to the metastore.
162 LOG.info("Saving incremental import state to the metastore");
163 JobStorageFactory ssf = new JobStorageFactory(options.getConf());
164 JobStorage storage = ssf.getJobStorage(descriptor);
165 storage.open(descriptor);
166 try {
167 // Save the 'parent' SqoopOptions; this does not contain the mutations
168 // to the SqoopOptions state that occurred over the course of this
169 // execution, except for the one we specifically want to memorize:
170 // the latest value of the check column.
171 JobData data = new JobData(options.getParent(), this);
172 storage.update(jobName, data);
173 LOG.info("Updated data for job: " + jobName);
174 } finally {
175 storage.close();
176 }
177 } else {
178 // If there wasn't a parent SqoopOptions, then the incremental
179 // state data was stored in the current SqoopOptions.
180 LOG.info("Incremental import complete! To run another incremental "
181 + "import of all data following this import, supply the "
182 + "following arguments:");
183 SqoopOptions.IncrementalMode incrementalMode =
184 options.getIncrementalMode();
185 switch (incrementalMode) {
186 case AppendRows:
187 LOG.info(" --incremental append");
188 break;
189 case DateLastModified:
190 LOG.info(" --incremental lastmodified");
191 break;
192 default:
193 LOG.warn("Undefined incremental mode: " + incrementalMode);
194 break;
195 }
196 LOG.info(" --check-column " + options.getIncrementalTestColumn());
197 LOG.info(" --last-value " + options.getIncrementalLastValue());
198 LOG.info("(Consider saving this with 'sqoop job --create')");
199 }
200 }
201
202 /**
203 * Return the max value in the incremental-import test column. The
204 * column must be numeric or of a date/time type.
205 */
206 private Object getMaxColumnId(SqoopOptions options) throws SQLException {
207 StringBuilder sb = new StringBuilder();
208 String query;
209
210 sb.append("SELECT MAX(");
211 sb.append(manager.escapeColName(options.getIncrementalTestColumn()));
212 sb.append(") FROM ");
213
214 if (options.getTableName() != null) {
215 // Table import
216 sb.append(manager.escapeTableName(options.getTableName()));
217
218 String where = options.getWhereClause();
219 if (null != where) {
220 sb.append(" WHERE ");
221 sb.append(where);
222 }
223 query = sb.toString();
224 } else {
225 // Free form table based import
226 sb.append("(");
227 sb.append(options.getSqlQuery());
228 sb.append(") sqoop_import_query_alias");
229
230 query = sb.toString().replaceAll("\\$CONDITIONS", "(1 = 1)");
231 }
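// As an illustration (hypothetical table and column names), a table import
// builds: SELECT MAX(id) FROM employees WHERE <user-supplied where clause>
// while a free-form import wraps the user's query:
// SELECT MAX(id) FROM (<query, with $CONDITIONS replaced by (1 = 1)>) sqoop_import_query_alias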
232
233 Connection conn = manager.getConnection();
234 Statement s = null;
235 ResultSet rs = null;
236 try {
237 LOG.info("Maximal id query for free form incremental import: " + query);
238 s = conn.createStatement();
239 rs = s.executeQuery(query);
240 if (!rs.next()) {
241 // This probably means the table is empty.
242 LOG.warn("Unexpected: empty results for max value query?");
243 return null;
244 }
245
246 ResultSetMetaData rsmd = rs.getMetaData();
247 checkColumnType = rsmd.getColumnType(1);
248 if (checkColumnType == Types.TIMESTAMP) {
249 return rs.getTimestamp(1);
250 } else if (checkColumnType == Types.DATE) {
251 return rs.getDate(1);
252 } else if (checkColumnType == Types.TIME) {
253 return rs.getTime(1);
254 } else {
255 return rs.getObject(1);
256 }
257 } finally {
258 try {
259 if (null != rs) {
260 rs.close();
261 }
262 } catch (SQLException sqlE) {
263 LOG.warn("SQL Exception closing resultset: " + sqlE);
264 }
265
266 try {
267 if (null != s) {
268 s.close();
269 }
270 } catch (SQLException sqlE) {
271 LOG.warn("SQL Exception closing statement: " + sqlE);
272 }
273 }
274 }
275
276 /**
277 * Determine if a column is date/time.
278 * @return true if column type is TIMESTAMP, DATE, or TIME.
279 */
280 private boolean isDateTimeColumn(int columnType) {
281 return (columnType == Types.TIMESTAMP)
282 || (columnType == Types.DATE)
283 || (columnType == Types.TIME);
284 }
285
286 /**
287 * Initialize the constraints which set the incremental import range.
288 * @return false if an import is not necessary, because the dataset has not
289 * changed.
290 */
291 private boolean initIncrementalConstraints(SqoopOptions options,
292 ImportJobContext context) throws ImportException, IOException {
293
294 // If this is an incremental import, determine the constraints
295 // to inject in the WHERE clause or $CONDITIONS for a query.
296 // Also modify the 'last value' field of the SqoopOptions to
297 // specify the current job start time / start row.
298
299 if (!isIncremental(options)) {
300 return true;
301 }
302
303 SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
304 String nextIncrementalValue = null;
305
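// nextVal holds the new upper bound for this run: the MAX() of the check
// column for an append import, or the database's current timestamp for a
// lastmodified import.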
306 Object nextVal;
307 switch (incrementalMode) {
308 case AppendRows:
309 try {
310 nextVal = getMaxColumnId(options);
311 if (isDateTimeColumn(checkColumnType)) {
312 nextIncrementalValue = (nextVal == null) ? null
313 : manager.datetimeToQueryString(nextVal.toString(),
314 checkColumnType);
315 } else if (manager.isCharColumn(checkColumnType)) {
316 throw new ImportException("Character column "
317 + "(" + options.getIncrementalTestColumn() + ") can not be used "
318 + "to determine which rows to incrementally import.");
319 } else {
320 nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
321 }
322 } catch (SQLException sqlE) {
323 throw new IOException(sqlE);
324 }
325 break;
326 case DateLastModified:
327 if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
328 Path outputPath = getOutputPath(options, context.getTableName(), false);
329 FileSystem fs = outputPath.getFileSystem(options.getConf());
330 if (fs.exists(outputPath)) {
331 throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
332 + " is required when using --" + this.INCREMENT_TYPE_ARG
333 + " lastmodified and the output directory exists.");
334 }
335 }
336 checkColumnType = manager.getColumnTypes(options.getTableName(),
337 options.getSqlQuery()).get(options.getIncrementalTestColumn());
338 nextVal = manager.getCurrentDbTimestamp();
339 if (null == nextVal) {
340 throw new IOException("Could not get current time from database");
341 }
342 nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
343 checkColumnType);
344 break;
345 default:
346 throw new ImportException("Undefined incremental import type: "
347 + incrementalMode);
348 }
349
350 // Build the WHERE clause components that are used to import
351 // only this incremental section.
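// For example, an append import on a hypothetical check column "id" with a
// previous last-value of 100 produces "id > 100 AND id <= <new max>"; a
// lastmodified import on column "ts" produces "ts >= '<last value>' AND ts < '<current db time>'".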
352 StringBuilder sb = new StringBuilder();
353 String prevEndpoint = options.getIncrementalLastValue();
354
355 if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
356 && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
357 // Incremental imports based on date/time should be 'quoted' in
358 // ANSI SQL. If the user didn't specify single-quotes, put them
359 // around, here.
360 prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
361 checkColumnType);
362 }
363
364 String checkColName = manager.escapeColName(
365 options.getIncrementalTestColumn());
366 LOG.info("Incremental import based on column " + checkColName);
367 if (null != prevEndpoint) {
368 if (prevEndpoint.equals(nextIncrementalValue)) {
369 LOG.info("No new rows detected since last import.");
370 return false;
371 }
372 LOG.info("Lower bound value: " + prevEndpoint);
373 sb.append(checkColName);
374 switch (incrementalMode) {
375 case AppendRows:
376 sb.append(" > ");
377 break;
378 case DateLastModified:
379 sb.append(" >= ");
380 break;
381 default:
382 throw new ImportException("Undefined comparison");
383 }
384 sb.append(prevEndpoint);
385 sb.append(" AND ");
386 }
387
388 if (null != nextIncrementalValue) {
389 sb.append(checkColName);
390 switch (incrementalMode) {
391 case AppendRows:
392 sb.append(" <= ");
393 break;
394 case DateLastModified:
395 sb.append(" < ");
396 break;
397 default:
398 throw new ImportException("Undefined comparison");
399 }
400 sb.append(nextIncrementalValue);
401 } else {
402 sb.append(checkColName);
403 sb.append(" IS NULL ");
404 }
405
406 LOG.info("Upper bound value: " + nextIncrementalValue);
407
408 if (options.getTableName() != null) {
409 // Table based import
410 String prevWhereClause = options.getWhereClause();
411 if (null != prevWhereClause) {
412 sb.append(" AND (");
413 sb.append(prevWhereClause);
414 sb.append(")");
415 }
416
417 String newConstraints = sb.toString();
418 options.setWhereClause(newConstraints);
419 } else {
420 // Free-form query based import
421 sb.append(" AND $CONDITIONS");
422 String newQuery = options.getSqlQuery().replace(
423 "$CONDITIONS", sb.toString());
424 options.setSqlQuery(newQuery);
425 }
426 // Save this state for next time.
427 SqoopOptions recordOptions = options.getParent();
428 if (null == recordOptions) {
429 recordOptions = options;
430 }
431 recordOptions.setIncrementalLastValue(
432 (nextVal == null) ? null : nextVal.toString());
433
434 return true;
435 }
436
437 /**
438 * Merge HDFS output directories
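 *
 * New rows arrive in a temporary directory; if the user's final directory
 * already exists, a MergeJob (keyed on --merge-key) combines the old and new
 * data in another temporary directory and the directories are then swapped;
 * otherwise the new data is simply moved into place.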
439 */
440 protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
441 if (context.getDestination() == null) {
442 return;
443 }
444
445 Path userDestDir = getOutputPath(options, context.getTableName(), false);
446 FileSystem fs = userDestDir.getFileSystem(options.getConf());
447 if (fs.exists(context.getDestination())) {
448 LOG.info("Final destination exists, will run merge job.");
449 if (fs.exists(userDestDir)) {
450 String tableClassName = null;
451 if (!context.getConnManager().isORMFacilitySelfManaged()) {
452 tableClassName =
453 new TableClassName(options).getClassForTable(context.getTableName());
454 }
455 Path destDir = getOutputPath(options, context.getTableName());
456 options.setExistingJarName(context.getJarFile());
457 options.setClassName(tableClassName);
458 options.setMergeOldPath(userDestDir.toString());
459 options.setMergeNewPath(context.getDestination().toString());
460 // Merge to temporary directory so that original directory remains intact.
461 options.setTargetDir(destDir.toString());
462
463 // Local job tracker needs jars in the classpath.
464 if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
465 loadJars(options.getConf(), context.getJarFile(), ClassWriter.toJavaIdentifier("codegen_" +
466 context.getTableName()));
467 } else {
468 loadJars(options.getConf(), context.getJarFile(), context.getTableName());
469 }
470
471 MergeJob mergeJob = new MergeJob(options);
472 if (mergeJob.runMergeJob()) {
473 // Rename destination directory to proper location.
474 Path tmpDir = getOutputPath(options, context.getTableName());
475 fs.rename(userDestDir, tmpDir);
476 fs.rename(destDir, userDestDir);
477 fs.delete(tmpDir, true);
478 } else {
479 LOG.error("Merge MapReduce job failed!");
480 }
481
482 unloadJars();
483 } else {
484 // Create parent directory(ies), otherwise fs.rename would fail
485 if (!fs.exists(userDestDir.getParent())) {
486 fs.mkdirs(userDestDir.getParent());
487 }
488
489 // And finally move the data
490 LOG.info("Moving data from temporary directory " + context.getDestination() + " to final destination " + userDestDir);
491 if (!fs.rename(context.getDestination(), userDestDir)) {
492 throw new RuntimeException("Couldn't move data from temporary directory " + context.getDestination() + " to final destination " + userDestDir);
493 }
494 }
495 }
496 }
497
498 /**
499 * Import a table or query.
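 * The sequence is: generate the ORM code, apply any incremental constraints,
 * optionally delete the target directory, run the import (table or free-form
 * query), append or merge the results as requested, optionally run the Hive
 * import, and finally save the incremental state.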
500 * @return true if an import was performed, false otherwise.
501 */
502 protected boolean importTable(SqoopOptions options, String tableName,
503 HiveImport hiveImport) throws IOException, ImportException {
504 String jarFile = null;
505
506 // Generate the ORM code for the tables.
507 jarFile = codeGenerator.generateORM(options, tableName);
508
509 Path outputPath = getOutputPath(options, tableName);
510
511 // Do the actual import.
512 ImportJobContext context = new ImportJobContext(tableName, jarFile,
513 options, outputPath);
514
515 // If we're doing an incremental import, set up the
516 // filtering conditions used to get the latest records.
517 if (!initIncrementalConstraints(options, context)) {
518 return false;
519 }
520
521 if (options.isDeleteMode()) {
522 deleteTargetDir(context);
523 }
524
525 if (null != tableName) {
526 manager.importTable(context);
527 } else {
528 manager.importQuery(context);
529 }
530
531 if (options.isAppendMode()) {
532 AppendUtils app = new AppendUtils(context);
533 app.append();
534 } else if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified) {
535 lastModifiedMerge(options, context);
536 }
537
538 // If the user wants this table to be in Hive, perform that post-load.
539 if (options.doHiveImport()) {
540 // For Parquet files, the import action creates the Hive table directly via
541 // Kite, so there is no need to run the Hive import as a separate post step.
542 if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
543 hiveImport.importTable(tableName, options.getHiveTableName(), false);
544 }
545 }
546
547 saveIncrementalState(options);
548
549 return true;
550 }
551
552 private void deleteTargetDir(ImportJobContext context) throws IOException {
553
554 SqoopOptions options = context.getOptions();
555 Path destDir = context.getDestination();
556 FileSystem fs = destDir.getFileSystem(options.getConf());
557
558 if (fs.exists(destDir)) {
559 fs.delete(destDir, true);
560 LOG.info("Destination directory " + destDir + " deleted.");
561 return;
562 } else {
563 LOG.info("Destination directory " + destDir + " is not present, "
564 + "hence not deleting.");
565 }
566 }
567
568 /**
569 * @return the output path for the imported files;
570 * in append mode this will point to a temporary folder;
571 * if importing to HBase, this may return null.
572 */
573 private Path getOutputPath(SqoopOptions options, String tableName) {
574 return getOutputPath(options, tableName, options.isAppendMode()
575 || options.getIncrementalMode().equals(SqoopOptions.IncrementalMode.DateLastModified));
576 }
577
578 /**
579 * @return the output path for the imported files;
580 * if importing to hbase, this may return null.
581 */
582 private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) {
583 // Get output directory
584 String hdfsWarehouseDir = options.getWarehouseDir();
585 String hdfsTargetDir = options.getTargetDir();
586 Path outputPath = null;
587 if (temp) {
588 // Use temporary path, later removed when appending
589 String salt = tableName;
590 if (salt == null && options.getSqlQuery() != null) {
591 salt = Integer.toHexString(options.getSqlQuery().hashCode());
592 }
593 outputPath = AppendUtils.getTempAppendDir(salt, options);
594 LOG.debug("Using temporary folder: " + outputPath.getName());
595 } else {
596 // Try in this order: target-dir or warehouse-dir
597 if (hdfsTargetDir != null) {
598 outputPath = new Path(hdfsTargetDir);
599 } else if (hdfsWarehouseDir != null) {
600 outputPath = new Path(hdfsWarehouseDir, tableName);
601 } else if (null != tableName) {
602 outputPath = new Path(tableName);
603 }
604 }
605
606 return outputPath;
607 }
608
609 @Override
610 /** {@inheritDoc} */
611 public int run(SqoopOptions options) {
612 HiveImport hiveImport = null;
613
614 if (allTables) {
615 // We got into this method, but we should be in a subclass.
616 // (This method only handles a single table)
617 // This should not be reached, but for sanity's sake, test here.
618 LOG.error("ImportTool.run() can only handle a single table.");
619 return 1;
620 }
621
622 if (!init(options)) {
623 return 1;
624 }
625
626 codeGenerator.setManager(manager);
627
628 try {
629 if (options.doHiveImport()) {
630 hiveImport = new HiveImport(options, manager, options.getConf(), false);
631 }
632
633 // Import a single table (or query) the user specified.
634 importTable(options, options.getTableName(), hiveImport);
635 } catch (IllegalArgumentException iea) {
636 LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
637 rethrowIfRequired(options, iea);
638 return 1;
639 } catch (IOException ioe) {
640 LOG.error(IMPORT_FAILED_ERROR_MSG + StringUtils.stringifyException(ioe));
641 rethrowIfRequired(options, ioe);
642 return 1;
643 } catch (ImportException ie) {
644 LOG.error(IMPORT_FAILED_ERROR_MSG + ie.toString());
645 rethrowIfRequired(options, ie);
646 return 1;
647 } catch (AvroSchemaMismatchException e) {
648 LOG.error(IMPORT_FAILED_ERROR_MSG, e);
649 rethrowIfRequired(options, e);
650 return 1;
651 } finally {
652 destroy(options);
653 }
654
655 return 0;
656 }
657
658 /**
659 * Construct the set of options that control imports, either of one
660 * table or a batch of tables.
661 * @return the RelatedOptions that can be used to parse the import
662 * arguments.
663 */
664 @SuppressWarnings("static-access")
665 protected RelatedOptions getImportOptions() {
666 // Imports
667 RelatedOptions importOpts = new RelatedOptions("Import control arguments");
668
669 importOpts.addOption(OptionBuilder
670 .withDescription("Use direct import fast path")
671 .withLongOpt(DIRECT_ARG)
672 .create());
673
674 if (!allTables) {
675 importOpts.addOption(OptionBuilder.withArgName("table-name")
676 .hasArg().withDescription("Table to read")
677 .withLongOpt(TABLE_ARG)
678 .create());
679 importOpts.addOption(OptionBuilder.withArgName("col,col,col...")
680 .hasArg().withDescription("Columns to import from table")
681 .withLongOpt(COLUMNS_ARG)
682 .create());
683 importOpts.addOption(OptionBuilder.withArgName("column-name")
684 .hasArg()
685 .withDescription("Column of the table used to split work units")
686 .withLongOpt(SPLIT_BY_ARG)
687 .create());
688 importOpts
689 .addOption(OptionBuilder
690 .withArgName("size")
691 .hasArg()
692 .withDescription(
693 "Upper Limit of rows per split for split columns of Date/Time/Timestamp and integer types. For date or timestamp fields it is calculated in seconds. split-limit should be greater than 0")
694 .withLongOpt(SPLIT_LIMIT_ARG)
695 .create());
696 importOpts.addOption(OptionBuilder.withArgName("where clause")
697 .hasArg().withDescription("WHERE clause to use during import")
698 .withLongOpt(WHERE_ARG)
699 .create());
700 importOpts.addOption(OptionBuilder
701 .withDescription("Imports data in append mode")
702 .withLongOpt(APPEND_ARG)
703 .create());
704 importOpts.addOption(OptionBuilder
705 .withDescription("Imports data in delete mode")
706 .withLongOpt(DELETE_ARG)
707 .create());
708 importOpts.addOption(OptionBuilder.withArgName("dir")
709 .hasArg().withDescription("HDFS plain table destination")
710 .withLongOpt(TARGET_DIR_ARG)
711 .create());
712 importOpts.addOption(OptionBuilder.withArgName("statement")
713 .hasArg()
714 .withDescription("Import results of SQL 'statement'")
715 .withLongOpt(SQL_QUERY_ARG)
716 .create(SQL_QUERY_SHORT_ARG));
717 importOpts.addOption(OptionBuilder.withArgName("statement")
718 .hasArg()
719 .withDescription("Set boundary query for retrieving max and min"
720 + " value of the primary key")
721 .withLongOpt(SQL_QUERY_BOUNDARY)
722 .create());
723 importOpts.addOption(OptionBuilder.withArgName("column")
724 .hasArg().withDescription("Key column to use to join results")
725 .withLongOpt(MERGE_KEY_ARG)
726 .create());
727
728 addValidationOpts(importOpts);
729 }
730
731 importOpts.addOption(OptionBuilder.withArgName("dir")
732 .hasArg().withDescription("HDFS parent for table destination")
733 .withLongOpt(WAREHOUSE_DIR_ARG)
734 .create());
735 importOpts.addOption(OptionBuilder
736 .withDescription("Imports data to SequenceFiles")
737 .withLongOpt(FMT_SEQUENCEFILE_ARG)
738 .create());
739 importOpts.addOption(OptionBuilder
740 .withDescription("Imports data as plain text (default)")
741 .withLongOpt(FMT_TEXTFILE_ARG)
742 .create());
743 importOpts.addOption(OptionBuilder
744 .withDescription("Imports data to Avro data files")
745 .withLongOpt(FMT_AVRODATAFILE_ARG)
746 .create());
747 importOpts.addOption(OptionBuilder
748 .withDescription("Imports data to Parquet files")
749 .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
750 .create());
751 importOpts.addOption(OptionBuilder.withArgName("n")
752 .hasArg().withDescription("Use 'n' map tasks to import in parallel")
753 .withLongOpt(NUM_MAPPERS_ARG)
754 .create(NUM_MAPPERS_SHORT_ARG));
755 importOpts.addOption(OptionBuilder.withArgName("name")
756 .hasArg().withDescription("Set name for generated mapreduce job")
757 .withLongOpt(MAPREDUCE_JOB_NAME)
758 .create());
759 importOpts.addOption(OptionBuilder
760 .withDescription("Enable compression")
761 .withLongOpt(COMPRESS_ARG)
762 .create(COMPRESS_SHORT_ARG));
763 importOpts.addOption(OptionBuilder.withArgName("codec")
764 .hasArg()
765 .withDescription("Compression codec to use for import")
766 .withLongOpt(COMPRESSION_CODEC_ARG)
767 .create());
768 importOpts.addOption(OptionBuilder.withArgName("n")
769 .hasArg()
770 .withDescription("Split the input stream every 'n' bytes "
771 + "when importing in direct mode")
772 .withLongOpt(DIRECT_SPLIT_SIZE_ARG)
773 .create());
774 importOpts.addOption(OptionBuilder.withArgName("n")
775 .hasArg()
776 .withDescription("Set the maximum size for an inline LOB")
777 .withLongOpt(INLINE_LOB_LIMIT_ARG)
778 .create());
779 importOpts.addOption(OptionBuilder.withArgName("n")
780 .hasArg()
781 .withDescription("Set number 'n' of rows to fetch from the "
782 + "database when more rows are needed")
783 .withLongOpt(FETCH_SIZE_ARG)
784 .create());
785 importOpts.addOption(OptionBuilder.withArgName("reset-mappers")
786 .withDescription("Reset the number of mappers to one mapper if no split key available")
787 .withLongOpt(AUTORESET_TO_ONE_MAPPER)
788 .create());
789 return importOpts;
790 }
791
792 /**
793 * Return options for incremental import.
794 */
795 protected RelatedOptions getIncrementalOptions() {
796 RelatedOptions incrementalOpts =
797 new RelatedOptions("Incremental import arguments");
798
799 incrementalOpts.addOption(OptionBuilder.withArgName("import-type")
800 .hasArg()
801 .withDescription(
802 "Define an incremental import of type 'append' or 'lastmodified'")
803 .withLongOpt(INCREMENT_TYPE_ARG)
804 .create());
805 incrementalOpts.addOption(OptionBuilder.withArgName("column")
806 .hasArg()
807 .withDescription("Source column to check for incremental change")
808 .withLongOpt(INCREMENT_COL_ARG)
809 .create());
810 incrementalOpts.addOption(OptionBuilder.withArgName("value")
811 .hasArg()
812 .withDescription("Last imported value in the incremental check column")
813 .withLongOpt(INCREMENT_LAST_VAL_ARG)
814 .create());
815
816 return incrementalOpts;
817 }
818
819 @Override
820 /** Configure the command-line arguments we expect to receive */
821 public void configureOptions(ToolOptions toolOptions) {
822
823 toolOptions.addUniqueOptions(getCommonOptions());
824 toolOptions.addUniqueOptions(getImportOptions());
825 if (!allTables) {
826 toolOptions.addUniqueOptions(getIncrementalOptions());
827 }
828 toolOptions.addUniqueOptions(getOutputFormatOptions());
829 toolOptions.addUniqueOptions(getInputFormatOptions());
830 toolOptions.addUniqueOptions(getHiveOptions(true));
831 toolOptions.addUniqueOptions(getHBaseOptions());
832 toolOptions.addUniqueOptions(getHCatalogOptions());
833 toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
834 toolOptions.addUniqueOptions(getAccumuloOptions());
835
836 // get common codegen opts.
837 RelatedOptions codeGenOpts = getCodeGenOpts(allTables);
838
839 // add import-specific codegen opts:
840 codeGenOpts.addOption(OptionBuilder.withArgName("file")
841 .hasArg()
842 .withDescription("Disable code generation; use specified jar")
843 .withLongOpt(JAR_FILE_NAME_ARG)
844 .create());
845
846 toolOptions.addUniqueOptions(codeGenOpts);
847 }
848
849 @Override
850 /** {@inheritDoc} */
851 public void printHelp(ToolOptions toolOptions) {
852 super.printHelp(toolOptions);
853 System.out.println("");
854 if (allTables) {
855 System.out.println("At minimum, you must specify --connect");
856 } else {
857 System.out.println(
858 "At minimum, you must specify --connect and --table");
859 }
860
861 System.out.println(
862 "Arguments to mysqldump and other subprograms may be supplied");
863 System.out.println(
864 "after a '--' on the command line.");
865 }
866
867 private void applyIncrementalOptions(CommandLine in, SqoopOptions out)
868 throws InvalidOptionsException {
869 if (in.hasOption(INCREMENT_TYPE_ARG)) {
870 String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG);
871 if ("append".equals(incrementalTypeStr)) {
872 out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows);
873 // This argument implies ability to append to the same directory.
874 out.setAppendMode(true);
875 } else if ("lastmodified".equals(incrementalTypeStr)) {
876 out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
877 } else {
878 throw new InvalidOptionsException("Unknown incremental import mode: "
879 + incrementalTypeStr + ". Use 'append' or 'lastmodified'."
880 + HELP_STR);
881 }
882 }
883
884 if (in.hasOption(INCREMENT_COL_ARG)) {
885 out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG));
886 }
887
888 if (in.hasOption(INCREMENT_LAST_VAL_ARG)) {
889 out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG));
890 }
891 }
892
893 @Override
894 /** {@inheritDoc} */
895 public void applyOptions(CommandLine in, SqoopOptions out)
896 throws InvalidOptionsException {
897
898 try {
899 applyCommonOptions(in, out);
900
901 if (in.hasOption(DIRECT_ARG)) {
902 out.setDirectMode(true);
903 }
904
905 if (!allTables) {
906 if (in.hasOption(TABLE_ARG)) {
907 out.setTableName(in.getOptionValue(TABLE_ARG));
908 }
909
910 if (in.hasOption(COLUMNS_ARG)) {
911 String[] cols = in.getOptionValue(COLUMNS_ARG).split(",");
912 for (int i=0; i<cols.length; i++) {
913 cols[i] = cols[i].trim();
914 }
915 out.setColumns(cols);
916 }
917
918 if (in.hasOption(SPLIT_BY_ARG)) {
919 out.setSplitByCol(in.getOptionValue(SPLIT_BY_ARG));
920 }
921
922 if (in.hasOption(SPLIT_LIMIT_ARG)) {
923 out.setSplitLimit(Integer.parseInt(in.getOptionValue(SPLIT_LIMIT_ARG)));
924 }
925
926 if (in.hasOption(WHERE_ARG)) {
927 out.setWhereClause(in.getOptionValue(WHERE_ARG));
928 }
929
930 if (in.hasOption(TARGET_DIR_ARG)) {
931 out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
932 }
933
934 if (in.hasOption(APPEND_ARG)) {
935 out.setAppendMode(true);
936 }
937
938 if (in.hasOption(DELETE_ARG)) {
939 out.setDeleteMode(true);
940 }
941
942 if (in.hasOption(SQL_QUERY_ARG)) {
943 out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
944 }
945
946 if (in.hasOption(SQL_QUERY_BOUNDARY)) {
947 out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
948 }
949
950 if (in.hasOption(MERGE_KEY_ARG)) {
951 out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
952 }
953
954 applyValidationOptions(in, out);
955 }
956
957 if (in.hasOption(WAREHOUSE_DIR_ARG)) {
958 out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
959 }
960
961 if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
962 out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
963 }
964
965 if (in.hasOption(FMT_TEXTFILE_ARG)) {
966 out.setFileLayout(SqoopOptions.FileLayout.TextFile);
967 }
968
969 if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
970 out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
971 }
972
973 if (in.hasOption(FMT_PARQUETFILE_ARG)) {
974 out.setFileLayout(SqoopOptions.FileLayout.ParquetFile);
975 }
976
977 if (in.hasOption(NUM_MAPPERS_ARG)) {
978 out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
979 }
980
981 if (in.hasOption(MAPREDUCE_JOB_NAME)) {
982 out.setMapreduceJobName(in.getOptionValue(MAPREDUCE_JOB_NAME));
983 }
984
985 if (in.hasOption(COMPRESS_ARG)) {
986 out.setUseCompression(true);
987 }
988
989 if (in.hasOption(COMPRESSION_CODEC_ARG)) {
990 out.setCompressionCodec(in.getOptionValue(COMPRESSION_CODEC_ARG));
991 }
992
993 if (in.hasOption(DIRECT_SPLIT_SIZE_ARG)) {
994 out.setDirectSplitSize(Long.parseLong(in.getOptionValue(
995 DIRECT_SPLIT_SIZE_ARG)));
996 }
997
998 if (in.hasOption(INLINE_LOB_LIMIT_ARG)) {
999 out.setInlineLobLimit(Long.parseLong(in.getOptionValue(
1000 INLINE_LOB_LIMIT_ARG)));
1001 }
1002
1003 if (in.hasOption(FETCH_SIZE_ARG)) {
1004 out.setFetchSize(Integer.valueOf(in.getOptionValue(FETCH_SIZE_ARG)));
1005 }
1006
1007 if (in.hasOption(JAR_FILE_NAME_ARG)) {
1008 out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
1009 }
1010
1011 if (in.hasOption(AUTORESET_TO_ONE_MAPPER)) {
1012 out.setAutoResetToOneMapper(true);
1013 }
1014
1015 if (in.hasOption(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)) {
1016 out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
1017 ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
1018 }
1019
1020 applyIncrementalOptions(in, out);
1021 applyHiveOptions(in, out);
1022 applyOutputFormatOptions(in, out);
1023 applyInputFormatOptions(in, out);
1024 applyCodeGenOptions(in, out, allTables);
1025 applyHBaseOptions(in, out);
1026 applyHCatalogOptions(in, out);
1027 applyAccumuloOptions(in, out);
1028
1029 } catch (NumberFormatException nfe) {
1030 throw new InvalidOptionsException("Error: expected numeric argument.\n"
1031 + "Try --help for usage.");
1032 }
1033 }
1034
1035 /**
1036 * Validate import-specific arguments.
1037 * @param options the configured SqoopOptions to check
1038 */
1039 protected void validateImportOptions(SqoopOptions options)
1040 throws InvalidOptionsException {
1041 if (!allTables && options.getTableName() == null
1042 && options.getSqlQuery() == null) {
1043 throw new InvalidOptionsException(
1044 "--table or --" + SQL_QUERY_ARG + " is required for import. "
1045 + "(Or use sqoop import-all-tables.)"
1046 + HELP_STR);
1047 } else if (options.getExistingJarName() != null
1048 && options.getClassName() == null) {
1049 throw new InvalidOptionsException("Jar specified with --jar-file, but no "
1050 + "class specified with --class-name." + HELP_STR);
1051 } else if (options.getTargetDir() != null
1052 && options.getWarehouseDir() != null) {
1053 throw new InvalidOptionsException(
1054 "--target-dir with --warehouse-dir are incompatible options."
1055 + HELP_STR);
1056 } else if (options.getTableName() != null
1057 && options.getSqlQuery() != null) {
1058 throw new InvalidOptionsException(
1059 "Cannot specify --" + SQL_QUERY_ARG + " and --table together."
1060 + HELP_STR);
1061 } else if (options.getSqlQuery() != null
1062 && options.getTargetDir() == null
1063 && options.getHBaseTable() == null
1064 && options.getHCatTableName() == null
1065 && options.getAccumuloTable() == null) {
1066 throw new InvalidOptionsException(
1067 "Must specify destination with --target-dir. "
1068 + HELP_STR);
1069 } else if (options.getSqlQuery() != null && options.doHiveImport()
1070 && options.getHiveTableName() == null) {
1071 throw new InvalidOptionsException(
1072 "When importing a query to Hive, you must specify --"
1073 + HIVE_TABLE_ARG + "." + HELP_STR);
1074 } else if (options.getSqlQuery() != null && options.getNumMappers() > 1
1075 && options.getSplitByCol() == null) {
1076 throw new InvalidOptionsException(
1077 "When importing query results in parallel, you must specify --"
1078 + SPLIT_BY_ARG + "." + HELP_STR);
1079 } else if (options.isDirect()) {
1080 validateDirectImportOptions(options);
1081 } else if (allTables && options.isValidationEnabled()) {
1082 throw new InvalidOptionsException("Validation is not supported for "
1083 + "all tables but single table only.");
1084 } else if (options.getSqlQuery() != null && options.isValidationEnabled()) {
1085 throw new InvalidOptionsException("Validation is not supported for "
1086 + "free from query but single table only.");
1087 } else if (options.getWhereClause() != null
1088 && options.isValidationEnabled()) {
1089 throw new InvalidOptionsException("Validation is not supported for "
1090 + "where clause but single table only.");
1091 } else if (options.getIncrementalMode()
1092 != SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) {
1093 throw new InvalidOptionsException("Validation is not supported for "
1094 + "incremental imports but single table only.");
1095 } else if ((options.getTargetDir() != null
1096 || options.getWarehouseDir() != null)
1097 && options.getHCatTableName() != null) {
1098 throw new InvalidOptionsException("--hcatalog-table cannot be used "
1099 + " --warehouse-dir or --target-dir options");
1100 } else if (options.isDeleteMode() && options.isAppendMode()) {
1101 throw new InvalidOptionsException("--append and --delete-target-dir can"
1102 + " not be used together.");
1103 } else if (options.isDeleteMode() && options.getIncrementalMode()
1104 != SqoopOptions.IncrementalMode.None) {
1105 throw new InvalidOptionsException("--delete-target-dir can not be used"
1106 + " with incremental imports.");
1107 } else if (options.getAutoResetToOneMapper()
1108 && (options.getSplitByCol() != null)) {
1109 throw new InvalidOptionsException("--autoreset-to-one-mapper and"
1110 + " --split-by cannot be used together.");
1111 }
1112 }
1113
1114 void validateDirectImportOptions(SqoopOptions options) throws InvalidOptionsException {
1115 validateDirectMysqlOptions(options);
1116 validateDirectDropHiveDelimOption(options);
1117 validateHasDirectConnectorOption(options);
1118 }
1119
1120 void validateDirectDropHiveDelimOption(SqoopOptions options) throws InvalidOptionsException {
1121 if (options.doHiveDropDelims()) {
1122 throw new InvalidOptionsException(
1123 "Direct import currently do not support dropping hive delimiters,"
1124 + " please remove parameter --hive-drop-import-delims.");
1125 }
1126 }
1127
1128 void validateDirectMysqlOptions(SqoopOptions options) throws InvalidOptionsException {
1129 if (!MYSQL.isTheManagerTypeOf(options)) {
1130 return;
1131 }
1132 if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
1133 throw new InvalidOptionsException(
1134 "MySQL direct import currently supports only text output format. "
1135 + "Parameters --as-sequencefile --as-avrodatafile and --as-parquetfile are not "
1136 + "supported with --direct params in MySQL case.");
1137 }
1138 if (options.getNullStringValue() != null || options.getNullNonStringValue() != null) {
1139 throw new InvalidOptionsException(
1140 "The --direct option is not compatible with the --null-string or " +
1141 "--null-non-string command for MySQL imports");
1142 }
1143 }
1144 /**
1145 * Validate the incremental import options.
1146 */
1147 private void validateIncrementalOptions(SqoopOptions options)
1148 throws InvalidOptionsException {
1149 if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
1150 && options.getIncrementalTestColumn() == null) {
1151 throw new InvalidOptionsException(
1152 "For an incremental import, the check column must be specified "
1153 + "with --" + INCREMENT_COL_ARG + ". " + HELP_STR);
1154 }
1155
1156 if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None
1157 && options.getIncrementalTestColumn() != null) {
1158 throw new InvalidOptionsException(
1159 "You must specify an incremental import mode with --"
1160 + INCREMENT_TYPE_ARG + ". " + HELP_STR);
1161 }
1162
1163 if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified
1164 && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
1165 throw new InvalidOptionsException("--"
1166 + INCREMENT_TYPE_ARG + " lastmodified cannot be used in conjunction with --"
1167 + FMT_AVRODATAFILE_ARG + "." + HELP_STR);
1168 }
1169 }
1170
1171 @Override
1172 /** {@inheritDoc} */
1173 public void validateOptions(SqoopOptions options)
1174 throws InvalidOptionsException {
1175
1176 // If extraArguments is non-empty, check for '--' followed by args for
1177 // mysqldump or other commands we rely on.
1178 options.setExtraArgs(getSubcommandArgs(extraArguments));
1179 int dashPos = getDashPosition(extraArguments);
1180
1181 if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
1182 throw new InvalidOptionsException(HELP_STR);
1183 }
1184
1185 validateImportOptions(options);
1186 validateIncrementalOptions(options);
1187 validateCommonOptions(options);
1188 validateCodeGenOptions(options);
1189 validateOutputFormatOptions(options);
1190 validateHBaseOptions(options);
1191 validateHiveOptions(options);
1192 validateHCatalogOptions(options);
1193 validateAccumuloOptions(options);
1194 }
1195 }
1196