SQOOP-2754: Sqoop2: Fix bug in integration test for Mysql and Postgresql
[sqoop.git] / repository / repository-mysql / src / test / java / org / apache / sqoop / integration / repository / mysql / TestJobHandling.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.integration.repository.mysql;
19
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.List;
23 import java.util.Map;
24
25 import org.apache.sqoop.common.SqoopException;
26 import org.apache.sqoop.common.test.db.TableName;
27 import org.apache.sqoop.model.MConfig;
28 import org.apache.sqoop.model.MConnector;
29 import org.apache.sqoop.model.MJob;
30 import org.apache.sqoop.model.MLink;
31 import org.apache.sqoop.model.MMapInput;
32 import org.apache.sqoop.model.MStringInput;
33 import org.apache.sqoop.model.MSubmission;
34 import org.apache.sqoop.submission.SubmissionStatus;
35 import org.testng.Assert;
36 import org.testng.annotations.BeforeMethod;
37 import org.testng.annotations.Test;
38
39 import static org.testng.Assert.assertEquals;
40 import static org.testng.Assert.assertNull;
41 import static org.testng.Assert.assertFalse;
42 import static org.testng.Assert.assertNotNull;
43 import static org.testng.Assert.assertTrue;
44
45 /**
46 * Test driver methods on MySql repository.
47 */
48 @Test(groups = "mysql")
49 public class TestJobHandling extends MySqlTestCase {
50
51 public static final String CONNECTOR_A_NAME = "A";
52 public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
53 public static final String CONNECTOR_A_VERSION = "1.0-test";
54 public static final String CONNECTOR_B_NAME = "B";
55 public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
56 public static final String CONNECTOR_B_VERSION = "1.0-test";
57 public static final String LINK_A_NAME = "Link-A";
58 public static final String LINK_B_NAME = "Link-B";
59 public static final String JOB_A_NAME = "Job-A";
60 public static final String JOB_B_NAME = "Job-B";
61
62 @BeforeMethod(alwaysRun = true)
63 public void setUp() throws Exception {
64 super.setUp();
65
66 handler.registerDriver(getDriver(), provider.getConnection());
67 MConnector connectorA = getConnector(CONNECTOR_A_NAME,
68 CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
69 MConnector connectorB = getConnector(CONNECTOR_B_NAME,
70 CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
71 handler.registerConnector(connectorA, provider.getConnection());
72 handler.registerConnector(connectorB, provider.getConnection());
73 MLink linkA = getLink(LINK_A_NAME, connectorA);
74 MLink linkB = getLink(LINK_B_NAME, connectorB);
75 handler.createLink(linkA, provider.getConnection());
76 handler.createLink(linkB, provider.getConnection());
77 handler.createJob(getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB),
78 provider.getConnection());
79 handler.createJob(getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA),
80 provider.getConnection());
81 }
82
83 @Test
84 public void testFindJobFail() throws Exception {
85 for (MJob job : handler.findJobs(provider.getConnection())) {
86 handler.deleteJob(job.getName(), provider.getConnection());
87 }
88
89 // Let's try to find non existing job
90 assertNull(handler.findJob(1, provider.getConnection()));
91 }
92
93 @Test
94 public void testFindJobSuccess() throws Exception {
95 MJob firstJob = handler.findJob(1, provider.getConnection());
96 assertNotNull(firstJob);
97 assertEquals(1, firstJob.getPersistenceId());
98 assertEquals(JOB_A_NAME, firstJob.getName());
99
100 List<MConfig> configs;
101
102 configs = firstJob.getFromJobConfig().getConfigs();
103 assertEquals(2, configs.size());
104 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
105 assertNull(configs.get(0).getInputs().get(1).getValue());
106 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
107 assertNull(configs.get(1).getInputs().get(1).getValue());
108
109 configs = firstJob.getToJobConfig().getConfigs();
110 assertEquals(2, configs.size());
111 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
112 assertNull(configs.get(0).getInputs().get(1).getValue());
113 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
114 assertNull(configs.get(1).getInputs().get(1).getValue());
115
116 configs = firstJob.getDriverConfig().getConfigs();
117 assertEquals(2, configs.size());
118 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
119 assertNull(configs.get(0).getInputs().get(1).getValue());
120 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
121 assertNull(configs.get(1).getInputs().get(1).getValue());
122 }
123
124 @Test
125 public void testFindJobs() throws Exception {
126 List<MJob> list;
127
128 list = handler.findJobs(provider.getConnection());
129 assertEquals(2, list.size());
130 assertEquals(JOB_A_NAME, list.get(0).getName());
131 assertEquals(JOB_B_NAME, list.get(1).getName());
132
133 // Delete jobs
134 for (MJob job : handler.findJobs(provider.getConnection())) {
135 handler.deleteJob(job.getName(), provider.getConnection());
136 }
137
138 // Load all two links on loaded repository
139 list = handler.findJobs(provider.getConnection());
140 assertEquals(0, list.size());
141 }
142
143 @Test
144 public void testFindJobsByConnector() throws Exception {
145 List<MJob> list = handler
146 .findJobsForConnectorUpgrade(
147 handler.findConnector("A", provider.getConnection())
148 .getPersistenceId(), provider.getConnection());
149 List<String> jobNames = new ArrayList<String>();
150 for (MJob job : list) {
151 jobNames.add(job.getName());
152 }
153 assertEquals(2, list.size());
154 assertTrue(jobNames.contains(JOB_A_NAME));
155 assertTrue(jobNames.contains(JOB_B_NAME));
156 }
157
158 @Test
159 public void testFindJobsForNonExistingConnector() throws Exception {
160 List<MJob> list = handler
161 .findJobsForConnectorUpgrade(11, provider.getConnection());
162 assertEquals(0, list.size());
163 }
164
165 @Test
166 public void testExistsJob() throws Exception {
167 assertTrue(handler.existsJob(JOB_A_NAME, provider.getConnection()));
168 assertTrue(handler.existsJob(JOB_B_NAME, provider.getConnection()));
169 assertFalse(handler.existsJob("NONEXISTJOB", provider.getConnection()));
170
171 // Delete jobs
172 for (MJob job : handler.findJobs(provider.getConnection())) {
173 handler.deleteJob(job.getName(), provider.getConnection());
174 }
175
176 // There shouldn't be anything on empty repository
177 assertFalse(handler.existsJob(JOB_A_NAME, provider.getConnection()));
178 assertFalse(handler.existsJob(JOB_A_NAME, provider.getConnection()));
179 assertFalse(handler.existsJob("NONEXISTJOB", provider.getConnection()));
180 }
181
182 @Test
183 public void testInUseJob() throws Exception {
184 MSubmission submission = getSubmission(
185 handler.findJob(JOB_A_NAME, provider.getConnection()), SubmissionStatus.RUNNING);
186 handler.createSubmission(submission, provider.getConnection());
187
188 assertTrue(handler.inUseJob(JOB_A_NAME, provider.getConnection()));
189 assertFalse(handler.inUseJob(JOB_B_NAME, provider.getConnection()));
190 assertFalse(handler.inUseJob("NONEXISTJOB", provider.getConnection()));
191 }
192
193 @Test
194 public void testCreateJob() throws Exception {
195 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
196 Assert.assertEquals(
197 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
198
199 MJob retrieved = handler.findJob(1, provider.getConnection());
200 assertEquals(1, retrieved.getPersistenceId());
201
202 List<MConfig> configs;
203 configs = retrieved.getFromJobConfig().getConfigs();
204 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
205 assertNull(configs.get(0).getInputs().get(1).getValue());
206 configs = retrieved.getToJobConfig().getConfigs();
207 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
208 assertNull(configs.get(0).getInputs().get(1).getValue());
209
210 configs = retrieved.getDriverConfig().getConfigs();
211 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
212 assertNull(configs.get(0).getInputs().get(1).getValue());
213 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
214 assertNull(configs.get(1).getInputs().get(1).getValue());
215 }
216
217 @Test
218 public void testCreateDuplicateJob() throws Exception {
219 // Duplicate jobs
220 MJob job = handler.findJob(JOB_A_NAME, provider.getConnection());
221 job.setPersistenceId(MJob.PERSISTANCE_ID_DEFAULT);
222 try {
223 handler.createJob(job, provider.getConnection());
224 Assert.fail("SqoopException should be thrown.");
225 } catch (SqoopException se) {
226 // ignore the excepted exception.
227 }
228 }
229
230 @Test
231 public void testUpdateJob() throws Exception {
232 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
233 Assert.assertEquals(
234 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
235
236 MJob job = handler.findJob(1, provider.getConnection());
237
238 List<MConfig> configs;
239
240 configs = job.getFromJobConfig().getConfigs();
241 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
242 ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
243
244 configs = job.getToJobConfig().getConfigs();
245 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
246 ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
247
248 configs = job.getDriverConfig().getConfigs();
249 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
250 ((MMapInput) configs.get(0).getInputs().get(1))
251 .setValue(new HashMap<String, String>()); // inject new map value
252 ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Updated");
253 ((MMapInput) configs.get(1).getInputs().get(1))
254 .setValue(new HashMap<String, String>()); // inject new map value
255
256 job.setName("name");
257
258 handler.updateJob(job, provider.getConnection());
259
260 assertEquals(1, job.getPersistenceId());
261 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
262 Assert.assertEquals(
263 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 14);
264
265 MJob retrieved = handler.findJob(1, provider.getConnection());
266 assertEquals("name", retrieved.getName());
267
268 configs = job.getFromJobConfig().getConfigs();
269 assertEquals(2, configs.size());
270 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
271 assertNull(configs.get(0).getInputs().get(1).getValue());
272 configs = job.getToJobConfig().getConfigs();
273 assertEquals(2, configs.size());
274 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
275 assertNull(configs.get(0).getInputs().get(1).getValue());
276
277 configs = retrieved.getDriverConfig().getConfigs();
278 assertEquals(2, configs.size());
279 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
280 assertNotNull(configs.get(0).getInputs().get(1).getValue());
281 assertEquals(((Map) configs.get(0).getInputs().get(1).getValue()).size(), 0);
282 }
283
284 @Test
285 public void testEnableAndDisableJob() throws Exception {
286 // disable job 1
287 handler.enableJob(JOB_A_NAME, false, provider.getConnection());
288
289 MJob retrieved = handler.findJob(1, provider.getConnection());
290 assertNotNull(retrieved);
291 assertEquals(false, retrieved.getEnabled());
292
293 // enable job 1
294 handler.enableJob(JOB_A_NAME, true, provider.getConnection());
295
296 retrieved = handler.findJob(1, provider.getConnection());
297 assertNotNull(retrieved);
298 assertEquals(true, retrieved.getEnabled());
299 }
300
301 @Test
302 public void testDeleteJob() throws Exception {
303 handler.deleteJob(JOB_A_NAME, provider.getConnection());
304 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 1);
305 Assert.assertEquals(
306 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 6);
307
308 handler.deleteJob(JOB_B_NAME, provider.getConnection());
309 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 0);
310 Assert.assertEquals(
311 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 0);
312 }
313 }