c0da1b261a3147766e548aa07c7060e03af506b9
[samza.git] / samza-core / src / main / java / org / apache / samza / operators / StreamGraphImpl.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.operators;
20
21 import org.apache.samza.config.Config;
22 import org.apache.samza.config.JobConfig;
23 import org.apache.samza.operators.spec.InputOperatorSpec;
24 import org.apache.samza.operators.spec.OutputStreamImpl;
25 import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
26 import org.apache.samza.operators.spec.OperatorSpec;
27 import org.apache.samza.runtime.ApplicationRunner;
28 import org.apache.samza.system.StreamSpec;
29
30 import java.util.Collection;
31 import java.util.Collections;
32 import java.util.HashSet;
33 import java.util.LinkedHashMap;
34 import java.util.Map;
35 import java.util.Set;
36 import java.util.function.BiFunction;
37 import java.util.function.Function;
38 import java.util.stream.Collectors;
39
40 /**
41 * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
42 * create the DAG of transforms.
43 */
44 public class StreamGraphImpl implements StreamGraph {
45
46 /**
47 * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
48 * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
49 */
50 private int opId = 0;
51
52 // We use a LHM for deterministic order in initializing and closing operators.
53 private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
54 private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
55 private final ApplicationRunner runner;
56 private final Config config;
57
58 private ContextManager contextManager = null;
59
60 public StreamGraphImpl(ApplicationRunner runner, Config config) {
61 // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
62 // can use streamId to send and receive messages.
63 this.runner = runner;
64 this.config = config;
65 }
66
67 @Override
68 public <K, V, M> MessageStream<M> getInputStream(String streamId,
69 BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
70 if (msgBuilder == null) {
71 throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
72 }
73
74 if (inputOperators.containsKey(runner.getStreamSpec(streamId))) {
75 throw new IllegalStateException("getInputStream() invoked multiple times "
76 + "with the same streamId: " + streamId);
77 }
78
79 StreamSpec streamSpec = runner.getStreamSpec(streamId);
80 inputOperators.put(streamSpec,
81 new InputOperatorSpec<>(streamSpec, (BiFunction<K, V, M>) msgBuilder, this.getNextOpId()));
82 return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
83 }
84
85 @Override
86 public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
87 Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor) {
88 if (keyExtractor == null) {
89 throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
90 }
91
92 if (msgExtractor == null) {
93 throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
94 }
95
96 if (outputStreams.containsKey(runner.getStreamSpec(streamId))) {
97 throw new IllegalStateException("getOutputStream() invoked multiple times "
98 + "with the same streamId: " + streamId);
99 }
100
101 StreamSpec streamSpec = runner.getStreamSpec(streamId);
102 outputStreams.put(streamSpec,
103 new OutputStreamImpl<>(streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
104 return outputStreams.get(streamSpec);
105 }
106
107 @Override
108 public StreamGraph withContextManager(ContextManager contextManager) {
109 this.contextManager = contextManager;
110 return this;
111 }
112
113 /**
114 * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
115 * An intermediate {@link MessageStream} is both an output and an input stream.
116 *
117 * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
118 * logical streamId.
119 * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
120 * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
121 * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
122 * in the intermediate {@link MessageStream}
123 * @param <K> the type of key in the intermediate message
124 * @param <V> the type of message in the intermediate message
125 * @param <M> the type of messages in the intermediate {@link MessageStream}
126 * @return the intermediate {@link MessageStreamImpl}
127 */
128 <K, V, M> IntermediateMessageStreamImpl<K, V, M> getIntermediateStream(String streamName,
129 Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor,
130 BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
131 String streamId = String.format("%s-%s-%s",
132 config.get(JobConfig.JOB_NAME()),
133 config.get(JobConfig.JOB_ID(), "1"),
134 streamName);
135 if (msgBuilder == null) {
136 throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
137 }
138 if (keyExtractor == null) {
139 throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
140 }
141 if (msgExtractor == null) {
142 throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
143 }
144 StreamSpec streamSpec = runner.getStreamSpec(streamId);
145 if (inputOperators.containsKey(streamSpec) || outputStreams.containsKey(streamSpec)) {
146 throw new IllegalStateException("getIntermediateStream() invoked multiple times "
147 + "with the same streamId: " + streamId);
148 }
149 inputOperators.put(streamSpec, new InputOperatorSpec(streamSpec, msgBuilder, this.getNextOpId()));
150 outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, keyExtractor, msgExtractor));
151 return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
152 }
153
154 public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
155 return Collections.unmodifiableMap(inputOperators);
156 }
157
158 public Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
159 return Collections.unmodifiableMap(outputStreams);
160 }
161
162 public ContextManager getContextManager() {
163 return this.contextManager;
164 }
165
166 /* package private */ int getNextOpId() {
167 return this.opId++;
168 }
169
170 /**
171 * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl}
172 *
173 * @return a set of all available {@link OperatorSpec}s
174 */
175 public Collection<OperatorSpec> getAllOperatorSpecs() {
176 Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values();
177 Set<OperatorSpec> operatorSpecs = new HashSet<>();
178
179 for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) {
180 doGetOperatorSpecs(inputOperatorSpec, operatorSpecs);
181 }
182 return operatorSpecs;
183 }
184
185 private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) {
186 Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs();
187 for (OperatorSpec registeredOperatorSpec: registeredOperatorSpecs) {
188 specs.add(registeredOperatorSpec);
189 doGetOperatorSpecs(registeredOperatorSpec, specs);
190 }
191 }
192
193 /**
194 * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator
195 *
196 * @return <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator
197 */
198 public boolean hasWindowOrJoins() {
199 // Obtain the operator specs from the streamGraph
200 Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs();
201
202 Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream()
203 .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN)
204 .collect(Collectors.toSet());
205
206 return windowOrJoinSpecs.size() != 0;
207 }
208 }