Tests for deprecated Kafka Consumer
authorBoris S <bshkolnik@linkedin.com>
Wed, 24 Oct 2018 01:11:43 +0000 (18:11 -0700)
committerBoris S <bshkolnik@linkedin.com>
Wed, 24 Oct 2018 01:11:43 +0000 (18:11 -0700)
Tested with running with system set to org.apache.samza.system.kafka_deprecated.KafkaSystemFactory.

Author: Boris S <bshkolnik@linkedin.com>
Author: Boris S <boryas@apache.org>
Author: Boris Shkolnik <bshkolni@linkedin.com>

Reviewers: Cameron Lee <calee@linkedin.com>

Closes #755 from sborya/OldKafkaConsumer

samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala [new file with mode: 0644]
samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala [new file with mode: 0644]

diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestBrokerProxy.scala
new file mode 100644 (file)
index 0000000..702e674
--- /dev/null
@@ -0,0 +1,434 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch
+
+import kafka.api.{PartitionOffsetsResponse, _}
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{ByteBufferMessageSet, Message, MessageAndOffset, MessageSet}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Logging
+import org.junit.Assert._
+import org.junit._
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.{Matchers, Mockito}
+
+import scala.collection.JavaConverters._
+
+class TestBrokerProxy extends Logging {
+  val tp2 = new TopicAndPartition("Redbird", 2013)
+  var fetchTp1 = true // control whether fetching tp1 messages or not
+
+  @Test def brokerProxyRetrievesMessagesCorrectly() = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+    // Add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, Option("0"))
+    Thread.sleep(1000)
+    assertEquals(2, sink.receivedMessages.size)
+    assertEquals(42, sink.receivedMessages(0)._2.offset)
+    assertEquals(84, sink.receivedMessages(1)._2.offset)
+  }
+
+  @Test def brokerProxySkipsFetchForEmptyRequests() = {
+    val (bp, tp, sink) = getMockBrokerProxy()
+
+    bp.start
+    // Only add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, Option("0"))
+    Thread.sleep(1000)
+    assertEquals(0, sink.receivedMessages.size)
+    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
+    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
+  }
+
+  @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+
+    try {
+      bp.addTopicPartition(tp, Option("1"))
+      fail("Should have thrown an exception")
+    } catch {
+      case se: SamzaException => assertEquals(se.getMessage, "Already consuming TopicPartition [Redbird,2012]")
+      case other: Exception => fail("Got some other exception than what we were expecting: " + other)
+    }
+  }
+
+  def getMockBrokerProxy() = {
+    val sink = new MessageSink {
+      val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
+
+      def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+
+      def refreshDropped() {}
+
+      def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
+      }
+
+      def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      }
+
+      // Never need messages for tp2.
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2) && fetchTp1
+    }
+
+    val system = "daSystem"
+    val host = "host"
+    val port = 2222
+    val tp = new TopicAndPartition("Redbird", 2012)
+    val metrics = new KafkaSystemConsumerMetrics(system)
+
+    metrics.registerBrokerProxy(host, port)
+    metrics.registerTopicAndPartition(tp)
+    metrics.topicPartitions.get((host, port)).set(1)
+
+    val bp = new BrokerProxy(
+      host,
+      port,
+      system,
+      "daClientId",
+      metrics,
+      sink,
+      offsetGetter = new GetOffset("fail", Map("Redbird" -> "largest"))) {
+
+      override val sleepMSWhileNoTopicPartitions = 100
+      // Speed up for test
+      var alreadyCreatedConsumer = false
+
+      // Scala traits and Mockito mocks don't mix, unfortunately.
+      override def createSimpleConsumer() = {
+        if (alreadyCreatedConsumer) {
+          System.err.println("Should only be creating one consumer in this test!")
+          throw new InterruptedException("Should only be creating one consumer in this test!")
+        }
+        alreadyCreatedConsumer = true
+
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
+          val sc = Mockito.mock(classOf[SimpleConsumer])
+          val mockOffsetResponse = {
+            val offsetResponse = Mockito.mock(classOf[OffsetResponse])
+            val partitionOffsetResponse = {
+              val por = Mockito.mock(classOf[PartitionOffsetsResponse])
+              when(por.offsets).thenReturn(List(1l).toSeq)
+              por
+            }
+
+            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
+            when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
+            offsetResponse
+          }
+
+          when(sc.getOffsetsBefore(any(classOf[OffsetRequest]))).thenReturn(mockOffsetResponse)
+
+          val fetchResponse = {
+            val fetchResponse = Mockito.mock(classOf[FetchResponse])
+
+            val messageSet = {
+              val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+
+              def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
+              val messages = List(new MessageAndOffset(getMessage, 42), new MessageAndOffset(getMessage, 84))
+
+              when(messageSet.sizeInBytes).thenReturn(43)
+              when(messageSet.size).thenReturn(44)
+              when(messageSet.iterator).thenReturn(messages.iterator)
+              when(messageSet.head).thenReturn(messages.head)
+              messageSet
+            }
+
+            val fetchResponsePartitionData = FetchResponsePartitionData(Errors.NONE, 500, messageSet)
+            val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
+
+            when(fetchResponse.data).thenReturn(map.toSeq)
+            when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+            fetchResponse
+          }
+          when(sc.fetch(any(classOf[FetchRequest]))).thenReturn(fetchResponse)
+
+          override def close() = sc.close()
+
+          override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
+
+          override def fetch(request: FetchRequest): FetchResponse = {
+            // Verify that we only get fetch requests for one tp, even though
+            // two were registered. This is to verify that
+            // sink.needsMoreMessages works.
+            assertEquals(1, request.requestInfo.size)
+            sc.fetch(request)
+          }
+
+          when(sc.earliestOrLatestOffset(any(classOf[TopicAndPartition]), any(classOf[Long]), any(classOf[Int]))).thenReturn(100)
+
+          override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
+
+          override def commitOffsets(request: OffsetCommitRequest): OffsetCommitResponse = sc.commitOffsets(request)
+
+          override def fetchOffsets(request: OffsetFetchRequest): OffsetFetchResponse = sc.fetchOffsets(request)
+
+          override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = sc.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
+        }
+      }
+
+    }
+
+    (bp, tp, sink)
+  }
+
+  @Test def brokerProxyUpdateLatencyMetrics() = {
+    val (bp, tp, _) = getMockBrokerProxy()
+
+    bp.start
+    bp.addTopicPartition(tp, Option("0"))
+    Thread.sleep(1000)
+    // update when fetching messages
+    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(415, bp.metrics.lag.get(tp).getValue)
+
+    fetchTp1 = false
+    Thread.sleep(1000)
+    // update when not fetching messages
+    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(15, bp.metrics.lag.get(tp).getValue)
+
+    fetchTp1 = true
+  }
+
+ @Test def brokerProxyCorrectlyHandlesOffsetOutOfRange(): Unit = {
+    // Need to wait for the thread to do some work before ending the test
+    val countdownLatch = new CountDownLatch(1)
+    var failString: String = null
+
+    val mockMessageSink = mock(classOf[MessageSink])
+    when(mockMessageSink.needsMoreMessages(any())).thenReturn(true)
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+
+    val tp = new TopicAndPartition("topic", 42)
+
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    // This will be used by the simple consumer below, and this is the response that simple consumer needs
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
+
+    var callsToCreateSimpleConsumer = 0
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    // Create an answer that first indicates offset out of range on first invocation and on second
+    // verifies that the parameters have been updated to what we expect them to be
+    val answer = new Answer[FetchResponse]() {
+      var invocationCount = 0
+
+      def answer(invocation: InvocationOnMock): FetchResponse = {
+        val arguments = invocation.getArguments()(0).asInstanceOf[List[Object]](0).asInstanceOf[(String, Long)]
+
+        if (invocationCount == 0) {
+          if (arguments !=(tp, 0)) {
+            failString = "First invocation did not have the right arguments: " + arguments
+            countdownLatch.countDown()
+          }
+          val mfr = mock(classOf[FetchResponse])
+          when(mfr.hasError).thenReturn(true)
+          when(mfr.error("topic", 42)).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+
+          val messageSet = mock(classOf[MessageSet])
+          when(messageSet.iterator).thenReturn(Iterator.empty)
+          val response = mock(classOf[FetchResponsePartitionData])
+          when(response.error).thenReturn(Errors.OFFSET_OUT_OF_RANGE)
+          val responseMap = Map(tp -> response)
+          when(mfr.data).thenReturn(responseMap.toSeq)
+          invocationCount += 1
+          mfr
+        } else {
+          if (arguments !=(tp, 1492)) {
+            failString = "On second invocation, arguments were not correct: " + arguments
+          }
+          countdownLatch.countDown()
+          Thread.currentThread().interrupt()
+          null
+        }
+      }
+    }
+
+    when(mockSimpleConsumer.defaultFetch(any())).thenAnswer(answer)
+
+    // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
+
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+
+      override def createSimpleConsumer() = {
+        if (callsToCreateSimpleConsumer > 1) {
+          failString = "Tried to create more than one simple consumer"
+          countdownLatch.countDown()
+        }
+        callsToCreateSimpleConsumer += 1
+        mockSimpleConsumer
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    countdownLatch.await()
+    bp.stop
+    if (failString != null) {
+      fail(failString)
+    }
+  }
+
+  /**
+   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
+   * that it owns when a consumer failure occurs.
+   */
+  @Test def brokerProxyAbdicatesOnConnectionFailure(): Unit = {
+    val countdownLatch = new CountDownLatch(1)
+    var abdicated: Option[TopicAndPartition] = None
+    @volatile var refreshDroppedCount = 0
+    val mockMessageSink = new MessageSink {
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      }
+
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+      }
+
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+        abdicated = Some(tp)
+        countdownLatch.countDown
+      }
+
+      override def refreshDropped() {
+        refreshDroppedCount += 1
+      }
+
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
+        true
+      }
+    }
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+    val tp = new TopicAndPartition("topic", 42)
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
+    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
+
+    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+
+    val waitForRefresh = () => {
+      val currentRefreshDroppedCount = refreshDroppedCount
+      while (refreshDroppedCount == currentRefreshDroppedCount) {
+        Thread.sleep(100)
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    // BP should refresh on startup.
+    waitForRefresh()
+    countdownLatch.await()
+    // BP should continue refreshing after it's abdicated all TopicAndPartitions.
+    waitForRefresh()
+    bp.stop
+    assertEquals(tp, abdicated.getOrElse(null))
+  }
+
+  @Test def brokerProxyAbdicatesHardErrors(): Unit = {
+    val doNothingMetrics = new KafkaSystemConsumerMetrics
+    val mockMessageSink = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {}
+      override def refreshDropped() {throw new OutOfMemoryError("Test - OOME")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    val bp = new BrokerProxy("host", 658, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    var caughtError = false
+    try {
+      bp.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals(e.getMessage, "Got out of memory error in broker proxy thread.")
+        info("Received OutOfMemoryError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertEquals(true, caughtError)
+    val mockMessageSink2 = new MessageSink {
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = true
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long): Unit = {}
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit = {}
+      override def refreshDropped(): Unit = {throw new StackOverflowError("Test - SOE")}
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean): Unit = {}
+    }
+    caughtError = false
+    val bp2 = new BrokerProxy("host", 689, "system", "clientID2", doNothingMetrics, mockMessageSink2, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    try {
+      bp2.thread.run
+    } catch {
+      case e: SamzaException => {
+        assertEquals(e.getMessage, "Got stack overflow error in broker proxy thread.")
+        info("Received StackOverflowError in broker proxy.")
+        caughtError = true
+      }
+    }
+    assertEquals(true, caughtError)
+  }
+
+  @Test
+       def brokerProxyStopCloseConsumer: Unit = {
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    bp.start
+    bp.stop
+    verify(mockSimpleConsumer).close
+  }
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestGetOffset.scala
new file mode 100644 (file)
index 0000000..21ee4cd
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.nio.ByteBuffer
+
+import kafka.api._
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.Message
+import kafka.message.ByteBufferMessageSet
+import org.apache.kafka.common.errors.OffsetOutOfRangeException
+import org.junit._
+import org.junit.Assert._
+import org.mockito.Mockito
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+class TestGetOffset {
+
+  private val outOfRangeOffset : String = "0"
+
+  /**
+   * An empty message set is still a valid offset. It just means that the
+   * offset was for the upcoming message, which hasn't yet been written. The
+   * fetch request times out in such a case, and an empty message set is
+   * returned.
+   */
+  @Test
+  def testIsValidOffsetWorksWithEmptyMessageSet {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception.
+    assertTrue(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), "1234"))
+  }
+
+  /**
+    * An empty message set is still a valid offset. It just means that the
+    * offset was for the upcoming message, which hasn't yet been written. The
+    * fetch request times out in such a case, and an empty message set is
+    * returned.
+    */
+  @Test
+  def testIsValidOffsetWorksWithOffsetOutOfRangeException {
+    val getOffset = new GetOffset(OffsetRequest.LargestTimeString)
+    // Should not throw an exception.
+    assertFalse(getOffset.isValidOffset(getMockDefaultFetchSimpleConsumer, TopicAndPartition("foo", 1), outOfRangeOffset))
+  }
+
+  /**
+   * Create a default fetch simple consumer that returns empty message sets.
+   */
+  def getMockDefaultFetchSimpleConsumer = {
+    new DefaultFetchSimpleConsumer("", 0, 0, 0, "") {
+      val sc = Mockito.mock(classOf[SimpleConsumer])
+
+      // Build an empty fetch response.
+      val fetchResponse = {
+        val fetchResponse = Mockito.mock(classOf[FetchResponse])
+        val messageSet = {
+          val messageSet = Mockito.mock(classOf[ByteBufferMessageSet])
+          val messages = List()
+
+          def getMessage() = new Message(Mockito.mock(classOf[ByteBuffer]))
+
+          when(messageSet.sizeInBytes).thenReturn(0)
+          when(messageSet.size).thenReturn(0)
+          when(messageSet.iterator).thenReturn(messages.iterator)
+
+          messageSet
+        }
+        when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
+
+        fetchResponse
+      }
+
+      doAnswer(new Answer[FetchResponse] {
+          override def answer(invocation: InvocationOnMock): FetchResponse = {
+            if (invocation.getArgumentAt(0, classOf[FetchRequest]).requestInfo.exists(
+              req => req._2.offset.toString.equals(outOfRangeOffset))) {
+              throw new OffsetOutOfRangeException("test exception")
+            }
+            fetchResponse
+          }
+        }).when(sc).fetch(any(classOf[FetchRequest]))
+
+      override def fetch(request: FetchRequest): FetchResponse = {
+        sc.fetch(request)
+      }
+    }
+  }
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemAdmin.scala
new file mode 100644 (file)
index 0000000..bf64c03
--- /dev/null
@@ -0,0 +1,351 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.{Properties, UUID}
+
+import kafka.admin.AdminUtils
+import org.apache.kafka.common.errors.LeaderNotAvailableException
+import org.apache.kafka.common.protocol.Errors
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{TestUtils, ZkUtils}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.security.JaasUtils
+import org.apache.samza.Partition
+import org.apache.samza.config.KafkaProducerConfig
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.system.{StreamSpec, SystemStreamMetadata, SystemStreamPartition}
+import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, TopicMetadataStore}
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+/**
+  * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
+  */
+object TestKafkaSystemAdmin extends KafkaServerTestHarness {
+
+  val SYSTEM = "kafka"
+  val TOPIC = "input"
+  val TOPIC2 = "input2"
+  val TOTAL_PARTITIONS = 50
+  val REPLICATION_FACTOR = 2
+  val zkSecure = JaasUtils.isZkSecurityEnabled()
+
+  protected def numBrokers: Int = 3
+
+  var producer: KafkaProducer[Array[Byte], Array[Byte]] = null
+  var metadataStore: TopicMetadataStore = null
+  var producerConfig: KafkaProducerConfig = null
+  var systemAdmin: KafkaSystemAdmin = null
+
+  override def generateConfigs(): Seq[KafkaConfig] = {
+    val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, true)
+    props.map(KafkaConfig.fromProps)
+  }
+
+  @BeforeClass
+  override def setUp() {
+    super.setUp()
+    val config = new java.util.HashMap[String, String]()
+    config.put("bootstrap.servers", brokerList)
+    config.put("acks", "all")
+    config.put("serializer.class", "kafka.serializer.StringEncoder")
+    producerConfig = new KafkaProducerConfig("kafka", "i001", config)
+    producer = new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
+    metadataStore = new ClientUtilTopicMetadataStore(brokerList, "some-job-name")
+    systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure))
+    systemAdmin.start()
+  }
+
+  @AfterClass
+  override def tearDown() {
+    systemAdmin.stop()
+    producer.close()
+    super.tearDown()
+  }
+
+  def createTopic(topicName: String, partitionCount: Int) {
+    AdminUtils.createTopic(
+      zkUtils,
+      topicName,
+      partitionCount,
+      REPLICATION_FACTOR)
+  }
+
+  def validateTopic(topic: String, expectedPartitionCount: Int) {
+    var done = false
+    var retries = 0
+    val maxRetries = 100
+
+    while (!done && retries < maxRetries) {
+      try {
+        val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), SYSTEM, metadataStore.getTopicInfo)
+        val topicMetadata = topicMetadataMap(topic)
+
+        KafkaUtil.maybeThrowException(topicMetadata.error.exception())
+
+        done = expectedPartitionCount == topicMetadata.partitionsMetadata.size
+      } catch {
+        case e: Exception =>
+          System.err.println("Got exception while validating test topics. Waiting and retrying.", e)
+          retries += 1
+          Thread.sleep(500)
+      }
+    }
+
+    if (retries >= maxRetries) {
+      fail("Unable to successfully create topics. Tried to validate %s times." format retries)
+    }
+  }
+
+  def getConsumerConnector(): ConsumerConnector = {
+    val props = new Properties
+
+    props.put("zookeeper.connect", zkConnect)
+    props.put("group.id", "test")
+    props.put("auto.offset.reset", "smallest")
+
+    val consumerConfig = new ConsumerConfig(props)
+    Consumer.create(consumerConfig)
+  }
+
+  def createSystemAdmin(coordinatorStreamProperties: Properties, coordinatorStreamReplicationFactor: Int, topicMetaInformation: Map[String, ChangelogInfo]): KafkaSystemAdmin = {
+    new KafkaSystemAdmin(SYSTEM, brokerList, connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamProperties,
+      coordinatorStreamReplicationFactor, 10000, ConsumerConfig.SocketBufferSize, UUID.randomUUID.toString, topicMetaInformation, Map(), false)
+  }
+
+}
+
+/**
+ * Test creates a local ZK and Kafka cluster, and uses it to create and test
+ * topics for to verify that offset APIs in SystemAdmin work as expected.
+ */
+class TestKafkaSystemAdmin {
+  import TestKafkaSystemAdmin._
+
+  @Test
+  def testShouldAssembleMetadata {
+    val oldestOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "o1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "o2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "o3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "o4")
+    val newestOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "n1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "n2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "n3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "n4")
+    val upcomingOffsets = Map(
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(0)) -> "u1",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(0)) -> "u2",
+      new SystemStreamPartition(SYSTEM, "stream1", new Partition(1)) -> "u3",
+      new SystemStreamPartition(SYSTEM, "stream2", new Partition(1)) -> "u4")
+    val metadata = KafkaSystemAdmin.assembleMetadata(oldestOffsets, newestOffsets, upcomingOffsets)
+    assertNotNull(metadata)
+    assertEquals(2, metadata.size)
+    assertTrue(metadata.contains("stream1"))
+    assertTrue(metadata.contains("stream2"))
+    val stream1Metadata = metadata("stream1")
+    val stream2Metadata = metadata("stream2")
+    assertNotNull(stream1Metadata)
+    assertNotNull(stream2Metadata)
+    assertEquals("stream1", stream1Metadata.getStreamName)
+    assertEquals("stream2", stream2Metadata.getStreamName)
+    val expectedSystemStream1Partition0Metadata = new SystemStreamPartitionMetadata("o1", "n1", "u1")
+    val expectedSystemStream1Partition1Metadata = new SystemStreamPartitionMetadata("o3", "n3", "u3")
+    val expectedSystemStream2Partition0Metadata = new SystemStreamPartitionMetadata("o2", "n2", "u2")
+    val expectedSystemStream2Partition1Metadata = new SystemStreamPartitionMetadata("o4", "n4", "u4")
+    val stream1PartitionMetadata = stream1Metadata.getSystemStreamPartitionMetadata
+    val stream2PartitionMetadata = stream2Metadata.getSystemStreamPartitionMetadata
+    assertEquals(expectedSystemStream1Partition0Metadata, stream1PartitionMetadata.get(new Partition(0)))
+    assertEquals(expectedSystemStream1Partition1Metadata, stream1PartitionMetadata.get(new Partition(1)))
+    assertEquals(expectedSystemStream2Partition0Metadata, stream2PartitionMetadata.get(new Partition(0)))
+    assertEquals(expectedSystemStream2Partition1Metadata, stream2PartitionMetadata.get(new Partition(1)))
+  }
+
+  @Test
+  def testShouldGetOldestNewestAndNextOffsets {
+    // Create an empty topic with 50 partitions, but with no offsets.
+    createTopic(TOPIC, 50)
+    validateTopic(TOPIC, 50)
+
+    // Verify the empty topic behaves as expected.
+    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    assertNotNull(metadata.get(TOPIC))
+    // Verify partition count.
+    var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
+    assertEquals(50, sspMetadata.size)
+    // Empty topics should have null for latest offset and 0 for earliest offset
+    assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
+    assertNull(sspMetadata.get(new Partition(0)).getNewestOffset)
+    // Empty Kafka topics should have a next offset of 0.
+    assertEquals("0", sspMetadata.get(new Partition(0)).getUpcomingOffset)
+
+    // Add a new message to one of the partitions, and verify that it works as
+    // expected.
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    val streamName = metadata.keySet.asScala.head
+    assertEquals(TOPIC, streamName)
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+    // key1 gets hash-mod'd to partition 48.
+    assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+    assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
+    assertEquals("1", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+    // Some other partition should be empty.
+    assertEquals("0", sspMetadata.get(new Partition(3)).getOldestOffset)
+    assertNull(sspMetadata.get(new Partition(3)).getNewestOffset)
+    assertEquals("0", sspMetadata.get(new Partition(3)).getUpcomingOffset)
+
+    // Add a second message to one of the same partition.
+    producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
+    assertEquals(1, metadata.size)
+    assertEquals(TOPIC, streamName)
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
+    // key1 gets hash-mod'd to partition 48.
+    assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
+    assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset)
+    assertEquals("2", sspMetadata.get(new Partition(48)).getUpcomingOffset)
+
+    // Validate that a fetch will return the message.
+    val connector = getConsumerConnector
+    var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
+    var message = stream.next
+    var text = new String(message.message, "UTF-8")
+    connector.shutdown
+    // First message should match the earliest expected offset.
+    assertEquals(sspMetadata.get(new Partition(48)).getOldestOffset, message.offset.toString)
+    assertEquals("val1", text)
+    // Second message should match the earliest expected offset.
+    message = stream.next
+    text = new String(message.message, "UTF-8")
+    assertEquals(sspMetadata.get(new Partition(48)).getNewestOffset, message.offset.toString)
+    assertEquals("val2", text)
+  }
+
+  @Test
+  def testNonExistentTopic {
+    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
+    val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata"))
+    assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava))
+  }
+
+  @Test
+  def testOffsetsAfter {
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+    val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
+      ssp1 -> "1",
+      ssp2 -> "2").asJava)
+    assertEquals("2", offsetsAfter.get(ssp1))
+    assertEquals("3", offsetsAfter.get(ssp2))
+  }
+
+  @Test
+  def testShouldCreateCoordinatorStream {
+    val topic = "test-coordinator-stream"
+    val systemAdmin = new KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure), coordinatorStreamReplicationFactor = 3)
+
+    val spec = StreamSpec.createCoordinatorStreamSpec(topic, "kafka")
+    systemAdmin.createStream(spec)
+    validateTopic(topic, 1)
+    val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+    assertTrue(topicMetadataMap.contains(topic))
+    val topicMetadata = topicMetadataMap(topic)
+    val partitionMetadata = topicMetadata.partitionsMetadata.head
+    assertEquals(0, partitionMetadata.partitionId)
+    assertEquals(3, partitionMetadata.replicas.size)
+  }
+
+  class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin(SYSTEM, brokerList, () => ZkUtils(zkConnect, 6000, 6000, zkSecure)) {
+    import kafka.api.TopicMetadata
+    var metadataCallCount = 0
+
+    // Simulate Kafka telling us that the leader for the topic is not available
+    override def getTopicMetadata(topics: Set[String]) = {
+      metadataCallCount += 1
+      val topicMetadata = TopicMetadata(topic = "quux", partitionsMetadata = Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+      Map("quux" -> topicMetadata)
+    }
+  }
+
+  @Test
+  def testShouldRetryOnTopicMetadataError {
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
+    try {
+      systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff)
+      fail("expected CallLimitReached to be thrown")
+    } catch {
+      case e: ExponentialSleepStrategy.CallLimitReached => ()
+    }
+  }
+
+  @Test
+  def testGetNewestOffset {
+    createTopic(TOPIC2, 16)
+    validateTopic(TOPIC2, 16)
+
+    val sspUnderTest = new SystemStreamPartition("kafka", TOPIC2, new Partition(4))
+    val otherSsp = new SystemStreamPartition("kafka", TOPIC2, new Partition(13))
+
+    assertNull(systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a new message to one of the partitions, and verify that it works as expected.
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 4, "key1".getBytes, "val1".getBytes)).get().offset().toString)
+    assertEquals("0", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Again
+    assertEquals("1", producer.send(new ProducerRecord(TOPIC2, 4, "key2".getBytes, "val2".getBytes)).get().offset().toString)
+    assertEquals("1", systemAdmin.getNewestOffset(sspUnderTest, 3))
+    assertNull(systemAdmin.getNewestOffset(otherSsp, 3))
+
+    // Add a message to both partitions
+    assertEquals("2", producer.send(new ProducerRecord(TOPIC2, 4, "key3".getBytes, "val3".getBytes)).get().offset().toString)
+    assertEquals("0", producer.send(new ProducerRecord(TOPIC2, 13, "key4".getBytes, "val4".getBytes)).get().offset().toString)
+    assertEquals("2", systemAdmin.getNewestOffset(sspUnderTest, 0))
+    assertEquals("0", systemAdmin.getNewestOffset(otherSsp, 0))
+  }
+
+  @Test (expected = classOf[LeaderNotAvailableException])
+  def testGetNewestOffsetMaxRetry {
+    val expectedRetryCount = 3
+    val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
+    try {
+      systemAdmin.getNewestOffset(new SystemStreamPartition(SYSTEM, "quux", new Partition(0)), 3)
+    } catch {
+      case e: Exception =>
+        assertEquals(expectedRetryCount + 1, systemAdmin.metadataCallCount)
+        throw e
+    }
+  }
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemConsumer.scala
new file mode 100644 (file)
index 0000000..ac37619
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import kafka.api.TopicMetadata
+import kafka.api.PartitionMetadata
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+import org.apache.kafka.common.protocol.Errors
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.Partition
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Test
+import org.junit.Assert._
+import org.apache.samza.system.SystemAdmin
+import org.mockito.Mockito._
+import org.mockito.Matchers._
+
+class TestKafkaSystemConsumer {
+  val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+  private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
+  private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
+  private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
+  private val clientId = "TestClientId"
+
+  @Test
+  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 50) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(1000, consumer.perPartitionFetchThreshold)
+  }
+
+  @Test
+  def testBrokerCreationShouldTriggerStart {
+    val systemName = "test-system"
+    val streamName = "test-stream"
+    val metrics = new KafkaSystemConsumerMetrics
+    // Lie and tell the store that the partition metadata is empty. We can't
+    // use partition metadata because it has Broker in its constructor, which
+    // is package private to Kafka.
+    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE)))
+    var hosts = List[String]()
+    var getHostPortCount = 0
+    val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) {
+      override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = {
+        // Generate a unique host every time getHostPort is called.
+        getHostPortCount += 1
+        Some("localhost-%s" format getHostPortCount, 0)
+      }
+
+      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+        new BrokerProxy(host, port, systemName, "", metrics, sink) {
+          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+            // Skip this since we normally do verification of offsets, which
+            // tries to connect to Kafka. Rather than mock that, just forget it.
+            nextOffsets.size
+          }
+
+          override def start {
+            hosts :+= host
+          }
+        }
+      }
+    }
+
+    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
+    assertEquals(0, hosts.size)
+    consumer.start
+    assertEquals(List("localhost-1"), hosts)
+    // Should trigger a refresh with a new host.
+    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
+    assertEquals(List("localhost-1", "localhost-2"), hosts)
+  }
+
+  @Test
+  def testConsumerRegisterOlderOffsetOfTheSamzaSSP {
+    when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod()
+
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000)
+    val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
+    val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
+    val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2))
+
+    consumer.register(ssp0, "0")
+    consumer.register(ssp0, "5")
+    consumer.register(ssp1, "2")
+    consumer.register(ssp1, "3")
+    consumer.register(ssp2, "0")
+
+    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0)))
+    assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
+    assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
+  }
+
+  @Test
+  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
+  }
+
+  @Test
+  def testFetchThresholdBytes {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    val msg = Array[Byte](5, 112, 9, 126)
+    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
+    // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead
+    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0),  msgAndOffset, 887354)
+
+    assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
+
+  @Test
+  def testFetchThresholdBytesDisabled {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
+    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
+}
+
+class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
+  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemFactory.scala
new file mode 100644 (file)
index 0000000..41a48f3
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+class TestKafkaSystemFactory {
+  @Test
+  def testFailWhenNoSerdeDefined {
+    val producerFactory = new KafkaSystemFactory
+    try {
+      producerFactory.getProducer(
+        "test",
+        new MapConfig(Map[String, String]().asJava),
+        new MetricsRegistryMap)
+      fail("Expected to get a Samza exception.")
+    } catch {
+      case e: SamzaException => None // expected
+      case e: Exception => fail("Expected SamzaException, but got " + e)
+    }
+  }
+
+  @Test
+  def testFailWhenSerdeIsInvalid {
+    val producerFactory = new KafkaSystemFactory
+    val config = new MapConfig(Map[String, String](
+      "streams.test.serde" -> "failme").asJava)
+    try {
+      producerFactory.getProducer(
+        "test",
+        config,
+        new MetricsRegistryMap)
+      fail("Expected to get a Samza exception.")
+    } catch {
+      case e: SamzaException => None // expected
+      case e: Exception => fail("Expected SamzaException, but got " + e)
+    }
+  }
+
+  @Test
+  def testHappyPath {
+    val producerFactory = new KafkaSystemFactory
+    val config = new MapConfig(Map[String, String](
+      "job.name" -> "test",
+      "systems.test.producer.bootstrap.servers" -> "",
+      "systems.test.samza.key.serde" -> "json",
+      "systems.test.samza.msg.serde" -> "json",
+      "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory").asJava)
+    var producer = producerFactory.getProducer(
+      "test",
+      config,
+      new MetricsRegistryMap)
+    assertNotNull(producer)
+    assertTrue(producer.isInstanceOf[KafkaSystemProducer])
+    producer = producerFactory.getProducer(
+      "test",
+      config,
+      new MetricsRegistryMap)
+    assertNotNull(producer)
+    assertTrue(producer.isInstanceOf[KafkaSystemProducer])
+  }
+
+  @Test
+  def testInjectedProducerProps {
+    val configMap = Map[String, String](
+      StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
+      StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
+      StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
+    val config = new MapConfig(configMap.asJava)
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
+    assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
+    assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
+  }
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestKafkaSystemProducer.scala
new file mode 100644 (file)
index 0000000..16a1287
--- /dev/null
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.errors.{RecordTooLargeException, SerializationException, TimeoutException}
+import org.apache.kafka.test.MockSerializer
+import org.apache.samza.system.kafka.MockKafkaProducer
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducerException, SystemStream}
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.Assertions.intercept
+
+
+class TestKafkaSystemProducer {
+  val systemStream = new SystemStream("testSystem", "testStream")
+  val someMessage = new OutgoingMessageEnvelope(systemStream, "test".getBytes)
+
+  @Test
+  def testKafkaProducer {
+    val mockProducer = new MockProducer(true, new MockSerializer, new MockSerializer)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start
+    systemProducer.send("test", someMessage)
+    assertEquals(1, systemProducer.producerRef.get().asInstanceOf[MockProducer[Array[Byte], Array[Byte]]].history().size())
+    systemProducer.stop
+  }
+
+  @Test
+  def testKafkaProducerUsingMockKafkaProducer {
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", someMessage)
+    assertEquals(1, mockProducer.getMsgsSent)
+    systemProducer.stop()
+  }
+
+  @Test
+  def testKafkaProducerBufferedSend {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producerMetrics = new KafkaSystemProducerMetrics
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = producerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    assertEquals(1, mockProducer.getMsgsSent)
+
+    val sendThread: Thread = mockProducer.startDelayedSendThread(2000)
+    sendThread.join()
+
+    assertEquals(3, mockProducer.getMsgsSent)
+    systemProducer.stop()
+  }
+
+  @Test
+  def testKafkaProducerFlushSuccessful {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    assertEquals(1, mockProducer.getMsgsSent)
+    mockProducer.startDelayedSendThread(2000)
+    systemProducer.flush("test")
+    assertEquals(3, mockProducer.getMsgsSent)
+    systemProducer.stop()
+  }
+
+  @Test
+  def testKafkaProducerFlushWithException {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics())
+    systemProducer.register("test")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    systemProducer.send("test", msg3)
+    systemProducer.send("test", msg4)
+
+    assertEquals(1, mockProducer.getMsgsSent)
+
+    mockProducer.startDelayedSendThread(2000)
+    val thrown = intercept[SystemProducerException] {
+      systemProducer.flush("test")
+    }
+    assertTrue(thrown.isInstanceOf[SystemProducerException])
+    assertEquals(3, mockProducer.getMsgsSent) // msg1, msg2 and msg4 will be sent
+    systemProducer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithRetriableException {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => mockProducer,
+      metrics = producerMetrics)
+
+    producer.register("test")
+    producer.start()
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    producer.send("test", msg3)
+    producer.flush("test")
+
+    mockProducer.setErrorNext(true, true, new TimeoutException())
+
+    producer.send("test", msg4)
+    val thrown = intercept[SystemProducerException] {
+      producer.flush("test")
+    }
+    assertTrue(thrown.isInstanceOf[SystemProducerException])
+    assertTrue(thrown.getCause.getCause.isInstanceOf[TimeoutException])
+    assertEquals(3, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  /**
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Record the original exception
+    * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+    *
+    * Assumptions:
+    * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+    * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+    * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+    * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+    *
+    * Conclusions:
+    * It is only safe to handle the async exceptions from by closing the producer and failing the container.
+    * This prevents race conditons with setting/clearing exceptions and recreating the producer that could cause data
+    * loss by checkpointing a failed offset.
+    *
+    * Inaccuracies:
+    * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+    * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+    * conditions where the batches align perfectly around the failed send().
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+                                           getProducer = () => {
+                                             mockProducer.open() // A new producer would not already be closed, so reset it.
+                                             mockProducer
+                                           },
+                                           metrics = producerMetrics)
+    producer.register("test")
+    producer.start()
+
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    val senderException = intercept[SystemProducerException] {
+      producer.send("test", msg4) // Should fail because the producer is closed.
+    }
+    assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException = intercept[SystemProducerException] {
+       producer.flush("test") // Should throw the callback exception
+    }
+    assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val postFlushException = intercept[SystemProducerException] {
+      producer.send("test", msg5) // Should not be able to send again after flush
+    }
+    assertTrue(postFlushException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException2 = intercept[SystemProducerException] {
+      producer.flush("test") // Should rethrow the exception
+    }
+    assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+    assertEquals(2, mockProducer.getMsgsSent) // only the messages before the error get sent
+    producer.stop()
+  }
+
+  /**
+    * Recapping from [[testKafkaProducerWithFatalExceptions]]:
+    *
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Record the original exception
+    * 3. Throw the exception every time a KafkaSystemProducer method is invoked until the container fails.
+    *
+    * This test focuses on point 3. Particularly it ensures that the failures are handled properly across multiple sources
+    * which share the same producer.
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSources {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+    val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+
+    producer.start()
+
+    // Initial sends
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Inject error for next send
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    // Subsequent sends
+    val senderException = intercept[SystemProducerException] {
+      producer.send("test1", msg4) // Should fail because the producer is closed.
+    }
+    assertTrue(senderException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException = intercept[SystemProducerException] {
+      producer.send("test2", msg4) // First send from separate source gets a producer closed exception
+    }
+    assertTrue(callbackException.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    val callbackException2 = intercept[SystemProducerException] {
+      producer.send("test2", msg5) // Second send should still get the error
+    }
+    assertTrue(callbackException2.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    // Flushes
+    val callbackException3 = intercept[SystemProducerException] {
+      producer.flush("test2") // Should rethrow the closed exception in flush
+    }
+    assertTrue(callbackException3.isInstanceOf[SystemProducerException])
+    assertTrue(callbackException3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+    intercept[SystemProducerException] {
+      producer.send("test2", msg6) // Should still not be able to send after flush
+    }
+
+    val thrown3 = intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertTrue(thrown3.isInstanceOf[SystemProducerException])
+    assertTrue(thrown3.getCause.getCause.isInstanceOf[RecordTooLargeException])
+
+    intercept[SystemProducerException] {
+      producer.send("test1", msg7) // Should still not be able to send after flush
+    }
+
+    intercept[SystemProducerException] {
+      producer.flush("test1") // Should throw the callback exception
+    }
+    assertEquals(2, mockProducer.getMsgsSent)
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSources {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics)
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // only the messages before the error get sent
+    producer.stop()
+  }
+
+  /**
+    * If there's an exception and the user configured task.drop.producer.errors, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Recreate the producer.
+    * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+    *
+    * Assumptions:
+    * 1. SystemProducer.flush() can happen concurrently with SystemProducer.send() for a particular TaskInstance (task.async.commit)
+    * 2. SystemProducer.flush() cannot happen concurrently with itself for a particular task instance
+    * 3. Any exception thrown from SystemProducer.flush() will prevent the checkpointing and fail the container
+    * 4. A single KafkaProducer is shared by all the tasks so any failure from one task can affect the others.
+    *
+    * Conclusions:
+    * If the user is ok with dropping messages for the sake of availability, we will swallow all exceptions and
+    * recreate the producer to recover. There are no guarantees how many messages are lost, but the send-failed metric
+    * should be accurate, so users should alert on that.
+    *
+    * Inaccuracies:
+    * A real kafka producer succeeds or fails all the messages in a batch. In other words, the messages of a batch all
+    * fail or they all succeed together. This test, however, fails individual callbacks in order to test boundary
+    * conditions where the batches align perfectly around the failed send().
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test")
+    producer.start()
+
+    producer.send("test", msg1)
+    producer.send("test", msg2)
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test", msg4) // Should succeed because the producer recovered.
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get())
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+    producer.flush("test") // Should not throw
+
+    producer.send("test", msg5) // Should be able to send again after flush
+    assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)
+    producer.flush("test")
+
+    assertEquals(4, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
+
+  /**
+    * Recapping from [[testKafkaProducerWithFatalExceptionsDroppingExceptions]]:
+    *
+    * If there's an exception, we should:
+    * 1. Close the producer (from the one-and-only kafka send thread) to prevent subsequent sends from going out of order.
+    * 2. Recreate the producer.
+    * 3. Ignore any messages that were dropped (user knows they're signing up for this if they enable the option)
+    *
+    * This test ensures that the failures are handled properly across multiple sources
+    * which share the same producer.
+    */
+  @Test
+  def testKafkaProducerWithFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val msg6 = new OutgoingMessageEnvelope(systemStream, "f".getBytes)
+    val msg7 = new OutgoingMessageEnvelope(systemStream, "g".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName =  "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+
+    producer.start()
+
+    // Initial sends
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+
+    // Inject error for next send
+    mockProducer.setErrorNext(true, true, new RecordTooLargeException())
+    producer.send("test1", msg3) // Callback exception
+    assertTrue(mockProducer.isClosed)
+    assertNull(producer.producerRef.get())
+    assertEquals("Should not have created a new producer", 1, mockProducer.getOpenCount)
+
+    // Subsequent sends
+    producer.send("test1", msg4) // Should succeed because the producer recovered.
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should have created a new producer", 2, mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    producer.send("test2", msg5) // Second source should also not have any error.
+    assertEquals("Should not have created a new producer", 2, mockProducer.getOpenCount)
+
+    // Flushes
+    producer.flush("test2") // Should not throw for test2
+    producer.send("test2", msg6) // Should still work after flush
+
+    producer.flush("test1") // Should not throw for test1 either
+    producer.send("test1", msg7)
+
+    assertEquals(6, mockProducer.getMsgsSent) // every message except the one with the error should get sent
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerWithNonFatalExceptionsMultipleSourcesDroppingExceptions {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(systemStream, "d".getBytes)
+    val msg5 = new OutgoingMessageEnvelope(systemStream, "e".getBytes)
+    val producerMetrics = new KafkaSystemProducerMetrics()
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val producer = new KafkaSystemProducer(systemName = "test",
+      getProducer = () => {
+        mockProducer.open() // A new producer would not already be closed, so reset it.
+        mockProducer
+      },
+      metrics = producerMetrics,
+      dropProducerExceptions = true)  // Here's where we enable exception dropping.
+    producer.register("test1")
+    producer.register("test2")
+    producer.start()
+
+    producer.send("test1", msg1)
+    producer.send("test2", msg2)
+    mockProducer.setErrorNext(true, false, new SerializationException())
+    val sendException = intercept[SystemProducerException] {
+      producer.send("test1", msg3) // User-thread exception
+    }
+    assertTrue(sendException.getCause.isInstanceOf[SerializationException])
+    assertFalse(mockProducer.isClosed)
+    assertNotNull(producer.producerRef.get()) // Synchronous error; producer should not be recreated
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+
+    producer.send("test1", msg3) // Should be able to resend msg3
+    assertFalse(mockProducer.isClosed)
+    assertEquals("Should NOT have created a new producer", 1, mockProducer.getOpenCount)
+    assertNotNull(producer.producerRef.get())
+    producer.send("test2", msg4) // Second source should not be affected
+
+    producer.flush("test1") // Flush should be unaffected
+
+    producer.send("test1", msg5) // Should be able to send again after flush
+
+    assertEquals(5, mockProducer.getMsgsSent) // only the messages before the error get sent
+    producer.stop()
+  }
+
+  @Test
+  def testKafkaProducerFlushMsgsWhenStop {
+    val msg1 = new OutgoingMessageEnvelope(systemStream, "a".getBytes)
+    val msg2 = new OutgoingMessageEnvelope(systemStream, "b".getBytes)
+    val msg3 = new OutgoingMessageEnvelope(systemStream, "c".getBytes)
+    val msg4 = new OutgoingMessageEnvelope(new SystemStream("test2", "test"), "d".getBytes)
+
+    val mockProducer = new MockKafkaProducer(1, "test", 1)
+    val systemProducer = new KafkaSystemProducer(systemName = "test",
+                                                 getProducer = () => mockProducer,
+                                                 metrics = new KafkaSystemProducerMetrics)
+    systemProducer.register("test")
+    systemProducer.register("test2")
+    systemProducer.start()
+    systemProducer.send("test", msg1)
+
+    mockProducer.setShouldBuffer(true)
+    systemProducer.send("test", msg2)
+    systemProducer.send("test", msg3)
+    systemProducer.send("test2", msg4)
+    assertEquals(1, mockProducer.getMsgsSent)
+    mockProducer.startDelayedSendThread(2000)
+    assertEquals(1, mockProducer.getMsgsSent)
+    systemProducer.stop()
+    assertEquals(4, mockProducer.getMsgsSent)
+  }
+
+  @Test
+  def testSystemStreamNameNullOrEmpty {
+    val omeStreamNameNull = new OutgoingMessageEnvelope(new SystemStream("test", null), "a".getBytes)
+    val omeStreamNameEmpty = new OutgoingMessageEnvelope(new SystemStream("test", ""), "a".getBytes)
+    val mockProducer = new MockKafkaProducer(1, "testMock", 1)
+    val producer = new KafkaSystemProducer(systemName = "test", getProducer = () => mockProducer,
+                                           metrics = new KafkaSystemProducerMetrics)
+
+    val thrownNull = intercept[IllegalArgumentException] {
+      producer.register("test1")
+      producer.start()
+      producer.send("testSrc1", omeStreamNameNull)
+      assertEquals(0, mockProducer.getMsgsSent)
+    }
+    val thrownEmpty = intercept[IllegalArgumentException] {
+      producer.register("test2")
+      producer.start()
+      producer.send("testSrc2", omeStreamNameEmpty)
+      assertEquals(0, mockProducer.getMsgsSent)
+    }
+    assertTrue(thrownNull.getMessage() == "Invalid system stream: " + omeStreamNameNull.getSystemStream)
+    assertTrue(thrownEmpty.getMessage() == "Invalid system stream: " + omeStreamNameEmpty.getSystemStream)
+  }
+}
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka_deprecated/TestTopicMetadataCache.scala
new file mode 100644 (file)
index 0000000..dda011b
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.system.kafka_deprecated
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import kafka.api.TopicMetadata
+import org.I0Itec.zkclient.ZkClient
+import org.apache.samza.util.Clock
+import org.apache.samza.util.TopicMetadataStore
+import org.junit.Assert._
+import org.junit.Before
+import org.junit.Test
+import kafka.common.ErrorMapping
+import kafka.api.PartitionMetadata
+import org.apache.kafka.common.protocol.Errors
+
+class TestTopicMetadataCache {
+
+  class MockTime extends Clock {
+    var currentValue = 0
+
+    def currentTimeMillis: Long = currentValue
+  }
+
+  class MockTopicMetadataStore extends TopicMetadataStore {
+    var mockCache = Map(
+      "topic1" -> new TopicMetadata("topic1", List.empty, Errors.NONE),
+      "topic2" -> new TopicMetadata("topic2", List.empty, Errors.NONE))
+    var numberOfCalls: AtomicInteger = new AtomicInteger(0)
+
+    def getTopicInfo(topics: Set[String]) = {
+      var topicMetadata = Map[String, TopicMetadata]()
+      topics.foreach(topic => topicMetadata += topic -> mockCache(topic))
+      numberOfCalls.getAndIncrement
+      topicMetadata
+    }
+
+    def setErrorCode(topic: String, errorCode: Short) {
+      mockCache += topic -> new TopicMetadata(topic, List.empty, Errors.forCode(errorCode))
+    }
+  }
+
+  @Before def setup {
+    TopicMetadataCache.clear
+  }
+
+  @Test
+  def testBasicMetadataCacheFunctionality {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+
+    // Retrieve a topic from the cache. Initially cache is empty and store is queried to get the data
+    mockStore.setErrorCode("topic1", 3)
+    var metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(3, metadata("topic1").error.code)
+    assertEquals(1, mockStore.numberOfCalls.get())
+
+    // Retrieve the same topic from the cache which has an error code. Ensure the store is called to refresh the cache
+    mockTime.currentValue = 5
+    mockStore.setErrorCode("topic1", 0)
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").error.code)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // Retrieve the same topic from the cache with refresh rate greater than the last update. Ensure the store is not
+    // called
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").error.code)
+    assertEquals(2, mockStore.numberOfCalls.get())
+
+    // Ensure that refresh happens when refresh rate is less than the last update. Ensure the store is called
+    mockTime.currentValue = 11
+    metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+    assertEquals("topic1", metadata("topic1").topic)
+    assertEquals(0, metadata("topic1").error.code)
+    assertEquals(3, mockStore.numberOfCalls.get())
+  }
+
+  @Test
+  def testMultiThreadedInteractionForTopicMetadataCache {
+    val mockStore = new MockTopicMetadataStore
+    val mockTime = new MockTime
+    val waitForThreadStart = new CountDownLatch(3)
+    val numAssertionSuccess = new AtomicBoolean(true)
+    // Add topic to the cache from multiple threads and ensure the store is called only once
+    val threads = new Array[Thread](3)
+
+    mockTime.currentValue = 17
+    for (i <- 0 until 3) {
+      threads(i) = new Thread(new Runnable {
+        def run {
+          waitForThreadStart.countDown()
+          waitForThreadStart.await()
+          val metadata = TopicMetadataCache.getTopicMetadata(Set("topic1"), "kafka", mockStore.getTopicInfo, 5, mockTime.currentTimeMillis)
+          numAssertionSuccess.compareAndSet(true, metadata("topic1").topic.equals("topic1"))
+          numAssertionSuccess.compareAndSet(true, metadata("topic1").error.code == 0)
+        }
+      })
+      threads(i).start()
+    }
+    for (i <- 0 until 3) {
+      threads(i).join
+    }
+    assertTrue(numAssertionSuccess.get())
+    assertEquals(1, mockStore.numberOfCalls.get())
+  }
+
+  @Test
+  def testBadErrorCodes {
+    val partitionMetadataBad = new PartitionMetadata(0, None, Seq(), error = Errors.LEADER_NOT_AVAILABLE)
+    val partitionMetadataGood = new PartitionMetadata(0, None, Seq(), error = Errors.NONE)
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.REQUEST_TIMED_OUT)))
+    assertTrue(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataBad), Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List.empty, Errors.NONE)))
+    assertFalse(TopicMetadataCache.hasBadErrorCode(new TopicMetadata("test", List(partitionMetadataGood), Errors.NONE)))
+  }
+}