// SQOOP-3309: Implement HiveServer2 client
// src/java/org/apache/sqoop/hive/HiveImport.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.sqoop.hive;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Shell;
import org.apache.sqoop.SqoopOptions;
import org.apache.sqoop.manager.ConnManager;
import org.apache.sqoop.util.Executor;
import org.apache.sqoop.util.ExitSecurityException;
import org.apache.sqoop.util.LoggingAsyncSink;
import org.apache.sqoop.util.SubprocessSecurityManager;

/**
 * Utility to import a table into the Hive metastore. Manages the connection
 * to Hive itself as well as orchestrating the use of the other classes in this
 * package.
 */
50 public class HiveImport implements HiveClient {
51
52 public static final Log LOG = LogFactory.getLog(HiveImport.class.getName());
53
54 private SqoopOptions options;
55 private ConnManager connManager;
56 private Configuration configuration;
57 private boolean generateOnly;
58 private HiveClientCommon hiveClientCommon;
59 private static boolean testMode = false;
60
61 public static boolean getTestMode() {
62 return testMode;
63 }
64
65 public static void setTestMode(boolean mode) {
66 testMode = mode;
67 }
68
69 /** Entry point through which Hive invocation should be attempted. */
70 private static final String HIVE_MAIN_CLASS =
71 "org.apache.hadoop.hive.cli.CliDriver";
72
73 public HiveImport(final SqoopOptions opts, final ConnManager connMgr,
74 final Configuration conf, final boolean generateOnly, final HiveClientCommon hiveClientCommon) {
75 this.options = opts;
76 this.connManager = connMgr;
77 this.configuration = conf;
78 this.generateOnly = generateOnly;
79 this.hiveClientCommon = hiveClientCommon;
80 }
81
82 public HiveImport(SqoopOptions opts, ConnManager connMgr, Configuration conf, boolean generateOnly) {
83 this(opts, connMgr, conf, generateOnly, new HiveClientCommon());
84 }
85
86 /**
87 * @return the filename of the hive executable to run to do the import
88 */
89 private String getHiveBinPath() {
90 // If the user has $HIVE_HOME set, then use $HIVE_HOME/bin/hive if it
91 // exists.
92 // Fall back to just plain 'hive' and hope it's in the path.
93
94 String hiveHome = options.getHiveHome();
95 String hiveCommand = Shell.WINDOWS ? "hive.cmd" : "hive";
96 if (null == hiveHome) {
97 return hiveCommand;
98 }
99
100 Path p = new Path(hiveHome);
101 p = new Path(p, "bin");
102 p = new Path(p, hiveCommand);
103 String hiveBinStr = p.toString();
104 if (new File(hiveBinStr).exists()) {
105 return hiveBinStr;
106 } else {
107 return hiveCommand;
108 }
109 }
110
111 /**
112 * @return true if we're just generating the DDL for the import, but
113 * not actually running it (i.e., --generate-only mode). If so, don't
114 * do any side-effecting actions in Hive.
115 */
116 boolean isGenerateOnly() {
117 return generateOnly;
118 }
119
120 /**
121 * @return a File object that can be used to write the DDL statement.
122 * If we're in gen-only mode, this should be a file in the outdir, named
123 * after the Hive table we're creating. If we're in import mode, this should
124 * be a one-off temporary file.
125 */
126 private File getScriptFile(String outputTableName) throws IOException {
127 if (!isGenerateOnly()) {
128 return File.createTempFile("hive-script-", ".txt",
129 new File(options.getTempDir()));
130 } else {
131 return new File(new File(options.getCodeOutputDir()),
132 outputTableName + ".q");
133 }
134 }
135
136 /**
137 * Perform the import of data from an HDFS path to a Hive table.
138 *
139 * @param inputTableName the name of the table as loaded into HDFS
140 * @param outputTableName the name of the table to create in Hive.
141 * @param createOnly if true, run the CREATE TABLE statement but not
142 * LOAD DATA.
143 */
144 public void importTable(String inputTableName, String outputTableName,
145 boolean createOnly) throws IOException {
146
147 if (null == outputTableName) {
148 outputTableName = inputTableName;
149 }
150 LOG.debug("Hive.inputTable: " + inputTableName);
151 LOG.debug("Hive.outputTable: " + outputTableName);
152
153 // For testing purposes against our mock hive implementation,
154 // if the sysproperty "expected.script" is set, we set the EXPECTED_SCRIPT
155 // environment variable for the child hive process. We also disable
156 // timestamp comments so that we have deterministic table creation scripts.
157 String expectedScript = System.getProperty("expected.script");
158 List<String> env = Executor.getCurEnvpStrings();
159 boolean debugMode = expectedScript != null;
160 if (debugMode) {
161 env.add("EXPECTED_SCRIPT=" + expectedScript);
162 env.add("TMPDIR=" + options.getTempDir());
163 }
164
165 // generate the HQL statements to run.
166 TableDefWriter tableWriter = new TableDefWriter(options, connManager,
167 inputTableName, outputTableName,
168 configuration, !debugMode);
169 String createTableStr = tableWriter.getCreateTableStmt() + ";\n";
170 String loadDataStmtStr = tableWriter.getLoadDataStmt() + ";\n";
171 Path finalPath = tableWriter.getFinalPath();
172
173 if (!isGenerateOnly()) {
174 hiveClientCommon.removeTempLogs(configuration, finalPath);
175 LOG.info("Loading uploaded data into Hive");
176
177 hiveClientCommon.indexLzoFiles(options, finalPath);
178 }
179
180 // write them to a script file.
181 File scriptFile = getScriptFile(outputTableName);
182 try {
183 String filename = scriptFile.toString();
184 BufferedWriter w = null;
185 try {
186 FileOutputStream fos = new FileOutputStream(scriptFile);
187 w = new BufferedWriter(new OutputStreamWriter(fos));
188 w.write(createTableStr, 0, createTableStr.length());
189 if (!createOnly) {
190 w.write(loadDataStmtStr, 0, loadDataStmtStr.length());
191 }
192 } catch (IOException ioe) {
193 LOG.error("Error writing Hive load-in script: " + ioe.toString());
194 ioe.printStackTrace();
195 throw ioe;
196 } finally {
197 if (null != w) {
198 try {
199 w.close();
200 } catch (IOException ioe) {
201 LOG.warn("IOException closing stream to Hive script: "
202 + ioe.toString());
203 }
204 }
205 }
206
207 if (!isGenerateOnly()) {
208 executeScript(filename, env);
209
210 LOG.info("Hive import complete.");
211
212 hiveClientCommon.cleanUp(configuration, finalPath);
213 }
214 } finally {
215 if (!isGenerateOnly()) {
216 // User isn't interested in saving the DDL. Remove the file.
217 if (!scriptFile.delete()) {
218 LOG.warn("Could not remove temporary file: " + scriptFile.toString());
219 // try to delete the file later.
220 scriptFile.deleteOnExit();
221 }
222 }
223 }
224 }
225
226 @SuppressWarnings("unchecked")
227 /**
228 * Execute the script file via Hive.
229 * If Hive's jars are on the classpath, run it in the same process.
230 * Otherwise, execute the file with 'bin/hive'.
231 *
232 * @param filename The script file to run.
233 * @param env the environment strings to pass to any subprocess.
234 * @throws IOException if Hive did not exit successfully.
235 */
236 private void executeScript(String filename, List<String> env)
237 throws IOException {
238 SubprocessSecurityManager subprocessSM = null;
239
240 if (testMode) {
241 // We use external mock hive process for test mode as
242 // HCatalog dependency would have brought in Hive classes.
243 LOG.debug("Using external Hive process in test mode.");
244 executeExternalHiveScript(filename, env);
245 return;
246 }
247
248 try {
249 Class cliDriverClass = Class.forName(HIVE_MAIN_CLASS);
250
251 // We loaded the CLI Driver in this JVM, so we will just
252 // call it in-process. The CliDriver class has a method:
253 // void main(String [] args) throws Exception.
254 //
255 // We'll call that here to invoke 'hive -f scriptfile'.
256 // Because this method will call System.exit(), we use
257 // a SecurityManager to prevent this.
258 LOG.debug("Using in-process Hive instance.");
259
260 subprocessSM = new SubprocessSecurityManager();
261 subprocessSM.install();
262
263 String[] argv = getHiveArgs("-f", filename);
264
265 // And invoke the static method on this array.
266 Method mainMethod = cliDriverClass.getMethod("main", String[].class);
267 mainMethod.invoke(null, (Object) argv);
268
269 } catch (ClassNotFoundException cnfe) {
270 // Hive is not on the classpath. Run externally.
271 // This is not an error path.
272 LOG.debug("Using external Hive process.");
273 executeExternalHiveScript(filename, env);
274 } catch (NoSuchMethodException nsme) {
275 // Could not find a handle to the main() method.
276 throw new IOException("Could not access CliDriver.main()", nsme);
277 } catch (IllegalAccessException iae) {
278 // Error getting a handle on the main() method.
279 throw new IOException("Could not access CliDriver.main()", iae);
280 } catch (InvocationTargetException ite) {
281 // We ran CliDriver.main() and an exception was thrown from within Hive.
282 // This may have been the ExitSecurityException triggered by the
283 // SubprocessSecurityManager. If so, handle it. Otherwise, wrap in
284 // an IOException and rethrow.
285
286 Throwable cause = ite.getCause();
287 if (cause instanceof ExitSecurityException) {
288 ExitSecurityException ese = (ExitSecurityException) cause;
289 int status = ese.getExitStatus();
290 if (status != 0) {
291 throw new IOException("Hive CliDriver exited with status=" + status);
292 }
293 } else {
294 throw new IOException("Exception thrown in Hive", ite);
295 }
296 } finally {
297 if (null != subprocessSM) {
298 // Uninstall the SecurityManager used to trap System.exit().
299 subprocessSM.uninstall();
300 }
301 }
302 }
303
304 /**
305 * Execute Hive via an external 'bin/hive' process.
306 * @param filename the Script file to run.
307 * @param env the environment strings to pass to any subprocess.
308 * @throws IOException if Hive did not exit successfully.
309 */
310 private void executeExternalHiveScript(String filename, List<String> env)
311 throws IOException {
312 // run Hive on the script and note the return code.
313 String hiveExec = getHiveBinPath();
314
315 String[] argv = getHiveArgs(hiveExec, "-f", filename);
316
317 LoggingAsyncSink logSink = new LoggingAsyncSink(LOG);
318 int ret = Executor.exec(argv, env.toArray(new String[0]), logSink, logSink);
319 if (0 != ret) {
320 throw new IOException("Hive exited with status " + ret);
321 }
322 }
323
324 private String[] getHiveArgs(String... args) throws IOException {
325 List<String> newArgs = new LinkedList<String>();
326 newArgs.addAll(Arrays.asList(args));
327
328 HiveConfig.addHiveConfigs(HiveConfig.getHiveConf(configuration), configuration);
329
330 if (configuration.getBoolean(HiveConfig.HIVE_SASL_ENABLED, false)) {
331 newArgs.add("--hiveconf");
332 newArgs.add("hive.metastore.sasl.enabled=true");
333 }
334
335 return newArgs.toArray(new String[newArgs.size()]);
336 }
337
338 @Override
339 public void importTable() throws IOException {
340 importTable(options.getTableName(), options.getHiveTableName(), false);
341 }
342
343 @Override
344 public void createTable() throws IOException {
345 importTable(options.getTableName(), options.getHiveTableName(), true);
346 }
347
348 SqoopOptions getOptions() {
349 return options;
350 }
351
352 ConnManager getConnManager() {
353 return connManager;
354 }
355
356 Configuration getConfiguration() {
357 return configuration;
358 }
359
360 }