SAMZA-2103: [samza-aws] code cleanup and refactoring (#914)
authorAndrei Paikin <andreypaykin@gmail.com>
Wed, 13 Feb 2019 15:14:52 +0000 (18:14 +0300)
committerJacob Maes <jacob.maes@gmail.com>
Wed, 13 Feb 2019 15:14:52 +0000 (07:14 -0800)
samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisRecordProcessor.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumer.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/KinesisSystemConsumerOffset.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/consumer/SSPAllocator.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java
samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java
samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisRecordProcessor.java
samza-aws/src/test/java/org/apache/samza/system/kinesis/consumer/TestKinesisSystemConsumer.java

index 53ff27f..666ebde 100644 (file)
@@ -63,7 +63,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
   private final SystemStreamPartition ssp;
 
   private String shardId;
-  private KinesisRecordProcessorListener listener;
+  private final KinesisRecordProcessorListener listener;
   private IRecordProcessorCheckpointer checkpointer;
   private ExtendedSequenceNumber initSeqNumber;
 
@@ -85,7 +85,7 @@ public class KinesisRecordProcessor implements IRecordProcessor {
    */
   @Override
   public void initialize(InitializationInput initializationInput) {
-    Validate.isTrue(listener != null, "There is no listener set for the processor.");
+    Validate.notNull(listener, "There is no listener set for the processor.");
     initSeqNumber = initializationInput.getExtendedSequenceNumber();
     shardId = initializationInput.getShardId();
     LOG.info("Initialization done for {} with sequence {}", this,
index 6afffd3..b9808f7 100644 (file)
@@ -212,8 +212,8 @@ public class KinesisSystemConsumer extends BlockingEnvelopeMap implements Checkp
   }
 
   @Override
-  public void onCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
-    LOG.info("onCheckpoint called with sspOffsets {}", sspOffsets);
+  public void afterCheckpoint(Map<SystemStreamPartition, String> sspOffsets) {
+    LOG.info("afterCheckpoint called with sspOffsets {}", sspOffsets);
     sspOffsets.forEach((ssp, offset) -> {
         KinesisRecordProcessor processor = processors.get(ssp);
         KinesisSystemConsumerOffset kinesisOffset = KinesisSystemConsumerOffset.parse(offset);
index 13296ca..541c19c 100644 (file)
@@ -40,9 +40,9 @@ import org.codehaus.jackson.annotate.JsonProperty;
 public class KinesisSystemConsumerOffset {
 
   @JsonProperty("shardId")
-  private String shardId;
+  private final String shardId;
   @JsonProperty("seqNumber")
-  private String seqNumber;
+  private final String seqNumber;
 
   @JsonCreator
   KinesisSystemConsumerOffset(@JsonProperty("shardId") String shardId,
@@ -91,10 +91,7 @@ public class KinesisSystemConsumerOffset {
       return false;
     }
     String thatSeqNumber = ((KinesisSystemConsumerOffset) o).getSeqNumber();
-    if (!(seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber))) {
-      return false;
-    }
-    return true;
+    return seqNumber == null ? thatSeqNumber == null : seqNumber.equals(thatSeqNumber);
   }
 
   @Override
index 4b7cff8..83b6b36 100644 (file)
@@ -46,10 +46,10 @@ class SSPAllocator {
   private final Map<String, Set<SystemStreamPartition>> availableSsps = new HashMap<>();
 
   synchronized SystemStreamPartition allocate(String stream) throws NoAvailablePartitionException {
-    Validate.isTrue(availableSsps.get(stream) != null,
+    Validate.notNull(availableSsps.get(stream),
         String.format("availableSsps is null for stream %s", stream));
 
-    if (availableSsps.get(stream).size() <= 0) {
+    if (availableSsps.get(stream).isEmpty()) {
       // Set a flag in system consumer so that it could throw an exception in the subsequent poll.
       throw new NoAvailablePartitionException(String.format("More shards detected for stream %s than initially"
           + " registered. Could be the result of dynamic resharding.", stream));
index 1c2e0a2..fcce49e 100644 (file)
@@ -67,7 +67,7 @@ public class KinesisInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public KinesisInputDescriptor<StreamMessageType> withRegion(String region) {
-    this.region = Optional.of(StringUtils.stripToNull(region));
+    this.region = Optional.ofNullable(StringUtils.stripToNull(region));
     return this;
   }
 
@@ -77,7 +77,7 @@ public class KinesisInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) {
-    this.accessKey = Optional.of(StringUtils.stripToNull(accessKey));
+    this.accessKey = Optional.ofNullable(StringUtils.stripToNull(accessKey));
     return this;
   }
 
@@ -87,7 +87,7 @@ public class KinesisInputDescriptor<StreamMessageType>
    * @return this input descriptor
    */
   public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) {
-    this.secretKey = Optional.of(StringUtils.stripToNull(secretKey));
+    this.secretKey = Optional.ofNullable(StringUtils.stripToNull(secretKey));
     return this;
   }
 
@@ -110,13 +110,13 @@ public class KinesisInputDescriptor<StreamMessageType>
     String clientConfigPrefix =
         String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId);
 
-    this.region.ifPresent(
+    region.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val));
-    this.accessKey.ifPresent(
+    accessKey.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val));
-    this.secretKey.ifPresent(
+    secretKey.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val));
-    this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
+    kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v));
 
     return config;
   }
index ffeb667..678dfe6 100644 (file)
@@ -72,7 +72,7 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
    * @return this system descriptor
    */
   public KinesisSystemDescriptor withRegion(String region) {
-    this.region = Optional.of(StringUtils.stripToNull(region));
+    this.region = Optional.ofNullable(StringUtils.stripToNull(region));
     return this;
   }
 
@@ -102,7 +102,7 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
    * @return this system descriptor
    */
   public KinesisSystemDescriptor withProxyHost(String proxyHost) {
-    this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost));
+    this.proxyHost = Optional.ofNullable(StringUtils.stripToNull(proxyHost));
     return this;
   }
 
@@ -121,18 +121,18 @@ public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescr
     Map<String, String> config = new HashMap<>(super.toConfig());
     String systemName = getSystemName();
 
-    this.region.ifPresent(
+    region.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val));
-    this.proxyHost.ifPresent(
+    proxyHost.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val));
-    this.proxyPort.ifPresent(
+    proxyPort.ifPresent(
         val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val)));
 
-    final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
-    this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
+    String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName);
+    kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v));
 
-    final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
-    this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
+    String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName);
+    awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v));
 
     return config;
   }
index 6379fcc..6f1f052 100644 (file)
@@ -56,20 +56,16 @@ public class TestKinesisRecordProcessor {
       KinesisRecordProcessor.POLL_INTERVAL_DURING_PARENT_SHARD_SHUTDOWN_MS + 1000;
 
   @Test
-  public void testLifeCycleWithEvents() throws InterruptedException, ShutdownException, InvalidStateException,
-                                               NoSuchFieldException, IllegalAccessException {
+  public void testLifeCycleWithEvents() {
     testLifeCycleHelper(5);
   }
 
   @Test
-  public void testLifeCycleWithNoEvents() throws InterruptedException, ShutdownException, InvalidStateException,
-                                                 NoSuchFieldException, IllegalAccessException {
+  public void testLifeCycleWithNoEvents() {
     testLifeCycleHelper(0);
   }
 
-  private void testLifeCycleHelper(int numRecords) throws InterruptedException, ShutdownException,
-                                                          InvalidStateException, NoSuchFieldException,
-                                                          IllegalAccessException {
+  private void testLifeCycleHelper(int numRecords) {
     String system = "kinesis";
     String stream = "stream";
     final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -102,7 +98,7 @@ public class TestKinesisRecordProcessor {
     // Verification steps
 
     // Verify there is a receivedRecords call to listener.
-    Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
 
     if (numRecords > 0) {
       // Call checkpoint on last record
@@ -114,7 +110,7 @@ public class TestKinesisRecordProcessor {
     shutDownProcessor(processor, ShutdownReason.ZOMBIE);
 
     // Verify that the processor is shutdown.
-    Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
   }
 
   /**
@@ -125,8 +121,7 @@ public class TestKinesisRecordProcessor {
    * before it processed any records.
    */
   @Test
-  public void testCheckpointAfterInit() throws InterruptedException, ShutdownException, InvalidStateException,
-                                               NoSuchFieldException, IllegalAccessException {
+  public void testCheckpointAfterInit() {
     String system = "kinesis";
     String stream = "stream";
     final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -160,26 +155,21 @@ public class TestKinesisRecordProcessor {
     shutDownProcessor(processor, ShutdownReason.ZOMBIE);
 
     // Verify that the processor is shutdown.
-    Assert.assertTrue("Unable to shutdown processor.", receivedShutdownLatch.getCount() == 0);
+    Assert.assertEquals("Unable to shutdown processor.", 0, receivedShutdownLatch.getCount());
   }
 
   @Test
-  public void testShutdownDuringReshardWithEvents() throws InterruptedException, ShutdownException,
-                                                           InvalidStateException, NoSuchFieldException,
-                                                           IllegalAccessException {
+  public void testShutdownDuringReshardWithEvents() throws InterruptedException {
     testShutdownDuringReshardHelper(5);
   }
 
   @Test
-  public void testShutdownDuringReshardWithNoEvents() throws InterruptedException, ShutdownException,
-                                                             InvalidStateException, NoSuchFieldException,
-                                                             IllegalAccessException {
+  public void testShutdownDuringReshardWithNoEvents() throws InterruptedException {
     testShutdownDuringReshardHelper(0);
   }
 
   private void testShutdownDuringReshardHelper(int numRecords)
-      throws InterruptedException, ShutdownException, InvalidStateException, NoSuchFieldException,
-             IllegalAccessException {
+      throws InterruptedException {
     String system = "kinesis";
     String stream = "stream";
     final CountDownLatch receivedShutdownLatch = new CountDownLatch(1);
@@ -212,7 +202,7 @@ public class TestKinesisRecordProcessor {
     // Verification steps
 
     // Verify there is a receivedRecords call to listener.
-    Assert.assertTrue("Unable to receive records.", receivedRecordsLatch.getCount() == 0);
+    Assert.assertEquals("Unable to receive records.", 0, receivedRecordsLatch.getCount());
 
     // Call shutdown (with TERMINATE reason) on processor and verify that the processor does not call shutdown on the
     // listener until checkpoint is called for the last record consumed from shard.
@@ -240,7 +230,7 @@ public class TestKinesisRecordProcessor {
   }
 
   static Map<KinesisRecordProcessor, List<Record>> generateRecords(int numRecordsPerShard,
-      List<KinesisRecordProcessor> processors) throws ShutdownException, InvalidStateException {
+      List<KinesisRecordProcessor> processors) {
     Map<KinesisRecordProcessor, List<Record>> processorRecordMap = new HashMap<>();
     processors.forEach(processor -> {
         try {
@@ -298,4 +288,4 @@ public class TestKinesisRecordProcessor {
     }
     return records;
   }
-}
\ No newline at end of file
+}
index ade02ac..fe7fa96 100644 (file)
@@ -65,8 +65,7 @@ public class TestKinesisSystemConsumer {
   private static final String SYSTEM_CONSUMER_REGISTER_OFFSET = "0000"; // Could be any string
 
   @Test
-  public void testProcessRecords() throws InterruptedException, ShutdownException, InvalidStateException,
-                                          NoSuchFieldException, IllegalAccessException {
+  public void testProcessRecords() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
     String system = "kinesis";
     String stream = "stream";
     int numShards = 2;
@@ -76,9 +75,7 @@ public class TestKinesisSystemConsumer {
   }
 
   @Test
-  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, ShutdownException,
-                                                             InvalidStateException, NoSuchFieldException,
-                                                             IllegalAccessException {
+  public void testProcessRecordsWithEmptyRecordList() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
     String system = "kinesis";
     String stream = "stream";
     int numShards = 1;
@@ -96,8 +93,7 @@ public class TestKinesisSystemConsumer {
    * 5. Shutting down (due to re-assignment or lease expiration) record processors.
    */
   private void testProcessRecordsHelper(String system, String stream, int numShards, int numRecordsPerShard)
-      throws InterruptedException, ShutdownException, InvalidStateException,
-             NoSuchFieldException, IllegalAccessException {
+      throws InterruptedException, NoSuchFieldException, IllegalAccessException {
 
     KinesisConfig kConfig = new KinesisConfig(new MapConfig());
     // Create consumer
@@ -140,24 +136,22 @@ public class TestKinesisSystemConsumer {
         try {
           KinesisRecordProcessor processor = sspToProcessorMap.get(ssp);
 
-          if (numRecordsPerShard > 0) {
-            // Verify that the read messages are received in order and are the same as input records
-            Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
-            List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
-            List<Record> inputRecords = inputRecordMap.get(processor);
-            verifyRecords(envelopes, inputRecords, processor.getShardId());
-
-            // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
-            IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
-            consumer.onCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
-            ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
-            verify(getCheckpointer(processor)).checkpoint(argument.capture());
-            Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
-          }
+          // Verify that the read messages are received in order and are the same as input records
+          Assert.assertEquals(messages.get(ssp).size(), numRecordsPerShard);
+          List<IncomingMessageEnvelope> envelopes = messages.get(ssp);
+          List<Record> inputRecords = inputRecordMap.get(processor);
+          verifyRecords(envelopes, inputRecords, processor.getShardId());
+
+          // Call checkpoint on consumer and verify that the checkpoint is called with the right offset
+          IncomingMessageEnvelope lastEnvelope = envelopes.get(envelopes.size() - 1);
+          consumer.afterCheckpoint(Collections.singletonMap(ssp, lastEnvelope.getOffset()));
+          ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+          verify(getCheckpointer(processor)).checkpoint(argument.capture());
+          Assert.assertEquals(inputRecords.get(inputRecords.size() - 1).getSequenceNumber(), argument.getValue());
 
           // Call shutdown (with ZOMBIE reason) on processor and verify if shutdown freed the ssp mapping
           shutDownProcessor(processor, ShutdownReason.ZOMBIE);
-          Assert.assertTrue(!sspToProcessorMap.containsValue(processor));
+          Assert.assertFalse(sspToProcessorMap.containsValue(processor));
           Assert.assertTrue(isSspAvailable(consumer, ssp));
         } catch (NoSuchFieldException | IllegalAccessException | InvalidStateException | ShutdownException ex) {
           throw new RuntimeException(ex);
@@ -221,7 +215,7 @@ public class TestKinesisSystemConsumer {
         Assert.assertEquals(kinesisMessageEnvelope.getShardId(), shardId);
         ByteBuffer outputData = ByteBuffer.wrap((byte[]) kinesisMessageEnvelope.getMessage());
         record.getData().rewind();
-        Assert.assertTrue(outputData.equals(record.getData()));
+        Assert.assertEquals(outputData, record.getData());
         verifyOffset(envelope.getOffset(), record, shardId);
       });
   }