FLUME-2350. Consume Order tests need to space out file creation.
[flume.git] / flume-ng-core / src / main / java / org / apache / flume / sink / LoggerSink.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 package org.apache.flume.sink;
20
21 import org.apache.flume.Channel;
22 import org.apache.flume.Event;
23 import org.apache.flume.EventDeliveryException;
24 import org.apache.flume.Sink;
25 import org.apache.flume.Transaction;
26 import org.apache.flume.event.EventHelper;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31  * <p>
32  * A {@link Sink} implementation that logs all events received at the INFO level
33  * to the <tt>org.apache.flume.sink.LoggerSink</tt> logger.
34  * </p>
35  * <p>
36  * <b>WARNING:</b> Logging events can potentially introduce performance
37  * degradation.
38  * </p>
39  * <p>
40  * <b>Configuration options</b>
41  * </p>
42  * <p>
43  * <i>This sink has no configuration parameters.</i>
44  * </p>
45  * <p>
46  * <b>Metrics</b>
47  * </p>
48  * <p>
49  * TODO
50  * </p>
51  */
52 public class LoggerSink extends AbstractSink {
53
54   private static final Logger logger = LoggerFactory
55       .getLogger(LoggerSink.class);
56
57   @Override
58   public Status process() throws EventDeliveryException {
59     Status result = Status.READY;
60     Channel channel = getChannel();
61     Transaction transaction = channel.getTransaction();
62     Event event = null;
63
64     try {
65       transaction.begin();
66       event = channel.take();
67
68       if (event != null) {
69         if (logger.isInfoEnabled()) {
70           logger.info("Event: " + EventHelper.dumpEvent(event));
71         }
72       } else {
73         // No event found, request back-off semantics from the sink runner
74         result = Status.BACKOFF;
75       }
76       transaction.commit();
77     } catch (Exception ex) {
78       transaction.rollback();
79       throw new EventDeliveryException("Failed to log event: " + event, ex);
80     } finally {
81       transaction.close();
82     }
83
84     return result;
85   }
86 }