/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.hive;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.io.CodecMap;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.util.Executor;
import org.apache.sqoop.util.ExitSecurityException;
import org.apache.sqoop.util.LoggingAsyncSink;
import org.apache.sqoop.util.SubprocessSecurityManager;

/**
 * Utility to import a table into the Hive metastore. Manages the connection
 * to Hive itself as well as orchestrating the use of the other classes in this
 * package.
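 *
 * <p>A minimal usage sketch (variable names and values here are illustrative
 * assumptions, not fixed by this class):</p>
 * <pre>{@code
 *   HiveImport importer = new HiveImport(options, connManager, conf, false);
 *   importer.importTable("employees", "hive_employees", false);
 * }</pre>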
 */
public class HiveImport {

  public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());

  private SqoopOptions options;
  private ConnManager connManager;
  private Configuration configuration;
  private boolean generateOnly;
  private static boolean testMode = false;

  public static boolean getTestMode() {
    return testMode;
  }

  public static void setTestMode(boolean mode) {
    testMode = mode;
  }

  /** Entry point through which Hive invocation should be attempted. */
  private static final String HIVE_MAIN_CLASS =
      "org.apache.hadoop.hive.cli.CliDriver";

  public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
      final Configuration conf, final boolean generateOnly) {
    this.options = opts;
    this.connManager = connMgr;
    this.configuration = conf;
    this.generateOnly = generateOnly;
  }

  /**
   * @return the filename of the hive executable to run to do the import
   */
  private String getHiveBinPath() {
    // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
    // exists.
    // Fall back to just plain 'hive' and hope it's in the path.
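    // For example (an illustrative resolution, not a guarantee): with
    // HIVE_HOME=/opt/hive this returns /opt/hive/bin/hive when that file
    // exists (hive.cmd on Windows), falling back to plain "hive" otherwise.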

    String hiveHome = options.getHiveHome();
    String hiveCommand = Shell.WINDOWS ? "hive.cmd" : "hive";
    if (null == hiveHome) {
      return hiveCommand;
    }

    Path p = new Path(hiveHome);
    p = new Path(p, "bin");
    p = new Path(p, hiveCommand);
    String hiveBinStr = p.toString();
    if (new File(hiveBinStr).exists()) {
      return hiveBinStr;
    } else {
      return hiveCommand;
    }
  }

  /**
   * If the data was loaded by a MapReduce job, remove the _logs directory
   * it left in the table directory before running Hive's LOAD DATA INPATH.
   */
  private void removeTempLogs(Path tablePath) throws IOException {
    FileSystem fs = tablePath.getFileSystem(configuration);
    Path logsPath = new Path(tablePath, "_logs");
    if (fs.exists(logsPath)) {
      LOG.info("Removing temporary files from import process: " + logsPath);
      if (!fs.delete(logsPath, true)) {
        LOG.warn("Could not delete temporary files; "
            + "continuing with import, but it may fail.");
      }
    }
  }

  /**
   * @return true if we're just generating the DDL for the import, but
   * not actually running it (i.e., --generate-only mode). If so, don't
   * do any side-effecting actions in Hive.
   */
  private boolean isGenerateOnly() {
    return generateOnly;
  }

  /**
   * @return a File object that can be used to write the DDL statement.
   * If we're in gen-only mode, this should be a file in the outdir, named
   * after the Hive table we're creating. If we're in import mode, this should
   * be a one-off temporary file.
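   * <p>Illustrative examples (the concrete paths are assumptions): with
   * --outdir /tmp/gen and table "foo", gen-only mode yields /tmp/gen/foo.q;
   * import mode yields a temp file such as
   * &lt;tempdir&gt;/hive-script-12345.txt that is deleted after the run.</p>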
   */
  private File getScriptFile(String outputTableName) throws IOException {
    if (!isGenerateOnly()) {
      return File.createTempFile("hive-script-", ".txt",
          new File(options.getTempDir()));
    } else {
      return new File(new File(options.getCodeOutputDir()),
          outputTableName + ".q");
    }
  }

  /**
   * Perform the import of data from an HDFS path to a Hive table.
   *
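   * <p>The script handed to Hive looks like the following sketch (the table
   * and path names are illustrative; the exact statements come from
   * TableDefWriter):</p>
   * <pre>{@code
   *   CREATE TABLE IF NOT EXISTS `hive_employees` ( ... )
   *     ROW FORMAT DELIMITED ... STORED AS TEXTFILE;
   *   LOAD DATA INPATH 'hdfs:///user/sqoop/employees'
   *     INTO TABLE `hive_employees`;
   * }</pre>
   *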
   * @param inputTableName the name of the table as loaded into HDFS
   * @param outputTableName the name of the table to create in Hive.
   * @param createOnly if true, run the CREATE TABLE statement but not
   * LOAD DATA.
   */
  public void importTable(String inputTableName, String outputTableName,
      boolean createOnly) throws IOException {

    if (null == outputTableName) {
      outputTableName = inputTableName;
    }
    LOG.debug("Hive.inputTable: " + inputTableName);
    LOG.debug("Hive.outputTable: " + outputTableName);

    // For testing purposes against our mock hive implementation,
    // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
    // environment variable for the child hive process. We also disable
    // timestamp comments so that we have deterministic table creation scripts.
    String expectedScript = System.getProperty("expected.script");
    List<String> env = Executor.getCurEnvpStrings();
    boolean debugMode = expectedScript != null;
    if (debugMode) {
      env.add("EXPECTED_SCRIPT=" + expectedScript);
      env.add("TMPDIR=" + options.getTempDir());
    }

    // generate the HQL statements to run.
    // reset the connection as it might have timed out
    connManager.discardConnection(true);
    TableDefWriter tableWriter = new TableDefWriter(options, connManager,
        inputTableName, outputTableName,
        configuration, !debugMode);
    String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
    String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
    Path finalPath = tableWriter.getFinalPath();

    if (!isGenerateOnly()) {
      removeTempLogs(finalPath);
      LOG.info("Loading uploaded data into Hive");

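      // If the import produced LZO-compressed files, index them before
      // loading: Hive can only split LZO files that have accompanying
      // .index files. hadoop-lzo is an optional dependency, so the indexer
      // is loaded reflectively.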
      String codec = options.getCompressionCodec();
      if (codec != null && (codec.equals(CodecMap.LZOP)
          || codec.equals(CodecMap.getCodecClassName(CodecMap.LZOP)))) {
        try {
          Tool tool = ReflectionUtils.newInstance(Class.
              forName("com.hadoop.compression.lzo.DistributedLzoIndexer").
              asSubclass(Tool.class), configuration);
          ToolRunner.run(configuration, tool,
              new String[] { finalPath.toString() });
        } catch (Exception ex) {
          LOG.error("Error indexing lzo files", ex);
          throw new IOException("Error indexing lzo files", ex);
        }
      }
    }

    // write them to a script file.
    File scriptFile = getScriptFile(outputTableName);
    try {
      String filename = scriptFile.toString();
      BufferedWriter w = null;
      try {
        FileOutputStream fos = new FileOutputStream(scriptFile);
        w = new BufferedWriter(new OutputStreamWriter(fos));
        w.write(createTableStr, 0, createTableStr.length());
        if (!createOnly) {
          w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
        }
      } catch (IOException ioe) {
        LOG.error("Error writing Hive load-in script: " + ioe.toString());
        ioe.printStackTrace();
        throw ioe;
      } finally {
        if (null != w) {
          try {
            w.close();
          } catch (IOException ioe) {
            LOG.warn("IOException closing stream to Hive script: "
                + ioe.toString());
          }
        }
      }

      if (!isGenerateOnly()) {
        executeScript(filename, env);

        LOG.info("Hive import complete.");

        cleanUp(finalPath);
      }
    } finally {
      if (!isGenerateOnly()) {
        // User isn't interested in saving the DDL. Remove the file.
        if (!scriptFile.delete()) {
          LOG.warn("Could not remove temporary file: " + scriptFile.toString());
          // try to delete the file later.
          scriptFile.deleteOnExit();
        }
      }
    }
  }

  /**
   * Clean up after a successful Hive import.
   *
   * @param outputPath path to the output directory
   * @throws IOException
   */
  private void cleanUp(Path outputPath) throws IOException {
    FileSystem fs = outputPath.getFileSystem(configuration);

    // Hive does not always remove the input directory (our export directory)
    // after the LOAD DATA statement. We remove the export directory when it
    // is empty so that a user can periodically repopulate the Hive table
    // (for example with --hive-overwrite).
    try {
      if (outputPath != null && fs.exists(outputPath)) {
        FileStatus[] statuses = fs.listStatus(outputPath);
        if (statuses.length == 0) {
          LOG.info("Export directory is empty, removing it.");
          fs.delete(outputPath, true);
        } else if (statuses.length == 1 && statuses[0].getPath().getName()
            .equals(FileOutputCommitter.SUCCEEDED_FILE_NAME)) {
          LOG.info("Export directory contains only the _SUCCESS file,"
              + " removing the directory.");
          fs.delete(outputPath, true);
        } else {
          LOG.info("Export directory is not empty, keeping it.");
        }
      }
    } catch (IOException e) {
      LOG.error("Issue with cleaning (safe to ignore)", e);
    }
  }

  /**
   * Execute the script file via Hive.
   * If Hive's jars are on the classpath, run it in the same process.
   * Otherwise, execute the file with 'bin/hive'.
   *
   * @param filename The script file to run.
   * @param env the environment strings to pass to any subprocess.
   * @throws IOException if Hive did not exit successfully.
   */
  @SuppressWarnings("unchecked")
  private void executeScript(String filename, List<String> env)
      throws IOException {
    SubprocessSecurityManager subprocessSM = null;

    if (testMode) {
      // In test mode we use an external mock Hive process, because the
      // HCatalog dependency would otherwise bring real Hive classes onto
      // the classpath.
      LOG.debug("Using external Hive process in test mode.");
      executeExternalHiveScript(filename, env);
      return;
    }

    try {
      Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);

      // We loaded the CLI Driver in this JVM, so we will just
      // call it in-process. The CliDriver class has a method:
      //   void main(String [] args) throws Exception.
      //
      // We'll call that here to invoke 'hive -f scriptfile'.
      // Because main() may call System.exit(), we install a
      // SecurityManager that intercepts and blocks the exit.
      LOG.debug("Using in-process Hive instance.");

      subprocessSM = new SubprocessSecurityManager();
      subprocessSM.install();

      String[] argv = getHiveArgs("-f", filename);
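      // argv is now, e.g., { "-f", "/tmp/hive-script-12345.txt" } plus any
      // --hiveconf flags appended by getHiveArgs (the path is illustrative).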

      // And invoke the static method on this array.
      Method mainMethod = cliDriverClass.getMethod("main", String[].class);
      mainMethod.invoke(null, (Object) argv);

    } catch (ClassNotFoundException cnfe) {
      // Hive is not on the classpath. Run externally.
      // This is not an error path.
      LOG.debug("Using external Hive process.");
      executeExternalHiveScript(filename, env);
    } catch (NoSuchMethodException nsme) {
      // Could not find a handle to the main() method.
      throw new IOException("Could not access CliDriver.main()", nsme);
    } catch (IllegalAccessException iae) {
      // Error getting a handle on the main() method.
      throw new IOException("Could not access CliDriver.main()", iae);
    } catch (InvocationTargetException ite) {
      // We ran CliDriver.main() and an exception was thrown from within Hive.
      // This may have been the ExitSecurityException triggered by the
      // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
      // an IOException and rethrow.

      Throwable cause = ite.getCause();
      if (cause instanceof ExitSecurityException) {
        ExitSecurityException ese = (ExitSecurityException) cause;
        int status = ese.getExitStatus();
        if (status != 0) {
          throw new IOException("Hive CliDriver exited with status=" + status);
        }
      } else {
        throw new IOException("Exception thrown in Hive", ite);
      }
    } finally {
      if (null != subprocessSM) {
        // Uninstall the SecurityManager used to trap System.exit().
        subprocessSM.uninstall();
      }
    }
  }

  /**
   * Execute Hive via an external 'bin/hive' process.
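   * The command executed is, for example (the paths are illustrative):
   * {@code /opt/hive/bin/hive -f /tmp/hive-script-12345.txt}.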
   * @param filename the script file to run.
   * @param env the environment strings to pass to any subprocess.
   * @throws IOException if Hive did not exit successfully.
   */
  private void executeExternalHiveScript(String filename, List<String> env)
      throws IOException {
    // Run Hive on the script and note the return code.
    String hiveExec = getHiveBinPath();

    String[] argv = getHiveArgs(hiveExec, "-f", filename);

    LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
    int ret = Executor.exec(argv, env.toArray(new String[0]), logSink, logSink);
    if (0 != ret) {
      throw new IOException("Hive exited with status " + ret);
    }
  }

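  /**
   * Build the argument vector to pass to Hive, appending any --hiveconf
   * flags derived from the effective configuration. For example (an
   * illustrative call, not taken from this file): with SASL enabled,
   * {@code getHiveArgs("-f", "script.q")} returns
   * {@code { "-f", "script.q", "--hiveconf",
   * "hive.metastore.sasl.enabled=true" }}.
   */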
  private String[] getHiveArgs(String... args) throws IOException {
    List<String> newArgs = new LinkedList<String>();
    newArgs.addAll(Arrays.asList(args));

    HiveConfig.addHiveConfigs(HiveConfig.getHiveConf(configuration),
        configuration);

    if (configuration.getBoolean(HiveConfig.HIVE_SASL_ENABLED, false)) {
      newArgs.add("--hiveconf");
      newArgs.add("hive.metastore.sasl.enabled=true");
    }

    return newArgs.toArray(new String[newArgs.size()]);
  }
}