SAMZA-1698: Update appStatus on failures in localApplication.run(streamApp).
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Fri, 4 May 2018 00:37:49 +0000 (17:37 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 4 May 2018 00:37:49 +0000 (17:37 -0700)
Author: Shanthoosh Venkataraman <santhoshvenkat1988@gmail.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #502 from shanthoosh/local_application_runner_set_exception_in_finish

samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java

index 9529581..8f481cd 100644 (file)
@@ -175,8 +175,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
 
       // 4. start the StreamProcessors
       processors.forEach(StreamProcessor::start);
-    } catch (Exception e) {
-      throw new SamzaException("Failed to start application", e);
+    } catch (Throwable throwable) {
+      appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
+      shutdownLatch.countDown();
+      throw new SamzaException(String.format("Failed to start application: %s.", app), throwable);
     }
   }
 
index a23e513..b4a2259 100644 (file)
@@ -224,16 +224,13 @@ public class TestLocalApplicationRunner {
     when(plan.getJobConfigs()).thenReturn(Collections.singletonList(new JobConfig(new MapConfig(config))));
     doReturn(plan).when(runner).getExecutionPlan(any(), any());
 
-    Throwable t = new Throwable("test failure");
     StreamProcessor sp = mock(StreamProcessor.class);
     ArgumentCaptor<StreamProcessorLifecycleListener> captor =
         ArgumentCaptor.forClass(StreamProcessorLifecycleListener.class);
 
     doAnswer(i ->
       {
-        StreamProcessorLifecycleListener listener = captor.getValue();
-        listener.onFailure(t);
-        return null;
+        throw new Exception("test failure");
       }).when(sp).start();
 
     doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), captor.capture());