SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure
authorPawas Chhokra <pawas2703@gmail.com>
Fri, 18 Aug 2017 21:38:46 +0000 (14:38 -0700)
committernavina <navina@apache.org>
Fri, 18 Aug 2017 21:38:46 +0000 (14:38 -0700)
PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
PR 4: TableUtils + ProcessorEntity
PR 5: AzureLeaderElector
PR 6: Added all schedulers (current PR)

Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pawas2703@gmail.com>

Reviewers: Navina Ramesh <navina@apache.org>

Closes #261 from PawasChhokra/AzureSchedulers

18 files changed:
samza-azure/src/main/java/org/apache/samza/AzureClient.java
samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java [moved from samza-azure/src/main/java/org/apache/samza/AzureConfig.java with 91% similarity]
samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java [moved from samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java with 82% similarity]
samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java [moved from samza-azure/src/main/java/org/apache/samza/JobModelBundle.java with 97% similarity]
samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java [moved from samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java with 80% similarity]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java [new file with mode: 0644]
samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java [moved from samza-azure/src/main/java/org/apache/samza/BlobUtils.java with 98% similarity]
samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java [moved from samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java with 95% similarity]
samza-azure/src/main/java/org/apache/samza/util/TableUtils.java [moved from samza-azure/src/main/java/org/apache/samza/TableUtils.java with 88% similarity]

index 2248d12..04f8fd3 100644 (file)
@@ -25,6 +25,7 @@ import com.microsoft.azure.storage.RetryPolicy;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableRequestOptions;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
 import org.slf4j.Logger;
@@ -44,21 +45,26 @@ public class AzureClient {
   /**
    * Creates a reference to the Azure Storage account according to the connection string that the client passes.
    * Also creates references to Azure Blob Storage and Azure Table Storage.
-   * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+   * @param storageConnectionString Connection string to connect to Azure Storage Account
+   *                                Format: DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key"
    * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid.
    */
-  AzureClient(String storageConnectionString) {
+  public AzureClient(String storageConnectionString) {
     try {
       account = CloudStorageAccount.parse(storageConnectionString);
+      RetryPolicy retryPolicy = new RetryLinearRetry(5000,  3);
 
       blobClient = account.createCloudBlobClient();
       // Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals.
-      BlobRequestOptions options = new BlobRequestOptions();
-      RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
-      options.setRetryPolicyFactory(retryPolicy);
-      blobClient.setDefaultRequestOptions(options);
+      BlobRequestOptions blobOptions = new BlobRequestOptions();
+      blobOptions.setRetryPolicyFactory(retryPolicy);
+      blobClient.setDefaultRequestOptions(blobOptions);
 
+      // Set retry policy for operations on the table. In this case, every failed operation on the table will be retried thrice, after 5 second intervals.
       tableClient = account.createCloudTableClient();
+      TableRequestOptions tableOptions = new TableRequestOptions();
+      tableOptions.setRetryPolicyFactory(retryPolicy);
+      tableClient.setDefaultRequestOptions(tableOptions);
     } catch (IllegalArgumentException | URISyntaxException e) {
       LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
       LOG.error("Please confirm the connection string is in the Azure connection string format.");
  * under the License.
  */
 
-package org.apache.samza;
-
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.MapConfig;
+package org.apache.samza.config;
 
 /**
  * Config class for reading all user defined parameters for Azure driven coordination services.
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
new file mode 100644 (file)
index 0000000..9438690
--- /dev/null
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.AzureClient;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
+import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
+import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.LeaseBlobManager;
+import org.apache.samza.util.TableUtils;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Class that provides coordination mechanism for Samza standalone in Azure.
+ * Handles processor lifecycle through Azure blob and table storage. Orchestrates leader election.
+ * The leader job coordinator generates partition mapping, writes shared data to the blob and manages rebalancing.
+ */
+public class AzureJobCoordinator implements JobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureJobCoordinator.class);
+  private static final int METADATA_CACHE_TTL_MS = 5000;
+  private static final String INITIAL_STATE = "UNASSIGNED";
+  private final Consumer<String> errorHandler;
+  private final AzureLeaderElector azureLeaderElector;
+  private final BlobUtils leaderBlob;
+  private final TableUtils table;
+  private final Config config;
+  private final String processorId;
+  private final AzureClient client;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final HeartbeatScheduler heartbeat;
+  private final JMVersionUpgradeScheduler versionUpgrade;
+  private final LeaderLivenessCheckScheduler leaderAlive;
+  private LivenessCheckScheduler liveness;
+  private RenewLeaseScheduler renewLease;
+  private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
+  private StreamMetadataCache streamMetadataCache = null;
+  private JobCoordinatorListener coordinatorListener = null;
+  private JobModel jobModel = null;
+
+  /**
+   * Creates an instance of Azure job coordinator, along with references to Azure leader elector, Azure Blob and Azure Table.
+   * @param config User defined config
+   */
+  public AzureJobCoordinator(Config config) {
+    //TODO: Cleanup previous values in the table when barrier times out.
+    this.config = config;
+    processorId = createProcessorId(config);
+    currentJMVersion = new AtomicReference<>(INITIAL_STATE);
+    AzureConfig azureConfig = new AzureConfig(config);
+    client = new AzureClient(azureConfig.getAzureConnect());
+    leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
+    errorHandler = (errorMsg) -> {
+      LOG.error(errorMsg);
+      stop();
+    };
+    table = new TableUtils(client, azureConfig.getAzureTableName(), INITIAL_STATE);
+    azureLeaderElector = new AzureLeaderElector(new LeaseBlobManager(leaderBlob.getBlob()));
+    azureLeaderElector.setLeaderElectorListener(new AzureLeaderElectorListener());
+    versionUpgradeDetected = new AtomicBoolean(false);
+    heartbeat = new HeartbeatScheduler(errorHandler, table, currentJMVersion, processorId);
+    versionUpgrade = new JMVersionUpgradeScheduler(errorHandler, leaderBlob, currentJMVersion, versionUpgradeDetected, processorId);
+    leaderAlive = new LeaderLivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, INITIAL_STATE);
+    leaderBarrierScheduler = null;
+    renewLease = null;
+    liveness = null;
+  }
+
+  @Override
+  public void start() {
+
+    LOG.info("Starting Azure job coordinator.");
+    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+    table.addProcessorEntity(INITIAL_STATE, processorId, false);
+
+    // Start scheduler for heartbeating
+    LOG.info("Starting scheduler for heartbeating.");
+    heartbeat.scheduleTask();
+
+    azureLeaderElector.tryBecomeLeader();
+
+    // Start scheduler to check for job model version upgrades
+    LOG.info("Starting scheduler to check for job model version upgrades.");
+    versionUpgrade.setStateChangeListener(createJMVersionUpgradeListener());
+    versionUpgrade.scheduleTask();
+
+    // Start scheduler to check for leader liveness
+    LOG.info("Starting scheduler to check for leader liveness.");
+    leaderAlive.setStateChangeListener(createLeaderLivenessListener());
+    leaderAlive.scheduleTask();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Shutting down Azure job coordinator.");
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
+
+    // Resign leadership
+    if (azureLeaderElector.amILeader()) {
+      azureLeaderElector.resignLeadership();
+    }
+
+    // Shutdown all schedulers
+    shutdownSchedulers();
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onCoordinatorStop();
+    }
+  }
+
+  @Override
+  public String getProcessorId() {
+    return processorId;
+  }
+
+  @Override
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return jobModel;
+  }
+
+  private void shutdownSchedulers() {
+    if (renewLease != null) {
+      renewLease.shutdown();
+    }
+    if (leaderBarrierScheduler != null) {
+      leaderBarrierScheduler.shutdown();
+    }
+    if (liveness != null) {
+      liveness.shutdown();
+    }
+    heartbeat.shutdown();
+    leaderAlive.shutdown();
+    versionUpgrade.shutdown();
+  }
+
+  /**
+   * Creates a listener for LeaderBarrierCompleteScheduler class.
+   * Invoked by the leader when it detects that rebalancing has completed by polling the processor table.
+   * Updates the barrier state on the blob to denote that the barrier has completed.
+   * Cancels all future tasks scheduled by the LeaderBarrierComplete scheduler to check if barrier has completed.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderBarrierCompleteListener(String nextJMVersion, AtomicBoolean barrierTimeout) {
+    return () -> {
+      versionUpgradeDetected.getAndSet(false);
+      String state;
+      if (barrierTimeout.get()) {
+        LOG.error("Barrier timed out for version {}", nextJMVersion);
+        state = BarrierState.TIMEOUT.name() + " " + nextJMVersion;
+      } else {
+        LOG.info("Leader detected barrier completion.");
+        state = BarrierState.END.name() + " " + nextJMVersion;
+      }
+      if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) {
+        LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
+        stop();
+        table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+      }
+      leaderBarrierScheduler.shutdown();
+    };
+  }
+
+  /**
+   * Creates a listener for LivenessCheckScheduler class.
+   * Invoked by the leader when the list of active processors in the system changes.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLivenessListener(AtomicReference<List<String>> liveProcessors) {
+    return () -> {
+      LOG.info("Leader detected change in list of live processors.");
+      doOnProcessorChange(liveProcessors.get());
+    };
+  }
+
+  /**
+   * Creates a listener for JMVersionUpgradeScheduler class.
+   * Invoked when the processor detects a job model version upgrade on the blob.
+   * Stops listening for job model version upgrades until rebalancing achieved.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createJMVersionUpgradeListener() {
+    return () -> {
+      LOG.info("Job model version upgrade detected.");
+      versionUpgradeDetected.getAndSet(true);
+      onNewJobModelAvailable(leaderBlob.getJobModelVersion());
+    };
+  }
+
+  /**
+   * Creates a listener for LeaderLivenessCheckScheduler class.
+   * Invoked when an existing leader dies. Enables the JC to participate in leader election again.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderLivenessListener() {
+    return () -> {
+      LOG.info("Existing leader died.");
+      azureLeaderElector.tryBecomeLeader();
+    };
+  }
+
+  /**
+   * For each input stream specified in config, exactly determine its
+   * partitions, returning a set of SystemStreamPartitions containing them all.
+   */
+  private Set<SystemStreamPartition> getInputStreamPartitions() {
+    TaskConfig taskConfig = new TaskConfig(config);
+    scala.collection.immutable.Set<SystemStream> inputSystemStreams = taskConfig.getInputStreams();
+
+    // Get the set of partitions for each SystemStream from the stream metadata
+    Set<SystemStreamPartition>
+        sspSet = JavaConverters.mapAsJavaMapConverter(streamMetadataCache.getStreamMetadata(inputSystemStreams, true)).asJava()
+        .entrySet()
+        .stream()
+        .flatMap(this::mapSSMToSSP)
+        .collect(Collectors.toSet());
+
+    return sspSet;
+  }
+
+  private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> ssMs) {
+    return ssMs.getValue()
+        .getSystemStreamPartitionMetadata()
+        .keySet()
+        .stream()
+        .map(partition -> new SystemStreamPartition(ssMs.getKey(), partition));
+  }
+
+  /**
+   * Gets a SystemStreamPartitionGrouper object from the configuration.
+   */
+  private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
+    JobConfig jobConfig = new JobConfig(config);
+    String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
+    SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
+    return grouper;
+  }
+
+  private int getMaxNumTasks() {
+    // Do grouping to fetch TaskName to SSP mapping
+    Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions();
+    SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper();
+    Map<TaskName, Set<SystemStreamPartition>> groups = grouper.group(allSystemStreamPartitions);
+    LOG.info("SystemStreamPartitionGrouper " + grouper.toString() + " has grouped the SystemStreamPartitions into " + Integer.toString(groups.size()) +
+        " tasks with the following taskNames: {}", groups.keySet());
+    return groups.size();
+  }
+
+  /**
+   * Called only by the leader, either when the processor becomes the leader, or when the list of live processors changes.
+   * @param currentProcessorIds New updated list of processor IDs which caused the rebalancing.
+   */
+  private void doOnProcessorChange(List<String> currentProcessorIds) {
+    // if list of processors is empty - it means we are called from 'onBecomeLeader'
+
+    // Check if number of processors is greater than number of tasks
+    List<String> initialProcessorIds = new ArrayList<>(currentProcessorIds);
+    int numTasks = getMaxNumTasks();
+    if (currentProcessorIds.size() > numTasks) {
+      int iterator = 0;
+      while (currentProcessorIds.size() != numTasks) {
+        if (!currentProcessorIds.get(iterator).equals(processorId)) {
+          currentProcessorIds.remove(iterator);
+          iterator++;
+        }
+      }
+    }
+    LOG.info("currentProcessorIds = {}", currentProcessorIds);
+    LOG.info("initialProcessorIds = {}", initialProcessorIds);
+
+    String nextJMVersion;
+    String prevJMVersion = currentJMVersion.get();
+    JobModel prevJobModel = jobModel;
+    AtomicBoolean barrierTimeout = new AtomicBoolean(false);
+
+    if (currentProcessorIds.isEmpty()) {
+      if (currentJMVersion.get().equals(INITIAL_STATE)) {
+        nextJMVersion = "1";
+      } else {
+        nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1);
+      }
+      currentProcessorIds = new ArrayList<>(table.getActiveProcessorsList(currentJMVersion));
+      initialProcessorIds = currentProcessorIds;
+    } else {
+      //Check if previous barrier not reached, then previous barrier times out.
+      String blobJMV = leaderBlob.getJobModelVersion();
+      nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1);
+      if (blobJMV != null && Integer.valueOf(blobJMV) > Integer.valueOf(prevJMVersion)) {
+        prevJMVersion = blobJMV;
+        prevJobModel = leaderBlob.getJobModel();
+        nextJMVersion = Integer.toString(Integer.valueOf(blobJMV) + 1);
+        versionUpgradeDetected.getAndSet(false);
+        leaderBarrierScheduler.shutdown();
+        leaderBlob.publishBarrierState(BarrierState.TIMEOUT.name() + " " + blobJMV, azureLeaderElector.getLeaseId().get());
+      }
+    }
+
+    // Generate the new JobModel
+    JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(),
+        null, streamMetadataCache, currentProcessorIds);
+    LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
+
+    // Publish the new job model
+    boolean jmWrite = leaderBlob.publishJobModel(prevJobModel, newJobModel, prevJMVersion, nextJMVersion, azureLeaderElector.getLeaseId().get());
+    // Publish barrier state
+    boolean barrierWrite = leaderBlob.publishBarrierState(BarrierState.START.name() + " " + nextJMVersion, azureLeaderElector.getLeaseId().get());
+    barrierTimeout.set(false);
+    // Publish list of processors this function was called with
+    boolean processorWrite = leaderBlob.publishLiveProcessorList(initialProcessorIds, azureLeaderElector.getLeaseId().get());
+
+    //Shut down processor if write fails even after retries. These writes have an inherent retry policy.
+    if (!jmWrite || !barrierWrite || !processorWrite) {
+      LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    }
+
+    LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
+
+    // Start scheduler to check if barrier reached
+    long startTime = System.currentTimeMillis();
+    leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(errorHandler, table, nextJMVersion, initialProcessorIds, startTime, barrierTimeout, currentJMVersion, processorId);
+    leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout));
+    leaderBarrierScheduler.scheduleTask();
+  }
+
+  /**
+   * Called when the JC detects a job model version upgrade on the shared blob.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelAvailable(final String nextJMVersion) {
+    LOG.info("pid=" + processorId + "new JobModel available with job model version {}", nextJMVersion);
+
+    //Get the new job model from blob
+    jobModel = leaderBlob.getJobModel();
+    LOG.info("pid=" + processorId + ": new JobModel available. ver=" + nextJMVersion + "; jm = " + jobModel);
+
+    if (!jobModel.getContainers().containsKey(processorId)) {
+      LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", jobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    } else {
+      //Stop current work
+      if (coordinatorListener != null) {
+        coordinatorListener.onJobModelExpired();
+      }
+      // Add entry with new job model version to the processor table
+      table.addProcessorEntity(nextJMVersion, processorId, azureLeaderElector.amILeader());
+
+      // Start polling blob to check if barrier reached
+      Random random = new Random();
+      String blobBarrierState = leaderBlob.getBarrierState();
+      while (true) {
+        if (blobBarrierState.equals(BarrierState.END.name() + " " + nextJMVersion)) {
+          LOG.info("Barrier completion detected by the worker for barrier version {}.", nextJMVersion);
+          versionUpgradeDetected.getAndSet(false);
+          onNewJobModelConfirmed(nextJMVersion);
+          break;
+        } else if (blobBarrierState.equals(BarrierState.TIMEOUT.name() + " " + nextJMVersion) ||
+            (Integer.valueOf(leaderBlob.getJobModelVersion()) > Integer.valueOf(nextJMVersion))) {
+          LOG.info("Barrier timed out for version number {}", nextJMVersion);
+          versionUpgradeDetected.getAndSet(false);
+          break;
+        } else {
+          try {
+            Thread.sleep(random.nextInt(5000));
+          } catch (InterruptedException e) {
+            Thread.interrupted();
+          }
+          LOG.info("Checking for barrier state on the blob again...");
+          blobBarrierState = leaderBlob.getBarrierState();
+        }
+      }
+    }
+  }
+
+  /**
+   * Called when the JC detects that the barrier has completed by checking the barrier state on the blob.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelConfirmed(final String nextJMVersion) {
+    LOG.info("pid=" + processorId + "new version " + nextJMVersion + " of the job model got confirmed");
+
+    // Delete previous value
+    if (table.getEntity(currentJMVersion.get(), processorId) != null) {
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    }
+    if (table.getEntity(INITIAL_STATE, processorId) != null) {
+      table.deleteProcessorEntity(INITIAL_STATE, processorId);
+    }
+
+    //Start heartbeating to new entry only when barrier reached.
+    //Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version.
+    currentJMVersion.getAndSet(nextJMVersion);
+
+    //Start the container with the new model
+    if (coordinatorListener != null) {
+      coordinatorListener.onNewJobModel(processorId, jobModel);
+    }
+  }
+
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
+  public class AzureLeaderElectorListener implements LeaderElectorListener {
+    /**
+     * Keep renewing the lease and do the required tasks as a leader.
+     */
+    @Override
+    public void onBecomingLeader() {
+      // Update table to denote that it is a leader.
+      table.updateIsLeader(currentJMVersion.get(), processorId, true);
+
+      // Schedule a task to renew the lease after a fixed time interval
+      LOG.info("Starting scheduler to keep renewing lease held by the leader.");
+      renewLease = new RenewLeaseScheduler((errorMsg) -> {
+          LOG.error(errorMsg);
+          table.updateIsLeader(currentJMVersion.get(), processorId, false);
+          azureLeaderElector.resignLeadership();
+          renewLease.shutdown();
+          liveness.shutdown();
+        }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
+      renewLease.scheduleTask();
+
+      doOnProcessorChange(new ArrayList<>());
+
+      // Start scheduler to check for change in list of live processors
+      LOG.info("Starting scheduler to check for change in list of live processors in the system.");
+      liveness = new LivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, processorId);
+      liveness.setStateChangeListener(createLivenessListener(liveness.getLiveProcessors()));
+      liveness.scheduleTask();
+    }
+  }
+}
\ No newline at end of file
  * under the License.
  */
 
-package org.apache.samza;
+package org.apache.samza.coordinator;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.coordinator.LeaderElector;
-import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.AzureException;
+import org.apache.samza.util.LeaseBlobManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,20 +58,19 @@ public class AzureLeaderElector implements LeaderElector {
    * Tries to become the leader by acquiring a lease on the blob.
    * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
    * Invokes the listener on becoming the leader.
+   * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
    */
   @Override
-  public void tryBecomeLeader() {
-    try {
-      leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
-      if (leaseId.get() != null) {
-        LOG.info("Became leader with lease ID {}.", leaseId.get());
-        isLeader.set(true);
-        if (leaderElectorListener != null) {
-          leaderElectorListener.onBecomingLeader();
-        }
+  public void tryBecomeLeader() throws AzureException {
+    leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
+    if (leaseId.get() != null) {
+      LOG.info("Became leader with lease ID {}.", leaseId.get());
+      isLeader.set(true);
+      if (leaderElectorListener != null) {
+        leaderElectorListener.onBecomingLeader();
       }
-    } catch (AzureException e) {
-      LOG.error("Error while trying to acquire lease.", e);
+    } else {
+      LOG.info("Unable to become the leader. Continuing as a worker.");
     }
   }
 
@@ -107,5 +106,4 @@ public class AzureLeaderElector implements LeaderElector {
   public LeaseBlobManager getLeaseBlobManager() {
     return this.leaseBlobManager;
   }
-
 }
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
new file mode 100644 (file)
index 0000000..1c144de
--- /dev/null
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+/**
+ * Enum depicting different barrier states.
+ */
+public enum BarrierState {
+  START, END, TIMEOUT
+}
  * under the License.
  */
 
-package org.apache.samza;
+package org.apache.samza.coordinator.data;
 
 import com.microsoft.azure.storage.table.TableServiceEntity;
+import java.util.Random;
 
 
 /**
@@ -29,6 +30,7 @@ import com.microsoft.azure.storage.table.TableServiceEntity;
  * and boolean isLeader value which denotes whether the processor is a leader or not.
  */
 public class ProcessorEntity extends TableServiceEntity {
+  private Random rand = new Random();
   private int liveness;
   private boolean isLeader;
 
@@ -38,14 +40,16 @@ public class ProcessorEntity extends TableServiceEntity {
     this.partitionKey = jobModelVersion;
     this.rowKey = processorId;
     this.isLeader = false;
+    this.liveness = rand.nextInt(10000) + 2;
   }
 
   /**
    * Updates heartbeat by updating the liveness value in the table.
-   * @param value
+   * Sets the liveness field to a random integer value in order to update the last modified since timestamp of the row in the table.
+   * This asserts to the leader that the processor is alive.
    */
-  public void setLiveness(int value) {
-    liveness = value;
+  public void updateLiveness() {
+    liveness = rand.nextInt(10000) + 2;
   }
 
   public void setIsLeader(boolean leader) {
@@ -55,4 +59,4 @@ public class ProcessorEntity extends TableServiceEntity {
   public boolean getIsLeader() {
     return isLeader;
   }
-}
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
new file mode 100644 (file)
index 0000000..2abb380
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor for heartbeating to a row of the table.
+ * Heartbeats every 5 seconds.
+ * The row is determined by the job model version and processor id passed to the scheduler.
+ * All time units are in SECONDS.
+ */
+public class HeartbeatScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatScheduler.class);
+  private static final long HEARTBEAT_DELAY_SEC = 5;
+  private static final ThreadFactory PROCESSOR_THREAD_FACTORY =
+      new ThreadFactoryBuilder().setNameFormat("HeartbeatScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final String processorId;
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final Consumer<String> errorHandler;
+
+  public HeartbeatScheduler(Consumer<String> errorHandler, TableUtils table, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.currentJMVersion = currentJMVersion;
+    processorId = pid;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          String currJVM = currentJMVersion.get();
+          LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
+          table.updateHeartbeat(currJVM, processorId);
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
+        }
+      }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down HeartbeatScheduler");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
new file mode 100644 (file)
index 0000000..ded014f
--- /dev/null
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor to check for job model version upgrades on the blob.
+ * Checks every 5 seconds.
+ * The processor polls the leader blob in order to track this.
+ * All time units are in SECONDS.
+ */
+public class JMVersionUpgradeScheduler implements TaskScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(JMVersionUpgradeScheduler.class);
+  private static final long JMV_UPGRADE_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("JMVersionUpgradeScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final String processorId;
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+
+  public JMVersionUpgradeScheduler(Consumer<String> errorHandler, BlobUtils blob,
+      AtomicReference<String> currentJMVersion, AtomicBoolean versionUpgradeDetected, String processorId) {
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.versionUpgradeDetected = versionUpgradeDetected;
+    this.processorId = processorId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for job model version upgrade");
+          // Read job model version from the blob.
+          String blobJMV = blob.getJobModelVersion();
+          LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+          String blobBarrierState = blob.getBarrierState();
+          String currentJMV = currentJMVersion.get();
+          LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
+          String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
+          List<String> processorList = blob.getLiveProcessorList();
+          // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
+          if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+        }
+      }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down JMVersionUpgradeScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
new file mode 100644 (file)
index 0000000..7386fa9
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check if the barrier has completed.
+ * Checks every 15 seconds.
+ * The leader polls the Azure processor table in order to track this.
+ * The barrier is completed if all processors that are listed alive on the blob, have entries in the Azure table with the new job model version.
+ * All time units are in SECONDS.
+ */
+public class LeaderBarrierCompleteScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderBarrierCompleteScheduler.class);
+  private static final long BARRIER_REACHED_DELAY_SEC = 5;
+  private static final long BARRIER_TIMEOUT_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderBarrierCompleteScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final String nextJMVersion;
+  private final Set<String> blobProcessorSet;
+  private final long startTime;
+  private final AtomicBoolean barrierTimeout;
+  private final Consumer<String> errorHandler;
+  private final String processorId;
+  private final AtomicReference<String> currentJMVersion;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderBarrierCompleteScheduler(Consumer<String> errorHandler, TableUtils table, String nextJMVersion,
+      List<String> blobProcessorList, long startTime, AtomicBoolean barrierTimeout, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.nextJMVersion = nextJMVersion;
+    this.blobProcessorSet = new HashSet<>(blobProcessorList);
+    this.startTime = startTime;
+    this.barrierTimeout = barrierTimeout;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+    this.currentJMVersion = currentJMVersion;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+            barrierTimeout.getAndSet(true);
+            listener.onStateChange();
+          } else {
+            LOG.info("Leader checking for barrier state");
+            // Get processor IDs listed in the table that have the new job model verion.
+            Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
+            Set<String> tableProcessors = new HashSet<>();
+            for (ProcessorEntity entity : tableList) {
+              tableProcessors.add(entity.getRowKey());
+            }
+            LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
+            LOG.info("List of live processors as seen in the table = {}", tableProcessors);
+            if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
+              barrierTimeout.getAndSet(true);
+              listener.onStateChange();
+            } else if (blobProcessorSet.equals(tableProcessors)) {
+              listener.onStateChange();
+            }
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
+        }
+      }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderBarrierCompleteScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
new file mode 100644 (file)
index 0000000..e0fa448
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class invoked by each processor to check if the leader is alive.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LeaderLivenessCheckScheduler implements TaskScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderLivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 10;
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderLivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final BlobUtils blob;
+  private final Consumer<String> errorHandler;
+  private final String initialState;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderLivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, String initialState) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.initialState = initialState;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for leader liveness");
+          if (!checkIfLeaderAlive()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  private boolean checkIfLeaderAlive() {
+    String currJMV = currentJMVersion.get();
+    String blobJMV = blob.getJobModelVersion();
+    //Get the leader processor row from the table.
+    Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV);
+    ProcessorEntity leader = null, nextLeader = null;
+    for (ProcessorEntity entity: tableList) {
+      if (entity.getIsLeader()) {
+        leader = entity;
+        break;
+      }
+    }
+    int currJMVInt = 0;
+    if (!currJMV.equals(initialState)) {
+      currJMVInt = Integer.valueOf(currJMV);
+    }
+    if (Integer.valueOf(blobJMV) > currJMVInt) {
+      for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
+        if (entity.getIsLeader()) {
+          nextLeader = entity;
+          break;
+        }
+      }
+    }
+    // Check if row hasn't been updated since 30 seconds.
+    if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= (
+        LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
new file mode 100644 (file)
index 0000000..d4715f3
--- /dev/null
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check for changes in the list of live processors.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LivenessCheckScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicReference<List<String>> liveProcessorsList = new AtomicReference<>(null);
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+  private final String processorId;
+
+  public LivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
+            scheduler.shutdownNow();
+            return;
+          }
+          LOG.info("Checking for list of live processors");
+          //Get the list of live processors published on the blob.
+          Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
+          //Get the list of live processors from the table. This is the current system state.
+          Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
+          //Invoke listener if the table list is not consistent with the blob list.
+          if (!liveProcessors.equals(currProcessors)) {
+            liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  public AtomicReference<List<String>> getLiveProcessors() {
+    return liveProcessorsList;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
new file mode 100644 (file)
index 0000000..f158122
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class to keep renewing the lease once an entity has acquired it.
+ * Renews every 45 seconds.
+ * All time units are in SECONDS.
+ */
+public class RenewLeaseScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RenewLeaseScheduler.class);
+  private static final long RENEW_LEASE_DELAY_SEC = 45;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("RenewLeaseScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final LeaseBlobManager leaseBlobManager;
+  private final AtomicReference<String> leaseId;
+  private final Consumer<String> errorHandler;
+
+  public RenewLeaseScheduler(Consumer<String> errorHandler, LeaseBlobManager leaseBlobManager, AtomicReference<String> leaseId) {
+    this.leaseBlobManager = leaseBlobManager;
+    this.leaseId = leaseId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Renewing lease");
+          boolean status = leaseBlobManager.renewLease(leaseId.get());
+          if (!status) {
+            errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+        }
+      }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down RenewLeaseScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
new file mode 100644 (file)
index 0000000..95fc4e1
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+/**
+ * Listener interface for Azure Job Coordinator, to track state changes and take necessary actions.
+ */
+public interface SchedulerStateChangeListener {
+
+  void onStateChange();
+
+}
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
new file mode 100644 (file)
index 0000000..63d6e24
--- /dev/null
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import java.util.concurrent.ScheduledFuture;
+
+
+/**
+ * Interface for scheduling tasks for Azure Job Coordinator.
+ */
+public interface TaskScheduler {
+
+  ScheduledFuture scheduleTask();
+
+  void setStateChangeListener(SchedulerStateChangeListener listener);
+
+  void shutdown();
+}
\ No newline at end of file
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza;
+package org.apache.samza.util;
 
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.StorageException;
@@ -30,6 +30,10 @@ import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.JobModelBundle;
+import org.apache.samza.SamzaException;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.serializers.model.SamzaObjectMapper;
 import org.eclipse.jetty.http.HttpStatus;
  * under the License.
  */
 
-package org.apache.samza;
+package org.apache.samza.util;
 
 import com.microsoft.azure.storage.AccessCondition;
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.samza.AzureException;
 import org.eclipse.jetty.http.HttpStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,7 @@ import org.slf4j.LoggerFactory;
 public class LeaseBlobManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
-  private CloudPageBlob leaseBlob;
+  private final CloudPageBlob leaseBlob;
 
   public LeaseBlobManager(CloudPageBlob leaseBlob) {
     this.leaseBlob = leaseBlob;
@@ -54,7 +55,7 @@ public class LeaseBlobManager {
     } catch (StorageException storageException) {
       int httpStatusCode = storageException.getHttpStatusCode();
       if (httpStatusCode == HttpStatus.CONFLICT_409) {
-        LOG.info("The blob you're trying to acquire is leased already.", storageException);
+        LOG.info("The blob you're trying to acquire is leased already.", storageException.getMessage());
       } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
         LOG.error("The blob you're trying to lease does not exist.", storageException);
         throw new AzureException(storageException);
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.samza;
+package org.apache.samza.util;
 
 import com.microsoft.azure.storage.StorageException;
 import com.microsoft.azure.storage.table.CloudTable;
@@ -29,6 +29,9 @@ import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.ProcessorEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,20 +47,23 @@ public class TableUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
   private static final String PARTITION_KEY = "PartitionKey";
-  private static final long CHECK_LIVENESS_DELAY = 30;
-  private static final String INITIAL_STATE = "unassigned";
-  private CloudTableClient tableClient;
-  private CloudTable table;
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private final String initialState;
+  private final CloudTableClient tableClient;
+  private final CloudTable table;
 
-  public TableUtils(AzureClient client, String tableName) {
+  public TableUtils(AzureClient client, String tableName, String initialState) {
+    this.initialState = initialState;
     tableClient = client.getTableClient();
     try {
       table = tableClient.getTableReference(tableName);
       table.createIfNotExists();
     } catch (URISyntaxException e) {
-      LOG.error("\nConnection string specifies an invalid URI.", new SamzaException(e));
+      LOG.error("\nConnection string specifies an invalid URI.", e);
+      throw new AzureException(e);
     } catch (StorageException e) {
-      LOG.error("Azure storage exception.", new SamzaException(e));
+      LOG.error("Azure storage exception.", e);
+      throw new AzureException(e);
     }
   }
 
@@ -65,14 +71,13 @@ public class TableUtils {
    * Add a row which denotes an active processor to the processor table.
    * @param jmVersion Job model version that the processor is operating on.
    * @param pid Unique processor ID.
-   * @param liveness Random heartbeat value.
    * @param isLeader Denotes whether the processor is a leader or not.
    * @throws AzureException If an Azure storage service error occurred.
    */
-  public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) {
+  public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) {
     ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
     entity.setIsLeader(isLeader);
-    entity.setLiveness(liveness);
+    entity.updateLiveness();
     TableOperation add = TableOperation.insert(entity);
     try {
       table.execute(add);
@@ -108,10 +113,9 @@ public class TableUtils {
   public void updateHeartbeat(String jmVersion, String pid) {
     try {
       Random rand = new Random();
-      int value = rand.nextInt(10000) + 2;
       TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
       ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      entity.setLiveness(value);
+      entity.updateLiveness();
       TableOperation update = TableOperation.replace(entity);
       table.execute(update);
     } catch (StorageException e) {
@@ -177,17 +181,20 @@ public class TableUtils {
     Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
     Set<String> activeProcessorsList = new HashSet<>();
     for (ProcessorEntity entity: tableList) {
-      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
+      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
         activeProcessorsList.add(entity.getRowKey());
       }
     }
 
-    Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(INITIAL_STATE);
+    Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(initialState);
     for (ProcessorEntity entity: unassignedList) {
-      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
+      long temp = System.currentTimeMillis() - entity.getTimestamp().getTime();
+      LOG.info("Time elapsed since last heartbeat: {}", temp);
+      if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
         activeProcessorsList.add(entity.getRowKey());
       }
     }
+    LOG.info("Active processors list: {}", activeProcessorsList);
     return activeProcessorsList;
   }