SAMZA-1268: Javadoc cleanup for public APIs for 0.13 release
[samza.git] / samza-core / src / main / java / org / apache / samza / task / StreamOperatorTask.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.task;
20
21 import org.apache.samza.application.StreamApplication;
22 import org.apache.samza.config.Config;
23 import org.apache.samza.operators.ContextManager;
24 import org.apache.samza.operators.StreamGraphImpl;
25 import org.apache.samza.operators.impl.OperatorImplGraph;
26 import org.apache.samza.operators.impl.RootOperatorImpl;
27 import org.apache.samza.operators.stream.InputStreamInternal;
28 import org.apache.samza.runtime.ApplicationRunner;
29 import org.apache.samza.system.IncomingMessageEnvelope;
30 import org.apache.samza.system.SystemStream;
31 import org.apache.samza.util.Clock;
32 import org.apache.samza.util.SystemClock;
33
34 import java.util.HashMap;
35 import java.util.Map;
36
37
38 /**
39 * A {@link StreamTask} implementation that brings all the operator API implementation components together and
40 * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
41 */
42 public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
43
44 private final StreamApplication streamApplication;
45 private final ApplicationRunner runner;
46 private final Clock clock;
47
48 private OperatorImplGraph operatorImplGraph;
49 private ContextManager contextManager;
50 private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
51
52 /**
53 * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
54 * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
55 * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
56 * @param clock the {@link Clock} to use for time-keeping
57 */
58 public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
59 this.streamApplication = streamApplication;
60 this.runner = runner;
61 this.clock = clock;
62 }
63
64 public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
65 this(application, runner, SystemClock.instance());
66 }
67
68 /**
69 * Initializes this task during startup.
70 * <p>
71 * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
72 * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
73 * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
74 *<p>
75 * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
76 * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
77 * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
78 *
79 * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
80 * @param context allows initializing and accessing contextual data of this StreamTask
81 * @throws Exception in case of initialization errors
82 */
83 @Override
84 public final void init(Config config, TaskContext context) throws Exception {
85 StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
86 // initialize the user-implemented stream application.
87 this.streamApplication.init(streamGraph, config);
88
89 // get the user-implemented context manager and initialize it
90 this.contextManager = streamGraph.getContextManager();
91 if (this.contextManager != null) {
92 this.contextManager.init(config, context);
93 }
94
95 // create the operator impl DAG corresponding to the logical operator spec DAG
96 OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
97 operatorImplGraph.init(streamGraph, config, context);
98 this.operatorImplGraph = operatorImplGraph;
99
100 // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
101 inputSystemStreamToInputStream = new HashMap<>();
102 streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
103 SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
104 inputSystemStreamToInputStream.put(systemStream, inputStream);
105 });
106 }
107
108 /**
109 * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
110 * for the input {@link SystemStream}.
111 * <p>
112 * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
113 * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
114 *
115 * @param ime incoming message envelope to process
116 * @param collector the collector to send messages with
117 * @param coordinator the coordinator to request commits or shutdown
118 */
119 @Override
120 public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
121 SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
122 InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
123 RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
124 if (rootOperatorImpl != null) {
125 // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
126 // before applying the msgBuilder.
127 Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
128 rootOperatorImpl.onMessage(message, collector, coordinator);
129 }
130 }
131
132 @Override
133 public final void window(MessageCollector collector, TaskCoordinator coordinator) {
134 operatorImplGraph.getAllRootOperators()
135 .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
136 }
137
138 @Override
139 public void close() throws Exception {
140 if (this.contextManager != null) {
141 this.contextManager.close();
142 }
143 }
144 }