SAMZA-1135 - Support Scala 2.12
author vjagadish1989 <jvenkatr@linkedin.com>
Fri, 31 Mar 2017 17:26:08 +0000 (10:26 -0700)
committer vjagadish1989 <jvenkatr@linkedin.com>
Fri, 31 Mar 2017 17:26:08 +0000 (10:26 -0700)
    Added support for Scala 2.12

    Author: Maxim Logvinenko <mlogvinenko@gmail.com>

    Reviewers: Jagadish <jagadish@apache.org>, Prateek Maheshwari <prateekm@linkedin.com>

    Closes #82 from metamx:scala-2.12
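
    The bulk of this patch mechanically replaces scala.collection.JavaConversions,
    whose implicit conversions are deprecated as of Scala 2.12, with the explicit
    scala.collection.JavaConverters decorators. A minimal sketch of the pattern in
    Scala, with hypothetical names:

        import scala.collection.JavaConverters._

        object ConvertersSketch extends App {
          val javaSet = new java.util.HashSet[String]()
          javaSet.add("kafka.my-topic.0")

          // Old style (2.10/2.11), via `import scala.collection.JavaConversions._`:
          //   val scalaSet: Set[String] = javaSet.toSet   // silent implicit wrapping
          // Explicit converters, which also compile cleanly on 2.12:
          val scalaSet: Set[String] = javaSet.asScala.toSet
          val backToJava: java.util.Set[String] = scalaSet.asJava
          println((scalaSet, backToJava))
        }

    On the Java side the same patch calls the converter methods directly, e.g.
    JavaConverters.setAsJavaSetConverter(set).asJava() instead of
    JavaConversions.setAsJavaSet(set).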

95 files changed:
bin/check-all.sh
gradle/dependency-versions-scala-2.12.gradle [new file with mode: 0644]
gradle/dependency-versions.gradle
samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemConsumer.scala
samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
samza-core/src/main/scala/org/apache/samza/util/Util.scala
samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
samza-core/src/test/scala/org/apache/samza/config/TestStorageConfig.scala
samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
samza-core/src/test/scala/org/apache/samza/coordinator/TestStreamPartitionCountMonitor.scala
samza-core/src/test/scala/org/apache/samza/job/TestShellCommandBuilder.scala
samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
samza-core/src/test/scala/org/apache/samza/metrics/reporter/TestJmxReporter.scala
samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointSerde.scala
samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/system/TestRangeSystemStreamPartitionMatcher.scala
samza-core/src/test/scala/org/apache/samza/system/TestRegexSystemStreamPartitionMatcher.scala
samza-core/src/test/scala/org/apache/samza/system/TestStreamMetadataCache.scala
samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestBootstrappingChooser.scala
samza-core/src/test/scala/org/apache/samza/system/chooser/TestDefaultChooser.scala
samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemAdmin.scala
samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemConsumer.scala
samza-core/src/test/scala/org/apache/samza/system/filereader/TestFileReaderSystemFactory.scala
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala

diff --git a/bin/check-all.sh b/bin/check-all.sh
index 8f7ed32..2f9f03c 100755
@@ -21,7 +21,7 @@
 
 set -e
 
-SCALAs=( "2.10" "2.11" )
+SCALAs=( "2.10" "2.11" "2.12" )
 JDKs=( "JAVA8_HOME" )
 YARNs=( "2.6.1" "2.7.1" )
 
diff --git a/gradle/dependency-versions-scala-2.12.gradle b/gradle/dependency-versions-scala-2.12.gradle
new file mode 100644
index 0000000..f3eec81
--- /dev/null
+++ b/gradle/dependency-versions-scala-2.12.gradle
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+ext {
+  scalaVersion = "2.12"
+  scalaLibVersion = "2.12.1"
+  // Extra options for the compiler:
+  // -feature: Give detailed warnings about language feature use (rather than just 'there were 4 warnings')
+  // -language:implicitConversions: Allow the use of implicit conversions without warning or library import
+  // -language:reflectiveCalls: Allow the automatic use of reflection to access fields without warning or library import
+  scalaOptions = "-feature -language:implicitConversions -language:reflectiveCalls"
+  scalatraVersion = "2.5.0"
+  jettyVersion = "9.2.7.v20150116"
+}
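
The compiler-option comments above describe language features that otherwise need a per-file `import scala.language.*` statement. A small illustration (hypothetical names) of code that would emit feature warnings under `-feature` without these flags:

    object FeatureFlagSketch extends App {
      // Needs -language:implicitConversions (or `import scala.language.implicitConversions`)
      // to compile without a feature warning:
      implicit def intToOffset(n: Int): String = n.toString
      val offset: String = 42 // uses the implicit conversion

      // Structural type: the call to `quack` goes through reflection, covered by
      // -language:reflectiveCalls:
      val duck = new { def quack: String = "quack" }
      println(offset + " " + duck.quack)
    }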
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index ed3eba4..62872f8 100644
   jacksonVersion = "1.9.13"
   junitVersion = "4.8.1"
   mockitoVersion = "1.10.19"
-  scalaTestVersion = "2.2.4"
+  scalaTestVersion = "3.0.1"
   zkClientVersion = "0.8"
   zookeeperVersion = "3.4.6"
   metricsVersion = "2.2.0"
-  kafkaVersion = "0.10.0.1"
+  kafkaVersion = "0.10.1.1"
   commonsHttpClientVersion = "3.1"
   rocksdbVersion = "5.0.1"
   yarnVersion = "2.6.1"
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngine.java
index e30a2ab..a83f8b3 100644
@@ -58,6 +58,8 @@ public interface StorageEngine {
 
   /**
    * Get store properties
+   *
+   * @return store properties
    */
   StoreProperties getStoreProperties();
 }
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index e6db74a..f7b2bcd 100644
@@ -31,7 +31,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 
 public class TaskConfigJava extends MapConfig {
@@ -116,7 +116,7 @@ public class TaskConfigJava extends MapConfig {
     Set<SystemStream> allInputSS = new HashSet<>();
 
     TaskConfig taskConfig = TaskConfig.Config2Task(this);
-    allInputSS.addAll(JavaConversions.setAsJavaSet(taskConfig.getInputStreams()));
+    allInputSS.addAll(JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
     allInputSS.addAll(getBroadcastSystemStreams());
 
     return Collections.unmodifiableSet(allInputSS);
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index 94f02d4..9d1877c 100644
@@ -26,7 +26,7 @@ import org.apache.samza.task.AsyncRunLoop;
 import org.apache.samza.util.HighResolutionClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 import scala.runtime.AbstractFunction1;
 
 import java.util.concurrent.ExecutorService;
@@ -98,7 +98,7 @@ public class RunLoopFactory {
       log.info("Run loop in asynchronous mode.");
 
       return new AsyncRunLoop(
-        JavaConversions.mapAsJavaMap(taskInstances),
+        JavaConverters.mapAsJavaMapConverter(taskInstances).asJava(),
         threadPool,
         consumerMultiplexer,
         taskMaxConcurrency,
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java b/samza-core/src/main/java/org/apache/samza/coordinator/StreamPartitionCountMonitor.java
index 8652465..b35cbff 100644
@@ -34,7 +34,7 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 
 /**
@@ -72,8 +72,13 @@ public class StreamPartitionCountMonitor {
    */
   private static Map<SystemStream, SystemStreamMetadata> getMetadata(Set<SystemStream> streamsToMonitor,
       StreamMetadataCache metadataCache) {
-    return JavaConversions
-        .mapAsJavaMap(metadataCache.getStreamMetadata(JavaConversions.asScalaSet(streamsToMonitor).<SystemStream>toSet(), true));
+    return JavaConverters
+        .mapAsJavaMapConverter(
+            metadataCache.getStreamMetadata(
+                JavaConverters.asScalaSetConverter(streamsToMonitor).asScala().toSet(),
+                true
+            )
+        ).asJava();
   }
 
   /**
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index 8e375f1..e5c40df 100644
@@ -47,7 +47,7 @@ import org.apache.samza.util.Throttleable;
 import org.apache.samza.util.ThrottlingScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 
 /**
@@ -118,7 +118,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       Map<TaskName, TaskInstance> taskInstances, Map<TaskName, AsyncTaskWorker> taskWorkers) {
     Map<SystemStreamPartition, List<AsyncTaskWorker>> sspToWorkerMap = new HashMap<>();
     for (TaskInstance task : taskInstances.values()) {
-      Set<SystemStreamPartition> ssps = JavaConversions.setAsJavaSet(task.systemStreamPartitions());
+      Set<SystemStreamPartition> ssps = JavaConverters.setAsJavaSetConverter(task.systemStreamPartitions()).asJava();
       for (SystemStreamPartition ssp : ssps) {
         sspToWorkerMap.putIfAbsent(ssp, new ArrayList<>());
         sspToWorkerMap.get(ssp).add(taskWorkers.get(task.taskName()));
@@ -355,7 +355,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
      */
     private Set<SystemStreamPartition> getWorkingSSPSet(TaskInstance task) {
 
-      Set<SystemStreamPartition> allPartitions = new HashSet<>(JavaConversions.setAsJavaSet(task.systemStreamPartitions()));
+      Set<SystemStreamPartition> allPartitions = new HashSet<>(JavaConverters.setAsJavaSetConverter(task.systemStreamPartitions()).asJava());
 
       // filter only those SSPs that are not at end of stream.
       Set<SystemStreamPartition> workingSSPSet = allPartitions.stream()
diff --git a/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java b/samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
index f22b0ee..03b218e 100644
@@ -24,7 +24,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.metrics.MetricsReporterFactory;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 /**
  * Helper class that instantiates the MetricsReporter.
@@ -36,7 +36,8 @@ public class MetricsReporterLoader {
 
   public static Map<String, MetricsReporter> getMetricsReporters(MetricsConfig config, String containerName) {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
-    for (String metricsReporterName : JavaConversions.seqAsJavaList(config.getMetricReporterNames())) {
+
+    for (String metricsReporterName : JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava()) {
       String metricsFactoryClass = config.getMetricsFactoryClass(metricsReporterName).get();
       if (metricsFactoryClass == null) {
         throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName));
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index 9f4aa54..0d66613 100644
@@ -26,17 +26,14 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.config.{JobConfig, ConfigRewriter, Config, StreamConfig}
 import org.apache.samza.container.TaskName
-import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.job.JobRunner._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.util.{Util, CommandLine, Logging}
 import org.apache.samza.{Partition, SamzaException}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.coordinator.JobModelManager
 
-import scala.collection.immutable.HashMap
-
 
 /**
  * Command-line tool for inspecting and manipulating the checkpoints for a job.
@@ -83,18 +80,18 @@ object CheckpointTool {
     var newOffsets: TaskNameToCheckpointMap = null
 
     def parseOffsets(propertiesFile: Config): TaskNameToCheckpointMap = {
-      val taskNameSSPPairs = propertiesFile.entrySet.flatMap(entry => {
-        val matcher = SSP_REGEX.matcher(entry.getKey)
+      val taskNameSSPPairs = propertiesFile.asScala.flatMap { case (key, value) => {
+        val matcher = SSP_REGEX.matcher(key)
         if (matcher.matches) {
           val taskname = new TaskName(matcher.group(1))
           val partition = new Partition(Integer.parseInt(matcher.group(4)))
           val ssp = new SystemStreamPartition(matcher.group(2), matcher.group(3), partition)
-          Some(taskname -> Map(ssp -> entry.getValue))
+          Some(taskname -> Map(ssp -> value))
         } else {
-          warn("Warning: ignoring unrecognised property: %s = %s" format (entry.getKey, entry.getValue))
+          warn("Warning: ignoring unrecognised property: %s = %s" format (key, value))
           None
         }
-      }).toList
+      }}.toList
 
       if(taskNameSSPPairs.isEmpty) {
         return null
@@ -165,7 +162,8 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
       .jobModel
       .getContainers
       .values
-      .flatMap(_.getTasks.keys)
+      .asScala
+      .flatMap(_.getTasks.asScala.keys)
       .toSet
 
     taskNames.foreach(manager.register)
@@ -189,8 +187,9 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
   /** Load the most recent checkpoint state for all a specified TaskName. */
   def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = {
     Option(manager.readLastCheckpoint(taskName))
-            .getOrElse(new Checkpoint(new HashMap[SystemStreamPartition, String]()))
+            .getOrElse(new Checkpoint(new java.util.HashMap[SystemStreamPartition, String]()))
             .getOffsets
+            .asScala
             .toMap
   }
 
@@ -199,7 +198,7 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manage
    * checkpoint for that TaskName
    */
   def writeNewCheckpoint(tn: TaskName, newOffsets: Map[SystemStreamPartition, String]) {
-    val checkpoint = new Checkpoint(newOffsets)
+    val checkpoint = new Checkpoint(newOffsets.asJava)
     manager.writeCheckpoint(tn, checkpoint)
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index dc99283..783340a 100644
@@ -27,7 +27,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
 import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
@@ -172,7 +172,7 @@ class OffsetManager(
   val systemStreamPartitions = mutable.Map[TaskName, mutable.Set[SystemStreamPartition]]()
 
   def register(taskName: TaskName, systemStreamPartitionsToRegister: Set[SystemStreamPartition]) {
-    systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()).addAll(systemStreamPartitionsToRegister)
+    systemStreamPartitions.getOrElseUpdate(taskName, mutable.Set[SystemStreamPartition]()) ++= systemStreamPartitionsToRegister
     // register metrics
     systemStreamPartitions.foreach { case (taskName, ssp) => ssp.foreach (ssp => offsetManagerMetrics.addCheckpointedOffset(ssp, "")) }
   }
@@ -224,21 +224,21 @@ class OffsetManager(
       debug("Checkpointing offsets for taskName %s." format taskName)
 
       val sspsForTaskName = systemStreamPartitions.getOrElse(taskName, throw new SamzaException("No such SystemStreamPartition set " + taskName + " registered for this checkpointmanager")).toSet
-      val sspToOffsets = lastProcessedOffsets.getOrElse(taskName, null)
+      val sspToOffsets = lastProcessedOffsets.get(taskName)
       val partitionOffsets = if(sspToOffsets != null) {
-        sspToOffsets.filterKeys(sspsForTaskName.contains(_))
+        sspToOffsets.asScala.filterKeys(sspsForTaskName.contains)
       } else {
         warn(taskName + " is not found... ")
         Map[SystemStreamPartition, String]()
       }
 
-      val checkpoint = new Checkpoint(partitionOffsets)
+      val checkpoint = new Checkpoint(partitionOffsets.asJava)
 
       if(checkpointManager != null) {
         checkpointManager.writeCheckpoint(taskName, checkpoint)
         if(sspToOffsets != null) {
-          sspToOffsets.foreach {
-            case (ssp, cp) => offsetManagerMetrics.checkpointedOffsets(ssp).set(cp)
+          sspToOffsets.asScala.foreach {
+            case (ssp, cp) => offsetManagerMetrics.checkpointedOffsets.get(ssp).set(cp)
           }
         }
       }
@@ -248,7 +248,7 @@ class OffsetManager(
       partitionOffsets.groupBy { case (ssp, _) => ssp.getSystem }.foreach {
         case (systemName:String, offsets: Map[SystemStreamPartition, String]) => {
           // Option is empty if there is no checkpointListener for this systemName
-          checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets))
+          checkpointListeners.get(systemName).foreach(_.onCheckpoint(offsets.asJava))
         }
       }
     } else {
@@ -300,7 +300,7 @@ class OffsetManager(
               }
               info("Checkpointed offset is currently %s for %s" format (offset, systemStreamPartition))
               shouldKeep
-          }))
+          }.asJava))
         }
       }
     } else {
@@ -317,7 +317,7 @@ class OffsetManager(
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
     if (checkpoint != null) {
-      Map(taskName -> checkpoint.getOffsets.toMap)
+      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
     } else {
       info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
 
@@ -337,15 +337,15 @@ class OffsetManager(
         systemStreamPartitions.foreach {
           systemStreamPartition =>
             {
-              val offset = lastProcessedOffsets(taskName).get(systemStreamPartition)
+              val offset = lastProcessedOffsets.get(taskName).get(systemStreamPartition)
               info("Got offset %s for %s, but ignoring, since stream was configured to reset offsets." format (offset, systemStreamPartition))
             }
         }
       }
     }
 
-    lastProcessedOffsets.keys().foreach { taskName =>
-      lastProcessedOffsets.get(taskName).keySet().removeAll(systemStreamPartitionsToReset(taskName))
+    lastProcessedOffsets.keys().asScala.foreach { taskName =>
+      lastProcessedOffsets.get(taskName).keySet().removeAll(systemStreamPartitionsToReset(taskName).asJava)
     }
   }
 
@@ -353,9 +353,9 @@ class OffsetManager(
    * Returns a map of all SystemStreamPartitions in lastProcessedOffsets that need to be reset
    */
   private def getSystemStreamPartitionsToReset(taskNameTosystemStreamPartitions: ConcurrentHashMap[TaskName, ConcurrentHashMap[SystemStreamPartition, String]]): Map[TaskName, Set[SystemStreamPartition]] = {
-    taskNameTosystemStreamPartitions.map {
+    taskNameTosystemStreamPartitions.asScala.map {
       case (taskName, sspToOffsets) => {
-        taskName -> (sspToOffsets.filter {
+        taskName -> (sspToOffsets.asScala.filter {
           case (systemStreamPartition, offset) => {
             val systemStream = systemStreamPartition.getSystemStream
             offsetSettings
@@ -372,14 +372,15 @@ class OffsetManager(
    * SystemStreamPartition, and populate startingOffsets.
    */
   private def loadStartingOffsets {
-    startingOffsets = lastProcessedOffsets.map {
+    startingOffsets = lastProcessedOffsets.asScala.map {
       case (taskName, sspToOffsets) => {
         taskName -> {
-          sspToOffsets.groupBy(_._1.getSystem).flatMap {
+          sspToOffsets.asScala.groupBy(_._1.getSystem).flatMap {
             case (systemName, systemStreamPartitionOffsets) =>
               systemAdmins
                 .getOrElse(systemName, throw new SamzaException("Missing system admin for %s. Need system admin to load starting offsets." format systemName))
-                .getOffsetsAfter(systemStreamPartitionOffsets)
+                .getOffsetsAfter(systemStreamPartitionOffsets.asJava)
+                .asScala
           }
         }
       }
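
A subtlety in the hunks above: `lastProcessedOffsets.get(taskName)` now invokes `ConcurrentHashMap.get` directly (returning null when the key is absent) instead of going through an implicit Scala wrapper, which is why the null checks remain; similarly `checkpointedOffsets.get(ssp)` replaces the wrapper-backed `checkpointedOffsets(ssp)`. A sketch of the two access styles, with hypothetical names:

    import scala.collection.JavaConverters._

    object JavaMapAccessSketch extends App {
      val offsets = new java.util.concurrent.ConcurrentHashMap[String, String]()
      offsets.put("ssp-0", "42")

      // Direct Java API: returns null when absent, no implicits involved.
      val direct: String = offsets.get("ssp-0")
      // Explicit wrapper: keeps Scala's collection operations available.
      val filtered = offsets.asScala.filterKeys(_.startsWith("ssp"))
      println((direct, filtered))
    }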
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 4e14097..4e86b7c 100644
@@ -45,7 +45,7 @@ object JobConfig {
   val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
   val JOB_DEFAULT_SYSTEM = "job.default.system"
   val JOB_CONTAINER_COUNT = "job.container.count"
-  val jOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
+  val JOB_CONTAINER_THREAD_POOL_SIZE = "job.container.thread.pool.size"
   val JOB_CONTAINER_SINGLE_THREAD_MODE = "job.container.single.thread.mode"
   val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
   val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
@@ -53,19 +53,19 @@ object JobConfig {
 
   val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
 
-  val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class";
+  val SSP_MATCHER_CLASS = "job.systemstreampartition.matcher.class"
 
   val SSP_MATCHER_CLASS_REGEX = "org.apache.samza.system.RegexSystemStreamPartitionMatcher"
 
   val SSP_MATCHER_CLASS_RANGE = "org.apache.samza.system.RangeSystemStreamPartitionMatcher"
 
-  val SSP_MATCHER_CONFIG_REGEX = "job.systemstreampartition.matcher.config.regex";
+  val SSP_MATCHER_CONFIG_REGEX = "job.systemstreampartition.matcher.config.regex"
 
-  val SSP_MATCHER_CONFIG_RANGES = "job.systemstreampartition.matcher.config.ranges";
+  val SSP_MATCHER_CONFIG_RANGES = "job.systemstreampartition.matcher.config.ranges"
 
-  val SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "job.systemstreampartition.matcher.config.job.factory.regex";
+  val SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "job.systemstreampartition.matcher.config.job.factory.regex"
 
-  val DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)";
+  val DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX = "org\\.apache\\.samza\\.job\\.local(.*ProcessJobFactory|.*ThreadJobFactory)"
 
   // number of partitions in the checkpoint stream should be 1. But sometimes,
   // if a stream was created(automatically) with the wrong number of partitions(default number of partitions
@@ -177,7 +177,7 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSSPMatcherConfigJobFactoryRegex = getOrElse(JobConfig.SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX, JobConfig.DEFAULT_SSP_MATCHER_CONFIG_JOB_FACTORY_REGEX)
 
-  def getThreadPoolSize = getOption(JobConfig.jOB_CONTAINER_THREAD_POOL_SIZE) match {
+  def getThreadPoolSize = getOption(JobConfig.JOB_CONTAINER_THREAD_POOL_SIZE) match {
     case Some(size) => size.toInt
     case _ => 0
   }
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
index 8e3eb57..020e3bc 100644
@@ -18,7 +18,7 @@
  */
 
 package org.apache.samza.config
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object SerializerConfig {
   // serializer config constants
@@ -38,6 +38,6 @@ class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
   import SerializerConfig._
   def getSerdeNames() = {
     val subConf = config.subset(SERIALIZER_PREFIX format "", true)
-    subConf.keys.filter(k => k.endsWith(".class")).map(_.replace(".class", ""))
+    subConf.asScala.keys.filter(k => k.endsWith(".class")).map(_.replace(".class", ""))
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index 3785011..10b4d1d 100644
@@ -22,7 +22,7 @@ package org.apache.samza.config
 
 import java.util.concurrent.TimeUnit
 import org.apache.samza.SamzaException
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 
@@ -72,7 +72,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
 
   def getStoreNames: Seq[String] = {
     val conf = config.subset("stores.", true)
-    conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
+    conf.asScala.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
   }
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index 4f2c688..910ae63 100644
@@ -23,7 +23,7 @@ import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.system.SystemStream
 import org.apache.samza.util.Logging
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object StreamConfig {
   // Samza configs for streams
@@ -83,6 +83,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getSerdeStreams(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     val legacySystemStreams = subConf
+      .asScala
       .keys
       .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => {
@@ -91,6 +92,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
       }).toSet
 
     val systemStreams = subset(StreamConfig.STREAMS_PREFIX)
+      .asScala
       .keys
       .filter(k => k.endsWith(StreamConfig.MSG_SERDE) || k.endsWith(StreamConfig.KEY_SERDE))
       .map(k => k.substring(0, k.length - 16 /* .samza.XXX.serde length */ ))
@@ -116,7 +118,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getStreamProperties(streamId: String) = {
     val allProperties = getAllStreamProperties(streamId)
     val samzaProperties = allProperties.subset(StreamConfig.SAMZA_PROPERTY, false)
-    val filteredStreamProperties:java.util.Map[String, String] = allProperties.filterKeys(k => !samzaProperties.containsKey(k))
+    val filteredStreamProperties:java.util.Map[String, String] = allProperties.asScala.filterKeys(k => !samzaProperties.containsKey(k)).asJava
     new MapConfig(filteredStreamProperties)
   }
 
@@ -243,7 +245,7 @@ class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   private def getStreamIds(): Iterable[String] = {
     // StreamIds are not allowed to have '.' so the first index of '.' marks the end of the streamId.
-    subset(StreamConfig.STREAMS_PREFIX).keys.map(key => key.substring(0, key.indexOf(".")))
+    subset(StreamConfig.STREAMS_PREFIX).asScala.keys.map(key => key.substring(0, key.indexOf(".")))
   }
 
   private def getStreamIdsForSystem(system: String): Iterable[String] = {
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 3295394..aeeb2aa 100644
@@ -19,7 +19,7 @@
 
 package org.apache.samza.config
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 
 object SystemConfig {
@@ -49,6 +49,6 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getSystemNames() = {
     val subConf = config.subset("systems.", true)
     // find all .samza.factory keys, and strip the suffix
-    subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
+    subConf.asScala.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala b/samza-core/src/main/scala/org/apache/samza/config/factories/PropertiesConfigFactory.scala
index ea91872..8d43b59 100644
@@ -21,7 +21,7 @@ package org.apache.samza.config.factories
 import java.io.FileInputStream
 import java.net.URI
 import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.Config
 import org.apache.samza.config.ConfigFactory
 import org.apache.samza.config.MapConfig
@@ -36,14 +36,14 @@ class PropertiesConfigFactory extends ConfigFactory with Logging {
     }
 
     val configPath = configUri.getPath
-    val props = new Properties();
-    val in = new FileInputStream(configPath);
+    val props = new Properties()
+    val in = new FileInputStream(configPath)
 
-    props.load(in);
-    in.close
+    props.load(in)
+    in.close()
 
     debug("got config %s from config %s" format (props, configPath))
 
-    new MapConfig(props.toMap[String, String])
+    new MapConfig(props.asScala.asJava)
   }
 }
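
The `props.asScala.asJava` round trip above replaces the old `props.toMap[String, String]`, which relied on an implicit view of `Properties`. A sketch of why the round trip type-checks, with hypothetical values:

    import java.util.Properties
    import scala.collection.JavaConverters._

    object PropertiesSketch extends App {
      val props = new Properties()
      props.setProperty("job.name", "wikipedia-feed")

      // Properties extends Hashtable[Object, Object]; asScala views it as a
      // mutable.Map[String, String], and asJava then yields the
      // java.util.Map[String, String] that a MapConfig-style constructor expects.
      val stringMap: java.util.Map[String, String] = props.asScala.asJava
      println(stringMap)
    }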
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index b1ab1e0..8fd5729 100644
@@ -24,7 +24,7 @@ import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, System
 import org.apache.samza.task.ReadableCoordinator
 import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, TimerUtils}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * The run loop uses a single-threaded execution model: activities for
@@ -50,7 +50,7 @@ class RunLoop (
   private var lastCommitNs = clock()
   private var activeNs = 0L
   @volatile private var shutdownNow = false
-  private val coordinatorRequests: CoordinatorRequests = new CoordinatorRequests(taskInstances.keySet)
+  private val coordinatorRequests: CoordinatorRequests = new CoordinatorRequests(taskInstances.keySet.asJava)
 
   // Messages come from the chooser with no connection to the TaskInstance they're bound for.
   // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
@@ -167,7 +167,7 @@ class RunLoop (
       } else if (!coordinatorRequests.commitRequests.isEmpty){
         trace("Committing due to explicit commit request.")
         metrics.commits.inc
-        coordinatorRequests.commitRequests.foreach(taskName => {
+        coordinatorRequests.commitRequests.asScala.foreach(taskName => {
           taskInstances(taskName).commit
         })
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index e43ddfe..96a337c 100644
@@ -75,7 +75,7 @@ import org.apache.samza.util.SystemClock
 import org.apache.samza.util.Util
 import org.apache.samza.util.Util.asScalaClock
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object SamzaContainer extends Logging {
   val DEFAULT_READ_JOBMODEL_DELAY_MS = 100
@@ -148,7 +148,8 @@ object SamzaContainer extends Logging {
     val inputSystemStreamPartitions = containerModel
       .getTasks
       .values
-      .flatMap(_.getSystemStreamPartitions)
+      .asScala
+      .flatMap(_.getSystemStreamPartitions.asScala)
       .toSet
 
     val inputSystemStreams = inputSystemStreamPartitions
@@ -310,7 +311,7 @@ object SamzaContainer extends Logging {
 
     info("Setting up metrics reporters.")
 
-    val reporters = MetricsReporterLoader.getMetricsReporters(config, containerName).toMap
+    val reporters = MetricsReporterLoader.getMetricsReporters(config, containerName).asScala.toMap
 
     info("Got metrics reporters: %s" format reporters.keys)
 
@@ -401,9 +402,10 @@ object SamzaContainer extends Logging {
     val taskNames = containerModel
       .getTasks
       .values
+      .asScala
       .map(_.getTaskName)
       .toSet
-    val containerContext = new SamzaContainerContext(containerId, config, taskNames)
+    val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava)
 
     // TODO not sure how we should make this config based, or not. Kind of
     // strange, since it has some dynamic directories when used with YARN.
@@ -413,7 +415,7 @@ object SamzaContainer extends Logging {
     val storeWatchPaths = new util.HashSet[Path]()
     storeWatchPaths.add(defaultStoreBaseDir.toPath)
 
-    val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
+    val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
       val taskName = taskModel.getTaskName
@@ -511,6 +513,7 @@ object SamzaContainer extends Logging {
 
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions
+        .asScala
         .toSet
 
       info("Retrieved SystemStreamPartitions " + systemStreamPartitions + " for " + taskName)
@@ -755,7 +758,7 @@ class SamzaContainer(
       taskInstance.startStores
       // Measuring the time to restore the stores
       val timeToRestore = System.currentTimeMillis() - startTime
-      val taskGauge = metrics.taskStoreRestorationMetrics.getOrElse(taskInstance.taskName, null)
+      val taskGauge = metrics.taskStoreRestorationMetrics.asScala.getOrElse(taskInstance.taskName, null)
       if (taskGauge != null) {
         taskGauge.set(timeToRestore)
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index e07fcf4..c04776a 100644
@@ -41,7 +41,7 @@ import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.task.WindowableTask
 import org.apache.samza.util.Logging
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TaskInstance(
   task: Any,
@@ -65,7 +65,7 @@ class TaskInstance(
 
   val context = new TaskContext {
     def getMetricsRegistry = metrics.registry
-    def getSystemStreamPartitions = systemStreamPartitions
+    def getSystemStreamPartitions = systemStreamPartitions.asJava
     def getStore(storeName: String) = if (storageManager != null) {
       storageManager(storeName)
     } else {
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index 4122c87..26ca0ed 100644
@@ -53,7 +53,6 @@ import org.apache.samza.util.Util
 import org.apache.samza.Partition
 import org.apache.samza.SamzaException
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 
 /**
@@ -116,10 +115,10 @@ object JobModelManager extends Logging {
         val extendedSystemAdmins = systemAdmins.filter{
           case (systemName, systemAdmin) => systemAdmin.isInstanceOf[ExtendedSystemAdmin]
         }
-        val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.containsKey(systemStream.getSystem))
+        val inputStreamsToMonitor = config.getInputStreams.filter(systemStream => extendedSystemAdmins.contains(systemStream.getSystem))
         if (inputStreamsToMonitor.nonEmpty) {
           streamPartitionCountMonitor = new StreamPartitionCountMonitor(
-            setAsJavaSet(inputStreamsToMonitor),
+            inputStreamsToMonitor.asJava,
             streamMetadataCache,
             metricsRegistryMap,
             config.getMonitorPartitionChangeFrequency)
@@ -135,11 +134,11 @@ object JobModelManager extends Logging {
       // maxChangelogPartitionId always has the absolute max, not the current
       // max (in case the task with the highest changelog partition mapping
       // disappears.
-      val newChangelogPartitionMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
+      val newChangelogPartitionMapping = jobModel.getContainers.asScala.flatMap(_._2.getTasks.asScala).map{case (taskName,taskModel) => {
         taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
-      }}.toMap ++ previousChangelogPartitionMapping
+      }}.toMap ++ previousChangelogPartitionMapping.asScala
       info("Saving task-to-changelog partition mapping: %s" format newChangelogPartitionMapping)
-      changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping)
+      changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
 
       createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
 
@@ -187,6 +186,7 @@ object JobModelManager extends Logging {
         case (systemStream, metadata) =>
           metadata
             .getSystemStreamPartitionMetadata
+            .asScala
             .keys
             .map(new SystemStreamPartition(systemStream, _))
       }.toSet
@@ -201,7 +201,7 @@ object JobModelManager extends Logging {
           case Some(jfr(_*)) => {
             info("before match: allSystemStreamPartitions.size = %s" format (allSystemStreamPartitions.size))
             val sspMatcher = Util.getObj[SystemStreamPartitionMatcher](s)
-            val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions, config).asScala.toSet
+            val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet
             // Usually a small set hence ok to log at info level
             info("after match: matchedPartitions = %s" format (matchedPartitions))
             matchedPartitions
@@ -234,18 +234,18 @@ object JobModelManager extends Logging {
     // Do grouping to fetch TaskName to SSP mapping
     val allSystemStreamPartitions = getMatchedInputStreamPartitions(config, streamMetadataCache)
     val grouper = getSystemStreamPartitionGrouper(config)
-    val groups = grouper.group(allSystemStreamPartitions)
+    val groups = grouper.group(allSystemStreamPartitions.asJava)
     info("SystemStreamPartitionGrouper %s has grouped the SystemStreamPartitions into %d tasks with the following taskNames: %s" format(grouper, groups.size(), groups.keySet()))
 
     // If no mappings are present(first time the job is running) we return -1, this will allow 0 to be the first change
     // mapping.
-    var maxChangelogPartitionId = changeLogPartitionMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+    var maxChangelogPartitionId = changeLogPartitionMapping.asScala.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
     // Sort the groups prior to assigning the changelog mapping so that the mapping is reproducible and intuitive
     val sortedGroups = new util.TreeMap[TaskName, util.Set[SystemStreamPartition]](groups)
 
     // Assign all SystemStreamPartitions to TaskNames.
     val taskModels = {
-      sortedGroups.map { case (taskName, systemStreamPartitions) =>
+      sortedGroups.asScala.map { case (taskName, systemStreamPartitions) =>
         val changelogPartition = Option(changeLogPartitionMapping.get(taskName)) match {
           case Some(changelogPartitionId) => new Partition(changelogPartitionId)
           case _ =>
@@ -264,14 +264,14 @@ object JobModelManager extends Logging {
     val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
     val containerGrouper = containerGrouperFactory.build(config)
     val containerModels = {
-      if (containerGrouper.isInstanceOf[BalancingTaskNameGrouper])
-        containerGrouper.asInstanceOf[BalancingTaskNameGrouper].balance(taskModels, localityManager)
-      else
-        containerGrouper.group(taskModels, containerIds)
+      containerGrouper match {
+        case grouper: BalancingTaskNameGrouper => grouper.balance(taskModels.asJava, localityManager)
+        case _ => containerGrouper.group(taskModels.asJava, containerIds)
+      }
     }
-    val containerMap = asScalaSet(containerModels).map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
+    val containerMap = containerModels.asScala.map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
-    new JobModel(config, containerMap, localityManager)
+    new JobModel(config, containerMap.asJava, localityManager)
   }
 
   /**
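
One non-mechanical cleanup in the hunk above: the `isInstanceOf`/`asInstanceOf` pair for the balancing grouper is replaced by a typed pattern match. A minimal sketch of that idiom, with hypothetical types:

    object GrouperMatchSketch extends App {
      trait TaskNameGrouper
      class BalancingGrouper extends TaskNameGrouper { def balance(): String = "balanced" }

      def group(grouper: TaskNameGrouper): String = grouper match {
        case g: BalancingGrouper => g.balance() // typed pattern replaces isInstanceOf/asInstanceOf
        case _                   => "grouped"
      }
      println(group(new BalancingGrouper))
    }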
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 70e5a51..68ad02f 100644
@@ -28,7 +28,7 @@ import org.apache.samza.util.ClassLoaderHelper
 import org.apache.samza.util.CommandLine
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 
@@ -81,23 +81,22 @@ class JobRunner(config: Config) extends Logging {
     if (resetJobConfig) {
       info("Storing config in coordinator stream.")
       coordinatorSystemProducer.register(JobRunner.SOURCE)
-      coordinatorSystemProducer.start
+      coordinatorSystemProducer.start()
       coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
     }
     info("Loading old config from coordinator stream.")
-    coordinatorSystemConsumer.register
-    coordinatorSystemConsumer.start
-    coordinatorSystemConsumer.bootstrap
-    coordinatorSystemConsumer.stop
+    coordinatorSystemConsumer.register()
+    coordinatorSystemConsumer.start()
+    coordinatorSystemConsumer.bootstrap()
+    coordinatorSystemConsumer.stop()
 
-    val oldConfig = coordinatorSystemConsumer.getConfig()
+    val oldConfig = coordinatorSystemConsumer.getConfig
     if (resetJobConfig) {
-      info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
-      (oldConfig.keySet -- config.keySet).foreach(key => {
-        coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
-      })
+      val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
+      info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
+      keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
     }
-    coordinatorSystemProducer.stop
+    coordinatorSystemProducer.stop()
 
     // Create the actual job, and submit it.
     val job = jobFactory.getJob(config).submit
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index ee152f9..2dd21c6 100644
@@ -24,7 +24,7 @@ import java.io.File
 
 import org.apache.samza.config.ShellCommandConfig
 import org.apache.samza.config.ShellCommandConfig.Config2ShellCommand
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class ShellCommandBuilder extends CommandBuilder {
   def buildCommand() = {
@@ -45,6 +45,6 @@ class ShellCommandBuilder extends CommandBuilder {
       case None => envMap
     }
 
-    envMapWithJavaHome
+    envMapWithJavaHome.asJava
   }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJob.scala
index f5c8a46..59444a3 100644
@@ -29,7 +29,7 @@ import org.apache.samza.job.util.ProcessKiller
 import org.apache.samza.job.{ApplicationStatus, CommandBuilder, StreamJob}
 import org.apache.samza.util.Logging
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager) extends StreamJob with Logging {
   var jobStatus: Option[ApplicationStatus] = None
@@ -38,7 +38,7 @@ class ProcessJob(commandBuilder: CommandBuilder, jobCoordinator: JobModelManager
   def submit: StreamJob = {
     jobStatus = Some(New)
     val waitForThreadStart = new CountDownLatch(1)
-    val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList)
+    val processBuilder = new ProcessBuilder(commandBuilder.buildCommand.split(" ").toList.asJava)
 
     processBuilder
       .environment
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala
index 64daa0f..6692e35 100644
@@ -21,7 +21,7 @@ package org.apache.samza.metrics
 
 import java.lang.management.ManagementFactory
 import scala.collection._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.lang.Thread.State._
 import java.util.concurrent.Executors
 import java.util.concurrent.TimeUnit
@@ -97,7 +97,7 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna
     var count = 0l
     var timeMillis = 0l
 
-    gcBeans.foreach(gcBean => {
+    gcBeans.asScala.foreach(gcBean => {
       val c = gcBean.getCollectionCount()
       val t = gcBean.getCollectionTime()
       val gcInfo = getGcInfo(gcBean.getName)
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/JmxReporter.scala
index 63123ff..ed401de 100644
@@ -31,7 +31,7 @@ import org.apache.samza.metrics.MetricsReporter
 import org.apache.samza.metrics.MetricsReporterFactory
 import org.apache.samza.metrics.ReadableMetricsRegistry
 import org.apache.samza.metrics.ReadableMetricsRegistryListener
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.metrics.MetricsVisitor
 import org.apache.samza.metrics.JmxUtil._
 
@@ -45,8 +45,8 @@ class JmxReporter(server: MBeanServer) extends MetricsReporter with Logging {
       registry.listen(listener)
 
       // Second, add all existing metrics.
-      registry.getGroups.foreach(group => {
-        registry.getGroup(group).foreach {
+      registry.getGroups.asScala.foreach(group => {
+        registry.getGroup(group).asScala.foreach {
           case (name, metric) =>
             metric.visit(new MetricsVisitor {
               def counter(counter: Counter) = registerBean(new JmxCounter(counter, getObjectName(group, name, sources(registry))))
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
index d7aec8b..8a58cd2 100644
@@ -22,7 +22,7 @@ package org.apache.samza.metrics.reporter
 import java.util.Collections
 import java.util.HashMap
 import java.util.Map
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object Metrics {
   def fromMap(map: Map[String, Map[String, Object]]): Metrics = {
@@ -36,14 +36,14 @@ object Metrics {
 class Metrics(metrics: Map[String, Map[String, Object]]) {
   val immutableMetrics = new HashMap[String, Map[String, Object]]
 
-  for (groupEntry <- metrics.entrySet) {
+  for ((groupKey, groupValue) <- metrics.asScala) {
     val immutableMetricGroup = new HashMap[String, Object]
 
-    for (metricEntry <- groupEntry.getValue.asInstanceOf[Map[String, Object]].entrySet) {
-      immutableMetricGroup.put(metricEntry.getKey, metricEntry.getValue)
+    for ((metricKey, metricValue) <- groupValue.asScala) {
+      immutableMetricGroup.put(metricKey, metricValue)
     }
 
-    immutableMetrics.put(groupEntry.getKey, Collections.unmodifiableMap(immutableMetricGroup))
+    immutableMetrics.put(groupKey, Collections.unmodifiableMap(immutableMetricGroup))
   }
 
   def get[T](group: String, metricName: String) =
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala
index fb438a4..945ae47 100644
@@ -21,7 +21,7 @@ package org.apache.samza.metrics.reporter
 
 import java.util.HashMap
 import java.util.Map
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.util.Logging
 import org.apache.samza.metrics.Counter
 import org.apache.samza.metrics.Gauge
@@ -115,10 +115,10 @@ class MetricsSnapshotReporter(
       val metricsMsg = new HashMap[String, Map[String, Object]]
 
       // metrics
-      registry.getGroups.foreach(group => {
+      registry.getGroups.asScala.foreach(group => {
         val groupMsg = new HashMap[String, Object]
 
-        registry.getGroup(group).foreach {
+        registry.getGroup(group).asScala.foreach {
           case (name, metric) =>
             metric.visit(new MetricsVisitor {
               def counter(counter: Counter) = groupMsg.put(name, counter.getCount: java.lang.Long)
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointSerde.scala
index fd8455a..364b88a 100644
@@ -26,7 +26,7 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.{SamzaException, Partition}
 import org.codehaus.jackson.map.ObjectMapper
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.codehaus.jackson.`type`.TypeReference
 
 /**
@@ -68,9 +68,9 @@ class CheckpointSerde extends Serde[Checkpoint] with Logging {
         new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
       }
 
-      val cpMap = jMap.values.map(deserializeJSONMap).toMap
+      val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
 
-      return new Checkpoint(cpMap)
+      new Checkpoint(cpMap.asJava)
     }catch {
       case e : Exception =>
         warn("Exception while deserializing checkpoint: " + e)
@@ -83,7 +83,7 @@ class CheckpointSerde extends Serde[Checkpoint] with Logging {
     val offsets = checkpoint.getOffsets
     val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
 
-    offsets.foreach {
+    offsets.asScala.foreach {
       case (ssp, offset) =>
         val jMap = new util.HashMap[String, String](4)
         jMap.put("system", ssp.getSystemStream.getSystem)
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index 695f53a..491f77b 100644
@@ -37,7 +37,7 @@ import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import org.apache.samza.util.Clock
 
-import scala.collection.{JavaConversions, Map}
+import scala.collection.JavaConverters._
 
 object TaskStorageManager {
   def getStoreDir(storeBaseDir: File, storeName: String) = {
@@ -313,7 +313,7 @@ class TaskStorageManager(
           systemAdmin.asInstanceOf[ExtendedSystemAdmin].getNewestOffset(new SystemStreamPartition(systemStream.getSystem, systemStream.getStream, partition), 3)
         } else {
           val streamToMetadata = systemAdmins(systemStream.getSystem)
-                  .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+                  .getSystemStreamMetadata(Set(systemStream.getStream).asJava)
           val sspMetadata = streamToMetadata
                   .get(systemStream.getStream)
                   .getSystemStreamPartitionMetadata
diff --git a/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala b/samza-core/src/main/scala/org/apache/samza/system/StreamMetadataCache.scala
index 918fa53..0dd114c 100644
@@ -22,7 +22,7 @@ package org.apache.samza.system
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.util.{Clock, SystemClock}
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Caches requests to SystemAdmin.getSystemStreamMetadata for a short while (by default
@@ -64,11 +64,11 @@ class StreamMetadataCache (
           val systemAdmin = systemAdmins
             .getOrElse(systemName, throw new SamzaException("Cannot get metadata for unknown system: %s" format systemName))
           val streamToMetadata = if (partitionsMetadataOnly && systemAdmin.isInstanceOf[ExtendedSystemAdmin]) {
-            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream), cacheTTLms)
+            systemAdmin.asInstanceOf[ExtendedSystemAdmin].getSystemStreamPartitionCounts(systemStreams.map(_.getStream).asJava, cacheTTLms)
           } else {
-            systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream))
+            systemAdmin.getSystemStreamMetadata(systemStreams.map(_.getStream).asJava)
           }
-          streamToMetadata.map {
+          streamToMetadata.asScala.map {
             case (streamName, metadata) => (new SystemStream(systemName, streamName) -> metadata)
           }
       }
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index 17d163d..f1acb15 100644
 package org.apache.samza.system
 
 
+import java.util
 import java.util.concurrent.TimeUnit
-
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
 import org.apache.samza.util.{Logging, TimerUtils}
-import org.apache.samza.system.chooser.{DefaultChooser, MessageChooser}
+import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 import java.util.ArrayDeque
 import java.util.HashSet
@@ -152,10 +152,11 @@ class SystemConsumers (
 
   def start {
     debug("Starting consumers.")
-    emptySystemStreamPartitionsBySystem ++= unprocessedMessagesBySSP
+    emptySystemStreamPartitionsBySystem.asScala ++= unprocessedMessagesBySSP
       .keySet
+      .asScala
       .groupBy(_.getSystem)
-      .mapValues(systemStreamPartitions => new HashSet(systemStreamPartitions.toSeq))
+      .mapValues(systemStreamPartitions => new util.HashSet(systemStreamPartitions.toSeq.asJava))
 
     consumers
       .keySet
diff --git a/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala b/samza-core/src/main/scala/org/apache/samza/system/chooser/BootstrappingChooser.scala
index f71bcfb..b39439d 100644
 
 package org.apache.samza.system.chooser
 
-import org.apache.samza.system.SystemAdmin
-import org.apache.samza.system.SystemStream
-import org.apache.samza.system.SystemStreamMetadata
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.SamzaException
-import org.apache.samza.util.Logging
 import org.apache.samza.metrics.MetricsHelper
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
-
-import scala.collection.JavaConversions._
+import org.apache.samza.system._
+import org.apache.samza.util.Logging
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /**
@@ -92,7 +87,7 @@ class BootstrappingChooser(
     .flatMap {
       case (systemStream, metadata) =>
         metadata
-          .getSystemStreamPartitionMetadata
+          .getSystemStreamPartitionMetadata.asScala
           .keys
           .map(new SystemStreamPartition(systemStream, _))
     }
@@ -120,7 +115,7 @@ class BootstrappingChooser(
     }
 
     // remove the systemStreamPartitions not registered.
-    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.keys.contains(_))
+    laggingSystemStreamPartitions = laggingSystemStreamPartitions.filter(registeredSystemStreamPartitions.contains(_))
     systemStreamLagCounts = laggingSystemStreamPartitions.groupBy(_.getSystemStream).map {case (systemStream, ssps) => systemStream -> ssps.size}
 
     debug("Starting bootstrapping chooser with bootstrap metadata: %s" format bootstrapStreamMetadata)
index 460d11c..02bfae6 100644
@@ -23,7 +23,7 @@ import org.apache.samza.system.SystemAdmin
 import org.apache.samza.system.SystemStreamMetadata
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.io.RandomAccessFile
 import scala.util.control.Breaks
 import org.apache.samza.Partition
@@ -41,7 +41,7 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
    * @see getNewestOffsetAndUpcomingOffset(RandomAccessFile)
    */
   def getSystemStreamMetadata(streams: java.util.Set[String]) = {
-    val allMetadata = streams.map(stream => {
+    val allMetadata = streams.asScala.map(stream => {
       val file = new RandomAccessFile(stream, "r")
       val systemStreamPartitionMetadata = file.length match {
         case 0 => new SystemStreamPartitionMetadata(null, null, "0")
@@ -52,13 +52,13 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
       }
       file.close
       val streamPartitionMetadata = Map(new Partition(0) -> systemStreamPartitionMetadata)
-      val systemStreamMetadata = new SystemStreamMetadata(stream, streamPartitionMetadata)
+      val systemStreamMetadata = new SystemStreamMetadata(stream, streamPartitionMetadata.asJava)
       (stream, systemStreamMetadata)
     }).toMap
 
     info("Got metadata: %s" format allMetadata)
 
-    allMetadata
+    allMetadata.asJava
   }
 
   /**
@@ -70,7 +70,7 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
    * we are supposed to only call this method in fully consumed messages.
    */
   def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = {
-    val offsetAfter = offsets.map {
+    val offsetAfter = offsets.asScala.map {
       case (systemStreamPartition, offset) => {
         val file = new RandomAccessFile(systemStreamPartition.getStream, "r")
         val newOffset = findNextEnter(file, offset.toLong, 1) match {
@@ -80,7 +80,7 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
         (systemStreamPartition, newOffset.toString)
       }
     }
-    mapAsJavaMap(offsetAfter)
+    offsetAfter.asJava
   }
 
   /**
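
mapAsJavaMap(offsetAfter) and offsetAfter.asJava build the same wrapper; the converter form just keeps the conversion visible at the expression that needs it instead of relying on an imported function. A one-line sketch with a hypothetical map:

    import scala.collection.JavaConverters._

    val offsets = Map("some-file.txt" -> "128")
    val javaOffsets: java.util.Map[String, String] = offsets.asJava  // same wrapper mapAsJavaMap built
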
index 61dfd33..84dd6b4 100644
@@ -19,7 +19,6 @@
 
 package org.apache.samza.system.filereader
 
-import org.apache.samza.system.SystemConsumer
 import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStreamPartition
@@ -27,13 +26,9 @@ import scala.collection.mutable.Map
 import java.io.RandomAccessFile
 import org.apache.samza.system.IncomingMessageEnvelope
 import java.util.concurrent.LinkedBlockingQueue
-import org.apache.samza.Partition
-import collection.JavaConversions._
-import scala.collection.mutable.HashMap
 import java.util.concurrent.Executors
 import java.util.concurrent.ExecutorService
 import org.apache.samza.util.DaemonThreadFactory
-import org.apache.samza.SamzaException
 import org.apache.samza.util.Logging
 
 object FileReaderSystemConsumer {
index 6962b22..b0e0f3f 100644
@@ -25,7 +25,7 @@ import joptsimple.util.KeyValuePair
 import org.apache.samza.config.{ConfigFactory, MapConfig}
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import scala.collection.mutable.Buffer
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Defines a basic set of command-line options for Samza tasks. Tools can use this
@@ -64,10 +64,10 @@ class CommandLine {
     val configFactoryClass = options.valueOf(configFactoryOpt)
     val configPaths = options.valuesOf(configPathOpt)
     configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass)
-    val configOverrides = options.valuesOf(configOverrideOpt).map(kv => (kv.key, kv.value)).toMap
+    val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv => (kv.key, kv.value)).toMap
 
-    val configs: Buffer[java.util.Map[String, String]] = configPaths.map(configFactory.getConfig)
-    configs += configOverrides
-    new MapConfig(configs)
+    val configs: Buffer[java.util.Map[String, String]] = configPaths.asScala.map(configFactory.getConfig)
+    configs += configOverrides.asJava
+    new MapConfig(configs.asJava)
   }
 }
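
The reworked loadConfig shows the three conversions a typical boundary method now needs: joptsimple's valuesOf returns a java.util.List, so it is asScala'd for mapping; the Scala override map is asJava'd before being appended; and the final Buffer is asJava'd for MapConfig, whose constructor (as used above) takes a java.util.List of java.util.Maps. A minimal sketch under those assumptions:

    import scala.collection.JavaConverters._
    import scala.collection.mutable.Buffer
    import org.apache.samza.config.MapConfig

    val overrides = Map("job.name" -> "my-job")
    val configs: Buffer[java.util.Map[String, String]] = Buffer(overrides.asJava)
    // maps merge in order, which is why the overrides are appended last
    val config = new MapConfig(configs.asJava)
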
index 4a945d2..e7832a0 100644
@@ -36,7 +36,7 @@ import org.apache.samza.config.SystemConfig
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.SystemConfig.Config2System
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.io.InputStreamReader
 
 
@@ -213,14 +213,14 @@ object Util extends Logging {
   def buildCoordinatorStreamConfig(config: Config) = {
     val (jobName, jobId) = getJobNameAndId(config)
     // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
-    new MapConfig(
-      config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+    val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
       Map[String, String](
         JobConfig.JOB_NAME -> jobName,
         JobConfig.JOB_ID -> jobId,
         JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
         JobConfig.MONITOR_PARTITION_CHANGE -> String.valueOf(config.getMonitorPartitionChange),
-        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency)))
+        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
+    new MapConfig(map.asJava)
   }
 
   /**
@@ -324,7 +324,7 @@ object Util extends Logging {
    * Convert a java map to a Scala map
    * */
   def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
-    javaMap.toMap
+    javaMap.asScala.toMap
   }
 
   /**
@@ -336,9 +336,9 @@ object Util extends Logging {
     val localHost = InetAddress.getLocalHost
     if (localHost.isLoopbackAddress) {
       debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName))
-      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) NetworkInterface.getNetworkInterfaces.toList else NetworkInterface.getNetworkInterfaces.toList.reverse
+      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) NetworkInterface.getNetworkInterfaces.asScala.toList else NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
       for (networkInterface <- networkInterfaces) {
-        val addresses = networkInterface.getInetAddresses.toList.filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
+        val addresses = networkInterface.getInetAddresses.asScala.toList.filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
         if (addresses.nonEmpty) {
           val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
           debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress))
index c246413..58412a7 100644
@@ -49,7 +49,7 @@ import org.apache.samza.system.TestSystemConsumers;
 import org.junit.Before;
 import org.junit.Test;
 import scala.Option;
-import scala.collection.JavaConversions;
+import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
@@ -107,7 +107,7 @@ public class TestAsyncRunLoop {
 
   TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp, OffsetManager manager, SystemConsumers consumers) {
     TaskInstanceMetrics taskInstanceMetrics = new TaskInstanceMetrics("task", new MetricsRegistryMap());
-    scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConversions.asScalaSet(Collections.singleton(ssp)).toSet();
+    scala.collection.immutable.Set<SystemStreamPartition> sspSet = JavaConverters.asScalaSetConverter(Collections.singleton(ssp)).asScala().toSet();
     return new TaskInstance(task, taskName, mock(Config.class), taskInstanceMetrics,
         null, consumers, mock(TaskInstanceCollector.class), mock(SamzaContainerContext.class),
         manager, null, null, sspSet, new TaskInstanceExceptionHandler(taskInstanceMetrics, new scala.collection.immutable.HashSet<String>()));
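
From Java there is no implicit enrichment to lean on, so the test calls the converter factory directly: JavaConverters.asScalaSetConverter(set).asScala().toSet(). The same explicit style works from Scala when the implicit import is not in scope; a sketch:

    import java.util.Collections
    import scala.collection.JavaConverters

    val javaSet = Collections.singleton("ssp")
    val scalaSet: Set[String] =
      JavaConverters.asScalaSetConverter(javaSet).asScala.toSet  // what the Java call site spells out
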
index 0865b31..283ccc4 100644
@@ -30,8 +30,8 @@ import org.junit.{Before, Test}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
-import scala.collection.JavaConversions._
+import org.scalatest.mockito.MockitoSugar
+import scala.collection.JavaConverters._
 import org.apache.samza.config.JobConfig
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
 
@@ -70,19 +70,19 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
       SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
       TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
-    ))
+    ).asJava)
     val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
       new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
       new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
-    ))
+    ).asJava)
     TestCheckpointTool.checkpointManager = mock[CheckpointManager]
     TestCheckpointTool.systemAdmin = mock[SystemAdmin]
-    when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
-      .thenReturn(Map("foo" -> metadata))
+    when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo").asJava))
+      .thenReturn(Map("foo" -> metadata).asJava)
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
-      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
     when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
-      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
+      .thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321").asJava))
   }
 
   @Test
@@ -102,8 +102,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
     val checkpointTool = CheckpointTool(config, toOverwrite)
     checkpointTool.run
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
+      .writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42").asJava))
     verify(TestCheckpointTool.checkpointManager)
-      .writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43")))
+      .writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
   }
 }
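
Stubbing with converted arguments, as above, keeps working because the wrappers honour the java.util collection contracts, including equals and hashCode, so Mockito's equality-based argument matching treats Set("foo").asJava the same as any java.util.Set with the same elements. For instance:

    import java.util
    import scala.collection.JavaConverters._

    val wrapped: util.Set[String] = Set("foo").asJava
    val plain:   util.Set[String] = new util.HashSet(util.Arrays.asList("foo"))
    wrapped == plain  // true: wrapper equality follows java.util.Set semantics
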
index 7c3949f..abfc63f 100644
@@ -33,7 +33,7 @@ import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.scalatest.Assertions.intercept
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestOffsetManager {
   @Test
@@ -42,9 +42,9 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
+    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest").asJava)
     val offsetManager = OffsetManager(systemStreamMetadata, config)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
@@ -59,7 +59,7 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
@@ -83,7 +83,7 @@ class TestOffsetManager {
     // Should not update null offset
     offsetManager.update(taskName, systemStreamPartition, null)
     offsetManager.checkpoint(taskName)
-    val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
+    val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47").asJava)
     assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
   }
 
@@ -93,7 +93,7 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
@@ -119,13 +119,13 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
+    val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45").asJava)
     val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
     val config = new MapConfig(Map(
       "systems.test-system.samza.offset.default" -> "oldest",
-      "systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
+      "systems.test-system.streams.test-stream.samza.reset.offset" -> "true").asJava)
     val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
     offsetManager.register(taskName, Set(systemStreamPartition))
     offsetManager.start
@@ -148,9 +148,9 @@ class TestOffsetManager {
     val systemStreamPartition2 = new SystemStreamPartition(systemStream, partition2)
     val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(
       partition1 -> new SystemStreamPartitionMetadata("0", "1", "2"),
-      partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")))
+      partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45"))
+    val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45").asJava)
     // Checkpoint manager only has partition 1.
     val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
     val systemAdmins = Map("test-system" -> getSystemAdmin)
@@ -185,9 +185,9 @@ class TestOffsetManager {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
+    val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail").asJava)
     intercept[IllegalArgumentException] {
       OffsetManager(systemStreamMetadata, config)
     }
@@ -197,10 +197,9 @@ class TestOffsetManager {
   def testDefaultStreamShouldFailWhenFailIsSpecified {
     val systemStream = new SystemStream("test-system", "test-stream")
     val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
+    val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail").asJava)
 
     intercept[IllegalArgumentException] {
       OffsetManager(systemStreamMetadata, config)
@@ -215,8 +214,7 @@ class TestOffsetManager {
     val partition0 = new Partition(0)
     val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0)
     val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val systemStreamMetadata = Map(systemStream0 -> testStreamMetadata)
+    val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
     val checkpointManager = getCheckpointManager(systemStreamPartition1)
     val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
@@ -232,7 +230,7 @@ class TestOffsetManager {
     val taskName = new TaskName("task-name")
     val ssp = new SystemStreamPartition(new SystemStream("test-system", "test-stream"), new Partition(0))
     val sspm = new SystemStreamPartitionMetadata(null, null, "13")
-    val offsetMeta = new SystemStreamMetadata("test-stream", Map(new Partition(0) -> sspm))
+    val offsetMeta = new SystemStreamMetadata("test-stream", Map(new Partition(0) -> sspm).asJava)
     val settings = new OffsetSetting(offsetMeta, OffsetType.OLDEST, resetOffset = false)
     val offsetManager = new OffsetManager(offsetSettings = Map(ssp.getSystemStream -> settings))
     offsetManager.register(taskName, Set(ssp))
@@ -250,12 +248,12 @@ class TestOffsetManager {
     val partition = new Partition(0)
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartition2 = new SystemStreamPartition(systemStream2, partition)
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
-    val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
+    val testStreamMetadata2 = new SystemStreamMetadata(systemStream2.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val systemStreamMetadata = Map(systemStream -> testStreamMetadata, systemStream2->testStreamMetadata2)
     val config = new MapConfig
     val checkpointManager = getCheckpointManager1(systemStreamPartition,
-                                                 new Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> "100")),
+                                                 new Checkpoint(Map(systemStreamPartition -> "45", systemStreamPartition2 -> "100").asJava),
                                                  taskName)
     val systemAdmins = Map(systemName -> getSystemAdmin, systemName2->getSystemAdmin)
     val consumer = new SystemConsumerWithCheckpointCallback
@@ -276,7 +274,7 @@ class TestOffsetManager {
     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("45", consumer.recentCheckpoint.get(systemStreamPartition))
     // make sure only the system with the callbacks gets them
-    assertNull(consumer.recentCheckpoint.getOrElse(systemStreamPartition2, null))
+    assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
 
     offsetManager.update(taskName, systemStreamPartition, "46")
     offsetManager.update(taskName, systemStreamPartition, "47")
@@ -284,7 +282,7 @@ class TestOffsetManager {
     assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("100", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("47", consumer.recentCheckpoint.get(systemStreamPartition))
-    assertNull(consumer.recentCheckpoint.getOrElse(systemStreamPartition2, null))
+    assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
 
     offsetManager.update(taskName, systemStreamPartition, "48")
     offsetManager.update(taskName, systemStreamPartition2, "101")
@@ -292,7 +290,7 @@ class TestOffsetManager {
     assertEquals("48", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition).getValue)
     assertEquals("101", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartition2).getValue)
     assertEquals("48", consumer.recentCheckpoint.get(systemStreamPartition))
-    assertNull(consumer.recentCheckpoint.getOrElse(systemStreamPartition2, null))
+    assertNull(consumer.recentCheckpoint.get(systemStreamPartition2))
     offsetManager.stop
   }
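
The getOrElse-to-get rewrite in the assertions above works because recentCheckpoint is now a plain java.util.Map: its get returns null for a missing key, which is exactly what assertNull wants, whereas a Scala Map's get returns an Option. Side by side:

    val javaMap = new java.util.HashMap[String, String]()
    javaMap.get("missing")          // null: java.util.Map contract
    Map("a" -> "1").get("missing")  // None: a Scala Map returns an Option
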
 
@@ -308,12 +306,12 @@ class TestOffsetManager {
                       timeout: Long): util.Map[SystemStreamPartition, util.List[IncomingMessageEnvelope]] = { null }
 
     override def onCheckpoint(offsets: java.util.Map[SystemStreamPartition,String]){
-      recentCheckpoint = recentCheckpoint ++ offsets
+      recentCheckpoint = (recentCheckpoint.asScala ++ offsets.asScala).asJava
     }
   }
 
   private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
-    getCheckpointManager1(systemStreamPartition, new Checkpoint(Map(systemStreamPartition -> "45")), taskName)
+    getCheckpointManager1(systemStreamPartition, new Checkpoint(Map(systemStreamPartition -> "45").asJava), taskName)
   }
 
   private def getCheckpointManager1(systemStreamPartition: SystemStreamPartition, checkpoint: Checkpoint, taskName:TaskName = new TaskName("taskName")) = {
@@ -330,17 +328,17 @@ class TestOffsetManager {
       def stop { isStopped = true }
 
       // Only for testing purposes - not present in actual checkpoint manager
-      def getOffets = Map(taskName -> mapAsScalaMap(checkpoint.getOffsets()).toMap)
+      def getOffets = Map(taskName -> checkpoint.getOffsets.asScala.toMap)
     }
   }
 
   private def getSystemAdmin = {
     new SystemAdmin {
       def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
-        offsets.mapValues(offset => (offset.toLong + 1).toString)
+        offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
 
       def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
-        Map[String, SystemStreamMetadata]()
+        Map[String, SystemStreamMetadata]().asJava
 
       override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
         new UnsupportedOperationException("Method not implemented.")
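
Two converter subtleties in this mock: merging two Java maps now takes an explicit round-trip, (a.asScala ++ b.asScala).asJava, with the right-hand entries winning on key collisions; and mapValues returns a lazy view, so mapValues(...).asJava hands Java a map whose values are recomputed on each access, harmless in a test but worth knowing. Sketches:

    import scala.collection.JavaConverters._

    def merged(a: java.util.Map[String, String], b: java.util.Map[String, String]) =
      (a.asScala ++ b.asScala).asJava  // b's entries win on duplicate keys

    val offsets = Map("ssp" -> "41")
    val lazyView  = offsets.mapValues(o => (o.toLong + 1).toString).asJava          // recomputed per get
    val strictMap = offsets.map { case (k, o) => k -> (o.toLong + 1).toString }.asJava // computed once
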
index 4ca738e..f7839af 100644
@@ -20,7 +20,7 @@
 package org.apache.samza.checkpoint.file
 
 import java.io.File
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.util.Random
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -55,7 +55,7 @@ class TestFileSystemCheckpointManager  {
     val cp = new Checkpoint(Map(
       new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
       new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
-      new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
+      new SystemStreamPartition("b", "d", new Partition(2)) -> "e").asJava)
 
     var readCp:Checkpoint = null
     val cpm =  new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
index 8284b3a..e279639 100644
@@ -22,7 +22,7 @@ package org.apache.samza.config
 
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.StorageConfig._
 import org.junit.Assert.assertFalse
 import org.junit.Assert.assertTrue
@@ -36,7 +36,7 @@ class TestStorageConfig {
       FACTORY.format("store1") -> "some.factory.Class",
       CHANGELOG_STREAM.format("store1") -> "system1.stream1",
       FACTORY.format("store2") -> "some.factory.Class")
-    val config = new MapConfig(configMap)
+    val config = new MapConfig(configMap.asJava)
     assertFalse(config.isChangelogSystem("system3"))
     assertFalse(config.isChangelogSystem("system2"))
     assertTrue(config.isChangelogSystem("system1"))
@@ -51,7 +51,7 @@ class TestStorageConfig {
       CHANGELOG_STREAM.format("store2") -> "stream2",
       CHANGELOG_STREAM.format("store4") -> "stream4",
       FACTORY.format("store2") -> "some.factory.Class")
-    val config = new MapConfig(configMap)
+    val config = new MapConfig(configMap.asJava)
     assertFalse(config.isChangelogSystem("system3"))
     assertTrue(config.isChangelogSystem("system2"))
     assertTrue(config.isChangelogSystem("system1"))
@@ -60,7 +60,7 @@ class TestStorageConfig {
     assertEquals("system2.stream2", config.getChangelogStream("store2").getOrElse(""));
 
     val configMapErr = Map[String, String](CHANGELOG_STREAM.format("store4")->"stream4")
-    val configErr = new MapConfig(configMapErr)
+    val configErr = new MapConfig(configMapErr.asJava)
 
     try {
       configErr.getChangelogStream("store4").getOrElse("")
index 0304fc8..7533da3 100644
@@ -39,7 +39,7 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 import org.scalatest.{Matchers => ScalaTestMatchers}
 
 class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMatchers {
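
The org.scalatest.mock.MockitoSugar to org.scalatest.mockito.MockitoSugar change that recurs across these test files tracks the ScalaTest upgrade needed for 2.12, which relocated the trait; call sites are untouched. A sketch, assuming the ScalaTest 3.x line:

    import org.scalatest.mockito.MockitoSugar  // was org.scalatest.mock.MockitoSugar

    class SomeSpec extends MockitoSugar {
      val admin = mock[org.apache.samza.system.SystemAdmin]  // unchanged usage
    }
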
index a72a59a..bb72b72 100644
@@ -57,23 +57,23 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
   @Test
   def testReadJobModel {
-    val config = new MapConfig(Map("a" -> "b"))
+    val config = new MapConfig(Map("a" -> "b").asJava)
     val offsets = new util.HashMap[SystemStreamPartition, String]()
     offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
     val tasks = Map(
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, tasks),
-      Integer.valueOf(1) -> new ContainerModel(1, tasks))
-    val jobModel = new JobModel(config, containers)
+      Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
+    val jobModel = new JobModel(config, containers.asJava)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
@@ -89,16 +89,16 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
 
   @Test
   def testReadJobModelWithTimeouts {
-    val config = new MapConfig(Map("a" -> "b"))
+    val config = new MapConfig(Map("a" -> "b").asJava)
     val offsets = new util.HashMap[SystemStreamPartition, String]()
     offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
     val tasks = Map(
       new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets.keySet(), new Partition(0)),
       new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets.keySet(), new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, tasks),
-      Integer.valueOf(1) -> new ContainerModel(1, tasks))
-    val jobModel = new JobModel(config, containers)
+      Integer.valueOf(0) -> new ContainerModel(0, tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel(1, tasks.asJava))
+    val jobModel = new JobModel(config, containers.asJava)
     def jobModelGenerator(): JobModel = jobModel
     val server = new HttpServer
     val coordinator = new JobModelManager(jobModel, server)
@@ -116,7 +116,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
 
   @Test
   def testChangelogPartitions {
-    val config = new MapConfig(Map("a" -> "b"))
+    val config = new MapConfig(Map("a" -> "b").asJava)
     val offsets = new util.HashMap[SystemStreamPartition, String]()
     offsets.put(new SystemStreamPartition("system", "stream", new Partition(0)), "1")
     val tasksForContainer1 = Map(
@@ -126,12 +126,12 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       new TaskName("t3") -> new TaskModel(new TaskName("t3"), offsets.keySet(), new Partition(2)),
       new TaskName("t4") -> new TaskModel(new TaskName("t4"), offsets.keySet(), new Partition(3)),
       new TaskName("t5") -> new TaskModel(new TaskName("t6"), offsets.keySet(), new Partition(4)))
-    val containerModel1 = new ContainerModel(0, tasksForContainer1)
-    val containerModel2 = new ContainerModel(1, tasksForContainer2)
+    val containerModel1 = new ContainerModel(0, tasksForContainer1.asJava)
+    val containerModel2 = new ContainerModel(1, tasksForContainer2.asJava)
     val containers = Map(
       Integer.valueOf(0) -> containerModel1,
       Integer.valueOf(1) -> containerModel2)
-    val jobModel = new JobModel(config, containers)
+    val jobModel = new JobModel(config, containers.asJava)
     assertEquals(jobModel.maxChangeLogStreamPartitions, 5)
   }
 
@@ -179,7 +179,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -238,7 +238,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -287,7 +287,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       Map[String, SystemProducer](),
       new SerdeManager)
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
     val mockTaskStorageManager = mock[TaskStorageManager]
 
     when(mockTaskStorageManager.init).thenAnswer(new Answer[String] {
@@ -333,7 +333,7 @@ class MockCheckpointManager extends CheckpointManager {
 
   override def register(taskName: TaskName): Unit = {}
 
-  override def readLastCheckpoint(taskName: TaskName): Checkpoint = { new Checkpoint(Map[SystemStreamPartition, String]()) }
+  override def readLastCheckpoint(taskName: TaskName): Checkpoint = { new Checkpoint(Map[SystemStreamPartition, String]().asJava) }
 
   override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint): Unit = { }
 }
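
For an always-empty checkpoint like the mock above, Map[SystemStreamPartition, String]().asJava and java.util.Collections.emptyMap() are interchangeable; the converter form merely keeps the file on a single conversion style. Sketch:

    import java.util.Collections
    import scala.collection.JavaConverters._
    import org.apache.samza.checkpoint.Checkpoint
    import org.apache.samza.system.SystemStreamPartition

    val viaConverter = new Checkpoint(Map[SystemStreamPartition, String]().asJava)
    val viaJavaUtil  = new Checkpoint(Collections.emptyMap[SystemStreamPartition, String]())
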
index 7e35525..086eb85 100644
@@ -43,7 +43,7 @@ import org.apache.samza.task._
 import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.Assertions.intercept
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.system.SystemAdmin
 import scala.collection.mutable.ListBuffer
 
@@ -66,11 +66,11 @@ class TestTaskInstance {
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
-    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
     val taskInstance: TaskInstance = new TaskInstance(
       task,
       taskName,
@@ -148,7 +148,7 @@ class TestTaskInstance {
     val ignoredExceptions = classOf[TroublesomeException].getName + "," +
       classOf[NonFatalException].getName
     val config = new MapConfig(Map[String, String](
-      "task.ignored.exceptions" -> ignoredExceptions))
+      "task.ignored.exceptions" -> ignoredExceptions).asJava)
 
     val partition = new Partition(0)
     val consumerMultiplexer = new SystemConsumers(
@@ -161,11 +161,11 @@ class TestTaskInstance {
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
-    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -205,7 +205,7 @@ class TestTaskInstance {
   def testIgnoreAllExceptions {
     val task = new TroublesomeTask
     val config = new MapConfig(Map[String, String](
-      "task.ignored.exceptions" -> "*"))
+      "task.ignored.exceptions" -> "*").asJava)
 
     val partition = new Partition(0)
     val consumerMultiplexer = new SystemConsumers(
@@ -218,11 +218,11 @@ class TestTaskInstance {
     val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
     val systemStreamPartitions = Set(systemStreamPartition)
     // Pretend our last checkpointed (next) offset was 2.
-    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
+    val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
     val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
     val taskName = new TaskName("taskName")
     val collector = new TaskInstanceCollector(producerMultiplexer)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
 
     val registry = new MetricsRegistryMap
     val taskMetrics = new TaskInstanceMetrics(registry = registry)
@@ -281,7 +281,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("Offset Reset Task 0")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
 
     val offsetManager = new OffsetManager()
 
@@ -316,7 +316,7 @@ class TestTaskInstance {
     val metrics = new TaskInstanceMetrics()
     val taskName = new TaskName("testing")
     val collector = new TaskInstanceCollector(producers)
-    val containerContext = new SamzaContainerContext(0, config, Set[TaskName](taskName))
+    val containerContext = new SamzaContainerContext(0, config, Set(taskName).asJava)
     val offsetManager = new OffsetManager()
     offsetManager.startingOffsets += taskName -> Map(partition0 -> "0", partition1 -> "100")
     val systemAdmins = Map("system" -> new MockSystemAdmin)
index 8f02c78..e7397bb 100644
@@ -30,7 +30,7 @@ import org.junit.After
 import org.junit.Test
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.TaskConfig
 import org.apache.samza.config.SystemConfig
@@ -74,13 +74,13 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
     val container0Tasks = Map(
-      task0Name -> new TaskModel(task0Name, checkpoint0.keySet, new Partition(4)),
-      task2Name -> new TaskModel(task2Name, checkpoint2.keySet, new Partition(5)))
+      task0Name -> new TaskModel(task0Name, checkpoint0.keySet.asJava, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, checkpoint2.keySet.asJava, new Partition(5)))
     val container1Tasks = Map(
-      task1Name -> new TaskModel(task1Name, checkpoint1.keySet, new Partition(3)))
+      task1Name -> new TaskModel(task1Name, checkpoint1.keySet.asJava, new Partition(3)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
-      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
 
 
     // The test does not pass offsets for task2 (Partition 2) to the checkpointmanager, this will verify that we get an offset 0 for this partition
@@ -111,15 +111,15 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     // We want the mocksystemconsumer to use the same instance across runs
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
-    val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs))
-    val expectedJobModel = new JobModel(new MapConfig(config), containers)
+    val coordinator = JobModelManager(new MapConfig((config ++ otherConfigs).asJava))
+    val expectedJobModel = new JobModel(new MapConfig(config.asJava), containers.asJava)
 
     // Verify that the atomicReference is initialized
     assertNotNull(JobModelManager.jobModelRef.get())
     assertEquals(expectedJobModel, JobModelManager.jobModelRef.get())
 
     coordinator.start
-    assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
+    assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
     assertEquals(expectedJobModel, coordinator.jobModel)
 
     // Verify that the JobServlet is serving the correct jobModel
@@ -146,13 +146,13 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     // Construct the expected JobModel, so we can compare it to
     // JobCoordinator's JobModel.
     val container0Tasks = Map(
-      task0Name -> new TaskModel(task0Name, ssp0, new Partition(4)),
-      task2Name -> new TaskModel(task2Name, ssp1, new Partition(5)))
+      task0Name -> new TaskModel(task0Name, ssp0.asJava, new Partition(4)),
+      task2Name -> new TaskModel(task2Name, ssp1.asJava, new Partition(5)))
     val container1Tasks = Map(
-      task1Name -> new TaskModel(task1Name, ssp1, new Partition(3)))
+      task1Name -> new TaskModel(task1Name, ssp1.asJava, new Partition(3)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
-      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava),
+      Integer.valueOf(1) -> new ContainerModel(1, container1Tasks.asJava))
 
     val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
 
@@ -177,7 +177,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
 
     // start the job coordinator and verify if it has all the checkpoints through http port
-    val coordinator = JobModelManager(new MapConfig(config ++ otherConfigs))
+    val coordinator = JobModelManager(new MapConfig((config ++ otherConfigs).asJava))
     coordinator.start
     val url = coordinator.server.getUrl.toString
 
@@ -209,12 +209,10 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val task1Name = new TaskName("Partition 1")
     val task2Name = new TaskName("Partition 2")
     val container0Tasks = Map(
-      task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0)))
-    val container1Tasks = Map(
-      task2Name -> new TaskModel(task2Name, Set(new SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5)))
+      task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks))
-    val jobModel = new JobModel(config, containers)
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+    val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)
   }
@@ -232,11 +230,11 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     val task1Name = new TaskName("Partition 1")
 
     val container0Tasks = Map(
-      task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(0)))
+      task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))).asJava, new Partition(0)))
 
     val containers = Map(
-      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks))
-    val jobModel = new JobModel(config, containers)
+      Integer.valueOf(0) -> new ContainerModel(0, container0Tasks.asJava))
+    val jobModel = new JobModel(config, containers.asJava)
     assertEquals(config, coordinator.jobModel.getConfig)
     assertEquals(jobModel, coordinator.jobModel)
   }
@@ -279,13 +277,13 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
       JobConfig.SSP_MATCHER_CONFIG_REGEX -> "[1]",
       SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
-      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName))
+      SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName).asJava)
     config
   }
 
   def extractChangelogPartitionMapping(url : String) = {
     val jobModel = SamzaContainer.readJobModel(url.toString)
-    val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values())
+    val taskModels = jobModel.getContainers.values().asScala.flatMap(_.getTasks.values().asScala)
     taskModels.map{taskModel => {
       taskModel.getTaskName -> taskModel.getChangelogPartition.getPartitionId
     }}.toMap
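
asScala converts only the outermost collection, so the nested values here (each container's task map) need their own asScala, hence the second conversion inside flatMap. A reduced sketch of the shape, with hypothetical names:

    import java.util
    import scala.collection.JavaConverters._

    val containers = new util.HashMap[String, util.List[String]]()
    containers.put("container-0", util.Arrays.asList("task-1", "task-2"))
    // shallow: the values are still java.util.List until converted themselves
    val tasks = containers.asScala.values.flatMap(_.asScala).toList  // List("task-1", "task-2")
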
@@ -318,7 +316,7 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
       new Partition(1) -> new SystemStreamPartitionMetadata(null, null, null),
       // Create a new Partition(2), which wasn't in the prior changelog mapping.
       new Partition(2) -> new SystemStreamPartitionMetadata(null, null, null))
-    Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata))
+    Map(streamNames.asScala.toList.head -> new SystemStreamMetadata("foo", partitionMetadata.asJava)).asJava
   }
 
   override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
@@ -338,16 +336,16 @@ class MockSystemAdmin extends ExtendedSystemAdmin {
   override def getSystemStreamPartitionCounts(streamNames: util.Set[String],
                                               cacheTTL: Long): util.Map[String, SystemStreamMetadata] = {
     assertEquals(1, streamNames.size())
-    val result = streamNames.map {
+    val result = streamNames.asScala.map {
       stream =>
         val partitionMetadata = Map(
           new Partition(0) -> new SystemStreamPartitionMetadata("", "", ""),
           new Partition(1) -> new SystemStreamPartitionMetadata("", "", ""),
           new Partition(2) -> new SystemStreamPartitionMetadata("", "", "")
         )
-        stream -> new SystemStreamMetadata(stream, partitionMetadata)
+        stream -> new SystemStreamMetadata(stream, partitionMetadata.asJava)
     }.toMap
-    result
+    result.asJava
   }
 
   override def getNewestOffset(ssp: SystemStreamPartition, maxRetries: Integer) = null
index 99ee0bd..7d15fd8 100644
@@ -31,9 +31,9 @@ import org.mockito.Matchers
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 
 class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSugar {
@@ -71,7 +71,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val metrics = new MetricsRegistryMap()
 
     val partitionCountMonitor = new StreamPartitionCountMonitor(
-      JavaConversions.setAsJavaSet(inputSystemStreamSet),
+      inputSystemStreamSet.asJava,
       mockMetadataCache,
       metrics,
       5
@@ -95,7 +95,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val inputSystemStream = new SystemStream("test-system", "test-stream")
     val inputSystemStreamSet = Set[SystemStream](inputSystemStream)
     val monitor = new StreamPartitionCountMonitor(
-      JavaConversions.setAsJavaSet(inputSystemStreamSet),
+      inputSystemStreamSet.asJava,
       mockMetadataCache,
       new MetricsRegistryMap(),
       50
@@ -140,7 +140,7 @@ class TestStreamPartitionCountMonitor extends AssertionsForJUnit with MockitoSug
     val sampleCount = new CountDownLatch(2); // Verify 2 invocations
 
     val monitor = new StreamPartitionCountMonitor(
-      JavaConversions.setAsJavaSet(inputSystemStreamSet),
+      inputSystemStreamSet.asJava,
       mockMetadataCache,
       new MetricsRegistryMap(),
       50
index 6efa5e8..47e1b0a 100644
@@ -21,7 +21,7 @@ package org.apache.samza.job
 
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.ShellCommandConfig
 import java.net.URL
@@ -30,7 +30,7 @@ class TestShellCommandBuilder {
   @Test
   def testEnvironmentVariables {
     val urlStr = "http://www.google.com"
-    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo"))
+    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
     val scb = new ShellCommandBuilder
     scb.setConfig(config)
     scb.setId(1)
@@ -38,15 +38,15 @@ class TestShellCommandBuilder {
     val command = scb.buildCommand
     val environment = scb.buildEnvironment
     assertEquals("foo", command)
-    assertEquals("1", environment(ShellCommandConfig.ENV_CONTAINER_ID))
-    assertEquals(urlStr, environment(ShellCommandConfig.ENV_COORDINATOR_URL))
+    assertEquals("1", environment.get(ShellCommandConfig.ENV_CONTAINER_ID))
+    assertEquals(urlStr, environment.get(ShellCommandConfig.ENV_COORDINATOR_URL))
   }
 
   // if cmdPath is specified, the full path to the command should be adjusted
   @Test
   def testCommandWithFwkPath {
     val urlStr = "http://www.linkedin.com"
-    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo"))
+    val config = new MapConfig(Map(ShellCommandConfig.COMMAND_SHELL_EXECUTE -> "foo").asJava)
     val scb = new ShellCommandBuilder
     scb.setConfig(config)
     scb.setId(1)
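
buildEnvironment now returns a java.util.Map, so the Scala apply sugar environment(key) is gone and the assertions use environment.get(key), which returns the value or null. For example:

    import scala.collection.JavaConverters._

    val environment: java.util.Map[String, String] = Map("ENV_CONTAINER_ID" -> "1").asJava
    environment.get("ENV_CONTAINER_ID")  // "1"; a missing key would yield null, not None
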
index a55af50..d02a73c 100644
  * under the License.
  */
 
-package org.apache.samza.job.local;
+package org.apache.samza.job.local
 
 import org.apache.samza.coordinator.JobModelManager
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.job.ApplicationStatus
 import org.apache.samza.job.CommandBuilder
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestProcessJob {
   @Test
   def testProcessJobShouldFinishOnItsOwn {
     val commandBuilder = new CommandBuilder {
       override def buildCommand = "sleep 1"
-      override def buildEnvironment = Map[String, String]()
+      override def buildEnvironment = Map[String, String]().asJava
     }
     val coordinator = new MockJobModelManager()
     val job = new ProcessJob(commandBuilder, coordinator)
@@ -43,7 +43,7 @@ class TestProcessJob {
   def testProcessJobKillShouldWork {
     val commandBuilder = new CommandBuilder {
       override def buildCommand = "sleep 999999999"
-      override def buildEnvironment = Map[String, String]()
+      override def buildEnvironment = Map[String, String]().asJava
     }
     val coordinator = new MockJobModelManager()
     val job = new ProcessJob(commandBuilder, coordinator)
index 3cfd439..88370dc 100644
@@ -26,7 +26,6 @@ import org.junit.Test
 import org.apache.samza.task.TaskContext
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.config.MapConfig
-import org.apache.samza.Partition
 import org.apache.samza.metrics.JvmMetrics
 
 import java.lang.management.ManagementFactory
@@ -38,7 +37,7 @@ import javax.management.remote.JMXConnectorServerFactory
 import javax.management.remote.JMXConnectorServer
 import javax.management.remote.JMXConnectorFactory
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object TestJmxReporter {
   val port = 4500
@@ -68,7 +67,7 @@ class TestJmxReporter {
   def testJmxReporter {
     val registry = new MetricsRegistryMap
     val jvm = new JvmMetrics(registry)
-    val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]()))
+    val reporter = new JmxReporterFactory().getMetricsReporter("", "", new MapConfig(Map[String, String]().asJava))
 
     reporter.register("test", registry)
     reporter.start
index 3d0a603..029bbef 100644
@@ -28,7 +28,7 @@ import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestCheckpointSerde {
   @Test
@@ -37,7 +37,7 @@ class TestCheckpointSerde {
     var offsets = Map[SystemStreamPartition, String]()
     val systemStreamPartition = new SystemStreamPartition("test-system", "test-stream", new Partition(777))
     offsets += systemStreamPartition -> "1"
-    val deserializedOffsets = serde.fromBytes(serde.toBytes(new Checkpoint(offsets)))
+    val deserializedOffsets = serde.fromBytes(serde.toBytes(new Checkpoint(offsets.asJava)))
     assertEquals("1", deserializedOffsets.getOffsets.get(systemStreamPartition))
     assertEquals(1, deserializedOffsets.getOffsets.size)
   }
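
One caveat worth keeping in mind with this style: asJava wraps rather than copies. With the immutable Map used above that is safe, but wrapping a mutable Scala map hands the callee a live view:

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    val m = mutable.Map("ssp" -> "1")
    val view = m.asJava
    m("ssp") = "2"
    view.get("ssp")  // "2": the wrapper reflects later mutation
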
index 50b2d0f..2e14511 100644
@@ -23,18 +23,18 @@ package org.apache.samza.serializers
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 
 class TestJsonSerde {
   @Test
   def testJsonSerdeShouldWork {
     val serde = new JsonSerde[java.util.HashMap[String, Object]]
-    val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)))
+    val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)).asJava)
     val bytes = serde.toBytes(obj)
     assertEquals(obj, serde.fromBytes(bytes))
     val serdeHashMapEntry = new JsonSerde[java.util.Map.Entry[String, Object]]
-    obj.entrySet().foreach(entry => {
+    obj.entrySet().asScala.foreach(entry => {
       try {
         val entryBytes = serdeHashMapEntry.toBytes(entry)
       } catch {
index c82e6b1..2495baf 100644
@@ -38,9 +38,9 @@ import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
-import scala.collection.JavaConversions
+import scala.collection.JavaConverters._
 
 class TestTaskStorageManager extends MockitoSugar {
 
@@ -356,9 +356,9 @@ class TestTaskStorageManager extends MockitoSugar {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
 
     val mockSystemAdmin = mock[SystemAdmin]
-    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")))))
-    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
-    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream", Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")).asJava))
+    val myMap = mockSspMetadata.asJava
+    when(mockSystemAdmin.getSystemStreamMetadata(any(Set("").asJava.getClass))).thenReturn(myMap)
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -389,9 +389,9 @@ class TestTaskStorageManager extends MockitoSugar {
         TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName) + File.separator + "OFFSET")
 
     val mockSystemAdmin = mock[SystemAdmin]
-    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")))))
-    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
-    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream", Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")).asJava))
+    val myMap = mockSspMetadata.asJava
+    when(mockSystemAdmin.getSystemStreamMetadata(any(Set("").asJava.getClass))).thenReturn(myMap)
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -453,9 +453,9 @@ class TestTaskStorageManager extends MockitoSugar {
     Util.writeDataToFile(offsetFilePath, "100")
 
     val mockSystemAdmin = mock[SystemAdmin]
-    var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")))))
-    var myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
-    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+    var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream", Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")).asJava))
+    var myMap = mockSspMetadata.asJava
+    when(mockSystemAdmin.getSystemStreamMetadata(any(Set("").asJava.getClass))).thenReturn(myMap)
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
@@ -472,9 +472,9 @@ class TestTaskStorageManager extends MockitoSugar {
     assertEquals("Found incorrect value in offset file!", "139", Util.readDataFromFile(offsetFilePath))
 
     // Flush again
-    mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")))))
-    myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
-    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+    mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream", Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")).asJava))
+    myMap = mockSspMetadata.asJava
+    when(mockSystemAdmin.getSystemStreamMetadata(any(Set("").asJava.getClass))).thenReturn(myMap)
 
     //Invoke test method
     taskStorageManager.flush()
@@ -491,9 +491,9 @@ class TestTaskStorageManager extends MockitoSugar {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
 
     val mockSystemAdmin = mock[SystemAdmin]
-    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", null, null)))))
-    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
-    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream", Map(partition -> new SystemStreamPartitionMetadata("20", null, null)).asJava))
+    val myMap = mockSspMetadata.asJava
+    when(mockSystemAdmin.getSystemStreamMetadata(any(Set("").asJava.getClass))).thenReturn(myMap)
 
     //Build TaskStorageManager
     val taskStorageManager = new TaskStorageManagerBuilder()
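
The TestTaskStorageManager hunks above all follow one pattern: the verbose explicit calls to JavaConversions.mapAsJavaMap and JavaConversions.setAsJavaSet collapse into .asJava decorators. A hedged before/after sketch (types simplified; the real code converts SystemStreamMetadata maps):

    import scala.collection.JavaConverters._

    // before: JavaConversions.mapAsJavaMap[String, Int](Map("a" -> 1))
    val jMap: java.util.Map[String, Int] = Map("a" -> 1).asJava
    // before: JavaConversions.setAsJavaSet(Set(""))
    val jSet: java.util.Set[String] = Set("").asJava
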
index 1f81589..a6d82e1 100644 (file)
@@ -26,7 +26,6 @@ import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
@@ -54,7 +53,7 @@ class TestRangeSystemStreamPartitionMatcher {
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
       JobConfig.SSP_MATCHER_CONFIG_RANGES -> range,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
   }
 
   @Test
@@ -74,7 +73,7 @@ class TestRangeSystemStreamPartitionMatcher {
   def testFilterWithInvalidMatcherConfigRange() {
     val config = getConfig("--")
 
-    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config)
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
     assertEquals(0, filteredSet.size)
   }
 
@@ -82,7 +81,7 @@ class TestRangeSystemStreamPartitionMatcher {
   def testFilterWithMatcherConfigRangeWithNomatches() {
     val config = getConfig("4-5")
 
-    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config)
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
     assertEquals(0, filteredSet.size)
   }
 
@@ -90,7 +89,7 @@ class TestRangeSystemStreamPartitionMatcher {
   def testFilterWithEmptyMatcherConfigRange() {
     val config = getConfig("")
 
-    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet, config)
+    val filteredSet = new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
     assertEquals(0, filteredSet.size)
   }
 
@@ -101,9 +100,9 @@ class TestRangeSystemStreamPartitionMatcher {
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_RANGE,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
 
-    new RangeSystemStreamPartitionMatcher().filter(sspSet, config)
+    new RangeSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
   }
 
 }
index aa56f0b..b7f2119 100644 (file)
@@ -28,7 +28,6 @@ import org.junit.Test
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
-import scala.collection.JavaConversions._
 
 class TestRegexSystemStreamPartitionMatcher {
   val sspSet = mutable.Set(new SystemStreamPartition("test", "stream1", new Partition(0)))
@@ -54,14 +53,14 @@ class TestRegexSystemStreamPartitionMatcher {
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CLASS_REGEX,
       JobConfig.SSP_MATCHER_CONFIG_REGEX -> regex,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
   }
 
   @Test
   def testFilterWithMatcherConfigRegexWithNomatches() {
     val config = getConfig("--")
 
-    val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet, config)
+    val filteredSet = new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
     assertEquals(0, filteredSet.size)
   }
 
@@ -72,8 +71,8 @@ class TestRegexSystemStreamPartitionMatcher {
       TaskConfig.INPUT_STREAMS -> "test.stream1",
       JobConfig.STREAM_JOB_FACTORY_CLASS -> classOf[ThreadJobFactory].getCanonicalName,
       JobConfig.SSP_MATCHER_CLASS -> JobConfig.SSP_MATCHER_CONFIG_REGEX,
-      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
+      (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName).asJava)
 
-    new RegexSystemStreamPartitionMatcher().filter(sspSet, config)
+    new RegexSystemStreamPartitionMatcher().filter(sspSet.asJava, config)
   }
 }
\ No newline at end of file
index 4716d97..f55e4bf 100644 (file)
@@ -22,64 +22,64 @@ package org.apache.samza.system
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.util.Clock
-import org.junit.{Before, Test}
+import org.junit.Test
 import org.mockito.Mockito._
 import org.scalatest.{Matchers => ScalaTestMatchers}
 import org.scalatest.junit.AssertionsForJUnit
-import org.scalatest.mock.MockitoSugar
-import scala.collection.JavaConversions._
+import org.scalatest.mockito.MockitoSugar
+import scala.collection.JavaConverters._
 
 class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with ScalaTestMatchers {
   def makeMetadata(streamNames: Set[String] = Set("stream"), numPartitions: Int = 4) = {
     val partitions = (0 until numPartitions).map(partition => {
       new Partition(partition) -> new SystemStreamPartitionMetadata("oldest", "newest", "upcoming")
     }).toMap
-    streamNames.map(name => name -> new SystemStreamMetadata(name, partitions)).toMap
+    streamNames.map(name => name -> new SystemStreamMetadata(name, partitions.asJava)).toMap
   }
 
   @Test
   def testFetchUncachedMetadataFromSystemAdmin {
     val systemAdmins = Map("foo" -> mock[SystemAdmin])
-    when(systemAdmins("foo").getSystemStreamMetadata(Set("bar"))).thenReturn(makeMetadata(Set("bar")))
+    when(systemAdmins("foo").getSystemStreamMetadata(Set("bar").asJava)).thenReturn(makeMetadata(Set("bar")).asJava)
     val streams = Set(new SystemStream("foo", "bar"))
     val cache = new StreamMetadataCache(systemAdmins)
 
     val result = cache.getStreamMetadata(streams)
     streams shouldEqual result.keySet
     result(new SystemStream("foo", "bar")).getSystemStreamPartitionMetadata.size should equal(4)
-    verify(systemAdmins("foo"), times(1)).getSystemStreamMetadata(Set("bar"))
+    verify(systemAdmins("foo"), times(1)).getSystemStreamMetadata(Set("bar").asJava)
   }
 
   @Test
   def testCacheExpiry {
     val clock = mock[Clock]
     val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream"))).thenReturn(makeMetadata())
+    when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava)).thenReturn(makeMetadata().asJava)
     val streams = Set(new SystemStream("system", "stream"))
     val cache = new StreamMetadataCache(systemAdmins = systemAdmins, clock = clock)
 
     when(clock.currentTimeMillis).thenReturn(0)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream"))
+    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream").asJava)
 
     when(clock.currentTimeMillis).thenReturn(cache.cacheTTLms / 2)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream"))
+    verify(systemAdmins("system"), times(1)).getSystemStreamMetadata(Set("stream").asJava)
 
     when(clock.currentTimeMillis).thenReturn(2 * cache.cacheTTLms)
     cache.getStreamMetadata(streams)
     cache.getStreamMetadata(streams)
     cache.getStreamMetadata(streams)
-    verify(systemAdmins("system"), times(2)).getSystemStreamMetadata(Set("stream"))
+    verify(systemAdmins("system"), times(2)).getSystemStreamMetadata(Set("stream").asJava)
   }
 
   @Test
   def testGroupingRequestsBySystem {
     val systemAdmins = Map("sys1" -> mock[SystemAdmin], "sys2" -> mock[SystemAdmin])
-    when(systemAdmins("sys1").getSystemStreamMetadata(Set("stream1a", "stream1b")))
-      .thenReturn(makeMetadata(Set("stream1a", "stream1b"), numPartitions = 3))
-    when(systemAdmins("sys2").getSystemStreamMetadata(Set("stream2a", "stream2b")))
-      .thenReturn(makeMetadata(Set("stream2a", "stream2b"), numPartitions = 5))
+    when(systemAdmins("sys1").getSystemStreamMetadata(Set("stream1a", "stream1b").asJava))
+      .thenReturn(makeMetadata(Set("stream1a", "stream1b"), numPartitions = 3).asJava)
+    when(systemAdmins("sys2").getSystemStreamMetadata(Set("stream2a", "stream2b").asJava))
+      .thenReturn(makeMetadata(Set("stream2a", "stream2b"), numPartitions = 5).asJava)
     val streams = Set(
       new SystemStream("sys1", "stream1a"), new SystemStream("sys1", "stream1b"),
       new SystemStream("sys2", "stream2a"), new SystemStream("sys2", "stream2b")
@@ -90,15 +90,15 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
       val expectedPartitions = if (stream.getSystem == "sys1") 3 else 5
       result(stream).getSystemStreamPartitionMetadata.size shouldEqual expectedPartitions
     })
-    verify(systemAdmins("sys1"), times(1)).getSystemStreamMetadata(Set("stream1a", "stream1b"))
-    verify(systemAdmins("sys2"), times(1)).getSystemStreamMetadata(Set("stream2a", "stream2b"))
+    verify(systemAdmins("sys1"), times(1)).getSystemStreamMetadata(Set("stream1a", "stream1b").asJava)
+    verify(systemAdmins("sys2"), times(1)).getSystemStreamMetadata(Set("stream2a", "stream2b").asJava)
   }
 
   @Test
   def testSystemOmitsStreamFromResult {
     val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream1", "stream2")))
-      .thenReturn(makeMetadata(Set("stream1"))) // metadata doesn't include stream2
+    when(systemAdmins("system").getSystemStreamMetadata(Set("stream1", "stream2").asJava))
+      .thenReturn(makeMetadata(Set("stream1")).asJava) // metadata doesn't include stream2
     val streams = Set(new SystemStream("system", "stream1"), new SystemStream("system", "stream2"))
     val exception = intercept[SamzaException] {
       new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
@@ -109,8 +109,8 @@ class TestStreamMetadataCache extends AssertionsForJUnit with MockitoSugar with
   @Test
   def testSystemReturnsNullMetadata {
     val systemAdmins = Map("system" -> mock[SystemAdmin])
-    when(systemAdmins("system").getSystemStreamMetadata(Set("stream")))
-      .thenReturn(Map("stream" -> null))
+    when(systemAdmins("system").getSystemStreamMetadata(Set("stream").asJava))
+      .thenReturn(Map[String, SystemStreamMetadata]("stream" -> null).asJava)
     val streams = Set(new SystemStream("system", "stream"))
     val exception = intercept[SamzaException] {
       new StreamMetadataCache(systemAdmins).getStreamMetadata(streams)
index b5d58e3..b970fc9 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.system.chooser.DefaultChooser
 import org.apache.samza.system.chooser.MockMessageChooser
 import org.apache.samza.util.BlockingEnvelopeMap
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestSystemConsumers {
   def testPollIntervalMs {
@@ -159,7 +159,7 @@ class TestSystemConsumers {
       def start = consumerStarted += 1
       def stop = consumerStopped += 1
       def register(systemStreamPartition: SystemStreamPartition, offset: String) = consumerRegistered += systemStreamPartition -> offset
-      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
+      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava
     })
 
     val consumers = new SystemConsumers(new MessageChooser {
@@ -199,7 +199,7 @@ class TestSystemConsumers {
       def start {}
       def stop {}
       def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
-      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
+      def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = Map[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]().asJava
     })
     val consumers = new SystemConsumers(new MessageChooser {
       def update(envelope: IncomingMessageEnvelope) = Unit
@@ -300,7 +300,7 @@ class TestSystemConsumers {
     def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = {
       polls += 1
       lastPoll = systemStreamPartitions
-      pollResponse
+      pollResponse.asJava
     }
     def setResponseSizes(numEnvelopes: Int) {
       val q = new java.util.ArrayList[IncomingMessageEnvelope]()
@@ -330,6 +330,6 @@ class TestSystemConsumers {
 
 object TestSystemConsumers {
   def getSystemConsumers(consumers: java.util.Map[String, SystemConsumer]) : SystemConsumers = {
-    new SystemConsumers(new DefaultChooser, consumers.toMap)
+    new SystemConsumers(new DefaultChooser, consumers.asScala.toMap)
   }
 }
index 2a095ce..3c07545 100644 (file)
@@ -35,7 +35,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 @RunWith(value = classOf[Parameterized])
 class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, SystemStreamMetadata]) => MessageChooser) {
@@ -50,7 +50,7 @@ class TestBootstrappingChooser(getChooser: (MessageChooser, Map[SystemStream, Sy
   private def getMetadata(envelope: IncomingMessageEnvelope, newestOffset: String, futureOffset: Option[String] = None) = {
     new SystemStreamMetadata(
       envelope.getSystemStreamPartition.getStream,
-      Map(envelope.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, newestOffset, futureOffset.getOrElse(null))))
+      Map(envelope.getSystemStreamPartition.getPartition -> new SystemStreamPartitionMetadata(null, newestOffset, futureOffset.getOrElse(null))).asJava)
   }
 
   @Test
index 5a04469..b873762 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestDefaultChooser {
   val envelope1 = new IncomingMessageEnvelope(new SystemStreamPartition("kafka", "stream", new Partition(0)), null, null, 1);
@@ -53,9 +53,9 @@ class TestDefaultChooser {
     val env8Metadata = new SystemStreamPartitionMetadata("0", "456", "654")
     val streamMetadata = new SystemStreamMetadata("stream", Map(
       envelope1.getSystemStreamPartition().getPartition() -> env1Metadata,
-      envelope5.getSystemStreamPartition().getPartition() -> env5Metadata))
+      envelope5.getSystemStreamPartition().getPartition() -> env5Metadata).asJava)
     val stream3Metadata = new SystemStreamMetadata("stream3", Map(
-      envelope8.getSystemStreamPartition().getPartition() -> env8Metadata))
+      envelope8.getSystemStreamPartition().getPartition() -> env8Metadata).asJava)
     val chooser = new DefaultChooser(
       mock0,
       Some(2),
@@ -144,7 +144,7 @@ class TestDefaultChooser {
     val configMap = Map(
       "task.inputs" -> "kafka.foo,kafka.bar-baz",
       "systems.kafka.streams.bar-baz.samza.bootstrap" -> "true")
-    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val config = new DefaultChooserConfig(new MapConfig(configMap.asJava))
     val bootstrapStreams = config.getBootstrapStreams
     assertEquals(1, bootstrapStreams.size)
     assertTrue(bootstrapStreams.contains(new SystemStream("kafka", "bar-baz")))
@@ -155,10 +155,10 @@ class TestDefaultChooser {
     val configMap = Map(
       "task.inputs" -> "kafka.foo,kafka.bar-baz",
       "systems.kafka.streams.bar-baz.samza.priority" -> "3")
-    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val config = new DefaultChooserConfig(new MapConfig(configMap.asJava))
     val priorityStreams = config.getPriorityStreams
     assertEquals(1, priorityStreams.size)
-    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+    assertEquals(3, priorityStreams.get(new SystemStream("kafka", "bar-baz")))
   }
 
   @Test
@@ -167,10 +167,10 @@ class TestDefaultChooser {
       "task.broadcast.inputs" -> "kafka.foo#[1-2],kafka.bar-baz#5,kafka.fizz#0",
       "systems.kafka.streams.bar-baz.samza.priority" -> "3",
       "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
-    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val config = new DefaultChooserConfig(new MapConfig(configMap.asJava))
     val priorityStreams = config.getPriorityStreams
     assertEquals(1, priorityStreams.size)
-    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+    assertEquals(3, priorityStreams.get(new SystemStream("kafka", "bar-baz")))
 
     val bootstrapStreams = config.getBootstrapStreams
     assertEquals(1, bootstrapStreams.size())
@@ -186,11 +186,11 @@ class TestDefaultChooser {
       "systems.kafka.streams.bar-baz.samza.priority" -> "3",
       "systems.kafka.streams.bootstrapTopic.samza.bootstrap" -> "true",
       "systems.kafka.streams.fizz.samza.bootstrap" -> "true")
-    val config = new DefaultChooserConfig(new MapConfig(configMap))
+    val config = new DefaultChooserConfig(new MapConfig(configMap.asJava))
     val priorityStreams = config.getPriorityStreams
     assertEquals(2, priorityStreams.size)
-    assertEquals(2, priorityStreams(new SystemStream("kafka", "priorityTopic")))
-    assertEquals(3, priorityStreams(new SystemStream("kafka", "bar-baz")))
+    assertEquals(2, priorityStreams.get(new SystemStream("kafka", "priorityTopic")))
+    assertEquals(3, priorityStreams.get(new SystemStream("kafka", "bar-baz")))
 
     val bootstrapStreams = config.getBootstrapStreams
     assertEquals(2, bootstrapStreams.size())
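
The assertion changes above (priorityStreams(...) becoming priorityStreams.get(...)) follow from the import swap: getPriorityStreams returns a java.util.Map, and without JavaConversions' implicit wrapper it no longer supports Scala's apply syntax. A small sketch under that assumption:

    val priorities = new java.util.HashMap[String, Integer]()
    priorities.put("kafka.bar-baz", 3)
    // priorities("kafka.bar-baz")          // compiled only via the old implicit conversion
    val p = priorities.get("kafka.bar-baz") // plain java.util.Map accessor; null when absent
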
index 525d126..8cc593a 100644 (file)
@@ -21,9 +21,7 @@ package org.apache.samza.system.filereader
 
 import java.io.PrintWriter
 import java.io.File
-import java.io.RandomAccessFile
 
-import org.apache.samza.SamzaException
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
@@ -34,7 +32,7 @@ import org.junit.After
 import org.scalatest.junit.AssertionsForJUnit
 
 import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestFileReaderSystemAdmin extends AssertionsForJUnit {
 
@@ -70,7 +68,7 @@ class TestFileReaderSystemAdmin extends AssertionsForJUnit {
     val ssp5 = new SystemStreamPartition("file-reader", files(4), new Partition(0))
 
     val offsets: java.util.Map[SystemStreamPartition, String] =
-      HashMap(ssp3 -> "0", ssp4 -> "12", ssp5 -> "25")
+      HashMap(ssp3 -> "0", ssp4 -> "12", ssp5 -> "25").asJava
     val afterOffsets = fileReaderSystemAdmin.getOffsetsAfter(offsets)
     assertEquals("12", afterOffsets.get(ssp3))
     assertEquals("25", afterOffsets.get(ssp4))
@@ -80,14 +78,14 @@ class TestFileReaderSystemAdmin extends AssertionsForJUnit {
   @Test
   def testGetSystemStreamMetadata {
     val fileReaderSystemAdmin = new FileReaderSystemAdmin
-    val allMetadata = fileReaderSystemAdmin.getSystemStreamMetadata(setAsJavaSet(files.toSet))
+    val allMetadata = fileReaderSystemAdmin.getSystemStreamMetadata(files.toSet.asJava)
     val expectedEmpty = new SystemStreamPartitionMetadata(null, null, "0")
     val expectedNoEntry = new SystemStreamPartitionMetadata("0", "0", "0")
     val expectedOneEntry = new SystemStreamPartitionMetadata("0", "0", "12")
     val expectedTwoEntry = new SystemStreamPartitionMetadata("0", "12", "25")
     val expectedMoreEntry = new SystemStreamPartitionMetadata("0", "37", "50")
 
-    allMetadata.foreach { entry =>
+    allMetadata.asScala.foreach { entry =>
       {
         val result = (entry._2).getSystemStreamPartitionMetadata().get(new Partition(0))
         entry._1 match {
index 5707bb4..22e4853 100644 (file)
@@ -30,8 +30,7 @@ import org.junit.Assert._
 import org.junit.BeforeClass
 import org.junit.Test
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
 
 object TestFileReaderSystemConsumer {
   val consumer = new FileReaderSystemConsumer("file-reader", null)
@@ -90,10 +89,10 @@ class TestFileReaderSystemConsumer {
     consumer.start
     Thread.sleep(500)
 
-    val ssp1Result = consumer.poll(Set(ssp1), 1000)
-    val ssp2Result = consumer.poll(Set(ssp2), 1000)
-    val ssp3Result = consumer.poll(Set(ssp3), 1000)
-    val ssp4Result = consumer.poll(Set(ssp4), 1000)
+    val ssp1Result = consumer.poll(Set(ssp1).asJava, 1000)
+    val ssp2Result = consumer.poll(Set(ssp2).asJava, 1000)
+    val ssp3Result = consumer.poll(Set(ssp3).asJava, 1000)
+    val ssp4Result = consumer.poll(Set(ssp4).asJava, 1000)
 
     assertEquals(0, ssp1Result.size)
     assertEquals(0, ssp2Result.size)
@@ -114,7 +113,7 @@ class TestFileReaderSystemConsumer {
     Thread.sleep(1000)
 
     // ssp5 should read the new lines
-    val ssp5Result = consumer.poll(Set(ssp5), 1000)
+    val ssp5Result = consumer.poll(Set(ssp5).asJava, 1000)
     assertEquals(1, ssp5Result.size)
     assertEquals(3, ssp5Result.get(ssp5).size)
     envelope = ssp5Result.get(ssp5).remove(0)
index c3295f3..82b2c9a 100644 (file)
@@ -24,8 +24,6 @@ import org.junit.Assert._
 import org.junit.Test
 import org.scalatest.junit.AssertionsForJUnit
 
-import scala.collection.JavaConversions._
-
 class TestFileReaderSystemFactory extends AssertionsForJUnit {
 
   @Test
index 53ff372..12c93ae 100644 (file)
@@ -23,14 +23,7 @@ package org.apache.samza.system.hdfs
 import java.text.SimpleDateFormat
 import java.util.UUID
 
-import org.apache.samza.SamzaException
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config.{YarnConfig, Config, ScalaMapConfig}
-import org.apache.samza.util.{Logging, Util}
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
 
 object HdfsConfig {
 
index 9ed64c3..b77cbb9 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.samza.container.TaskName
 import org.codehaus.jackson.`type`.TypeReference
 import org.codehaus.jackson.map.ObjectMapper
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * Kafka Checkpoint Log-specific key used to identify what type of entry is
@@ -43,7 +43,7 @@ class KafkaCheckpointLogKey private (val map: Map[String, String]) {
    */
   def toBytes(): Array[Byte] = {
     val jMap = new util.HashMap[String, String](map.size)
-    jMap.putAll(map)
+    jMap.putAll(map.asJava)
 
     JSON_MAPPER.writeValueAsBytes(jMap)
   }
@@ -156,7 +156,7 @@ object KafkaCheckpointLogKey {
         }
       }
 
-      new KafkaCheckpointLogKey(jmap.toMap)
+      new KafkaCheckpointLogKey(jmap.asScala.toMap)
     } catch {
       case e: Exception =>
         throw new SamzaException("Exception while deserializing checkpoint key", e)
index 7e9f18a..ae6330f 100644 (file)
@@ -25,7 +25,7 @@ import java.util.regex.Pattern
 import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
 
@@ -102,6 +102,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getFetchMessageMaxBytesTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
+      .asScala
       .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
       .map {
         case (fetchMessageMaxBytes, fetchSizeValue) =>
@@ -116,6 +117,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
+      .asScala
       .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
       .map {
         case (topicAutoOffsetReset, resetValue) =>
@@ -162,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
-    filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
+    filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
 
@@ -177,7 +179,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     consumerProps.putAll(subConf)
     consumerProps.put("group.id", groupId)
     consumerProps.put("client.id", clientId)
-    consumerProps.putAll(injectedProps)
+    consumerProps.putAll(injectedProps.asJava)
     new ConsumerConfig(consumerProps)
   }
 
@@ -189,7 +191,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val producerProps = new util.HashMap[String, Object]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
-    producerProps.putAll(injectedProps)
+    producerProps.putAll(injectedProps.asJava)
     new KafkaProducerConfig(systemName, clientId, producerProps)
   }
 }
index 4e3b247..6dc2f82 100644 (file)
@@ -24,12 +24,11 @@ import kafka.utils.ZkUtils
 import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
 import org.apache.samza.SamzaException
 import org.apache.samza.util.Util
-import collection.JavaConversions._
+import collection.JavaConverters._
 import org.apache.samza.util.Logging
 import scala.collection._
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.system.SystemStream
-import scala.util.Sorting
 
 /**
  * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression.
@@ -80,6 +79,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
       // For each topic that matched, generate all the specified configs
       config
         .getRegexResolvedInheritedConfig(rewriterName)
+        .asScala
         .foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2))
     }
     // Build new inputs
@@ -92,7 +92,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
       .sortWith(_ < _)
       .mkString(",")
 
-    new MapConfig((keysAndValsToAdd ++ config) += inputStreams)
+    new MapConfig(((keysAndValsToAdd ++ config.asScala) += inputStreams).asJava)
   }
 
   def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
index 539a439..5338886 100644 (file)
@@ -23,7 +23,6 @@ package org.apache.samza.system.kafka
 
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.channels.ClosedByInterruptException
-import java.util.Map.Entry
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 
 import kafka.api._
@@ -35,9 +34,8 @@ import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.concurrent
-import scala.collection.mutable
 import org.apache.samza.util.KafkaUtil
 
 /**
@@ -71,7 +69,7 @@ class BrokerProxy(
   val sleepMSWhileNoTopicPartitions = 100
 
   /** What's the next offset for a particular partition? **/
-  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
 
   /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
   // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
@@ -95,7 +93,7 @@ class BrokerProxy(
   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
 
-    if (nextOffsets.containsKey(tp)) {
+    if (nextOffsets.contains(tp)) {
       toss("Already consuming TopicPartition %s" format tp)
     }
 
@@ -113,13 +111,13 @@ class BrokerProxy(
 
     nextOffsets += tp -> offset
 
-    metrics.topicPartitions(host, port).set(nextOffsets.size)
+    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
   }
 
   def removeTopicPartition(tp: TopicAndPartition) = {
-    if (nextOffsets.containsKey(tp)) {
+    if (nextOffsets.contains(tp)) {
       val offset = nextOffsets.remove(tp)
-      metrics.topicPartitions(host, port).set(nextOffsets.size)
+      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
       debug("Removed %s" format tp)
       offset
     } else {
@@ -136,7 +134,7 @@ class BrokerProxy(
         (new ExponentialSleepStrategy).run(
           loop => {
             if (reconnect) {
-              metrics.reconnects(host, port).inc
+              metrics.reconnects.get((host, port)).inc
               simpleConsumer.close()
               simpleConsumer = createSimpleConsumer()
             }
@@ -178,23 +176,23 @@ class BrokerProxy(
     val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
 
     if (topicAndPartitionsToFetch.size > 0) {
-      metrics.brokerReads(host, port).inc
+      metrics.brokerReads.get((host, port)).inc
       val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
       firstCall = false
       firstCallBarrier.countDown()
 
       // Split response into errors and non errors, processing the errors first
-      val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
+      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError)
 
       handleErrors(errorResponses, response)
 
-      nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+      nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
     } else {
       refreshLatencyMetrics
 
       debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
 
-      metrics.brokerSkippedFetchRequests(host, port).inc
+      metrics.brokerSkippedFetchRequests.get((host, port)).inc
 
       Thread.sleep(sleepMSWhileNoTopicPartitions)
     }
@@ -221,7 +219,7 @@ class BrokerProxy(
     immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
 
-  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = {
     // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
     case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
 
@@ -229,10 +227,10 @@ class BrokerProxy(
 
     // Convert FetchResponse into easier-to-work-with Errors
     val errors = for (
-      error <- errorResponses;
-      errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
+      (topicAndPartition, responseData) <- errorResponses;
+      errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // FetchResponse exposes no per-partition error accessor, so look the code up ourselves
       exception <- Option(ErrorMapping.exceptionFor(errorCode))
-    ) yield new Error(error.getKey, errorCode, exception)
+    ) yield new Error(topicAndPartition, errorCode, exception)
 
     val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
     val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
@@ -274,10 +272,10 @@ class BrokerProxy(
       nextOffset = message.nextOffset
 
       val bytesSize = message.message.payloadSize + message.message.keySize
-      metrics.reads(tp).inc
-      metrics.bytesRead(tp).inc(bytesSize)
-      metrics.brokerBytesRead(host, port).inc(bytesSize)
-      metrics.offsets(tp).set(nextOffset)
+      metrics.reads.get(tp).inc
+      metrics.bytesRead.get(tp).inc(bytesSize)
+      metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
+      metrics.offsets.get(tp).set(nextOffset)
     }
 
     nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
@@ -285,8 +283,8 @@ class BrokerProxy(
     // Update high water mark
     val hw = data.hw
     if (hw >= 0) {
-      metrics.highWatermark(tp).set(hw)
-      metrics.lag(tp).set(hw - nextOffset)
+      metrics.highWatermark.get(tp).set(hw)
+      metrics.lag.get(tp).set(hw - nextOffset)
     } else {
       debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
     }
@@ -327,10 +325,10 @@ class BrokerProxy(
         if (latestOffset >= 0) {
           // only update the registered topicAndpartitions
           if(metrics.highWatermark.containsKey(topicAndPartition)) {
-            metrics.highWatermark(topicAndPartition).set(latestOffset)
+            metrics.highWatermark.get(topicAndPartition).set(latestOffset)
           }
           if(metrics.lag.containsKey(topicAndPartition)) {
-            metrics.lag(topicAndPartition).set(latestOffset - offset)
+            metrics.lag.get(topicAndPartition).set(latestOffset - offset)
           }
         }
       }
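
The metrics accesses above move from apply syntax (metrics.reads(tp), metrics.brokerReads(host, port)) to explicit .get calls because the metric registries are java.util maps, some keyed by (host, port) tuples, and they lose their implicit Scala wrappers once JavaConversions is gone. Illustrative sketch, types assumed rather than taken from the Samza metrics classes:

    import java.util.concurrent.ConcurrentHashMap

    val brokerReads = new ConcurrentHashMap[(String, Int), Long]()
    brokerReads.put(("broker1", 9092), 0L)
    // brokerReads("broker1", 9092)                // apply needed the implicit wrapper
    val reads = brokerReads.get(("broker1", 9092)) // explicit lookup with a tuple key
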
index 309b653..8c90c6c 100644 (file)
@@ -21,20 +21,17 @@ package org.apache.samza.system.kafka
 
 import java.util
 import java.util.{Properties, UUID}
-
 import kafka.admin.AdminUtils
 import kafka.api._
-import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.common.TopicAndPartition
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
 import org.apache.samza.{Partition, SamzaException}
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 
 object KafkaSystemAdmin extends Logging {
@@ -59,7 +56,7 @@ object KafkaSystemAdmin extends Logging {
               (systemStreamPartition.getPartition, partitionMetadata)
             })
             .toMap
-          val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata)
+          val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
           (streamName, streamMetadata)
       }
       .toMap
@@ -151,7 +148,7 @@ class KafkaSystemAdmin(
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
-          streams.toSet,
+          streams.asScala.toSet,
           systemName,
           getTopicMetadata,
           metadataTTL)
@@ -162,11 +159,11 @@ class KafkaSystemAdmin(
               pm =>
                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
             }.toMap[Partition, SystemStreamPartitionMetadata]
-            (topic -> new SystemStreamMetadata(topic, partitionsMap))
+            (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava))
           }
         }
         loop.done
-        JavaConversions.mapAsJavaMap(result)
+        result.asJava
       },
 
       (exception, loop) => {
@@ -188,11 +185,11 @@ class KafkaSystemAdmin(
     // This is safe to do with Kafka, even if a topic is key-deduped. If the
     // offset doesn't exist on a compacted topic, Kafka will return the first
     // message AFTER the offset that was specified in the fetch request.
-    offsets.mapValues(offset => (offset.toLong + 1).toString)
+    offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
   }
 
   override def getSystemStreamMetadata(streams: java.util.Set[String]) =
-    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava
 
   /**
    * Given a set of stream names (topics), fetch metadata from Kafka for each
@@ -207,7 +204,7 @@ class KafkaSystemAdmin(
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
-          streams.toSet,
+          streams.asScala.toSet,
           systemName,
           getTopicMetadata,
           metadataTTL)
@@ -241,12 +238,7 @@ class KafkaSystemAdmin(
                   debug("Stripping newest offsets for %s because the topic appears empty." format topicAndPartition)
                   newestOffsets -= topicAndPartition
                   debug("Setting oldest offset to 0 to consume from beginning")
-                  oldestOffsets.get(topicAndPartition) match {
-                    case Some(s) =>
-                      oldestOffsets.updated(topicAndPartition, "0")
-                    case None =>
-                      oldestOffsets.put(topicAndPartition, "0")
-                  }
+                  oldestOffsets += (topicAndPartition -> "0")
                 }
             }
           } finally {
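
The last hunk above is more than a conversion cleanup: on a mutable Map, updated returns a new map and leaves the receiver untouched, so the old Some(_) branch silently did nothing. The += form mutates in place and covers both branches of the removed match. A sketch of the difference:

    import scala.collection.mutable

    val oldestOffsets = mutable.Map("tp-0" -> "5")
    oldestOffsets.updated("tp-0", "0") // returns a copy; oldestOffsets still maps "tp-0" -> "5"
    oldestOffsets += ("tp-0" -> "0")   // in-place update; also inserts when the key is absent
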
index fa685ee..f25bb68 100644 (file)
@@ -26,7 +26,6 @@ import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
 import org.apache.kafka.common.utils.Utils
 import org.apache.samza.util.Clock
-import java.util.UUID
 import kafka.serializer.DefaultDecoder
 import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
@@ -37,7 +36,7 @@ import org.apache.samza.util.TopicMetadataStore
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.system.SystemAdmin
 
 object KafkaSystemConsumer {
@@ -133,7 +132,7 @@ private[kafka] class KafkaSystemConsumer(
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
-  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
+  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
   var perPartitionFetchThreshold = fetchThreshold
   var perPartitionFetchThresholdBytes = 0L
 
index 6efd2dc..b680ed4 100644 (file)
@@ -36,7 +36,7 @@ import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
 import org.apache.samza.util.TimerUtils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
@@ -85,7 +85,7 @@ class KafkaSystemProducer(systemName: String,
         }
         currentProducer.close
 
-        sources.foreach {p =>
+        sources.asScala.foreach {p =>
           if (p._2.exceptionInCallback.get() == null) {
             flush(p._1)
           }
index 0f0bc22..41d380b 100644 (file)
@@ -23,13 +23,13 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.kafka.common.PartitionInfo
 import org.apache.samza.config.Config
 import org.apache.samza.config.ConfigException
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException}
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.system.kafka.TopicMetadataCache
 
 object KafkaUtil extends Logging {
index 1f2f62f..a14812e 100644 (file)
@@ -39,7 +39,7 @@ import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection._
 
 class TestKafkaCheckpointManager extends KafkaServerTestHarness {
@@ -59,8 +59,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
   val partition = new Partition(0)
   val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123").asJava)
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345").asJava)
 
   var producerConfig: KafkaProducerConfig = null
 
@@ -82,7 +82,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     config.put("acks", "all")
     config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava)
     producerConfig = new KafkaProducerConfig("kafka", "i001", config)
 
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
@@ -234,7 +234,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
 
   // CheckpointManager with a specific checkpoint topic
   private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic)
@@ -254,7 +254,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
     serde = new InvalideSerde(exception),
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
 
   class InvalideSerde(exception: String) extends CheckpointSerde {
     override def fromBytes(bytes: Array[Byte]): Checkpoint = {
index d626f1c..555ab9f 100644 (file)
 
 package org.apache.samza.config
 
-import java.net.URI
-import java.io.File
 import java.util.Properties
-import kafka.consumer.ConsumerConfig
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.junit.Before
-import org.junit.BeforeClass
 
 class TestKafkaConfig {
   
@@ -52,7 +48,7 @@ class TestKafkaConfig {
     val factory = new PropertiesConfigFactory()
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
 
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
 
     val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
@@ -85,33 +81,33 @@ class TestKafkaConfig {
 
   @Test
   def testStreamLevelFetchSizeOverride() {
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // default fetch size
     assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
 
     props.setProperty("systems." + SYSTEM_NAME + ".consumer.fetch.message.max.bytes", "262144")
-    val mapConfig1 = new MapConfig(props.toMap[String, String])
+    val mapConfig1 = new MapConfig(props.asScala.asJava)
     val kafkaConfig1 = new KafkaConfig(mapConfig1)
     val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // shared fetch size
     assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
     
     props.setProperty("systems." + SYSTEM_NAME + ".streams.topic1.consumer.fetch.message.max.bytes", "65536")
-    val mapConfig2 = new MapConfig(props.toMap[String, String])
+    val mapConfig2 = new MapConfig(props.asScala.asJava)
     val kafkaConfig2 = new KafkaConfig(mapConfig2)
     val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME)
     // topic fetch size
     assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
 
     // default samza.fetch.threshold.bytes
-    val mapConfig3 = new MapConfig(props.toMap[String, String])
+    val mapConfig3 = new MapConfig(props.asScala.asJava)
     val kafkaConfig3 = new KafkaConfig(mapConfig3)
     assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty)
 
     props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536")
-    val mapConfig4 = new MapConfig(props.toMap[String, String])
+    val mapConfig4 = new MapConfig(props.asScala.asJava)
     val kafkaConfig4 = new KafkaConfig(mapConfig4)
     assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get)
   }
@@ -125,7 +121,7 @@ class TestKafkaConfig {
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
     assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
@@ -138,7 +134,7 @@ class TestKafkaConfig {
   
   @Test
   def testDefaultValuesForProducerProperties() {
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -155,7 +151,7 @@ class TestKafkaConfig {
     
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue);
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -169,7 +165,7 @@ class TestKafkaConfig {
     
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue);
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -181,7 +177,7 @@ class TestKafkaConfig {
   def testMaxInFlightRequestsPerConnectionWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza");
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
@@ -191,7 +187,7 @@ class TestKafkaConfig {
   def testRetriesWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza");
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
index d6899b8..3871560 100644 (file)
@@ -23,7 +23,7 @@ import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestKafkaSerdeConfig {
   val MAGIC_VAL = "1000"
@@ -31,7 +31,7 @@ class TestKafkaSerdeConfig {
   val paramsToTest = List(
     "serializers.registry.test.encoder", "serializers.registry.test.decoder")
 
-  val config = new MapConfig(mapAsJavaMap(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap))
+  val config = new MapConfig(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap.asJava)
 
   @Test
   def testKafkaConfigurationIsBackwardsCompatible {
index 89ced34..69d7da6 100644 (file)
@@ -19,9 +19,8 @@
 
 package org.apache.samza.config
 
-import collection.JavaConversions._
+import collection.JavaConverters._
 
-import org.apache.samza.SamzaException
 import org.junit.Assert._
 import org.junit.Test
 
@@ -45,7 +44,7 @@ class TestRegExTopicGenerator {
       getRegexConfigInherited + ".b.triumph" -> "spitfire",
       unrelated)
 
-    val config = new MapConfig(map)
+    val config = new MapConfig(map.asJava)
 
     // Don't actually talk to ZooKeeper
     val rewriter = new RegExTopicGenerator() {
@@ -83,7 +82,7 @@ class TestRegExTopicGenerator {
       override def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = List("yoyoyo")
     }
 
-    val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map))
+    val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava))
     assertEquals("test.yoyoyo", config.get(TaskConfig.INPUT_STREAMS))
   }
 }
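
TestRegExTopicGenerator follows suit: MapConfig receives map.asJava explicitly,
and the apparently unused org.apache.samza.SamzaException import is dropped while
the import block is being touched.
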
index cc7077c..f0bdafd 100644 (file)
@@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{Matchers, Mockito}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
@@ -52,8 +52,8 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages.get(0)._2.offset)
-    assertEquals(84, sink.receivedMessages.get(1)._2.offset)
+    assertEquals(42, sink.receivedMessages(0)._2.offset)
+    assertEquals(84, sink.receivedMessages(1)._2.offset)
   }
 
   @Test def brokerProxySkipsFetchForEmptyRequests() = {
@@ -64,8 +64,8 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
+    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
+    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
   }
 
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
@@ -91,7 +91,7 @@ class TestBrokerProxy extends Logging {
       def refreshDropped() {}
 
       def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-        receivedMessages.add((tp, msg, msg.offset.equals(highWatermark)))
+        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
       }
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
@@ -109,7 +109,7 @@ class TestBrokerProxy extends Logging {
 
     metrics.registerBrokerProxy(host, port)
     metrics.registerTopicAndPartition(tp)
-    metrics.topicPartitions(host, port).set(1)
+    metrics.topicPartitions.get((host, port)).set(1)
 
     val bp = new BrokerProxy(
       host,
@@ -168,7 +168,7 @@ class TestBrokerProxy extends Logging {
             val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet)
             val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
 
-            when(fetchResponse.data).thenReturn(map)
+            when(fetchResponse.data).thenReturn(map.toSeq)
             when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
             fetchResponse
           }
@@ -210,14 +210,14 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp, Option("0"))
     Thread.sleep(1000)
     // update when fetching messages
-    assertEquals(500, bp.metrics.highWatermark(tp).getValue)
-    assertEquals(415, bp.metrics.lag(tp).getValue)
+    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(415, bp.metrics.lag.get(tp).getValue)
 
     fetchTp1 = false
     Thread.sleep(1000)
     // update when not fetching messages
-    assertEquals(100, bp.metrics.highWatermark(tp).getValue)
-    assertEquals(15, bp.metrics.lag(tp).getValue)
+    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(15, bp.metrics.lag.get(tp).getValue)
 
     fetchTp1 = true
   }
@@ -264,7 +264,7 @@ class TestBrokerProxy extends Logging {
           val response = mock(classOf[FetchResponsePartitionData])
           when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
           val responseMap = Map(tp -> response)
-          when(mfr.data).thenReturn(responseMap)
+          when(mfr.data).thenReturn(responseMap.toSeq)
           invocationCount += 1
           mfr
         } else {
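
TestBrokerProxy needs more than an import swap. Its metrics registries are Java
maps keyed by (host, port) tuples, so the Scala-style apply lookups that
JavaConversions used to enable become direct java.util.Map#get calls;
receivedMessages is appended to with the Scala += operator rather than a
Java-style add; and the stubbed fetchResponse.data is returned as map.toSeq,
matching what appears to be a Seq-returning signature in the Kafka client built
for 2.12. A sketch of the tuple-key lookup with a plain ConcurrentHashMap (the
concrete metric types are not visible in this diff, so this is illustrative):

    import java.util.concurrent.ConcurrentHashMap

    val counters = new ConcurrentHashMap[(String, Int), Long]()
    counters.put(("broker-1", 9092), 42L) // illustrative host/port/value

    // With no implicit view there is no apply method on the Java map, so
    // the lookup is the Java get; the doubled parentheses build the
    // (host, port) tuple key.
    val n: Long = counters.get(("broker-1", 9092))
    println(n)
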
index be7db97..19f3903 100644 (file)
@@ -39,7 +39,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStra
 import org.junit.Assert._
 import org.junit._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
   * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
@@ -203,11 +203,11 @@ class TestKafkaSystemAdmin {
     validateTopic(TOPIC, 50)
 
     // Verify the empty topic behaves as expected.
-    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
-    assertNotNull(metadata(TOPIC))
+    assertNotNull(metadata.get(TOPIC))
     // Verify partition count.
-    var sspMetadata = metadata(TOPIC).getSystemStreamPartitionMetadata
+    var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
     assertEquals(50, sspMetadata.size)
     // Empty topics should have null for latest offset and 0 for earliest offset
     assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
@@ -218,11 +218,11 @@ class TestKafkaSystemAdmin {
     // Add a new message to one of the partitions, and verify that it works as
     // expected.
     producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val1".getBytes)).get()
-    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
-    val streamName = metadata.keySet.head
+    val streamName = metadata.keySet.asScala.head
     assertEquals(TOPIC, streamName)
-    sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
     // key1 gets hash-mod'd to partition 48.
     assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
     assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
@@ -234,10 +234,10 @@ class TestKafkaSystemAdmin {
 
     // Add a second message to one of the same partition.
     producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, "val2".getBytes)).get()
-    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
     assertEquals(TOPIC, streamName)
-    sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
     // key1 gets hash-mod'd to partition 48.
     assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
     assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset)
@@ -245,7 +245,7 @@ class TestKafkaSystemAdmin {
 
     // Validate that a fetch will return the message.
     val connector = getConsumerConnector
-    var stream = connector.createMessageStreams(Map(TOPIC -> 1)).get(TOPIC).get.get(0).iterator
+    var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
     var message = stream.next
     var text = new String(message.message, "UTF-8")
     connector.shutdown
@@ -261,10 +261,10 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testNonExistentTopic {
-    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
-    val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
+    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
+    val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0"))))
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0")).asJava))
   }
 
   @Test
@@ -273,9 +273,9 @@ class TestKafkaSystemAdmin {
     val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(1))
     val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
       ssp1 -> "1",
-      ssp2 -> "2"))
-    assertEquals("2", offsetsAfter(ssp1))
-    assertEquals("3", offsetsAfter(ssp2))
+      ssp2 -> "2").asJava)
+    assertEquals("2", offsetsAfter.get(ssp1))
+    assertEquals("3", offsetsAfter.get(ssp2))
   }
 
   @Test
@@ -310,7 +310,7 @@ class TestKafkaSystemAdmin {
     val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
     val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
     try {
-      systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff)
+      systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff)
       fail("expected CallLimitReached to be thrown")
     } catch {
       case e: ExponentialSleepStrategy.CallLimitReached => ()
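
In TestKafkaSystemAdmin, getSystemStreamMetadata takes a java.util.Set, so call
sites build Set(TOPIC).asJava, and the returned java.util.Map is read with
get(key), which yields null for a missing key rather than throwing as the Scala
apply did (hence the asScala.getOrElse guard in testNonExistentTopic). The
consumer fixture also leans on the fact that kafka's Scala ConsumerConnector
returns a Scala Map, so createMessageStreams(Map(TOPIC -> 1))(TOPIC).head
replaces the Java-flavored .get(TOPIC).get.get(0) chain.
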
index ce84b6d..c333935 100644 (file)
@@ -23,11 +23,10 @@ import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStream
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestKafkaSystemFactory {
   @Test
@@ -36,7 +35,7 @@ class TestKafkaSystemFactory {
     try {
       producerFactory.getProducer(
         "test",
-        new MapConfig(Map[String, String]()),
+        new MapConfig(Map[String, String]().asJava),
         new MetricsRegistryMap)
       fail("Expected to get a Samza exception.")
     } catch {
@@ -49,7 +48,7 @@ class TestKafkaSystemFactory {
   def testFailWhenSerdeIsInvalid {
     val producerFactory = new KafkaSystemFactory
     val config = new MapConfig(Map[String, String](
-      "streams.test.serde" -> "failme"))
+      "streams.test.serde" -> "failme").asJava)
     try {
       producerFactory.getProducer(
         "test",
@@ -70,7 +69,7 @@ class TestKafkaSystemFactory {
       "systems.test.producer.bootstrap.servers" -> "",
       "systems.test.samza.key.serde" -> "json",
       "systems.test.samza.msg.serde" -> "json",
-      "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory"))
+      "serializers.registry.json.class" -> "samza.serializers.JsonSerdeFactory").asJava)
     var producer = producerFactory.getProducer(
       "test",
       config,
@@ -91,7 +90,7 @@ class TestKafkaSystemFactory {
       StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
       StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
       StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
-    val config = new MapConfig(configMap)
+    val config = new MapConfig(configMap.asJava)
     assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
     assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
     assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))
index 5112ac6..c771788 100644 (file)
@@ -205,14 +205,15 @@ class RocksDbKeyValueStore(
   class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
     private var open = true
     private var firstValueAccessed = false
-    def close() = {
+
+    override def close() = {
       open = false
       iter.close()
     }
 
-    def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
+    override def remove() = throw new UnsupportedOperationException("RocksDB iterator doesn't support remove")
 
-    def hasNext() = iter.isValid
+    override def hasNext() = iter.isValid
 
     // The iterator is already pointing to the next element
     protected def peekKey() = {
@@ -231,7 +232,7 @@ class RocksDbKeyValueStore(
     // current element we are pointing to and advance the iterator to the next 
     // location (The new location may or may not be valid - this will surface
     // when the next next() call is made, the isValid will fail)
-    def next() = {
+    override def next() = {
       if (!hasNext()) {
         throw new NoSuchElementException
       }
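
The new override modifiers in RocksDbIterator are a 2.12 requirement rather than
a style choice: Scala 2.12 targets Java 8, where java.util.Iterator.remove() has
a default body, and Samza's KeyValueIterator extends that interface, so remove()
now overrides a concrete member. The modifier on hasNext and next looks added for
uniformity. A minimal illustration, runnable as a Scala script:

    val it = new java.util.Iterator[String] {
      private var done = false
      def hasNext(): Boolean = !done                 // abstract in Iterator: override optional
      def next(): String = { done = true; "only" }   // abstract in Iterator: override optional
      // remove() gained a default body in Java 8, so Scala 2.12 insists
      // on the override modifier here:
      override def remove(): Unit =
        throw new UnsupportedOperationException("this iterator doesn't support remove")
    }
    println(it.next())
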
index 4141cbf..2aac6aa 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.util.TimerUtils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * A key value store.
@@ -101,7 +101,7 @@ class KeyValueStorageEngine[K, V](
   def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
 
-    for (envelope <- envelopes) {
+    for (envelope <- envelopes.asScala) {
       val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
       val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
 
index 3de257c..9e67fc8 100644 (file)
@@ -21,7 +21,7 @@ package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Util.notNull
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
   val NullKeyErrorMessage = "Null is not a valid key."
@@ -39,7 +39,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
 
   def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
     notNull(keys, NullKeysErrorMessage)
-    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage))
     store.getAll(keys)
   }
 
@@ -50,7 +50,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
-    entries.foreach(entry => {
+    entries.asScala.foreach(entry => {
       notNull(entry.getKey, NullKeyErrorMessage)
       notNull(entry.getValue, NullValueErrorMessage)
     })
@@ -64,7 +64,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
 
   def deleteAll(keys: java.util.List[K]) = {
     notNull(keys, NullKeysErrorMessage)
-    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage))
     store.deleteAll(keys)
   }
 
index d77d476..c8939b7 100644 (file)
@@ -98,10 +98,10 @@ class SerializedKeyValueStore[K, V](
   }
 
   private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] {
-    def hasNext() = iter.hasNext()
-    def remove() = iter.remove()
-    def close() = iter.close()
-    def next(): Entry[K, V] = {
+    override def hasNext() = iter.hasNext()
+    override def remove() = iter.remove()
+    override def close() = iter.close()
+    override def next(): Entry[K, V] = {
       val nxt = iter.next()
       val key = fromBytesOrNull(nxt.getKey, keySerde)
       val value = fromBytesOrNull(nxt.getValue, msgSerde)
index 595dd0d..f57b275 100644 (file)
@@ -19,7 +19,7 @@
 
 package org.apache.samza.storage.kv
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.util
 
 /**
@@ -36,7 +36,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   }
 
   override def putAll(entries: java.util.List[Entry[String, String]]) {
-    for (entry <- entries) {
+    for (entry <- entries.asScala) {
       kvMap.put(entry.getKey, entry.getValue)
     }
   }
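
The for comprehensions here and in KeyValueStorageEngine.restore() further up can
no longer range over Java collections implicitly; the explicit asScala adapts the
java.util.List or java.util.Iterator in place without copying elements. The
general shape:

    import scala.collection.JavaConverters._

    val it: java.util.Iterator[Int] = java.util.Arrays.asList(1, 2, 3).iterator()
    // asScala wraps the Java iterator as a scala.collection.Iterator,
    // the type a for generator desugars against.
    for (n <- it.asScala) println(n)
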
index 1ce7d25..5d1b497 100644 (file)
@@ -37,7 +37,7 @@ import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.{CommandLine, Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.Random
 
 /**
@@ -258,7 +258,7 @@ class TestKeyValuePerformance extends Logging {
         store.flush()
 
         timer.reset().start()
-        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
         val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
 
         // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started
@@ -312,9 +312,9 @@ class TestKeyValuePerformance extends Logging {
         val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch)
 
         // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss
-        val totalSize = store.getAll(shuffledKeys).values.map(_.length).sum
+        val totalSize = store.getAll(shuffledKeys.asJava).values.asScala.map(_.length).sum
         timer.reset().start()
-        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
         val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
 
         // We want to measure ::get when called many times, so populate the cache because first call is a cache-miss
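
Worth noting for this performance test: asJava and asScala produce wrappers, not
copies, so an expression like store.getAll(shuffledKeys.asJava).values.asScala
that crosses the language boundary twice stays cheap and should not distort the
timings being collected.
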
index d7d23ec..babd15c 100644 (file)
@@ -36,7 +36,7 @@ import org.junit.Before
 import org.junit.Test
 import org.scalatest.Assertions.intercept
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -114,7 +114,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testGetAllWhenZeroMatch() {
     store.put(b("hello"), b("world"))
     val keys = List(b("foo"), b("bar"))
-    val actual = store.getAll(keys)
+    val actual = store.getAll(keys.asJava)
     keys.foreach(k => assertNull("Key: " + k, actual.get(k)))
   }
 
@@ -122,18 +122,18 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testGetAllWhenFullMatch() {
     val expected = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
     expected.foreach(e => store.put(e._1, e._2))
-    val actual = store.getAll(expected.keys.toList)
+    val actual = store.getAll(expected.keys.toList.asJava)
     assertEquals("Size", expected.size, actual.size)
     expected.foreach(e => assertArrayEquals("Value at: " + s(e._1), e._2, actual.get(e._1)))
   }
 
   @Test
   def testGetAllWhenPartialMatch() {
-    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2"))
-    val found = all.entrySet.head
-    val notFound = all.entrySet.last
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2")).asJava
+    val found = all.entrySet.asScala.head
+    val notFound = all.entrySet.asScala.last
     store.put(found.getKey, found.getValue)
-    val actual = store.getAll(List(notFound.getKey, found.getKey))
+    val actual = store.getAll(List(notFound.getKey, found.getKey).asJava)
     assertNull(actual.get(notFound.getKey))
     assertArrayEquals(found.getValue, actual.get(found.getKey))
   }
@@ -160,14 +160,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
 
       intercept[NullPointerException] { store.get(null) }
       intercept[NullPointerException] { store.getAll(null) }
-      intercept[NullPointerException] { store.getAll(List(a, null)) }
+      intercept[NullPointerException] { store.getAll(List(a, null).asJava) }
       intercept[NullPointerException] { store.delete(null) }
       intercept[NullPointerException] { store.deleteAll(null) }
-      intercept[NullPointerException] { store.deleteAll(List(a, null)) }
+      intercept[NullPointerException] { store.deleteAll(List(a, null).asJava) }
       intercept[NullPointerException] { store.put(null, a) }
       intercept[NullPointerException] { store.put(a, null) }
-      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
-      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null)).asJava) }
+      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a)).asJava) }
       intercept[NullPointerException] { store.range(a, null) }
       intercept[NullPointerException] { store.range(null, a) }
     }
@@ -182,7 +182,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     // from the cache's underlying store (rocksdb), but that == would fail.
     val numEntries = CacheSize - 1
     val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
-    store.putAll(entries)
+    store.putAll(entries.asJava)
     if (cache) {
       assertTrue("All values should be found and cached.", entries.forall(e => store.get(e.getKey) == e.getValue))
     } else {
@@ -225,7 +225,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testDeleteAllWhenZeroMatch() {
     val foo = b("foo")
     store.put(foo, foo)
-    store.deleteAll(List(b("bar")))
+    store.deleteAll(List(b("bar")).asJava)
     assertArrayEquals(foo, store.get(foo))
   }
 
@@ -233,23 +233,23 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testDeleteAllWhenFullMatch() {
     val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
     all.foreach(e => store.put(e._1, e._2))
-    assertEquals(all.size, store.getAll(all.keys.toList).size)
-    store.deleteAll(all.keys.toList)
+    assertEquals(all.size, store.getAll(all.keys.toList.asJava).size)
+    store.deleteAll(all.keys.toList.asJava)
     all.keys.foreach(key => assertNull("Value at: " + s(key), store.get(key)))
   }
 
   @Test
   def testDeleteAllWhenPartialMatch() {
-    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
-    val found = all.entrySet.head
-    val leftAlone = all.entrySet.last
-    all.foreach(e => store.put(e._1, e._2))
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")).asJava
+    val found = all.entrySet.asScala.head
+    val leftAlone = all.entrySet.asScala.last
+    all.asScala.foreach(e => store.put(e._1, e._2))
     assertArrayEquals(found.getValue, store.get(found.getKey))
-    store.deleteAll(List(b("not found"), found.getKey))
+    store.deleteAll(List(b("not found"), found.getKey).asJava)
     store.flush()
     val allIterator = store.all
     try {
-      assertEquals(1, allIterator.size)
+      assertEquals(1, allIterator.asScala.size)
       assertArrayEquals(leftAlone.getValue, store.get(leftAlone.getKey))
     } finally {
       allIterator.close()
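
One subtlety in testDeleteAllWhenPartialMatch: allIterator.asScala.size drains
the wrapped iterator to count it, just as the old implicit view did, which is why
the surviving value is re-checked through store.get rather than the iterator, and
why the iterator is still closed in the finally block.
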
index b803dfe..7a107f6 100644 (file)
@@ -45,7 +45,7 @@ import org.apache.samza.task._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap}
 
 /*
@@ -70,8 +70,8 @@ object StreamTaskTestUtil {
   def zkConnect: String = s"127.0.0.1:$zkPort"
 
   var producer: Producer[Array[Byte], Array[Byte]] = null
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123").asJava)
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345").asJava)
 
   var metadataStore: TopicMetadataStore = null
 
@@ -203,7 +203,7 @@ class StreamTaskTestUtil {
    */
   def startJob = {
     // Start task.
-    val job = new JobRunner(new MapConfig(jobConfig)).run()
+    val job = new JobRunner(new MapConfig(jobConfig.asJava)).run()
     assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
@@ -246,7 +246,7 @@ class StreamTaskTestUtil {
 
     val consumerConfig = new ConsumerConfig(props)
     val consumerConnector = Consumer.create(consumerConfig)
-    var stream = consumerConnector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0).iterator
+    val stream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head.iterator
     var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null
     var messages = ArrayBuffer[String]()
 
index 06a107b..c10e7fb 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object TestShutdownStatefulTask {
   val STORE_NAME = "loggedstore"
@@ -119,7 +119,7 @@ class ShutdownStateStoreTask extends TestTask {
       .getStore(TestShutdownStatefulTask.STORE_NAME)
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
-    iter.foreach( p => restored += (p.getKey -> p.getValue))
+    iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
     System.err.println("ShutdownStateStoreTask.init(): %s" format restored)
     iter.close
   }
index 2240903..e5b6756 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object TestStatefulTask {
     val STORE_NAME = "mystore"
@@ -171,6 +171,7 @@ class StateStoreTestTask extends TestTask {
     store = context.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     restored ++= iter
+      .asScala
       .map(_.getValue)
       .toSet
     System.err.println("StateStoreTestTask.init(): %s" format restored)
index e5aafbb..c7b1b6d 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.config.{Config, JobConfig, YarnConfig}
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import org.apache.hadoop.conf.Configuration
@@ -160,7 +160,7 @@ class ClientHelper(conf: Configuration) extends Logging {
     resource.setVirtualCores(cpu)
     info("set cpu core request to %s for %s" format (cpu, appId.get))
     appCtx.setResource(resource)
-    containerCtx.setCommands(cmds.toList)
+    containerCtx.setCommands(cmds.asJava)
     info("set command to %s for %s" format (cmds, appId.get))
 
     appCtx.setApplicationId(appId.get)
@@ -173,7 +173,7 @@ class ClientHelper(conf: Configuration) extends Logging {
     // include the resources from the universal resource configurations
     try {
       val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
-      localResources ++= resourceMapper.getResourceMap
+      localResources ++= resourceMapper.getResourceMap.asScala
     } catch {
       case e: LocalizerResourceException => {
         throw new SamzaException("Exception during resource mapping from config. ", e)
@@ -202,12 +202,12 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     // prepare all local resources for localizer
     info("localResources is: %s" format localResources)
-    containerCtx.setLocalResources(localResources)
+    containerCtx.setLocalResources(localResources.asJava)
     info("set local resources on application master for %s" format appId.get)
 
     env match {
       case Some(env) => {
-        containerCtx.setEnvironment(env)
+        containerCtx.setEnvironment(env.asJava)
         info("set environment variables to %s for %s" format (env, appId.get))
       }
       case None =>
@@ -232,8 +232,8 @@ class ClientHelper(conf: Configuration) extends Logging {
   def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
     yarnClient
       .getApplications
-      .filter(appRep => appId.equals(appRep.getApplicationId()))
-      .headOption
+      .asScala
+      .find(appRep => appId.equals(appRep.getApplicationId))
   }
 
   def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
@@ -241,9 +241,10 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     status match {
       case Some(status) => getAppsRsp
+        .asScala
         .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
         .toList
-      case None => getAppsRsp.toList
+      case None => getAppsRsp.asScala.toList
     }
   }
 
index f057594..2d8a3f1 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.config.Config
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.samza.util.Logging
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class YarnJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config) = {
@@ -42,7 +42,7 @@ class YarnJobFactory extends StreamJobFactory with Logging {
 
     // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
     val fsImplConfig = new FileSystemImplConfig(config)
-    fsImplConfig.getSchemes.foreach(
+    fsImplConfig.getSchemes.asScala.foreach(
       (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
     )
 
index cdd389c..122a1df 100644 (file)
@@ -25,12 +25,12 @@ import scalate.ScalateSupport
 import org.apache.samza.config.Config
 import org.apache.samza.job.yarn.{YarnAppState, ClientHelper}
 import org.apache.samza.metrics._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import java.util.HashMap
 import org.apache.samza.serializers.model.SamzaObjectMapper
 
-class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
   val client = new ClientHelper(yarnConfig)
   val jsonMapper = SamzaObjectMapper.getObjectMapper
@@ -43,10 +43,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
     val metricMap = new HashMap[String, java.util.Map[String, Object]]
 
     // build metric map
-    registry.getGroups.foreach(group => {
+    registry.getGroups.asScala.foreach(group => {
       val groupMap = new HashMap[String, Object]
 
-      registry.getGroup(group).foreach {
+      registry.getGroup(group).asScala.foreach {
         case (name, metric) =>
           metric.visit(new MetricsVisitor() {
             def counter(counter: Counter) =
@@ -79,7 +79,7 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningYarnContainers.foreach {
+    state.runningYarnContainers.asScala.foreach {
       case (containerId, container) =>
         val yarnContainerId = container.id.toString
         val containerMap = new HashMap[String, Object]
@@ -98,10 +98,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
       "containers" -> containers,
       "host" -> "%s:%s".format(state.nodeHost, state.rpcUrl.getPort))
 
-    jsonMapper.writeValueAsString(new HashMap[String, Object](status))
+    jsonMapper.writeValueAsString(new HashMap[String, Object](status.asJava))
   }
 
   get("/config") {
-    jsonMapper.writeValueAsString(new HashMap[String, Object](config.sanitize.toMap))
+    jsonMapper.writeValueAsString(new HashMap[String, Object](samzaConfig.sanitize))
   }
 }
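
The servlet's constructor parameter is renamed from config to samzaConfig,
presumably to stop shadowing a config member inherited from the Scalatra base
classes in their 2.12-compatible release; the /config route also drops the
.toMap step, since sanitize already returns a Java map that HashMap's copy
constructor accepts.
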
index a32cd65..d787f9e 100644 (file)
@@ -24,12 +24,12 @@ import org.scalatra._
 import scalate.ScalateSupport
 import org.apache.samza.job.yarn.YarnAppState
 import org.apache.samza.config.Config
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
-class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterWebServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
 
   before() {
@@ -38,7 +38,7 @@ class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicatio
 
   get("/") {
     layoutTemplate("/WEB-INF/views/index.scaml",
-      "config" -> TreeMap(config.sanitize.toMap.toArray: _*),
+      "config" -> TreeMap(samzaConfig.sanitize.asScala.toMap.toArray: _*),
       "state" -> state,
       "samzaAppState" -> samzaAppState,
       "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))
index d3d34f2..c320a97 100644 (file)
@@ -21,8 +21,8 @@ package org.apache.samza.job.yarn
 
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
-import scala.collection.JavaConversions._
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin}
+import scala.collection.JavaConverters._
 
 /**
  * A mock implementation class that returns metadata for each stream that contains numTasks partitions in it.
@@ -30,12 +30,12 @@ import scala.collection.JavaConversions._
 class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
   def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
   def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    streamNames.map(streamName => {
-      var partitionMetadata = (0 until numTasks).map(partitionId => {
+    streamNames.asScala.map(streamName => {
+      val partitionMetadata = (0 until numTasks).map(partitionId => {
         new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
       }).toMap
-      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
-    }).toMap[String, SystemStreamMetadata]
+      streamName -> new SystemStreamMetadata(streamName, partitionMetadata.asJava)
+    }).toMap.asJava
   }
 
   override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
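
MockSystemAdmin now converts at each nesting level on the way out:
partitionMetadata.asJava for the inner map and .toMap.asJava for the outer one,
since nothing converts deeply or implicitly any more. The var that was never
reassigned becomes a val along the way.
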
index 5c15385..ad8337b 100644 (file)
@@ -24,11 +24,11 @@ import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{MapConfig, JobConfig, Config, YarnConfig}
+import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig}
 import org.mockito.Mockito._
 import org.mockito.Matchers.any
 import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
 
 class TestClientHelper extends FunSuite {
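
The MockitoSugar import moves from org.scalatest.mock to org.scalatest.mockito,
following the package rename in the newer ScalaTest line that added Scala 2.12
support.
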
index 1dd0c18..65c03d1 100644 (file)
@@ -23,17 +23,12 @@ import java.io.BufferedReader
 import java.net.URL
 import java.io.InputStreamReader
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.Partition
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
@@ -113,7 +108,7 @@ class TestSamzaYarnAppMasterService {
     "yarn.container.retry.count" -> "1",
     "yarn.container.retry.window.ms" -> "1999999999",
     "job.coordinator.system" -> "coordinator",
-    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
+    "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName).asJava)
 }