SAMZA-1384: Race condition with async commit affects checkpoint correctness
[samza.git] / samza-core / src / test / scala / org / apache / samza / checkpoint / TestOffsetManager.scala
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied.  See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19
20 package org.apache.samza.checkpoint
21
22 import java.util
23 import java.util.Collections
24 import java.util.Collections.EmptyMap
25
26 import org.apache.samza.container.TaskName
27 import org.apache.samza.Partition
28 import org.apache.samza.system._
29 import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
30 import org.junit.Assert._
31 import org.junit.{Ignore, Test}
32 import org.apache.samza.SamzaException
33 import org.apache.samza.config.MapConfig
34 import org.scalatest.Assertions.intercept
35
36 import scala.collection.JavaConverters._
37
38 class TestOffsetManager {
39   @Test
40   def testSystemShouldUseDefaults {
41     val taskName = new TaskName("c")
42     val systemStream = new SystemStream("test-system", "test-stream")
43     val partition = new Partition(0)
44     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
45     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
46     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
47     val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest").asJava)
48     val offsetManager = OffsetManager(systemStreamMetadata, config)
49     offsetManager.register(taskName, Set(systemStreamPartition))
50     offsetManager.start
51     assertFalse(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).isDefined)
52     assertTrue(offsetManager.getStartingOffset(taskName, systemStreamPartition).isDefined)
53     assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
54   }
55
56   @Test
57   def testShouldLoadFromAndSaveWithCheckpointManager {
58     val taskName = new TaskName("c")
59     val systemStream = new SystemStream("test-system", "test-stream")
60     val partition = new Partition(0)
61     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
62     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
63     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
64     val config = new MapConfig
65     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
66     val systemAdmins = Map("test-system" -> getSystemAdmin)
67     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
68     offsetManager.register(taskName, Set(systemStreamPartition))
69     offsetManager.start
70     assertTrue(checkpointManager.isStarted)
71     assertEquals(1, checkpointManager.registered.size)
72     assertEquals(taskName, checkpointManager.registered.head)
73     assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
74     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
75     assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
76     assertEquals("45", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
77     offsetManager.update(taskName, systemStreamPartition, "46")
78     assertEquals("46", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
79     offsetManager.update(taskName, systemStreamPartition, "47")
80     assertEquals("47", offsetManager.getLastProcessedOffset(taskName, systemStreamPartition).get)
81     // Should never update starting offset.
82     assertEquals("46", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
83     // Should not update null offset
84     offsetManager.update(taskName, systemStreamPartition, null)
85     checkpoint(offsetManager, taskName)
86     val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47").asJava)
87     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
88   }
89
90   @Test
91   def testGetCheckpointedOffsetMetric{
92     val taskName = new TaskName("c")
93     val systemStream = new SystemStream("test-system", "test-stream")
94     val partition = new Partition(0)
95     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
96     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
97     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
98     val config = new MapConfig
99     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
100     val systemAdmins = Map("test-system" -> getSystemAdmin)
101     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
102     offsetManager.register(taskName, Set(systemStreamPartition))
103     offsetManager.start
104     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
105     checkpoint(offsetManager, taskName)
106     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
107     offsetManager.update(taskName, systemStreamPartition, "46")
108     offsetManager.update(taskName, systemStreamPartition, "47")
109     checkpoint(offsetManager, taskName)
110     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
111     offsetManager.update(taskName, systemStreamPartition, "48")
112     checkpoint(offsetManager, taskName)
113     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
114   }
115
116   @Test
117   def testShouldResetStreams {
118     val taskName = new TaskName("c")
119     val systemStream = new SystemStream("test-system", "test-stream")
120     val partition = new Partition(0)
121     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
122     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
123     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
124     val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45").asJava)
125     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
126     val config = new MapConfig(Map(
127       "systems.test-system.samza.offset.default" -> "oldest",
128       "systems.test-system.streams.test-stream.samza.reset.offset" -> "true").asJava)
129     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
130     offsetManager.register(taskName, Set(systemStreamPartition))
131     offsetManager.start
132     assertTrue(checkpointManager.isStarted)
133     assertEquals(1, checkpointManager.registered.size)
134     assertEquals(taskName, checkpointManager.registered.head)
135     assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
136     // Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
137     assertEquals("0", offsetManager.getStartingOffset(taskName, systemStreamPartition).get)
138   }
139
140   @Test
141   def testOffsetManagerShouldHandleNullCheckpoints {
142     val systemStream = new SystemStream("test-system", "test-stream")
143     val partition1 = new Partition(0)
144     val partition2 = new Partition(1)
145     val taskName1 = new TaskName("P0")
146     val taskName2 = new TaskName("P1")
147     val systemStreamPartition1 = new SystemStreamPartition(systemStream, partition1)
148     val systemStreamPartition2 = new SystemStreamPartition(systemStream, partition2)
149     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(
150       partition1 -> new SystemStreamPartitionMetadata("0", "1", "2"),
151       partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")).asJava)
152     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
153     val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45").asJava)
154     // Checkpoint manager only has partition 1.
155     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
156     val systemAdmins = Map("test-system" -> getSystemAdmin)
157     val config = new MapConfig
158     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
159     // Register both partitions. Partition 2 shouldn't have a checkpoint.
160     offsetManager.register(taskName1, Set(systemStreamPartition1))
161     offsetManager.register(taskName2, Set(systemStreamPartition2))
162     offsetManager.start
163     assertTrue(checkpointManager.isStarted)
164     assertEquals(2, checkpointManager.registered.size)
165     assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
166     assertNull(checkpointManager.readLastCheckpoint(taskName2))
167   }
168
169   @Test
170   def testShouldFailWhenMissingMetadata {
171     val taskName = new TaskName("c")
172     val systemStream = new SystemStream("test-system", "test-stream")
173     val partition = new Partition(0)
174     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
175     val offsetManager = new OffsetManager
176     offsetManager.register(taskName, Set(systemStreamPartition))
177
178     intercept[SamzaException] {
179       offsetManager.start
180     }
181   }
182
183   @Test
184   def testDefaultSystemShouldFailWhenFailIsSpecified {
185     val systemStream = new SystemStream("test-system", "test-stream")
186     val partition = new Partition(0)
187     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
188     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
189     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
190     val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail").asJava)
191     intercept[IllegalArgumentException] {
192       OffsetManager(systemStreamMetadata, config)
193     }
194   }
195
196   @Test
197   def testDefaultStreamShouldFailWhenFailIsSpecified {
198     val systemStream = new SystemStream("test-system", "test-stream")
199     val partition = new Partition(0)
200     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
201     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
202     val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail").asJava)
203
204     intercept[IllegalArgumentException] {
205       OffsetManager(systemStreamMetadata, config)
206     }
207   }
208
209   @Test
210   def testOutdatedStreamInCheckpoint {
211     val taskName = new TaskName("c")
212     val systemStream0 = new SystemStream("test-system-0", "test-stream")
213     val systemStream1 = new SystemStream("test-system-1", "test-stream")
214     val partition0 = new Partition(0)
215     val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0)
216     val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0)
217     val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
218     val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
219     val checkpointManager = getCheckpointManager(systemStreamPartition1)
220     val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
221     offsetManager.register(taskName, Set(systemStreamPartition0))
222     offsetManager.start
223     assertTrue(checkpointManager.isStarted)
224     assertEquals(1, checkpointManager.registered.size)
225     assertNull(offsetManager.getLastProcessedOffset(taskName, systemStreamPartition1).getOrElse(null))
226   }
227
228   @Test
229   def testDefaultToUpcomingOnMissingDefault {
230     val taskName = new TaskName("task-name")
231     val ssp = new SystemStreamPartition(new SystemStream("test-system", "test-stream"), new Partition(0))
232     val sspm = new SystemStreamPartitionMetadata(null, null, "13")
233     val offsetMeta = new SystemStreamMetadata("test-stream", Map(new Partition(0) -> sspm).asJava)
234     val settings = new OffsetSetting(offsetMeta, OffsetType.OLDEST, resetOffset = false)
235     val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings))
236     offsetManager.register(taskName, Set(ssp))
237     offsetManager.start
238     assertEquals(Some("13"), offsetManager.getStartingOffset(taskName, ssp))
239   }
240
241   @Test
242   def testCheckpointListener{
243     val taskName = new TaskName("c")
244     val systemName = "test-system"
245     val systemName2 = "test-system2"
246     val systemStream = new SystemStream(systemName, "test-stream")
247     val systemStream2 = new SystemStream(systemName2, "test-stream2")
248     val partition = new Partition(0)
249     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
250     val systemStreamPartition2 = new SystemStreamPartition(systemStream2, partition)
251     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
252     val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
253     val systemStreamMetadata = Map(systemStream -> testStreamMetadata, systemStream2->testStreamMetadata2)
254     val config = new MapConfig
255     val checkpointManager = getCheckpointManager1(systemStreamPartition,
256                                                  new Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> "100").asJava),
257                                                  taskName)
258     val systemAdmins = Map(systemName -> getSystemAdmin, systemName2->getSystemAdmin)
259     val consumer = new SystemConsumerWithCheckpointCallback
260
261     val checkpointListeners: Map[String, CheckpointListener] = if(consumer.isInstanceOf[CheckpointListener])
262       Map(systemName -> consumer.asInstanceOf[CheckpointListener])
263     else
264       Map()
265
266     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager,
267                                       systemAdmins, checkpointListeners, new OffsetManagerMetrics)
268     offsetManager.register(taskName, Set(systemStreamPartition, systemStreamPartition2))
269
270     offsetManager.start
271     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
272     checkpoint(offsetManager, taskName)
273     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
274     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
275     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
276     // make sure only the system with the callbacks gets them
277     assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
278
279     offsetManager.update(taskName, systemStreamPartition, "46")
280     offsetManager.update(taskName, systemStreamPartition, "47")
281     checkpoint(offsetManager, taskName)
282     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
283     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
284     assertEquals("47", consumer.recentCheckpoint.get(systemStreamPartition))
285     assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
286
287     offsetManager.update(taskName, systemStreamPartition, "48")
288     offsetManager.update(taskName, systemStreamPartition2, "101")
289     checkpoint(offsetManager, taskName)
290     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
291     assertEquals("101", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
292     assertEquals("48", consumer.recentCheckpoint.get(systemStreamPartition))
293     assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
294     offsetManager.stop
295   }
296
297   /**
298     * If task.max.concurrency > 1 and task.async.commit == true, a task could update its offsets at the same time as
299     * TaskInstance.commit(). This makes it possible to checkpoint offsets which did not successfully flush.
300     *
301     * This is prevented by using separate methods to get a checkpoint and write that checkpoint. See SAMZA-1384
302     */
303   @Test
304   def testConcurrentCheckpointAndUpdate{
305     val taskName = new TaskName("c")
306     val systemStream = new SystemStream("test-system", "test-stream")
307     val partition = new Partition(0)
308     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
309     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
310     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
311     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
312     val systemAdmins = Map("test-system" -> getSystemAdmin)
313     val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
314     offsetManager.register(taskName, Set(systemStreamPartition))
315     offsetManager.start
316
317     // Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
318     checkpoint(offsetManager, taskName)
319     assertEquals("45", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
320
321     offsetManager.update(taskName, systemStreamPartition, "46")
322     // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit()
323     val checkpoint46 = offsetManager.buildCheckpoint(taskName)
324     offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint
325     offsetManager.writeCheckpoint(taskName, checkpoint46)
326     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartition))
327     assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
328
329     // Now write the checkpoint for the latest offset
330     val checkpoint47 = offsetManager.buildCheckpoint(taskName)
331     offsetManager.writeCheckpoint(taskName, checkpoint47)
332     assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartition))
333     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
334   }
335
336   // Utility method to create and write checkpoint in one statement
337   def checkpoint(offsetManager: OffsetManager, taskName: TaskName): Unit = {
338     offsetManager.writeCheckpoint(taskName, offsetManager.buildCheckpoint(taskName))
339   }
340
341   class SystemConsumerWithCheckpointCallback extends SystemConsumer with CheckpointListener{
342     var recentCheckpoint: java.util.Map[SystemStreamPartition, String] = java.util.Collections.emptyMap[SystemStreamPartition, String]
343     override def start() {}
344
345     override def stop() {}
346
347     override def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
348
349     override def poll(systemStreamPartitions: util.Set[SystemStreamPartition],
350                       timeout: Long): util.Map[SystemStreamPartition, util.List[IncomingMessageEnvelope]] = { null }
351
352     override def onCheckpoint(offsets: java.util.Map[SystemStreamPartition,String]){
353       recentCheckpoint = (recentCheckpoint.asScala ++ offsets.asScala).asJava
354     }
355   }
356
357   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
358     getCheckpointManager1(systemStreamPartition, new Checkpoint(Map(systemStreamPartition -> "45").asJava), taskName)
359   }
360
361   private def getCheckpointManager1(systemStreamPartition: SystemStreamPartition, checkpoint: Checkpoint, taskName:TaskName = new TaskName("taskName")) = {
362     new CheckpointManager {
363       var isStarted = false
364       var isStopped = false
365       var registered = Set[TaskName]()
366       var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
367       var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
368       def start { isStarted = true }
369       def register(taskName: TaskName) { registered += taskName }
370       def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
371       def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
372       def stop { isStopped = true }
373
374       // Only for testing purposes - not present in actual checkpoint manager
375       def getOffets = Map(taskName -> checkpoint.getOffsets.asScala.toMap)
376     }
377   }
378
379   private def getSystemAdmin = {
380     new SystemAdmin {
381       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
382         offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
383
384       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
385         Map[String, SystemStreamMetadata]().asJava
386
387       override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
388         new UnsupportedOperationException("Method not implemented.")
389       }
390
391       override def validateChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
392         new UnsupportedOperationException("Method not implemented.")
393       }
394
395       override def createCoordinatorStream(streamName: String) {
396         new UnsupportedOperationException("Method not implemented.")
397       }
398
399       override def offsetComparator(offset1: String, offset2: String) = null
400     }
401   }
402 }