SAMZA-1706: lazy initialization for eventhub system producer
authorHai Lu <halu@linkedin.com>
Wed, 9 May 2018 22:32:44 +0000 (15:32 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Wed, 9 May 2018 22:32:44 +0000 (15:32 -0700)
We are seeing slow shutdown issue for eventhub system producers for users who only use eventhub consumer (but then Samza system creates both consumer and producer together no matter what). As a workaround, add lazy initialization to the producer to avoid the slow shutdown

Author: Hai Lu <halu@linkedin.com>

Reviewers: Jagadish <jagadish@apache.org>

Closes #511 from lhaiesp/master

samza-azure/src/main/java/org/apache/samza/system/eventhub/producer/EventHubSystemProducer.java

index 690e59e..b9afea7 100644 (file)
@@ -95,6 +95,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
 
   private volatile boolean isStarted = false;
 
+  // We implement lazy initialization for producer as a workaround for
+  // slow shutdown issue.
+  private boolean isInitialized = false;
+
   /**
    * Per partition event hub client. Partitions from the same stream may share the same client,
    * depends on config PerPartitionConnection. See {@link EventHubConfig}
@@ -127,9 +131,12 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     this.interceptors = interceptors;
     this.maxMessageSize = config.getSkipMessagesLargerThan(systemName);
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
+  }
+
+  private void init() {
+    LOG.info("Initializing EventHubSystemProducer");
     // Fetches the stream ids
     List<String> streamIds = config.getStreams(systemName);
-
     // Create and initiate connections to Event Hubs
     // even if PerPartitionConnection == true, we still need a stream level event hub for initial metadata (fetching
     // partition count)
@@ -139,6 +146,40 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
       perStreamEventHubClientManagers.put(streamId, ehClient);
       ehClient.init();
     }
+
+    // Create partition senders if required
+    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
+      // Create all partition senders
+      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
+          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
+
+          try {
+            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
+            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
+            Integer numPartitions =
+                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
+
+            for (int i = 0; i < numPartitions; i++) {
+              String partitionId = String.valueOf(i);
+              EventHubClientManager perPartitionClientManager =
+                  createOrGetEventHubClientManagerForPartition(streamId, i);
+              PartitionSender partitionSender =
+                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
+              partitionSenders.put(i, partitionSender);
+            }
+
+            streamPartitionSenders.put(streamId, partitionSenders);
+          } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
+            throw new SamzaException(msg, e);
+          } catch (EventHubException | IllegalArgumentException e) {
+            String msg = "Creation of partition sender failed with exception";
+            throw new SamzaException(msg, e);
+          }
+        });
+    }
+    isInitialized = true;
+    LOG.info("EventHubSystemProducer initialized.");
   }
 
   @Override
@@ -183,38 +224,6 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     super.start();
     LOG.info("Starting system producer.");
 
-    // Create partition senders if required
-    if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(partitioningMethod)) {
-      // Create all partition senders
-      perStreamEventHubClientManagers.forEach((streamId, samzaEventHubClient) -> {
-          EventHubClient ehClient = samzaEventHubClient.getEventHubClient();
-
-          try {
-            Map<Integer, PartitionSender> partitionSenders = new HashMap<>();
-            long timeoutMs = config.getRuntimeInfoWaitTimeMS(systemName);
-            Integer numPartitions =
-                ehClient.getRuntimeInformation().get(timeoutMs, TimeUnit.MILLISECONDS).getPartitionCount();
-
-            for (int i = 0; i < numPartitions; i++) {
-              String partitionId = String.valueOf(i);
-              EventHubClientManager perPartitionClientManager =
-                  createOrGetEventHubClientManagerForPartition(streamId, i);
-              PartitionSender partitionSender =
-                  perPartitionClientManager.getEventHubClient().createPartitionSenderSync(partitionId);
-              partitionSenders.put(i, partitionSender);
-            }
-
-            streamPartitionSenders.put(streamId, partitionSenders);
-          } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            String msg = "Failed to fetch number of Event Hub partitions for partition sender creation";
-            throw new SamzaException(msg, e);
-          } catch (EventHubException | IllegalArgumentException e) {
-            String msg = "Creation of partition sender failed with exception";
-            throw new SamzaException(msg, e);
-          }
-        });
-    }
-
     // Initiate metrics
     streamIds.forEach((streamId) -> {
         eventSkipRate.put(streamId, metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE));
@@ -245,6 +254,10 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
     if (!isStarted) {
       throw new SamzaException("Trying to call send before the producer is started.");
     }
+    if (!isInitialized) {
+      // lazy initialization on the first send
+      init();
+    }
 
     String streamId = config.getStreamId(envelope.getSystemStream().getStream());
 
@@ -371,6 +384,9 @@ public class EventHubSystemProducer extends AsyncSystemProducer {
           .forEach(ehClient -> ehClient.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS));
       perPartitionEventHubClients.clear();
     }
+    isStarted = false;
+    isInitialized = false;
+    LOG.info("EventHubSystemProducer stopped.");
   }
 
   /**