SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / test / com / cloudera / sqoop / testutil / ExportJobTestCase.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package com.cloudera.sqoop.testutil;
20
21 import java.io.IOException;
22 import java.sql.Connection;
23 import java.sql.PreparedStatement;
24 import java.sql.ResultSet;
25 import java.sql.SQLException;
26 import java.util.ArrayList;
27 import java.util.List;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.util.StringUtils;
33 import org.junit.Before;
34
35 import com.cloudera.sqoop.Sqoop;
36 import com.cloudera.sqoop.SqoopOptions;
37 import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
38 import com.cloudera.sqoop.tool.ExportTool;
39
40 /**
41 * Class that implements common methods required for tests which export data
42 * from HDFS to databases, to verify correct export.
43 */
44 public abstract class ExportJobTestCase extends BaseSqoopTestCase {
45
46 public static final Log LOG = LogFactory.getLog(
47 ExportJobTestCase.class.getName());
48
49 @Before
50 public void setUp() {
51 // start the server
52 super.setUp();
53
54 if (useHsqldbTestServer()) {
55 // throw away any existing data that might be in the database.
56 try {
57 this.getTestServer().dropExistingSchema();
58 } catch (SQLException sqlE) {
59 fail(sqlE.toString());
60 }
61 }
62 }
63
64 protected String getTablePrefix() {
65 return "EXPORT_TABLE_";
66 }
67
68 /**
69 * @return the maximum rows to fold into an INSERT statement.
70 * HSQLDB can only support the single-row INSERT syntax. Other databases
71 * can support greater numbers of rows per statement.
72 */
73 protected int getMaxRowsPerStatement() {
74 return 1;
75 }
76
77 /**
78 * Create the argv to pass to Sqoop.
79 * @param includeHadoopFlags if true, then include -D various.settings=values
80 * @param rowsPerStmt number of rows to export in a single INSERT statement.
81 * @param statementsPerTx ## of statements to use in a transaction.
82 * @return the argv as an array of strings.
83 */
84 protected String [] getArgv(boolean includeHadoopFlags,
85 int rowsPerStmt, int statementsPerTx, String... additionalArgv) {
86 ArrayList<String> args = new ArrayList<String>();
87
88 if (includeHadoopFlags) {
89 CommonArgs.addHadoopFlags(args);
90 args.add("-D");
91 int realRowsPerStmt = Math.min(rowsPerStmt, getMaxRowsPerStatement());
92 if (realRowsPerStmt != rowsPerStmt) {
93 LOG.warn("Rows per statement set to " + realRowsPerStmt
94 + " by getMaxRowsPerStatement() limit.");
95 }
96 args.add(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY + "="
97 + realRowsPerStmt);
98 args.add("-D");
99 args.add(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY + "="
100 + statementsPerTx);
101 }
102
103 // Any additional Hadoop flags (-D foo=bar) are prepended.
104 if (null != additionalArgv) {
105 boolean prevIsFlag = false;
106 for (String arg : additionalArgv) {
107 if (arg.equals("-D")) {
108 args.add(arg);
109 prevIsFlag = true;
110 } else if (prevIsFlag) {
111 args.add(arg);
112 prevIsFlag = false;
113 }
114 }
115 }
116 boolean isHCatJob = false;
117 // The sqoop-specific additional args are then added.
118 if (null != additionalArgv) {
119 boolean prevIsFlag = false;
120 for (String arg : additionalArgv) {
121 if (arg.equals("-D")) {
122 prevIsFlag = true;
123 continue;
124 } else if (prevIsFlag) {
125 prevIsFlag = false;
126 continue;
127 } else {
128 // normal argument.
129 if (!isHCatJob && arg.equals("--hcatalog-table")) {
130 isHCatJob = true;
131 }
132 args.add(arg);
133 }
134 }
135 }
136
137 if (usesSQLtable()) {
138 args.add("--table");
139 args.add(getTableName());
140 }
141 // Only add export-dir if hcatalog-table is not there in additional argv
142 if (!isHCatJob) {
143 args.add("--export-dir");
144 args.add(getTablePath().toString());
145 }
146 args.add("--connect");
147 args.add(getConnectString());
148 args.add("--fields-terminated-by");
149 args.add("\\t");
150 args.add("--lines-terminated-by");
151 args.add("\\n");
152 args.add("-m");
153 args.add("1");
154
155 LOG.debug("args:");
156 for (String a : args) {
157 LOG.debug(" " + a);
158 }
159
160 return args.toArray(new String[0]);
161 }
162
163 protected boolean usesSQLtable() {
164 return true;
165 }
166
167 /** When exporting text columns, what should the text contain? */
168 protected String getMsgPrefix() {
169 return "textfield";
170 }
171
172
173 /** @return the minimum 'id' value in the table */
174 protected int getMinRowId(Connection conn) throws SQLException {
175 PreparedStatement statement = conn.prepareStatement(
176 "SELECT MIN(id) FROM " + getTableName(),
177 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
178 int minVal = 0;
179 try {
180 ResultSet rs = statement.executeQuery();
181 try {
182 rs.next();
183 minVal = rs.getInt(1);
184 } finally {
185 rs.close();
186 }
187 } finally {
188 statement.close();
189 }
190
191 return minVal;
192 }
193
194 /** @return the maximum 'id' value in the table */
195 protected int getMaxRowId(Connection conn) throws SQLException {
196 PreparedStatement statement = conn.prepareStatement(
197 "SELECT MAX(id) FROM " + getTableName(),
198 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
199 int maxVal = 0;
200 try {
201 ResultSet rs = statement.executeQuery();
202 try {
203 rs.next();
204 maxVal = rs.getInt(1);
205 } finally {
206 rs.close();
207 }
208 } finally {
209 statement.close();
210 }
211
212 return maxVal;
213 }
214
215 /**
216 * Check that we got back the expected row set.
217 * @param expectedNumRecords The number of records we expected to load
218 * into the database.
219 */
220 protected void verifyExport(int expectedNumRecords)
221 throws IOException, SQLException {
222 Connection conn = getConnection();
223 verifyExport(expectedNumRecords, conn);
224 }
225
226 /**
227 * Check that we got back the expected row set.
228 * @param expectedNumRecords The number of records we expected to load
229 * into the database.
230 * @param conn the db connection to use.
231 */
232 protected void verifyExport(int expectedNumRecords, Connection conn)
233 throws IOException, SQLException {
234 LOG.info("Verifying export: " + getTableName());
235 // Check that we got back the correct number of records.
236 PreparedStatement statement = conn.prepareStatement(
237 "SELECT COUNT(*) FROM " + getTableName(),
238 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
239 int actualNumRecords = 0;
240 ResultSet rs = null;
241 try {
242 rs = statement.executeQuery();
243 try {
244 rs.next();
245 actualNumRecords = rs.getInt(1);
246 } finally {
247 rs.close();
248 }
249 } finally {
250 statement.close();
251 }
252
253 assertEquals("Got back unexpected row count", expectedNumRecords,
254 actualNumRecords);
255
256 if (expectedNumRecords == 0) {
257 return; // Nothing more to verify.
258 }
259
260 // Check that we start with row 0.
261 int minVal = getMinRowId(conn);
262 assertEquals("Minimum row was not zero", 0, minVal);
263
264 // Check that the last row we loaded is numRows - 1
265 int maxVal = getMaxRowId(conn);
266 assertEquals("Maximum row had invalid id", expectedNumRecords - 1, maxVal);
267
268 // Check that the string values associated with these points match up.
269 statement = conn.prepareStatement("SELECT msg FROM " + getTableName()
270 + " WHERE id = " + minVal,
271 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
272 String minMsg = "";
273 try {
274 rs = statement.executeQuery();
275 try {
276 rs.next();
277 minMsg = rs.getString(1);
278 } finally {
279 rs.close();
280 }
281 } finally {
282 statement.close();
283 }
284
285 assertEquals("Invalid msg field for min value", getMsgPrefix() + minVal,
286 minMsg);
287
288 statement = conn.prepareStatement("SELECT msg FROM " + getTableName()
289 + " WHERE id = " + maxVal,
290 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
291 String maxMsg = "";
292 try {
293 rs = statement.executeQuery();
294 try {
295 rs.next();
296 maxMsg = rs.getString(1);
297 } finally {
298 rs.close();
299 }
300 } finally {
301 statement.close();
302 }
303
304 assertEquals("Invalid msg field for min value", getMsgPrefix() + maxVal,
305 maxMsg);
306 }
307
308 /**
309 * Run a MapReduce-based export (using the argv provided to control
310 * execution).
311 * @return the generated jar filename
312 */
313 protected List<String> runExport(String [] argv) throws IOException {
314 // run the tool through the normal entry-point.
315 int ret;
316 List<String> generatedJars = null;
317 try {
318 ExportTool exporter = new ExportTool();
319 Configuration conf = getConf();
320 SqoopOptions opts = getSqoopOptions(conf);
321 Sqoop sqoop = new Sqoop(exporter, conf, opts);
322 ret = Sqoop.runSqoop(sqoop, argv);
323 generatedJars = exporter.getGeneratedJarFiles();
324 } catch (Exception e) {
325 LOG.error("Got exception running Sqoop: "
326 + StringUtils.stringifyException(e));
327 ret = 1;
328 }
329
330 // expect a successful return.
331 if (0 != ret) {
332 throw new IOException("Failure during job; return status " + ret);
333 }
334
335 return generatedJars;
336 }
337
338 }