5d7855fa5aee6cbe693fa47c1ebad03da316f42b
[incubator-s4.git] / test-apps / twitter-counter / src / main / java / org / apache / s4 / example / twitter / TwitterCounterApp.java
1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18
19 package org.apache.s4.example.twitter;
20
21 import java.util.List;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.s4.base.KeyFinder;
25 import org.apache.s4.core.App;
26 import org.apache.s4.core.Stream;
27 import org.apache.s4.core.ft.CheckpointingConfig;
28 import org.apache.s4.core.ft.CheckpointingConfig.CheckpointingMode;
29
30 import com.google.common.collect.ImmutableList;
31
32 public class TwitterCounterApp extends App {
33
34     @Override
35     protected void onClose() {
36     }
37
38     @Override
39     protected void onInit() {
40         try {
41
42             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
43             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
44             // we checkpoint this PE every 20s
45             topNTopicPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.TIME).frequency(20)
46                     .timeUnit(TimeUnit.SECONDS).build());
47             @SuppressWarnings("unchecked")
48             Stream<TopicEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder<TopicEvent>() {
49
50                 @Override
51                 public List<String> get(final TopicEvent arg0) {
52                     return ImmutableList.of("aggregationKey");
53                 }
54             }, topNTopicPE);
55
56             TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
57             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
58             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
59             // we checkpoint instances every 2 events
60             topicCountAndReportPE.setCheckpointingConfig(new CheckpointingConfig.Builder(CheckpointingMode.EVENT_COUNT)
61                     .frequency(2).build());
62             Stream<TopicEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicEvent>() {
63
64                 @Override
65                 public List<String> get(final TopicEvent arg0) {
66                     return ImmutableList.of(arg0.getTopic());
67                 }
68             }, topicCountAndReportPE);
69
70             TopicExtractorPE topicExtractorPE = createPE(TopicExtractorPE.class);
71             topicExtractorPE.setDownStream(topicSeenStream);
72             topicExtractorPE.setSingleton(true);
73             createInputStream("RawStatus", topicExtractorPE);
74
75         } catch (Exception e) {
76             throw new RuntimeException(e);
77         }
78     }
79
80     @Override
81     protected void onStart() {
82
83     }
84 }