Misc. Util cleanup
authorPrateek Maheshwari <pmaheshw@linkedin.com>
Wed, 18 Apr 2018 20:32:15 +0000 (13:32 -0700)
committerPrateek Maheshwari <pmaheshw@linkedin.com>
Wed, 18 Apr 2018 20:32:15 +0000 (13:32 -0700)
Major changes:
1. Broke up 'Util' class into multiple classes: 'FileUtil', 'HttpUtil', 'CoordinatorStreamUtil'.
2. Consolidated some Util classes: MathUtil, ScalaJavaUtil
3. Removed redundant Util classes: ClassloaderUtil, ScalaToJavaUtil
4. Renamed some Util classes for consistency: TimerUtils -> TimerUtil.
5. Inlined some util classes and methods to where they're used: LexicographicComparator to RocksDBKeyValueStore, defaultSerdeFactoryFromSerdeName to SerdeManager, etc.

Rest of the changes are updates to use the new classes and methods.

Testing: Local build and test works. Tested with a locally deployed Samza job.

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jacob Maes <jmakes@apache.org>, Jagadish Venkatraman <vjagadish1989@gmail.com>

Closes #455 from prateekm/util-cleanup

80 files changed:
samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java
samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java
samza-core/src/main/java/org/apache/samza/config/JavaSystemConfig.java
samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatClient.java
samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
samza-core/src/main/java/org/apache/samza/execution/JobNode.java
samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
samza-core/src/main/java/org/apache/samza/table/TableManager.java
samza-core/src/main/java/org/apache/samza/task/TaskFactoryUtil.java
samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java [deleted file]
samza-core/src/main/java/org/apache/samza/util/MathUtil.java [moved from samza-core/src/main/java/org/apache/samza/operators/util/MathUtils.java with 56% similarity]
samza-core/src/main/java/org/apache/samza/util/MetricsReporterLoader.java
samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java [deleted file]
samza-core/src/main/java/org/apache/samza/util/ThrottlingExecutor.java
samza-core/src/main/java/org/apache/samza/util/ThrottlingScheduler.java
samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporterFactory.scala
samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/util/ExponentialSleepStrategy.scala
samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala [deleted file]
samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/util/TimerUtil.scala [moved from samza-core/src/main/scala/org/apache/samza/util/TimerUtils.scala with 79% similarity]
samza-core/src/main/scala/org/apache/samza/util/Util.scala
samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
samza-core/src/test/java/org/apache/samza/util/TestMathUtils.java
samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
samza-core/src/test/scala/org/apache/samza/serializers/TestSerdeManager.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala [new file with mode: 0644]
samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemFactory.java
samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala
samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
samza-kafka/src/test/java/org/apache/samza/system/kafka/TestKafkaSystemAdminJava.java
samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java
samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
samza-rest/src/main/java/org/apache/samza/monitor/MonitorLoader.java
samza-rest/src/main/java/org/apache/samza/rest/SamzaRestApplication.java
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/AbstractJobProxy.java
samza-rest/src/main/java/org/apache/samza/rest/proxy/job/SimpleYarnJobProxy.java
samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxyFactory.java
samza-rest/src/main/java/org/apache/samza/rest/resources/TasksResource.java
samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
samza-yarn/src/main/java/org/apache/samza/validation/YarnJobValidationTool.java
samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
samza-yarn/src/test/java/org/apache/samza/webapp/TestYarnContainerHeartbeatServlet.java

index 2b65ae0..96f628c 100644 (file)
@@ -47,7 +47,6 @@ import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.BlobUtils;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.LeaseBlobManager;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.TableUtils;
@@ -301,7 +300,8 @@ public class AzureJobCoordinator implements JobCoordinator {
   private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
     JobConfig jobConfig = new JobConfig(config);
     String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
-    SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
+    SystemStreamPartitionGrouper grouper = Util.getObj(factoryString, SystemStreamPartitionGrouperFactory.class)
+        .getSystemStreamPartitionGrouper(jobConfig);
     return grouper;
   }
 
@@ -479,7 +479,7 @@ public class AzureJobCoordinator implements JobCoordinator {
       return appConfig.getProcessorId();
     } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
       ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
       return idGenerator.generateProcessorId(config);
     } else {
       throw new ConfigException(String
index 1b70857..521a43b 100644 (file)
@@ -235,7 +235,7 @@ public abstract class AbstractContainerAllocator implements Runnable {
    */
   private CommandBuilder getCommandBuilder(String samzaContainerId) {
     String cmdBuilderClassName = taskConfig.getCommandClass(ShellCommandBuilder.class.getName());
-    CommandBuilder cmdBuilder = (CommandBuilder) Util.getObj(cmdBuilderClassName);
+    CommandBuilder cmdBuilder = Util.getObj(cmdBuilderClassName, CommandBuilder.class);
 
     cmdBuilder.setConfig(config).setId(samzaContainerId).setUrl(state.jobModelManager.server().getUrl());
     return cmdBuilder;
index 474ac8c..cbcc192 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.ContainerProcessManagerMetrics;
 import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
@@ -460,15 +460,9 @@ public class ContainerProcessManager implements ClusterResourceManager.Callback
     final ResourceManagerFactory factory;
 
     try {
-      factory = ClassLoaderHelper.<ResourceManagerFactory>fromClassName(containerManagerFactoryClass);
-    } catch (InstantiationException e) {
-      log.error("Instantiation exception when creating ContainerManager", e);
-      throw new SamzaException(e);
-    } catch (IllegalAccessException e) {
-      log.error("Illegal access exception when creating ContainerManager", e);
-      throw new SamzaException(e);
-    } catch (ClassNotFoundException e) {
-      log.error("ClassNotFound Exception when creating ContainerManager", e);
+      factory = Util.getObj(containerManagerFactoryClass, ResourceManagerFactory.class);
+    } catch (Exception e) {
+      log.error("Exception when creating ContainerManager", e);
       throw new SamzaException(e);
     }
     return factory;
index 1e51c46..96c2c4d 100644 (file)
@@ -107,7 +107,7 @@ public class JavaSystemConfig extends MapConfig {
           throw new SamzaException(
               String.format("A stream uses system %s, which is missing from the configuration.", systemName));
         }
-        return Util.getObj(systemFactoryClassName);
+        return Util.getObj(systemFactoryClassName, SystemFactory.class);
       }));
 
     return systemFactories;
index 76a42a9..2322727 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.config;
 import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkCoordinationUtilsFactory;
 
 public class JobCoordinatorConfig extends MapConfig {
@@ -55,7 +55,7 @@ public class JobCoordinatorConfig extends MapConfig {
     // load the class
     String coordinationUtilsFactoryClass = getJobCoordinationUtilsFactoryClassName();
 
-    return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
+    return Util.getObj(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
   }
 
   public String getJobCoordinatorFactoryClassName() {
index 91cb9ef..29dd3ef 100644 (file)
@@ -77,7 +77,7 @@ public class TaskConfigJava extends MapConfig {
     String checkpointManagerFactoryName = getCheckpointManagerFactoryName();
     if (StringUtils.isNotBlank(checkpointManagerFactoryName)) {
       CheckpointManager checkpointManager =
-          Util.<CheckpointManagerFactory>getObj(checkpointManagerFactoryName).getCheckpointManager(this, metricsRegistry);
+          Util.getObj(checkpointManagerFactoryName, CheckpointManagerFactory.class).getCheckpointManager(this, metricsRegistry);
       return checkpointManager;
     }
     return null;
index 7273d54..e6208ba 100644 (file)
@@ -25,7 +25,7 @@ import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.stream.Collectors;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -86,7 +86,7 @@ public class ContainerHeartbeatClient {
     BufferedReader br = null;
     for (int currentTry = 0; currentTry < NUM_RETRIES; currentTry++) {
       try {
-        conn = Util.getHttpConnection(url, TIMEOUT_MS);
+        conn = HttpUtil.getHttpConnection(url, TIMEOUT_MS);
         br = new BufferedReader(new InputStreamReader(conn.getInputStream()));
         if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
           throw new IOException(String.format("HTTP error fetching url %s. Returned status code %d", url.toString(),
index c9ec6b5..6be9e0b 100644 (file)
@@ -30,8 +30,7 @@ import scala.collection.JavaConverters;
 import scala.runtime.AbstractFunction1;
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.samza.util.Util.asScalaClock;
-
+import static org.apache.samza.util.ScalaJavaUtil.toScalaFunction;
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -78,7 +77,7 @@ public class RunLoopFactory {
         maxThrottlingDelayMs,
         taskWindowMs,
         taskCommitMs,
-        asScalaClock(() -> System.nanoTime()));
+        toScalaFunction(() -> clock.nanoTime()));
     } else {
       Integer taskMaxConcurrency = config.getMaxConcurrency();
 
index 0cf79ee..38255a2 100644 (file)
@@ -45,7 +45,7 @@ import org.apache.samza.system.SystemStreamMetadata;
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.SystemStreamPartitionIterator;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,8 +68,8 @@ public class CoordinatorStreamSystemConsumer {
   private volatile Set<CoordinatorStreamMessage> bootstrappedStreamSet = Collections.emptySet();
 
   public CoordinatorStreamSystemConsumer(Config config, MetricsRegistry registry) {
-    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
-    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
     SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
     SystemConsumer systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem(), config, registry);
 
index 701b229..555620c 100644 (file)
@@ -36,7 +36,7 @@ import org.apache.samza.system.OutgoingMessageEnvelope;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,8 +56,8 @@ public class CoordinatorStreamSystemProducer {
 
 
   public CoordinatorStreamSystemProducer(Config config, MetricsRegistry registry) {
-    SystemStream coordinatorSystemStream = Util.getCoordinatorSystemStream(config);
-    SystemFactory systemFactory = Util.getCoordinatorSystemFactory(config);
+    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
     SystemAdmin systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem(), config);
     SystemProducer systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem(), config, registry);
     this.systemStream = coordinatorSystemStream;
index bc85d00..8abd463 100644 (file)
@@ -46,7 +46,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.StatefulOperatorSpec;
 import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.util.MathUtils;
+import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.system.StreamSpec;
@@ -187,7 +187,8 @@ public class JobNode {
         // Note: no need to generate config for Serde's, as they are already produced by addSerdeConfigs()
 
         // Generate additional configuration
-        TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName());
+        TableProviderFactory tableProviderFactory =
+            Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
         TableProvider tableProvider = tableProviderFactory.getTableProvider(tableSpec);
         configs.putAll(tableProvider.generateConfig(configs));
       });
@@ -343,7 +344,7 @@ public class JobNode {
     }
 
     // Compute the gcd of the resultant list
-    return MathUtils.gcd(candidateTimerIntervals);
+    return MathUtil.gcd(candidateTimerIntervals);
   }
 
   /**
index 0a6eb83..3c1818f 100644 (file)
@@ -42,7 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConversions;
 
-import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+import static org.apache.samza.util.ScalaJavaUtil.defaultValue;
 
 public class StreamManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(StreamManager.class);
@@ -112,17 +112,20 @@ public class StreamManager {
 
       //Find checkpoint stream and clean up
       TaskConfig taskConfig = new TaskConfig(prevConfig);
-      String checkpointManagerFactoryClass = taskConfig.getCheckpointManagerFactory().getOrElse(defaultValue(null));
-      if (checkpointManagerFactoryClass != null) {
-        CheckpointManager checkpointManager = ((CheckpointManagerFactory) Util.getObj(checkpointManagerFactoryClass))
-            .getCheckpointManager(prevConfig, new MetricsRegistryMap());
+      String checkpointManagerFactoryClassName = taskConfig.getCheckpointManagerFactory()
+          .getOrElse(defaultValue(null));
+      if (checkpointManagerFactoryClassName != null) {
+        CheckpointManager checkpointManager =
+            Util.getObj(checkpointManagerFactoryClassName, CheckpointManagerFactory.class)
+                .getCheckpointManager(prevConfig, new MetricsRegistryMap());
         checkpointManager.clearCheckpoints();
       }
 
       //Find changelog streams and remove them
       StorageConfig storageConfig = new StorageConfig(prevConfig);
       for (String store : JavaConversions.asJavaCollection(storageConfig.getStoreNames())) {
-        String changelog = storageConfig.getChangelogStream(store).getOrElse(defaultValue(null));
+        String changelog = storageConfig.getChangelogStream(store)
+            .getOrElse(defaultValue(null));
         if (changelog != null) {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
           SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
index 1c8e592..73d10ff 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.operators.triggers.AnyTrigger;
 import org.apache.samza.operators.triggers.RepeatingTrigger;
 import org.apache.samza.operators.triggers.TimeBasedTrigger;
 import org.apache.samza.operators.triggers.Trigger;
-import org.apache.samza.operators.util.MathUtils;
+import org.apache.samza.util.MathUtil;
 import org.apache.samza.operators.windows.WindowPane;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
@@ -95,7 +95,7 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
         .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis())
         .collect(Collectors.toList());
 
-    return MathUtils.gcd(candidateDurations);
+    return MathUtil.gcd(candidateDurations);
   }
 
   private List<TimeBasedTrigger> getTimeBasedTriggers(Trigger rootTrigger) {
index 8dacc6c..40deb1b 100644 (file)
@@ -41,6 +41,7 @@ import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
+import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,11 +115,8 @@ public class StreamProcessor {
 
   /* package private */
   JobCoordinator getJobCoordinator() {
-    return Util.
-        <JobCoordinatorFactory>getObj(
-            new JobCoordinatorConfig(config)
-                .getJobCoordinatorFactoryClassName())
-        .getJobCoordinator(config);
+    String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
+    return Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class).getJobCoordinator(config);
   }
 
   @VisibleForTesting
@@ -201,7 +199,7 @@ public class StreamProcessor {
         processorId,
         jobModel,
         config,
-        Util.javaMapAsScalaMap(customMetricsReporter),
+        ScalaJavaUtil.toScalaMap(customMetricsReporter),
         taskFactory);
   }
 
index 79bd568..5831910 100644 (file)
@@ -35,10 +35,8 @@ import org.apache.samza.container.SamzaContainerExceptionHandler;
 import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.TaskFactoryUtil;
-import org.apache.samza.util.ScalaToJavaUtils;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ScalaJavaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +77,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
         containerId,
         jobModel,
         config,
-        Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
+        ScalaJavaUtil.toScalaMap(new HashMap<>()),
         taskFactory);
     container.setContainerListener(
         new SamzaContainerListener() {
@@ -140,7 +138,7 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
       throw new SamzaException("can not find the job name");
     }
     String jobName = jobConfig.getName().get();
-    String jobId = jobConfig.getJobId().getOrElse(ScalaToJavaUtils.defaultValue("1"));
+    String jobId = jobConfig.getJobId().getOrElse(ScalaJavaUtil.defaultValue("1"));
     MDC.put("containerName", "samza-container-" + containerId);
     MDC.put("jobName", jobName);
     MDC.put("jobId", jobId);
index f8c4d43..53b59b2 100644 (file)
@@ -27,8 +27,8 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.util.Util;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParser;
@@ -166,16 +166,23 @@ public class SamzaObjectMapper {
 
   public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
     @Override
-    public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider) throws IOException {
-      String ssp = Util.sspToString(systemStreamPartition);
-      jgen.writeFieldName(ssp);
+    public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
+      String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId());
+      jgen.writeFieldName(sspString);
     }
   }
 
   public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
     @Override
     public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
-      return Util.stringToSsp(sspString);
+      int idx = sspString.indexOf('.');
+      int lastIdx = sspString.lastIndexOf('.');
+      if (idx < 0 || lastIdx < 0) {
+        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
+      }
+      return new SystemStreamPartition(
+          new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)),
+          new Partition(Integer.parseInt(sspString.substring(lastIdx + 1))));
     }
   }
 
index d58b251..01ee84e 100644 (file)
@@ -141,7 +141,7 @@ public class PassthroughJobCoordinator implements JobCoordinator {
       return appConfig.getProcessorId();
     } else if (appConfig.getAppProcessorIdGeneratorClass() != null) {
       ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
       return idGenerator.generateProcessorId(config);
     } else {
       throw new ConfigException(String
index 3af654e..db6f0d9 100644 (file)
@@ -47,6 +47,7 @@ import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.CommandLine;
+import org.apache.samza.util.ScalaJavaUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -151,7 +152,7 @@ public class StorageRecovery extends CommandLine {
 
       String factoryClass = config.getStorageFactoryClassName(storeName);
       if (factoryClass != null) {
-        storageEngineFactories.put(storeName, Util.<StorageEngineFactory<Object, Object>>getObj(factoryClass));
+        storageEngineFactories.put(storeName, Util.getObj(factoryClass, StorageEngineFactory.class));
       } else {
         throw new SamzaException("Missing storage factory for " + storeName + ".");
       }
@@ -229,9 +230,9 @@ public class StorageRecovery extends CommandLine {
         }
         TaskStorageManager taskStorageManager = new TaskStorageManager(
             taskModel.getTaskName(),
-            Util.javaMapAsScalaMap(taskStores),
-            Util.javaMapAsScalaMap(storeConsumers),
-            Util.javaMapAsScalaMap(changeLogSystemStreams),
+            ScalaJavaUtil.toScalaMap(taskStores),
+            ScalaJavaUtil.toScalaMap(storeConsumers),
+            ScalaJavaUtil.toScalaMap(changeLogSystemStreams),
             maxPartitionNumber,
             streamMetadataCache,
             storeBaseDir,
index bada304..186b4a8 100644 (file)
@@ -113,7 +113,8 @@ public class TableManager {
       throw new SamzaException("Table " + tableSpec.getId() + " already exists");
     }
     TableCtx ctx = new TableCtx();
-    TableProviderFactory tableProviderFactory = Util.getObj(tableSpec.getTableProviderFactoryClassName());
+    TableProviderFactory tableProviderFactory =
+        Util.getObj(tableSpec.getTableProviderFactoryClassName(), TableProviderFactory.class);
     ctx.tableProvider = tableProviderFactory.getTableProvider(tableSpec);
     ctx.tableSpec = tableSpec;
     tables.put(tableSpec.getId(), ctx);
index 1fe9187..2a894ae 100644 (file)
@@ -30,9 +30,8 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ExecutorService;
 
-import scala.runtime.AbstractFunction0;
-
-import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+import static org.apache.samza.util.ScalaJavaUtil.toScalaFunction;
+import static org.apache.samza.util.ScalaJavaUtil.defaultValue;
 
 /**
  * This class provides utility functions to load task factory classes based on config, and to wrap {@link StreamTaskFactory} in {@link AsyncStreamTaskFactory}
@@ -64,13 +63,10 @@ public class TaskFactoryUtil {
    */
   private static Object fromTaskClassConfig(Config config) {
     // if there is configuration to set the job w/ a specific type of task, instantiate the corresponding task factory
-    String taskClassName = new TaskConfig(config).getTaskClass().getOrElse(
-      new AbstractFunction0<String>() {
-        @Override
-        public String apply() {
-          throw new ConfigException("There is no task class defined in the configuration. Failed to create a valid TaskFactory");
-        }
-      });
+    String taskClassName = new TaskConfig(config).getTaskClass().getOrElse(toScalaFunction(
+      () -> {
+        throw new ConfigException("No task class defined in the configuration.");
+      }));
 
     log.info("Got task class name: {}", taskClassName);
 
diff --git a/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java b/samza-core/src/main/java/org/apache/samza/util/ClassLoaderHelper.java
deleted file mode 100644 (file)
index 3680b4f..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import org.apache.samza.config.ConfigException;
-
-import java.lang.reflect.Constructor;
-
-public class ClassLoaderHelper {
-
-  public static <T> T fromClassName(String className) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
-    Class<T> clazz = (Class<T>) Class.forName(className);
-    T instance = clazz.newInstance();
-    return instance;
-  }
-
-  public static <T> T fromClassName(String className, Class<T> classType) {
-    try {
-      Class<?> idGeneratorClass = Class.forName(className);
-      if (!classType.isAssignableFrom(idGeneratorClass)) {
-        throw new ConfigException(String.format(
-            "Class %s is not of type %s", className, classType));
-      }
-      Constructor<?> constructor = idGeneratorClass.getConstructor();
-      return (T) constructor.newInstance();
-    } catch (Exception e) {
-      throw new ConfigException(String.format(
-          "Problem in loading %s class %s", classType, className), e);
-    }
-  }
-}
\ No newline at end of file
  * under the License.
  */
 
-package org.apache.samza.operators.util;
+package org.apache.samza.util;
 
 import java.util.List;
 
-public class MathUtils {
+public class MathUtil {
 
   public static long gcd(long a, long b) {
     // use the euclid gcd algorithm
@@ -47,4 +47,31 @@ public class MathUtils {
     }
     return result;
   }
+
+  /**
+   * Add the supplied arguments and handle overflow by clamping the resulting sum to
+   * {@code Long.MinValue} if the sum would have been less than {@code Long.MinValue} or
+   * {@code Long.MaxValue} if the sum would have been greater than {@code Long.MaxValue}.
+   *
+   * @param lhs left hand side of sum
+   * @param rhs right hand side of sum
+   * @return the sum if no overflow occurs, or the clamped extreme if it does.
+   */
+  public static long clampAdd(long lhs, long rhs) {
+    long sum = lhs + rhs;
+
+    // From "Hacker's Delight", overflow occurs IFF both operands have the same sign and the
+    // sign of the sum differs from the operands. Here we're doing a basic bitwise check that
+    // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will have the high-order
+    // bit set to true IFF the signs are different.
+    if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
+      if (lhs >= 0) {
+        return Long.MAX_VALUE;
+      } else {
+        return Long.MIN_VALUE;
+      }
+    }
+
+    return sum;
+  }
 }
index 03b218e..e7eeb5e 100644 (file)
@@ -38,11 +38,11 @@ public class MetricsReporterLoader {
     Map<String, MetricsReporter> metricsReporters = new HashMap<>();
 
     for (String metricsReporterName : JavaConverters.seqAsJavaListConverter(config.getMetricReporterNames()).asJava()) {
-      String metricsFactoryClass = config.getMetricsFactoryClass(metricsReporterName).get();
-      if (metricsFactoryClass == null) {
+      String metricsFactoryClassName = config.getMetricsFactoryClass(metricsReporterName).get();
+      if (metricsFactoryClassName == null) {
         throw new SamzaException(String.format("Metrics reporter %s missing .class config", metricsReporterName));
       }
-      MetricsReporterFactory metricsReporterFactory = Util.getObj(metricsFactoryClass);
+      MetricsReporterFactory metricsReporterFactory = Util.getObj(metricsFactoryClassName, MetricsReporterFactory.class);
       metricsReporters.put(metricsReporterName,
                            metricsReporterFactory.getMetricsReporter(metricsReporterName,
                                                                      containerName,
diff --git a/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java b/samza-core/src/main/java/org/apache/samza/util/ScalaToJavaUtils.java
deleted file mode 100644 (file)
index 6c7fc2d..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.util;
-
-import scala.runtime.AbstractFunction0;
-
-/**
- * Common utils methods that helps to convert or use Scala objects in Java code
- */
-public class ScalaToJavaUtils {
-  /**
-   * Returns a default value object for scala option.getOrDefault() to use
-   * @param value default value
-   * @param <T> value type
-   * @return object containing default value
-   */
-  public static <T> AbstractFunction0<T> defaultValue(final T value) {
-    return new AbstractFunction0<T>() {
-      @Override
-      public T apply() {
-        return value;
-      }
-    };
-  }
-}
index eb956f2..fe8dba0 100644 (file)
@@ -71,7 +71,7 @@ public class ThrottlingExecutor implements Throttleable, Executor {
       // the sleep operation (if applicable), so they do not continue to grow. We also clamp the
       // maximum sleep time to prevent excessively large sleeps between executions.
       pendingNanos = Math.min(maxDelayNanos,
-          Util.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
+          MathUtil.clampAdd(pendingNanos, (long) (workNanos * currentWorkToIdleFactor)));
       if (pendingNanos > 0) {
         try {
           pendingNanos = sleep(pendingNanos);
@@ -139,6 +139,6 @@ public class ThrottlingExecutor implements Throttleable, Executor {
     final long start = System.nanoTime();
     TimeUnit.NANOSECONDS.sleep(nanos);
 
-    return Util.clampAdd(nanos, -(System.nanoTime() - start));
+    return MathUtil.clampAdd(nanos, -(System.nanoTime() - start));
   }
 }
index 5b5780b..265d46d 100644 (file)
@@ -97,7 +97,7 @@ public class ThrottlingScheduler implements Throttleable {
     long newValue;
     do {
       currentValue = pendingNanos.get();
-      newValue = Util.clampAdd(currentValue, amount);
+      newValue = MathUtil.clampAdd(currentValue, amount);
     } while (!pendingNanos.compareAndSet(currentValue, newValue));
   }
 
index 0e0f815..2b4bc8b 100644 (file)
@@ -51,9 +51,9 @@ import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.util.MetricsReporterLoader;
 import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -311,7 +311,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
       return appConfig.getProcessorId();
     } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
       ProcessorIdGenerator idGenerator =
-          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+          Util.getObj(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
       return idGenerator.generateProcessorId(config);
     } else {
       throw new ConfigException(String
index e517946..0ca8a3d 100644 (file)
@@ -124,7 +124,8 @@ object CheckpointTool {
   def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
     val manager = config.getCheckpointManagerFactory match {
       case Some(className) =>
-        Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
+        Util.getObj(className, classOf[CheckpointManagerFactory])
+          .getCheckpointManager(config, new MetricsRegistryMap)
       case _ =>
         throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
     }
@@ -133,10 +134,10 @@ object CheckpointTool {
 
   def rewriteConfig(config: JobConfig): Config = {
     def rewrite(c: JobConfig, rewriterName: String): Config = {
-      val klass = config
+      val rewriterClassName = config
               .getConfigRewriterClass(rewriterName)
               .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
+      val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
       info("Re-writing config for CheckpointTool with " + rewriter)
       rewriter.rewrite(rewriterName, c)
     }
index 764f77a..240a296 100644 (file)
  */
 
 package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.ByteBufferSerdeFactory
+import org.apache.samza.serializers.ByteSerdeFactory
+import org.apache.samza.serializers.DoubleSerdeFactory
+import org.apache.samza.serializers.IntegerSerdeFactory
+import org.apache.samza.serializers.JsonSerdeFactory
+import org.apache.samza.serializers.LongSerdeFactory
+import org.apache.samza.serializers.SerializableSerdeFactory
+import org.apache.samza.serializers.StringSerdeFactory
+import org.apache.samza.util.Util.info
+
 import scala.collection.JavaConverters._
 
 object SerializerConfig {
@@ -28,6 +40,26 @@ object SerializerConfig {
   val SERDE_SERIALIZED_INSTANCE = SERIALIZER_PREFIX + SERIALIZED_INSTANCE_SUFFIX
 
   implicit def Config2Serializer(config: Config) = new SerializerConfig(config)
+
+  /**
+    * Returns the pre-defined serde factory class name for the provided serde name. If no pre-defined factory exists,
+    * throws an exception.
+    */
+  def getSerdeFactoryName(serdeName: String) = {
+    val serdeFactoryName = serdeName match {
+      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
+      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
+      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
+      case "json" => classOf[JsonSerdeFactory].getCanonicalName
+      case "long" => classOf[LongSerdeFactory].getCanonicalName
+      case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
+      case "string" => classOf[StringSerdeFactory].getCanonicalName
+      case "double" => classOf[DoubleSerdeFactory].getCanonicalName
+      case _ => throw new SamzaException("No pre-defined factory class name for serde name %s" format serdeName)
+    }
+    info("Using default serde %s for serde name %s" format (serdeFactoryName, serdeName))
+    serdeFactoryName
+  }
 }
 
 class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
index 8fd5729..a738616 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.container
 import org.apache.samza.task.CoordinatorRequests
 import org.apache.samza.system.{IncomingMessageEnvelope, SystemConsumers, SystemStreamPartition}
 import org.apache.samza.task.ReadableCoordinator
-import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, TimerUtils}
+import org.apache.samza.util.{Logging, Throttleable, ThrottlingExecutor, TimerUtil}
 
 import scala.collection.JavaConverters._
 
@@ -42,7 +42,7 @@ class RunLoop (
   val maxThrottlingDelayMs: Long,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
-  val clock: () => Long = { System.nanoTime }) extends Runnable with Throttleable with TimerUtils with Logging {
+  val clock: () => Long = { System.nanoTime }) extends Runnable with Throttleable with TimerUtil with Logging {
 
   private val metricsMsOffset = 1000000L
   private val executor = new ThrottlingExecutor(maxThrottlingDelayMs)
index aa5187b..1641511 100644 (file)
 package org.apache.samza.container
 
 import java.io.File
+import java.lang.management.ManagementFactory
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.util
 import java.util.Base64
-import java.util.concurrent.{ScheduledExecutorService, ExecutorService, Executors, TimeUnit}
+import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
@@ -49,7 +50,7 @@ import org.apache.samza.system._
 import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, RoundRobinChooserFactory}
 import org.apache.samza.table.TableManager
 import org.apache.samza.task._
-import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util.Util
 import org.apache.samza.util._
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
@@ -69,7 +70,7 @@ object SamzaContainer extends Logging {
     SamzaObjectMapper
       .getObjectMapper
       .readValue(
-        Util.read(
+        HttpUtil.read(
           url = new URL(url),
           retryBackoff = new ExponentialSleepStrategy(initialDelayMs = initialDelayMs)),
         classOf[JobModel])
@@ -94,7 +95,7 @@ object SamzaContainer extends Logging {
       localityManager = new LocalityManager(coordinatorStreamManager)
     }
 
-    val containerPID = Util.getContainerPID
+    val containerPID = ManagementFactory.getRuntimeMXBean().getName()
 
     info("Setting up Samza container: %s" format containerName)
 
@@ -145,7 +146,7 @@ object SamzaContainer extends Logging {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      (systemName, Util.getObj[SystemFactory](systemFactoryClassName))
+      (systemName, Util.getObj(systemFactoryClassName, classOf[SystemFactory]))
     }).toMap
     info("Got system factories: %s" format systemFactories.keys)
 
@@ -192,9 +193,9 @@ object SamzaContainer extends Logging {
     val serdesFromFactories = config.getSerdeNames.map(serdeName => {
       val serdeClassName = config
         .getSerdeClass(serdeName)
-        .getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
+        .getOrElse(SerializerConfig.getSerdeFactoryName(serdeName))
 
-      val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
+      val serde = Util.getObj(serdeClassName, classOf[SerdeFactory[Object]])
         .getSerde(serdeName, config)
 
       (serdeName, serde)
@@ -326,7 +327,7 @@ object SamzaContainer extends Logging {
 
     val chooserFactoryClassName = config.getMessageChooserClass.getOrElse(classOf[RoundRobinChooserFactory].getName)
 
-    val chooserFactory = Util.getObj[MessageChooserFactory](chooserFactoryClassName)
+    val chooserFactory = Util.getObj(chooserFactoryClassName, classOf[MessageChooserFactory])
 
     val chooser = DefaultChooser(inputStreamMetadata, chooserFactory, config, samzaContainerMetrics.registry, systemAdmins)
 
@@ -339,7 +340,7 @@ object SamzaContainer extends Logging {
     val securityManager = config.getSecurityManagerFactory match {
       case Some(securityManagerFactoryClassName) =>
         Util
-          .getObj[SecurityManagerFactory](securityManagerFactoryClassName)
+          .getObj(securityManagerFactoryClassName, classOf[SecurityManagerFactory])
           .getSecurityManager(config)
       case _ => null
     }
@@ -347,7 +348,8 @@ object SamzaContainer extends Logging {
 
     val checkpointManager = config.getCheckpointManagerFactory()
       .filterNot(_.isEmpty)
-      .map(Util.getObj[CheckpointManagerFactory](_).getCheckpointManager(config, samzaContainerMetrics.registry))
+      .map(Util.getObj(_, classOf[CheckpointManagerFactory])
+        .getCheckpointManager(config, samzaContainerMetrics.registry))
       .orNull
     info("Got checkpoint manager: %s" format checkpointManager)
 
@@ -381,7 +383,7 @@ object SamzaContainer extends Logging {
       metrics = systemConsumersMetrics,
       dropDeserializationError = dropDeserializationError,
       pollIntervalMs = pollIntervalMs,
-      clock = clock)
+      clock = () => clock.nanoTime())
 
     val producerMultiplexer = new SystemProducers(
       producers = producers,
@@ -395,7 +397,7 @@ object SamzaContainer extends Logging {
         val storageFactoryClassName = config
           .getStorageFactoryClassName(storeName)
           .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
-        (storeName, Util.getObj[StorageEngineFactory[Object, Object]](storageFactoryClassName))
+        (storeName, Util.getObj(storageFactoryClassName, classOf[StorageEngineFactory[Object, Object]]))
       }).toMap
 
     info("Got storage engines: %s" format storageEngineFactories.keys)
@@ -468,7 +470,10 @@ object SamzaContainer extends Logging {
 
       var loggedStorageBaseDir: File = null
       if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
-        val jobNameAndId = Util.getJobNameAndId(config)
+        val jobNameAndId = (
+          config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
+          config.getJobId.getOrElse("1")
+        )
         loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
           + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
       } else {
@@ -598,7 +603,7 @@ object SamzaContainer extends Logging {
 
     val diskQuotaPolicyFactoryString = config.get("container.disk.quota.policy.factory",
       classOf[NoThrottlingDiskQuotaPolicyFactory].getName)
-    val diskQuotaPolicyFactory = Util.getObj[DiskQuotaPolicyFactory](diskQuotaPolicyFactoryString)
+    val diskQuotaPolicyFactory = Util.getObj(diskQuotaPolicyFactoryString, classOf[DiskQuotaPolicyFactory])
     val diskQuotaPolicy = diskQuotaPolicyFactory.create(config)
 
     var diskSpaceMonitor: DiskSpaceMonitor = null
index a68fb6f..df37ecd 100644 (file)
@@ -131,7 +131,7 @@ object JobModelManager extends Logging {
         config.getStreamJobFactoryClass match {
           case Some(jfr(_*)) => {
             info("before match: allSystemStreamPartitions.size = %s" format (allSystemStreamPartitions.size))
-            val sspMatcher = Util.getObj[SystemStreamPartitionMatcher](s)
+            val sspMatcher = Util.getObj(s, classOf[SystemStreamPartitionMatcher])
             val matchedPartitions = sspMatcher.filter(allSystemStreamPartitions.asJava, config).asScala.toSet
             // Usually a small set hence ok to log at info level
             info("after match: matchedPartitions = %s" format (matchedPartitions))
@@ -149,7 +149,7 @@ object JobModelManager extends Logging {
    */
   private def getSystemStreamPartitionGrouper(config: Config) = {
     val factoryString = config.getSystemStreamPartitionGrouperFactory
-    val factory = Util.getObj[SystemStreamPartitionGrouperFactory](factoryString)
+    val factory = Util.getObj(factoryString, classOf[SystemStreamPartitionGrouperFactory])
     factory.getSystemStreamPartitionGrouper(config)
   }
 
@@ -200,7 +200,7 @@ object JobModelManager extends Logging {
 
     // Here is where we should put in a pluggable option for the
     // SSPTaskNameGrouper for locality, load-balancing, etc.
-    val containerGrouperFactory = Util.getObj[TaskNameGrouperFactory](config.getTaskNameGrouperFactory)
+    val containerGrouperFactory = Util.getObj(config.getTaskNameGrouperFactory, classOf[TaskNameGrouperFactory])
     val containerGrouper = containerGrouperFactory.build(config)
     val containerModels = {
       containerGrouper match {
index 7a250b2..917c018 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
 import org.apache.samza.runtime.ApplicationRunnerOperation
 import org.apache.samza.system.StreamSpec
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{CoordinatorStreamUtil, Logging, Util}
 
 import scala.collection.JavaConverters._
 
@@ -83,8 +83,8 @@ class JobRunner(config: Config) extends Logging {
 
     // Create the coordinator stream if it doesn't exist
     info("Creating coordinator stream")
-    val coordinatorSystemStream = Util.getCoordinatorSystemStream(config)
-    val systemFactory = Util.getCoordinatorSystemFactory(config)
+    val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+    val systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config)
     val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
     val streamName = coordinatorSystemStream.getStream
     val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
index 1401f82..642a484 100644 (file)
@@ -79,7 +79,7 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
         case Some(cmdBuilderClassName) => {
           // A command class was specified, so we need to use a process job to
           // execute the command in its own process.
-          Util.getObj[CommandBuilder](cmdBuilderClassName)
+          Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
         }
         case _ => {
           info("Defaulting to ShellCommandBuilder")
index 052a9d3..33802a1 100644 (file)
@@ -78,7 +78,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
       .getSystemFactory(systemName)
       .getOrElse(throw new SamzaException("Trying to fetch system factory for system %s, which isn't defined in config." format systemName))
 
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     info("Got system factory %s." format systemFactory)
 
@@ -94,9 +94,7 @@ class MetricsSnapshotReporterFactory extends MetricsReporterFactory with Logging
     val serde = if (serdeName != null) {
       config.getSerdeClass(serdeName) match {
         case Some(serdeClassName) =>
-          Util
-            .getObj[SerdeFactory[MetricsSnapshot]](serdeClassName)
-            .getSerde(serdeName, config)
+          Util.getObj(serdeClassName, classOf[SerdeFactory[MetricsSnapshot]]).getSerde(serdeName, config)
         case _ => null
       }
     } else {
index 76594ae..60c325d 100644 (file)
@@ -25,8 +25,6 @@ import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.config.StorageConfig
-import org.apache.samza.system.WatermarkMessage
-
 
 class SerdeManager(
   serdes: Map[String, Serde[Object]] = Map(),
index 476e215..00dc20f 100644 (file)
@@ -26,9 +26,7 @@ import org.apache.samza.config.StorageConfig
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
 import org.apache.samza.system._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
-import org.apache.samza.util.Clock
+import org.apache.samza.util.{Clock, FileUtil, Logging, Util}
 
 import scala.collection.JavaConverters._
 
@@ -91,7 +89,7 @@ class TaskStorageManager(
 
       if(storePartitionDir.exists()) {
         info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
-        Util.rm(storePartitionDir)
+        FileUtil.rm(storePartitionDir)
       }
 
       val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
@@ -100,7 +98,7 @@ class TaskStorageManager(
       // Delete the logged store if it is not valid.
       if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
         info("Deleting logged storage partition directory %s." format loggedStorePartitionDir.toPath.toString)
-        Util.rm(loggedStorePartitionDir)
+        FileUtil.rm(loggedStorePartitionDir)
       } else {
         val offset = readOffsetFile(loggedStorePartitionDir)
         info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStorePartitionDir))
@@ -199,7 +197,7 @@ class TaskStorageManager(
     val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
     if (offsetFileRef.exists()) {
       info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
-      offset = Util.readDataFromFile(offsetFileRef)
+      offset = FileUtil.readWithChecksum(offsetFileRef)
     } else {
       info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
     }
@@ -354,12 +352,12 @@ class TaskStorageManager(
         val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
         if (newestOffset != null) {
           debug("Storing offset for store in OFFSET file ")
-          Util.writeDataToFile(offsetFile, newestOffset)
+          FileUtil.writeWithChecksum(offsetFile, newestOffset)
           debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
         } else {
           //if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
           if (offsetFile.exists()) {
-            Util.rm(offsetFile)
+            FileUtil.rm(offsetFile)
           }
           debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
index 49ab52a..c4fc095 100644 (file)
@@ -24,7 +24,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 import scala.collection.JavaConverters._
 import org.apache.samza.serializers.SerdeManager
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.util.{Logging, TimerUtil}
 import org.apache.samza.system.chooser.MessageChooser
 import org.apache.samza.SamzaException
 import java.util.ArrayDeque
@@ -106,7 +106,7 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtils {
+  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
 
   /**
    * A buffer of incoming messages grouped by SystemStreamPartition. These
index b0e0f3f..14dd1c9 100644 (file)
@@ -61,9 +61,9 @@ class CommandLine {
     }
 
     // Set up the job parameters.
-    val configFactoryClass = options.valueOf(configFactoryOpt)
+    val configFactoryClassName = options.valueOf(configFactoryOpt)
     val configPaths = options.valuesOf(configPathOpt)
-    configFactory = ClassLoaderHelper.fromClassName[ConfigFactory](configFactoryClass)
+    configFactory = Util.getObj(configFactoryClassName, classOf[ConfigFactory])
     val configOverrides = options.valuesOf(configOverrideOpt).asScala.map(kv => (kv.key, kv.value)).toMap
 
     val configs: Buffer[java.util.Map[String, String]] = configPaths.asScala.map(configFactory.getConfig)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
new file mode 100644 (file)
index 0000000..cd74716
--- /dev/null
@@ -0,0 +1,94 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.{Config, ConfigException, JobConfig, MapConfig, SystemConfig}
+import org.apache.samza.system.{SystemFactory, SystemStream}
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+
+import scala.collection.immutable.Map
+import scala.collection.JavaConverters._
+
+object CoordinatorStreamUtil {
+  /**
+    * Given a job's full config object, build a subset config which includes
+    * only the job name, job id, and system config for the coordinator stream.
+    */
+  def buildCoordinatorStreamConfig(config: Config) = {
+    val (jobName, jobId) = getJobNameAndId(config)
+    // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
+    val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
+      Map[String, String](
+        JobConfig.JOB_NAME -> jobName,
+        JobConfig.JOB_ID -> jobId,
+        JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
+        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
+    new MapConfig(map.asJava)
+  }
+
+  /**
+    * Get the coordinator system stream from the configuration
+    * @param config
+    * @return
+    */
+  def getCoordinatorSystemStream(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
+    val (jobName, jobId) = getJobNameAndId(config)
+    val streamName = getCoordinatorStreamName(jobName, jobId)
+    new SystemStream(systemName, streamName)
+  }
+
+  /**
+    * Get the coordinator system factory from the configuration
+    * @param config
+    * @return
+    */
+  def getCoordinatorSystemFactory(config: Config) = {
+    val systemName = config.getCoordinatorSystemName
+    val systemFactoryClassName = config
+      .getSystemFactory(systemName)
+      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+    Util.getObj(systemFactoryClassName, classOf[SystemFactory])
+  }
+
+  /**
+    * Generates a coordinator stream name based on the job name and job id
+    * for the job. The format of the stream name will be:
+    * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
+    */
+  def getCoordinatorStreamName(jobName: String, jobId: String) = {
+    "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+  }
+
+  /**
+    * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
+    * defined in the config, and job name must be defined in config.
+    *
+    * @return A tuple of (jobName, jobId)
+    */
+  private def getJobNameAndId(config: Config) = {
+    (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
+      config.getJobId.getOrElse("1"))
+  }
+}
index da55371..31c48d5 100644 (file)
@@ -23,6 +23,9 @@ package org.apache.samza.util
 
 import java.nio.channels.ClosedByInterruptException
 import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
+import org.apache.samza.util.ExponentialSleepStrategy.DefaultBackOffMultiplier
+import org.apache.samza.util.ExponentialSleepStrategy.DefaultInitialDelayMs
+import org.apache.samza.util.ExponentialSleepStrategy.DefaultMaximumDelayMs
 
 /**
  * Encapsulates the pattern of retrying an operation until it succeeds.
@@ -35,14 +38,18 @@ import org.apache.samza.util.ExponentialSleepStrategy.RetryLoop
  * @param maximumDelayMs Cap up to which we will increase the delay.
  */
 class ExponentialSleepStrategy(
-    backOffMultiplier: Double = 2.0,
-    initialDelayMs: Long = 100,
-    maximumDelayMs: Long = 10000) {
+    backOffMultiplier: Double = DefaultBackOffMultiplier,
+    initialDelayMs: Long = DefaultInitialDelayMs,
+    maximumDelayMs: Long = DefaultMaximumDelayMs) {
 
   require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
   require(initialDelayMs > 0, "initialDelayMs must be positive")
   require(maximumDelayMs >= initialDelayMs, "maximumDelayMs must be >= initialDelayMs")
 
+  def this() {
+    this(DefaultBackOffMultiplier, DefaultInitialDelayMs, DefaultMaximumDelayMs)
+  }
+
   /**
    * Given the delay before the last retry, calculate what the delay before the
    * next retry should be.
@@ -118,6 +125,10 @@ class ExponentialSleepStrategy(
 }
 
 object ExponentialSleepStrategy {
+  val DefaultBackOffMultiplier = 2.0
+  val DefaultInitialDelayMs = 100
+  val DefaultMaximumDelayMs = 10000
+
   /**
    * State of the retry loop, passed to every invocation of the loopOperation
    * or the exception handler.
diff --git a/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
new file mode 100644 (file)
index 0000000..4b93543
--- /dev/null
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.util.zip.CRC32
+
+import org.apache.samza.util.Util.info
+
+object FileUtil {
+  /**
+    * Writes checksum & data to a file
+    * Checksum is pre-fixed to the data and is a 32-bit long type data.
+    * @param file The file handle to write to
+    * @param data The data to be written to the file
+    * */
+  def writeWithChecksum(file: File, data: String) = {
+    val checksum = getChecksum(data)
+    var oos: ObjectOutputStream = null
+    var fos: FileOutputStream = null
+    try {
+      fos = new FileOutputStream(file)
+      oos = new ObjectOutputStream(fos)
+      oos.writeLong(checksum)
+      oos.writeUTF(data)
+    } finally {
+      oos.close()
+      fos.close()
+    }
+  }
+
+  /**
+    * Reads from a file that has a checksum prepended to the data
+    * @param file The file handle to read from
+    * */
+  def readWithChecksum(file: File) = {
+    var fis: FileInputStream = null
+    var ois: ObjectInputStream = null
+    try {
+      fis = new FileInputStream(file)
+      ois = new ObjectInputStream(fis)
+      val checksumFromFile = ois.readLong()
+      val data = ois.readUTF()
+      if(checksumFromFile == getChecksum(data)) {
+        data
+      } else {
+        info("Checksum match failed. Data in file is corrupted. Skipping content.")
+        null
+      }
+    } finally {
+      ois.close()
+      fis.close()
+    }
+  }
+
+  /**
+    * Recursively remove a directory (or file), and all sub-directories. Equivalent
+    * to rm -rf.
+    */
+  def rm(file: File) {
+    if (file == null) {
+      return
+    } else if (file.isDirectory) {
+      val files = file.listFiles()
+      if (files != null) {
+        for (f <- files)
+          rm(f)
+      }
+      file.delete()
+    } else {
+      file.delete()
+    }
+  }
+
+  /**
+    * Generates the CRC32 checksum code for any given data
+    * @param data The string for which checksum has to be generated
+    * @return long type value representing the checksum
+    * */
+  def getChecksum(data: String) = {
+    val crc = new CRC32
+    crc.update(data.getBytes)
+    crc.getValue
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/HttpUtil.scala
new file mode 100644 (file)
index 0000000..ea5eb5a
--- /dev/null
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{BufferedReader, IOException, InputStream, InputStreamReader}
+import java.net.{HttpURLConnection, URL}
+
+import org.apache.samza.SamzaException
+import org.apache.samza.util.Util.{error, warn}
+
+object HttpUtil {
+
+  /**
+    * Reads a URL and returns the response body as a string. Retries in an exponential backoff, but does no other error handling.
+    *
+    * @param url HTTP URL to read from.
+    * @param timeout how long to wait before timing out when connecting to or reading from the HTTP server.
+    * @param retryBackoff instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation
+    * @return string payload of the body of the HTTP response.
+    */
+  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
+    var httpConn = getHttpConnection(url, timeout)
+    retryBackoff.run(loop => {
+      if(httpConn.getResponseCode != 200)
+      {
+        warn("Error: " + httpConn.getResponseCode)
+        val errorContent = readStream(httpConn.getErrorStream)
+        warn("Error reading stream, failed with response %s" format errorContent)
+        httpConn = getHttpConnection(url, timeout)
+      }
+      else
+      {
+        loop.done
+      }
+    },
+      (exception, loop) => {
+        exception match {
+          case ioe: IOException => {
+            warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
+            httpConn = getHttpConnection(url, timeout)
+          }
+          case e: Exception =>
+            loop.done
+            error("Unable to connect to Job coordinator server, received exception", e)
+            throw e
+        }
+      })
+
+    if(httpConn.getResponseCode != 200) {
+      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+    }
+    readStream(httpConn.getInputStream)
+  }
+
+  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+    val conn = url.openConnection()
+    conn.setConnectTimeout(timeout)
+    conn.setReadTimeout(timeout)
+    conn.asInstanceOf[HttpURLConnection]
+  }
+
+  private def readStream(stream: InputStream): String = {
+    val br = new BufferedReader(new InputStreamReader(stream))
+    var line: String = null
+    val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
+    br.close
+    stream.close
+    body
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala b/samza-core/src/main/scala/org/apache/samza/util/LexicographicComparator.scala
deleted file mode 100644 (file)
index 93c5220..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.util;
-
-import java.util.Comparator
-
-/**
- * A comparator that applies a lexicographical comparison on byte arrays.
- */
-class LexicographicComparator extends Comparator[Array[Byte]] {
-  def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
-    val l = math.min(k1.length, k2.length)
-    var i = 0
-    while (i < l) {
-      if (k1(i) != k2(i))
-        return (k1(i) & 0xff) - (k2(i) & 0xff)
-      i += 1
-    }
-    // okay prefixes are equal, the shorter array is less
-    k1.length - k2.length
-  }
-}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/ScalaJavaUtil.scala
new file mode 100644 (file)
index 0000000..f3ba746
--- /dev/null
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import scala.collection.immutable.Map
+import scala.collection.JavaConverters._
+import scala.runtime.AbstractFunction0
+
+object ScalaJavaUtil {
+
+  /**
+    * Convert a Java map to a Scala immutable Map
+    * */
+  def toScalaMap[K, V](javaMap: java.util.Map[K, V]): Map[K, V] = {
+    javaMap.asScala.toMap
+  }
+
+  /**
+    * Wraps the provided value in an Scala Function, e.g. for use in [[Option#getOrDefault]]
+    *
+    * @param value the value to be wrapped
+    * @tparam T type of the value
+    * @return an AbstractFunction0 that returns contained value when called
+    */
+  def defaultValue[T](value: T): AbstractFunction0[T] = {
+    new AbstractFunction0[T] {
+      override def apply(): T = value
+    }
+  }
+
+  /**
+    * Wraps the provided Java Supplier in an Scala Function, e.g. for use in [[Option#getOrDefault]]
+    *
+    * @param javaFunction the java Supplier function to be wrapped
+    * @tparam T type of the value
+    * @return an AbstractFunction0 that returns contained value when called
+    */
+  def toScalaFunction[T](javaFunction: java.util.function.Supplier[T]): AbstractFunction0[T] = {
+    new AbstractFunction0[T] {
+      override def apply(): T = javaFunction.get()
+    }
+  }
+}
@@ -22,14 +22,14 @@ package org.apache.samza.util
 import org.apache.samza.metrics.Timer
 
 /**
- * a helper class to facilitate update {@link org.apache.samza.metrics.Timer} metric
+ * A helper class to facilitate updating [[org.apache.samza.metrics.Timer]] metrics
  */
-trait TimerUtils {
+trait TimerUtil {
   val clock: () => Long
 
   /**
-   * A helper method to update the {@link org.apache.samza.metrics.Timer} metric.
-   * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block.
+   * A helper method to update the [[org.apache.samza.metrics.Timer]] metric.
+   * It accepts a [[org.apache.samza.metrics.Timer]] instance and a code block.
    * It updates the Timer instance with the duration of running code block.
    */
   def updateTimer[T](timer: Timer)(runCodeBlock: => T): T = {
@@ -40,8 +40,8 @@ trait TimerUtils {
   }
 
   /**
-   * A helper method to update the {@link org.apache.samza.metrics.Timer} metrics.
-   * It accepts a {@link org.apache.samza.metrics.Timer} instance and a code block
+   * A helper method to update the [[org.apache.samza.metrics.Timer]] metrics.
+   * It accepts a [[org.apache.samza.metrics.Timer]] instance and a code block
    * with no return value. It passes one Long parameter to code block that contains
    * current time in nanoseconds. It updates the Timer instance with the duration of
    * running code block and returns the same duration.
index e12c81a..059eb03 100644 (file)
 
 package org.apache.samza.util
 
-import java.io._
-import java.lang.management.ManagementFactory
-import java.net._
-import java.util.Random
-import java.util.zip.CRC32
 
 import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.SystemConfig.Config2System
 import org.apache.samza.config._
-import org.apache.samza.serializers._
-import org.apache.samza.system.{SystemFactory, SystemStream, SystemStreamPartition}
-import org.apache.samza.{Partition, SamzaException}
+import org.apache.samza.system.SystemStream
+import org.apache.samza.SamzaException
+
+import java.lang.management.ManagementFactory
+import java.net.Inet4Address
+import java.net.InetAddress
+import java.net.NetworkInterface
+import java.util.Random
 
 import scala.collection.JavaConverters._
-import scala.collection.immutable.Map
 
 
 object Util extends Logging {
   val Random = new Random
   val ThreadMxBean = ManagementFactory.getThreadMXBean
 
-  def clock: Long = System.currentTimeMillis
   /**
    * Make an environment variable string safe to pass.
    */
@@ -53,28 +50,9 @@ object Util extends Logging {
     startInclusive + Random.nextInt(endExclusive - startInclusive)
 
   /**
-   * Recursively remove a directory (or file), and all sub-directories. Equivalent
-   * to rm -rf.
+   * Instantiate an object of type T from a given className.
    */
-  def rm(file: File) {
-    if (file == null) {
-      return
-    } else if (file.isDirectory) {
-      val files = file.listFiles()
-      if (files != null) {
-        for (f <- files)
-          rm(f)
-      }
-      file.delete()
-    } else {
-      file.delete()
-    }
-  }
-
-  /**
-   * Instantiate a class instance from a given className.
-   */
-  def getObj[T](className: String) = {
+  def getObj[T](className: String, clazz: Class[T]) = {
     try {
       Class
         .forName(className)
@@ -82,7 +60,7 @@ object Util extends Logging {
         .asInstanceOf[T]
     } catch {
       case e: Throwable => {
-        error("Unable to instantiate a class instance for %s." format className, e)
+        error("Unable to create an instance for class %s." format className, e)
         throw e
       }
     }
@@ -109,244 +87,22 @@ object Util extends Logging {
   }
 
   /**
-   * Makes sure that an object is not null, and throws a NullPointerException
-   * if it is.
-   */
-  def notNull[T](obj: T, msg: String) = if (obj == null) {
-    throw new NullPointerException(msg)
-  }
-
-  /**
-   * Returns the name representing the JVM. It usually contains the PID of the process plus some additional information
-   * @return String that contains the name representing this JVM
-   */
-  def getContainerPID(): String = {
-    ManagementFactory.getRuntimeMXBean().getName()
-  }
-
-  /**
-   * Overriding read method defined below so that it can be accessed from Java classes with default values
-   */
-  def read(url: URL, timeout: Int): String = {
-    read(url, timeout, new ExponentialSleepStrategy)
-  }
-
-  /**
-   * Reads a URL and returns its body as a string. Does no error handling.
-   *
-   * @param url HTTP URL to read from.
-   * @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
-   * @param retryBackoff Instance of exponentialSleepStrategy that encapsulates info on how long to sleep and retry operation
-   * @return String payload of the body of the HTTP response.
-   */
-  def read(url: URL, timeout: Int = 60000, retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy): String = {
-    var httpConn = getHttpConnection(url, timeout)
-    retryBackoff.run(loop => {
-      if(httpConn.getResponseCode != 200)
-      {
-        warn("Error: " + httpConn.getResponseCode)
-        val errorContent = readStream(httpConn.getErrorStream)
-        warn("Error reading stream, failed with response %s" format errorContent)
-        httpConn = getHttpConnection(url, timeout)
-      }
-      else
-      {
-        loop.done
-      }
-    },
-    (exception, loop) => {
-      exception match {
-        case ioe: IOException => {
-          warn("Error getting response from Job coordinator server. received IOException: %s. Retrying..." format ioe.getClass)
-          httpConn = getHttpConnection(url, timeout)
-        }
-        case e: Exception =>
-          loop.done
-          error("Unable to connect to Job coordinator server, received exception", e)
-          throw e
-      }
-    })
-
-    if(httpConn.getResponseCode != 200) {
-      throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
-    }
-    readStream(httpConn.getInputStream)
-  }
-
-  def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
-    val conn = url.openConnection()
-    conn.setConnectTimeout(timeout)
-    conn.setReadTimeout(timeout)
-    conn.asInstanceOf[HttpURLConnection]
-  }
-  private def readStream(stream: InputStream): String = {
-    val br = new BufferedReader(new InputStreamReader(stream));
-    var line: String = null;
-    val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
-    br.close
-    stream.close
-    body
-  }
-
-  /**
-   * Generates a coordinator stream name based on the job name and job id
-   * for the job. The format of the stream name will be:
-   * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
-   */
-  def getCoordinatorStreamName(jobName: String, jobId: String) = {
-    "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-  }
-
-  /**
-   * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
-   * defined in the config, and job name must be defined in config.
-   *
-   * @return A tuple of (jobName, jobId)
-   */
-  def getJobNameAndId(config: Config) = {
-    (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
-  }
-
-  /**
-   * Given a job's full config object, build a subset config which includes
-   * only the job name, job id, and system config for the coordinator stream.
-   */
-  def buildCoordinatorStreamConfig(config: Config) = {
-    val (jobName, jobId) = getJobNameAndId(config)
-    // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
-    val map = config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false).asScala ++
-      Map[String, String](
-        JobConfig.JOB_NAME -> jobName,
-        JobConfig.JOB_ID -> jobId,
-        JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName,
-        JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS -> String.valueOf(config.getMonitorPartitionChangeFrequency))
-    new MapConfig(map.asJava)
-  }
-
-  /**
-   * Get the coordinator system stream from the configuration
-   * @param config
-   * @return
-   */
-  def getCoordinatorSystemStream(config: Config) = {
-    val systemName = config.getCoordinatorSystemName
-    val (jobName, jobId) = Util.getJobNameAndId(config)
-    val streamName = Util.getCoordinatorStreamName(jobName, jobId)
-    new SystemStream(systemName, streamName)
-  }
-
-  /**
-    * Get the coordinator system factory from the configuration
-    * @param config
-    * @return
-    */
-  def getCoordinatorSystemFactory(config: Config) = {
-    val systemName = config.getCoordinatorSystemName
-    val systemFactoryClassName = config
-      .getSystemFactory(systemName)
-      .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
-    Util.getObj[SystemFactory](systemFactoryClassName)
-  }
-
-  /**
-   * The helper function converts a SSP to a string
-   * @param ssp System stream partition
-   * @return The string representation of the SSP
-   */
-  def sspToString(ssp: SystemStreamPartition): String = {
-     ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
-  }
-
-  /**
-   * The method converts the string SSP back to a SSP
-   * @param ssp The string form of the SSP
-   * @return An SSP typed object
-   */
-  def stringToSsp(ssp: String): SystemStreamPartition = {
-     val idx = ssp.indexOf('.');
-     val lastIdx = ssp.lastIndexOf('.')
-     if (idx < 0 || lastIdx < 0) {
-       throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition")
-     }
-     new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
-                               new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
-  }
-
-  /**
-   * Method to generate the CRC32 checksum code for any given data
-   * @param data The string for which checksum has to be generated
-   * @return long type value representing the checksum
-   * */
-  def getChecksumValue(data: String) = {
-    val crc = new CRC32
-    crc.update(data.getBytes)
-    crc.getValue
-  }
-
-  /**
-   * Method that always writes checksum & data to a file
-   * Checksum is pre-fixed to the data and is a 32-bit long type data.
-   * @param file The file handle to write to
-   * @param data The data to be written to the file
-   * */
-  def writeDataToFile(file: File, data: String) = {
-    val checksum = getChecksumValue(data)
-    var oos: ObjectOutputStream = null
-    var fos: FileOutputStream = null
-    try {
-      fos = new FileOutputStream(file)
-      oos = new ObjectOutputStream(fos)
-      oos.writeLong(checksum)
-      oos.writeUTF(data)
-    } finally {
-      oos.close()
-      fos.close()
-    }
-  }
-
-  /**
-   * Method to read from a file that has a checksum prepended to the data
-   * @param file The file handle to read from
-   * */
-  def readDataFromFile(file: File) = {
-    var fis: FileInputStream = null
-    var ois: ObjectInputStream = null
-    try {
-      fis = new FileInputStream(file)
-      ois = new ObjectInputStream(fis)
-      val checksumFromFile = ois.readLong()
-      val data = ois.readUTF()
-      if(checksumFromFile == getChecksumValue(data)) {
-        data
-      } else {
-        info("Checksum match failed. Data in file is corrupted. Skipping content.")
-        null
-      }
-    } finally {
-      ois.close()
-      fis.close()
-    }
-  }
-
-  /**
-   * Convert a java map to a Scala map
-   * */
-  def javaMapAsScalaMap[T, K](javaMap: java.util.Map[T, K]): Map[T, K] = {
-    javaMap.asScala.toMap
-  }
-
-  /**
-   * Returns the the first host address which is not the loopback address, or {@link java.net.InetAddress#getLocalHost InetAddress.getLocalhost()} as a fallback
+   * Returns the the first host address which is not the loopback address, or [[java.net.InetAddress#getLocalHost]] as a fallback
    *
-   * @return the {@link java.net.InetAddress InetAddress} which represents the localhost
+   * @return the [[java.net.InetAddress]] which represents the localhost
    */
   def getLocalHost: InetAddress = {
     val localHost = InetAddress.getLocalHost
     if (localHost.isLoopbackAddress) {
       debug("Hostname %s resolves to a loopback address, trying to resolve an external IP address.".format(localHost.getHostName))
-      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) NetworkInterface.getNetworkInterfaces.asScala.toList else NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+      val networkInterfaces = if (System.getProperty("os.name").startsWith("Windows")) {
+        NetworkInterface.getNetworkInterfaces.asScala.toList
+      } else {
+        NetworkInterface.getNetworkInterfaces.asScala.toList.reverse
+      }
       for (networkInterface <- networkInterfaces) {
-        val addresses = networkInterface.getInetAddresses.asScala.toList.filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
+        val addresses = networkInterface.getInetAddresses.asScala.toList
+          .filterNot(address => address.isLinkLocalAddress || address.isLoopbackAddress)
         if (addresses.nonEmpty) {
           val address = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head)
           debug("Found an external IP address %s which represents the localhost.".format(address.getHostAddress))
@@ -358,58 +114,6 @@ object Util extends Logging {
   }
 
   /**
-   * A helper function which returns system's default serde factory class according to the
-   * serde name. If not found, throw exception.
-   */
-  def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
-    info("looking for default serdes")
-
-    val serde = serdeName match {
-      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
-      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
-      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
-      case "json" => classOf[JsonSerdeFactory].getCanonicalName
-      case "long" => classOf[LongSerdeFactory].getCanonicalName
-      case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
-      case "string" => classOf[StringSerdeFactory].getCanonicalName
-      case "double" => classOf[DoubleSerdeFactory].getCanonicalName
-      case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No class defined for serde %s" format serdeName)
-    }
-    info("use default serde %s for %s" format (serde, serdeName))
-    serde
-  }
-
-  /**
-   * Add the supplied arguments and handle overflow by clamping the resulting sum to
-   * {@code Long.MinValue} if the sum would have been less than {@code Long.MinValue} or
-   * {@code Long.MaxValue} if the sum would have been greater than {@code Long.MaxValue}.
-   *
-   * @param lhs left hand side of sum
-   * @param rhs right hand side of sum
-   * @return the sum if no overflow occurs, or the clamped extreme if it does.
-   */
-  def clampAdd(lhs: Long, rhs: Long): Long = {
-    val sum = lhs + rhs
-
-    // From "Hacker's Delight", overflow occurs IFF both operands have the same sign and the
-    // sign of the sum differs from the operands. Here we're doing a basic bitwise check that
-    // collapses 6 branches down to 2. The expression {@code lhs ^ rhs} will have the high-order
-    // bit set to true IFF the signs are different.
-    if ((~(lhs ^ rhs) & (lhs ^ sum)) < 0) {
-      return if (lhs >= 0) Long.MaxValue else Long.MinValue
-    }
-
-    sum
-  }
-
-  /**
-   * Implicitly convert the Java TimerClock to Scala clock function which returns long timestamp.
-   * @param c Java TimeClock
-   * @return Scala clock function
-   */
-  implicit def asScalaClock(c: HighResolutionClock): () => Long = () => c.nanoTime()
-
-  /**
    * Re-writes configuration using a ConfigRewriter, if one is defined. If
    * there is no ConfigRewriter defined for the job, then this method is a
    * no-op.
@@ -419,10 +123,10 @@ object Util extends Logging {
    */
   def rewriteConfig(config: Config): Config = {
     def rewrite(c: Config, rewriterName: String): Config = {
-      val klass = config
+      val rewriterClassName = config
               .getConfigRewriterClass(rewriterName)
               .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
+      val rewriter = Util.getObj(rewriterClassName, classOf[ConfigRewriter])
       info("Re-writing config with " + rewriter)
       rewriter.rewrite(rewriterName, c)
     }
index 6413413..e537a91 100644 (file)
@@ -26,8 +26,8 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.system.*;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
-import org.apache.samza.util.Util;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -88,7 +88,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
     if (jobId == null) {
       jobId = "1";
     }
-    String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+    String streamName = CoordinatorStreamUtil.getCoordinatorStreamName(jobName, jobId);
     SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
     mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
     return mockConsumer;
@@ -97,7 +97,7 @@ public class MockCoordinatorStreamSystemFactory implements SystemFactory {
   private SystemStream getCoordinatorSystemStream(Config config) {
     assertNotNull(config.get("job.coordinator.system"));
     assertNotNull(config.get("job.name"));
-    return new SystemStream(config.get("job.coordinator.system"), Util.getCoordinatorStreamName(config.get("job.name"),
+    return new SystemStream(config.get("job.coordinator.system"), CoordinatorStreamUtil.getCoordinatorStreamName(config.get("job.name"),
         config.get("job.id") == null ? "1" : config.get("job.id")));
   }
 
index 46e0735..2f95016 100644 (file)
 package org.apache.samza.util;
 
 import com.google.common.collect.ImmutableList;
-import junit.framework.Assert;
-import org.apache.samza.operators.util.MathUtils;
-import org.junit.Test;
 
 import java.util.Collections;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
 
 public class TestMathUtils {
 
   @Test(expected = IllegalArgumentException.class)
   public void testGcdWithNullInputs() {
-    MathUtils.gcd(null);
+    MathUtil.gcd(null);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testGcdWithEmptyInputs() {
-    MathUtils.gcd(Collections.emptyList());
+    MathUtil.gcd(Collections.emptyList());
   }
 
   @Test
   public void testGcdWithValidInputs() {
     // gcd(x, x) = x
-    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 2L)));
-    Assert.assertEquals(15, MathUtils.gcd(ImmutableList.of(15L)));
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(1L)));
+    assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 2L)));
+    assertEquals(15, MathUtil.gcd(ImmutableList.of(15L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(1L)));
 
     // gcd(0,x) = x
-    Assert.assertEquals(2, MathUtils.gcd(ImmutableList.of(2L, 0L)));
+    assertEquals(2, MathUtil.gcd(ImmutableList.of(2L, 0L)));
 
     // gcd(1,x) = 1
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 1L)));
 
     // other happy path test cases
-    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
-    Assert.assertEquals(10, MathUtils.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
-    Assert.assertEquals(5, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
+    assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L, 0L)));
+    assertEquals(10, MathUtil.gcd(ImmutableList.of(10L, 20L, 30L, 40L, 50L)));
+    assertEquals(5, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L)));
 
-    Assert.assertEquals(1, MathUtils.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
+    assertEquals(1, MathUtil.gcd(ImmutableList.of(25L, 35L, 45L, 55L, 13L)));
   }
 
+  @Test
+  public void testClampAdd() {
+    assertEquals(0, MathUtil.clampAdd(0, 0));
+    assertEquals(2, MathUtil.clampAdd(1, 1));
+    assertEquals(-2, MathUtil.clampAdd(-1, -1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 0));
+    assertEquals(Long.MAX_VALUE - 1, MathUtil.clampAdd(Long.MAX_VALUE, -1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, 1));
+    assertEquals(Long.MAX_VALUE, MathUtil.clampAdd(Long.MAX_VALUE, Long.MAX_VALUE));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, 0));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, -1));
+    assertEquals(Long.MIN_VALUE + 1, MathUtil.clampAdd(Long.MIN_VALUE, 1));
+    assertEquals(Long.MIN_VALUE, MathUtil.clampAdd(Long.MIN_VALUE, Long.MIN_VALUE));
+    assertEquals(-1, MathUtil.clampAdd(Long.MAX_VALUE, Long.MIN_VALUE));
+  }
 }
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala b/samza-core/src/test/scala/org/apache/samza/config/TestSerializerConfig.scala
new file mode 100644 (file)
index 0000000..83c901c
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.ByteSerdeFactory
+import org.apache.samza.serializers.DoubleSerdeFactory
+import org.apache.samza.serializers.IntegerSerdeFactory
+import org.apache.samza.serializers.JsonSerdeFactory
+import org.apache.samza.serializers.LongSerdeFactory
+import org.apache.samza.config.SerializerConfig.getSerdeFactoryName
+import org.apache.samza.serializers.SerializableSerdeFactory
+import org.apache.samza.serializers.StringSerdeFactory
+import org.junit.Assert.assertEquals
+import org.junit.Assert.assertTrue
+import org.junit.Test
+
+class TestSerializerConfig {
+  @Test
+  def testGetSerdeFactoryName {
+    val config = new MapConfig
+    assertEquals(classOf[ByteSerdeFactory].getName, getSerdeFactoryName("byte"))
+    assertEquals(classOf[IntegerSerdeFactory].getName, getSerdeFactoryName("integer"))
+    assertEquals(classOf[JsonSerdeFactory].getName, getSerdeFactoryName("json"))
+    assertEquals(classOf[LongSerdeFactory].getName, getSerdeFactoryName("long"))
+    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, getSerdeFactoryName("serializable"))
+    assertEquals(classOf[StringSerdeFactory].getName, getSerdeFactoryName("string"))
+    assertEquals(classOf[DoubleSerdeFactory].getName, getSerdeFactoryName("double"))
+
+    // throw SamzaException if can not find the correct serde
+    var throwSamzaException = false
+    try {
+      getSerdeFactoryName("otherName")
+    } catch {
+      case e: SamzaException => throwSamzaException = true
+      case _: Exception =>
+    }
+    assertTrue(throwSamzaException)
+  }
+}
index cf05b3b..95a0a11 100644 (file)
@@ -25,7 +25,7 @@ import org.apache.samza.checkpoint.TestCheckpointTool.MockCheckpointManagerFacto
 import org.apache.samza.job.local.ProcessJobFactory
 import org.apache.samza.job.local.ThreadJobFactory
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
@@ -119,8 +119,9 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
     assertEquals(new MapConfig(config.asJava), coordinator.jobModel.getConfig)
     assertEquals(expectedJobModel, coordinator.jobModel)
 
+    val response = HttpUtil.read(coordinator.server.getUrl)
     // Verify that the JobServlet is serving the correct jobModel
-    val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(Util.read(coordinator.server.getUrl), classOf[JobModel])
+    val jobModelFromCoordinatorUrl = SamzaObjectMapper.getObjectMapper.readValue(response, classOf[JobModel])
     assertEquals(expectedJobModel, jobModelFromCoordinatorUrl)
 
     coordinator.stop
@@ -245,7 +246,7 @@ class TestJobCoordinator extends FlatSpec with PrivateMethodTester {
       val systemFactoryClassName = config
         .getSystemFactory(systemName)
         .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
-      val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+      val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
       systemName -> systemFactory.getAdmin(systemName, config)
     }).toMap
 
index f1dcc3d..6ca4070 100644 (file)
 
 package org.apache.samza.coordinator.server
 
-import org.apache.samza.util.Util
+import org.apache.samza.util.{HttpUtil, Util}
 import org.junit.Assert._
 import org.junit.Test
 import java.net.URL
-import org.eclipse.jetty.server.Connector
 
 class TestHttpServer {
   @Test
@@ -32,9 +31,9 @@ class TestHttpServer {
     try {
       server.addServlet("/basic", new BasicServlet())
       server.start
-      val body = Util.read(new URL(server.getUrl + "/basic"))
+      val body = HttpUtil.read(new URL(server.getUrl + "/basic"))
       assertEquals("{\"foo\":\"bar\"}", body)
-      val css = Util.read(new URL(server.getUrl + "/css/ropa-sans.css"))
+      val css = HttpUtil.read(new URL(server.getUrl + "/css/ropa-sans.css"))
       assertTrue(css.contains("RopaSans"))
     } finally {
       server.stop
index 774230c..cd6c5be 100644 (file)
@@ -31,6 +31,9 @@ import org.apache.samza.system.SystemStream
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
+import org.apache.samza.SamzaException
+import org.apache.samza.config.MapConfig
+import org.apache.samza.util.Util
 
 class TestSerdeManager {
   @Test
index 90a4c01..bbdb819 100644 (file)
@@ -29,7 +29,7 @@ import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
-import org.apache.samza.util.{SystemClock, Util}
+import org.apache.samza.util.{FileUtil, SystemClock}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 import org.mockito.Matchers._
@@ -56,8 +56,8 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @After
   def tearDownTestDirs() {
-    Util.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
-    Util.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
+    FileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir)
+    FileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir)
   }
 
   /**
@@ -125,7 +125,7 @@ class TestTaskStorageManager extends MockitoSugar {
     // Test 2: flush should update the offset file
     taskManager.flush()
     assertTrue(offsetFile.exists())
-    assertEquals("50", Util.readDataFromFile(offsetFile))
+    assertEquals("50", FileUtil.readWithChecksum(offsetFile))
 
     // Test 3: Update sspMetadata before shutdown and verify that offset file is updated correctly
     metadata = new SystemStreamMetadata("testStream", new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() {
@@ -142,7 +142,7 @@ class TestTaskStorageManager extends MockitoSugar {
     taskManager.stop()
     assertTrue(storeFile.exists())
     assertTrue(offsetFile.exists())
-    assertEquals("100", Util.readDataFromFile(offsetFile))
+    assertEquals("100", FileUtil.readWithChecksum(offsetFile))
 
 
     // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset
@@ -274,7 +274,7 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(loggedStore, true)
@@ -296,7 +296,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
     val offsetFile = new File(storeDirectory, "OFFSET")
     offsetFile.createNewFile()
-    Util.writeDataToFile(offsetFile, "Test Offset Data")
+    FileUtil.writeWithChecksum(offsetFile, "Test Offset Data")
     offsetFile.setLastModified(0)
     val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
       .addStore(loggedStore, true)
@@ -315,7 +315,7 @@ class TestTaskStorageManager extends MockitoSugar {
   @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(loggedStore, false)
@@ -352,7 +352,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
   }
 
   /**
@@ -386,7 +386,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
 
     assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists())
   }
@@ -416,7 +416,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "100", FileUtil.readWithChecksum(offsetFilePath))
 
     //Invoke test method again
     taskStorageManager.flush()
@@ -430,7 +430,7 @@ class TestTaskStorageManager extends MockitoSugar {
     val partition = new Partition(0)
 
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
-    Util.writeDataToFile(offsetFilePath, "100")
+    FileUtil.writeWithChecksum(offsetFilePath, "100")
 
     val mockSystemAdmin = mock[SystemAdmin]
     var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")).asJava))
@@ -449,7 +449,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "139", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "139", FileUtil.readWithChecksum(offsetFilePath))
 
     // Flush again
     mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")).asJava))
@@ -461,7 +461,7 @@ class TestTaskStorageManager extends MockitoSugar {
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
-    assertEquals("Found incorrect value in offset file!", "193", Util.readDataFromFile(offsetFilePath))
+    assertEquals("Found incorrect value in offset file!", "193", FileUtil.readWithChecksum(offsetFilePath))
   }
 
   @Test
@@ -556,7 +556,7 @@ class TestTaskStorageManager extends MockitoSugar {
     if (writeOffsetFile) {
       val offsetFile = new File(storeDirectory, "OFFSET")
       if (fileOffset != null) {
-        Util.writeDataToFile(offsetFile, fileOffset)
+        FileUtil.writeWithChecksum(offsetFile, fileOffset)
       } else {
         // Write garbage to produce a null result when it's read
         val fos = new FileOutputStream(offsetFile)
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
new file mode 100644 (file)
index 0000000..5bb6da7
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.samza.util
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
+import org.junit.Assert.{assertEquals, assertNull, assertTrue}
+import org.junit.Test
+
+class TestFileUtil {
+  val data = "100"
+  val checksum = FileUtil.getChecksum(data)
+  val file = new File(System.getProperty("java.io.tmpdir"), "test")
+
+  @Test
+  def testWriteDataToFile() {
+    // Invoke test
+    FileUtil.writeWithChecksum(file, data)
+
+    // Check that file exists
+    assertTrue("File was not created!", file.exists())
+    val fis = new FileInputStream(file)
+    val ois = new ObjectInputStream(fis)
+
+    // Check content of the file is as expected
+    assertEquals(checksum, ois.readLong())
+    assertEquals(data, ois.readUTF())
+    ois.close()
+    fis.close()
+  }
+
+  @Test
+  def testReadDataFromFile() {
+    // Setup
+    val fos = new FileOutputStream(file)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeLong(checksum)
+    oos.writeUTF(data)
+    oos.close()
+    fos.close()
+
+    // Invoke test
+    val result = FileUtil.readWithChecksum(file)
+
+    // Check data returned
+    assertEquals(data, result)
+  }
+
+  @Test
+  def testReadInvalidDataFromFile() {
+    // Write garbage to produce a null result when it's read
+    val fos = new FileOutputStream(file)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeLong(1)
+    oos.writeUTF("Junk Data")
+    oos.close()
+    fos.close()
+
+    // Invoke test
+    val result = FileUtil.readWithChecksum(file)
+
+    // Check data returned
+    assertNull(result)
+  }
+}
index fae735b..f0b8a17 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.samza.util
 
-import java.io._
 import org.junit.Assert._
 import org.junit.Test
 import org.apache.samza.config.MapConfig
@@ -27,116 +26,21 @@ import org.apache.samza.serializers._
 import org.apache.samza.SamzaException
 
 class TestUtil {
-
-  val data = "100"
-  val checksum = Util.getChecksumValue(data)
-  val file = new File(System.getProperty("java.io.tmpdir"), "test")
-
-  @Test
-  def testWriteDataToFile() {
-    // Invoke test
-    Util.writeDataToFile(file, data)
-
-    // Check that file exists
-    assertTrue("File was not created!", file.exists())
-    val fis = new FileInputStream(file)
-    val ois = new ObjectInputStream(fis)
-
-    // Check content of the file is as expected
-    assertEquals(checksum, ois.readLong())
-    assertEquals(data, ois.readUTF())
-    ois.close()
-    fis.close()
-  }
-
-  @Test
-  def testReadDataFromFile() {
-    // Setup
-    val fos = new FileOutputStream(file)
-    val oos = new ObjectOutputStream(fos)
-    oos.writeLong(checksum)
-    oos.writeUTF(data)
-    oos.close()
-    fos.close()
-
-    // Invoke test
-    val result = Util.readDataFromFile(file)
-
-    // Check data returned
-    assertEquals(data, result)
-  }
-
-  @Test
-  def testReadInvalidDataFromFile() {
-    // Write garbage to produce a null result when it's read
-    val fos = new FileOutputStream(file)
-    val oos = new ObjectOutputStream(fos)
-    oos.writeLong(1)
-    oos.writeUTF("Junk Data")
-    oos.close()
-    fos.close()
-
-    // Invoke test
-    val result = Util.readDataFromFile(file)
-
-    // Check data returned
-    assertNull(result)
-  }
-
   @Test
   def testGetLocalHost(): Unit = {
     assertNotNull(Util.getLocalHost)
   }
 
   @Test
-  def testDefaultSerdeFactoryFromSerdeName {
-    import Util._
-    val config = new MapConfig
-    assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte"))
-    assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer"))
-    assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json"))
-    assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long"))
-    assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable"))
-    assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string"))
-    assertEquals(classOf[DoubleSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("double"))
-
-    // throw SamzaException if can not find the correct serde
-    var throwSamzaException = false
-    try {
-      defaultSerdeFactoryFromSerdeName("otherName")
-    } catch {
-      case e: SamzaException => throwSamzaException = true
-      case _: Exception =>
-    }
-    assertTrue(throwSamzaException)
-  }
-
-  @Test
-  def testClampAdd() {
-    assertEquals(0, Util.clampAdd(0, 0))
-    assertEquals(2, Util.clampAdd(1, 1))
-    assertEquals(-2, Util.clampAdd(-1, -1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 0))
-    assertEquals(Long.MaxValue - 1, Util.clampAdd(Long.MaxValue, -1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, 1))
-    assertEquals(Long.MaxValue, Util.clampAdd(Long.MaxValue, Long.MaxValue))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, 0))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, -1))
-    assertEquals(Long.MinValue + 1, Util.clampAdd(Long.MinValue, 1))
-    assertEquals(Long.MinValue, Util.clampAdd(Long.MinValue, Long.MinValue))
-    assertEquals(-1, Util.clampAdd(Long.MaxValue, Long.MinValue))
-  }
-
-  @Test
   def testGetObjExistingClass() {
-    val obj = Util.getObj[MapConfig]("org.apache.samza.config.MapConfig")
+    val obj = Util.getObj("org.apache.samza.config.MapConfig", classOf[MapConfig])
     assertNotNull(obj)
     assertEquals(classOf[MapConfig], obj.getClass())
   }
 
   @Test(expected = classOf[ClassNotFoundException])
   def testGetObjNonexistentClass() {
-    Util.getObj("this.class.does.NotExist")
+    Util.getObj("this.class.does.NotExist", classOf[Object])
     assert(false, "This should not get hit.")
   }
 }
index 074323f..5c8328c 100644 (file)
@@ -82,7 +82,7 @@ public class ElasticsearchSystemFactory implements SystemFactory {
 
   protected static IndexRequestFactory getIndexRequestFactory(ElasticsearchConfig config) {
     if (config.getIndexRequestFactoryClassName().isPresent()) {
-      return (IndexRequestFactory) Util.getObj(config.getIndexRequestFactoryClassName().get());
+      return Util.getObj(config.getIndexRequestFactoryClassName().get(), IndexRequestFactory.class);
     } else {
       return new DefaultIndexRequestFactory();
     }
index 79bca5b..16de121 100644 (file)
@@ -24,14 +24,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
 import org.apache.samza.system.hdfs.writer.HdfsWriter
 import org.apache.samza.system.{OutgoingMessageEnvelope, SystemProducer}
-import org.apache.samza.util.{Logging, TimerUtils}
+import org.apache.samza.util.{Logging, TimerUtil}
 
 import scala.collection.mutable.{Map => MMap}
 
 
 class HdfsSystemProducer(
   systemName: String, clientId: String, config: HdfsConfig, metrics: HdfsSystemProducerMetrics,
-  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtils {
+  val clock: () => Long = () => System.currentTimeMillis) extends SystemProducer with Logging with TimerUtil {
   val dfs = FileSystem.newInstance(new Configuration(true))
   val writers: MMap[String, HdfsWriter[_]] = MMap.empty[String, HdfsWriter[_]]
   private val lock = new Object //synchronization lock for thread safe access
index 48d6671..8d4098f 100644 (file)
@@ -42,7 +42,7 @@ class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Loggin
       .getSystemFactory(checkpointSystemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format checkpointSystemName))
 
-    val checkpointSystemFactory = Util.getObj[SystemFactory](checkpointSystemFactoryName)
+    val checkpointSystemFactory = Util.getObj(checkpointSystemFactoryName, classOf[SystemFactory])
     val checkpointTopic = KafkaUtil.getCheckpointTopic(jobName, jobId, config)
 
     info(s"Creating a KafkaCheckpointManager to consume from $checkpointTopic")
index 9eaf895..2a17df8 100644 (file)
@@ -34,14 +34,14 @@ import org.apache.samza.system.SystemProducerException
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
 
 class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
                           getProducer: () => Producer[Array[Byte], Array[Byte]],
                           metrics: KafkaSystemProducerMetrics,
                           val clock: () => Long = () => System.nanoTime,
-                          val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtils {
+                          val dropProducerExceptions: Boolean = false) extends SystemProducer with Logging with TimerUtil {
 
   // Represents a fatal error that caused the producer to close.
   val fatalException: AtomicReference[SystemProducerException] = new AtomicReference[SystemProducerException]()
index 51af518..125cf61 100644 (file)
@@ -27,7 +27,7 @@ import kafka.api.TopicMetadata;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.StreamValidationException;
 import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ScalaJavaUtil;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -58,7 +58,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Properties coordProps = new Properties();
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createCoordinatorStreamSpec(STREAM, SYSTEM());
 
     Mockito.doAnswer(invocationOnMock -> {
@@ -90,7 +90,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
 
     Mockito.doAnswer(invocationOnMock -> {
@@ -123,7 +123,7 @@ public class TestKafkaSystemAdminJava extends TestKafkaSystemAdmin {
     Map<String, ChangelogInfo> changeLogMap = new HashMap<>();
     changeLogMap.put(STREAM, new ChangelogInfo(REP_FACTOR, changeLogProps));
 
-    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, Util.javaMapAsScalaMap(changeLogMap)));
+    KafkaSystemAdmin admin = Mockito.spy(createSystemAdmin(coordProps, 1, ScalaJavaUtil.toScalaMap(changeLogMap)));
     StreamSpec spec = StreamSpec.createChangeLogStreamSpec(STREAM, SYSTEM(), PARTITIONS);
     Mockito.doAnswer(invocationOnMock -> {
       StreamSpec internalSpec = (StreamSpec) invocationOnMock.callRealMethod();
index 03b0d2c..71718b0 100644 (file)
@@ -191,7 +191,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
       .getSystemFactory(systemName)
       .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
 
-    val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+    val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory])
 
     val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, false, props)
     new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde)
index 1fa78f8..9dca23c 100644 (file)
@@ -25,6 +25,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSerializerConfig;
 import org.apache.samza.config.JavaStorageConfig;
+import org.apache.samza.config.SerializerConfig$;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -125,8 +126,8 @@ public class RocksDbKeyValueReader {
   private Serde<Object> getSerdeFromName(String name, JavaSerializerConfig serializerConfig) {
     String serdeClassName = serializerConfig.getSerdeClass(name);
     if (serdeClassName == null) {
-      serdeClassName = Util.defaultSerdeFactoryFromSerdeName(name);
+      serdeClassName = SerializerConfig$.MODULE$.getSerdeFactoryName(name);
     }
-    return Util.<SerdeFactory<Object>> getObj(serdeClassName).getSerde(name, serializerConfig);
+    return Util.getObj(serdeClassName, SerdeFactory.class).getSerde(name, serializerConfig);
   }
 }
index eae7da2..856cc4e 100644 (file)
 package org.apache.samza.storage.kv
 
 import java.io.File
+import java.util.Comparator
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
-import org.apache.samza.util.{LexicographicComparator, Logging}
+import org.apache.samza.util.Logging
 import org.rocksdb.{TtlDB, _}
 
 object RocksDbKeyValueStore extends Logging {
@@ -301,4 +302,21 @@ class RocksDbKeyValueStore(
       super.hasNext() && comparator.compare(peekKey(), to) < 0
     }
   }
+
+  /**
+    * A comparator that applies a lexicographical comparison on byte arrays.
+    */
+  class LexicographicComparator extends Comparator[Array[Byte]] {
+    def compare(k1: Array[Byte], k2: Array[Byte]): Int = {
+      val l = math.min(k1.length, k2.length)
+      var i = 0
+      while (i < l) {
+        if (k1(i) != k2(i))
+          return (k1(i) & 0xff) - (k2(i) & 0xff)
+        i += 1
+      }
+      // okay prefixes are equal, the shorter array is less
+      k1.length - k2.length
+    }
+  }
 }
index e3a2970..da80560 100644 (file)
@@ -25,12 +25,11 @@ import org.apache.samza.SamzaException
 import org.apache.samza.container.SamzaContainerContext
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.serializers.Serde
-import org.apache.samza.storage.{StoreProperties, StorageEngine, StorageEngineFactory}
+import org.apache.samza.storage.{StorageEngine, StorageEngineFactory, StoreProperties}
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.task.MessageCollector
 import org.apache.samza.config.MetricsConfig.Config2Metrics
-import org.apache.samza.util.HighResolutionClock
-import org.apache.samza.util.Util.asScalaClock
+import org.apache.samza.util.{HighResolutionClock, ScalaJavaUtil}
 
 /**
  * A key value storage engine factory implementation
@@ -152,7 +151,8 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       }
     }
 
-    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore, keyValueStorageEngineMetrics, batchSize, clock)
+    new KeyValueStorageEngine(storePropertiesBuilder.build(), nullSafeStore, rawStore,
+      keyValueStorageEngineMetrics, batchSize, () => clock.nanoTime())
   }
 
 }
index 373e18a..5f7bbd8 100644 (file)
@@ -22,7 +22,7 @@ package org.apache.samza.storage.kv
 import org.apache.samza.util.Logging
 import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
-import org.apache.samza.util.TimerUtils
+import org.apache.samza.util.TimerUtil
 
 import scala.collection.JavaConverters._
 
@@ -37,7 +37,7 @@ class KeyValueStorageEngine[K, V](
   rawStore: KeyValueStore[Array[Byte], Array[Byte]],
   metrics: KeyValueStorageEngineMetrics = new KeyValueStorageEngineMetrics,
   batchSize: Int = 500,
-  val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtils with Logging {
+  val clock: () => Long = { System.nanoTime }) extends StorageEngine with KeyValueStore[K, V] with TimerUtil with Logging {
 
   var count = 0
 
index 3cc35d3..7adffa9 100644 (file)
@@ -19,8 +19,6 @@
 
 package org.apache.samza.storage.kv
 
-import org.apache.samza.util.Util.notNull
-
 import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
@@ -85,4 +83,10 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
   def close {
     store.close
   }
+
+  private def notNull[T](obj: T, msg: String) = {
+    if (obj == null) {
+      throw new NullPointerException(msg)
+    }
+  }
 }
index ec63358..ab29b71 100644 (file)
@@ -50,6 +50,8 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
 import org.apache.samza.util.Util;
 
 /**
@@ -267,9 +269,8 @@ public class StreamAppender extends AppenderSkeleton {
         config = JobModelManager.currentJobModelManager().jobModel().getConfig();
       } else {
         String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
-        config = SamzaObjectMapper.getObjectMapper()
-            .readValue(Util.read(new URL(url), 30000), JobModel.class)
-            .getConfig();
+        String response = HttpUtil.read(new URL(url), 30000, new ExponentialSleepStrategy());
+        config = SamzaObjectMapper.getObjectMapper().readValue(response, JobModel.class).getConfig();
       }
     } catch (IOException e) {
       throw new SamzaException("can not read the config", e);
@@ -294,7 +295,7 @@ public class StreamAppender extends AppenderSkeleton {
     String systemName = log4jSystemConfig.getSystemName();
     String systemFactoryName = log4jSystemConfig.getSystemFactory(systemName);
     if (systemFactoryName != null) {
-      systemFactory = Util.<SystemFactory>getObj(systemFactoryName);
+      systemFactory = Util.getObj(systemFactoryName, SystemFactory.class);
     } else {
       throw new SamzaException("Could not figure out \"" + systemName + "\" system factory for log4j StreamAppender to use");
     }
@@ -388,7 +389,7 @@ public class StreamAppender extends AppenderSkeleton {
     }
 
     if (serdeClass != null) {
-      SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
+      SerdeFactory<LoggingEvent> serdeFactory = Util.getObj(serdeClass, SerdeFactory.class);
       serde = serdeFactory.getSerde(systemName, config);
     } else {
       String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
index ff1268c..7ca9b35 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.monitor;
 
 import org.apache.samza.metrics.MetricsRegistry;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 
 /**
  * Helper class that instantiates the Monitor.
@@ -30,7 +30,7 @@ public class MonitorLoader {
       throws InstantiationException {
       String factoryClass = monitorConfig.getMonitorFactoryClass();
       try {
-        MonitorFactory monitorFactory = ClassLoaderHelper.fromClassName(factoryClass);
+        MonitorFactory monitorFactory = Util.getObj(factoryClass, MonitorFactory.class);
         return monitorFactory.getMonitorInstance(monitorName, monitorConfig, metricsRegistry);
       } catch (Exception e) {
         throw (InstantiationException)
index a6e0bb0..45b6a39 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.rest.resources.DefaultResourceFactory;
 import org.apache.samza.rest.resources.ResourceFactory;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.slf4j.Logger;
@@ -84,7 +84,7 @@ public class SamzaRestApplication extends ResourceConfig {
   private Collection<? extends Object> instantiateFactoryResources(String factoryClassName, Config config)
       throws InstantiationException {
     try {
-      ResourceFactory factory = ClassLoaderHelper.<ResourceFactory>fromClassName(factoryClassName);
+      ResourceFactory factory = Util.getObj(factoryClassName, ResourceFactory.class);
       return factory.getResourceInstances(config);
     } catch (Exception e) {
       throw (InstantiationException)
index 492385f..19e006f 100644 (file)
@@ -27,7 +27,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.rest.model.Job;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,10 +47,10 @@ public abstract class AbstractJobProxy implements JobProxy {
    * @return        the JobProxy produced by the factory.
    */
   public static JobProxy fromFactory(JobsResourceConfig config) {
-    String jobProxyFactory = config.getJobProxyFactory();
-    if (jobProxyFactory != null && !jobProxyFactory.isEmpty()) {
+    String jobProxyFactoryClassName = config.getJobProxyFactory();
+    if (jobProxyFactoryClassName != null && !jobProxyFactoryClassName.isEmpty()) {
       try {
-        JobProxyFactory factory = ClassLoaderHelper.fromClassName(jobProxyFactory);
+        JobProxyFactory factory = Util.getObj(jobProxyFactoryClassName, JobProxyFactory.class);
         return factory.getJobProxy(config);
       } catch (Exception e) {
         throw new SamzaException(e);
index fbddb30..fd8709f 100644 (file)
@@ -20,13 +20,14 @@ package org.apache.samza.rest.proxy.job;
 
 import java.util.Set;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.yarn.YarnApplicationInfo;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
 import org.apache.samza.rest.resources.JobsResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class SimpleYarnJobProxy extends ScriptJobProxy {
   public SimpleYarnJobProxy(JobsResourceConfig config) throws Exception {
     super(config);
     this.installFinder = new SimpleInstallationFinder(config.getInstallationsPath(),
-                                                      ClassLoaderHelper.fromClassName(config.getJobConfigFactory()));
+        Util.getObj(config.getJobConfigFactory(), ConfigFactory.class));
     this.statusProvider = new YarnRestJobStatusProvider(config);
   }
 
index 5bba683..2da47f8 100644 (file)
@@ -42,7 +42,7 @@ import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.InstallationRecord;
 import org.apache.samza.rest.proxy.job.JobInstance;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -113,11 +113,11 @@ public class SamzaTaskProxy implements TaskProxy {
   private Config getCoordinatorSystemConfig(JobInstance jobInstance) {
     try {
       InstallationRecord record = installFinder.getAllInstalledJobs().get(jobInstance);
-      ConfigFactory configFactory =  ClassLoaderHelper.fromClassName(taskResourceConfig.getJobConfigFactory());
+      ConfigFactory configFactory =  Util.getObj(taskResourceConfig.getJobConfigFactory(), ConfigFactory.class);
       Config config = configFactory.getConfig(new URI(String.format("file://%s", record.getConfigFilePath())));
       Map<String, String> configMap = ImmutableMap.of(JobConfig.JOB_ID(), jobInstance.getJobId(),
                                                       JobConfig.JOB_NAME(), jobInstance.getJobName());
-      return Util.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
+      return CoordinatorStreamUtil.buildCoordinatorStreamConfig(new MapConfig(ImmutableList.of(config, configMap)));
     } catch (Exception e) {
       LOG.error(String.format("Failed to get coordinator stream config for job : %s", jobInstance), e);
       throw new SamzaException(e);
index 7bcac7d..c3e042e 100644 (file)
@@ -21,10 +21,11 @@ package org.apache.samza.rest.proxy.task;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ConfigFactory;
 import org.apache.samza.rest.proxy.installation.InstallationFinder;
 import org.apache.samza.rest.proxy.installation.SimpleInstallationFinder;
 import org.apache.samza.rest.resources.BaseResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,8 +44,8 @@ public class SamzaTaskProxyFactory implements TaskProxyFactory {
                                 String.format("Config param %s is not defined.", BaseResourceConfig.CONFIG_JOB_INSTALLATIONS_PATH));
     String configFactoryClass = config.getJobConfigFactory();
     try {
-      InstallationFinder installFinder = new SimpleInstallationFinder(installationsPath,
-                                                                      ClassLoaderHelper.fromClassName(configFactoryClass));
+      ConfigFactory configFactory = Util.getObj(configFactoryClass, ConfigFactory.class);
+      InstallationFinder installFinder = new SimpleInstallationFinder(installationsPath, configFactory);
       return new SamzaTaskProxy(config, installFinder);
     } catch (Exception e) {
       LOG.error(String.format("Exception during instantiation through configFactory class: %s.", configFactoryClass), e);
index 301c202..7a454a3 100644 (file)
@@ -31,10 +31,9 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.job.JobInstance;
 import org.apache.samza.rest.proxy.task.TaskProxyFactory;
-import org.apache.samza.rest.proxy.task.SamzaTaskProxy;
 import org.apache.samza.rest.proxy.task.TaskProxy;
 import org.apache.samza.rest.proxy.task.TaskResourceConfig;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,11 +55,11 @@ public class TasksResource {
    * @param config  the configuration containing the {@link TaskProxyFactory} class.
    */
   public TasksResource(TaskResourceConfig config) {
-    String taskProxyFactory = config.getTaskProxyFactory();
-    Preconditions.checkArgument(StringUtils.isNotEmpty(taskProxyFactory),
+    String taskProxyFactoryClassName = config.getTaskProxyFactory();
+    Preconditions.checkArgument(StringUtils.isNotEmpty(taskProxyFactoryClassName),
                                 String.format("Missing config: %s", TaskResourceConfig.CONFIG_TASK_PROXY_FACTORY));
     try {
-      TaskProxyFactory factory = ClassLoaderHelper.fromClassName(taskProxyFactory);
+      TaskProxyFactory factory = Util.getObj(taskProxyFactoryClassName, TaskProxyFactory.class);
       taskProxy = factory.getTaskProxy(config);
     } catch (Exception e) {
       LOG.error(String.format("Exception in building TasksResource with config: %s.", config), e);
index 9f3c9a8..36c86cc 100644 (file)
@@ -34,7 +34,7 @@ import org.apache.samza.storage.StorageEngineFactory
 import org.apache.samza.storage.kv.{KeyValueStorageEngine, KeyValueStore}
 import org.apache.samza.system.{SystemProducer, SystemProducers}
 import org.apache.samza.task.TaskInstanceCollector
-import org.apache.samza.util.{CommandLine, Logging, Util}
+import org.apache.samza.util.{CommandLine, FileUtil, Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
 
 import scala.collection.JavaConverters._
@@ -99,7 +99,7 @@ object TestKeyValuePerformance extends Logging {
         val storageFactoryClassName =
           config.getStorageFactoryClassName(storeName)
                 .getOrElse(throw new SamzaException("Missing storage factory for %s." format storeName))
-        (storeName, Util.getObj[StorageEngineFactory[Array[Byte], Array[Byte]]](storageFactoryClassName))
+        (storeName, Util.getObj(storageFactoryClassName, classOf[StorageEngineFactory[Array[Byte], Array[Byte]]]))
     })
 
     for((storeName, storageEngine) <- storageEngineMappings) {
@@ -128,7 +128,7 @@ object TestKeyValuePerformance extends Logging {
         // Run the test method
         testMethod(db, config.subset("set-" + testSet + ".", true))
 
-        Util.rm(output)
+        FileUtil.rm(output)
       })
     }
   }
index 5cde446..18dc53d 100644 (file)
@@ -131,8 +131,9 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     map.put(ApplicationConfig.PROCESSOR_ID, pId);
 
     Config config = new MapConfig(map);
+    String jobCoordinatorFactoryClassName = new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName();
     JobCoordinator jobCoordinator =
-        Util.<JobCoordinatorFactory>getObj(new JobCoordinatorConfig(config).getJobCoordinatorFactoryClassName())
+        Util.getObj(jobCoordinatorFactoryClassName, JobCoordinatorFactory.class)
             .getJobCoordinator(config);
 
     StreamProcessorLifecycleListener listener = new StreamProcessorLifecycleListener() {
index 6d02272..0b405f0 100644 (file)
@@ -43,7 +43,7 @@ import org.apache.samza.metrics.JmxMetricsAccessor;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsValidator;
 import org.apache.samza.storage.ChangelogStreamManager;
-import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.Util;
 import org.apache.samza.util.hadoop.HttpFileSystem;
 import org.apache.samza.util.CommandLine;
 import org.slf4j.Logger;
@@ -185,7 +185,7 @@ public class YarnJobValidationTool {
     MetricsValidator validator = null;
     if (options.has(validatorOpt)) {
       String validatorClass = options.valueOf(validatorOpt);
-      validator = ClassLoaderHelper.<MetricsValidator>fromClassName(validatorClass);
+      validator = Util.getObj(validatorClass, MetricsValidator.class);
     }
 
     YarnConfiguration hadoopConfig = new YarnConfiguration();
index 5230b0f..3e9d0fe 100644 (file)
@@ -26,7 +26,7 @@ import org.apache.samza.config.{Config, JobConfig, ShellCommandConfig, YarnConfi
 import org.apache.samza.job.ApplicationStatus.{Running, SuccessfulFinish, UnsuccessfulFinish}
 import org.apache.samza.job.{ApplicationStatus, StreamJob}
 import org.apache.samza.serializers.model.SamzaObjectMapper
-import org.apache.samza.util.Util
+import org.apache.samza.util.{CoordinatorStreamUtil, Util}
 import org.slf4j.LoggerFactory
 
 /**
@@ -53,7 +53,7 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
             format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
             cmdExec, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
         Some({
-          val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
+          val coordinatorSystemConfig = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
           val envMap = Map(
             ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString
             (coordinatorSystemConfig)),
index d6fc254..3fa5a2a 100644 (file)
@@ -23,14 +23,14 @@ import java.io.IOException;
 import java.net.URL;
 import junit.framework.Assert;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.samza.container.ContainerHeartbeatClient;
 import org.apache.samza.container.ContainerHeartbeatResponse;
 import org.apache.samza.coordinator.server.HttpServer;
 import org.apache.samza.job.yarn.YarnAppState;
 import org.apache.samza.job.yarn.YarnContainer;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.ExponentialSleepStrategy;
+import org.apache.samza.util.HttpUtil;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
@@ -77,7 +77,7 @@ public class TestYarnContainerHeartbeatServlet {
     when(container.id()).thenReturn(ConverterUtils.toContainerId(VALID_CONTAINER_ID));
     yarnAppState.runningYarnContainers.put(VALID_CONTAINER_ID, container);
     URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + VALID_CONTAINER_ID);
-    String response = Util.read(url, 1000);
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertTrue(heartbeat.isAlive());
   }
@@ -90,7 +90,7 @@ public class TestYarnContainerHeartbeatServlet {
     when(container.id()).thenReturn(ConverterUtils.toContainerId(VALID_CONTAINER_ID));
     yarnAppState.runningYarnContainers.put(VALID_CONTAINER_ID, container);
     URL url = new URL(webApp.getUrl().toString() + "containerHeartbeat?executionContainerId=" + INVALID_CONTAINER_ID);
-    String response = Util.read(url, 1000);
+    String response = HttpUtil.read(url, 1000, new ExponentialSleepStrategy());
     heartbeat = mapper.readValue(response, ContainerHeartbeatResponse.class);
     Assert.assertFalse(heartbeat.isAlive());
   }