SAMZA-1428: Fix scala 2.10 compilation issue with java 8 interface st…
authorJacob Maes <jmaes@linkedin.com>
Thu, 21 Sep 2017 18:57:31 +0000 (11:57 -0700)
committerJacob Maes <jmaes@linkedin.com>
Thu, 21 Sep 2017 18:57:31 +0000 (11:57 -0700)
…atic methods

Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Boris Shkolnik <boryas@apache.org>

Closes #300 from jmakes/samza-1428

samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtilsFactory.java
samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java

index 700a107..76a42a9 100644 (file)
@@ -21,6 +21,8 @@ package org.apache.samza.config;
 
 import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
+import org.apache.samza.util.ClassLoaderHelper;
 import org.apache.samza.zk.ZkCoordinationUtilsFactory;
 
 public class JobCoordinatorConfig extends MapConfig {
@@ -49,6 +51,13 @@ public class JobCoordinatorConfig extends MapConfig {
     return className;
   }
 
+  public CoordinationUtilsFactory getCoordinationUtilsFactory() {
+    // load the class
+    String coordinationUtilsFactoryClass = getJobCoordinationUtilsFactoryClassName();
+
+    return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
+  }
+
   public String getJobCoordinatorFactoryClassName() {
     String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
     if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
index 914216b..686cc61 100644 (file)
@@ -19,8 +19,6 @@
 package org.apache.samza.coordinator;
 
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobCoordinatorConfig;
-import org.apache.samza.util.*;
 
 
 /**
@@ -28,14 +26,6 @@ import org.apache.samza.util.*;
  */
 public interface CoordinationUtilsFactory {
 
-  public static CoordinationUtilsFactory getCoordinationUtilsFactory(Config config) {
-    // load the class
-    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
-    String coordinationUtilsFactoryClass =   jcConfig.getJobCoordinationUtilsFactoryClassName();
-
-    return ClassLoaderHelper.fromClassName(coordinationUtilsFactoryClass, CoordinationUtilsFactory.class);
-  }
-
   /**
    * get a unique service instance
    * @param groupId - unique id to identify the service
index 077c124..49a54bb 100644 (file)
@@ -34,9 +34,9 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
@@ -217,9 +217,10 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
     // Move the scope of coordination utils within stream creation to address long idle connection problem.
     // Refer SAMZA-1385 for more details
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(config);
     String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
     CoordinationUtils coordinationUtils =
-        CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
+        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, config);
     if (coordinationUtils == null) {
       LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
       // each application process will try creating the streams, which
index f9c1252..1816380 100644 (file)
@@ -29,6 +29,7 @@ import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
@@ -44,6 +45,7 @@ import org.apache.samza.system.StreamSpec;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
@@ -53,11 +55,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(CoordinationUtilsFactory.class)
+@PrepareForTest(LocalApplicationRunner.class)
 public class TestLocalApplicationRunner {
 
   private static final String PLAN_JSON =
@@ -105,9 +106,10 @@ public class TestLocalApplicationRunner {
     };
     when(planner.plan(anyObject())).thenReturn(plan);
 
-    mockStatic(CoordinationUtilsFactory.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
-    when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+    when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+    PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
 
     LocalApplicationRunner spy = spy(runner);
     try {
@@ -164,8 +166,9 @@ public class TestLocalApplicationRunner {
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
-    mockStatic(CoordinationUtilsFactory.class);
-    when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
+    JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+    when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+    PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
 
     DistributedLockWithState lock = mock(DistributedLockWithState.class);
     when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);