SAMZA-1788: Add LocationIdProvider abstraction.
authorShanthoosh Venkataraman <spvenkat@usc.edu>
Fri, 10 Aug 2018 21:26:42 +0000 (14:26 -0700)
committerJagadish <jvenkatraman@linkedin.com>
Fri, 10 Aug 2018 21:26:42 +0000 (14:26 -0700)
Currently in standalone, by default hostName of the standalone processor is used as LocationId. However, for containerized environments like azure cloud, kubernetes this defaulting does not work. Standalone processors can be launched from different kubernetes container on a physical machine(where each kubernetes container has different locatliyID than other kubernetes container within same machine).

To solve this problem, we introduce locationID abstraction to allow users to plugin a uniqueId identifying the execution environment of the processor.

In containerized environments, LocationId is a composite key of multiple fields: (sliceId, containerId, hostname) By default hostname will be used as LocationId(if not configured by the user).

All the processors of an application registered from an locationID should be able to share(read/write) their local state stores. Any custom LocationIdProvider is expected to honor this contract when generating the locationID.

This patch is part of SEP-11. Please refer to it for more details.

Author: Shanthoosh Venkataraman <spvenkat@usc.edu>

Reviewers: Jagadish<jagadish@apache.org>

Closes #585 from shanthoosh/add_location_id_interface

samza-api/src/main/java/org/apache/samza/runtime/LocationId.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala

diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationId.java
new file mode 100644 (file)
index 0000000..48ea7ff
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.Objects;
+
+/**
+ * Represents the physical execution environment of the StreamProcessor.
+ * All the stream processors which run from a LocationId should be able to share (read/write)
+ * their local state stores.
+ */
+public class LocationId {
+  private final String locationId;
+
+  public LocationId(String locationId) {
+    if (locationId == null) {
+      throw new IllegalArgumentException("LocationId cannot be null");
+    }
+    this.locationId = locationId;
+  }
+
+  public String getId() {
+    return this.locationId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    LocationId that = (LocationId) o;
+
+    return Objects.equals(locationId, that.locationId);
+  }
+
+  @Override
+  public int hashCode() {
+    return locationId.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return locationId;
+  }
+}
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProvider.java
new file mode 100644 (file)
index 0000000..d0e0af0
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+/**
+ * Generates {@link LocationId} that uniquely identifies the
+ * execution environment of a stream processor.
+ */
+public interface LocationIdProvider {
+
+  LocationId getLocationId();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java b/samza-api/src/main/java/org/apache/samza/runtime/LocationIdProviderFactory.java
new file mode 100644 (file)
index 0000000..b8017d8
--- /dev/null
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+
+/**
+ * Builds the {@link LocationIdProvider}.
+ */
+public interface LocationIdProviderFactory {
+  LocationIdProvider getLocationIdProvider(Config config);
+}
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java b/samza-core/src/main/java/org/apache/samza/runtime/DefaultLocationIdProviderFactory.java
new file mode 100644 (file)
index 0000000..47c5330
--- /dev/null
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.util.Util;
+
+/**
+ * Uses the address of the local host for generating {@link LocationId}. 
+ */ 
+public class DefaultLocationIdProviderFactory implements LocationIdProviderFactory {
+  @Override
+  public LocationIdProvider getLocationIdProvider(Config config) {
+    return ()  -> new LocationId(Util.getLocalHost().getHostName());
+  }
+}
index 7cebcc6..15d9d20 100644 (file)
@@ -23,6 +23,7 @@ package org.apache.samza.config
 import java.io.File
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
 
 object JobConfig {
@@ -77,6 +78,8 @@ object JobConfig {
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
+  val LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory"
+
   // Processor Config Constants
   val PROCESSOR_ID = "processor.id"
   val PROCESSOR_LIST = "processor.list"
@@ -168,6 +171,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
 
+  def getLocationIdProviderFactory = getOption(JobConfig.LOCATION_ID_PROVIDER_FACTORY).getOrElse(classOf[DefaultLocationIdProviderFactory].getCanonicalName)
+
   def getSecurityManagerFactory = getOption(JobConfig.JOB_SECURITY_MANAGER_FACTORY)
 
   def getSSPMatcherClass = getOption(JobConfig.SSP_MATCHER_CLASS)