5d7855fa5aee6cbe693fa47c1ebad03da316f42b
[incubator-s4.git] / test-apps / twitter-counter / src / main / java / org / apache / s4 / example / twitter / TwitterCounterApp.java
1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 package org.apache.s4.example.twitter;
20
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.s4.base.KeyFinder;
25 import org.apache.s4.core.App;
26 import org.apache.s4.core.Stream;
27 import org.apache.s4.core.ft.CheckpointingConfig;
28 import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
29
30 import com.google.common.collect.ImmutableList;
31
32 public class TwitterCounterApp extends App {
33
34 @Override
35 protected void onClose() {
36 }
37
38 @Override
39 protected void onInit() {
40 try {
41
42 TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
43 topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
44 // we checkpoint this PE every 20s
45 topNTopicPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(20)
46 .timeUnit(TimeUnit.SECONDS).build());
47 @SuppressWarnings("unchecked")
48 Stream<TopicEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder<TopicEvent>() {
49
50 @Override
51 public List<String> get(final TopicEvent arg0) {
52 return ImmutableList.of("aggregationKey");
53 }
54 }, topNTopicPE);
55
56 TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
57 topicCountAndReportPE.setDownstream(aggregatedTopicStream);
58 topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
59 // we checkpoint instances every 2 events
60 topicCountAndReportPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT)
61 .frequency(2).build());
62 Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
63
64 @Override
65 public List<String> get(final TopicEvent arg0) {
66 return ImmutableList.of(arg0.getTopic());
67 }
68 }, topicCountAndReportPE);
69
70 TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
71 topicExtractorPE.setDownStream(topicSeenStream);
72 topicExtractorPE.setSingleton(true);
73 createInputStream("RawStatus", topicExtractorPE);
74
75 } catch (Exception e) {
76 throw new RuntimeException(e);
77 }
78 }
79
80 @Override
81 protected void onStart() {
82
83 }
84 }

Copyright 2016, The Apache Software Foundation.