bc2d74b81738557211b460c8cda9df20aa80f2eb
[samza.git] / samza-core / src / main / scala / org / apache / samza / job / local / ProcessJob.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 package org.apache.samza.job.local
21
22 import java.io.{InputStream, OutputStream}
23 import java.util.concurrent.CountDownLatch
24
25 import org.apache.samza.SamzaException
26 import org.apache.samza.coordinator.JobModelManager
27 import org.apache.samza.job.ApplicationStatus.{New, Running, UnsuccessfulFinish}
28 import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
29 import org.apache.samza.util.Logging
30
31 import scala.collection.JavaConverters._
32
33 class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager) extends StreamJob with Logging {
34   var jobStatus: Option[ApplicationStatus] = None
35   var process: Process = null
36
37   def submit: StreamJob = {
38     jobStatus = Some(New)
39     val waitForThreadStart = new CountDownLatch(1)
40     val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList.asJava)
41
42     processBuilder
43       .environment
44       .putAll(commandBuilder.buildEnvironment)
45
46     // create a non-daemon thread to make job runner block until the job finishes.
47     // without this, the proc dies when job runner ends.
48     val procThread = new Thread {
49       override def run {
50         process = processBuilder.start
51
52         // pipe all output to this process's streams
53         val outThread = new Thread(new Piper(process.getInputStream, System.out))
54         val errThread = new Thread(new Piper(process.getErrorStream, System.err))
55         outThread.setDaemon(true)
56         errThread.setDaemon(true)
57         outThread.start
58         errThread.start
59         waitForThreadStart.countDown
60         process.waitFor
61         jobCoordinator.stop
62       }
63     }
64
65     procThread.start
66     waitForThreadStart.await
67     jobStatus = Some(Running)
68     ProcessJob.this
69   }
70
71   def kill: StreamJob = {
72     process.destroyForcibly
73     jobStatus = Some(UnsuccessfulFinish)
74     ProcessJob.this
75   }
76
77   def waitForFinish(timeoutMs: Long) = {
78     val thread = new Thread {
79       setDaemon(true)
80       override def run {
81         try {
82           process.waitFor
83         } catch {
84           case e: InterruptedException => info("Got interrupt.", e)
85         }
86       }
87     }
88
89     thread.start
90     thread.join(timeoutMs)
91     thread.interrupt
92     jobStatus.getOrElse(null)
93   }
94
95   def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
96     val start = System.currentTimeMillis
97
98     while (System.currentTimeMillis - start < timeoutMs && status != jobStatus) {
99       Thread.sleep(500)
100     }
101
102     jobStatus.getOrElse(null)
103   }
104
105   def getStatus = jobStatus.getOrElse(null)
106 }
107
108 /**
109  * Silly class to forward bytes from one stream to another. Using this to pipe
110  * output from subprocess to this process' stdout/stderr.
111  */
112 class Piper(in: InputStream, out: OutputStream) extends Runnable {
113   def run() {
114     try {
115       val b = new Array[Byte](512)
116       var read = 1;
117       while (read > -1) {
118         read = in.read(b, 0, b.length)
119         if (read > -1) {
120           out.write(b, 0, read)
121           out.flush()
122         }
123       }
124     } catch {
125       case e: Exception => throw new SamzaException("Broken pipe", e);
126     } finally {
127       in.close()
128       out.close()
129     }
130   }
131 }