/**
 * Licensed to Cloudera, Inc. under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Cloudera, Inc. licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.cloudera.sqoop.tool;

import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;

import com.cloudera.sqoop.Sqoop;
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
import com.cloudera.sqoop.cli.RelatedOptions;
import com.cloudera.sqoop.cli.ToolOptions;
import com.cloudera.sqoop.hive.HiveImport;
import com.cloudera.sqoop.manager.ImportJobContext;
import com.cloudera.sqoop.metastore.JobData;
import com.cloudera.sqoop.metastore.JobStorage;
import com.cloudera.sqoop.metastore.JobStorageFactory;
import com.cloudera.sqoop.util.AppendUtils;
import com.cloudera.sqoop.util.ImportException;

/**
 * Tool that performs database imports to HDFS.
 */
public class ImportTool extends BaseSqoopTool {

  public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());

  private CodeGenTool codeGenerator;

  // true if this is an all-tables import. Set by a subclass which
  // overrides the run() method of this tool (which can only do
  // a single table).
  private boolean allTables;

  public ImportTool() {
    this("import", false);
  }

  public ImportTool(String toolName, boolean allTables) {
    super(toolName);
    this.codeGenerator = new CodeGenTool();
    this.allTables = allTables;
  }

  @Override
  protected boolean init(SqoopOptions sqoopOpts) {
    boolean ret = super.init(sqoopOpts);
    codeGenerator.setManager(manager);
    return ret;
  }

  /**
   * @return a list of jar files generated as part of this import process
   */
  public List<String> getGeneratedJarFiles() {
    return this.codeGenerator.getGeneratedJarFiles();
  }

  /**
   * @return true if the supplied options specify an incremental import.
   */
  private boolean isIncremental(SqoopOptions options) {
    return !options.getIncrementalMode().equals(
        SqoopOptions.IncrementalMode.None);
  }

  /**
   * If this is an incremental import, then we should save the
   * user's state back to the metastore (if this job was run
   * from the metastore). Otherwise, log to the user what data
   * they need to supply next time.
   */
  private void saveIncrementalState(SqoopOptions options)
      throws IOException {
    if (!isIncremental(options)) {
      return;
    }

    Map<String, String> descriptor = options.getStorageDescriptor();
    String jobName = options.getJobName();

    if (null != jobName && null != descriptor) {
      // Actually save it back to the metastore.
      LOG.info("Saving incremental import state to the metastore");
      JobStorageFactory ssf = new JobStorageFactory(options.getConf());
      JobStorage storage = ssf.getJobStorage(descriptor);
      storage.open(descriptor);
      try {
        // Save the 'parent' SqoopOptions; this does not contain the mutations
        // to the SqoopOptions state that occurred over the course of this
        // execution, except for the one we specifically want to memorize:
        // the latest value of the check column.
        JobData data = new JobData(options.getParent(), this);
        storage.update(jobName, data);
        LOG.info("Updated data for job: " + jobName);
      } finally {
        storage.close();
      }
    } else {
      // If there wasn't a parent SqoopOptions, then the incremental
      // state data was stored in the current SqoopOptions.
      LOG.info("Incremental import complete! To run another incremental "
          + "import of all data following this import, supply the "
          + "following arguments:");
      SqoopOptions.IncrementalMode incrementalMode =
          options.getIncrementalMode();
      switch (incrementalMode) {
      case AppendRows:
        LOG.info(" --incremental append");
        break;
      case DateLastModified:
        LOG.info(" --incremental lastmodified");
        break;
      default:
        LOG.warn("Undefined incremental mode: " + incrementalMode);
        break;
      }
      LOG.info(" --check-column " + options.getIncrementalTestColumn());
      LOG.info(" --last-value " + options.getIncrementalLastValue());
      LOG.info("(Consider saving this with 'sqoop job --create')");
    }
  }

  /**
   * Return the max value in the incremental-import test column. This
   * value must be numeric.
   */
  private BigDecimal getMaxColumnId(SqoopOptions options) throws SQLException {
    StringBuilder sb = new StringBuilder();
    sb.append("SELECT MAX(");
    sb.append(options.getIncrementalTestColumn());
    sb.append(") FROM ");
    sb.append(options.getTableName());

    String where = options.getWhereClause();
    if (null != where) {
      sb.append(" WHERE ");
      sb.append(where);
    }
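    // For illustration only: with a hypothetical check column "id", table
    // "employees", and --where "active = 1", the generated query would be
    // "SELECT MAX(id) FROM employees WHERE active = 1".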

    Connection conn = manager.getConnection();
    Statement s = null;
    ResultSet rs = null;
    try {
      s = conn.createStatement();
      rs = s.executeQuery(sb.toString());
      if (!rs.next()) {
        // This probably means the table is empty.
        LOG.warn("Unexpected: empty results for max value query?");
        return null;
      }

      return rs.getBigDecimal(1);
    } finally {
      try {
        if (null != rs) {
          rs.close();
        }
      } catch (SQLException sqlE) {
        LOG.warn("SQL Exception closing resultset: " + sqlE);
      }

      try {
        if (null != s) {
          s.close();
        }
      } catch (SQLException sqlE) {
        LOG.warn("SQL Exception closing statement: " + sqlE);
      }
    }
  }

  /**
   * Initialize the constraints which set the incremental import range.
   * @return false if an import is not necessary, because the dataset has not
   * changed.
   */
  private boolean initIncrementalConstraints(SqoopOptions options,
      ImportJobContext context) throws ImportException, IOException {

    // If this is an incremental import, determine the constraints
    // to inject in the WHERE clause or $CONDITIONS for a query.
    // Also modify the 'last value' field of the SqoopOptions to
    // specify the current job start time / start row.

    if (!isIncremental(options)) {
      return true;
    }

    SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
    String nextIncrementalValue = null;

    switch (incrementalMode) {
    case AppendRows:
      try {
        BigDecimal nextVal = getMaxColumnId(options);
        if (null != nextVal) {
          nextIncrementalValue = nextVal.toString();
        }
      } catch (SQLException sqlE) {
        throw new IOException(sqlE);
      }
      break;
    case DateLastModified:
      Timestamp dbTimestamp = manager.getCurrentDbTimestamp();
      if (null == dbTimestamp) {
        throw new IOException("Could not get current time from database");
      }

      nextIncrementalValue = manager.timestampToQueryString(dbTimestamp);
      break;
    default:
      throw new ImportException("Undefined incremental import type: "
          + incrementalMode);
    }

    // Build the WHERE clause components that are used to import
    // only this incremental section.
    StringBuilder sb = new StringBuilder();
    String prevEndpoint = options.getIncrementalLastValue();

    if (incrementalMode == SqoopOptions.IncrementalMode.DateLastModified
        && null != prevEndpoint && !prevEndpoint.contains("\'")) {
      // Incremental imports based on timestamps should be 'quoted' in
      // ANSI SQL. If the user didn't specify single-quotes, add them here.
      prevEndpoint = "'" + prevEndpoint + "'";
    }
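    // For example, a prior last-value of 2010-11-05 13:22:01.0 would become
    // '2010-11-05 13:22:01.0' (illustrative timestamp).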

    String checkColName = manager.escapeColName(
        options.getIncrementalTestColumn());
    LOG.info("Incremental import based on column " + checkColName);
    if (null != prevEndpoint) {
      if (prevEndpoint.equals(nextIncrementalValue)) {
        LOG.info("No new rows detected since last import.");
        return false;
      }
      LOG.info("Lower bound value: " + prevEndpoint);
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" > ");
        break;
      case DateLastModified:
        sb.append(" >= ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(prevEndpoint);
      sb.append(" AND ");
    }

    if (null != nextIncrementalValue) {
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" <= ");
        break;
      case DateLastModified:
        sb.append(" < ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(nextIncrementalValue);
    } else {
      sb.append(checkColName);
      sb.append(" IS NULL ");
    }

    LOG.info("Upper bound value: " + nextIncrementalValue);

    String prevWhereClause = options.getWhereClause();
    if (null != prevWhereClause) {
      sb.append(" AND (");
      sb.append(prevWhereClause);
      sb.append(")");
    }
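    // For illustration only (hypothetical values): in append mode with check
    // column "id", previous last value 42, new max 100, and --where
    // "active = 1", the resulting constraint is
    // "id > 42 AND id <= 100 AND (active = 1)".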

    String newConstraints = sb.toString();
    options.setWhereClause(newConstraints);

    // Save this state for next time.
    SqoopOptions recordOptions = options.getParent();
    if (null == recordOptions) {
      recordOptions = options;
    }
    recordOptions.setIncrementalLastValue(nextIncrementalValue);

    return true;
  }

  /**
   * Import a table or query.
   * @return true if an import was performed, false otherwise.
   */
  protected boolean importTable(SqoopOptions options, String tableName,
      HiveImport hiveImport) throws IOException, ImportException {
    String jarFile = null;

    // Generate the ORM code for the tables.
    jarFile = codeGenerator.generateORM(options, tableName);

    // Do the actual import.
    ImportJobContext context = new ImportJobContext(tableName, jarFile,
        options, getOutputPath(options, tableName));

    // If we're doing an incremental import, set up the
    // filtering conditions used to get the latest records.
    if (!initIncrementalConstraints(options, context)) {
      return false;
    }

    if (null != tableName) {
      manager.importTable(context);
    } else {
      manager.importQuery(context);
    }

    if (options.isAppendMode()) {
      AppendUtils app = new AppendUtils(context);
      app.append();
    }

    // If the user wants this table to be in Hive, perform that post-load.
    if (options.doHiveImport()) {
      hiveImport.importTable(tableName, options.getHiveTableName(), false);
    }

    saveIncrementalState(options);

    return true;
  }

  /**
   * @return the output path for the imported files;
   * in append mode this will point to a temporary folder.
   * If importing to HBase, this may return null.
   */
  private Path getOutputPath(SqoopOptions options, String tableName) {
    // Get output directory
    String hdfsWarehouseDir = options.getWarehouseDir();
    String hdfsTargetDir = options.getTargetDir();
    Path outputPath = null;
    if (options.isAppendMode()) {
      // Use temporary path, later removed when appending
      outputPath = AppendUtils.getTempAppendDir(tableName);
      LOG.debug("Using temporary folder: " + outputPath.getName());
    } else {
      // Try in this order: target-dir or warehouse-dir
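      // Illustrative outcomes (hypothetical values): --target-dir /data/emps
      // yields /data/emps; --warehouse-dir /user/foo with table "emps"
      // yields /user/foo/emps; with neither set, the table name alone is used.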
      if (hdfsTargetDir != null) {
        outputPath = new Path(hdfsTargetDir);
      } else if (hdfsWarehouseDir != null) {
        outputPath = new Path(hdfsWarehouseDir, tableName);
      } else if (null != tableName) {
        outputPath = new Path(tableName);
      }
    }

    return outputPath;
  }

  /** {@inheritDoc} */
  @Override
  public int run(SqoopOptions options) {
    HiveImport hiveImport = null;

    if (allTables) {
      // We got into this method, but we should be in a subclass.
      // (This method only handles a single table)
      // This should not be reached, but for sanity's sake, test here.
      LOG.error("ImportTool.run() can only handle a single table.");
      return 1;
    }

    if (!init(options)) {
      return 1;
    }

    codeGenerator.setManager(manager);

    try {
      if (options.doHiveImport()) {
        hiveImport = new HiveImport(options, manager, options.getConf(), false);
      }

      // Import a single table (or query) the user specified.
      importTable(options, options.getTableName(), hiveImport);
    } catch (IllegalArgumentException iea) {
      LOG.error("Import failed: " + iea.getMessage());
      if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
        throw iea;
      }
      return 1;
    } catch (IOException ioe) {
      LOG.error("Encountered IOException running import job: "
          + StringUtils.stringifyException(ioe));
      if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
        throw new RuntimeException(ioe);
      } else {
        return 1;
      }
    } catch (ImportException ie) {
      LOG.error("Error during import: " + ie.toString());
      if (System.getProperty(Sqoop.SQOOP_RETHROW_PROPERTY) != null) {
        throw new RuntimeException(ie);
      } else {
        return 1;
      }
    } finally {
      destroy(options);
    }

    return 0;
  }

  /**
   * Construct the set of options that control imports, either of one
   * table or a batch of tables.
   * @return the RelatedOptions that can be used to parse the import
   * arguments.
   */
  protected RelatedOptions getImportOptions() {
    // Imports
    RelatedOptions importOpts = new RelatedOptions("Import control arguments");

    importOpts.addOption(OptionBuilder
        .withDescription("Use direct import fast path")
        .withLongOpt(DIRECT_ARG)
        .create());

    if (!allTables) {
      importOpts.addOption(OptionBuilder.withArgName("table-name")
          .hasArg().withDescription("Table to read")
          .withLongOpt(TABLE_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("col,col,col...")
          .hasArg().withDescription("Columns to import from table")
          .withLongOpt(COLUMNS_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("column-name")
          .hasArg()
          .withDescription("Column of the table used to split work units")
          .withLongOpt(SPLIT_BY_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("where clause")
          .hasArg().withDescription("WHERE clause to use during import")
          .withLongOpt(WHERE_ARG)
          .create());
      importOpts.addOption(OptionBuilder
          .withDescription("Imports data in append mode")
          .withLongOpt(APPEND_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("dir")
          .hasArg().withDescription("HDFS plain table destination")
          .withLongOpt(TARGET_DIR_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("statement")
          .hasArg()
          .withDescription("Import results of SQL 'statement'")
          .withLongOpt(SQL_QUERY_ARG)
          .create(SQL_QUERY_SHORT_ARG));
    }

    importOpts.addOption(OptionBuilder.withArgName("dir")
        .hasArg().withDescription("HDFS parent for table destination")
        .withLongOpt(WAREHOUSE_DIR_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to SequenceFiles")
        .withLongOpt(FMT_SEQUENCEFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data as plain text (default)")
        .withLongOpt(FMT_TEXTFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to Avro data files")
        .withLongOpt(FMT_AVRODATAFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg().withDescription("Use 'n' map tasks to import in parallel")
        .withLongOpt(NUM_MAPPERS_ARG)
        .create(NUM_MAPPERS_SHORT_ARG));
    importOpts.addOption(OptionBuilder
        .withDescription("Enable compression")
        .withLongOpt(COMPRESS_ARG)
        .create(COMPRESS_SHORT_ARG));
    importOpts.addOption(OptionBuilder.withArgName("codec")
        .hasArg()
        .withDescription("Compression codec to use for import")
        .withLongOpt(COMPRESSION_CODEC_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Split the input stream every 'n' bytes "
            + "when importing in direct mode")
        .withLongOpt(DIRECT_SPLIT_SIZE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Set the maximum size for an inline LOB")
        .withLongOpt(INLINE_LOB_LIMIT_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Set number 'n' of rows to fetch from the "
            + "database when more rows are needed")
        .withLongOpt(FETCH_SIZE_ARG)
        .create());

    return importOpts;
  }

  /**
   * Return options for incremental import.
   */
  protected RelatedOptions getIncrementalOptions() {
    RelatedOptions incrementalOpts =
        new RelatedOptions("Incremental import arguments");

    incrementalOpts.addOption(OptionBuilder.withArgName("import-type")
        .hasArg()
        .withDescription(
            "Define an incremental import of type 'append' or 'lastmodified'")
        .withLongOpt(INCREMENT_TYPE_ARG)
        .create());
    incrementalOpts.addOption(OptionBuilder.withArgName("column")
        .hasArg()
        .withDescription("Source column to check for incremental change")
        .withLongOpt(INCREMENT_COL_ARG)
        .create());
    incrementalOpts.addOption(OptionBuilder.withArgName("value")
        .hasArg()
        .withDescription("Last imported value in the incremental check column")
        .withLongOpt(INCREMENT_LAST_VAL_ARG)
        .create());

    return incrementalOpts;
  }

  /** Configure the command-line arguments we expect to receive */
  @Override
  public void configureOptions(ToolOptions toolOptions) {

    toolOptions.addUniqueOptions(getCommonOptions());
    toolOptions.addUniqueOptions(getImportOptions());
    if (!allTables) {
      toolOptions.addUniqueOptions(getIncrementalOptions());
    }
    toolOptions.addUniqueOptions(getOutputFormatOptions());
    toolOptions.addUniqueOptions(getInputFormatOptions());
    toolOptions.addUniqueOptions(getHiveOptions(true));
    toolOptions.addUniqueOptions(getHBaseOptions());

    // get common codegen opts.
    RelatedOptions codeGenOpts = getCodeGenOpts(allTables);

    // add import-specific codegen opts:
    codeGenOpts.addOption(OptionBuilder.withArgName("file")
        .hasArg()
        .withDescription("Disable code generation; use specified jar")
        .withLongOpt(JAR_FILE_NAME_ARG)
        .create());

    toolOptions.addUniqueOptions(codeGenOpts);
  }

  /** {@inheritDoc} */
  @Override
  public void printHelp(ToolOptions toolOptions) {
    super.printHelp(toolOptions);
    System.out.println("");
    if (allTables) {
      System.out.println("At minimum, you must specify --connect");
    } else {
      System.out.println(
          "At minimum, you must specify --connect and --table");
    }

    System.out.println(
        "Arguments to mysqldump and other subprograms may be supplied");
    System.out.println(
        "after a '--' on the command line.");
  }

  private void applyIncrementalOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {
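    // These options arrive from an invocation such as (illustrative):
    //   sqoop import --connect <jdbc-uri> --table <table> \
    //     --incremental append --check-column <col> --last-value <val>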
    if (in.hasOption(INCREMENT_TYPE_ARG)) {
      String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG);
      if ("append".equals(incrementalTypeStr)) {
        out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows);
        // This argument implies ability to append to the same directory.
        out.setAppendMode(true);
      } else if ("lastmodified".equals(incrementalTypeStr)) {
        out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
      } else {
        throw new InvalidOptionsException("Unknown incremental import mode: "
            + incrementalTypeStr + ". Use 'append' or 'lastmodified'."
            + HELP_STR);
      }
    }

    if (in.hasOption(INCREMENT_COL_ARG)) {
      out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG));
    }

    if (in.hasOption(INCREMENT_LAST_VAL_ARG)) {
      out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG));
    }
  }

  /** {@inheritDoc} */
  @Override
  public void applyOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {

    try {
      applyCommonOptions(in, out);

      if (in.hasOption(DIRECT_ARG)) {
        out.setDirectMode(true);
      }

      if (!allTables) {
        if (in.hasOption(TABLE_ARG)) {
          out.setTableName(in.getOptionValue(TABLE_ARG));
        }

        if (in.hasOption(COLUMNS_ARG)) {
          String[] cols = in.getOptionValue(COLUMNS_ARG).split(",");
          for (int i = 0; i < cols.length; i++) {
            cols[i] = cols[i].trim();
          }
          out.setColumns(cols);
        }

        if (in.hasOption(SPLIT_BY_ARG)) {
          out.setSplitByCol(in.getOptionValue(SPLIT_BY_ARG));
        }

        if (in.hasOption(WHERE_ARG)) {
          out.setWhereClause(in.getOptionValue(WHERE_ARG));
        }

        if (in.hasOption(TARGET_DIR_ARG)) {
          out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
        }

        if (in.hasOption(APPEND_ARG)) {
          out.setAppendMode(true);
        }

        if (in.hasOption(SQL_QUERY_ARG)) {
          out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
        }
      }

      if (in.hasOption(WAREHOUSE_DIR_ARG)) {
        out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
      }

      if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
      }

      if (in.hasOption(FMT_TEXTFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.TextFile);
      }

      if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
      }

      if (in.hasOption(NUM_MAPPERS_ARG)) {
        out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
      }

      if (in.hasOption(COMPRESS_ARG)) {
        out.setUseCompression(true);
      }

      if (in.hasOption(COMPRESSION_CODEC_ARG)) {
        out.setCompressionCodec(in.getOptionValue(COMPRESSION_CODEC_ARG));
      }

      if (in.hasOption(DIRECT_SPLIT_SIZE_ARG)) {
        out.setDirectSplitSize(Long.parseLong(in.getOptionValue(
            DIRECT_SPLIT_SIZE_ARG)));
      }

      if (in.hasOption(INLINE_LOB_LIMIT_ARG)) {
        out.setInlineLobLimit(Long.parseLong(in.getOptionValue(
            INLINE_LOB_LIMIT_ARG)));
      }

      if (in.hasOption(FETCH_SIZE_ARG)) {
        out.setFetchSize(new Integer(in.getOptionValue(FETCH_SIZE_ARG)));
      }

      if (in.hasOption(JAR_FILE_NAME_ARG)) {
        out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
      }

      applyIncrementalOptions(in, out);
      applyHiveOptions(in, out);
      applyOutputFormatOptions(in, out);
      applyInputFormatOptions(in, out);
      applyCodeGenOptions(in, out, allTables);
      applyHBaseOptions(in, out);
    } catch (NumberFormatException nfe) {
      throw new InvalidOptionsException("Error: expected numeric argument.\n"
          + "Try --help for usage.");
    }
  }

  /**
   * Validate import-specific arguments.
   * @param options the configured SqoopOptions to check
   */
  protected void validateImportOptions(SqoopOptions options)
      throws InvalidOptionsException {
    if (!allTables && options.getTableName() == null
        && options.getSqlQuery() == null) {
      throw new InvalidOptionsException(
          "--table or --" + SQL_QUERY_ARG + " is required for import. "
          + "(Or use sqoop import-all-tables.)"
          + HELP_STR);
    } else if (options.getExistingJarName() != null
        && options.getClassName() == null) {
      throw new InvalidOptionsException("Jar specified with --jar-file, but no "
          + "class specified with --class-name." + HELP_STR);
    } else if (options.getTargetDir() != null
        && options.getWarehouseDir() != null) {
      throw new InvalidOptionsException(
          "--target-dir and --warehouse-dir are incompatible options."
          + HELP_STR);
    } else if (options.getTableName() != null
        && options.getSqlQuery() != null) {
      throw new InvalidOptionsException(
          "Cannot specify --" + SQL_QUERY_ARG + " and --table together."
          + HELP_STR);
    } else if (options.getSqlQuery() != null
        && options.getTargetDir() == null && options.getHBaseTable() == null) {
      throw new InvalidOptionsException(
          "Must specify destination with --target-dir."
          + HELP_STR);
    } else if (options.getSqlQuery() != null && options.doHiveImport()
        && options.getHiveTableName() == null) {
      throw new InvalidOptionsException(
          "When importing a query to Hive, you must specify --"
          + HIVE_TABLE_ARG + "." + HELP_STR);
    } else if (options.getSqlQuery() != null && options.getNumMappers() > 1
        && options.getSplitByCol() == null) {
      throw new InvalidOptionsException(
          "When importing query results in parallel, you must specify --"
          + SPLIT_BY_ARG + "." + HELP_STR);
    }
  }

  /**
   * Validate the incremental import options.
   */
  private void validateIncrementalOptions(SqoopOptions options)
      throws InvalidOptionsException {
    if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
        && options.getIncrementalTestColumn() == null) {
      throw new InvalidOptionsException(
          "For an incremental import, the check column must be specified "
          + "with --" + INCREMENT_COL_ARG + ". " + HELP_STR);
    }

    if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None
        && options.getIncrementalTestColumn() != null) {
      throw new InvalidOptionsException(
          "You must specify an incremental import mode with --"
          + INCREMENT_TYPE_ARG + ". " + HELP_STR);
    }

    if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
        && options.getTableName() == null) {
      throw new InvalidOptionsException("Incremental imports require a table."
          + HELP_STR);
    }
  }

  /** {@inheritDoc} */
  @Override
  public void validateOptions(SqoopOptions options)
      throws InvalidOptionsException {

    // If extraArguments is populated, check for '--' followed by args for
    // mysqldump or other commands we rely on.
    options.setExtraArgs(getSubcommandArgs(extraArguments));
    int dashPos = getDashPosition(extraArguments);

    if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
      throw new InvalidOptionsException(HELP_STR);
    }

    validateImportOptions(options);
    validateIncrementalOptions(options);
    validateCommonOptions(options);
    validateCodeGenOptions(options);
    validateOutputFormatOptions(options);
    validateHBaseOptions(options);
  }
}