SAMZA-1786: Introduce metadata store abstraction.
author Shanthoosh Venkataraman <spvenkat@usc.edu>
Wed, 29 Aug 2018 22:38:36 +0000 (15:38 -0700)
committer Prateek Maheshwari <pmaheshwari@apache.org>
Wed, 29 Aug 2018 22:38:36 +0000 (15:38 -0700)
As part of SEP-11, this patch adds a MetadataStore interface for storing task and container locality in both the YARN and standalone deployment models. Please refer to SEP-11 for more details; a usage sketch of the new interface follows the notes below.

A few important points to note:
1. As part of these changes, only LocalityManager/TaskAssignmentManager are updated to use this interface (other util classes will be moved to it in upcoming RBs).
2. In an immediate follow-up RB, ZkMetadataStore (which stores metadata in ZooKeeper) will be added. It will be used in standalone mode to read and write locality to ZooKeeper (through LocalityManager and other standard util classes).
3. In the future, the ExecutionPlan, StreamGraph, and other job-related metadata can be stored in any custom store through the same abstraction.
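
To make the new API concrete, here is a minimal usage sketch (not code from this patch): it resolves a store through the default CoordinatorStreamMetadataStoreFactory and reads/writes container locality the same way LocalityManager does, using the coordinator stream serdes added here. The config values are illustrative.

  import com.google.common.collect.ImmutableMap;
  import org.apache.samza.config.Config;
  import org.apache.samza.config.MapConfig;
  import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory;
  import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
  import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
  import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
  import org.apache.samza.metadatastore.MetadataStore;
  import org.apache.samza.metadatastore.MetadataStoreFactory;
  import org.apache.samza.metrics.MetricsRegistryMap;
  import org.apache.samza.serializers.Serde;

  public class MetadataStoreUsageSketch {
    public static void main(String[] args) {
      Config config = new MapConfig(ImmutableMap.of(
          "job.name", "test-job", "job.coordinator.system", "kafka"));  // illustrative values
      MetricsRegistryMap registry = new MetricsRegistryMap();

      // Resolve the store through the factory abstraction (the default implementation here).
      MetadataStoreFactory factory = new CoordinatorStreamMetadataStoreFactory();
      MetadataStore store = factory.getMetadataStore(SetContainerHostMapping.TYPE, config, registry);
      store.init(config, registry);

      // The store operates on raw bytes; callers bring their own serdes.
      Serde<String> keySerde = new CoordinatorStreamKeySerde(SetContainerHostMapping.TYPE);
      Serde<String> valueSerde = new CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE);
      store.put(keySerde.toBytes("0"), valueSerde.toBytes("localhost"));
      byte[] rawHost = store.get(keySerde.toBytes("0"));  // null if the mapping is absent
      String host = rawHost == null ? null : valueSerde.fromBytes(rawHost);

      store.flush();
      store.close();
    }
  }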

Testing:
1. Added unit tests for the new classes introduced in the patch (and fixed the existing unit tests for LocalityManager/TaskAssignmentManager).
2. All the changes in the patch were validated with test jobs in samza-hello-samza (https://github.com/apache/samza-hello-samza).
3. A LinkedIn test job (maes-tests-host-affinity) was run with these changes to verify that things work end-to-end.

Author: Shanthoosh Venkataraman <spvenkat@usc.edu>

Reviewers: Bharath Kumarasubramanian <bkumaras@linkedin.com>, Prateek Maheshwari <pmaheshwari@apache.org>, Daniel Nishimura <dnishimu@linkedin.com>

Closes #583 from shanthoosh/metadata_store_iface

17 files changed:
samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStoreFactory.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamKeySerde.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java [new file with mode: 0644]
samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java

diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStore.java
new file mode 100644 (file)
index 0000000..aaa420b
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metadatastore;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+import java.util.Map;
+
+/**
+ * Store abstraction responsible for managing the metadata of a Samza job.
+ */
+@InterfaceStability.Evolving
+public interface MetadataStore {
+
+  /**
+   * Initializes the metadata store, if applicable, setting up the underlying resources
+   * and connections to the store endpoints.
+   *
+   * @param config the configuration for instantiating the MetadataStore.
+   * @param metricsRegistry the registry for reporting metrics.
+   */
+  void init(Config config, MetricsRegistry metricsRegistry);
+
+  /**
+   * Gets the value associated with the specified {@code key}.
+   *
+   * @param key the key with which the associated value is to be fetched.
+   * @return if found, the value associated with the specified {@code key}; otherwise, {@code null}.
+   */
+  byte[] get(byte[] key);
+
+  /**
+   * Updates the mapping of the specified key-value pair.
+   *
+   * @param key the key with which the specified {@code value} is to be associated.
+   * @param value the value with which the specified {@code key} is to be associated.
+   */
+  void put(byte[] key, byte[] value);
+
+  /**
+   * Deletes the mapping for the specified {@code key} from this metadata store (if such mapping exists).
+   *
+   * @param key the key for which the mapping is to be deleted.
+   */
+  void delete(byte[] key);
+
+  /**
+   * Returns all the entries in this metadata store.
+   *
+   * @return all entries in this metadata store.
+   */
+  Map<byte[], byte[]> all();
+
+  /**
+   * Flushes the metadata store, if applicable.
+   */
+  void flush();
+
+  /**
+   * Closes the metadata store, if applicable, relinquishing all the underlying resources
+   * and connections.
+   */
+  void close();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStoreFactory.java b/samza-api/src/main/java/org/apache/samza/metadatastore/MetadataStoreFactory.java
new file mode 100644 (file)
index 0000000..0892236
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.metadatastore;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Builds the {@link MetadataStore} based upon the provided namespace, {@link Config}
+ * and {@link MetricsRegistry}.
+ */
+public interface MetadataStoreFactory {
+  MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry);
+}
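
For illustration only (this class is not part of the patch), a trivial in-memory implementation of the two interfaces above shows what the contract asks of a custom store. Since byte arrays compare by reference, the map is keyed through the same lexicographic comparator trick that CoordinatorStreamStore (added later in this patch) uses:

  import java.util.Collections;
  import java.util.Map;
  import java.util.concurrent.ConcurrentSkipListMap;
  import com.google.common.primitives.UnsignedBytes;
  import org.apache.samza.config.Config;
  import org.apache.samza.metadatastore.MetadataStore;
  import org.apache.samza.metadatastore.MetadataStoreFactory;
  import org.apache.samza.metrics.MetricsRegistry;

  public class InMemoryMetadataStoreFactory implements MetadataStoreFactory {

    @Override
    public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
      return new InMemoryMetadataStore();
    }

    public static class InMemoryMetadataStore implements MetadataStore {

      // byte[] keys use reference equality, so order them with a value-based comparator.
      private final ConcurrentSkipListMap<byte[], byte[]> entries =
          new ConcurrentSkipListMap<>(UnsignedBytes.lexicographicalComparator());

      @Override
      public void init(Config config, MetricsRegistry metricsRegistry) {
        // No external resources or connections to set up.
      }

      @Override
      public byte[] get(byte[] key) {
        return entries.get(key);
      }

      @Override
      public void put(byte[] key, byte[] value) {
        entries.put(key, value);
      }

      @Override
      public void delete(byte[] key) {
        entries.remove(key);
      }

      @Override
      public Map<byte[], byte[]> all() {
        return Collections.unmodifiableMap(entries);
      }

      @Override
      public void flush() {
        // Nothing buffered; writes are applied immediately.
      }

      @Override
      public void close() {
        entries.clear();
      }
    }
  }
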
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 743c87c..c70b15a 100644 (file)
 
 package org.apache.samza.container;
 
-import org.apache.samza.container.grouper.task.TaskAssignmentManager;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.grouper.task.TaskAssignmentManager;
+import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
-
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Locality Manager is used to persist and read the container-to-host
- * assignment information from the coordinator stream
+ * assignment information from the metadata store.
  * */
 public class LocalityManager {
-  private static final String CONTAINER_PREFIX = "SamzaContainer-";
   private static final Logger LOG = LoggerFactory.getLogger(LocalityManager.class);
 
-  private final CoordinatorStreamManager coordinatorStreamManager;
+  private final Config config;
+  private final Serde<String> keySerde;
+  private final Serde<String> valueSerde;
+  private final MetadataStore metadataStore;
   private final TaskAssignmentManager taskAssignmentManager;
 
-  private Map<String, Map<String, String>> containerToHostMapping = new HashMap<>();
+  /**
+   * Builds the LocalityManager based upon {@link Config} and {@link MetricsRegistry}.
+   * Uses {@link CoordinatorStreamKeySerde} and {@link CoordinatorStreamValueSerde} to
+   * serialize messages before reading/writing into coordinator stream.
+   *
+   * @param config the configuration required for setting up metadata store.
+   * @param metricsRegistry the registry for reporting metrics.
+   */
+  public LocalityManager(Config config, MetricsRegistry metricsRegistry) {
+    this(config, metricsRegistry, new CoordinatorStreamKeySerde(SetContainerHostMapping.TYPE),
+         new CoordinatorStreamValueSerde(SetContainerHostMapping.TYPE));
+  }
 
   /**
-   * Constructor that creates a read-write or write-only locality manager.
+   * Builds the LocalityManager based upon {@link Config} and {@link MetricsRegistry}.
+   * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs before reading/writing
+   * into {@link MetadataStore}.
    *
-   * @param coordinatorStreamManager Coordinator stream manager.
+   * Key and value serializers are different for YARN (uses CoordinatorStreamMessage) and standalone (uses native ObjectOutputStream for serialization) modes.
+   * @param config the configuration required for setting up metadata store.
+   * @param metricsRegistry the registry for reporting metrics.
+   * @param keySerde the key serializer.
+   * @param valueSerde the value serializer.
    */
-  public LocalityManager(CoordinatorStreamManager coordinatorStreamManager) {
-    this.coordinatorStreamManager = coordinatorStreamManager;
-    this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
+  LocalityManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
+    this.config = config;
+    MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+    this.metadataStore = metadataStoreFactory.getMetadataStore(SetContainerHostMapping.TYPE, config, metricsRegistry);
+    this.metadataStore.init(config, metricsRegistry);
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    this.taskAssignmentManager = new TaskAssignmentManager(config, metricsRegistry, keySerde, valueSerde);
   }
 
   /**
-   * Method to allow read container locality information from coordinator stream. This method is used
-   * in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Reads the container locality information from the {@link MetadataStore}.
+   * This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
    *
-   * @return the map of containerId: (hostname, jmxAddress, jmxTunnelAddress)
+   * @return the map of containerId: (hostname)
    */
   public Map<String, Map<String, String>> readContainerLocality() {
-    if (coordinatorStreamManager == null) {
-      throw new IllegalStateException("No coordinator stream manager to read container locality from.");
-    }
-
     Map<String, Map<String, String>> allMappings = new HashMap<>();
-    for (CoordinatorStreamMessage message : coordinatorStreamManager.getBootstrappedStream(
-        SetContainerHostMapping.TYPE)) {
-      SetContainerHostMapping mapping = new SetContainerHostMapping(message);
-      Map<String, String> localityMappings = new HashMap<>();
-      localityMappings.put(SetContainerHostMapping.HOST_KEY, mapping.getHostLocality());
-      localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
-      localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
-      allMappings.put(mapping.getKey(), localityMappings);
-    }
-    containerToHostMapping = Collections.unmodifiableMap(allMappings);
-
+    metadataStore.all().forEach((keyBytes, valueBytes) -> {
+        if (valueBytes != null) {
+          String locationId = valueSerde.fromBytes(valueBytes);
+          allMappings.put(keySerde.fromBytes(keyBytes), ImmutableMap.of(SetContainerHostMapping.HOST_KEY, locationId));
+        }
+      });
     if (LOG.isDebugEnabled()) {
-      for (Map.Entry<String, Map<String, String>> entry : containerToHostMapping.entrySet()) {
+      for (Map.Entry<String, Map<String, String>> entry : allMappings.entrySet()) {
         LOG.debug(String.format("Locality for container %s: %s", entry.getKey(), entry.getValue()));
       }
     }
 
-    return allMappings;
+    return Collections.unmodifiableMap(allMappings);
   }
 
   /**
-   * Method to write locality info to coordinator stream. This method is used in {@link SamzaContainer}.
+   * Method to write locality information to the {@link MetadataStore}. This method is used in {@link SamzaContainer}.
    *
    * @param containerId  the {@link SamzaContainer} ID
    * @param hostName  the hostname
-   * @param jmxAddress  the JMX URL address
-   * @param jmxTunnelingAddress  the JMX Tunnel URL address
    */
-  public void writeContainerToHostMapping(String containerId, String hostName, String jmxAddress,
-      String jmxTunnelingAddress) {
-    if (coordinatorStreamManager == null) {
-      throw new IllegalStateException("No coordinator stream manager to write locality info to.");
-    }
-
+  public void writeContainerToHostMapping(String containerId, String hostName) {
+    Map<String, Map<String, String>> containerToHostMapping = readContainerLocality();
     Map<String, String> existingMappings = containerToHostMapping.get(containerId);
-    String existingHostMapping =
-        existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
+    String existingHostMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.HOST_KEY) : null;
     if (existingHostMapping != null && !existingHostMapping.equals(hostName)) {
       LOG.info("Container {} moved from {} to {}", new Object[]{containerId, existingHostMapping, hostName});
     } else {
       LOG.info("Container {} started at {}", containerId, hostName);
     }
-    coordinatorStreamManager.send(
-        new SetContainerHostMapping(CONTAINER_PREFIX + containerId, String.valueOf(containerId), hostName, jmxAddress,
-            jmxTunnelingAddress));
-    Map<String, String> mappings = new HashMap<>();
-    mappings.put(SetContainerHostMapping.HOST_KEY, hostName);
-    mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
-    mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
-    containerToHostMapping.put(containerId, mappings);
+
+    metadataStore.put(keySerde.toBytes(containerId), valueSerde.toBytes(hostName));
   }
 
-  public TaskAssignmentManager getTaskAssignmentManager() {
-    return taskAssignmentManager;
+  public void close() {
+    metadataStore.close();
+    taskAssignmentManager.close();
   }
 
-  public CoordinatorStreamManager getCoordinatorStreamManager() {
-    return coordinatorStreamManager;
+  public TaskAssignmentManager getTaskAssignmentManager() {
+    return taskAssignmentManager;
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
index 6ec070a..42a6e81 100644 (file)
@@ -21,62 +21,92 @@ package org.apache.samza.container.grouper.task;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
-import org.apache.samza.coordinator.stream.messages.Delete;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamKeySerde;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Task assignment Manager is used to persist and read the task-to-container
- * assignment information from the coordinator stream
+ * assignment information from the metadata store.
  * */
 public class TaskAssignmentManager {
-  private static final Logger log = LoggerFactory.getLogger(TaskAssignmentManager.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TaskAssignmentManager.class);
+
+  private final Config config;
   private final Map<String, String> taskNameToContainerId = new HashMap<>();
-  private final CoordinatorStreamManager coordinatorStreamManager;
-  private static final String SOURCE = "SamzaTaskAssignmentManager";
+  private final Serde<String> keySerde;
+  private final Serde<String> valueSerde;
+
+  private MetadataStore metadataStore;
 
   /**
-   * Default constructor that creates a read-write manager
+   * Builds the TaskAssignmentManager based upon {@link Config} and {@link MetricsRegistry}.
+   * Uses {@link CoordinatorStreamKeySerde} and {@link CoordinatorStreamValueSerde} to
+   * serialize messages before reading/writing into coordinator stream.
    *
-   * @param coordinatorStreamManager coordinator stream manager.
+   * @param config the configuration required for setting up metadata store.
+   * @param metricsRegistry the registry for reporting metrics.
    */
-  public TaskAssignmentManager(CoordinatorStreamManager coordinatorStreamManager) {
-    this.coordinatorStreamManager = coordinatorStreamManager;
-    coordinatorStreamManager.register(SOURCE);
+  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry) {
+    this(config, metricsRegistry, new CoordinatorStreamKeySerde(SetTaskContainerMapping.TYPE),
+         new CoordinatorStreamValueSerde(SetTaskContainerMapping.TYPE));
   }
 
   /**
-   * Method to allow read container task information from coordinator stream. This method is used
-   * in {@link org.apache.samza.coordinator.JobModelManager}.
+   * Builds the TaskAssignmentManager based upon {@link Config} and {@link MetricsRegistry}.
+   *
+   * Uses keySerde, valueSerde to serialize/deserialize (key, value) pairs before reading/writing
+   * into {@link MetadataStore}.
+   *
+   * Key and value serializers are different for YARN (uses CoordinatorStreamMessage) and standalone (uses native
+   * ObjectOutputStream for serialization) modes.
+   * @param config the configuration required for setting up metadata store.
+   * @param metricsRegistry the registry for reporting metrics.
+   * @param keySerde the key serializer.
+   * @param valueSerde the value serializer.
+   */
+  public TaskAssignmentManager(Config config, MetricsRegistry metricsRegistry, Serde<String> keySerde, Serde<String> valueSerde) {
+    this.config = config;
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+    MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+    this.metadataStore = metadataStoreFactory.getMetadataStore(SetTaskContainerMapping.TYPE, config, metricsRegistry);
+  }
+
+  public void init(Config config, MetricsRegistry metricsRegistry) {
+    this.metadataStore.init(config, metricsRegistry);
+  }
+
+  /**
+   * Reads the task-to-container assignment information from the {@link MetadataStore}. This method is used in {@link org.apache.samza.coordinator.JobModelManager}.
    *
    * @return the map of taskName: containerId
    */
   public Map<String, String> readTaskAssignment() {
     taskNameToContainerId.clear();
-    for (CoordinatorStreamMessage message: coordinatorStreamManager.getBootstrappedStream(SetTaskContainerMapping.TYPE)) {
-      if (message.isDelete()) {
-        taskNameToContainerId.remove(message.getKey());
-        log.debug("Got TaskContainerMapping delete message: {}", message);
-      } else {
-        SetTaskContainerMapping mapping = new SetTaskContainerMapping(message);
-        taskNameToContainerId.put(mapping.getKey(), mapping.getTaskAssignment());
-        log.debug("Got TaskContainerMapping message: {}", mapping);
-      }
-    }
-
-    for (Map.Entry<String, String> entry : taskNameToContainerId.entrySet()) {
-      log.debug("Assignment for task \"{}\": {}", entry.getKey(), entry.getValue());
-    }
-
+    metadataStore.all().forEach((keyBytes, valueBytes) -> {
+        String taskName = keySerde.fromBytes(keyBytes);
+        String containerId = valueSerde.fromBytes(valueBytes);
+        if (containerId != null) {
+          taskNameToContainerId.put(taskName, containerId);
+        }
+        LOG.debug("Assignment for task {}: {}", taskName, containerId);
+      });
     return Collections.unmodifiableMap(new HashMap<>(taskNameToContainerId));
   }
 
   /**
-   * Method to write task container info to coordinator stream.
+   * Method to write task container info to {@link MetadataStore}.
    *
    * @param taskName    the task name
    * @param containerId the SamzaContainer ID or {@code null} to delete the mapping
@@ -84,28 +114,33 @@ public class TaskAssignmentManager {
   public void writeTaskContainerMapping(String taskName, String containerId) {
     String existingContainerId = taskNameToContainerId.get(taskName);
     if (existingContainerId != null && !existingContainerId.equals(containerId)) {
-      log.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
+      LOG.info("Task \"{}\" moved from container {} to container {}", new Object[]{taskName, existingContainerId, containerId});
     } else {
-      log.debug("Task \"{}\" assigned to container {}", taskName, containerId);
+      LOG.debug("Task \"{}\" assigned to container {}", taskName, containerId);
     }
 
     if (containerId == null) {
-      coordinatorStreamManager.send(new Delete(SOURCE, taskName, SetTaskContainerMapping.TYPE));
+      metadataStore.delete(keySerde.toBytes(taskName));
       taskNameToContainerId.remove(taskName);
     } else {
-      coordinatorStreamManager.send(new SetTaskContainerMapping(SOURCE, taskName, String.valueOf(containerId)));
+      metadataStore.put(keySerde.toBytes(taskName), valueSerde.toBytes(containerId));
       taskNameToContainerId.put(taskName, containerId);
     }
   }
 
   /**
-   * Deletes the task container info from the coordinator stream for each of the specified task names.
+   * Deletes the task container info from the {@link MetadataStore} for each of the specified task names.
    *
    * @param taskNames the task names for which the mapping will be deleted.
    */
   public void deleteTaskContainerMappings(Iterable<String> taskNames) {
     for (String taskName : taskNames) {
-      writeTaskContainerMapping(taskName, null);
+      metadataStore.delete(keySerde.toBytes(taskName));
+      taskNameToContainerId.remove(taskName);
     }
   }
+
+  public void close() {
+    metadataStore.close();
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamMetadataStoreFactory.java
new file mode 100644 (file)
index 0000000..c38dc56
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Builds the {@link CoordinatorStreamStore} based upon the provided {@link Config}
+ * and {@link MetricsRegistry}.
+ */
+public class CoordinatorStreamMetadataStoreFactory implements MetadataStoreFactory {
+
+  @Override
+  public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
+    return new CoordinatorStreamStore(namespace, config, metricsRegistry);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java b/samza-core/src/main/java/org/apache/samza/coordinator/metadatastore/CoordinatorStreamStore.java
new file mode 100644 (file)
index 0000000..d74188b
--- /dev/null
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.UnsignedBytes;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeMap;
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of the {@link MetadataStore} interface where the metadata of the Samza job is stored in the coordinator stream.
+ *
+ * This class is thread safe.
+ */
+public class CoordinatorStreamStore implements MetadataStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CoordinatorStreamStore.class);
+  private static final String SOURCE = "SamzaContainer";
+
+  private final Config config;
+  private final SystemStream coordinatorSystemStream;
+  private final SystemStreamPartition coordinatorSystemStreamPartition;
+  private final SystemProducer systemProducer;
+  private final SystemConsumer systemConsumer;
+  private final SystemAdmin systemAdmin;
+  private final String type;
+  private final Serde<List<?>> keySerde;
+
+  // Using a TreeMap with a custom comparator since byte arrays use reference equality (not value equality) by default.
+  private final Map<byte[], byte[]> bootstrappedMessages = new TreeMap<>(UnsignedBytes.lexicographicalComparator());
+  private final Object bootstrapLock = new Object();
+  private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  private SystemStreamPartitionIterator iterator;
+
+  public CoordinatorStreamStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
+    this.config = config;
+    this.type = namespace;
+    this.keySerde = new JsonSerde<>();
+    this.coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+    SystemFactory systemFactory = CoordinatorStreamUtil.getCoordinatorSystemFactory(config);
+    this.systemProducer = systemFactory.getProducer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry);
+    this.systemConsumer = systemFactory.getConsumer(this.coordinatorSystemStream.getSystem(), config, metricsRegistry);
+    this.systemAdmin = systemFactory.getAdmin(this.coordinatorSystemStream.getSystem(), config);
+  }
+
+  @Override
+  public void init(Config config, MetricsRegistry metricsRegistry) {
+    if (isInitialized.compareAndSet(false, true)) {
+      LOG.info("Starting the coordinator stream system consumer with config: {}.", config);
+      registerConsumer();
+      systemConsumer.start();
+      systemProducer.register(SOURCE);
+      systemProducer.start();
+      iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+      bootstrapMessagesFromStream();
+    } else {
+      LOG.info("Store had already been initialized. Skipping.", coordinatorSystemStreamPartition);
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) {
+    bootstrapMessagesFromStream();
+    return bootstrappedMessages.get(key);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) {
+    OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(coordinatorSystemStream, 0, key, value);
+    systemProducer.send(SOURCE, envelope);
+    flush();
+  }
+
+  @Override
+  public void delete(byte[] key) {
+    // Kafka doesn't support deleting individual messages, so a null value is written for the key as a tombstone.
+    put(key, null);
+  }
+
+  @Override
+  public Map<byte[], byte[]> all() {
+    bootstrapMessagesFromStream();
+    return Collections.unmodifiableMap(bootstrappedMessages);
+  }
+
+  /**
+   * Reads all the messages from the earliest offset through the latest and materializes them into the bootstrapped messages map.
+   */
+  private void bootstrapMessagesFromStream() {
+    synchronized (bootstrapLock) {
+      while (iterator.hasNext()) {
+        IncomingMessageEnvelope envelope = iterator.next();
+        byte[] keyAsBytes = (byte[]) envelope.getKey();
+        Object[] keyArray = keySerde.fromBytes(keyAsBytes).toArray();
+        CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, new HashMap<>());
+        if (Objects.equals(coordinatorStreamMessage.getType(), type)) {
+          if (envelope.getMessage() != null) {
+            bootstrappedMessages.put(keyAsBytes, (byte[]) envelope.getMessage());
+          } else {
+            bootstrappedMessages.remove(keyAsBytes);
+          }
+        }
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      LOG.info("Stopping the coordinator stream system consumer.", config);
+      systemAdmin.stop();
+      systemProducer.stop();
+      systemConsumer.stop();
+    } catch (Exception e) {
+      LOG.error("Exception occurred when closing the metadata store:", e);
+    }
+  }
+
+  @Override
+  public void flush() {
+    try {
+      systemProducer.flush(SOURCE);
+    } catch (Exception e) {
+      LOG.error("Exception occurred when flushing the metadata store:", e);
+    }
+  }
+
+  private void registerConsumer() {
+    LOG.debug("Attempting to register system stream partition: {}", coordinatorSystemStreamPartition);
+    String streamName = coordinatorSystemStreamPartition.getStream();
+    Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(Sets.newHashSet(streamName));
+
+    SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName);
+    Preconditions.checkNotNull(systemStreamMetadata, String.format("System stream metadata does not exist for stream: %s.", streamName));
+
+    SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition());
+    Preconditions.checkNotNull(systemStreamPartitionMetadata, String.format("System stream partition metadata does not exist for: %s.", coordinatorSystemStreamPartition));
+
+    String startingOffset = systemStreamPartitionMetadata.getOldestOffset();
+    LOG.info("Registering system stream partition: {} with offset: {}.", coordinatorSystemStreamPartition, startingOffset);
+    systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamKeySerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamKeySerde.java
new file mode 100644 (file)
index 0000000..4eb9024
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Serde for keys written into the coordinator stream.
+ */
+public class CoordinatorStreamKeySerde implements Serde<String> {
+
+  private final Serde<List<?>> keySerde;
+  private final String type;
+
+  public CoordinatorStreamKeySerde(String type) {
+    this.type = type;
+    this.keySerde = new JsonSerde<>();
+  }
+
+  @Override
+  public String fromBytes(byte[] bytes) {
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage(keySerde.fromBytes(bytes).toArray(), new HashMap<>());
+    return message.getKey();
+  }
+
+  @Override
+  public byte[] toBytes(String key) {
+    Object[] keyArray = new Object[]{CoordinatorStreamMessage.VERSION, type, key};
+    return keySerde.toBytes(Arrays.asList(keyArray));
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
new file mode 100644 (file)
index 0000000..fee099e
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.stream;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.SamzaException;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+
+/**
+ * Serde for values written into the coordinator stream.
+ */
+public class CoordinatorStreamValueSerde implements Serde<String> {
+
+  private static final String SOURCE = "SamzaContainer";
+
+  private final String type;
+  private final Serde<Map<String, Object>> messageSerde;
+
+  public CoordinatorStreamValueSerde(String type) {
+    Preconditions.checkNotNull(type);
+    this.type = type;
+    messageSerde = new JsonSerde<>();
+  }
+
+  @Override
+  public String fromBytes(byte[] bytes) {
+    Map<String, Object> values = messageSerde.fromBytes(bytes);
+    CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[]{}, values);
+    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+      SetContainerHostMapping hostMapping = new SetContainerHostMapping(message);
+      return hostMapping.getHostLocality();
+    } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
+      SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping(message);
+      return setTaskContainerMapping.getTaskAssignment();
+    } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
+      SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
+      return String.valueOf(changelogMapping.getPartition());
+    } else {
+      throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
+    }
+  }
+
+  @Override
+  public byte[] toBytes(String value) {
+    if (type.equalsIgnoreCase(SetContainerHostMapping.TYPE)) {
+      SetContainerHostMapping hostMapping = new SetContainerHostMapping(SOURCE, "", value, "", "");
+      return messageSerde.toBytes(hostMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetTaskContainerMapping.TYPE)) {
+      SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping(SOURCE, "", value);
+      return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
+      SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
+      return messageSerde.toBytes(changelogMapping.getMessageMap());
+    } else {
+      throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
+    }
+  }
+}
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 15d9d20..ddcaa5e 100644 (file)
@@ -23,6 +23,7 @@ package org.apache.samza.config
 import java.io.File
 
 import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.coordinator.metadatastore.CoordinatorStreamMetadataStoreFactory
 import org.apache.samza.runtime.DefaultLocationIdProviderFactory
 import org.apache.samza.util.Logging
 
@@ -78,6 +79,7 @@ object JobConfig {
   val DEFAULT_MONITOR_PARTITION_CHANGE_FREQUENCY_MS = 300000
   val JOB_SECURITY_MANAGER_FACTORY = "job.security.manager.factory"
 
+  val METADATA_STORE_FACTORY = "metadata.store.factory"
   val LOCATION_ID_PROVIDER_FACTORY = "locationid.provider.factory"
 
   // Processor Config Constants
@@ -199,6 +201,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
 
   def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
 
+  def getMetadataStoreFactory = getOption(JobConfig.METADATA_STORE_FACTORY).getOrElse(classOf[CoordinatorStreamMetadataStoreFactory].getCanonicalName)
+
   def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
 
   def getDiagnosticsAppenderClass = {
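
With the hook above, a job can select a custom store implementation via the new "metadata.store.factory" config key; when the key is unset, JobConfig falls back to CoordinatorStreamMetadataStoreFactory. A sketch of the lookup, assuming the hypothetical factory class name below (not shipped by this patch):

  import com.google.common.collect.ImmutableMap;
  import org.apache.samza.config.Config;
  import org.apache.samza.config.JobConfig;
  import org.apache.samza.config.MapConfig;

  public class MetadataStoreConfigSketch {
    public static void main(String[] args) {
      // Hypothetical override; the class name is illustrative only.
      Config config = new MapConfig(ImmutableMap.of(
          "metadata.store.factory", "com.example.InMemoryMetadataStoreFactory"));
      // Resolves to the configured class, or to CoordinatorStreamMetadataStoreFactory by default.
      String factoryClassName = new JobConfig(config).getMetadataStoreFactory();
      System.out.println(factoryClassName);
    }
  }
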
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 47b73c1..0c889d2 100644 (file)
@@ -43,7 +43,6 @@ import org.apache.samza.config._
 import org.apache.samza.container.disk.DiskSpaceMonitor.Listener
 import org.apache.samza.container.disk.{DiskQuotaPolicyFactory, DiskSpaceMonitor, NoThrottlingDiskQuotaPolicyFactory, PollingScanDiskSpaceMonitor}
 import org.apache.samza.container.host.{StatisticsMonitorImpl, SystemMemoryStatistics, SystemStatisticsMonitor}
-import org.apache.samza.coordinator.stream.{CoordinatorStreamManager, CoordinatorStreamSystemProducer}
 import org.apache.samza.job.model.{ContainerModel, JobModel}
 import org.apache.samza.metrics.{JmxServer, JvmMetrics, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.serializers._
@@ -130,13 +129,10 @@ object SamzaContainer extends Logging {
     val containerName = "samza-container-%s" format containerId
     val maxChangeLogStreamPartitions = jobModel.maxChangeLogStreamPartitions
 
-    var coordinatorStreamManager: CoordinatorStreamManager = null
     var localityManager: LocalityManager = null
     if (new ClusterManagerConfig(config).getHostAffinityEnabled()) {
       val registryMap = new MetricsRegistryMap(containerName)
-      val coordinatorStreamSystemProducer = new CoordinatorStreamSystemProducer(config, new SamzaContainerMetrics(containerName, registryMap).registry)
-      coordinatorStreamManager = new CoordinatorStreamManager(coordinatorStreamSystemProducer)
-      localityManager = new LocalityManager(coordinatorStreamManager)
+      localityManager = new LocalityManager(config, registryMap)
     }
 
     val containerPID = ManagementFactory.getRuntimeMXBean().getName()
@@ -723,7 +719,6 @@ object SamzaContainer extends Logging {
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
       offsetManager = offsetManager,
-      coordinatorStreamManager = coordinatorStreamManager,
       localityManager = localityManager,
       securityManager = securityManager,
       metrics = samzaContainerMetrics,
@@ -761,7 +756,6 @@ class SamzaContainer(
   diskSpaceMonitor: DiskSpaceMonitor = null,
   hostStatisticsMonitor: SystemStatisticsMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
-  coordinatorStreamManager: CoordinatorStreamManager = null,
   localityManager: LocalityManager = null,
   securityManager: SecurityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -965,22 +959,14 @@ class SamzaContainer(
 
   def startLocalityManager {
     if(localityManager != null) {
-      if(coordinatorStreamManager == null) {
-        // This should never happen.
-        throw new IllegalStateException("Cannot start LocalityManager without a CoordinatorStreamManager")
-      }
-
       val containerName = "SamzaContainer-" + String.valueOf(containerContext.id)
-      info("Registering %s with the coordinator stream manager." format containerName)
-      coordinatorStreamManager.start
-      coordinatorStreamManager.register(containerName)
-
-      info("Writing container locality and JMX address to Coordinator Stream")
+      info("Registering %s with metadata store" format containerName)
       try {
         val hostInet = Util.getLocalHost
         val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
         val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
-        localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName, jmxUrl, jmxTunnelingUrl)
+        info("Writing container locality and JMX address to metadata store")
+        localityManager.writeContainerToHostMapping(containerContext.id, hostInet.getHostName)
       } catch {
         case uhe: UnknownHostException =>
           warn("Received UnknownHostException when persisting locality info for container %s: " +
@@ -1156,9 +1142,9 @@ class SamzaContainer(
   }
 
   def shutdownLocalityManager {
-    if(coordinatorStreamManager != null) {
-      info("Shutting down coordinator stream manager used by locality manager.")
-      coordinatorStreamManager.stop
+    if(localityManager != null) {
+      info("Shutting down locality manager.")
+      localityManager.close()
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
index df37ecd..f939736 100644 (file)
@@ -38,6 +38,7 @@ import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.job.model.TaskModel
+import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -68,11 +69,10 @@ object JobModelManager extends Logging {
    * @return JobModelManager
    */
   def apply(coordinatorStreamManager: CoordinatorStreamManager, changelogPartitionMapping: util.Map[TaskName, Integer]) = {
-    val localityManager = new LocalityManager(coordinatorStreamManager)
-
     val config = coordinatorStreamManager.getConfig
+    val localityManager = new LocalityManager(config, new MetricsRegistryMap())
 
-      // Map the name of each system to the corresponding SystemAdmin
+    // Map the name of each system to the corresponding SystemAdmin
     val systemAdmins = new SystemAdmins(config)
     val streamMetadataCache = new StreamMetadataCache(systemAdmins, 0)
 
@@ -99,7 +99,7 @@ object JobModelManager extends Logging {
 
     val server = new HttpServer
     server.addServlet("/", new JobServlet(jobModelRef))
-    currentJobModelManager = new JobModelManager(jobModel, server)
+    currentJobModelManager = new JobModelManager(jobModel, server, localityManager)
     currentJobModelManager
   }
 
@@ -241,7 +241,12 @@ class JobModelManager(
   /**
    * HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
    */
-  val server: HttpServer = null) extends Logging {
+  val server: HttpServer = null,
+
+  /**
+   * LocalityManager employed to read and write container and task locality information to the metadata store.
+   */
+  val localityManager: LocalityManager = null) extends Logging {
 
   debug("Got job model: %s." format jobModel)
 
@@ -258,6 +263,11 @@ class JobModelManager(
       debug("Stopping HTTP server.")
       server.stop
       info("Stopped HTTP server.")
+      if (localityManager != null) {
+        info("Stopping localityManager")
+        localityManager.close()
+        info("Stopped localityManager")
+      }
     }
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestClusterBasedJobCoordinator.java
index 877adc5..e2776b2 100644 (file)
@@ -19,7 +19,6 @@
 
 package org.apache.samza.clustermanager;
 
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -28,20 +27,29 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.MockSystemFactory;
+import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ClusterBasedJobCoordinator}
  */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestClusterBasedJobCoordinator {
 
   Map<String, String> configMap;
@@ -57,6 +65,12 @@ public class TestClusterBasedJobCoordinator {
     configMap.put("job.coordinator.monitor-partition-change.frequency.ms", "1");
 
     MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(0)), new ArrayList<>());
+    MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "__samza_coordinator_test-job_1", new Partition(0)), new ArrayList<>());
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(new MockCoordinatorStreamSystemFactory());
+    when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("kafka", "test"));
+    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
   }
 
   @After
@@ -65,8 +79,7 @@ public class TestClusterBasedJobCoordinator {
   }
 
   @Test
-  public void testPartitionCountMonitorWithDurableStates()
-      throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
+  public void testPartitionCountMonitorWithDurableStates() {
     configMap.put("stores.mystore.changelog", "mychangelog");
     Config config = new MapConfig(configMap);
 
@@ -85,7 +98,7 @@ public class TestClusterBasedJobCoordinator {
   }
 
   @Test
-  public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
+  public void testPartitionCountMonitorWithoutDurableStates() {
     Config config = new MapConfig(configMap);
 
     // mimic job runner code to write the config to coordinator stream
@@ -101,5 +114,4 @@ public class TestClusterBasedJobCoordinator {
     monitor.updatePartitionCountMetric();
     assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.UNDEFINED);
   }
-
 }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java b/samza-core/src/test/java/org/apache/samza/container/TestLocalityManager.java
index 051ff13..387491f 100644 (file)
 
 package org.apache.samza.container;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
-import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.runner.RunWith;
 import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.*;
-
+import static org.mockito.Mockito.*;
 
 /**
  * Unit tests for {@link LocalityManager}
  */
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestLocalityManager {
 
-  private final MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
-      new MockCoordinatorStreamSystemFactory();
-  private final Config config = new MapConfig(
-      new HashMap<String, String>() {
-        {
-          this.put("job.name", "test-job");
-          this.put("job.coordinator.system", "test-kafka");
-        }
-      });
+  private MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory;
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
 
   @Before
   public void setup() {
+    mockCoordinatorStreamSystemFactory = new MockCoordinatorStreamSystemFactory();
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
+    when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test"));
+    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
   }
 
   @After
@@ -62,24 +67,10 @@ public class TestLocalityManager {
     MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
   }
 
-  @Test public void testLocalityManager() throws Exception {
-    MockCoordinatorStreamSystemProducer producer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    MockCoordinatorStreamSystemConsumer consumer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
-    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
-    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
+  @Test public void testLocalityManager() {
+    LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
 
-    coordinatorStreamManager.register("SamzaContainer-containerId-0");
-    assertTrue(producer.isRegistered());
-    assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-0");
-    assertTrue(consumer.isRegistered());
-
-    coordinatorStreamManager.start();
-    assertTrue(producer.isStarted());
-    assertTrue(consumer.isStarted());
-
-    localityManager.writeContainerToHostMapping("0", "localhost", "jmx:localhost:8080", "jmx:tunnel:localhost:9090");
+    localityManager.writeContainerToHostMapping("0", "localhost");
     Map<String, Map<String, String>> localMap = localityManager.readContainerLocality();
     Map<String, Map<String, String>> expectedMap =
       new HashMap<String, Map<String, String>>() {
@@ -88,49 +79,34 @@ public class TestLocalityManager {
             new HashMap<String, String>() {
               {
                 this.put(SetContainerHostMapping.HOST_KEY, "localhost");
-                this.put(SetContainerHostMapping.JMX_URL_KEY, "jmx:localhost:8080");
-                this.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, "jmx:tunnel:localhost:9090");
               }
             });
         }
       };
     assertEquals(expectedMap, localMap);
 
-    coordinatorStreamManager.stop();
+    localityManager.close();
+
+    MockCoordinatorStreamSystemProducer producer = mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+    MockCoordinatorStreamSystemConsumer consumer = mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
     assertTrue(producer.isStopped());
     assertTrue(consumer.isStopped());
   }
 
   @Test public void testWriteOnlyLocalityManager() {
-    MockCoordinatorStreamSystemProducer producer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(producer);
-    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
-
-    coordinatorStreamManager.register("SamzaContainer-containerId-1");
-    assertTrue(producer.isRegistered());
-    assertEquals(producer.getRegisteredSource(), "SamzaContainer-containerId-1");
-
-    coordinatorStreamManager.start();
-    assertTrue(producer.isStarted());
-
-    localityManager.writeContainerToHostMapping("1", "localhost", "jmx:localhost:8181", "jmx:tunnel:localhost:9191");
-    try {
-      localityManager.readContainerLocality();
-      fail("Should have thrown UnsupportedOperationException");
-    } catch (UnsupportedOperationException uoe) {
-      // expected
-    }
-    assertEquals(producer.getEnvelopes().size(), 1);
-    CoordinatorStreamMessage coordinatorStreamMessage =
-        MockCoordinatorStreamSystemFactory.deserializeCoordinatorStreamMessage(producer.getEnvelopes().get(0));
-
-    SetContainerHostMapping expectedContainerMap =
-        new SetContainerHostMapping("SamzaContainer-1", "1", "localhost", "jmx:localhost:8181",
-            "jmx:tunnel:localhost:9191");
-    assertEquals(expectedContainerMap, coordinatorStreamMessage);
-
-    coordinatorStreamManager.stop();
+    LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());
+
+    localityManager.writeContainerToHostMapping("1", "localhost");
+
+    assertEquals(1, localityManager.readContainerLocality().size());
+
+    assertEquals(ImmutableMap.of("1", ImmutableMap.of("host", "localhost")), localityManager.readContainerLocality());
+
+    localityManager.close();
+
+    MockCoordinatorStreamSystemProducer producer = mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
+    MockCoordinatorStreamSystemConsumer consumer = mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
     assertTrue(producer.isStopped());
+    assertTrue(consumer.isStopped());
   }
 }
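
The hunks above capture the API shift in one place: LocalityManager no longer wraps a caller-managed CoordinatorStreamManager with explicit register/start/stop calls, and the JMX URL arguments are gone from writeContainerToHostMapping, leaving only the container-to-host pair. The PowerMock setup (statically stubbing CoordinatorStreamUtil to hand back the mock factory and stream) recurs in the other tests in this patch. A minimal usage sketch of the new surface, assuming only the two config keys the test uses (values are illustrative):

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.container.LocalityManager;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class LocalityManagerSketch {
      public static void main(String[] args) {
        Config config = new MapConfig(ImmutableMap.of(
            "job.name", "test-job",                  // illustrative values
            "job.coordinator.system", "test-kafka"));

        // The manager builds and starts its coordinator-stream-backed store internally.
        LocalityManager localityManager = new LocalityManager(config, new MetricsRegistryMap());

        // Write, then read back: containerId -> {"host": <hostname>}.
        localityManager.writeContainerToHostMapping("0", "localhost");
        Map<String, Map<String, String>> locality = localityManager.readContainerLocality();
        System.out.println(locality);

        // close() stops the underlying producer and consumer; no external stop() call remains.
        localityManager.close();
      }
    }
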
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
index 7b57a8b..879171e 100644
@@ -21,31 +21,43 @@ package org.apache.samza.container.grouper.task;
 
 import java.util.HashMap;
 import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemConsumer;
-import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory.MockCoordinatorStreamSystemProducer;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(CoordinatorStreamUtil.class)
 public class TestTaskAssignmentManager {
-  private final Config config = new MapConfig(
-      new HashMap<String, String>() {
-        {
-          this.put("job.name", "test-job");
-          this.put("job.coordinator.system", "test-kafka");
-        }
-      });
+
+  private MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory;
+
+  private final Config config = new MapConfig(ImmutableMap.of("job.name", "test-job", "job.coordinator.system", "test-kafka"));
 
   @Before
   public void setup() {
     MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+    mockCoordinatorStreamSystemFactory = new MockCoordinatorStreamSystemFactory();
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(mockCoordinatorStreamSystemFactory);
+    when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test"));
+    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
   }
 
   @After
@@ -53,35 +65,12 @@ public class TestTaskAssignmentManager {
     MockCoordinatorStreamSystemFactory.disableMockConsumerCache();
   }
 
-  @Test public void testTaskAssignmentManager() throws Exception {
-    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
-        new MockCoordinatorStreamSystemFactory();
-    MockCoordinatorStreamSystemProducer producer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    MockCoordinatorStreamSystemConsumer consumer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
-    consumer.register();
-    CoordinatorStreamManager
-        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
-
-    assertTrue(producer.isRegistered());
-    assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-
-    coordinatorStreamManager.start();
-    assertTrue(producer.isStarted());
-    assertTrue(consumer.isStarted());
-
-    Map<String, String> expectedMap =
-      new HashMap<String, String>() {
-        {
-          this.put("Task0", "0");
-          this.put("Task1", "1");
-          this.put("Task2", "2");
-          this.put("Task3", "0");
-          this.put("Task4", "1");
-        }
-      };
+  @Test
+  public void testTaskAssignmentManager() {
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
+    taskAssignmentManager.init(config, new MetricsRegistryMap());
+
+    Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
       taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
@@ -91,37 +80,14 @@ public class TestTaskAssignmentManager {
 
     assertEquals(expectedMap, localMap);
 
-    coordinatorStreamManager.stop();
-    assertTrue(producer.isStopped());
-    assertTrue(consumer.isStopped());
+    taskAssignmentManager.close();
   }
 
-  @Test public void testDeleteMappings() throws Exception {
-    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
-        new MockCoordinatorStreamSystemFactory();
-    MockCoordinatorStreamSystemProducer producer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    MockCoordinatorStreamSystemConsumer consumer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
-    consumer.register();
-    CoordinatorStreamManager
-        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
-
-    assertTrue(producer.isRegistered());
-    assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-
-    coordinatorStreamManager.start();
-    assertTrue(producer.isStarted());
-    assertTrue(consumer.isStarted());
-
-    Map<String, String> expectedMap =
-      new HashMap<String, String>() {
-        {
-          this.put("Task0", "0");
-          this.put("Task1", "1");
-        }
-      };
+  @Test public void testDeleteMappings() {
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
+    taskAssignmentManager.init(config, new MetricsRegistryMap());
+
+    Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
 
     for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
       taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue());
@@ -131,40 +97,22 @@ public class TestTaskAssignmentManager {
     assertEquals(expectedMap, localMap);
 
     taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
+
     Map<String, String> deletedMap = taskAssignmentManager.readTaskAssignment();
     assertTrue(deletedMap.isEmpty());
 
-    coordinatorStreamManager.stop();
-    assertTrue(producer.isStopped());
-    assertTrue(consumer.isStopped());
+    taskAssignmentManager.close();
   }
 
-  @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() throws Exception {
-    MockCoordinatorStreamSystemFactory mockCoordinatorStreamSystemFactory =
-        new MockCoordinatorStreamSystemFactory();
-    MockCoordinatorStreamSystemProducer producer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(config, null);
-    MockCoordinatorStreamSystemConsumer consumer =
-        mockCoordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(config, null);
-    consumer.register();
-    CoordinatorStreamManager
-        coordinatorStreamManager = new CoordinatorStreamManager(producer, consumer);
-    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamManager);
-
-    assertTrue(producer.isRegistered());
-    assertEquals(producer.getRegisteredSource(), "SamzaTaskAssignmentManager");
-
-    coordinatorStreamManager.start();
-    assertTrue(producer.isStarted());
-    assertTrue(consumer.isStarted());
+  @Test public void testTaskAssignmentManagerEmptyCoordinatorStream() {
+    TaskAssignmentManager taskAssignmentManager = new TaskAssignmentManager(config, new MetricsRegistryMap());
+    taskAssignmentManager.init(config, new MetricsRegistryMap());
 
     Map<String, String> expectedMap = new HashMap<>();
     Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
 
     assertEquals(expectedMap, localMap);
 
-    coordinatorStreamManager.stop();
-    assertTrue(producer.isStopped());
-    assertTrue(consumer.isStopped());
+    taskAssignmentManager.close();
   }
 }
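
TaskAssignmentManager follows the same lifecycle, with an explicit init(...) before use and close() afterwards. A sketch of the write/read/delete cycle these tests exercise, under the same illustrative config keys:

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.Config;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.container.grouper.task.TaskAssignmentManager;
    import org.apache.samza.metrics.MetricsRegistryMap;

    public class TaskAssignmentSketch {
      public static void main(String[] args) {
        Config config = new MapConfig(ImmutableMap.of(
            "job.name", "test-job", "job.coordinator.system", "test-kafka"));

        TaskAssignmentManager manager = new TaskAssignmentManager(config, new MetricsRegistryMap());
        manager.init(config, new MetricsRegistryMap());

        manager.writeTaskContainerMapping("Task0", "0");            // taskName -> containerId
        Map<String, String> assignments = manager.readTaskAssignment();

        manager.deleteTaskContainerMappings(assignments.keySet()); // removes the mappings
        manager.close();
      }
    }
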
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java b/samza-core/src/test/java/org/apache/samza/coordinator/JobModelManagerTestUtil.java
index 6a69889..b7514c4 100644
@@ -49,7 +49,7 @@ public class JobModelManagerTestUtil {
       containers.put(String.valueOf(i), container);
     }
     JobModel jobModel = new JobModel(config, containers, localityManager);
-    return new JobModelManager(jobModel, server);
+    return new JobModelManager(jobModel, server, null);
   }
 
   public static JobModelManager getJobModelManagerUsingReadModel(Config config, int containerCount, StreamMetadataCache streamMetadataCache,
@@ -59,7 +59,7 @@ public class JobModelManagerTestUtil {
       containerIds.add(String.valueOf(i));
     }
     JobModel jobModel = JobModelManager.readJobModel(config, new HashMap<>(), locManager, streamMetadataCache, containerIds);
-    return new JobModelManager(jobModel, server);
+    return new JobModelManager(jobModel, server, null);
   }
 
 
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
new file mode 100644
index 0000000..da2d984
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/metadatastore/TestCoordinatorStreamStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator.metadatastore;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
+import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.CoordinatorStreamUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(CoordinatorStreamUtil.class)
+public class TestCoordinatorStreamStore {
+
+  private CoordinatorStreamStore coordinatorStreamStore;
+
+  @Before
+  public void setUp() {
+    MockCoordinatorStreamSystemFactory.enableMockConsumerCache();
+    Map<String, String> configMap = ImmutableMap.of("job.name", "test-job",
+                                                    "job.coordinator.system", "test-kafka");
+    MockCoordinatorStreamSystemFactory systemFactory = new MockCoordinatorStreamSystemFactory();
+    PowerMockito.mockStatic(CoordinatorStreamUtil.class);
+    when(CoordinatorStreamUtil.getCoordinatorSystemFactory(anyObject())).thenReturn(systemFactory);
+    when(CoordinatorStreamUtil.getCoordinatorSystemStream(anyObject())).thenReturn(new SystemStream("test-kafka", "test"));
+    when(CoordinatorStreamUtil.getCoordinatorStreamName(anyObject(), anyObject())).thenReturn("test");
+    coordinatorStreamStore = new CoordinatorStreamStore(SetTaskContainerMapping.TYPE, new MapConfig(configMap), new MetricsRegistryMap());
+    coordinatorStreamStore.init(new MapConfig(), new MetricsRegistryMap());
+  }
+
+  @Test
+  public void testReadAfterWrite() {
+    byte[] key = getKey("test-key1");
+    byte[] value = getValue("test-value1");
+    Assert.assertNull(coordinatorStreamStore.get(key));
+    coordinatorStreamStore.put(key, value);
+    Assert.assertArrayEquals(value, coordinatorStreamStore.get(key));
+    Assert.assertEquals(1, coordinatorStreamStore.all().size());
+  }
+
+  @Test
+  public void testReadAfterDelete() {
+    byte[] key = getKey("test-key1");
+    byte[] value = getValue("test-value1");
+    Assert.assertNull(coordinatorStreamStore.get(key));
+    coordinatorStreamStore.put(key, value);
+    Assert.assertArrayEquals(value, coordinatorStreamStore.get(key));
+    coordinatorStreamStore.delete(key);
+    Assert.assertNull(coordinatorStreamStore.get(key));
+    Assert.assertEquals(0, coordinatorStreamStore.all().size());
+  }
+
+  @Test
+  public void testReadOfNonExistentKey() {
+    Assert.assertNull(coordinatorStreamStore.get("randomKey".getBytes()));
+    Assert.assertEquals(0, coordinatorStreamStore.all().size());
+  }
+
+  @Test
+  public void testMultipleUpdatesForSameKey() {
+    byte[] key = getKey("test-key1");
+    byte[] value = getValue("test-value1");
+    byte[] value1 = getValue("test-value2");
+    coordinatorStreamStore.put(key, value);
+    coordinatorStreamStore.put(key, value1);
+    Assert.assertArrayEquals(value1, coordinatorStreamStore.get(key));
+    Assert.assertEquals(1, coordinatorStreamStore.all().size());
+  }
+
+  @Test
+  public void testAllEntries() {
+    byte[] key = getKey("test-key1");
+    byte[] key1 = getKey("test-key2");
+    byte[] key2 = getKey("test-key3");
+    byte[] value = getValue("test-value1");
+    byte[] value1 = getValue("test-value2");
+    byte[] value2 = getValue("test-value3");
+    coordinatorStreamStore.put(key, value);
+    coordinatorStreamStore.put(key1, value1);
+    coordinatorStreamStore.put(key2, value2);
+    Assert.assertEquals(3, coordinatorStreamStore.all().size());
+    Assert.assertArrayEquals(value, coordinatorStreamStore.get(key));
+    Assert.assertArrayEquals(value1, coordinatorStreamStore.get(key1));
+    Assert.assertArrayEquals(value2, coordinatorStreamStore.get(key2));
+  }
+
+  private byte[] getValue(String value) {
+    Serde<Map<String, Object>> messageSerde = new JsonSerde<>();
+    SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", "testTask", value);
+    return messageSerde.toBytes(setTaskContainerMapping.getMessageMap());
+  }
+
+  private byte[] getKey(String key) {
+    JsonSerde<List<?>> keySerde = new JsonSerde<>();
+    SetTaskContainerMapping setTaskContainerMapping = new SetTaskContainerMapping("testSource", key, "");
+    return keySerde.toBytes(Arrays.asList(setTaskContainerMapping.getKeyArray()));
+  }
+
+}
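
The new test doubles as documentation for the byte-oriented store contract: get returns null for absent keys, put is last-write-wins per key, delete removes the entry, and all() returns the live entries. Keys and values are coordinator stream messages run through JsonSerde, as in the helpers above. A condensed sketch of that contract (the "testSource"/"testTask" names are placeholders taken from the test):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.samza.config.MapConfig;
    import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
    import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
    import org.apache.samza.metrics.MetricsRegistryMap;
    import org.apache.samza.serializers.JsonSerde;
    import org.apache.samza.serializers.Serde;

    public class CoordinatorStreamStoreSketch {
      public static void main(String[] args) {
        MapConfig config = new MapConfig(ImmutableMap.of(
            "job.name", "test-job", "job.coordinator.system", "test-kafka"));

        CoordinatorStreamStore store =
            new CoordinatorStreamStore(SetTaskContainerMapping.TYPE, config, new MetricsRegistryMap());
        store.init(config, new MetricsRegistryMap());

        // Keys and values are serialized coordinator stream message parts.
        JsonSerde<List<?>> keySerde = new JsonSerde<>();
        Serde<Map<String, Object>> msgSerde = new JsonSerde<>();
        SetTaskContainerMapping mapping = new SetTaskContainerMapping("testSource", "testTask", "0");
        byte[] key = keySerde.toBytes(Arrays.asList(mapping.getKeyArray()));
        byte[] value = msgSerde.toBytes(mapping.getMessageMap());

        store.put(key, value);                      // last write wins for a given key
        byte[] read = store.get(key);               // null if the key was never written
        Map<byte[], byte[]> entries = store.all();  // every live (non-deleted) entry
        store.delete(key);                          // subsequent get(key) returns null
      }
    }

Because the API trades in byte arrays, comparisons should be content-based (Arrays.equals / assertArrayEquals, as in the assertions above) rather than reference-based.
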
diff --git a/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java b/samza-rest/src/main/java/org/apache/samza/rest/proxy/task/SamzaTaskProxy.java
index 2da47f8..b14d14b 100644
@@ -34,7 +34,6 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.LocalityManager;
-import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.metrics.MetricsRegistryMap;
@@ -130,8 +129,7 @@ public class SamzaTaskProxy implements TaskProxy {
    * @return list of {@link Task} constructed from job model in coordinator stream.
    */
   protected List<Task> readTasksFromCoordinatorStream(CoordinatorStreamSystemConsumer consumer) {
-    CoordinatorStreamManager coordinatorStreamManager = new CoordinatorStreamManager(consumer);
-    LocalityManager localityManager = new LocalityManager(coordinatorStreamManager);
+    LocalityManager localityManager = new LocalityManager(consumer.getConfig(), new MetricsRegistryMap());
     Map<String, Map<String, String>> containerIdToHostMapping = localityManager.readContainerLocality();
     Map<String, String> taskNameToContainerIdMapping = localityManager.getTaskAssignmentManager().readTaskAssignment();
     StorageConfig storageConfig = new StorageConfig(consumer.getConfig());
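
In samza-rest the proxy sheds its CoordinatorStreamManager plumbing the same way: the LocalityManager is built straight from the consumer's config. A condensed sketch of the read path this hunk sets up, with an illustrative join from task to host (the real method goes on to build Task objects):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.container.LocalityManager;
    import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
    import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
    import org.apache.samza.metrics.MetricsRegistryMap;

    class TaskProxyReadPathSketch {
      // Resolve taskName -> containerId -> host from the two locality maps.
      static Map<String, String> taskToHost(CoordinatorStreamSystemConsumer consumer) {
        LocalityManager localityManager =
            new LocalityManager(consumer.getConfig(), new MetricsRegistryMap());
        Map<String, Map<String, String>> containerToHost = localityManager.readContainerLocality();
        Map<String, String> taskToContainer =
            localityManager.getTaskAssignmentManager().readTaskAssignment();

        Map<String, String> taskToHost = new HashMap<>();
        taskToContainer.forEach((task, container) ->
            taskToHost.put(task, containerToHost
                .getOrDefault(container, Collections.emptyMap())
                .get(SetContainerHostMapping.HOST_KEY)));
        return taskToHost;
      }
    }
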