31a75cefe82382258091ebb07fb0b9273e02655e
[samza.git] / samza-core / src / main / java / org / apache / samza / operators / StreamGraphImpl.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.operators;
20
21 import org.apache.samza.config.Config;
22 import org.apache.samza.config.JobConfig;
23 import org.apache.samza.operators.spec.OperatorSpec;
24 import org.apache.samza.operators.stream.InputStreamInternal;
25 import org.apache.samza.operators.stream.InputStreamInternalImpl;
26 import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
27 import org.apache.samza.operators.stream.OutputStreamInternal;
28 import org.apache.samza.operators.stream.OutputStreamInternalImpl;
29 import org.apache.samza.runtime.ApplicationRunner;
30 import org.apache.samza.system.StreamSpec;
31
32 import java.util.Collection;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.Map;
37 import java.util.Set;
38 import java.util.function.BiFunction;
39 import java.util.function.Function;
40 import java.util.stream.Collectors;
41
42 /**
43 * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to
44 * create the DAG of transforms.
45 */
46 public class StreamGraphImpl implements StreamGraph {
47
48 /**
49 * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
50 * Should only be accessed by {@link MessageStreamImpl} via {@link #getNextOpId()}.
51 */
52 private int opId = 0;
53
54 private final Map<StreamSpec, InputStreamInternal> inStreams = new HashMap<>();
55 private final Map<StreamSpec, OutputStreamInternal> outStreams = new HashMap<>();
56 private final ApplicationRunner runner;
57 private final Config config;
58
59 private ContextManager contextManager = new ContextManager() { };
60
61 public StreamGraphImpl(ApplicationRunner runner, Config config) {
62 // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems
63 // can use streamId to send and receive messages.
64 this.runner = runner;
65 this.config = config;
66 }
67
68 @Override
69 public <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
70 if (msgBuilder == null) {
71 throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
72 }
73
74 if (inStreams.containsKey(runner.getStreamSpec(streamId))) {
75 throw new IllegalStateException("Cannot invoke getInputStream() multiple times with the same streamId: " + streamId);
76 }
77
78 return inStreams.computeIfAbsent(runner.getStreamSpec(streamId),
79 streamSpec -> new InputStreamInternalImpl<>(this, streamSpec, (BiFunction<K, V, M>) msgBuilder));
80 }
81
82 @Override
83 public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
84 Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor) {
85 if (keyExtractor == null) {
86 throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
87 }
88
89 if (msgExtractor == null) {
90 throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
91 }
92
93 if (outStreams.containsKey(runner.getStreamSpec(streamId))) {
94 throw new IllegalStateException("Cannot invoke getOutputStream() multiple times with the same streamId: " + streamId);
95 }
96
97 return outStreams.computeIfAbsent(runner.getStreamSpec(streamId),
98 streamSpec -> new OutputStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
99 }
100
101 @Override
102 public StreamGraph withContextManager(ContextManager contextManager) {
103 this.contextManager = contextManager;
104 return this;
105 }
106
107 /**
108 * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
109 * An intermediate {@link MessageStream} is both an output and an input stream.
110 *
111 * @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
112 * logical streamId.
113 * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
114 * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
115 * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
116 * in the intermediate {@link MessageStream}
117 * @param <K> the type of key in the intermediate message
118 * @param <V> the type of message in the intermediate message
119 * @param <M> the type of messages in the intermediate {@link MessageStream}
120 * @return the intermediate {@link MessageStreamImpl}
121 */
122 <K, V, M> MessageStreamImpl<M> getIntermediateStream(String streamName,
123 Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor, BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
124 String streamId = String.format("%s-%s-%s",
125 config.get(JobConfig.JOB_NAME()),
126 config.get(JobConfig.JOB_ID(), "1"),
127 streamName);
128 if (msgBuilder == null) {
129 throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
130 }
131
132 if (keyExtractor == null) {
133 throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
134 }
135 if (msgExtractor == null) {
136 throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
137 }
138
139 StreamSpec streamSpec = runner.getStreamSpec(streamId);
140 IntermediateStreamInternalImpl<K, V, M> intStream =
141 (IntermediateStreamInternalImpl<K, V, M>) inStreams
142 .computeIfAbsent(streamSpec,
143 k -> new IntermediateStreamInternalImpl<>(this, streamSpec, (Function<M, K>) keyExtractor,
144 (Function<M, V>) msgExtractor, (BiFunction<K, V, M>) msgBuilder));
145 outStreams.putIfAbsent(streamSpec, intStream);
146 return intStream;
147 }
148
149 public Map<StreamSpec, InputStreamInternal> getInputStreams() {
150 return Collections.unmodifiableMap(inStreams);
151 }
152
153 public Map<StreamSpec, OutputStreamInternal> getOutputStreams() {
154 return Collections.unmodifiableMap(outStreams);
155 }
156
157 public ContextManager getContextManager() {
158 return this.contextManager;
159 }
160
161 /* package private */ int getNextOpId() {
162 return this.opId++;
163 }
164
165 /**
166 * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl}
167 *
168 * @return a set of all available {@link OperatorSpec}s
169 */
170 public Collection<OperatorSpec> getAllOperatorSpecs() {
171 Collection<InputStreamInternal> inputStreams = inStreams.values();
172 Set<OperatorSpec> operatorSpecs = new HashSet<>();
173
174 for (InputStreamInternal stream : inputStreams) {
175 doGetOperatorSpecs((MessageStreamImpl) stream, operatorSpecs);
176 }
177 return operatorSpecs;
178 }
179
180 private void doGetOperatorSpecs(MessageStreamImpl stream, Set<OperatorSpec> specs) {
181 Collection<OperatorSpec> registeredOperatorSpecs = stream.getRegisteredOperatorSpecs();
182 for (OperatorSpec spec : registeredOperatorSpecs) {
183 specs.add(spec);
184 MessageStreamImpl nextStream = spec.getNextStream();
185 if (nextStream != null) {
186 //Recursively traverse and obtain all reachable operators
187 doGetOperatorSpecs(nextStream, specs);
188 }
189 }
190 }
191
192 /**
193 * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator
194 *
195 * @return <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator
196 */
197 public boolean hasWindowOrJoins() {
198 // Obtain the operator specs from the streamGraph
199 Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs();
200
201 Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream()
202 .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN)
203 .collect(Collectors.toSet());
204
205 return windowOrJoinSpecs.size() != 0;
206 }
207 }