SAMZA-2102: [samza-azure] code cleanup and refactoring (#913)
authorAndrei Paikin <andreypaykin@gmail.com>
Wed, 13 Feb 2019 15:39:13 +0000 (18:39 +0300)
committerJacob Maes <jacob.maes@gmail.com>
Wed, 13 Feb 2019 15:39:13 +0000 (07:39 -0800)
samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubClientManager.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubIncomingMessageEnvelope.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsInputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsOutputDescriptor.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/descriptors/EventHubsSystemDescriptor.java
samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
samza-azure/src/test/java/org/apache/samza/checkpoint/azure/ITestAzureCheckpointManager.java

index 172a0f3..8cddc4c 100644 (file)
@@ -37,8 +37,8 @@ public class AzureLock implements DistributedLockWithState {
 
   private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
   private static final int LEASE_TIME_IN_SEC = 60;
-  private AtomicBoolean hasLock;
-  private AtomicReference<String> leaseId;
+  private final AtomicBoolean hasLock;
+  private final AtomicReference<String> leaseId;
   private final LeaseBlobManager leaseBlobManager;
 
   public AzureLock(BlobUtils blobUtils) {
@@ -97,4 +97,4 @@ public class AzureLock implements DistributedLockWithState {
       LOG.info("Unable to unlock.");
     }
   }
-}
\ No newline at end of file
+}
index e0fa448..59a8123 100644 (file)
@@ -84,32 +84,19 @@ public class LeaderLivenessCheckScheduler implements TaskScheduler {
     String currJMV = currentJMVersion.get();
     String blobJMV = blob.getJobModelVersion();
     //Get the leader processor row from the table.
-    Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV);
-    ProcessorEntity leader = null, nextLeader = null;
-    for (ProcessorEntity entity: tableList) {
-      if (entity.getIsLeader()) {
-        leader = entity;
-        break;
-      }
-    }
-    int currJMVInt = 0;
-    if (!currJMV.equals(initialState)) {
-      currJMVInt = Integer.valueOf(currJMV);
-    }
-    if (Integer.valueOf(blobJMV) > currJMVInt) {
-      for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
-        if (entity.getIsLeader()) {
-          nextLeader = entity;
-          break;
-        }
-      }
-    }
+    ProcessorEntity leader = getLeader(currJMV);
+    int currJMVInt = currJMV.equals(initialState) ? 0 : Integer.valueOf(currJMV);
+    ProcessorEntity nextLeader = Integer.valueOf(blobJMV) > currJMVInt ? getLeader(blobJMV) : null;
     // Check if row hasn't been updated since 30 seconds.
-    if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= (
-        LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
-      return false;
+    boolean leaderIsAlive = leader != null && (System.currentTimeMillis() - leader.getTimestamp().getTime() < (LIVENESS_DEBOUNCE_TIME_SEC * 1000));
+    return leaderIsAlive || nextLeader != null;
+  }
+
+  private ProcessorEntity getLeader(String jmv) {
+    for (ProcessorEntity entity: table.getEntitiesWithPartition(jmv)) {
+      if (entity.getIsLeader()) return entity;
     }
-    return true;
+    return null;
   }
 
   @Override
@@ -117,4 +104,4 @@ public class LeaderLivenessCheckScheduler implements TaskScheduler {
     LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
     scheduler.shutdownNow();
   }
-}
\ No newline at end of file
+}
index 0b4f18f..fcd736c 100644 (file)
@@ -40,7 +40,7 @@ public interface EventHubClientManager {
    * denote that the close invocation should block until all the teardown
    * operations for the {@link EventHubClient} are completed
    */
-  public static int BLOCK_UNTIL_CLOSE = -1;
+  int BLOCK_UNTIL_CLOSE = -1;
 
   /**
    * Lifecycle hook to perform initializations for the creation of
index 27abe07..44353d2 100644 (file)
@@ -50,7 +50,7 @@ import java.util.concurrent.TimeoutException;
 
 public class EventHubSystemAdmin implements SystemAdmin {
   private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemAdmin.class);
-  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
+  private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();
 
   private final EventHubClientManagerFactory eventHubClientManagerFactory;
   private final String systemName;
@@ -83,21 +83,18 @@ public class EventHubSystemAdmin implements SystemAdmin {
   }
 
   // PartitionRuntimeInformation does not implement toString()
-  private String printPartitionRuntimeInfo(PartitionRuntimeInformation runtimeInformation) {
-    if (runtimeInformation == null) {
+  private String printPartitionRuntimeInfo(PartitionRuntimeInformation runtimeInfo) {
+    if (runtimeInfo == null) {
       return "[PartitionRuntimeInformation: null]";
     }
-    StringBuilder stringBuilder = new StringBuilder();
-    stringBuilder.append("[PartitionRuntimeInformation:");
-    stringBuilder.append(" eventHubPath=").append(runtimeInformation.getEventHubPath());
-    stringBuilder.append(" partitionId=").append(runtimeInformation.getPartitionId());
-    stringBuilder.append(" lastEnqueuedTimeUtc=").append(runtimeInformation.getLastEnqueuedTimeUtc().toString());
-    stringBuilder.append(" lastEnqueuedOffset=").append(runtimeInformation.getLastEnqueuedOffset());
     // calculate the number of messages in the queue
-    stringBuilder.append(" numMessages=")
-        .append(runtimeInformation.getLastEnqueuedSequenceNumber() - runtimeInformation.getBeginSequenceNumber());
-    stringBuilder.append("]");
-    return stringBuilder.toString();
+    return "[PartitionRuntimeInformation:"
+      + " eventHubPath=" + runtimeInfo.getEventHubPath()
+      + " partitionId=" + runtimeInfo.getPartitionId()
+      + " lastEnqueuedTimeUtc=" + runtimeInfo.getLastEnqueuedTimeUtc()
+      + " lastEnqueuedOffset=" + runtimeInfo.getLastEnqueuedOffset()
+      + " numMessages=" + (runtimeInfo.getLastEnqueuedSequenceNumber() - runtimeInfo.getBeginSequenceNumber())
+      + "]";
   }
 
   @Override
index 8aa7480..bcd9c2d 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.system.SystemStreamPartition;
  * Extension of {@link IncomingMessageEnvelope} which contains {@link EventData} system and user properties metadata
  */
 public class EventHubIncomingMessageEnvelope extends IncomingMessageEnvelope {
-  private EventData eventData;
+  private final EventData eventData;
 
   public EventHubIncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key,
                                          Object message, EventData eventData) {
index df98d5b..887b3fe 100644 (file)
@@ -104,7 +104,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemConsumer.class);
 
   // Overall timeout for EventHubClient exponential backoff policy
-  private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10L);
+  private static final Duration DEFAULT_EVENTHUB_RECEIVER_TIMEOUT = Duration.ofMinutes(10);
   private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofSeconds(15).toMillis();
 
   public static final String START_OF_STREAM = ClientConstants.START_OF_STREAM; // -1
@@ -394,28 +394,24 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private synchronized void shutdownEventHubsManagers() {
     // There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
     LOG.info("Start shutting down eventhubs receivers");
-    ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
-      @Override
-      public void run() {
+    ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver ->
+      (Runnable) () -> {
         try {
           receiver.close().get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
         } catch (Exception e) {
           LOG.error("Failed to shutdown receiver.", e);
         }
-      }
-    }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+      }).collect(Collectors.toList()), "EventHubSystemConsumer.Receiver#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
 
     LOG.info("Start shutting down eventhubs managers");
-    ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager -> new Runnable() {
-      @Override
-      public void run() {
+    ShutdownUtil.boundedShutdown(perPartitionEventHubManagers.values().stream().map(manager ->
+      (Runnable) () -> {
         try {
           manager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
         } catch (Exception e) {
           LOG.error("Failed to shutdown eventhubs manager.", e);
         }
-      }
-    }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
+      }).collect(Collectors.toList()), "EventHubSystemConsumer.ClientManager#close", DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
 
     perPartitionEventHubManagers.clear();
     perStreamEventHubManagers.clear();
@@ -447,7 +443,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     private final Counter errorRate;
     private final Interceptor interceptor;
     private final Integer maxEventCount;
-    SystemStreamPartition ssp;
+    private final SystemStreamPartition ssp;
 
     PartitionReceiverHandlerImpl(SystemStreamPartition ssp, Counter eventReadRate, Counter eventByteReadRate,
         SamzaHistogram readLatency, Counter readErrors, Interceptor interceptor, int maxEventCount) {
@@ -521,7 +517,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
               throwable);
           try {
             // Add a fixed delay so that we don't keep retrying when there are long-lasting failures
-            Thread.sleep(Duration.ofSeconds(2).toMillis());
+            TimeUnit.SECONDS.sleep(2);
           } catch (InterruptedException e) {
             LOG.warn("Interrupted during sleep before renew", e);
           }
index c8cc36b..de4e275 100644 (file)
@@ -43,8 +43,8 @@ import org.apache.samza.system.eventhub.EventHubConfig;
  */
 public class EventHubsInputDescriptor<StreamMessageType>
     extends InputDescriptor<StreamMessageType, EventHubsInputDescriptor<StreamMessageType>> {
-  private String namespace;
-  private String entityPath;
+  private final String namespace;
+  private final String entityPath;
   private Optional<String> sasKeyName = Optional.empty();
   private Optional<String> sasToken = Optional.empty();
   private Optional<String> consumerGroup = Optional.empty();
@@ -76,7 +76,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public EventHubsInputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
-    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
     return this;
   }
 
@@ -87,7 +87,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public EventHubsInputDescriptor<StreamMessageType> withSasKey(String sasToken) {
-    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
     return this;
   }
 
@@ -99,13 +99,13 @@ public class EventHubsInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public EventHubsInputDescriptor<StreamMessageType> withConsumerGroup(String consumerGroup) {
-    this.consumerGroup = Optional.of(StringUtils.stripToNull(consumerGroup));
+    this.consumerGroup = Optional.ofNullable(StringUtils.stripToNull(consumerGroup));
     return this;
   }
 
   @Override
   public Map<String, String> toConfig() {
-    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+    Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
 
     String streamId = getStreamId();
 
@@ -116,7 +116,7 @@ public class EventHubsInputDescriptor<StreamMessageType>
         ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamId), keyName));
     sasToken.ifPresent(key ->
         ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamId), key));
-    this.consumerGroup.ifPresent(consumerGroupName ->
+    consumerGroup.ifPresent(consumerGroupName ->
         ehConfigs.put(String.format(EventHubConfig.CONFIG_STREAM_CONSUMER_GROUP, streamId), consumerGroupName));
     return ehConfigs;
   }
index b3e1c59..2b3b469 100644 (file)
@@ -43,8 +43,8 @@ import org.apache.samza.system.eventhub.EventHubConfig;
  */
 public class EventHubsOutputDescriptor<StreamMessageType>
     extends OutputDescriptor<StreamMessageType, EventHubsOutputDescriptor<StreamMessageType>> {
-  private String namespace;
-  private String entityPath;
+  private final String namespace;
+  private final String entityPath;
   private Optional<String> sasKeyName = Optional.empty();
   private Optional<String> sasToken = Optional.empty();
 
@@ -63,7 +63,7 @@ public class EventHubsOutputDescriptor<StreamMessageType>
     this.namespace = StringUtils.stripToNull(namespace);
     this.entityPath = StringUtils.stripToNull(entityPath);
     if (this.namespace == null || this.entityPath == null) {
-      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in " //
+      throw new ConfigException(String.format("Missing namespace and entity path Event Hubs output descriptor in "
           + "system: {%s}, stream: {%s}", getSystemName(), streamId));
     }
   }
@@ -75,7 +75,7 @@ public class EventHubsOutputDescriptor<StreamMessageType>
    * @return this output descriptor
    */
   public EventHubsOutputDescriptor<StreamMessageType> withSasKeyName(String sasKeyName) {
-    this.sasKeyName = Optional.of(StringUtils.stripToNull(sasKeyName));
+    this.sasKeyName = Optional.ofNullable(StringUtils.stripToNull(sasKeyName));
     return this;
   }
 
@@ -86,13 +86,13 @@ public class EventHubsOutputDescriptor<StreamMessageType>
    * @return this output descriptor
    */
   public EventHubsOutputDescriptor<StreamMessageType> withSasKey(String sasToken) {
-    this.sasToken = Optional.of(StringUtils.stripToNull(sasToken));
+    this.sasToken = Optional.ofNullable(StringUtils.stripToNull(sasToken));
     return this;
   }
 
   @Override
   public Map<String, String> toConfig() {
-    HashMap<String, String> ehConfigs = new HashMap<>(super.toConfig());
+    Map<String, String> ehConfigs = new HashMap<>(super.toConfig());
 
     String streamId = getStreamId();
 
index 2084018..94dbbbd 100644 (file)
@@ -42,7 +42,7 @@ import org.apache.samza.system.eventhub.producer.EventHubSystemProducer.Partitio
 public class EventHubsSystemDescriptor extends SystemDescriptor<EventHubsSystemDescriptor> {
   private static final String FACTORY_CLASS_NAME = EventHubSystemFactory.class.getName();
 
-  private List<String> streamIds = new ArrayList<>();
+  private final List<String> streamIds = new ArrayList<>();
   private Optional<Integer> fetchRuntimeInfoTimeout = Optional.empty();
   private Optional<Integer> numClientThreads = Optional.empty();
   private Optional<Integer> consumerReceiveQueueSize = Optional.empty();
index 85e4273..2e4a27f 100644 (file)
@@ -129,8 +129,7 @@ public class BlobUtils {
       LOG.error("Job Model details don't exist on the blob.");
       return null;
     }
-    JobModel jm = jmBundle.getCurrJobModel();
-    return jm;
+    return jmBundle.getCurrJobModel();
   }
 
   /**
@@ -146,8 +145,7 @@ public class BlobUtils {
       LOG.error("Job Model details don't exist on the blob.");
       return null;
     }
-    String jmVersion = jmBundle.getCurrJobModelVersion();
-    return jmVersion;
+    return jmBundle.getCurrJobModelVersion();
   }
 
   /**
@@ -192,14 +190,12 @@ public class BlobUtils {
       LOG.error("Failed to read barrier state from blob.", e);
       throw new AzureException(e);
     }
-    String state;
     try {
-      state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
+      return SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
     } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e);
+      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for barrier state retrieved from the blob.", e);
       throw new SamzaException(e);
     }
-    return state;
   }
 
   /**
@@ -242,14 +238,12 @@ public class BlobUtils {
       LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e));
       throw new AzureException(e);
     }
-    List<String> list;
     try {
-      list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
+      return SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
     } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e));
+      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for live processor list retrieved from the blob", new SamzaException(e));
       throw new SamzaException(e);
     }
-    return list;
   }
 
   public CloudBlobClient getBlobClient() {
@@ -273,12 +267,11 @@ public class BlobUtils {
       throw new AzureException(e);
     }
     try {
-      JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
-      return jmBundle;
+      return SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
     } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e);
+      LOG.error("Failed to parse byte data: " + Arrays.toString(data) + " for JobModel details retrieved from the blob", e);
       throw new SamzaException(e);
     }
   }
 
-}
\ No newline at end of file
+}
index 4560b11..7cf01c0 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.samza.checkpoint.azure;
 
-import junit.framework.Assert;
 import org.apache.samza.Partition;
 import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.CheckpointManager;