SAMZA-1387: Unable to Start Samza App Because Regex Check
[samza.git] / samza-kafka / src / test / java / org / apache / samza / system / kafka / TestKafkaSystemAdminJava.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 package org.apache.samza.system.kafka;
21
22 import java.util.HashMap;
23 import java.util.Map;
24 import java.util.Properties;
25 import org.apache.samza.system.StreamSpec;
26 import org.apache.samza.system.StreamValidationException;
27 import org.apache.samza.system.SystemAdmin;
28 import org.apache.samza.util.Util;
29 import org.junit.Test;
30 import org.mockito.ArgumentCaptor;
31 import org.mockito.Mockito;
32
33 import static org.junit.Assert.*;
34
35
36 public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
37
38 KafkaSystemAdmin basicSystemAdmin = createSystemAdmin();
39
40
41 @Test
42 public void testCreateCoordinatorStreamDelegatesToCreateStream() {
43 KafkaSystemAdmin systemAdmin = createSystemAdmin();//coordProps, 3, new scala.collection.immutable.HashMap<>(), 1000);
44 SystemAdmin admin = Mockito.spy(systemAdmin);
45 StreamSpec spec = new StreamSpec("testId", "testCoordinatorStream", "testSystem");
46
47 admin.createCoordinatorStream(spec.getPhysicalName());
48 admin.validateStream(spec);
49
50 Mockito.verify(admin).createStream(Mockito.any());
51 }
52
53 @Test
54 public void testCreateCoordinatorStreamDelegatesToCreateStream_specialCharsInTopicName() {
55 final String STREAM = "test.Coord_inator.Stream";
56
57 SystemAdmin admin = Mockito.spy(createSystemAdmin());
58 StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM());
59 admin.createCoordinatorStream(STREAM);
60 admin.validateStream(spec);
61
62 ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
63 Mockito.verify(admin).createStream(specCaptor.capture());
64
65 StreamSpec internalSpec = specCaptor.getValue();
66 assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
67 assertEquals(KafkaSystemAdmin.COORDINATOR_STREAMID(), internalSpec.getId());
68 assertEquals(SYSTEM(), internalSpec.getSystemName());
69 assertEquals(STREAM, internalSpec.getPhysicalName());
70 }
71
72 @Test
73 public void testCreateChangelogStreamDelegatesToCreateStream() {
74 final String STREAM = "testChangeLogStream";
75 final int PARTITIONS = 12;
76 final int REP_FACTOR = 3;
77
78 Properties coordProps = new Properties();
79 Properties changeLogProps = new Properties();
80 changeLogProps.setProperty("cleanup.policy", "compact");
81 changeLogProps.setProperty("segment.bytes", "139");
82 Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
83 changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
84
85 SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
86 StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
87 admin.createChangelogStream(STREAM, PARTITIONS);
88 admin.validateStream(spec);
89
90 ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
91 Mockito.verify(admin).createStream(specCaptor.capture());
92
93 StreamSpec internalSpec = specCaptor.getValue();
94 assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
95 assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
96 assertEquals(SYSTEM(), internalSpec.getSystemName());
97 assertEquals(STREAM, internalSpec.getPhysicalName());
98 assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
99 assertEquals(PARTITIONS, internalSpec.getPartitionCount());
100 assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
101 }
102
103 @Test
104 public void testCreateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
105 final String STREAM = "test.Change_Log.Stream";
106 final int PARTITIONS = 12;
107 final int REP_FACTOR = 3;
108
109 Properties coordProps = new Properties();
110 Properties changeLogProps = new Properties();
111 changeLogProps.setProperty("cleanup.policy", "compact");
112 changeLogProps.setProperty("segment.bytes", "139");
113 Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
114 changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
115
116 SystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap)));
117 StreamSpec spec = new StreamSpec(KafkaSystemAdmin.CHANGELOG_STREAMID(), STREAM, SYSTEM(), PARTITIONS);
118 admin.createChangelogStream(STREAM, PARTITIONS);
119 admin.validateStream(spec);
120
121 ArgumentCaptor<StreamSpec> specCaptor = ArgumentCaptor.forClass(StreamSpec.class);
122 Mockito.verify(admin).createStream(specCaptor.capture());
123
124 StreamSpec internalSpec = specCaptor.getValue();
125 assertTrue(internalSpec instanceof KafkaStreamSpec); // KafkaStreamSpec is used to carry replication factor
126 assertEquals(KafkaSystemAdmin.CHANGELOG_STREAMID(), internalSpec.getId());
127 assertEquals(SYSTEM(), internalSpec.getSystemName());
128 assertEquals(STREAM, internalSpec.getPhysicalName());
129 assertEquals(REP_FACTOR, ((KafkaStreamSpec) internalSpec).getReplicationFactor());
130 assertEquals(PARTITIONS, internalSpec.getPartitionCount());
131 assertEquals(changeLogProps, ((KafkaStreamSpec) internalSpec).getProperties());
132 }
133
134 @Test
135 public void testValidateChangelogStreamDelegatesToValidateStream() {
136 final String STREAM = "testChangeLogValidate";
137 Properties coordProps = new Properties();
138 Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
139 changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
140
141 KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
142 SystemAdmin admin = Mockito.spy(systemAdmin);
143 StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
144
145 admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
146 admin.validateStream(spec);
147 admin.validateChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
148
149 Mockito.verify(admin).createStream(Mockito.any());
150 Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
151 }
152
153 @Test
154 public void testValidateChangelogStreamDelegatesToCreateStream_specialCharsInTopicName() {
155 final String STREAM = "test.Change_Log.Validate";
156 Properties coordProps = new Properties();
157 Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
158 changeLogMap.put(STREAM, new ChangelogInfo(3, new Properties()));
159
160 KafkaSystemAdmin systemAdmin = createSystemAdmin(coordProps, 3, Util.javaMapAsScalaMap(changeLogMap));
161 SystemAdmin admin = Mockito.spy(systemAdmin);
162 StreamSpec spec = new StreamSpec("testId", STREAM, "testSystem", 12);
163
164 admin.createChangelogStream(spec.getPhysicalName(), spec.getPartitionCount());
165 admin.validateStream(spec);
166 admin.validateChangelogStream(STREAM, spec.getPartitionCount()); // Should not throw
167
168 Mockito.verify(admin).createStream(Mockito.any());
169 Mockito.verify(admin, Mockito.times(3)).validateStream(Mockito.any());
170 }
171
172 @Test
173 public void testCreateStream() {
174 SystemAdmin admin = this.basicSystemAdmin;
175 StreamSpec spec = new StreamSpec("testId", "testStream", "testSystem", 8);
176
177 assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec));
178 admin.validateStream(spec);
179
180 assertFalse("createStream should return false if the stream already exists.", admin.createStream(spec));
181 }
182
183 @Test(expected = StreamValidationException.class)
184 public void testValidateStreamDoesNotExist() {
185 SystemAdmin admin = this.basicSystemAdmin;
186
187 StreamSpec spec = new StreamSpec("testId", "testStreamNameExist", "testSystem", 8);
188
189 admin.validateStream(spec);
190 }
191
192 @Test(expected = StreamValidationException.class)
193 public void testValidateStreamWrongPartitionCount() {
194 SystemAdmin admin = this.basicSystemAdmin;
195 StreamSpec spec1 = new StreamSpec("testId", "testStreamPartition", "testSystem", 8);
196 StreamSpec spec2 = new StreamSpec("testId", "testStreamPartition", "testSystem", 4);
197
198 assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
199
200 admin.validateStream(spec2);
201 }
202
203 @Test(expected = StreamValidationException.class)
204 public void testValidateStreamWrongName() {
205 SystemAdmin admin = this.basicSystemAdmin;
206 StreamSpec spec1 = new StreamSpec("testId", "testStreamName1", "testSystem", 8);
207 StreamSpec spec2 = new StreamSpec("testId", "testStreamName2", "testSystem", 8);
208
209 assertTrue("createStream should return true if the stream does not exist and then is created.", admin.createStream(spec1));
210
211 admin.validateStream(spec2);
212 }
213 }