SAMZA-1268: Javadoc cleanup for public APIs for 0.13 release
[samza.git] / samza-core / src / test / java / org / apache / samza / operators / TestStreamGraphImpl.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.operators;
20
21 import org.apache.samza.config.Config;
22 import org.apache.samza.config.JobConfig;
23 import org.apache.samza.operators.data.MessageType;
24 import org.apache.samza.operators.data.TestInputMessageEnvelope;
25 import org.apache.samza.operators.data.TestMessageEnvelope;
26 import org.apache.samza.operators.stream.InputStreamInternalImpl;
27 import org.apache.samza.operators.stream.IntermediateStreamInternalImpl;
28 import org.apache.samza.operators.stream.OutputStreamInternalImpl;
29 import org.apache.samza.runtime.ApplicationRunner;
30 import org.apache.samza.system.StreamSpec;
31 import org.junit.Test;
32
33 import java.util.function.BiFunction;
34 import java.util.function.Function;
35
36 import static org.junit.Assert.assertEquals;
37 import static org.junit.Assert.assertTrue;
38 import static org.mockito.Mockito.mock;
39 import static org.mockito.Mockito.when;
40
41 public class TestStreamGraphImpl {
42
43 @Test
44 public void testGetInputStream() {
45 ApplicationRunner mockRunner = mock(ApplicationRunner.class);
46 Config mockConfig = mock(Config.class);
47 StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
48 when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
49
50 StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
51 BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
52 (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
53 MessageStream<TestMessageEnvelope> mInputStream = graph.getInputStream("test-stream-1", xMsgBuilder);
54 assertEquals(graph.getInputStreams().get(testStreamSpec), mInputStream);
55 assertTrue(mInputStream instanceof InputStreamInternalImpl);
56 assertEquals(((InputStreamInternalImpl) mInputStream).getMsgBuilder(), xMsgBuilder);
57
58 String key = "test-input-key";
59 MessageType msgBody = new MessageType("test-msg-value", 333333L);
60 TestMessageEnvelope xInputMsg = ((InputStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mInputStream).
61 getMsgBuilder().apply(key, msgBody);
62 assertEquals(xInputMsg.getKey(), key);
63 assertEquals(xInputMsg.getMessage().getValue(), msgBody.getValue());
64 assertEquals(xInputMsg.getMessage().getEventTime(), msgBody.getEventTime());
65 assertEquals(((TestInputMessageEnvelope) xInputMsg).getInputId(), "input-id-1");
66 }
67
68 @Test(expected = IllegalStateException.class)
69 public void testMultipleGetInputStream() {
70 ApplicationRunner mockRunner = mock(ApplicationRunner.class);
71 Config mockConfig = mock(Config.class);
72
73 StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
74 StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
75 StreamSpec nonExistentStreamSpec = new StreamSpec("non-existent-stream", "physical-stream-1", "test-system");
76
77 when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
78 when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
79
80 StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
81 BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
82 (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
83
84 //create 2 streams for the corresponding streamIds
85 MessageStream<TestInputMessageEnvelope> inputStream1 = graph.getInputStream("test-stream-1", xMsgBuilder);
86 MessageStream<TestInputMessageEnvelope> inputStream2 = graph.getInputStream("test-stream-2", xMsgBuilder);
87
88 //assert that the streamGraph contains only the above 2 streams
89 assertEquals(graph.getInputStreams().get(testStreamSpec1), inputStream1);
90 assertEquals(graph.getInputStreams().get(testStreamSpec2), inputStream2);
91 assertEquals(graph.getInputStreams().get(nonExistentStreamSpec), null);
92 assertEquals(graph.getInputStreams().size(), 2);
93
94 //should throw IllegalStateException
95 graph.getInputStream("test-stream-1", xMsgBuilder);
96 }
97
98
99 @Test
100 public void testGetOutputStream() {
101 ApplicationRunner mockRunner = mock(ApplicationRunner.class);
102 Config mockConfig = mock(Config.class);
103 StreamSpec testStreamSpec = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
104 when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec);
105
106 class MyMessageType extends MessageType {
107 public final String outputId;
108
109 public MyMessageType(String value, long eventTime, String outputId) {
110 super(value, eventTime);
111 this.outputId = outputId;
112 }
113 }
114
115 StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
116 Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
117 Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
118 x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
119
120 OutputStream<String, MyMessageType, TestInputMessageEnvelope> mOutputStream =
121 graph.getOutputStream("test-stream-1", xKeyExtractor, xMsgExtractor);
122 assertEquals(graph.getOutputStreams().get(testStreamSpec), mOutputStream);
123 assertTrue(mOutputStream instanceof OutputStreamInternalImpl);
124 assertEquals(((OutputStreamInternalImpl) mOutputStream).getKeyExtractor(), xKeyExtractor);
125 assertEquals(((OutputStreamInternalImpl) mOutputStream).getMsgExtractor(), xMsgExtractor);
126
127 TestInputMessageEnvelope xInputMsg = new TestInputMessageEnvelope("test-key-1", "test-msg-1", 33333L, "input-id-1");
128 assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
129 getKeyExtractor().apply(xInputMsg), "test-key-1");
130 assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
131 getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
132 assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
133 getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
134 assertEquals(((OutputStreamInternalImpl<String, MyMessageType, TestInputMessageEnvelope>) mOutputStream).
135 getMsgExtractor().apply(xInputMsg).outputId, "test-output-id-1");
136 }
137
138 @Test
139 public void testGetIntermediateStream() {
140 ApplicationRunner mockRunner = mock(ApplicationRunner.class);
141 Config mockConfig = mock(Config.class);
142 StreamSpec testStreamSpec = new StreamSpec("myJob-i001-test-stream-1", "physical-stream-1", "test-system");
143 when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(testStreamSpec);
144 when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
145 when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
146
147 class MyMessageType extends MessageType {
148 public final String outputId;
149
150 public MyMessageType(String value, long eventTime, String outputId) {
151 super(value, eventTime);
152 this.outputId = outputId;
153 }
154 }
155
156 StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
157 Function<TestMessageEnvelope, String> xKeyExtractor = x -> x.getKey();
158 Function<TestMessageEnvelope, MyMessageType> xMsgExtractor =
159 x -> new MyMessageType(x.getMessage().getValue(), x.getMessage().getEventTime(), "test-output-id-1");
160 BiFunction<String, MessageType, TestInputMessageEnvelope> xMsgBuilder =
161 (k, v) -> new TestInputMessageEnvelope(k, v.getValue(), v.getEventTime(), "input-id-1");
162
163 MessageStream<TestMessageEnvelope> mIntermediateStream =
164 graph.getIntermediateStream("test-stream-1", xKeyExtractor, xMsgExtractor, xMsgBuilder);
165 assertEquals(graph.getOutputStreams().get(testStreamSpec), mIntermediateStream);
166 assertTrue(mIntermediateStream instanceof IntermediateStreamInternalImpl);
167 assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getKeyExtractor(), xKeyExtractor);
168 assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgExtractor(), xMsgExtractor);
169 assertEquals(((IntermediateStreamInternalImpl) mIntermediateStream).getMsgBuilder(), xMsgBuilder);
170
171 TestMessageEnvelope xInputMsg = new TestMessageEnvelope("test-key-1", "test-msg-1", 33333L);
172 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
173 getKeyExtractor().apply(xInputMsg), "test-key-1");
174 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
175 getMsgExtractor().apply(xInputMsg).getValue(), "test-msg-1");
176 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
177 getMsgExtractor().apply(xInputMsg).getEventTime(), 33333L);
178 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
179 getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getKey(), "test-key-1");
180 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
181 getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getValue(), "test-msg-1");
182 assertEquals(((IntermediateStreamInternalImpl<String, MessageType, TestMessageEnvelope>) mIntermediateStream).
183 getMsgBuilder().apply("test-key-1", new MyMessageType("test-msg-1", 33333L, "test-output-id-1")).getMessage().getEventTime(), 33333L);
184 }
185
186 @Test
187 public void testGetNextOpId() {
188 ApplicationRunner mockRunner = mock(ApplicationRunner.class);
189 Config mockConfig = mock(Config.class);
190
191 StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
192 assertEquals(graph.getNextOpId(), 0);
193 assertEquals(graph.getNextOpId(), 1);
194 }
195
196 }