SAMZA-1452: Clean up interrupted thread bugs
authorNacho Solis <nsolis@linkedin.com>
Wed, 11 Oct 2017 14:49:52 +0000 (07:49 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 11 Oct 2017 14:49:52 +0000 (07:49 -0700)
Call Thread.currentThread().interrupt(); when capturing InterruptedException

Author: Nacho Solis <nsolis@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>,Jagadish <jvenkatr@linkedin.com>

Closes #322 from isolis/cleancodebugs

samza-api/src/main/java/org/apache/samza/system/SystemStreamPartitionIterator.java
samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java
samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java
samza-rest/src/main/java/org/apache/samza/monitor/SamzaMonitorService.java
samza-rest/src/main/java/org/apache/samza/rest/script/ScriptRunner.java

index d1d61ed..726a5f4 100644 (file)
@@ -90,6 +90,7 @@ public class SystemStreamPartitionIterator implements Iterator<IncomingMessageEn
           peeks.addAll(systemStreamPartitionEnvelopes);
         }
       } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
         throw new SamzaException(e);
       }
     }
index 223d1d6..d1b532f 100644 (file)
@@ -131,6 +131,7 @@ public class ConfigManager {
     } catch (InterruptedException e) {
       e.printStackTrace();
       log.warn("Got interrupt in config manager thread, so shutting down");
+      Thread.currentThread().interrupt();
     } finally {
       log.info("Stopping the config manager");
       stop();
@@ -305,6 +306,7 @@ public class ConfigManager {
       }
     } catch (InterruptedException e) {
       e.printStackTrace();
+      Thread.currentThread().interrupt();
     }
 
     log.info("Killed the current job successfully");
index 9438690..622932f 100644 (file)
@@ -427,7 +427,7 @@ public class AzureJobCoordinator implements JobCoordinator {
           try {
             Thread.sleep(random.nextInt(5000));
           } catch (InterruptedException e) {
-            Thread.interrupted();
+            Thread.currentThread().interrupt();
           }
           LOG.info("Checking for barrier state on the blob again...");
           blobBarrierState = leaderBlob.getBarrierState();
index c0d3ff2..172a0f3 100644 (file)
@@ -75,7 +75,7 @@ public class AzureLock implements DistributedLockWithState {
         try {
           Thread.sleep(random.nextInt(1000));
         } catch (InterruptedException e) {
-          Thread.interrupted();
+          Thread.currentThread().interrupt();
         }
         LOG.info("Trying to acquire lock again...");
       }
index 70be208..b548200 100644 (file)
@@ -222,6 +222,7 @@ public class StreamProcessor {
               LOGGER.info("Container was not running.", icse);
               shutdownComplete = true;
             } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
               LOGGER.warn("Container shutdown was interrupted!" + container.toString(), e);
             }
             LOGGER.info("Shutting down container done for pid=" + processorId + "; complete =" + shutdownComplete);
index fb9bb56..230625d 100644 (file)
@@ -224,6 +224,7 @@ public class HdfsSystemConsumer extends BlockingEnvelopeMap {
     try {
       super.put(systemStreamPartition, envelope);
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       throw new SamzaException("ReaderRunnable interrupted for ssp: " + systemStreamPartition);
     }
   }
index 80321f3..85491ad 100644 (file)
@@ -90,6 +90,7 @@ public class SamzaMonitorService {
                 } catch (IOException e) {
                     LOGGER.error("Caught IOException during " + monitor.toString() + ".monitor()", e);
                 } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
                     LOGGER.error("Caught InterruptedException during " + monitor.toString() + ".monitor()", e);
                 } catch (Exception e) {
                     LOGGER.error("Unexpected exception during {}.monitor()", monitor, e);
index 1eab067..549541e 100644 (file)
@@ -123,6 +123,7 @@ public class ScriptRunner {
         try {
           p.waitFor();
         } catch (InterruptedException ignore) {
+          Thread.currentThread().interrupt();
           return;
         }
       }