SQOOP-931: Integrate HCatalog with Sqoop
[sqoop.git] / src / test / org / apache / sqoop / hcat / HCatalogTestUtils.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.sqoop.hcat;
20
21 import java.io.IOException;
22 import java.sql.Connection;
23 import java.sql.PreparedStatement;
24 import java.sql.ResultSet;
25 import java.sql.SQLException;
26 import java.sql.Types;
27 import java.util.ArrayList;
28 import java.util.HashMap;
29 import java.util.List;
30 import java.util.Map;
31
32 import org.apache.commons.logging.Log;
33 import org.apache.commons.logging.LogFactory;
34 import org.apache.hadoop.conf.Configuration;
35 import org.apache.hadoop.fs.FSDataOutputStream;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.hive.metastore.api.MetaException;
39 import org.apache.hadoop.io.BytesWritable;
40 import org.apache.hadoop.io.LongWritable;
41 import org.apache.hadoop.io.Text;
42 import org.apache.hadoop.io.WritableComparable;
43 import org.apache.hadoop.mapreduce.Job;
44 import org.apache.hadoop.mapreduce.Mapper;
45 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
46 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
47 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
48 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
49 import org.apache.hcatalog.data.DefaultHCatRecord;
50 import org.apache.hcatalog.data.HCatRecord;
51 import org.apache.hcatalog.data.schema.HCatFieldSchema;
52 import org.apache.hcatalog.data.schema.HCatSchema;
53 import org.apache.hcatalog.mapreduce.HCatInputFormat;
54 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
55 import org.apache.hcatalog.mapreduce.OutputJobInfo;
56 import org.apache.sqoop.config.ConfigurationConstants;
57 import org.apache.sqoop.mapreduce.hcat.SqoopHCatUtilities;
58 import org.junit.Assert;
59
60 import com.cloudera.sqoop.SqoopOptions;
61 import com.cloudera.sqoop.testutil.BaseSqoopTestCase;
62 import com.cloudera.sqoop.testutil.CommonArgs;
63
64 /**
65 * HCatalog common test utilities.
66 *
67 */
68 public final class HCatalogTestUtils {
69 protected Configuration conf;
70 private static List<HCatRecord> recsToLoad = new ArrayList<HCatRecord>();
71 private static List<HCatRecord> recsRead = new ArrayList<HCatRecord>();
72 private static final Log LOG = LogFactory.getLog(HCatalogTestUtils.class);
73 private FileSystem fs;
74 private final SqoopHCatUtilities utils = SqoopHCatUtilities.instance();
75 private static final double DELTAVAL = 1e-10;
76 public static final String SQOOP_HCATALOG_TEST_ARGS =
77 "sqoop.hcatalog.test.args";
78 private final boolean initialized = false;
79 private static String storageInfo = null;
80 public static final String STORED_AS_RCFILE = "stored as\n\trcfile\n";
81 public static final String STORED_AS_SEQFILE = "stored as\n\tsequencefile\n";
82 public static final String STORED_AS_TEXT = "stored as\n\ttextfile\n";
83
84 private HCatalogTestUtils() {
85 }
86
87 private static final class Holder {
88 @SuppressWarnings("synthetic-access")
89 private static final HCatalogTestUtils INSTANCE = new HCatalogTestUtils();
90
91 private Holder() {
92 }
93 }
94
95 @SuppressWarnings("synthetic-access")
96 public static HCatalogTestUtils instance() {
97 return Holder.INSTANCE;
98 }
99
100 public void initUtils() throws IOException, MetaException {
101 if (initialized) {
102 return;
103 }
104 conf = new Configuration();
105 if (!BaseSqoopTestCase.isOnPhysicalCluster()) {
106 conf.set(CommonArgs.FS_DEFAULT_NAME, CommonArgs.LOCAL_FS);
107 }
108 fs = FileSystem.get(conf);
109 fs.initialize(fs.getWorkingDirectory().toUri(), conf);
110 storageInfo = null;
111 SqoopHCatUtilities.setTestMode(true);
112 }
113
114 public static String getStorageInfo() {
115 if (null != storageInfo && storageInfo.length() > 0) {
116 return storageInfo;
117 } else {
118 return STORED_AS_RCFILE;
119 }
120 }
121
  /**
   * Sets the storage clause used by subsequent create-table commands.
   * NOTE(review): assigns a static field from an instance method, so the
   * value is shared across all users of the singleton — confirm intended.
   */
  public void setStorageInfo(String info) {
    storageInfo = info;
  }
125
126 private static String getDropTableCmd(final String dbName,
127 final String tableName) {
128 return "DROP TABLE IF EXISTS " + dbName.toLowerCase() + "."
129 + tableName.toLowerCase();
130 }
131
132 private static String getHCatCreateTableCmd(String dbName,
133 String tableName, List<HCatFieldSchema> tableCols,
134 List<HCatFieldSchema> partKeys) {
135 StringBuilder sb = new StringBuilder();
136 sb.append("create table ").append(dbName.toLowerCase()).append('.');
137 sb.append(tableName.toLowerCase()).append(" (\n\t");
138 for (int i = 0; i < tableCols.size(); ++i) {
139 HCatFieldSchema hfs = tableCols.get(i);
140 if (i > 0) {
141 sb.append(",\n\t");
142 }
143 sb.append(hfs.getName().toLowerCase());
144 sb.append(' ').append(hfs.getTypeString());
145 }
146 sb.append(")\n");
147 if (partKeys != null && partKeys.size() > 0) {
148 sb.append("partitioned by (\n\t");
149 for (int i = 0; i < partKeys.size(); ++i) {
150 HCatFieldSchema hfs = partKeys.get(i);
151 if (i > 0) {
152 sb.append("\n\t,");
153 }
154 sb.append(hfs.getName().toLowerCase());
155 sb.append(' ').append(hfs.getTypeString());
156 }
157 sb.append(")\n");
158 }
159 sb.append(getStorageInfo());
160 LOG.info("Create table command : " + sb);
161 return sb.toString();
162 }
163
164 /**
165 * The record writer mapper for HCatalog tables that writes records from an in
166 * memory list.
167 */
168 public void createHCatTableUsingSchema(String dbName,
169 String tableName, List<HCatFieldSchema> tableCols,
170 List<HCatFieldSchema> partKeys)
171 throws Exception {
172
173 String databaseName = dbName == null
174 ? SqoopHCatUtilities.DEFHCATDB : dbName;
175 LOG.info("Dropping HCatalog table if it exists " + databaseName
176 + '.' + tableName);
177 String dropCmd = getDropTableCmd(databaseName, tableName);
178
179 try {
180 utils.launchHCatCli(dropCmd);
181 } catch (Exception e) {
182 LOG.debug("Drop hcatalog table exception : " + e);
183 LOG.info("Unable to drop table." + dbName + "."
184 + tableName + ". Assuming it did not exist");
185 }
186 LOG.info("Creating HCatalog table if it exists " + databaseName
187 + '.' + tableName);
188 String createCmd = getHCatCreateTableCmd(databaseName, tableName,
189 tableCols, partKeys);
190 utils.launchHCatCli(createCmd);
191 LOG.info("Created HCatalog table " + dbName + "." + tableName);
192 }
193
  /**
   * The record writer mapper for HCatalog tables that writes records from an
   * in-memory list. Each input line triggers the write of the next record
   * from the static {@code recsToLoad} list; the line content itself is
   * ignored. The static counter means this is only valid for single-JVM
   * (local mode) test runs.
   */
  public static class HCatWriterMapper extends
    Mapper<LongWritable, Text, BytesWritable, HCatRecord> {

    // Index of the next record to emit; static because, in local mode, the
    // mapper runs in the test JVM and the count is inspected after the job.
    private static int writtenRecordCount = 0;

    public static int getWrittenRecordCount() {
      return writtenRecordCount;
    }

    // Reset to 0 before each load job.
    public static void setWrittenRecordCount(int count) {
      HCatWriterMapper.writtenRecordCount = count;
    }

    @Override
    public void map(LongWritable key, Text value,
      Context context)
      throws IOException, InterruptedException {
      try {
        HCatRecord rec = recsToLoad.get(writtenRecordCount);
        // HCatOutputFormat ignores the key, hence null.
        context.write(null, rec);
        writtenRecordCount++;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          e.printStackTrace(System.err);
        }
        // Rewrap so Hadoop fails the task with the underlying cause.
        throw new IOException(e);
      }
    }
  }
227
  /**
   * The record reader mapper for HCatalog tables that reads records into an
   * in-memory list. Every record seen is appended to the static
   * {@code recsRead} list; nothing is emitted to the job output. The static
   * counter means this is only valid for single-JVM (local mode) test runs.
   */
  public static class HCatReaderMapper extends
    Mapper<WritableComparable, HCatRecord, BytesWritable, Text> {

    private static int readRecordCount = 0; // test will be in local mode

    public static int getReadRecordCount() {
      return readRecordCount;
    }

    // Reset to 0 before each read job.
    public static void setReadRecordCount(int count) {
      HCatReaderMapper.readRecordCount = count;
    }

    @Override
    public void map(WritableComparable key, HCatRecord value,
      Context context) throws IOException, InterruptedException {
      try {
        recsRead.add(value);
        readRecordCount++;
      } catch (Exception e) {
        if (LOG.isDebugEnabled()) {
          e.printStackTrace(System.err);
        }
        // Rewrap so Hadoop fails the task with the underlying cause.
        throw new IOException(e);
      }
    }
  }
259
260 private void createInputFile(Path path, int rowCount)
261 throws IOException {
262 if (fs.exists(path)) {
263 fs.delete(path, true);
264 }
265 FSDataOutputStream os = fs.create(path);
266 for (int i = 0; i < rowCount; i++) {
267 String s = i + "\n";
268 os.writeChars(s);
269 }
270 os.close();
271 }
272
  /**
   * Runs a local map-only job that loads the given records into an HCatalog
   * table via HCatOutputFormat.
   *
   * @param dbName     database name, or null for the default database
   * @param tableName  table to load
   * @param partKeyMap static partition key/value specification
   * @param tblSchema  schema of the records being written
   * @param records    records to load
   * @return the list of records that were staged for loading
   * @throws Exception on job setup failure, or IOException if the job fails
   */
  public List<HCatRecord> loadHCatTable(String dbName,
    String tableName, Map<String, String> partKeyMap,
    HCatSchema tblSchema, List<HCatRecord> records)
    throws Exception {

    Job job = new Job(conf, "HCat load job");

    job.setJarByClass(this.getClass());
    job.setMapperClass(HCatWriterMapper.class);


    // Write one input line per record; HCatWriterMapper ignores the line
    // content and just emits the next staged record for each line it sees.
    Path path = new Path(fs.getWorkingDirectory(),
      "mapreduce/HCatTableIndexInput");

    job.getConfiguration()
      .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
    int writeCount = records.size();
    recsToLoad.clear();
    recsToLoad.addAll(records);
    createInputFile(path, writeCount);
    // input/output settings
    HCatWriterMapper.setWrittenRecordCount(0);

    FileInputFormat.setInputPaths(job, path);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(HCatOutputFormat.class);
    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
      partKeyMap);

    HCatOutputFormat.setOutput(job, outputJobInfo);
    HCatOutputFormat.setSchema(job, tblSchema);
    job.setMapOutputKeyClass(BytesWritable.class);
    job.setMapOutputValueClass(DefaultHCatRecord.class);

    // Map-only job; records go straight to HCatOutputFormat.
    job.setNumReduceTasks(0);
    SqoopHCatUtilities.addJars(job, new SqoopOptions());
    boolean success = job.waitForCompletion(true);

    if (!success) {
      throw new IOException("Loading HCatalog table with test records failed");
    }
    // In local mode the output committer must be driven explicitly.
    utils.invokeOutputCommitterForLocalMode(job);
    LOG.info("Loaded " + HCatWriterMapper.writtenRecordCount + " records");
    return recsToLoad;
  }
319
320 /**
321 * Run a local map reduce job to read records from HCatalog table.
322 * @param readCount
323 * @param filter
324 * @return
325 * @throws Exception
326 */
327 public List<HCatRecord> readHCatRecords(String dbName,
328 String tableName, String filter) throws Exception {
329
330 HCatReaderMapper.setReadRecordCount(0);
331 recsRead.clear();
332
333 // Configuration conf = new Configuration();
334 Job job = new Job(conf, "HCatalog reader job");
335 job.setJarByClass(this.getClass());
336 job.setMapperClass(HCatReaderMapper.class);
337 job.getConfiguration()
338 .setInt(ConfigurationConstants.PROP_MAPRED_MAP_TASKS, 1);
339 // input/output settings
340 job.setInputFormatClass(HCatInputFormat.class);
341 job.setOutputFormatClass(TextOutputFormat.class);
342
343 HCatInputFormat.setInput(job, dbName, tableName).setFilter(filter);
344
345 job.setMapOutputKeyClass(BytesWritable.class);
346 job.setMapOutputValueClass(Text.class);
347
348 job.setNumReduceTasks(0);
349
350 Path path = new Path(fs.getWorkingDirectory(),
351 "mapreduce/HCatTableIndexOutput");
352 if (fs.exists(path)) {
353 fs.delete(path, true);
354 }
355
356 FileOutputFormat.setOutputPath(job, path);
357
358 job.waitForCompletion(true);
359 LOG.info("Read " + HCatReaderMapper.readRecordCount + " records");
360
361 return recsRead;
362 }
363
  /**
   * An enumeration type to hold the partition key type of the ColumnGenerator
   * defined columns.
   */
  public enum KeyType {
    // Regular data column, not a partition key.
    NOT_A_KEY,
    // Partition key whose value is fixed via the static partition spec.
    STATIC_KEY,
    // Partition key whose value is taken from each record at load time.
    DYNAMIC_KEY
  };
373
  /**
   * An enumeration type to hold the creation mode of the HCatalog table.
   */
  public enum CreateMode {
    // Table assumed to exist already; do not create it.
    NO_CREATION,
    // Create the table but leave it empty.
    CREATE,
    // Create the table and populate it with generated records.
    CREATE_AND_LOAD,
  };
382
  /**
   * When generating data for export tests, each column is generated according
   * to a ColumnGenerator. A generator supplies both the HCatalog-side and
   * database-side value for any row, plus the type metadata for each side.
   */
  public interface ColumnGenerator {
    /** The column name. */
    String getName();

    /**
     * For a row with id rowNum, what should we write into that HCatalog column
     * to export?
     */
    Object getHCatValue(int rowNum);

    /**
     * For a row with id rowNum, what should the database return for the given
     * column's value?
     */
    Object getDBValue(int rowNum);

    /** Return the column type to put in the CREATE TABLE statement. */
    String getDBTypeString();

    /** Return the SqlType (java.sql.Types constant) for this column. */
    int getSqlType();

    /** Return the HCat type for this column. */
    HCatFieldSchema.Type getHCatType();


    /**
     * If the field is a partition key, then whether is part of the static
     * partitioning specification in imports or exports. Only one key can be a
     * static partitioning key. After the first column marked as static, rest of
     * the keys will be considered dynamic even if they are marked static.
     */
    KeyType getKeyType();
  }
423
424 /**
425 * Return the column name for a column index. Each table contains two columns
426 * named 'id' and 'msg', and then an arbitrary number of additional columns
427 * defined by ColumnGenerators. These columns are referenced by idx 0, 1, 2
428 * and on.
429 * @param idx
430 * the index of the ColumnGenerator in the array passed to
431 * createTable().
432 * @return the name of the column
433 */
434 public static String forIdx(int idx) {
435 return "col" + idx;
436 }
437
438 public static ColumnGenerator colGenerator(final String name,
439 final String dbType, final int sqlType,
440 final HCatFieldSchema.Type hCatType, final Object hCatValue,
441 final Object dbValue, final KeyType keyType) {
442 return new ColumnGenerator() {
443
444 @Override
445 public String getName() {
446 return name;
447 }
448
449 @Override
450 public Object getDBValue(int rowNum) {
451 return dbValue;
452 }
453
454 @Override
455 public Object getHCatValue(int rowNum) {
456 return hCatValue;
457 }
458
459 @Override
460 public String getDBTypeString() {
461 return dbType;
462 }
463
464 @Override
465 public int getSqlType() {
466 return sqlType;
467 }
468
469 @Override
470 public HCatFieldSchema.Type getHCatType() {
471 return hCatType;
472 }
473
474 public KeyType getKeyType() {
475 return keyType;
476 }
477
478 };
479 }
480
481 public static void assertEquals(Object expectedVal,
482 Object actualVal) {
483
484 if (expectedVal != null && expectedVal instanceof byte[]) {
485 Assert
486 .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
487 } else {
488 if (expectedVal instanceof Float) {
489 if (actualVal instanceof Double) {
490 Assert.assertEquals(((Float) expectedVal).floatValue(),
491 ((Double) actualVal).doubleValue(), DELTAVAL);
492 } else {
493 Assert
494 .assertEquals("Got unexpected column value", expectedVal,
495 actualVal);
496 }
497 } else if (expectedVal instanceof Double) {
498 if (actualVal instanceof Float) {
499 Assert.assertEquals(((Double) expectedVal).doubleValue(),
500 ((Float) actualVal).doubleValue(), DELTAVAL);
501 } else {
502 Assert
503 .assertEquals("Got unexpected column value", expectedVal,
504 actualVal);
505 }
506 } else {
507 Assert
508 .assertEquals("Got unexpected column value", expectedVal,
509 actualVal);
510 }
511 }
512 }
513
514 /**
515 * Verify that on a given row, a column has a given value.
516 *
517 * @param id
518 * the id column specifying the row to test.
519 */
520 public void assertSqlColValForRowId(Connection conn,
521 String table, int id, String colName,
522 Object expectedVal) throws SQLException {
523 LOG.info("Verifying column " + colName + " has value " + expectedVal);
524
525 PreparedStatement statement = conn.prepareStatement(
526 "SELECT " + colName + " FROM " + table + " WHERE id = " + id,
527 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
528 Object actualVal = null;
529 try {
530 ResultSet rs = statement.executeQuery();
531 try {
532 rs.next();
533 actualVal = rs.getObject(1);
534 } finally {
535 rs.close();
536 }
537 } finally {
538 statement.close();
539 }
540
541 assertEquals(expectedVal, actualVal);
542 }
543
544 /**
545 * Verify that on a given row, a column has a given value.
546 *
547 * @param id
548 * the id column specifying the row to test.
549 */
550 public static void assertHCatColValForRowId(List<HCatRecord> recs,
551 HCatSchema schema, int id, String fieldName,
552 Object expectedVal) throws IOException {
553 LOG.info("Verifying field " + fieldName + " has value " + expectedVal);
554
555 Object actualVal = null;
556 for (HCatRecord rec : recs) {
557 if (rec.getInteger("id", schema).equals(id)) {
558 actualVal = rec.get(fieldName, schema);
559 break;
560 }
561 }
562 if (actualVal == null) {
563 throw new IOException("No record found with id = " + id);
564 }
565 if (expectedVal != null && expectedVal instanceof byte[]) {
566 Assert
567 .assertArrayEquals((byte[]) expectedVal, (byte[]) actualVal);
568 } else {
569 if (expectedVal instanceof Float) {
570 if (actualVal instanceof Double) {
571 Assert.assertEquals(((Float) expectedVal).floatValue(),
572 ((Double) actualVal).doubleValue(), DELTAVAL);
573 } else {
574 Assert
575 .assertEquals("Got unexpected column value", expectedVal,
576 actualVal);
577 }
578 } else if (expectedVal instanceof Double) {
579 if (actualVal instanceof Float) {
580 Assert.assertEquals(((Double) expectedVal).doubleValue(),
581 ((Float) actualVal).doubleValue(), DELTAVAL);
582 } else {
583 Assert
584 .assertEquals("Got unexpected column value", expectedVal,
585 actualVal);
586 }
587 } else {
588 Assert
589 .assertEquals("Got unexpected column value", expectedVal,
590 actualVal);
591 }
592 }
593 }
594
595 /**
596 * Return a SQL statement that drops a table, if it exists.
597 *
598 * @param tableName
599 * the table to drop.
600 * @return the SQL statement to drop that table.
601 */
602 public static String getSqlDropTableStatement(String tableName) {
603 return "DROP TABLE " + tableName + " IF EXISTS";
604 }
605
606 public static String getSqlCreateTableStatement(String tableName,
607 ColumnGenerator... extraCols) {
608 StringBuilder sb = new StringBuilder();
609 sb.append("CREATE TABLE ");
610 sb.append(tableName);
611 sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
612 int colNum = 0;
613 for (ColumnGenerator gen : extraCols) {
614 sb.append(", " + forIdx(colNum++) + " " + gen.getDBTypeString());
615 }
616 sb.append(")");
617 String cmd = sb.toString();
618 LOG.debug("Generated SQL create table command : " + cmd);
619 return cmd;
620 }
621
622 public static String getSqlInsertTableStatement(String tableName,
623 ColumnGenerator... extraCols) {
624 StringBuilder sb = new StringBuilder();
625 sb.append("INSERT INTO ");
626 sb.append(tableName);
627 sb.append(" (id, msg");
628 int colNum = 0;
629 for (ColumnGenerator gen : extraCols) {
630 sb.append(", " + forIdx(colNum++));
631 }
632 sb.append(") VALUES ( ?, ?");
633 for (int i = 0; i < extraCols.length; ++i) {
634 sb.append(",?");
635 }
636 sb.append(")");
637 String s = sb.toString();
638 LOG.debug("Generated SQL insert table command : " + s);
639 return s;
640 }
641
642 public void createSqlTable(Connection conn, boolean generateOnly,
643 int count, String table, ColumnGenerator... extraCols)
644 throws Exception {
645 PreparedStatement statement = conn.prepareStatement(
646 getSqlDropTableStatement(table),
647 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
648 try {
649 statement.executeUpdate();
650 conn.commit();
651 } finally {
652 statement.close();
653 }
654 statement = conn.prepareStatement(
655 getSqlCreateTableStatement(table, extraCols),
656 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
657 try {
658 statement.executeUpdate();
659 conn.commit();
660 } finally {
661 statement.close();
662 }
663 if (!generateOnly) {
664 loadSqlTable(conn, table, count, extraCols);
665 }
666 }
667
  /**
   * Creates (per the given mode) and optionally loads an HCatalog table whose
   * layout is derived from the extra column generators, returning the full
   * schema: data columns followed by partition columns.
   *
   * @param mode      whether to create and/or load the table
   * @param count     number of generated records to load
   * @param table     HCatalog table name (default database)
   * @param extraCols generators describing the extra columns/keys
   * @return combined schema of data and partition columns
   */
  public HCatSchema createHCatTable(CreateMode mode, int count,
    String table, ColumnGenerator... extraCols)
    throws Exception {
    HCatSchema hCatTblSchema = generateHCatTableSchema(extraCols);
    HCatSchema hCatPartSchema = generateHCatPartitionSchema(extraCols);
    HCatSchema hCatFullSchema = new HCatSchema(hCatTblSchema.getFields());
    for (HCatFieldSchema hfs : hCatPartSchema.getFields()) {
      hCatFullSchema.append(hfs);
    }
    if (mode != CreateMode.NO_CREATION) {

      createHCatTableUsingSchema(null, table,
        hCatTblSchema.getFields(), hCatPartSchema.getFields());
      if (mode == CreateMode.CREATE_AND_LOAD) {
        // Dynamic partition values travel inside the records, so the schema
        // used for loading must include the dynamic partition columns.
        HCatSchema hCatLoadSchema = new HCatSchema(hCatTblSchema.getFields());
        HCatSchema dynPartSchema =
          generateHCatDynamicPartitionSchema(extraCols);
        for (HCatFieldSchema hfs : dynPartSchema.getFields()) {
          hCatLoadSchema.append(hfs);
        }
        loadHCatTable(hCatLoadSchema, table, count, extraCols);
      }
    }
    return hCatFullSchema;
  }
693
  /**
   * Loads {@code count} generated records into the table, using every
   * STATIC_KEY column's row-0 value as the static partition specification.
   * NOTE(review): assumes static-key HCat values are Strings — the cast
   * below throws ClassCastException otherwise; confirm with callers.
   */
  private void loadHCatTable(HCatSchema hCatSchema, String table,
    int count, ColumnGenerator... extraCols)
    throws Exception {
    Map<String, String> staticKeyMap = new HashMap<String, String>();
    for (ColumnGenerator col : extraCols) {
      if (col.getKeyType() == KeyType.STATIC_KEY) {
        staticKeyMap.put(col.getName(), (String) col.getHCatValue(0));
      }
    }
    loadHCatTable(null, table, staticKeyMap,
      hCatSchema, generateHCatRecords(count, hCatSchema, extraCols));
  }
706
  /**
   * Inserts {@code count} generated rows into the SQL table: sequential ids,
   * "textfield&lt;i&gt;" messages, and one value per extra column generator.
   */
  private void loadSqlTable(Connection conn, String table, int count,
    ColumnGenerator... extraCols) throws Exception {
    PreparedStatement statement = conn.prepareStatement(
      getSqlInsertTableStatement(table, extraCols),
      ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
    try {
      for (int i = 0; i < count; ++i) {
        statement.setObject(1, i, Types.INTEGER);
        statement.setObject(2, "textfield" + i, Types.VARCHAR);
        for (int j = 0; j < extraCols.length; ++j) {
          // Extra columns start at parameter index 3 (after id and msg).
          statement.setObject(j + 3, extraCols[j].getDBValue(i),
            extraCols[j].getSqlType());
        }
        statement.executeUpdate();
      }
      // Only commit explicitly when the connection is not auto-committing.
      if (!conn.getAutoCommit()) {
        conn.commit();
      }
    } finally {
      statement.close();
    }
  }
729
730 private HCatSchema generateHCatTableSchema(ColumnGenerator... extraCols)
731 throws Exception {
732 List<HCatFieldSchema> hCatTblCols = new ArrayList<HCatFieldSchema>();
733 hCatTblCols.clear();
734 hCatTblCols.add(new HCatFieldSchema("id", HCatFieldSchema.Type.INT, ""));
735 hCatTblCols
736 .add(new HCatFieldSchema("msg", HCatFieldSchema.Type.STRING, ""));
737 for (ColumnGenerator gen : extraCols) {
738 if (gen.getKeyType() == KeyType.NOT_A_KEY) {
739 hCatTblCols
740 .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
741 }
742 }
743 HCatSchema hCatTblSchema = new HCatSchema(hCatTblCols);
744 return hCatTblSchema;
745 }
746
747 private HCatSchema generateHCatPartitionSchema(ColumnGenerator... extraCols)
748 throws Exception {
749 List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
750
751 for (ColumnGenerator gen : extraCols) {
752 if (gen.getKeyType() != KeyType.NOT_A_KEY) {
753 hCatPartCols
754 .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
755 }
756 }
757 HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
758 return hCatPartSchema;
759 }
760
761 private HCatSchema generateHCatDynamicPartitionSchema(
762 ColumnGenerator... extraCols) throws Exception {
763 List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
764 hCatPartCols.clear();
765 boolean staticFound = false;
766 for (ColumnGenerator gen : extraCols) {
767 if (gen.getKeyType() != KeyType.NOT_A_KEY) {
768 if (gen.getKeyType() == KeyType.STATIC_KEY && !staticFound) {
769 staticFound = true;
770 continue;
771 }
772 hCatPartCols
773 .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
774 }
775 }
776 HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
777 return hCatPartSchema;
778
779 }
780
781 private HCatSchema generateHCatStaticPartitionSchema(
782 ColumnGenerator... extraCols) throws Exception {
783 List<HCatFieldSchema> hCatPartCols = new ArrayList<HCatFieldSchema>();
784 hCatPartCols.clear();
785 for (ColumnGenerator gen : extraCols) {
786 if (gen.getKeyType() == KeyType.STATIC_KEY) {
787 hCatPartCols
788 .add(new HCatFieldSchema(gen.getName(), gen.getHCatType(), ""));
789 break;
790 }
791 }
792 HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
793 return hCatPartSchema;
794 }
795
  /**
   * Generates {@code numRecords} records matching the load schema: an int
   * "id", a "textfield&lt;i&gt;" message, then the extra columns' HCat values
   * — skipping the first STATIC_KEY column, whose value is supplied via the
   * static partition spec rather than the record payload.
   */
  private List<HCatRecord> generateHCatRecords(int numRecords,
    HCatSchema hCatTblSchema, ColumnGenerator... extraCols) throws Exception {
    List<HCatRecord> records = new ArrayList<HCatRecord>();
    List<HCatFieldSchema> hCatTblCols = hCatTblSchema.getFields();
    int size = hCatTblCols.size();
    for (int i = 0; i < numRecords; ++i) {
      DefaultHCatRecord record = new DefaultHCatRecord(size);
      record.set(hCatTblCols.get(0).getName(), hCatTblSchema, i);
      record.set(hCatTblCols.get(1).getName(), hCatTblSchema, "textfield" + i);
      boolean staticFound = false;
      // idx tracks the schema position; it advances only for generators that
      // actually contribute a field, staying aligned with hCatTblCols.
      int idx = 0;
      for (int j = 0; j < extraCols.length; ++j) {
        // Skip only the first static key, mirroring
        // generateHCatDynamicPartitionSchema's column layout.
        if (extraCols[j].getKeyType() == KeyType.STATIC_KEY
          && !staticFound) {
          staticFound = true;
          continue;
        }
        record.set(hCatTblCols.get(idx + 2).getName(), hCatTblSchema,
          extraCols[j].getHCatValue(i));
        ++idx;
      }

      records.add(record);
    }
    return records;
  }
822
823 public String hCatRecordDump(List<HCatRecord> recs,
824 HCatSchema schema) throws Exception {
825 List<String> fields = schema.getFieldNames();
826 int count = 0;
827 StringBuilder sb = new StringBuilder(1024);
828 for (HCatRecord rec : recs) {
829 sb.append("HCat Record : " + ++count).append('\n');
830 for (String field : fields) {
831 sb.append('\t').append(field).append('=');
832 sb.append(rec.get(field, schema)).append('\n');
833 sb.append("\n\n");
834 }
835 }
836 return sb.toString();
837 }
838
839 public Map<String, String> getAddlTestArgs() {
840 String addlArgs = System.getProperty(SQOOP_HCATALOG_TEST_ARGS);
841 Map<String, String> addlArgsMap = new HashMap<String, String>();
842 if (addlArgs != null) {
843 String[] argsArray = addlArgs.split(",");
844 for (String s : argsArray) {
845 String[] keyVal = s.split("=");
846 if (keyVal.length == 2) {
847 addlArgsMap.put(keyVal[0], keyVal[1]);
848 } else {
849 LOG.info("Ignoring malformed addl arg " + s);
850 }
851 }
852 }
853 return addlArgsMap;
854 }
855 }