7a87feec2129837dc0cb8ddae1d80eb336ee7c80
[sqoop.git] / repository / repository-mysql / src / test / java / org / apache / sqoop / integration / repository / mysql / TestJobHandling.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.integration.repository.mysql;
19
20 import java.util.HashMap;
21 import java.util.List;
22 import java.util.Map;
23
24 import org.apache.sqoop.common.SqoopException;
25 import org.apache.sqoop.common.test.db.TableName;
26 import org.apache.sqoop.model.MConfig;
27 import org.apache.sqoop.model.MConnector;
28 import org.apache.sqoop.model.MJob;
29 import org.apache.sqoop.model.MLink;
30 import org.apache.sqoop.model.MMapInput;
31 import org.apache.sqoop.model.MStringInput;
32 import org.apache.sqoop.model.MSubmission;
33 import org.apache.sqoop.submission.SubmissionStatus;
34 import org.testng.Assert;
35 import org.testng.annotations.BeforeMethod;
36 import org.testng.annotations.Test;
37
38 import static org.testng.Assert.assertEquals;
39 import static org.testng.Assert.assertNull;
40 import static org.testng.Assert.assertFalse;
41 import static org.testng.Assert.assertNotNull;
42 import static org.testng.Assert.assertTrue;
43
44 /**
45 * Test driver methods on MySql repository.
46 */
47 @Test(groups = "mysql")
48 public class TestJobHandling extends MySqlTestCase {
49
50 public static final String CONNECTOR_A_NAME = "A";
51 public static final String CONNECTOR_A_CLASSNAME = "org.apache.sqoop.test.A";
52 public static final String CONNECTOR_A_VERSION = "1.0-test";
53 public static final String CONNECTOR_B_NAME = "B";
54 public static final String CONNECTOR_B_CLASSNAME = "org.apache.sqoop.test.B";
55 public static final String CONNECTOR_B_VERSION = "1.0-test";
56 public static final String LINK_A_NAME = "Link-A";
57 public static final String LINK_B_NAME = "Link-B";
58 public static final String JOB_A_NAME = "Job-A";
59 public static final String JOB_B_NAME = "Job-B";
60
61 @BeforeMethod(alwaysRun = true)
62 public void setUp() throws Exception {
63 super.setUp();
64
65 handler.registerDriver(getDriver(), provider.getConnection());
66 MConnector connectorA = getConnector(CONNECTOR_A_NAME,
67 CONNECTOR_A_CLASSNAME, CONNECTOR_A_VERSION, true, true);
68 MConnector connectorB = getConnector(CONNECTOR_B_NAME,
69 CONNECTOR_B_CLASSNAME, CONNECTOR_B_VERSION, true, true);
70 handler.registerConnector(connectorA, provider.getConnection());
71 handler.registerConnector(connectorB, provider.getConnection());
72 MLink linkA = getLink(LINK_A_NAME, connectorA);
73 MLink linkB = getLink(LINK_B_NAME, connectorB);
74 handler.createLink(linkA, provider.getConnection());
75 handler.createLink(linkB, provider.getConnection());
76 handler.createJob(getJob(JOB_A_NAME, connectorA, connectorB, linkA, linkB),
77 provider.getConnection());
78 handler.createJob(getJob(JOB_B_NAME, connectorB, connectorA, linkB, linkA),
79 provider.getConnection());
80 }
81
82 @Test
83 public void testFindJobFail() throws Exception {
84 for (MJob job : handler.findJobs(provider.getConnection())) {
85 handler.deleteJob(job.getName(), provider.getConnection());
86 }
87
88 // Let's try to find non existing job
89 assertNull(handler.findJob(1, provider.getConnection()));
90 }
91
92 @Test
93 public void testFindJobSuccess() throws Exception {
94 MJob firstJob = handler.findJob(1, provider.getConnection());
95 assertNotNull(firstJob);
96 assertEquals(1, firstJob.getPersistenceId());
97 assertEquals(JOB_A_NAME, firstJob.getName());
98
99 List<MConfig> configs;
100
101 configs = firstJob.getFromJobConfig().getConfigs();
102 assertEquals(2, configs.size());
103 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
104 assertNull(configs.get(0).getInputs().get(1).getValue());
105 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
106 assertNull(configs.get(1).getInputs().get(1).getValue());
107
108 configs = firstJob.getToJobConfig().getConfigs();
109 assertEquals(2, configs.size());
110 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
111 assertNull(configs.get(0).getInputs().get(1).getValue());
112 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
113 assertNull(configs.get(1).getInputs().get(1).getValue());
114
115 configs = firstJob.getDriverConfig().getConfigs();
116 assertEquals(2, configs.size());
117 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
118 assertNull(configs.get(0).getInputs().get(1).getValue());
119 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
120 assertNull(configs.get(1).getInputs().get(1).getValue());
121 }
122
123 @Test
124 public void testFindJobs() throws Exception {
125 List<MJob> list;
126
127 list = handler.findJobs(provider.getConnection());
128 assertEquals(2, list.size());
129 assertEquals(JOB_A_NAME, list.get(0).getName());
130 assertEquals(JOB_B_NAME, list.get(1).getName());
131
132 // Delete jobs
133 for (MJob job : handler.findJobs(provider.getConnection())) {
134 handler.deleteJob(job.getName(), provider.getConnection());
135 }
136
137 // Load all two links on loaded repository
138 list = handler.findJobs(provider.getConnection());
139 assertEquals(0, list.size());
140 }
141
142 @Test
143 public void testFindJobsByConnector() throws Exception {
144 List<MJob> list = handler
145 .findJobsForConnectorUpgrade(
146 handler.findConnector("A", provider.getConnection())
147 .getPersistenceId(), provider.getConnection());
148 assertEquals(2, list.size());
149 assertEquals(JOB_A_NAME, list.get(0).getName());
150 assertEquals(JOB_B_NAME, list.get(1).getName());
151 }
152
153 @Test
154 public void testFindJobsForNonExistingConnector() throws Exception {
155 List<MJob> list = handler
156 .findJobsForConnectorUpgrade(11, provider.getConnection());
157 assertEquals(0, list.size());
158 }
159
160 @Test
161 public void testExistsJob() throws Exception {
162 assertTrue(handler.existsJob(JOB_A_NAME, provider.getConnection()));
163 assertTrue(handler.existsJob(JOB_B_NAME, provider.getConnection()));
164 assertFalse(handler.existsJob("NONEXISTJOB", provider.getConnection()));
165
166 // Delete jobs
167 for (MJob job : handler.findJobs(provider.getConnection())) {
168 handler.deleteJob(job.getName(), provider.getConnection());
169 }
170
171 // There shouldn't be anything on empty repository
172 assertFalse(handler.existsJob(JOB_A_NAME, provider.getConnection()));
173 assertFalse(handler.existsJob(JOB_A_NAME, provider.getConnection()));
174 assertFalse(handler.existsJob("NONEXISTJOB", provider.getConnection()));
175 }
176
177 @Test
178 public void testInUseJob() throws Exception {
179 MSubmission submission = getSubmission(
180 handler.findJob(JOB_A_NAME, provider.getConnection()), SubmissionStatus.RUNNING);
181 handler.createSubmission(submission, provider.getConnection());
182
183 assertTrue(handler.inUseJob(JOB_A_NAME, provider.getConnection()));
184 assertFalse(handler.inUseJob(JOB_B_NAME, provider.getConnection()));
185 assertFalse(handler.inUseJob("NONEXISTJOB", provider.getConnection()));
186 }
187
188 @Test
189 public void testCreateJob() throws Exception {
190 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
191 Assert.assertEquals(
192 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
193
194 MJob retrieved = handler.findJob(1, provider.getConnection());
195 assertEquals(1, retrieved.getPersistenceId());
196
197 List<MConfig> configs;
198 configs = retrieved.getFromJobConfig().getConfigs();
199 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
200 assertNull(configs.get(0).getInputs().get(1).getValue());
201 configs = retrieved.getToJobConfig().getConfigs();
202 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
203 assertNull(configs.get(0).getInputs().get(1).getValue());
204
205 configs = retrieved.getDriverConfig().getConfigs();
206 assertEquals("Value1", configs.get(0).getInputs().get(0).getValue());
207 assertNull(configs.get(0).getInputs().get(1).getValue());
208 assertEquals("Value2", configs.get(1).getInputs().get(0).getValue());
209 assertNull(configs.get(1).getInputs().get(1).getValue());
210 }
211
212 @Test
213 public void testCreateDuplicateJob() throws Exception {
214 // Duplicate jobs
215 MJob job = handler.findJob(JOB_A_NAME, provider.getConnection());
216 job.setPersistenceId(MJob.PERSISTANCE_ID_DEFAULT);
217 try {
218 handler.createJob(job, provider.getConnection());
219 Assert.fail("SqoopException should be thrown.");
220 } catch (SqoopException se) {
221 // ignore the excepted exception.
222 }
223 }
224
225 @Test
226 public void testUpdateJob() throws Exception {
227 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
228 Assert.assertEquals(
229 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 12);
230
231 MJob job = handler.findJob(1, provider.getConnection());
232
233 List<MConfig> configs;
234
235 configs = job.getFromJobConfig().getConfigs();
236 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
237 ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
238
239 configs = job.getToJobConfig().getConfigs();
240 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
241 ((MMapInput) configs.get(0).getInputs().get(1)).setValue(null);
242
243 configs = job.getDriverConfig().getConfigs();
244 ((MStringInput) configs.get(0).getInputs().get(0)).setValue("Updated");
245 ((MMapInput) configs.get(0).getInputs().get(1))
246 .setValue(new HashMap<String, String>()); // inject new map value
247 ((MStringInput) configs.get(1).getInputs().get(0)).setValue("Updated");
248 ((MMapInput) configs.get(1).getInputs().get(1))
249 .setValue(new HashMap<String, String>()); // inject new map value
250
251 job.setName("name");
252
253 handler.updateJob(job, provider.getConnection());
254
255 assertEquals(1, job.getPersistenceId());
256 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 2);
257 Assert.assertEquals(
258 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 14);
259
260 MJob retrieved = handler.findJob(1, provider.getConnection());
261 assertEquals("name", retrieved.getName());
262
263 configs = job.getFromJobConfig().getConfigs();
264 assertEquals(2, configs.size());
265 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
266 assertNull(configs.get(0).getInputs().get(1).getValue());
267 configs = job.getToJobConfig().getConfigs();
268 assertEquals(2, configs.size());
269 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
270 assertNull(configs.get(0).getInputs().get(1).getValue());
271
272 configs = retrieved.getDriverConfig().getConfigs();
273 assertEquals(2, configs.size());
274 assertEquals("Updated", configs.get(0).getInputs().get(0).getValue());
275 assertNotNull(configs.get(0).getInputs().get(1).getValue());
276 assertEquals(((Map) configs.get(0).getInputs().get(1).getValue()).size(), 0);
277 }
278
279 @Test
280 public void testEnableAndDisableJob() throws Exception {
281 // disable job 1
282 handler.enableJob(JOB_A_NAME, false, provider.getConnection());
283
284 MJob retrieved = handler.findJob(1, provider.getConnection());
285 assertNotNull(retrieved);
286 assertEquals(false, retrieved.getEnabled());
287
288 // enable job 1
289 handler.enableJob(JOB_A_NAME, true, provider.getConnection());
290
291 retrieved = handler.findJob(1, provider.getConnection());
292 assertNotNull(retrieved);
293 assertEquals(true, retrieved.getEnabled());
294 }
295
296 @Test
297 public void testDeleteJob() throws Exception {
298 handler.deleteJob(JOB_A_NAME, provider.getConnection());
299 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 1);
300 Assert.assertEquals(
301 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 6);
302
303 handler.deleteJob(JOB_B_NAME, provider.getConnection());
304 Assert.assertEquals(provider.rowCount(new TableName("SQOOP", "SQ_JOB")), 0);
305 Assert.assertEquals(
306 provider.rowCount(new TableName("SQOOP", "SQ_JOB_INPUT")), 0);
307 }
308 }