4720298d2d41560437524d6af6651ff3279554aa
[samza.git] / samza-core / src / main / java / org / apache / samza / task / StreamOperatorTask.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.task;
20
21 import org.apache.samza.application.StreamApplication;
22 import org.apache.samza.config.Config;
23 import org.apache.samza.operators.ContextManager;
24 import org.apache.samza.operators.StreamGraphImpl;
25 import org.apache.samza.operators.impl.OperatorImplGraph;
26 import org.apache.samza.operators.impl.RootOperatorImpl;
27 import org.apache.samza.operators.stream.InputStreamInternal;
28 import org.apache.samza.runtime.ApplicationRunner;
29 import org.apache.samza.system.IncomingMessageEnvelope;
30 import org.apache.samza.system.SystemStream;
31 import org.apache.samza.util.Clock;
32 import org.apache.samza.util.SystemClock;
33
34 import java.util.HashMap;
35 import java.util.Map;
36
37
38 /**
39 * A {@link StreamTask} implementation that brings all the operator API implementation components together and
40 * feeds the input messages into the user-defined transformation chains in {@link StreamApplication}.
41 */
42 public final class StreamOperatorTask implements StreamTask, InitableTask, WindowableTask, ClosableTask {
43
44 private final StreamApplication streamApplication;
45 private final ApplicationRunner runner;
46 private final Clock clock;
47
48 private OperatorImplGraph operatorImplGraph;
49 private ContextManager contextManager;
50 private Map<SystemStream, InputStreamInternal> inputSystemStreamToInputStream;
51
52 /**
53 * Constructs an adaptor task to run the user-implemented {@link StreamApplication}.
54 * @param streamApplication the user-implemented {@link StreamApplication} that creates the logical DAG
55 * @param runner the {@link ApplicationRunner} to get the mapping between logical and physical streams
56 * @param clock the {@link Clock} to use for time-keeping
57 */
58 public StreamOperatorTask(StreamApplication streamApplication, ApplicationRunner runner, Clock clock) {
59 this.streamApplication = streamApplication;
60 this.runner = runner;
61 this.clock = clock;
62 }
63
64 public StreamOperatorTask(StreamApplication application, ApplicationRunner runner) {
65 this(application, runner, SystemClock.instance());
66 }
67
68 /**
69 * Initializes this task during startup.
70 * <p>
71 * Implementation: Initializes the user-implemented {@link StreamApplication}. The {@link StreamApplication} sets
72 * the input and output streams and the task-wide context manager using the {@link StreamGraphImpl} APIs,
73 * and the logical transforms using the {@link org.apache.samza.operators.MessageStream} APIs.
74 *<p>
75 * It then uses the {@link StreamGraphImpl} to create the {@link OperatorImplGraph} corresponding to the logical
76 * DAG. It also saves the mapping between input {@link SystemStream}s and their corresponding
77 * {@link InputStreamInternal}s for delivering incoming messages to the appropriate sub-DAG.
78 *
79 * @param config allows accessing of fields in the configuration files that this StreamTask is specified in
80 * @param context allows initializing and accessing contextual data of this StreamTask
81 * @throws Exception in case of initialization errors
82 */
83 @Override
84 public final void init(Config config, TaskContext context) throws Exception {
85 StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
86 // initialize the user-implemented stream application.
87 this.streamApplication.init(streamGraph, config);
88
89 // get the user-implemented context manager and initialize the task-specific context.
90 this.contextManager = streamGraph.getContextManager();
91 TaskContext initializedTaskContext = this.contextManager.initTaskContext(config, context);
92
93 // create the operator impl DAG corresponding to the logical operator spec DAG
94 OperatorImplGraph operatorImplGraph = new OperatorImplGraph(clock);
95 operatorImplGraph.init(streamGraph, config, initializedTaskContext);
96 this.operatorImplGraph = operatorImplGraph;
97
98 // TODO: SAMZA-1118 - Remove mapping after SystemConsumer starts returning logical streamId with incoming messages
99 inputSystemStreamToInputStream = new HashMap<>();
100 streamGraph.getInputStreams().forEach((streamSpec, inputStream)-> {
101 SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
102 inputSystemStreamToInputStream.put(systemStream, inputStream);
103 });
104 }
105
106 /**
107 * Passes the incoming message envelopes along to the {@link org.apache.samza.operators.impl.RootOperatorImpl} node
108 * for the input {@link SystemStream}.
109 * <p>
110 * From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates its transformed output to
111 * its chained {@link org.apache.samza.operators.impl.OperatorImpl}s itself.
112 *
113 * @param ime incoming message envelope to process
114 * @param collector the collector to send messages with
115 * @param coordinator the coordinator to request commits or shutdown
116 */
117 @Override
118 public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
119 SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
120 InputStreamInternal inputStream = inputSystemStreamToInputStream.get(systemStream);
121 RootOperatorImpl rootOperatorImpl = operatorImplGraph.getRootOperator(systemStream);
122 if (rootOperatorImpl != null) {
123 // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde
124 // before applying the msgBuilder.
125 Object message = inputStream.getMsgBuilder().apply(ime.getKey(), ime.getMessage());
126 rootOperatorImpl.onMessage(message, collector, coordinator);
127 }
128 }
129
130 @Override
131 public final void window(MessageCollector collector, TaskCoordinator coordinator) {
132 operatorImplGraph.getAllRootOperators()
133 .forEach(rootOperator -> rootOperator.onTimer(collector, coordinator));
134 }
135
136 @Override
137 public void close() throws Exception {
138 this.contextManager.finalizeTaskContext();
139 }
140 }