SAMZA-1785: add retry logic in eventhubs system consumer for non transient error
authorHai Lu <halu@linkedin.com>
Mon, 30 Jul 2018 16:28:16 +0000 (09:28 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Mon, 30 Jul 2018 16:28:16 +0000 (09:28 -0700)
Implement a retry logic in EH system consumer because of lack of nurse job on azure and lack of retry logic in samza standlone.

The retry logic can be tuned through config to control max retry count allowed within a certain time window (sliding window).

Author: Hai Lu <halu@linkedin.com>

Reviewers: Jagadish<jagadish@apache.org>

Closes #587 from lhaiesp/master

samza-azure/src/main/java/org/apache/samza/system/eventhub/EventHubConfig.java
samza-azure/src/main/java/org/apache/samza/system/eventhub/consumer/EventHubSystemConsumer.java
samza-azure/src/test/java/org/apache/samza/system/eventhub/MockEventHubClientManagerFactory.java
samza-azure/src/test/java/org/apache/samza/system/eventhub/consumer/TestEventHubSystemConsumer.java
samza-core/src/main/java/org/apache/samza/testUtils/TestClock.java [moved from samza-core/src/test/java/org/apache/samza/testUtils/TestClock.java with 100% similarity]

index 61f823c..4e1e3bb 100644 (file)
@@ -85,6 +85,19 @@ public class EventHubConfig extends MapConfig {
   public static final String CONFIG_PER_PARTITION_CONNECTION = "systems.%s.eventhubs.perPartition.connection";
   public static final Boolean DEFAULT_CONFIG_PER_PARTITION_CONNECTION = true;
 
+  /*
+   * This set of configs control the max retry count allowed within a certain sliding window, as well as
+   * the minimum interval between two retries.
+   * For example, if max retry count is 10, window size is 1 day, min retry interval is 10 min, then
+   * we retry up to 10 times within 1 day time frame and we only retry 10 min after the last retry.
+   */
+  public static final String CONFIG_MAX_RETRY_COUNT = "systems.%s.eventhubs.max.retry.count";
+  public static final long DEFAULT_CONFIG_MAX_RETRIES_COUNT = 3;
+  public static final String CONFIG_RETRY_WINDOW_MS = "systems.%s.eventhubs.retry.window.ms";
+  public static final long DEFAULT_CONFIG_RETRY_WINDOW_MS = Duration.ofHours(3).toMillis();
+  public static final String CONFIG_MIN_RETRY_INTERVAL_MS = "systems.%s.eventhubs.min.retry.interval.ms";
+  public static final long DEFAULT_CONFIG_RETRY_INTERVAL_MS = Duration.ofMinutes(3).toMillis();
+
   private final Map<String, String> physcialToId = new HashMap<>();
 
   private static final Logger LOG = LoggerFactory.getLogger(EventHubConfig.class);
@@ -310,4 +323,31 @@ public class EventHubConfig extends MapConfig {
     }
     return Boolean.valueOf(isPerPartitionConnection);
   }
+
+  /**
+   * Get max retry count allowed before propagating the exception to users
+   * @param systemaName name of the system
+   * @return long, max retry count allowed
+   */
+  public long getMaxRetryCount(String systemaName) {
+    return getLong(String.format(CONFIG_MAX_RETRY_COUNT, systemaName), DEFAULT_CONFIG_MAX_RETRIES_COUNT);
+  }
+
+  /**
+   * Get the sliding window size in ms for tracking the retry count
+   * @param systemName name of the system
+   * @return long, sliding window size in ms
+   */
+  public long getRetryWindowMs(String systemName) {
+    return getLong(String.format(CONFIG_RETRY_WINDOW_MS, systemName), DEFAULT_CONFIG_RETRY_WINDOW_MS);
+  }
+
+  /**
+   * Get the minimum interval in ms between two retries on non transient error
+   * @param systemName name of the system
+   * @return long, minimum interval in ms between retries
+   */
+  public long getMinRetryIntervalMs(String systemName) {
+    return getLong(String.format(CONFIG_MIN_RETRY_INTERVAL_MS, systemName), DEFAULT_CONFIG_RETRY_INTERVAL_MS);
+  }
 }
index 6b3f344..454fc57 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.system.eventhub.consumer;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.microsoft.azure.eventhubs.EventData;
 import com.microsoft.azure.eventhubs.EventHubException;
 import com.microsoft.azure.eventhubs.EventPosition;
@@ -33,6 +34,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -43,6 +47,7 @@ import org.apache.commons.lang3.Validate;
 import org.apache.samza.SamzaException;
 import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.SlidingTimeWindowReservoir;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.EventHubClientManager;
@@ -53,6 +58,7 @@ import org.apache.samza.system.eventhub.admin.EventHubSystemAdmin;
 import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
 import org.apache.samza.system.eventhub.producer.EventHubSystemProducer;
 import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Clock;
 import org.apache.samza.util.ShutdownUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -137,22 +143,39 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
   private final String systemName;
   private final EventHubClientManagerFactory eventHubClientManagerFactory;
 
-  // Partition receiver error propagation
-  private final AtomicReference<Throwable> eventHubHandlerError = new AtomicReference<>(null);
+  // Partition receiver non transient error propagation
+  private final AtomicReference<Throwable> eventHubNonTransientError = new AtomicReference<>(null);
+
+  private final ExecutorService reconnectTaskRunner = Executors.newSingleThreadExecutor(
+      new ThreadFactoryBuilder().setNameFormat("EventHubs-Reconnect-Task").setDaemon(true).build());
+  private long lastRetryTs = 0;
+
+  private final Clock clock;
+  @VisibleForTesting
+  final SlidingTimeWindowReservoir recentRetryAttempts;
+  @VisibleForTesting
+  volatile Future reconnectTaskStatus = null;
 
   public EventHubSystemConsumer(EventHubConfig config, String systemName,
       EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
       MetricsRegistry registry) {
-    super(registry, System::currentTimeMillis);
+    this(config, systemName, eventHubClientManagerFactory, interceptors, registry, System::currentTimeMillis);
+  }
+
+  EventHubSystemConsumer(EventHubConfig config, String systemName,
+      EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> interceptors,
+      MetricsRegistry registry, Clock clock) {
+    super(registry, clock);
 
     this.config = config;
+    this.clock = clock;
     this.systemName = systemName;
     this.interceptors = interceptors;
     this.eventHubClientManagerFactory = eventHubClientManagerFactory;
     List<String> streamIds = config.getStreams(systemName);
     prefetchCount = config.getPrefetchCount(systemName);
 
-
+    recentRetryAttempts = new SlidingTimeWindowReservoir(config.getRetryWindowMs(systemName), clock);
 
     // Initiate metrics
     eventReadRates =
@@ -231,14 +254,9 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     return eventHubClientManager;
   }
 
-  @Override
-  public void start() {
-    if (isStarted) {
-      LOG.warn("Trying to start EventHubSystemConsumer while it's already started. Ignore the request.");
-      return;
-    }
-    isStarted = true;
+  private synchronized void initializeEventHubsManagers() {
     LOG.info("Starting EventHubSystemConsumer. Count of SSPs registered: " + streamPartitionOffsets.entrySet().size());
+    eventHubNonTransientError.set(null);
     // Create receivers for Event Hubs
     for (Map.Entry<SystemStreamPartition, String> entry : streamPartitionOffsets.entrySet()) {
       SystemStreamPartition ssp = entry.getKey();
@@ -289,25 +307,43 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
       }
       LOG.info(String.format("Connection successfully started for namespace=%s, entity=%s ", namespace, entityPath));
     }
+  }
+
+  @Override
+  public void start() {
+    if (isStarted) {
+      LOG.warn("Trying to start EventHubSystemConsumer while it's already started. Ignore the request.");
+      return;
+    }
+    isStarted = true;
+    initializeEventHubsManagers();
     LOG.info("EventHubSystemConsumer started");
   }
 
   @Override
   public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
       Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
-    Throwable handlerError = eventHubHandlerError.get();
-
-    if (handlerError != null) {
-      if (isErrorTransient(handlerError)) {
-        // Log a warning if the error is transient
-        // Partition receiver handler OnError should have handled it by recreating the receiver
-        LOG.warn("Received a transient error from event hub partition receiver, restarted receiver", handlerError);
+    Throwable handlerError = eventHubNonTransientError.get();
+    /*
+     * We will retry for non transient error by instantiating a new EventHubs client if
+     * 1. Last retry happened more than CONFIG_MIN_RETRY_INTERVAL_MS ms ago. Otherwise we ignore
+     * 2. We haven't reached CONFIG_MAX_RETRY_COUNT allowed within the CONFIG_RETRY_WINDOW_MS window.
+     *    Otherwise we throw
+     */
+    if (handlerError != null && clock.currentTimeMillis() - lastRetryTs > config.getMinRetryIntervalMs(systemName)) {
+      int currentRetryCount = recentRetryAttempts.size();
+      long maxRetryCount = config.getMaxRetryCount(systemName);
+      if (currentRetryCount < maxRetryCount) {
+        LOG.warn("Received non transient error. Will retry.", handlerError);
+        LOG.info("Current retry count within window: {}. max retry count allowed: {}. window size: {} ms",
+            currentRetryCount, maxRetryCount, config.getRetryWindowMs(systemName));
+        long now = clock.currentTimeMillis();
+        recentRetryAttempts.update(now);
+        lastRetryTs = now;
+        reconnectTaskStatus = reconnectTaskRunner.submit(this::renewEventHubsClient);
       } else {
-        // Propagate the error to user if the throwable is either
-        // 1. permanent ServiceBusException error from client
-        // 2. SamzaException thrown bu the EventHubConsumer
-        //   2a. Interrupted during put operation to BEM
-        //   2b. Failure in renewing the Partititon Receiver
+        LOG.error("Retries exhausted. Reached max allowed retries: ({}) within window {} ms", currentRetryCount,
+            config.getRetryWindowMs(systemName));
         String msg = "Received a non transient error from event hub partition receiver";
         throw new SamzaException(msg, handlerError);
       }
@@ -316,6 +352,17 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
     return super.poll(systemStreamPartitions, timeout);
   }
 
+  private synchronized void renewEventHubsClient() {
+    try {
+      LOG.info("Start to renew eventhubs client");
+      shutdownEventHubsManagers(); // The shutdown is in parallel and time bounded
+      initializeEventHubsManagers();
+    } catch (Exception e) {
+      LOG.error("Failed to renew eventhubs client", e);
+      eventHubNonTransientError.set(e);
+    }
+  }
+
   private void renewPartitionReceiver(SystemStreamPartition ssp) {
     String streamId = config.getStreamId(ssp.getStream());
     EventHubClientManager eventHubClientManager = perPartitionEventHubManagers.get(ssp);
@@ -341,15 +388,12 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
       receiver.setReceiveHandler(streamPartitionHandlers.get(ssp));
       streamPartitionReceivers.put(ssp, receiver);
     } catch (Exception e) {
-      eventHubHandlerError.set(new SamzaException(
+      eventHubNonTransientError.set(new SamzaException(
           String.format("Failed to recreate receiver for EventHubs after ReceiverHandlerError (ssp=%s)", ssp), e));
     }
   }
 
-  @Override
-  public void stop() {
-    LOG.info("Stopping event hub system consumer...");
-
+  private synchronized void shutdownEventHubsManagers() {
     // There could be potentially many Receivers and EventHubManagers, so close the managers in parallel
     LOG.info("Start shutting down eventhubs receivers");
     ShutdownUtil.boundedShutdown(streamPartitionReceivers.values().stream().map(receiver -> new Runnable() {
@@ -377,16 +421,19 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
     perPartitionEventHubManagers.clear();
     perStreamEventHubManagers.clear();
-    isStarted = false;
-    LOG.info("Event hub system consumer stopped.");
   }
 
-  private boolean isErrorTransient(Throwable throwable) {
-    if (throwable instanceof EventHubException) {
-      EventHubException eventHubException = (EventHubException) throwable;
-      return eventHubException.getIsTransient();
+  @Override
+  public void stop() {
+    LOG.info("Stopping event hub system consumer...");
+    try {
+      reconnectTaskRunner.shutdown();
+      shutdownEventHubsManagers();
+      isStarted = false;
+    } catch (Exception e) {
+      LOG.warn("Exception during stop.", e);
     }
-    return false;
+    LOG.info("Event hub system consumer stopped.");
   }
 
   @Override
@@ -471,10 +518,6 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
         EventHubException busException = (EventHubException) throwable;
 
         if (busException.getIsTransient()) {
-
-          // Only set to transient throwable if there has been no previous errors
-          eventHubHandlerError.compareAndSet(null, throwable);
-
           LOG.warn(
               String.format("Received transient exception from EH client. Renew partition receiver for ssp: %s", ssp),
               throwable);
@@ -492,7 +535,7 @@ public class EventHubSystemConsumer extends BlockingEnvelopeMap {
 
       LOG.error(String.format("Received non transient exception from EH client for ssp: %s", ssp), throwable);
       // Propagate non transient or unknown errors
-      eventHubHandlerError.set(throwable);
+      eventHubNonTransientError.set(throwable);
     }
   }
 }
index 6ee9bcf..3b4f1ec 100644 (file)
@@ -70,6 +70,10 @@ public class MockEventHubClientManagerFactory extends EventHubClientManagerFacto
     handlers.forEach((ssp, value) -> value.onReceive(eventData.get(ssp)));
   }
 
+  public void triggerError(Map<SystemStreamPartition, PartitionReceiveHandler> handlers, Throwable e) {
+    handlers.forEach((ssp, value) -> value.onError(e));
+  }
+
   public EventPosition getPartitionOffset(String partitionId) {
     return startingOffsets.getOrDefault(partitionId, null);
   }
index 6e055c6..47e4656 100644 (file)
@@ -29,6 +29,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.eventhub.*;
 import org.apache.samza.system.eventhub.admin.PassThroughInterceptor;
 import org.apache.samza.system.eventhub.producer.SwapFirstLastByteInterceptor;
+import org.apache.samza.testUtils.TestClock;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -36,6 +37,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.*;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -349,4 +352,95 @@ public class TestEventHubSystemConsumer {
     Assert.assertEquals(counters2.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(), numEvents);
     Assert.assertEquals(counters2.get(EventHubSystemConsumer.READ_ERRORS).getCount(), 0);
   }
+
+  @Test
+  public void testNonTransientErrorRetry() throws Exception {
+    String systemName = "eventhubs";
+    String streamName = "testNonTransientErrorRetry";
+    int numEvents = 10; // needs to be less than BLOCKING_QUEUE_SIZE
+    int partitionId = 0;
+    TestClock testClock = new TestClock();
+
+    TestMetricsRegistry testMetrics = new TestMetricsRegistry();
+    Map<SystemStreamPartition, List<EventData>> eventData = new HashMap<>();
+    SystemStreamPartition ssp = new SystemStreamPartition(systemName, streamName, new Partition(partitionId));
+    Map<String, Interceptor> interceptors = new HashMap<>();
+    interceptors.put(streamName, new PassThroughInterceptor());
+
+    // create EventData
+    List<EventData> singlePartitionEventData = MockEventData.generateEventData(numEvents);
+    eventData.put(ssp, singlePartitionEventData);
+
+    // Set configs
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, systemName), streamName);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, streamName), EVENTHUB_NAMESPACE);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, streamName), EVENTHUB_KEY_NAME);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, streamName), EVENTHUB_KEY);
+    configMap.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, streamName), MOCK_ENTITY_1);
+    configMap.put(String.format(EventHubConfig.CONFIG_MAX_RETRY_COUNT, systemName), "1");
+    MapConfig config = new MapConfig(configMap);
+
+    MockEventHubClientManagerFactory eventHubClientWrapperFactory = new MockEventHubClientManagerFactory(eventData);
+
+    EventHubSystemConsumer consumer =
+        new EventHubSystemConsumer(new EventHubConfig(config), systemName, eventHubClientWrapperFactory, interceptors,
+            testMetrics, testClock);
+    consumer.register(ssp, EventHubSystemConsumer.END_OF_STREAM);
+    consumer.start();
+
+    // 1st error should retry instead of throw
+    testClock.advanceTime(System.currentTimeMillis());
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    // assert that the reconnect task was submitted and completed eventually
+    Assert.assertNotNull("reconnect task should have been submitted", consumer.reconnectTaskStatus);
+    Future lastReconnectTask = consumer.reconnectTaskStatus;
+    lastReconnectTask.get(10000, TimeUnit.MILLISECONDS); // should return instantaneously
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 1);
+
+    // after retry should receive events normally
+    testClock.advanceTime(1);
+    eventHubClientWrapperFactory.sendToHandlers(consumer.streamPartitionHandlers);
+    List<IncomingMessageEnvelope> result = consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    verifyEvents(result, singlePartitionEventData);
+    Assert.assertEquals(testMetrics.getCounters(streamName).size(), 3);
+    Assert.assertEquals(testMetrics.getGauges(streamName).size(), 2);
+    Map<String, Counter> counters =
+        testMetrics.getCounters(streamName).stream().collect(Collectors.toMap(Counter::getName, Function.identity()));
+    Assert.assertEquals(counters.get(EventHubSystemConsumer.EVENT_READ_RATE).getCount(), numEvents);
+
+    // 2nd error: advance into next window, the older retry should have been evicted so this error should cause retry
+    testClock.advanceTime(EventHubConfig.DEFAULT_CONFIG_RETRY_WINDOW_MS + 1);
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 0);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    Assert.assertNotNull("reconnect task should have been submitted", consumer.reconnectTaskStatus);
+    lastReconnectTask = consumer.reconnectTaskStatus;
+    lastReconnectTask.get(10000, TimeUnit.MILLISECONDS); // should return instantaneously
+    Assert.assertEquals(consumer.recentRetryAttempts.size(), 1);
+
+    // 3rd error: 1 ms is within the min retry interval; so poll should do nothing
+    testClock.advanceTime(1);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+    Assert.assertEquals("there shouldn't be another retry task within min retry interval", consumer.reconnectTaskStatus,
+        lastReconnectTask);
+
+    // 4th error: now the poll should throw
+    testClock.advanceTime(EventHubConfig.DEFAULT_CONFIG_RETRY_INTERVAL_MS + 1);
+    eventHubClientWrapperFactory.triggerError(consumer.streamPartitionHandlers,
+        new EventHubException(false /* is transient */, "test"));
+    try {
+      consumer.poll(Collections.singleton(ssp), 0).get(ssp);
+      Assert.fail("poll should have thrown");
+    } catch (Exception e) {
+      Assert.assertEquals(e.getCause().getMessage(), "test");
+    }
+
+    Assert.assertEquals(counters.get(EventHubSystemConsumer.READ_ERRORS).getCount(), 4);
+  }
 }