SAMZA-1261: Fix TestProcessJob flaky test
authorAhmed Abdul Hamid <ahabdulh@ahabdulh-ld1.linkedin.biz>
Mon, 7 May 2018 20:45:53 +0000 (13:45 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Mon, 7 May 2018 20:45:53 +0000 (13:45 -0700)
- Fix flaky test, `TestProcessJob` `testProcessJobKillShouldWork`, which was failing intermittently due to a race condition. In particular, the thread running the test could assert `jobModelManager.stopped` before another thread, enclosed within `ProcessJob.submit`, could actually invoke `jobModelManager.stop`.

+ Refactor `ProcessJob` to improve its overall robustness
  + Handle corner cases, e.g.
     + Fail gracefully if starting process within `ProcessJob.submit` throws
     + Ignore attempts to kill a job before it is submitted
     + Ensure job status is always set appropriately
  + Remove unnecessary stdout/stderr piping code
  + Employ `wait`/`notify` instead of `Thread.sleep`
  + Eliminate all artificial wait method invocations intended to influence inter-thread execution order in unit tests
  + Add more unit tests

Author: Ahmed Abdul Hamid <ahabdulh@ahabdulh-ld1.linkedin.biz>

Reviewers: Boris S<sborya@apache.org>, Shanthoosh V<svenkata@linkedin.com>

Closes #485 from ahmedahamid/master

samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala

index bc2d74b..f719220 100644 (file)
 
 package org.apache.samza.job.local
 
-import java.io.{InputStream, OutputStream}
 import java.util.concurrent.CountDownLatch
 
-import org.apache.samza.SamzaException
 import org.apache.samza.coordinator.JobModelManager
-import org.apache.samza.job.ApplicationStatus.{New, Running, UnsuccessfulFinish}
+import org.apache.samza.job.ApplicationStatus.{New, Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
 import org.apache.samza.util.Logging
 
 import scala.collection.JavaConverters._
 
-class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager) extends StreamJob with Logging {
-  var jobStatus: Option[ApplicationStatus] = None
-  var process: Process = null
-
-  def submit: StreamJob = {
-    jobStatus = Some(New)
-    val waitForThreadStart = new CountDownLatch(1)
+object ProcessJob {
+  private def createProcessBuilder(commandBuilder: CommandBuilder): ProcessBuilder = {
     val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList.asJava)
+    processBuilder.environment.putAll(commandBuilder.buildEnvironment)
+
+    // Pipe all output to this process's streams.
+    processBuilder.redirectOutput(ProcessBuilder.Redirect.INHERIT)
+    processBuilder.redirectError(ProcessBuilder.Redirect.INHERIT)
 
     processBuilder
-      .environment
-      .putAll(commandBuilder.buildEnvironment)
+  }
+}
+
+class ProcessJob(commandBuilder: CommandBuilder, val jobModelManager: JobModelManager) extends StreamJob with Logging {
+
+  import ProcessJob._
+
+  val lock = new Object
+  val processBuilder: ProcessBuilder = createProcessBuilder(commandBuilder)
+  var jobStatus: ApplicationStatus = New
+  var processThread: Option[Thread] = None
+
+
+  def submit: StreamJob = {
+    val threadStartCountDownLatch = new CountDownLatch(1)
 
-    // create a non-daemon thread to make job runner block until the job finishes.
-    // without this, the proc dies when job runner ends.
-    val procThread = new Thread {
+    // Create a non-daemon thread to make job runner block until the job finishes.
+    // Without this, the proc dies when job runner ends.
+    processThread = Some(new Thread {
       override def run {
-        process = processBuilder.start
-
-        // pipe all output to this process's streams
-        val outThread = new Thread(new Piper(process.getInputStream, System.out))
-        val errThread = new Thread(new Piper(process.getErrorStream, System.err))
-        outThread.setDaemon(true)
-        errThread.setDaemon(true)
-        outThread.start
-        errThread.start
-        waitForThreadStart.countDown
-        process.waitFor
-        jobCoordinator.stop
+        var processExitCode = -1
+        var process: Option[Process] = None
+
+        setStatus(Running)
+
+        try {
+          threadStartCountDownLatch.countDown
+          process = Some(processBuilder.start)
+          processExitCode = process.get.waitFor
+        } catch {
+          case _: InterruptedException => process foreach { p => p.destroyForcibly }
+          case e: Exception => error("Encountered an error during job start: %s".format(e.getMessage))
+        } finally {
+          jobModelManager.stop
+          setStatus(if (processExitCode == 0) SuccessfulFinish else UnsuccessfulFinish)
+        }
       }
-    }
+    })
+
+    info("Starting process job")
 
-    procThread.start
-    waitForThreadStart.await
-    jobStatus = Some(Running)
+    processThread.get.start
+    threadStartCountDownLatch.await
     ProcessJob.this
   }
 
   def kill: StreamJob = {
-    process.destroyForcibly
-    jobStatus = Some(UnsuccessfulFinish)
-    ProcessJob.this
-  }
+    getStatus match {
+      case Running => {
+        info("Attempting to kill running process job")
 
-  def waitForFinish(timeoutMs: Long) = {
-    val thread = new Thread {
-      setDaemon(true)
-      override def run {
-        try {
-          process.waitFor
-        } catch {
-          case e: InterruptedException => info("Got interrupt.", e)
+        processThread foreach { thread =>
+          thread.interrupt
+          thread.join
+
+          info("Process job killed successfully")
         }
       }
+      case status => warn("Ignoring attempt to kill a process job that is not running. Job status is %s".format(status))
     }
 
-    thread.start
-    thread.join(timeoutMs)
-    thread.interrupt
-    jobStatus.getOrElse(null)
+    ProcessJob.this
   }
 
-  def waitForStatus(status: ApplicationStatus, timeoutMs: Long) = {
-    val start = System.currentTimeMillis
+  def waitForFinish(timeoutMs: Long): ApplicationStatus = {
+    require(timeoutMs >= 0, "Timeout values must be non-negative")
 
-    while (System.currentTimeMillis - start < timeoutMs && status != jobStatus) {
-      Thread.sleep(500)
-    }
-
-    jobStatus.getOrElse(null)
+    processThread foreach { thread => thread.join(timeoutMs) }
+    getStatus
   }
 
-  def getStatus = jobStatus.getOrElse(null)
-}
+  def waitForStatus(status: ApplicationStatus, timeoutMs: Long): ApplicationStatus = lock.synchronized {
+    require(timeoutMs >= 0, "Timeout values must be non-negative")
 
-/**
- * Silly class to forward bytes from one stream to another. Using this to pipe
- * output from subprocess to this process' stdout/stderr.
- */
-class Piper(in: InputStream, out: OutputStream) extends Runnable {
-  def run() {
-    try {
-      val b = new Array[Byte](512)
-      var read = 1;
-      while (read > -1) {
-        read = in.read(b, 0, b.length)
-        if (read > -1) {
-          out.write(b, 0, read)
-          out.flush()
+    timeoutMs match {
+      case 0 => {
+        info("Waiting for application status %s indefinitely".format(status))
+
+        while (getStatus != status) lock.wait(0)
+      }
+      case _ => {
+        info("Waiting for application status %s for %d ms".format(status, timeoutMs))
+
+        val startTimeMs = System.currentTimeMillis
+        var remainingTimeoutMs = timeoutMs
+
+        while (getStatus != status && remainingTimeoutMs > 0) {
+          lock.wait(remainingTimeoutMs)
+
+          val elapsedWaitTimeMs = System.currentTimeMillis - startTimeMs
+          remainingTimeoutMs = timeoutMs - elapsedWaitTimeMs
         }
       }
-    } catch {
-      case e: Exception => throw new SamzaException("Broken pipe", e);
-    } finally {
-      in.close()
-      out.close()
     }
+    getStatus
+  }
+
+  def getStatus: ApplicationStatus = lock.synchronized {
+    jobStatus
+  }
+
+  private def setStatus(status: ApplicationStatus): Unit = lock.synchronized {
+    info("Changing process job status from %s to %s".format(jobStatus, status))
+
+    jobStatus = status
+    lock.notify
   }
 }
index 58ecf99..dc87583 100644 (file)
 package org.apache.samza.job.local
 
 import org.apache.samza.coordinator.JobModelManager
+import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish}
+import org.apache.samza.job.CommandBuilder
 import org.junit.Assert._
 import org.junit.Test
-import org.apache.samza.job.ApplicationStatus
-import org.apache.samza.job.CommandBuilder
+
 import scala.collection.JavaConverters._
 
-class TestProcessJob {
-  @Test
-  def testProcessJobShouldFinishOnItsOwn {
+object TestProcessJob {
+
+  val OneSecondCommand = "sleep 1"
+  val TenSecondCommand = "sleep 10"
+  val SimpleCommand = "true"
+  val FailingCommand = "false"
+  val BadCommand = "bad-non-existing-command"
+
+  private def createProcessJob(command: String): ProcessJob = {
     val commandBuilder = new CommandBuilder {
-      override def buildCommand = "sleep 1"
+      override def buildCommand = command
+
       override def buildEnvironment = Map[String, String]().asJava
     }
-    val coordinator = new MockJobModelManager()
-    val job = new ProcessJob(commandBuilder, coordinator)
-    job.submit
-    job.waitForFinish(999999)
+    new ProcessJob(commandBuilder, new MockJobModelManager)
   }
 
-  // TODO: fix in SAMZA-1261
-  // @Test
-  def testProcessJobKillShouldWork {
-    val commandBuilder = new CommandBuilder {
-      override def buildCommand = "sleep 999999999"
-      override def buildEnvironment = Map[String, String]().asJava
-    }
-    val coordinator = new MockJobModelManager()
-    val job = new ProcessJob(commandBuilder, coordinator)
-    job.submit
-    job.waitForFinish(500)
-    job.kill
-    job.waitForFinish(999999)
-    assertTrue(coordinator.stopped)
-    assertEquals(ApplicationStatus.UnsuccessfulFinish, job.waitForFinish(999999999))
+  private def getMockJobModelManager(processJob: ProcessJob): MockJobModelManager = {
+    processJob.jobModelManager.asInstanceOf[MockJobModelManager]
+  }
+}
+
+class TestProcessJob {
+
+  import TestProcessJob._
+
+  @Test
+  def testProcessJobShouldFinishOnItsOwn: Unit = {
+    val processJob = createProcessJob(SimpleCommand)
+
+    val status = processJob.submit.waitForFinish(0)
+
+    assertEquals(SuccessfulFinish, status)
+    assertTrue(getMockJobModelManager(processJob).stopped)
+  }
+
+  @Test
+  def testProcessJobShouldReportFailingCommands: Unit = {
+    val processJob = createProcessJob(FailingCommand)
+
+    val status = processJob.submit.waitForFinish(0)
+
+    assertEquals(UnsuccessfulFinish, status)
+    assertTrue(getMockJobModelManager(processJob).stopped)
+  }
+
+  @Test
+  def testProcessJobWaitForFinishShouldTimeOut: Unit = {
+    val processJob = createProcessJob(OneSecondCommand)
+
+    // Wait for a shorter duration than that necessary for the specified command to complete.
+    val status = processJob.submit.waitForFinish(10)
+
+    assertEquals(Running, status)
+  }
+
+  @Test
+  def testProcessJobKillShouldWork: Unit = {
+    val processJob = createProcessJob(TenSecondCommand)
+
+    processJob.submit.kill
+
+    assertEquals(UnsuccessfulFinish, processJob.getStatus)
+    assertTrue(getMockJobModelManager(processJob).stopped)
+  }
+
+  @Test
+  def testProcessJobSubmitBadProcessShouldFailGracefully: Unit = {
+    val processJob = createProcessJob(BadCommand)
+
+    processJob.submit.waitForFinish(0)
+
+    assertEquals(UnsuccessfulFinish, processJob.getStatus)
+    assertTrue(getMockJobModelManager(processJob).stopped)
+  }
+
+  @Test
+  def testProcessJobWaitForStatusShouldWork: Unit = {
+    val processJob = createProcessJob(SimpleCommand)
+
+    processJob.submit.waitForStatus(SuccessfulFinish, 0)
+
+    assertEquals(SuccessfulFinish, processJob.getStatus)
+    assertTrue(getMockJobModelManager(processJob).stopped)
+  }
+
+  @Test
+  def testProcessJobWaitForStatusShouldTimeOut: Unit = {
+    val processJob = createProcessJob(OneSecondCommand)
+
+    // Wait for a shorter duration than that necessary for the specified command to complete.
+    val status = processJob.submit.waitForStatus(SuccessfulFinish, 10)
+
+    assertEquals(Running, status)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testProcessJobWaitForStatusShouldThrowOnNegativeTimeout: Unit = {
+    val processJob = createProcessJob(SimpleCommand)
+    processJob.waitForStatus(SuccessfulFinish, -1)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def testProcessJobWaitForFinishShouldThrowOnNegativeTimeout: Unit = {
+    val processJob = createProcessJob(SimpleCommand)
+    processJob.waitForFinish(-1)
   }
 }
 
 class MockJobModelManager extends JobModelManager(null, null) {
   var stopped: Boolean = false
 
-  override def start: Unit = { }
+  override def start: Unit = {}
 
   override def stop: Unit = {
-    stopped = true;
+    stopped = true
   }
 }