58ecf99d55a7a3b988e16ad9bf80f2328803188a
[samza.git] / samza-core / src / test / scala / org / apache / samza / job / local / TestProcessJob.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 package org.apache.samza.job.local
21
22 import org.apache.samza.coordinator.JobModelManager
23 import org.junit.Assert._
24 import org.junit.Test
25 import org.apache.samza.job.ApplicationStatus
26 import org.apache.samza.job.CommandBuilder
27 import scala.collection.JavaConverters._
28
29 class TestProcessJob {
30   @Test
31   def testProcessJobShouldFinishOnItsOwn {
32     val commandBuilder = new CommandBuilder {
33       override def buildCommand = "sleep 1"
34       override def buildEnvironment = Map[String, String]().asJava
35     }
36     val coordinator = new MockJobModelManager()
37     val job = new ProcessJob(commandBuilder, coordinator)
38     job.submit
39     job.waitForFinish(999999)
40   }
41
42   // TODO: fix in SAMZA-1261
43   // @Test
44   def testProcessJobKillShouldWork {
45     val commandBuilder = new CommandBuilder {
46       override def buildCommand = "sleep 999999999"
47       override def buildEnvironment = Map[String, String]().asJava
48     }
49     val coordinator = new MockJobModelManager()
50     val job = new ProcessJob(commandBuilder, coordinator)
51     job.submit
52     job.waitForFinish(500)
53     job.kill
54     job.waitForFinish(999999)
55     assertTrue(coordinator.stopped)
56     assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
57   }
58 }
59
60 class MockJobModelManager extends JobModelManager(null, null) {
61   var stopped: Boolean = false
62
63   override def start: Unit = { }
64
65   override def stop: Unit = {
66     stopped = true;
67   }
68 }