FLUME-1572. Add batching support to FILE_ROLL sink.
authorMike Percy <mpercy@apache.org>
Thu, 13 Sep 2012 00:33:16 +0000 (17:33 -0700)
committerMike Percy <mpercy@apache.org>
Thu, 13 Sep 2012 00:33:16 +0000 (17:33 -0700)
(Hari Shreedharan via Mike Percy)

flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java

index e5e97ff..a94eea1 100644 (file)
@@ -47,6 +47,9 @@ public class RollingFileSink extends AbstractSink implements Configurable {
   private static final Logger logger = LoggerFactory
       .getLogger(RollingFileSink.class);
   private static final long defaultRollInterval = 30;
+  private static final int defaultBatchSize = 100;
+
+  private int batchSize = defaultBatchSize;
 
   private File directory;
   private long rollInterval;
@@ -90,6 +93,8 @@ public class RollingFileSink extends AbstractSink implements Configurable {
       this.rollInterval = Long.parseLong(rollInterval);
     }
 
+    batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
+
     this.directory = new File(directory);
   }
 
@@ -175,30 +180,32 @@ public class RollingFileSink extends AbstractSink implements Configurable {
 
     try {
       transaction.begin();
-      event = channel.take();
-
-      if (event != null) {
-        serializer.write(event);
-
-        /*
-         * FIXME: Feature: Rotate on size and time by checking bytes written and
-         * setting shouldRotate = true if we're past a threshold.
-         */
-
-        /*
-         * FIXME: Feature: Control flush interval based on time or number of
-         * events. For now, we're super-conservative and flush on each write.
-         */
-        serializer.flush();
-        outputStream.flush();
-      } else {
-        // No events found, request back-off semantics from runner
-        result = Status.BACKOFF;
+      for (int i = 0; i < batchSize; i++) {
+        event = channel.take();
+        if (event != null) {
+          serializer.write(event);
+
+          /*
+           * FIXME: Feature: Rotate on size and time by checking bytes written and
+           * setting shouldRotate = true if we're past a threshold.
+           */
+
+          /*
+           * FIXME: Feature: Control flush interval based on time or number of
+           * events. For now, we're super-conservative and flush on each write.
+           */
+        } else {
+          // No events found, request back-off semantics from runner
+          result = Status.BACKOFF;
+          break;
+        }
       }
+      serializer.flush();
+      outputStream.flush();
       transaction.commit();
     } catch (Exception ex) {
       transaction.rollback();
-      throw new EventDeliveryException("Failed to process event: " + event, ex);
+      throw new EventDeliveryException("Failed to process transaction", ex);
     } finally {
       transaction.close();
     }
index 10c9b82..07fa644 100644 (file)
@@ -84,6 +84,7 @@ public class TestRollingFileSink {
 
     context.put("sink.directory", tmpDir.getPath());
     context.put("sink.rollInterval", "1");
+    context.put("sink.batchSize", "1");
 
     Configurables.configure(sink, context);
 
@@ -131,6 +132,8 @@ public class TestRollingFileSink {
 
     context.put("sink.directory", tmpDir.getPath());
     context.put("sink.rollInterval", "0");
+    context.put("sink.batchSize", "1");
+
 
     Configurables.configure(sink, context);