FLUME-3031. Change sequence source to reset its counter for event body on channel...
[flume.git] / flume-ng-core / src / main / java / org / apache / flume / sink / LoggerSink.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.flume.sink;
20
21 import org.apache.flume.Channel;
22 import org.apache.flume.Event;
23 import org.apache.flume.EventDeliveryException;
24 import org.apache.flume.Sink;
25 import org.apache.flume.Transaction;
26 import org.apache.flume.event.EventHelper;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 /**
31 * <p>
32 * A {@link Sink} implementation that logs all events received at the INFO level
33 * to the <tt>org.apache.flume.sink.LoggerSink</tt> logger.
34 * </p>
35 * <p>
36 * <b>WARNING:</b> Logging events can potentially introduce performance
37 * degradation.
38 * </p>
39 * <p>
40 * <b>Configuration options</b>
41 * </p>
42 * <p>
43 * <i>This sink has no configuration parameters.</i>
44 * </p>
45 * <p>
46 * <b>Metrics</b>
47 * </p>
48 * <p>
49 * TODO
50 * </p>
51 */
52 public class LoggerSink extends AbstractSink {
53
54 private static final Logger logger = LoggerFactory
55 .getLogger(LoggerSink.class);
56
57 @Override
58 public Status process() throws EventDeliveryException {
59 Status result = Status.READY;
60 Channel channel = getChannel();
61 Transaction transaction = channel.getTransaction();
62 Event event = null;
63
64 try {
65 transaction.begin();
66 event = channel.take();
67
68 if (event != null) {
69 if (logger.isInfoEnabled()) {
70 logger.info("Event: " + EventHelper.dumpEvent(event));
71 }
72 } else {
73 // No event found, request back-off semantics from the sink runner
74 result = Status.BACKOFF;
75 }
76 transaction.commit();
77 } catch (Exception ex) {
78 transaction.rollback();
79 throw new EventDeliveryException("Failed to log event: " + event, ex);
80 } finally {
81 transaction.close();
82 }
83
84 return result;
85 }
86 }

Copyright 2016, The Apache Software Foundation.