SQOOP-3309: Implement HiveServer2 client
[sqoop.git] / src / java / org / apache / sqoop / tool / ImportTool.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.tool;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;
import java.util.Map;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.sqoop.avro.AvroSchemaMismatchException;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.SqoopOptions.InvalidOptionsException;
import org.apache.sqoop.cli.RelatedOptions;
import org.apache.sqoop.cli.ToolOptions;
import org.apache.sqoop.hive.HiveClient;
import org.apache.sqoop.hive.HiveClientFactory;
import org.apache.sqoop.manager.ImportJobContext;
import org.apache.sqoop.mapreduce.MergeJob;
import org.apache.sqoop.metastore.JobData;
import org.apache.sqoop.metastore.JobStorage;
import org.apache.sqoop.metastore.JobStorageFactory;
import org.apache.sqoop.orm.ClassWriter;
import org.apache.sqoop.orm.TableClassName;
import org.apache.sqoop.util.AppendUtils;
import org.apache.sqoop.util.ClassLoaderStack;
import org.apache.sqoop.util.ImportException;

import static org.apache.sqoop.manager.SupportedManagers.MYSQL;

/**
 * Tool that performs database imports to HDFS.
 */
public class ImportTool extends BaseSqoopTool {

  public static final Log LOG = LogFactory.getLog(ImportTool.class.getName());

  private static final String IMPORT_FAILED_ERROR_MSG = "Import failed: ";

  private CodeGenTool codeGenerator;

  // true if this is an all-tables import. Set by a subclass which
  // overrides the run() method of this tool (which can only do
  // a single table).
  private boolean allTables;

  // store check column type for incremental option
  private int checkColumnType;

  // Set classloader for local job runner
  private ClassLoader prevClassLoader = null;

  private final HiveClientFactory hiveClientFactory;

  public ImportTool() {
    this("import", false);
  }

  public ImportTool(String toolName, boolean allTables) {
    this(toolName, new CodeGenTool(), allTables, new HiveClientFactory());
  }

  public ImportTool(String toolName, CodeGenTool codeGenerator, boolean allTables, HiveClientFactory hiveClientFactory) {
    super(toolName);
    this.codeGenerator = codeGenerator;
    this.allTables = allTables;
    this.hiveClientFactory = hiveClientFactory;
  }

  @Override
  protected boolean init(SqoopOptions sqoopOpts) {
    boolean ret = super.init(sqoopOpts);
    codeGenerator.setManager(manager);
    return ret;
  }

  /**
   * @return a list of jar files generated as part of this import process
   */
  public List<String> getGeneratedJarFiles() {
    return this.codeGenerator.getGeneratedJarFiles();
  }

  /**
   * If jars must be loaded into the local environment, do so here.
   */
  private void loadJars(Configuration conf, String ormJarFile,
      String tableClassName) throws IOException {

    boolean isLocal = "local".equals(conf.get("mapreduce.jobtracker.address"))
        || "local".equals(conf.get("mapred.job.tracker"));
    if (isLocal) {
      // If we're using the LocalJobRunner, then instead of using the compiled
      // jar file as the job source, we're running in the current thread. Push
      // on another classloader that loads from that jar in addition to
      // everything currently on the classpath.
      this.prevClassLoader = ClassLoaderStack.addJarFile(ormJarFile,
          tableClassName);
    }
  }

  /**
   * If any classloader was invoked by loadJars, free it here.
   */
  private void unloadJars() {
    if (null != this.prevClassLoader) {
      // unload the special classloader for this jar.
      ClassLoaderStack.setCurrentClassLoader(this.prevClassLoader);
    }
  }

  /**
   * @return true if the supplied options specify an incremental import.
   */
  private boolean isIncremental(SqoopOptions options) {
    return !options.getIncrementalMode().equals(
        SqoopOptions.IncrementalMode.None);
  }

  /**
   * If this is an incremental import, then we should save the
   * user's state back to the metastore (if this job was run
   * from the metastore). Otherwise, log to the user what data
   * they need to supply next time.
   */
  private void saveIncrementalState(SqoopOptions options)
      throws IOException {
    if (!isIncremental(options)) {
      return;
    }

    Map<String, String> descriptor = options.getStorageDescriptor();
    String jobName = options.getJobName();

    if (null != jobName && null != descriptor) {
      // Actually save it back to the metastore.
      LOG.info("Saving incremental import state to the metastore");
      JobStorageFactory ssf = new JobStorageFactory(options.getConf());
      JobStorage storage = ssf.getJobStorage(descriptor);
      storage.open(descriptor);
      try {
        // Save the 'parent' SqoopOptions; this does not contain the mutations
        // to the SqoopOptions state that occurred over the course of this
        // execution, except for the one we specifically want to memorize:
        // the latest value of the check column.
        JobData data = new JobData(options.getParent(), this);
        storage.update(jobName, data);
        LOG.info("Updated data for job: " + jobName);
      } finally {
        storage.close();
      }
    } else {
      // If there wasn't a parent SqoopOptions, then the incremental
      // state data was stored in the current SqoopOptions.
      LOG.info("Incremental import complete! To run another incremental "
          + "import of all data following this import, supply the "
          + "following arguments:");
      SqoopOptions.IncrementalMode incrementalMode =
          options.getIncrementalMode();
      switch (incrementalMode) {
      case AppendRows:
        LOG.info(" --incremental append");
        break;
      case DateLastModified:
        LOG.info(" --incremental lastmodified");
        break;
      default:
        LOG.warn("Undefined incremental mode: " + incrementalMode);
        break;
      }
      LOG.info(" --check-column " + options.getIncrementalTestColumn());
      LOG.info(" --last-value " + options.getIncrementalLastValue());
      LOG.info("(Consider saving this with 'sqoop job --create')");
    }
  }

  /**
   * Return the max value in the incremental-import test column. This
   * value must be numeric.
   */
  private Object getMaxColumnId(SqoopOptions options) throws SQLException {
    StringBuilder sb = new StringBuilder();
    String query;

    sb.append("SELECT MAX(");
    sb.append(manager.escapeColName(options.getIncrementalTestColumn()));
    sb.append(") FROM ");

    if (options.getTableName() != null) {
      // Table import
      sb.append(manager.escapeTableName(options.getTableName()));

      String where = options.getWhereClause();
      if (null != where) {
        sb.append(" WHERE ");
        sb.append(where);
      }
      query = sb.toString();
    } else {
      // Free form table based import
      sb.append("(");
      sb.append(options.getSqlQuery());
      sb.append(") sqoop_import_query_alias");

      query = sb.toString().replaceAll("\\$CONDITIONS", "(1 = 1)");
    }
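    // For illustration only (assuming a manager that double-quotes identifiers), with check
    // column id and table foo the statement built above has the form:
    //   SELECT MAX("id") FROM "foo" [WHERE <user where clause>]
    // and for free-form imports:
    //   SELECT MAX("id") FROM (<query with $CONDITIONS replaced by (1 = 1)>) sqoop_import_query_alias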

    Connection conn = manager.getConnection();
    Statement s = null;
    ResultSet rs = null;
    try {
      LOG.info("Maximal id query for free form incremental import: " + query);
      s = conn.createStatement();
      rs = s.executeQuery(query);
      if (!rs.next()) {
        // This probably means the table is empty.
        LOG.warn("Unexpected: empty results for max value query?");
        return null;
      }

      ResultSetMetaData rsmd = rs.getMetaData();
      checkColumnType = rsmd.getColumnType(1);
      if (checkColumnType == Types.TIMESTAMP) {
        return rs.getTimestamp(1);
      } else if (checkColumnType == Types.DATE) {
        return rs.getDate(1);
      } else if (checkColumnType == Types.TIME) {
        return rs.getTime(1);
      } else {
        return rs.getObject(1);
      }
    } finally {
      try {
        if (null != rs) {
          rs.close();
        }
      } catch (SQLException sqlE) {
        LOG.warn("SQL Exception closing resultset: " + sqlE);
      }

      try {
        if (null != s) {
          s.close();
        }
      } catch (SQLException sqlE) {
        LOG.warn("SQL Exception closing statement: " + sqlE);
      }
    }
  }

  /**
   * Determine if a column is date/time.
   * @return true if column type is TIMESTAMP, DATE, or TIME.
   */
  private boolean isDateTimeColumn(int columnType) {
    return (columnType == Types.TIMESTAMP)
        || (columnType == Types.DATE)
        || (columnType == Types.TIME);
  }

  /**
   * Initialize the constraints which set the incremental import range.
   * @return false if an import is not necessary, because the dataset has not
   * changed.
   */
  private boolean initIncrementalConstraints(SqoopOptions options,
      ImportJobContext context) throws ImportException, IOException {

    // If this is an incremental import, determine the constraints
    // to inject in the WHERE clause or $CONDITIONS for a query.
    // Also modify the 'last value' field of the SqoopOptions to
    // specify the current job start time / start row.

    if (!isIncremental(options)) {
      return true;
    }

    SqoopOptions.IncrementalMode incrementalMode = options.getIncrementalMode();
    String nextIncrementalValue = null;

    Object nextVal;
    switch (incrementalMode) {
    case AppendRows:
      try {
        nextVal = getMaxColumnId(options);
        if (isDateTimeColumn(checkColumnType)) {
          nextIncrementalValue = (nextVal == null) ? null
              : manager.datetimeToQueryString(nextVal.toString(),
                  checkColumnType);
        } else if (manager.isCharColumn(checkColumnType)) {
          throw new ImportException("Character column "
              + "(" + options.getIncrementalTestColumn() + ") can not be used "
              + "to determine which rows to incrementally import.");
        } else {
          nextIncrementalValue = (nextVal == null) ? null : nextVal.toString();
        }
      } catch (SQLException sqlE) {
        throw new IOException(sqlE);
      }
      break;
    case DateLastModified:
      if (options.getMergeKeyCol() == null && !options.isAppendMode()) {
        Path outputPath = getOutputPath(options, context.getTableName(), false);
        FileSystem fs = outputPath.getFileSystem(options.getConf());
        if (fs.exists(outputPath)) {
          throw new ImportException("--" + MERGE_KEY_ARG + " or " + "--" + APPEND_ARG
              + " is required when using --" + this.INCREMENT_TYPE_ARG
              + " lastmodified and the output directory exists.");
        }
      }
      checkColumnType = manager.getColumnTypes(options.getTableName(),
          options.getSqlQuery()).get(options.getIncrementalTestColumn());
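      // The database's current timestamp is used as the (exclusive) upper bound for this run and
      // is recorded as the new last value, so the next lastmodified import resumes from this
      // point (its lower bound comparison is >=, see below).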
      nextVal = manager.getCurrentDbTimestamp();
      if (null == nextVal) {
        throw new IOException("Could not get current time from database");
      }
      nextIncrementalValue = manager.datetimeToQueryString(nextVal.toString(),
          checkColumnType);
      break;
    default:
      throw new ImportException("Undefined incremental import type: "
          + incrementalMode);
    }

    // Build the WHERE clause components that are used to import
    // only this incremental section.
    StringBuilder sb = new StringBuilder();
    String prevEndpoint = options.getIncrementalLastValue();

    if (isDateTimeColumn(checkColumnType) && null != prevEndpoint
        && !prevEndpoint.startsWith("\'") && !prevEndpoint.endsWith("\'")) {
      // Incremental imports based on date/time should be 'quoted' in
      // ANSI SQL. If the user didn't specify single-quotes, put them
      // around, here.
      prevEndpoint = manager.datetimeToQueryString(prevEndpoint,
          checkColumnType);
    }

    String checkColName = manager.escapeColName(
        options.getIncrementalTestColumn());
    LOG.info("Incremental import based on column " + checkColName);
    if (null != prevEndpoint) {
      if (prevEndpoint.equals(nextIncrementalValue)) {
        LOG.info("No new rows detected since last import.");
        return false;
      }
      LOG.info("Lower bound value: " + prevEndpoint);
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" > ");
        break;
      case DateLastModified:
        sb.append(" >= ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(prevEndpoint);
      sb.append(" AND ");
    }

    if (null != nextIncrementalValue) {
      sb.append(checkColName);
      switch (incrementalMode) {
      case AppendRows:
        sb.append(" <= ");
        break;
      case DateLastModified:
        sb.append(" < ");
        break;
      default:
        throw new ImportException("Undefined comparison");
      }
      sb.append(nextIncrementalValue);
    } else {
      sb.append(checkColName);
      sb.append(" IS NULL ");
    }
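    // For illustration, an append-mode run with check column id, a previous last value of 100
    // and a new maximum of 200 produces the constraint: "id" > 100 AND "id" <= 200
    // (lastmodified mode uses >= and < instead).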

    LOG.info("Upper bound value: " + nextIncrementalValue);

    if (options.getTableName() != null) {
      // Table based import
      String prevWhereClause = options.getWhereClause();
      if (null != prevWhereClause) {
        sb.append(" AND (");
        sb.append(prevWhereClause);
        sb.append(")");
      }

      String newConstraints = sb.toString();
      options.setWhereClause(newConstraints);
    } else {
      // Free-form query based import
      sb.append(" AND $CONDITIONS");
      String newQuery = options.getSqlQuery().replace(
          "$CONDITIONS", sb.toString());
      options.setSqlQuery(newQuery);
    }
    // Save this state for next time.
    SqoopOptions recordOptions = options.getParent();
    if (null == recordOptions) {
      recordOptions = options;
    }
    recordOptions.setIncrementalLastValue(
        (nextVal == null) ? null : nextVal.toString());

    return true;
  }

  /**
   * Merge HDFS output directories
   */
  protected void lastModifiedMerge(SqoopOptions options, ImportJobContext context) throws IOException {
    if (context.getDestination() == null) {
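      // Nothing to merge if the import produced no HDFS destination directory
      // (per getOutputPath below, the destination can be null, e.g. when importing into HBase).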
      return;
    }

    Path userDestDir = getOutputPath(options, context.getTableName(), false);
    FileSystem fs = userDestDir.getFileSystem(options.getConf());
    if (fs.exists(context.getDestination())) {
      LOG.info("Final destination exists, will run merge job.");
      if (fs.exists(userDestDir)) {
        String tableClassName = null;
        if (!context.getConnManager().isORMFacilitySelfManaged()) {
          tableClassName =
              new TableClassName(options).getClassForTable(context.getTableName());
        }
        Path destDir = getOutputPath(options, context.getTableName());
        options.setExistingJarName(context.getJarFile());
        options.setClassName(tableClassName);
        options.setMergeOldPath(userDestDir.toString());
        options.setMergeNewPath(context.getDestination().toString());
        // Merge to temporary directory so that original directory remains intact.
        options.setTargetDir(destDir.toString());

        // Local job tracker needs jars in the classpath.
        if (options.getFileLayout() == SqoopOptions.FileLayout.ParquetFile) {
          loadJars(options.getConf(), context.getJarFile(), ClassWriter.toJavaIdentifier("codegen_" +
              context.getTableName()));
        } else {
          loadJars(options.getConf(), context.getJarFile(), context.getTableName());
        }

        MergeJob mergeJob = new MergeJob(options);
        if (mergeJob.runMergeJob()) {
          // Rename destination directory to proper location.
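          // The swap happens in three steps so the original data is removed only after the
          // merged output is in place: move the old directory aside, move the merged output
          // into the final location, then delete the old copy.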
          Path tmpDir = getOutputPath(options, context.getTableName());
          fs.rename(userDestDir, tmpDir);
          fs.rename(destDir, userDestDir);
          fs.delete(tmpDir, true);
        } else {
          LOG.error("Merge MapReduce job failed!");
        }

        unloadJars();
      } else {
        // Create parent directory(ies), otherwise fs.rename would fail
        if(!fs.exists(userDestDir.getParent())) {
          fs.mkdirs(userDestDir.getParent());
        }

        // And finally move the data
        LOG.info("Moving data from temporary directory " + context.getDestination() + " to final destination " + userDestDir);
        if(!fs.rename(context.getDestination(), userDestDir)) {
          throw new RuntimeException("Couldn't move data from temporary directory " + context.getDestination() + " to final destination " + userDestDir);
        }
      }
    }
  }

  /**
   * Import a table or query.
   * @return true if an import was performed, false otherwise.
   */
  protected boolean importTable(SqoopOptions options) throws IOException, ImportException {
    String jarFile = null;

    // Generate the ORM code for the tables.
    jarFile = codeGenerator.generateORM(options, options.getTableName());

    Path outputPath = getOutputPath(options, options.getTableName());

    // Do the actual import.
    ImportJobContext context = new ImportJobContext(options.getTableName(), jarFile,
        options, outputPath);

    // If we're doing an incremental import, set up the
    // filtering conditions used to get the latest records.
    if (!initIncrementalConstraints(options, context)) {
      return false;
    }

    if (options.isDeleteMode()) {
      deleteTargetDir(context);
    }

    if (null != options.getTableName()) {
      manager.importTable(context);
    } else {
      manager.importQuery(context);
    }

    if (options.isAppendMode()) {
      AppendUtils app = new AppendUtils(context);
      app.append();
    } else if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified) {
      lastModifiedMerge(options, context);
    }

    // If the user wants this table to be in Hive, perform that post-load.
    if (options.doHiveImport()) {
      // For Parquet files the import itself creates the Hive table directly via Kite,
      // so there is no need to run the Hive import as a separate post-processing step.
      if (options.getFileLayout() != SqoopOptions.FileLayout.ParquetFile) {
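        // The concrete HiveClient implementation is chosen by HiveClientFactory; with SQOOP-3309
        // (the change this file is part of) this is expected to include a HiveServer2-based
        // client in addition to the classic Hive CLI import. See HiveClientFactory for details.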
        HiveClient hiveClient = hiveClientFactory.createHiveClient(options, manager);
        hiveClient.importTable();
      }
    }

    saveIncrementalState(options);

    return true;
  }

  private void deleteTargetDir(ImportJobContext context) throws IOException {

    SqoopOptions options = context.getOptions();
    Path destDir = context.getDestination();
    FileSystem fs = destDir.getFileSystem(options.getConf());

    if (fs.exists(destDir)) {
      fs.delete(destDir, true);
      LOG.info("Destination directory " + destDir + " deleted.");
      return;
    } else {
      LOG.info("Destination directory " + destDir + " is not present, "
          + "hence not deleting.");
    }
  }

  /**
   * @return the output path for the imported files;
   * in append mode this will point to a temporary folder;
   * if importing to HBase, this may return null.
   */
  private Path getOutputPath(SqoopOptions options, String tableName) {
    return getOutputPath(options, tableName, options.isAppendMode()
        || options.getIncrementalMode().equals(SqoopOptions.IncrementalMode.DateLastModified));
  }

  /**
   * @return the output path for the imported files;
   * if importing to HBase, this may return null.
   */
  private Path getOutputPath(SqoopOptions options, String tableName, boolean temp) {
    // Get output directory
    String hdfsWarehouseDir = options.getWarehouseDir();
    String hdfsTargetDir = options.getTargetDir();
    Path outputPath = null;
    if (temp) {
      // Use temporary path, later removed when appending
      String salt = tableName;
      if (salt == null && options.getSqlQuery() != null) {
        salt = Integer.toHexString(options.getSqlQuery().hashCode());
      }
      outputPath = AppendUtils.getTempAppendDir(salt, options);
      LOG.debug("Using temporary folder: " + outputPath.getName());
    } else {
      // Try in this order: target-dir or warehouse-dir
      if (hdfsTargetDir != null) {
        outputPath = new Path(hdfsTargetDir);
      } else if (hdfsWarehouseDir != null) {
        outputPath = new Path(hdfsWarehouseDir, tableName);
      } else if (null != tableName) {
        outputPath = new Path(tableName);
      }
    }

    return outputPath;
  }

  @Override
  /** {@inheritDoc} */
  public int run(SqoopOptions options) {
    if (allTables) {
      // We got into this method, but we should be in a subclass.
      // (This method only handles a single table)
      // This should not be reached, but for sanity's sake, test here.
      LOG.error("ImportTool.run() can only handle a single table.");
      return 1;
    }

    if (!init(options)) {
      return 1;
    }

    codeGenerator.setManager(manager);

    try {
      // Import a single table (or query) the user specified.
      importTable(options);
    } catch (IllegalArgumentException iea) {
      LOG.error(IMPORT_FAILED_ERROR_MSG + iea.getMessage());
      rethrowIfRequired(options, iea);
      return 1;
    } catch (IOException ioe) {
      LOG.error(IMPORT_FAILED_ERROR_MSG + StringUtils.stringifyException(ioe));
      rethrowIfRequired(options, ioe);
      return 1;
    } catch (ImportException ie) {
      LOG.error(IMPORT_FAILED_ERROR_MSG + ie.toString());
      rethrowIfRequired(options, ie);
      return 1;
    } catch (AvroSchemaMismatchException e) {
      LOG.error(IMPORT_FAILED_ERROR_MSG, e);
      rethrowIfRequired(options, e);
      return 1;
    } finally {
      destroy(options);
    }

    return 0;
  }

  /**
   * Construct the set of options that control imports, either of one
   * table or a batch of tables.
   * @return the RelatedOptions that can be used to parse the import
   * arguments.
   */
  @SuppressWarnings("static-access")
  protected RelatedOptions getImportOptions() {
    // Imports
    RelatedOptions importOpts = new RelatedOptions("Import control arguments");

    importOpts.addOption(OptionBuilder
        .withDescription("Use direct import fast path")
        .withLongOpt(DIRECT_ARG)
        .create());

    if (!allTables) {
      importOpts.addOption(OptionBuilder.withArgName("table-name")
          .hasArg().withDescription("Table to read")
          .withLongOpt(TABLE_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("col,col,col...")
          .hasArg().withDescription("Columns to import from table")
          .withLongOpt(COLUMNS_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("column-name")
          .hasArg()
          .withDescription("Column of the table used to split work units")
          .withLongOpt(SPLIT_BY_ARG)
          .create());
      importOpts
          .addOption(OptionBuilder
              .withArgName("size")
              .hasArg()
              .withDescription(
                  "Upper Limit of rows per split for split columns of Date/Time/Timestamp and integer types. For date or timestamp fields it is calculated in seconds. split-limit should be greater than 0")
              .withLongOpt(SPLIT_LIMIT_ARG)
              .create());
      importOpts.addOption(OptionBuilder.withArgName("where clause")
          .hasArg().withDescription("WHERE clause to use during import")
          .withLongOpt(WHERE_ARG)
          .create());
      importOpts.addOption(OptionBuilder
          .withDescription("Imports data in append mode")
          .withLongOpt(APPEND_ARG)
          .create());
      importOpts.addOption(OptionBuilder
          .withDescription("Imports data in delete mode")
          .withLongOpt(DELETE_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("dir")
          .hasArg().withDescription("HDFS plain table destination")
          .withLongOpt(TARGET_DIR_ARG)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("statement")
          .hasArg()
          .withDescription("Import results of SQL 'statement'")
          .withLongOpt(SQL_QUERY_ARG)
          .create(SQL_QUERY_SHORT_ARG));
      importOpts.addOption(OptionBuilder.withArgName("statement")
          .hasArg()
          .withDescription("Set boundary query for retrieving max and min"
              + " value of the primary key")
          .withLongOpt(SQL_QUERY_BOUNDARY)
          .create());
      importOpts.addOption(OptionBuilder.withArgName("column")
          .hasArg().withDescription("Key column to use to join results")
          .withLongOpt(MERGE_KEY_ARG)
          .create());

      addValidationOpts(importOpts);
    }

    importOpts.addOption(OptionBuilder.withArgName("dir")
        .hasArg().withDescription("HDFS parent for table destination")
        .withLongOpt(WAREHOUSE_DIR_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to SequenceFiles")
        .withLongOpt(FMT_SEQUENCEFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data as plain text (default)")
        .withLongOpt(FMT_TEXTFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to Avro data files")
        .withLongOpt(FMT_AVRODATAFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Imports data to Parquet files")
        .withLongOpt(BaseSqoopTool.FMT_PARQUETFILE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg().withDescription("Use 'n' map tasks to import in parallel")
        .withLongOpt(NUM_MAPPERS_ARG)
        .create(NUM_MAPPERS_SHORT_ARG));
    importOpts.addOption(OptionBuilder.withArgName("name")
        .hasArg().withDescription("Set name for generated mapreduce job")
        .withLongOpt(MAPREDUCE_JOB_NAME)
        .create());
    importOpts.addOption(OptionBuilder
        .withDescription("Enable compression")
        .withLongOpt(COMPRESS_ARG)
        .create(COMPRESS_SHORT_ARG));
    importOpts.addOption(OptionBuilder.withArgName("codec")
        .hasArg()
        .withDescription("Compression codec to use for import")
        .withLongOpt(COMPRESSION_CODEC_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Split the input stream every 'n' bytes "
            + "when importing in direct mode")
        .withLongOpt(DIRECT_SPLIT_SIZE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Set the maximum size for an inline LOB")
        .withLongOpt(INLINE_LOB_LIMIT_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("n")
        .hasArg()
        .withDescription("Set number 'n' of rows to fetch from the "
            + "database when more rows are needed")
        .withLongOpt(FETCH_SIZE_ARG)
        .create());
    importOpts.addOption(OptionBuilder.withArgName("reset-mappers")
        .withDescription("Reset the number of mappers to one mapper if no split key available")
        .withLongOpt(AUTORESET_TO_ONE_MAPPER)
        .create());
    return importOpts;
  }

  /**
   * Return options for incremental import.
   */
  protected RelatedOptions getIncrementalOptions() {
    RelatedOptions incrementalOpts =
        new RelatedOptions("Incremental import arguments");

    incrementalOpts.addOption(OptionBuilder.withArgName("import-type")
        .hasArg()
        .withDescription(
            "Define an incremental import of type 'append' or 'lastmodified'")
        .withLongOpt(INCREMENT_TYPE_ARG)
        .create());
    incrementalOpts.addOption(OptionBuilder.withArgName("column")
        .hasArg()
        .withDescription("Source column to check for incremental change")
        .withLongOpt(INCREMENT_COL_ARG)
        .create());
    incrementalOpts.addOption(OptionBuilder.withArgName("value")
        .hasArg()
        .withDescription("Last imported value in the incremental check column")
        .withLongOpt(INCREMENT_LAST_VAL_ARG)
        .create());

    return incrementalOpts;
  }

  @Override
  /** Configure the command-line arguments we expect to receive */
  public void configureOptions(ToolOptions toolOptions) {

    toolOptions.addUniqueOptions(getCommonOptions());
    toolOptions.addUniqueOptions(getImportOptions());
    if (!allTables) {
      toolOptions.addUniqueOptions(getIncrementalOptions());
    }
    toolOptions.addUniqueOptions(getOutputFormatOptions());
    toolOptions.addUniqueOptions(getInputFormatOptions());
    toolOptions.addUniqueOptions(getHiveOptions(true));
    toolOptions.addUniqueOptions(getHBaseOptions());
    toolOptions.addUniqueOptions(getHCatalogOptions());
    toolOptions.addUniqueOptions(getHCatImportOnlyOptions());
    toolOptions.addUniqueOptions(getAccumuloOptions());

    // get common codegen opts.
    RelatedOptions codeGenOpts = getCodeGenOpts(allTables);

    // add import-specific codegen opts:
    codeGenOpts.addOption(OptionBuilder.withArgName("file")
        .hasArg()
        .withDescription("Disable code generation; use specified jar")
        .withLongOpt(JAR_FILE_NAME_ARG)
        .create());

    toolOptions.addUniqueOptions(codeGenOpts);
  }

  @Override
  /** {@inheritDoc} */
  public void printHelp(ToolOptions toolOptions) {
    super.printHelp(toolOptions);
    System.out.println("");
    if (allTables) {
      System.out.println("At minimum, you must specify --connect");
    } else {
      System.out.println(
          "At minimum, you must specify --connect and --table");
    }

    System.out.println(
        "Arguments to mysqldump and other subprograms may be supplied");
    System.out.println(
        "after a '--' on the command line.");
  }

  private void applyIncrementalOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {
    if (in.hasOption(INCREMENT_TYPE_ARG)) {
      String incrementalTypeStr = in.getOptionValue(INCREMENT_TYPE_ARG);
      if ("append".equals(incrementalTypeStr)) {
        out.setIncrementalMode(SqoopOptions.IncrementalMode.AppendRows);
        // This argument implies ability to append to the same directory.
        out.setAppendMode(true);
      } else if ("lastmodified".equals(incrementalTypeStr)) {
        out.setIncrementalMode(SqoopOptions.IncrementalMode.DateLastModified);
      } else {
        throw new InvalidOptionsException("Unknown incremental import mode: "
            + incrementalTypeStr + ". Use 'append' or 'lastmodified'."
            + HELP_STR);
      }
    }

    if (in.hasOption(INCREMENT_COL_ARG)) {
      out.setIncrementalTestColumn(in.getOptionValue(INCREMENT_COL_ARG));
    }

    if (in.hasOption(INCREMENT_LAST_VAL_ARG)) {
      out.setIncrementalLastValue(in.getOptionValue(INCREMENT_LAST_VAL_ARG));
    }
  }

  @Override
  /** {@inheritDoc} */
  public void applyOptions(CommandLine in, SqoopOptions out)
      throws InvalidOptionsException {

    try {
      applyCommonOptions(in, out);

      if (in.hasOption(DIRECT_ARG)) {
        out.setDirectMode(true);
      }

      if (!allTables) {
        if (in.hasOption(TABLE_ARG)) {
          out.setTableName(in.getOptionValue(TABLE_ARG));
        }

        if (in.hasOption(COLUMNS_ARG)) {
          String[] cols = in.getOptionValue(COLUMNS_ARG).split(",");
          for (int i = 0; i < cols.length; i++) {
            cols[i] = cols[i].trim();
          }
          out.setColumns(cols);
        }

        if (in.hasOption(SPLIT_BY_ARG)) {
          out.setSplitByCol(in.getOptionValue(SPLIT_BY_ARG));
        }

        if (in.hasOption(SPLIT_LIMIT_ARG)) {
          out.setSplitLimit(Integer.parseInt(in.getOptionValue(SPLIT_LIMIT_ARG)));
        }

        if (in.hasOption(WHERE_ARG)) {
          out.setWhereClause(in.getOptionValue(WHERE_ARG));
        }

        if (in.hasOption(TARGET_DIR_ARG)) {
          out.setTargetDir(in.getOptionValue(TARGET_DIR_ARG));
        }

        if (in.hasOption(APPEND_ARG)) {
          out.setAppendMode(true);
        }

        if (in.hasOption(DELETE_ARG)) {
          out.setDeleteMode(true);
        }

        if (in.hasOption(SQL_QUERY_ARG)) {
          out.setSqlQuery(in.getOptionValue(SQL_QUERY_ARG));
        }

        if (in.hasOption(SQL_QUERY_BOUNDARY)) {
          out.setBoundaryQuery(in.getOptionValue(SQL_QUERY_BOUNDARY));
        }

        if (in.hasOption(MERGE_KEY_ARG)) {
          out.setMergeKeyCol(in.getOptionValue(MERGE_KEY_ARG));
        }

        applyValidationOptions(in, out);
      }

      if (in.hasOption(WAREHOUSE_DIR_ARG)) {
        out.setWarehouseDir(in.getOptionValue(WAREHOUSE_DIR_ARG));
      }

      if (in.hasOption(FMT_SEQUENCEFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.SequenceFile);
      }

      if (in.hasOption(FMT_TEXTFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.TextFile);
      }

      if (in.hasOption(FMT_AVRODATAFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.AvroDataFile);
      }

      if (in.hasOption(FMT_PARQUETFILE_ARG)) {
        out.setFileLayout(SqoopOptions.FileLayout.ParquetFile);
      }

      if (in.hasOption(NUM_MAPPERS_ARG)) {
        out.setNumMappers(Integer.parseInt(in.getOptionValue(NUM_MAPPERS_ARG)));
      }

      if (in.hasOption(MAPREDUCE_JOB_NAME)) {
        out.setMapreduceJobName(in.getOptionValue(MAPREDUCE_JOB_NAME));
      }

      if (in.hasOption(COMPRESS_ARG)) {
        out.setUseCompression(true);
      }

      if (in.hasOption(COMPRESSION_CODEC_ARG)) {
        out.setCompressionCodec(in.getOptionValue(COMPRESSION_CODEC_ARG));
      }

      if (in.hasOption(DIRECT_SPLIT_SIZE_ARG)) {
        out.setDirectSplitSize(Long.parseLong(in.getOptionValue(
            DIRECT_SPLIT_SIZE_ARG)));
      }

      if (in.hasOption(INLINE_LOB_LIMIT_ARG)) {
        out.setInlineLobLimit(Long.parseLong(in.getOptionValue(
            INLINE_LOB_LIMIT_ARG)));
      }

      if (in.hasOption(FETCH_SIZE_ARG)) {
        out.setFetchSize(new Integer(in.getOptionValue(FETCH_SIZE_ARG)));
      }

      if (in.hasOption(JAR_FILE_NAME_ARG)) {
        out.setExistingJarName(in.getOptionValue(JAR_FILE_NAME_ARG));
      }

      if (in.hasOption(AUTORESET_TO_ONE_MAPPER)) {
        out.setAutoResetToOneMapper(true);
      }

      if (in.hasOption(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)) {
        out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
            ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
      }

      applyIncrementalOptions(in, out);
      applyHiveOptions(in, out);
      applyOutputFormatOptions(in, out);
      applyInputFormatOptions(in, out);
      applyCodeGenOptions(in, out, allTables);
      applyHBaseOptions(in, out);
      applyHCatalogOptions(in, out);
      applyAccumuloOptions(in, out);

    } catch (NumberFormatException nfe) {
      throw new InvalidOptionsException("Error: expected numeric argument.\n"
          + "Try --help for usage.");
    }
  }

  /**
   * Validate import-specific arguments.
   * @param options the configured SqoopOptions to check
   */
  protected void validateImportOptions(SqoopOptions options)
      throws InvalidOptionsException {
    if (!allTables && options.getTableName() == null
        && options.getSqlQuery() == null) {
      throw new InvalidOptionsException(
          "--table or --" + SQL_QUERY_ARG + " is required for import. "
          + "(Or use sqoop import-all-tables.)"
          + HELP_STR);
    } else if (options.getExistingJarName() != null
        && options.getClassName() == null) {
      throw new InvalidOptionsException("Jar specified with --jar-file, but no "
          + "class specified with --class-name." + HELP_STR);
    } else if (options.getTargetDir() != null
        && options.getWarehouseDir() != null) {
      throw new InvalidOptionsException(
          "--target-dir with --warehouse-dir are incompatible options."
          + HELP_STR);
    } else if (options.getTableName() != null
        && options.getSqlQuery() != null) {
      throw new InvalidOptionsException(
          "Cannot specify --" + SQL_QUERY_ARG + " and --table together."
          + HELP_STR);
    } else if (options.getSqlQuery() != null
        && options.getTargetDir() == null
        && options.getHBaseTable() == null
        && options.getHCatTableName() == null
        && options.getAccumuloTable() == null) {
      throw new InvalidOptionsException(
          "Must specify destination with --target-dir. "
          + HELP_STR);
    } else if (options.getSqlQuery() != null && options.doHiveImport()
        && options.getHiveTableName() == null) {
      throw new InvalidOptionsException(
          "When importing a query to Hive, you must specify --"
          + HIVE_TABLE_ARG + "." + HELP_STR);
    } else if (options.getSqlQuery() != null && options.getNumMappers() > 1
        && options.getSplitByCol() == null) {
      throw new InvalidOptionsException(
          "When importing query results in parallel, you must specify --"
          + SPLIT_BY_ARG + "." + HELP_STR);
    } else if (options.isDirect()) {
      validateDirectImportOptions(options);
    } else if (allTables && options.isValidationEnabled()) {
      throw new InvalidOptionsException("Validation is not supported for "
          + "all-tables imports, only for single-table imports.");
    } else if (options.getSqlQuery() != null && options.isValidationEnabled()) {
      throw new InvalidOptionsException("Validation is not supported for "
          + "free-form query imports, only for single-table imports.");
    } else if (options.getWhereClause() != null
        && options.isValidationEnabled()) {
      throw new InvalidOptionsException("Validation is not supported for "
          + "imports with a where clause, only for single-table imports.");
    } else if (options.getIncrementalMode()
        != SqoopOptions.IncrementalMode.None && options.isValidationEnabled()) {
      throw new InvalidOptionsException("Validation is not supported for "
          + "incremental imports, only for single-table imports.");
    } else if ((options.getTargetDir() != null
        || options.getWarehouseDir() != null)
        && options.getHCatTableName() != null) {
      throw new InvalidOptionsException("--hcatalog-table cannot be used "
          + "with --warehouse-dir or --target-dir options");
    } else if (options.isDeleteMode() && options.isAppendMode()) {
      throw new InvalidOptionsException("--append and --delete-target-dir can"
          + " not be used together.");
    } else if (options.isDeleteMode() && options.getIncrementalMode()
        != SqoopOptions.IncrementalMode.None) {
      throw new InvalidOptionsException("--delete-target-dir can not be used"
          + " with incremental imports.");
    } else if (options.getAutoResetToOneMapper()
        && (options.getSplitByCol() != null)) {
      throw new InvalidOptionsException("--autoreset-to-one-mapper and"
          + " --split-by cannot be used together.");
    }
  }

  void validateDirectImportOptions(SqoopOptions options) throws InvalidOptionsException {
    validateDirectMysqlOptions(options);
    validateDirectDropHiveDelimOption(options);
    validateHasDirectConnectorOption(options);
  }

  void validateDirectDropHiveDelimOption(SqoopOptions options) throws InvalidOptionsException {
    if (options.doHiveDropDelims()) {
      throw new InvalidOptionsException(
          "Direct import currently does not support dropping hive delimiters,"
          + " please remove parameter --hive-drop-import-delims.");
    }
  }

  void validateDirectMysqlOptions(SqoopOptions options) throws InvalidOptionsException {
    if (!MYSQL.isTheManagerTypeOf(options)) {
      return;
    }
    if (options.getFileLayout() != SqoopOptions.FileLayout.TextFile) {
      throw new InvalidOptionsException(
          "MySQL direct import currently supports only text output format. "
          + "Parameters --as-sequencefile --as-avrodatafile and --as-parquetfile are not "
          + "supported with --direct params in MySQL case.");
    }
    if (options.getNullStringValue() != null || options.getNullNonStringValue() != null) {
      throw new InvalidOptionsException(
          "The --direct option is not compatible with the --null-string or "
          + "--null-non-string options for MySQL imports");
    }
  }

  /**
   * Validate the incremental import options.
   */
  private void validateIncrementalOptions(SqoopOptions options)
      throws InvalidOptionsException {
    if (options.getIncrementalMode() != SqoopOptions.IncrementalMode.None
        && options.getIncrementalTestColumn() == null) {
      throw new InvalidOptionsException(
          "For an incremental import, the check column must be specified "
          + "with --" + INCREMENT_COL_ARG + ". " + HELP_STR);
    }

    if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.None
        && options.getIncrementalTestColumn() != null) {
      throw new InvalidOptionsException(
          "You must specify an incremental import mode with --"
          + INCREMENT_TYPE_ARG + ". " + HELP_STR);
    }

    if (options.getIncrementalMode() == SqoopOptions.IncrementalMode.DateLastModified
        && options.getFileLayout() == SqoopOptions.FileLayout.AvroDataFile) {
      throw new InvalidOptionsException("--"
          + INCREMENT_TYPE_ARG + " lastmodified cannot be used in conjunction with --"
          + FMT_AVRODATAFILE_ARG + "." + HELP_STR);
    }
  }

  @Override
  /** {@inheritDoc} */
  public void validateOptions(SqoopOptions options)
      throws InvalidOptionsException {

    // If extraArguments is full, check for '--' followed by args for
    // mysqldump or other commands we rely on.
    options.setExtraArgs(getSubcommandArgs(extraArguments));
    int dashPos = getDashPosition(extraArguments);

    if (hasUnrecognizedArgs(extraArguments, 0, dashPos)) {
      throw new InvalidOptionsException(HELP_STR);
    }

    validateImportOptions(options);
    validateIncrementalOptions(options);
    validateCommonOptions(options);
    validateCodeGenOptions(options);
    validateOutputFormatOptions(options);
    validateHBaseOptions(options);
    validateHiveOptions(options);
    validateHCatalogOptions(options);
    validateAccumuloOptions(options);
  }
}