SQOOP-2823: Sqoop2: RESTiliency: Remove repetitive try-catch block for accessing...
[sqoop.git] / server / src / main / java / org / apache / sqoop / handler / JobRequestHandler.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.sqoop.handler;
19
20 import java.io.IOException;
21 import java.util.Arrays;
22 import java.util.Date;
23 import java.util.List;
24 import java.util.Locale;
25
26 import org.apache.log4j.Logger;
27 import org.apache.sqoop.audit.AuditLoggerManager;
28 import org.apache.sqoop.common.Direction;
29 import org.apache.sqoop.common.SqoopException;
30 import org.apache.sqoop.connector.ConnectorManager;
31 import org.apache.sqoop.connector.spi.SqoopConnector;
32 import org.apache.sqoop.driver.Driver;
33 import org.apache.sqoop.driver.JobManager;
34 import org.apache.sqoop.json.JSONUtils;
35 import org.apache.sqoop.json.JobBean;
36 import org.apache.sqoop.json.JsonBean;
37 import org.apache.sqoop.json.SubmissionBean;
38 import org.apache.sqoop.json.ValidationResultBean;
39 import org.apache.sqoop.model.*;
40 import org.apache.sqoop.repository.Repository;
41 import org.apache.sqoop.repository.RepositoryManager;
42 import org.apache.sqoop.request.HttpEventContext;
43 import org.apache.sqoop.security.authorization.AuthorizationEngine;
44 import org.apache.sqoop.security.AuthorizationManager;
45 import org.apache.sqoop.server.RequestContext;
46 import org.apache.sqoop.server.RequestHandler;
47 import org.apache.sqoop.server.common.ServerError;
48 import org.apache.sqoop.submission.SubmissionStatus;
49 import org.apache.sqoop.validation.ConfigValidationResult;
50 import org.apache.sqoop.validation.Status;
51 import org.json.simple.JSONObject;
52
53 public class JobRequestHandler implements RequestHandler {
54 private static final long serialVersionUID = 1L;
55
/**
 * Actions that can be requested on a job resource. Each constant carries the
 * token that selects it when it appears as the last element of a request URL.
 */
enum JobAction {
  ENABLE("enable"),
  DISABLE("disable"),
  START("start"),
  STOP("stop"),
  ;

  /** URL token for this action; assigned once in the constructor and never changed. */
  final String name;

  JobAction(String name) {
    this.name = name;
  }

  /**
   * Looks up an action by its URL token, ignoring case.
   *
   * @param name token to resolve; may be {@code null}
   * @return the matching action, or {@code null} when {@code name} is {@code null}
   *         or does not correspond to any action
   */
  public static JobAction fromString(String name) {
    if (name == null) {
      return null;
    }
    for (JobAction action : values()) {
      if (action.name.equalsIgnoreCase(name)) {
        return action;
      }
    }
    return null;
  }
}
80
81 private static final Logger LOG = Logger.getLogger(JobRequestHandler.class);
82
83 static final String JOBS_PATH = "jobs";
84 static final String JOB_PATH = "job";
85 static final String STATUS = "status";
86
/** Creates the handler and writes an informational log entry noting that it is ready. */
public JobRequestHandler() {
  LOG.info("JobRequestHandler initialized");
}
90
91 @Override
92 public JsonBean handleEvent(RequestContext ctx) {
93 LOG.info("Got job request");
94 switch (ctx.getMethod()) {
95 case GET:
96 if (STATUS.equals(ctx.getLastURLElement())) {
97 return getJobStatus(ctx);
98 }
99 return getJobs(ctx);
100 case POST:
101 return createUpdateJob(ctx, true);
102 case PUT:
103 JobAction action = JobAction.fromString(ctx.getLastURLElement());
104 if (action != null) {
105 switch (action) {
106 case ENABLE:
107 return enableJob(ctx, true);
108 case DISABLE:
109 return enableJob(ctx, false);
110 case START:
111 return startJob(ctx);
112 case STOP:
113 return stopJob(ctx);
114 }
115 }
116 return createUpdateJob(ctx, false);
117 case DELETE:
118 return deleteJob(ctx);
119 }
120
121 return null;
122 }
123
124 /**
125 * Delete job from repository.
126 *
127 * @param ctx
128 * Context object
129 * @return Empty bean
130 */
131 private JsonBean deleteJob(RequestContext ctx) {
132
133 Repository repository = RepositoryManager.getInstance().getRepository();
134
135 String jobIdentifier = ctx.getLastURLElement();
136 MJob job = HandlerUtils.getJobFromIdentifier(jobIdentifier);
137 String jobName = job.getName();
138
139 // Authorization check
140 AuthorizationEngine.deleteJob(ctx.getUserName(), jobName);
141
142 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
143 ctx.getRequest().getRemoteAddr(), "delete", "job", jobIdentifier);
144 repository.deleteJob(jobName);
145 MResource resource = new MResource(jobName, MResource.TYPE.JOB);
146 AuthorizationManager.getInstance().getAuthorizationHandler().removeResource(resource);
147 return JsonBean.EMPTY_BEAN;
148 }
149
150 /**
151 * Update or create job in repository.
152 *
153 * @param ctx
154 * Context object
155 * @return Validation bean object
156 */
157 private JsonBean createUpdateJob(RequestContext ctx, boolean create) {
158
159 Repository repository = RepositoryManager.getInstance().getRepository();
160
161 JobBean bean = new JobBean();
162 bean.restore(JSONUtils.parse(ctx.getReader()));
163
164 String username = ctx.getUserName();
165
166 // Get job object
167 List<MJob> jobs = bean.getJobs();
168
169 if (jobs.size() != 1) {
170 throw new SqoopException(ServerError.SERVER_0003, "Expected one job but got " + jobs.size());
171 }
172
173 // Job object
174 MJob postedJob = jobs.get(0);
175 String oldJobName = ctx.getLastURLElement();
176
177 // Authorization check
178 if (create) {
179 AuthorizationEngine.createJob(ctx.getUserName(), postedJob.getFromLinkName(), postedJob.getToLinkName());
180 } else {
181 AuthorizationEngine.updateJob(ctx.getUserName(), postedJob.getFromLinkName(), postedJob.getToLinkName(),
182 oldJobName);
183 }
184
185 // Verify that user is not trying to spoof us
186 MFromConfig fromConfig = ConnectorManager.getInstance()
187 .getConnectorConfigurable(postedJob.getFromConnectorName()).getFromConfig();
188 MToConfig toConfig = ConnectorManager.getInstance()
189 .getConnectorConfigurable(postedJob.getToConnectorName()).getToConfig();
190 MDriverConfig driverConfig = Driver.getInstance().getDriver().getDriverConfig();
191
192 if (!fromConfig.equals(postedJob.getFromJobConfig())
193 || !driverConfig.equals(postedJob.getDriverConfig())
194 || !toConfig.equals(postedJob.getToJobConfig())) {
195 throw new SqoopException(ServerError.SERVER_0003, "Detected incorrect config structure");
196 }
197
198 // if update get the job id from the request URI
199 if (!create) {
200 MJob existingJob = HandlerUtils.getJobFromIdentifier(oldJobName);
201 if (postedJob.getPersistenceId() == MPersistableEntity.PERSISTANCE_ID_DEFAULT) {
202 postedJob.setPersistenceId(existingJob.getPersistenceId());
203 }
204 }
205
206 // Corresponding connectors for this
207 SqoopConnector fromConnector = ConnectorManager.getInstance().getSqoopConnector(
208 postedJob.getFromConnectorName());
209 SqoopConnector toConnector = ConnectorManager.getInstance().getSqoopConnector(
210 postedJob.getToConnectorName());
211
212 if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
213 throw new SqoopException(ServerError.SERVER_0004, "Connector "
214 + fromConnector.getClass().getCanonicalName() + " does not support FROM direction.");
215 }
216
217 if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
218 throw new SqoopException(ServerError.SERVER_0004, "Connector "
219 + toConnector.getClass().getCanonicalName() + " does not support TO direction.");
220 }
221
222 // Validate user supplied data
223 ConfigValidationResult fromConfigValidator = ConfigUtils.validateConfigs(
224 postedJob.getFromJobConfig().getConfigs(),
225 fromConnector.getJobConfigurationClass(Direction.FROM));
226 ConfigValidationResult toConfigValidator = ConfigUtils.validateConfigs(
227 postedJob.getToJobConfig().getConfigs(),
228 toConnector.getJobConfigurationClass(Direction.TO));
229 ConfigValidationResult driverConfigValidator = ConfigUtils.validateConfigs(postedJob
230 .getDriverConfig().getConfigs(), Driver.getInstance().getDriverJobConfigurationClass());
231 Status finalStatus = Status.getWorstStatus(fromConfigValidator.getStatus(),
232 toConfigValidator.getStatus(), driverConfigValidator.getStatus());
233 // Return back validations in all cases
234 ValidationResultBean validationResultBean = new ValidationResultBean(fromConfigValidator, toConfigValidator, driverConfigValidator);
235
236 // If we're good enough let's perform the action
237 if (finalStatus.canProceed()) {
238 if (create) {
239 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
240 ctx.getRequest().getRemoteAddr(), "create", "job",
241 String.valueOf(postedJob.getPersistenceId()));
242
243 postedJob.setCreationUser(username);
244 postedJob.setLastUpdateUser(username);
245 repository.createJob(postedJob);
246 validationResultBean.setName(postedJob.getName());
247 } else {
248 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
249 ctx.getRequest().getRemoteAddr(), "update", "job",
250 String.valueOf(postedJob.getPersistenceId()));
251
252 postedJob.setLastUpdateUser(username);
253 repository.updateJob(postedJob);
254 }
255 }
256 return validationResultBean;
257 }
258
259 private JsonBean getJobs(RequestContext ctx) {
260 String connectorIdentifier = ctx.getLastURLElement();
261 JobBean jobBean;
262 Locale locale = ctx.getAcceptLanguageHeader();
263 Repository repository = RepositoryManager.getInstance().getRepository();
264 // jobs by connector
265 if (ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM) != null) {
266 connectorIdentifier = ctx.getParameterValue(CONNECTOR_NAME_QUERY_PARAM);
267 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
268 ctx.getRequest().getRemoteAddr(), "get", "jobsByConnector", connectorIdentifier);
269 List<MJob> jobList = repository.findJobsForConnector(connectorIdentifier);
270
271 // Authorization check
272 jobList = AuthorizationEngine.filterResource(ctx.getUserName(), MResource.TYPE.JOB, jobList);
273
274 jobBean = createJobBean(jobList, locale);
275 } else
276 // all jobs in the system
277 if (ctx.getPath().contains(JOBS_PATH)
278 || (ctx.getPath().contains(JOB_PATH) && connectorIdentifier.equals("all"))) {
279 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
280 ctx.getRequest().getRemoteAddr(), "get", "jobs", "all");
281 List<MJob> jobList = repository.findJobs();
282
283 // Authorization check
284 jobList = AuthorizationEngine.filterResource(ctx.getUserName(), MResource.TYPE.JOB, jobList);
285
286 jobBean = createJobBean(jobList, locale);
287 }
288 // job by Id
289 else {
290 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
291 ctx.getRequest().getRemoteAddr(), "get", "job", connectorIdentifier);
292
293 MJob job = HandlerUtils.getJobFromIdentifier(connectorIdentifier);
294 String jobName = job.getName();
295
296 // Authorization check
297 AuthorizationEngine.readJob(ctx.getUserName(), jobName);
298
299 jobBean = createJobBean(Arrays.asList(job), locale);
300 }
301 return jobBean;
302 }
303
304 private JobBean createJobBean(List<MJob> jobs, Locale locale) {
305 JobBean jobBean = new JobBean(jobs);
306 addConnectorConfigBundle(jobBean, locale);
307 return jobBean;
308 }
309
310 private void addConnectorConfigBundle(JobBean bean, Locale locale) {
311 // Add associated resources into the bean
312 for (MJob job : bean.getJobs()) {
313 String fromConnectorName = job.getFromConnectorName();
314 String toConnectorName = job.getToConnectorName();
315
316 // replace it only if it does not already exist
317 if (!bean.hasConnectorConfigBundle(fromConnectorName)) {
318 bean.addConnectorConfigBundle(fromConnectorName, ConnectorManager.getInstance()
319 .getResourceBundle(fromConnectorName, locale));
320 }
321 if (!bean.hasConnectorConfigBundle(toConnectorName)) {
322 bean.addConnectorConfigBundle(toConnectorName, ConnectorManager.getInstance()
323 .getResourceBundle(toConnectorName, locale));
324 }
325 }
326 }
327
328 private JsonBean enableJob(RequestContext ctx, boolean enabled) {
329 Repository repository = RepositoryManager.getInstance().getRepository();
330 String[] elements = ctx.getUrlElements();
331 String jobIdentifier = elements[elements.length - 2];
332 MJob job = HandlerUtils.getJobFromIdentifier(jobIdentifier);
333 String jobName = job.getName();
334
335 // Authorization check
336 AuthorizationEngine.enableDisableJob(ctx.getUserName(), jobName);
337
338 repository.enableJob(jobName, enabled);
339 return JsonBean.EMPTY_BEAN;
340 }
341
342 private JsonBean startJob(RequestContext ctx) {
343 String[] elements = ctx.getUrlElements();
344 String jobIdentifier = elements[elements.length - 2];
345 MJob job = HandlerUtils.getJobFromIdentifier(jobIdentifier);
346 String jobName = job.getName();
347
348 // Authorization check
349 AuthorizationEngine.startJob(ctx.getUserName(), jobName);
350
351 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
352 ctx.getRequest().getRemoteAddr(), "submit", "job", jobName);
353 // TODO(SQOOP-1638): This should be outsourced somewhere more suitable than here
354 // Current approach is to point JobManager to use /v1/job/notification/$JOB_ID/status
355 // and depend on the behavior of status that for running jobs will go to the cluster
356 // and fetch the latest state. We don't have notification first class
357 if (JobManager.getInstance().getNotificationBaseUrl() == null) {
358 String url = ctx.getRequest().getRequestURL().toString();
359 JobManager.getInstance().setNotificationBaseUrl(
360 url.split("v1")[0] + "/v1/job/notification/");
361 }
362
363 MSubmission submission = JobManager.getInstance()
364 .start(jobName, prepareRequestEventContext(ctx));
365 return new SubmissionBean(submission);
366 }
367
368 private JsonBean stopJob(RequestContext ctx) {
369 String[] elements = ctx.getUrlElements();
370 String jobIdentifier = elements[elements.length - 2];
371 MJob job = HandlerUtils.getJobFromIdentifier(jobIdentifier);
372 String jobName = job.getName();
373
374 // Authorization check
375 AuthorizationEngine.stopJob(ctx.getUserName(), jobName);
376
377 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
378 ctx.getRequest().getRemoteAddr(), "stop", "job", jobName);
379 MSubmission submission = JobManager.getInstance().stop(jobName, prepareRequestEventContext(ctx));
380 return new SubmissionBean(submission);
381 }
382
383 private JsonBean getJobStatus(RequestContext ctx) {
384 String[] elements = ctx.getUrlElements();
385 String jobIdentifier = elements[elements.length - 2];
386 MJob job = HandlerUtils.getJobFromIdentifier(jobIdentifier);
387 String jobName = job.getName();
388
389 // Authorization check
390 AuthorizationEngine.statusJob(ctx.getUserName(), jobName);
391
392 AuditLoggerManager.getInstance().logAuditEvent(ctx.getUserName(),
393 ctx.getRequest().getRemoteAddr(), "status", "job", jobName);
394 MSubmission submission = JobManager.getInstance().status(jobName);
395 if (submission == null) {
396 submission = new MSubmission(job.getName(), new Date(), SubmissionStatus.NEVER_EXECUTED);
397 }
398
399 return new SubmissionBean(submission);
400 }
401
402 private HttpEventContext prepareRequestEventContext(RequestContext ctx) {
403 HttpEventContext httpEventContext = new HttpEventContext();
404 httpEventContext.setUsername(ctx.getUserName());
405 return httpEventContext;
406 }
407
408 }