SAMZA-1748: Standalone failure tests.
author     Shanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
           Fri, 22 Jun 2018 22:53:43 +0000 (15:53 -0700)
committer  xiliu <xiliu@linkedin.com>
           Fri, 22 Jun 2018 22:53:43 +0000 (15:53 -0700)
In the standalone deployment model, a processor can leave or join the group at any point in time. This reshuffling of processors is referred to as rebalancing, which results in task (work) redistribution among the other available, live processors in the group.

Processor rebalancing in the existing standalone integration tests (JUnit tests) is triggered through clean shutdown of the processors. In real production scenarios, however, rebalancing is typically triggered by unclean shutdowns and full garbage-collection (GC) pauses of the processors.
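
In these new tests an unclean shutdown is simulated by sending SIGKILL to the processor's JVM (so no shutdown hooks run), and a long GC pause is simulated with SIGSTOP/SIGCONT. A minimal local sketch of that approach, assuming a single host and that the processor's command line carries its processor.id (the committed stream_processor.py helper does the same thing over SSH through zopkio):

    import os
    import signal
    import subprocess
    import time

    def find_pids(processor_id):
        """Find local PIDs whose command line contains the given processor id."""
        try:
            out = subprocess.check_output(['pgrep', '-f', processor_id])
        except subprocess.CalledProcessError:
            return []  # pgrep exits non-zero when nothing matches
        return [int(pid) for pid in out.decode().split()]

    def crash(processor_id):
        """Unclean shutdown: SIGKILL the JVM so no shutdown hooks or clean group-leave run."""
        for pid in find_pids(processor_id):
            os.kill(pid, signal.SIGKILL)

    def gc_pause(processor_id, seconds):
        """Approximate a stop-the-world GC pause by freezing and later resuming the process."""
        for pid in find_pids(processor_id):
            os.kill(pid, signal.SIGSTOP)
        time.sleep(seconds)
        for pid in find_pids(processor_id):
            os.kill(pid, signal.SIGCONT)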

To cover those scenarios, this patch adds the following integration tests:

1. Force killing the leader processor of the group.
2. Force killing a single follower in the group.
3. Force killing multiple followers in the group.
4. Force killing the leader and a follower in the group.
5. Suspending and resuming the leader of the group.

Since the existing standalone integration tests already cover event consumption/production after the rebalancing phase, these new tests exercise only the group coordination. We'll iterate on this initial suite and add more tests as necessary.
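
For reference, a condensed sketch of the leader-kill coordination flow, using the StreamProcessor and ZkClient helpers added in this patch (the JobModel watch/wait, error handling, and zookeeper data cleanup that the real test performs are omitted for brevity):

    from stream_processor import StreamProcessor
    from zk_client import ZkClient

    zk = ZkClient(zookeeper_host='127.0.0.1', zookeeper_port='2181',
                  app_name='test-app-name', app_id='test-app-id')
    zk.start()

    # Launch three processors that coordinate through zookeeper.
    processors = {}
    for pid in ['standalone-processor-1', 'standalone-processor-2', 'standalone-processor-3']:
        processors[pid] = StreamProcessor(host_name='localhost', processor_id=pid)
        processors[pid].start()

    # Force-kill the current leader (SIGKILL, i.e. an unclean shutdown).
    leader_id = zk.get_leader_processor_id()
    processors.pop(leader_id).kill()

    # After the group coordination timeout, a new JobModel should be published
    # that contains only the two surviving followers, under a new leader.
    job_model = zk.get_latest_job_model()
    assert leader_id not in job_model['containers']
    assert zk.get_leader_processor_id() != leader_id

    for processor in processors.values():
        processor.kill()
    zk.stop()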

Author: Shanthoosh Venkataraman <santhoshvenkat1988@gmail.com>

Reviewers: Jagadish V <vjagadish1989@apache.org>

Closes #554 from shanthoosh/standalone_failure_tests

13 files changed:
README.md
bin/integration-tests.sh
samza-test/src/main/config/standalone.failure.test.properties [new file with mode: 0644]
samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java [new file with mode: 0644]
samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java [new file with mode: 0644]
samza-test/src/main/python/configs/kafka.json
samza-test/src/main/python/deployment.py
samza-test/src/main/python/requirements.txt
samza-test/src/main/python/standalone_deployment.py [new file with mode: 0644]
samza-test/src/main/python/standalone_integration_tests.py [new file with mode: 0644]
samza-test/src/main/python/stream_processor.py [new file with mode: 0644]
samza-test/src/main/python/tests/standalone_failure_tests.py [new file with mode: 0644]
samza-test/src/main/python/tests/zk_client.py [new file with mode: 0644]

index a431281..1a7fdd9 100644 (file)
--- a/README.md
+++ b/README.md
@@ -46,9 +46,13 @@ To run key-value performance tests:
 
     ./gradlew samza-shell:kvPerformanceTest -PconfigPath=file://$PWD/samza-test/src/main/config/perf/kv-perf.properties
 
-To run all integration tests:
+To run yarn integration tests:
 
-    ./bin/integration-tests.sh <dir>
+    ./bin/integration-tests.sh <dir> yarn-integration-tests
+
+To run standalone integration tests:
+
+    ./bin/integration-tests.sh <dir> standalone-integration-tests
 
 ### Running checkstyle on the java code ###
 
diff --git a/bin/integration-tests.sh b/bin/integration-tests.sh
index 14fcd1c..248236f 100755 (executable)
@@ -19,6 +19,7 @@
 DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 BASE_DIR=$DIR/..
 TEST_DIR=$1
+FAILURE_TEST_TYPE=$2
 
 if test -z "$TEST_DIR"; then
   echo
@@ -70,17 +71,25 @@ source $SAMZA_INTEGRATION_TESTS_DIR/bin/activate
 # install zopkio and requests
 pip install -r $SCRIPTS_DIR/requirements.txt
 
-# treat all trailing parameters (after dirname) as zopkio switches
+# treat all trailing parameters (after dirname, test_type) as zopkio switches
 shift
-SWITCHES="$*"
+SWITCHES="${*:2}"
 
 # default to info-level debugging if not specified
 if [[ $SWITCHES != *"console-log-level"* ]]; then
   SWITCHES="$SWITCHES --console-log-level INFO"
 fi
 
-# run the tests
-zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
+if [[ ${FAILURE_TEST_TYPE} == "yarn-integration-tests" ]]; then
+    echo "Running yarn integration tests."
+    zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/integration_tests.py
+elif [[ ${FAILURE_TEST_TYPE} == "standalone-integration-tests" ]]; then
+    echo "Running standalone integration tests."
+    zopkio --config-overrides remote_install_path=$ABS_TEST_DIR $SWITCHES $SCRIPTS_DIR/standalone_integration_tests.py
+else
+    echo "Invalid failure test type: $FAILURE_TEST_TYPE"
+    exit -1
+fi
 
 # go back to execution directory
 deactivate
diff --git a/samza-test/src/main/config/standalone.failure.test.properties b/samza-test/src/main/config/standalone.failure.test.properties
new file mode 100644 (file)
index 0000000..d855d5f
--- /dev/null
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+app.runner.class=org.apache.samza.runtime.LocalApplicationRunner
+
+app.class=org.apache.samza.test.integration.TestStandaloneIntegrationApplication
+
+app.name=test-app-name
+app.id=test-app-id
+job.name=test-app-name
+job.id=test-app-id
+
+## Kafka I/O system properties.
+task.inputs=standalone_integration_test_kafka_input_topic
+input.stream.name=standalone_integration_test_kafka_input_topic
+job.default.system=testSystemName
+systems.testSystemName.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.testSystemName.producer.bootstrap.servers=localhost:9092
+systems.testSystemName.consumer.zookeeper.connect=localhost:2181
+
+## Zookeeper coordination properties
+job.coordinator.zk.connect=localhost:2181
+job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
+task.shutdown.ms=4000
+job.debounce.time.ms=4000
+job.coordinator.zk.consensus.timeout.ms=4000
+job.coordinator.zk.session.timeout.ms=4000
+
+job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory
\ No newline at end of file
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java b/samza-test/src/main/java/org/apache/samza/test/integration/LocalApplicationRunnerMain.java
new file mode 100644 (file)
index 0000000..e8be592
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.integration;
+
+import joptsimple.OptionSet;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.runtime.ApplicationRunnerMain;
+import org.apache.samza.runtime.ApplicationRunnerOperation;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.samza.runtime.ApplicationRunnerMain.STREAM_APPLICATION_CLASS_CONFIG;
+
+/**
+ * {@link ApplicationRunnerMain} was designed for deploying a {@link StreamApplication} in yarn
+ * and doesn't work in standalone mode.
+ *
+ * This runner class is built for standalone failure tests and not recommended for general use.
+ */
+public class LocalApplicationRunnerMain {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(LocalApplicationRunnerMain.class);
+
+  public static void main(String[] args) throws Exception {
+    ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
+    OptionSet options = cmdLine.parser().parse(args);
+    Config orgConfig = cmdLine.loadConfig(options);
+    Config config = Util.rewriteConfig(orgConfig);
+
+    ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+    StreamApplication app = (StreamApplication) Class.forName(config.get(STREAM_APPLICATION_CLASS_CONFIG)).newInstance();
+
+    ApplicationRunnerOperation op = cmdLine.getOperation(options);
+
+    try {
+      LOGGER.info("Launching stream application: {} to run.", app);
+      runner.run(app);
+      runner.waitForFinish();
+    } catch (Exception e) {
+      LOGGER.error("Exception occurred when invoking: {} on application: {}.", op, app, e);
+    }
+  }
+}
diff --git a/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java b/samza-test/src/main/java/org/apache/samza/test/integration/TestStandaloneIntegrationApplication.java
new file mode 100644 (file)
index 0000000..f6e3d5f
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.test.integration;
+
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Acts as a pass-through filter for all the events from an input stream.
+ */
+public class TestStandaloneIntegrationApplication implements StreamApplication {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestStandaloneIntegrationApplication.class);
+
+  @Override
+  public void init(StreamGraph graph, Config config) {
+    String inputStream = config.get("input.stream.name");
+    String outputStreamName = "standaloneIntegrationTestKafkaOutputTopic";
+    LOGGER.info("Publishing message to: {}.", outputStreamName);
+    graph.getInputStream(inputStream).sendTo(graph.getOutputStream(outputStreamName));
+  }
+}
diff --git a/samza-test/src/main/python/configs/kafka.json b/samza-test/src/main/python/configs/kafka.json
index 91fe23e..14b2137 100644 (file)
@@ -3,7 +3,7 @@
     "kafka_instance_0": "localhost"
   },
   "kafka_port": 9092,
-  "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties",
+  "kafka_start_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-start.sh -daemon kafka_2.10-0.10.1.1/config/server.properties --override delete.topic.enable=true",
   "kafka_stop_cmd": "kafka_2.10-0.10.1.1/bin/kafka-server-stop.sh",
   "kafka_install_path": "deploy/kafka",
   "kafka_executable": "kafka_2.10-0.10.1.1.tgz",
diff --git a/samza-test/src/main/python/deployment.py b/samza-test/src/main/python/deployment.py
index 89ba728..7cd3cac 100644 (file)
@@ -72,9 +72,7 @@ def setup_suite():
     runtime.set_deployer(name, deployer)
     for instance, host in c(name + '_hosts').iteritems():
       logger.info('Deploying {0} on host: {1}'.format(instance, host))
-      deployer.deploy(instance, {
-        'hostname': host
-      })
+      deployer.start(instance, {'hostname': host})
 
   # Setup Samza job deployer.
   samza_job_deployer = SamzaJobYarnDeployer({
diff --git a/samza-test/src/main/python/requirements.txt b/samza-test/src/main/python/requirements.txt
index cf43a23..e39538e 100644 (file)
@@ -19,3 +19,4 @@ zopkio==0.2.5
 requests
 kafka-python==1.3.3
 Jinja2
+kazoo==2.5
\ No newline at end of file
diff --git a/samza-test/src/main/python/standalone_deployment.py b/samza-test/src/main/python/standalone_deployment.py
new file mode 100644 (file)
index 0000000..cd038ef
--- /dev/null
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import zopkio.adhoc_deployer as adhoc_deployer
+from zopkio.runtime import get_active_config as c
+from subprocess import PIPE, Popen
+import logging
+import time
+import urllib
+import os
+
+TEST_INPUT_TOPIC = 'standalone_integration_test_kafka_input_topic'
+TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'
+
+logger = logging.getLogger(__name__)
+deployers = {}
+
+def setup_suite():
+    """
+    Setup method that will be run once by zopkio test_runner before all the integration tests.
+    """
+    ## Download and deploy zk and kafka. Configuration for kafka, zookeeper are defined in kafka.json and zookeeper.json.
+    _download_components(['zookeeper', 'kafka'])
+
+    _deploy_components(['zookeeper', 'kafka'])
+
+    ## Create input and output topics.
+    for topic in [TEST_INPUT_TOPIC, TEST_OUTPUT_TOPIC]:
+        logger.info("Deleting topic: {0}.".format(topic))
+        _delete_kafka_topic('localhost:2181', topic)
+        logger.info("Creating topic: {0}.".format(topic))
+        _create_kafka_topic('localhost:2181', topic, 3, 1)
+
+def _download_components(components):
+    """
+    Download the :param components if unavailable in deployment directory using url defined in config.
+    """
+
+    for component in components:
+        url_key = 'url_{0}'.format(component)
+        url = c(url_key)
+        filename = os.path.basename(url)
+        if os.path.exists(filename):
+            logger.debug('Using cached file: {0}.'.format(filename))
+        else:
+            logger.info('Downloading {0} from {1}.'.format(component, url))
+            urllib.urlretrieve(url, filename)
+
+def _deploy_components(components):
+    """
+    Install and start all the :param components through binaries in deployment directory.
+    """
+
+    global deployers
+
+    for component in components:
+        config =  {
+            'install_path': os.path.join(c('remote_install_path'), c(component + '_install_path')),
+            'executable': c(component + '_executable'),
+            'post_install_cmds': c(component + '_post_install_cmds', []),
+            'start_command': c(component + '_start_cmd'),
+            'stop_command': c(component + '_stop_cmd'),
+            'extract': True,
+            'sync': True,
+        }
+        deployer = adhoc_deployer.SSHDeployer(component, config)
+        deployers[component] = deployer
+        for instance, host in c(component + '_hosts').iteritems():
+            logger.info('Deploying {0} on host: {1}'.format(instance, host))
+            deployer.start(instance, {'hostname': host})
+            time.sleep(5)
+
+def _create_kafka_topic(zookeeper_servers, topic_name, partition_count, replication_factor):
+    """
+    :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector.
+    :param topic_name: name of kafka topic to create.
+    :param partition_count: Number of partitions of the kafka topic.
+    :param replication_factor: Replication factor of the kafka topic.
+    """
+
+    ### Using command line utility to create kafka topic since kafka python API doesn't support configuring partitionCount during topic creation.
+    base_dir = os.getcwd()
+    create_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --create --zookeeper {1} --replication-factor {2} --partitions {3} --topic {4}'.format(base_dir, zookeeper_servers, replication_factor, partition_count, topic_name)
+    p = Popen(create_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+    output, err = p.communicate()
+    logger.info("Output from create kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))
+
+def _delete_kafka_topic(zookeeper_servers, topic_name):
+    """
+    Delete kafka topic defined by the method parameters.
+
+    :param zookeeper_servers: Comma separated list of zookeeper servers used for setting up kafka consumer connector.
+    :param topic_name: name of kafka topic to delete.
+    """
+
+    base_dir = os.getcwd()
+    delete_topic_command = 'sh {0}/deploy/kafka/kafka_2.10-0.10.1.1/bin/kafka-topics.sh --delete --zookeeper {1} --topic {2}'.format(base_dir, zookeeper_servers, topic_name)
+    logger.info("Deleting topic: {0}.".format(topic_name))
+    p = Popen(delete_topic_command.split(' '), stdin=PIPE, stdout=PIPE, stderr=PIPE)
+    output, err = p.communicate()
+    logger.info("Output from delete kafka topic: {0}\nstdout: {1}\nstderr: {2}".format(topic_name, output, err))
+
+def teardown_suite():
+    """
+    Teardown method that will be run once by zopkio test_runner after all the integration tests.
+    """
+    for component in ['kafka', 'zookeeper']:
+        deployer = deployers[component]
+        for instance, host in c(component + '_hosts').iteritems():
+            deployer.undeploy(instance)
diff --git a/samza-test/src/main/python/standalone_integration_tests.py b/samza-test/src/main/python/standalone_integration_tests.py
new file mode 100644 (file)
index 0000000..8405413
--- /dev/null
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# 'License'); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+
+dir = os.path.dirname(os.path.abspath(__file__))
+
+test = {
+    'deployment_code': os.path.join(dir, 'standalone_deployment.py'),
+    'perf_code': os.path.join(dir, 'perf.py'),
+    'configs_directory': os.path.join(dir, 'configs'),
+    'test_code': [
+        os.path.join(dir, 'tests', 'standalone_failure_tests.py'),
+    ],
+}
diff --git a/samza-test/src/main/python/stream_processor.py b/samza-test/src/main/python/stream_processor.py
new file mode 100644 (file)
index 0000000..a7a2bc8
--- /dev/null
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+import os
+import time
+import zopkio.adhoc_deployer as adhoc_deployer
+from zopkio.remote_host_helper import get_ssh_client, exec_with_env
+import zopkio.runtime as runtime
+
+logger = logging.getLogger(__name__)
+
+class StreamProcessor:
+    """
+    Represents a standalone StreamProcessor that uses zookeeper for coordination. Used in standalone failure tests
+    to manage the lifecycle of the linux process (start, kill, pause) associated with the StreamProcessor.
+    """
+
+    def __init__(self, host_name, processor_id):
+        """
+        :param host_name: Represents the host name in which this StreamProcessor will run.
+        :param processor_id: Represents the processor_id of StreamProcessor.
+        """
+        start_cmd = 'export SAMZA_LOG_DIR=\"deploy/{0}\"; export JAVA_OPTS=\"$JAVA_OPTS -Xmx2G\"; ./bin/run-class.sh  org.apache.samza.test.integration.LocalApplicationRunnerMain --config-path ./config/standalone.failure.test.properties --operation run --config processor.id={0} >> /tmp/{0}.log &'
+        self.username = runtime.get_username()
+        self.password = runtime.get_password()
+        self.processor_id = processor_id
+        self.host_name = host_name
+        self.processor_start_command = start_cmd.format(self.processor_id)
+        logger.info('Running processor start command: {0}'.format(self.processor_start_command))
+        self.deployment_config = {
+            'install_path': os.path.join(runtime.get_active_config('remote_install_path'), 'deploy/{0}'.format(self.processor_id)),
+            'executable': 'samza-test_2.11-0.15.0-SNAPSHOT.tgz',
+            'post_install_cmds': [],
+            'start_command': self.processor_start_command,
+            'stop_command': '',
+            'extract': True,
+            'sync': True,
+        }
+        self.deployer = adhoc_deployer.SSHDeployer(self.processor_id, self.deployment_config)
+
+    def start(self):
+        """
+        Submits the StreamProcessor for execution on a host: host_name.
+        """
+        logger.info("Starting processor with id: {0}.".format(self.processor_id))
+        self.deployer.start(self.processor_id, {'hostname': self.host_name})
+
+    def get_processor_id(self):
+        """
+        Returns the processorId of the StreamProcessor.
+        """
+        return self.processor_id
+
+    def kill(self):
+        """
+        Kills the StreamProcessor process through SIGKILL signal.
+        """
+        self.__send_signal_to_processor("SIGKILL")
+
+    def pause(self):
+        """
+        Pauses the StreamProcessor process through SIGSTOP signal.
+        """
+        self.__send_signal_to_processor("SIGSTOP")
+
+    def resume(self):
+        """
+        Resumes the stream processor process through SIGCONT signal.
+        """
+        self.__send_signal_to_processor("SIGCONT")
+
+    def __send_signal_to_processor(self, signal):
+        """
+        Sends a signal(:param signal) to the linux process of the StreamProcessor.
+        """
+        linux_process_pids = self.__get_pid()
+        for linux_process_pid in linux_process_pids:
+            command = "kill -{0} {1}".format(signal, linux_process_pid)
+            result = self.__execute_command(command)
+            logger.info("Result of {0} is: {1}.".format(command, result))
+
+    def __get_pid(self):
+        """
+        Determines the linux process id associated with this StreamProcessor.
+        """
+        ps_command = "ps aux | grep '{0}' | grep -v grep | tr -s ' ' | cut -d ' ' -f 2 | grep -Eo '[0-9]+'".format(self.processor_id)
+        non_failing_command = "{0}; if [ $? -le 1 ]; then true;  else false; fi;".format(ps_command)
+        logger.info("Executing command: {0}.".format(non_failing_command))
+        full_output = self.__execute_command(non_failing_command)
+        pids = []
+        if len(full_output) > 0:
+            pids = [int(pid_str) for pid_str in full_output.split('\n') if pid_str.isdigit()]
+        return pids
+
+    def __execute_command(self, command):
+        """
+        Executes the :param command on host: self.host_name.
+        """
+        with get_ssh_client(self.host_name, username=self.username, password=self.password) as ssh:
+            chan = exec_with_env(ssh, command, msg="Failed to get PID", env={})
+        execution_result = ''
+        while True:
+            result_buffer = chan.recv(16)
+            if len(result_buffer) == 0:
+                break
+            execution_result += result_buffer
+        return execution_result
diff --git a/samza-test/src/main/python/tests/standalone_failure_tests.py b/samza-test/src/main/python/tests/standalone_failure_tests.py
new file mode 100644 (file)
index 0000000..0fab742
--- /dev/null
@@ -0,0 +1,311 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import util
+import sys
+import logging
+from kafka import SimpleProducer, SimpleConsumer
+import time
+import traceback
+from stream_processor import StreamProcessor
+from zk_client import ZkClient
+import threading
+
+logger = logging.getLogger(__name__)
+NUM_MESSAGES = 50
+GROUP_COORDINATION_TIMEOUT = 14
+TEST_OUTPUT_TOPIC = 'standalone_integration_test_kafka_output_topic'
+zk_client = None
+
+### TODO: In each test add barrier state and processorId validations after fixing data serialization format in zookeeper(SAMZA-1749).
+def __purge_zk_data():
+    """
+    Recursively deletes all data nodes created in zookeeper in a test-run.
+    """
+    zk_client.purge_all_nodes()
+
+def __pump_messages_into_input_topic():
+    """
+    Produce 50 messages into input topic: standalone_integration_test_kafka_input_topic.
+    """
+    kafka_client = None
+    input_topic = 'standalone_integration_test_kafka_input_topic'
+    try:
+        kafka_client = util.get_kafka_client()
+        kafka_client.ensure_topic_exists(input_topic)
+        producer = SimpleProducer(kafka_client, async=False, req_acks=SimpleProducer.ACK_AFTER_CLUSTER_COMMIT, ack_timeout=30000)
+        logger.info('Producing {0} messages to topic: {1}'.format(NUM_MESSAGES, input_topic))
+        for message_index in range(1, NUM_MESSAGES + 1):
+            producer.send_messages(input_topic, str(message_index))
+    except:
+        logger.error(traceback.format_exc(sys.exc_info()))
+    finally:
+        if kafka_client is not None:
+            kafka_client.close()
+
+def __setup_processors():
+    """
+    Instantiates and schedules three stream processors for execution in localhost.
+    :return the instantiated stream processors.
+    """
+    processors = {}
+    for processor_id in ['standalone-processor-1', 'standalone-processor-2', 'standalone-processor-3']:
+        processors[processor_id] = StreamProcessor(host_name='localhost', processor_id=processor_id)
+        processors[processor_id].start()
+    return processors
+
+def __tear_down_processors(processors):
+    """
+    Kills all the stream processors passed in :param processors.
+    """
+    for processor_id, processor in processors.iteritems():
+        logger.info("Killing processor: {0}.".format(processor_id))
+        processor.kill()
+
+def __setup_zk_client():
+    """
+    Instantiate a ZkClient to connect to a zookeeper server in localhost.
+    """
+    global zk_client
+    zk_client = ZkClient(zookeeper_host='127.0.0.1', zookeeper_port='2181', app_name='test-app-name', app_id='test-app-id')
+    zk_client.start()
+
+def __teardown_zk_client():
+    """
+    Stops the ZkClient.
+    """
+    global zk_client
+    zk_client.stop()
+
+def job_model_watcher(event, expected_processors):
+    start_time_seconds = time.time()
+    elapsed_time_seconds = (int)(time.time() - start_time_seconds)
+    while elapsed_time_seconds <= 30:
+        recent_job_model = zk_client.get_latest_job_model()
+        if set(recent_job_model['containers'].keys()) == set(expected_processors):
+            event.set()
+            return
+        else:
+            time.sleep(2)
+        elapsed_time_seconds = (int)(time.time() - start_time_seconds)
+
+def __validate_job_model(job_model, killed_processors=[]):
+    ## Validate the TaskModel. Check if all the partitions are assigned to the containers.
+    expected_ssps = [{u'partition': 0, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
+                        {u'partition': 1, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'},
+                        {u'partition': 2, u'system': u'testSystemName', u'stream': u'standalone_integration_test_kafka_input_topic'}]
+    actual_ssps = []
+    for container_id, tasks in job_model['containers'].iteritems():
+        for partition, ssps in tasks['tasks'].iteritems():
+            actual_ssps.append(ssps['system-stream-partitions'][0])
+    actual_ssps.sort()
+    assert expected_ssps == actual_ssps, 'Expected ssp: {0}, Actual ssp: {1}.'.format(expected_ssps, actual_ssps)
+
+    ## Validate the ContainerModel. Live processors should be present in the JobModel and killed processors should not be in JobModel.
+    active_processors = zk_client.get_active_processors()
+    assert set(active_processors) == set(job_model['containers'].keys()), 'ProcessorIds: {0} does not exist in JobModel: {1}.'.format(active_processors, job_model['containers'].keys())
+    for processor_id in killed_processors:
+        assert processor_id not in job_model['containers'], 'Processor: {0} exists in JobModel: {1}.'.format(processor_id, job_model)
+
+def __get_job_model(expected_processors):
+    event = threading.Event()
+    zk_client.watch_job_model(job_model_watcher(event=event, expected_processors=expected_processors))
+    event.wait(2 * GROUP_COORDINATION_TIMEOUT)
+    return zk_client.get_latest_job_model()
+
+def test_kill_leader():
+    """
+    Launches three stream processors. Kills the leader processor. Waits till the group coordination timeout
+    and verifies that the final JobModel contains both the followers.
+    """
+    processors = {}
+    try:
+        __setup_zk_client()
+        __pump_messages_into_input_topic()
+        processors = __setup_processors()
+
+        ## Validations before killing the leader.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model, [])
+
+        leader_processor_id = zk_client.get_leader_processor_id()
+        processors.pop(leader_processor_id).kill()
+
+        ## Validations after killing the leader.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        assert leader_processor_id != zk_client.get_leader_processor_id(), '{0} is still the leader'.format(leader_processor_id)
+        __validate_job_model(job_model, [leader_processor_id])
+    except:
+        ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+        logger.error(traceback.format_exc(sys.exc_info()))
+        raise
+    finally:
+        __tear_down_processors(processors)
+        __purge_zk_data()
+        __teardown_zk_client()
+
+def test_kill_one_follower():
+    """
+    Launches three stream processors. Kills one follower processor. Waits till the group coordination timeout and
+    verifies that the final JobModel contains the leader processor and un-killed follower processor.
+    """
+    processors = {}
+    try:
+        __setup_zk_client()
+        __pump_messages_into_input_topic()
+        processors = __setup_processors()
+
+        leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
+
+        ## Validations before killing the follower.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model)
+
+        for processor_id, deployer in processors.iteritems():
+            if processor_id != leader_processor_id:
+                follower = processors.pop(processor_id)
+                follower.kill()
+                killed_processors.append(processor_id)
+                break
+
+        ## Validations after killing the follower.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        assert leader_processor_id == zk_client.get_leader_processor_id(), '{0} is not the leader'.format(leader_processor_id)
+        __validate_job_model(job_model, killed_processors)
+    except:
+        ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+        logger.error(traceback.format_exc(sys.exc_info()))
+        raise
+    finally:
+        __tear_down_processors(processors)
+        __purge_zk_data()
+        __teardown_zk_client()
+
+def test_kill_multiple_followers():
+    """
+    Launches three stream processors. Kills both the follower processors. Waits for group coordination timeout
+    and verifies that the final JobModel contains only the leader processor.
+    """
+    processors = {}
+    try:
+        __setup_zk_client()
+        __pump_messages_into_input_topic()
+        processors = __setup_processors()
+
+        ## Validations before killing the followers.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model)
+
+        leader_processor_id, killed_processors = zk_client.get_leader_processor_id(), []
+
+        for processor_id in processors.keys():
+            if processor_id != leader_processor_id:
+                follower = processors.pop(processor_id)
+                killed_processors.append(processor_id)
+                follower.kill()
+
+        ## Validations after killing the followers.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model, killed_processors)
+    except:
+        ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+        logger.error(traceback.format_exc(sys.exc_info()))
+        raise
+    finally:
+        __tear_down_processors(processors)
+        __purge_zk_data()
+        __teardown_zk_client()
+
+def test_kill_leader_and_a_follower():
+    """
+    Launches three stream processors. Kills the leader and one follower processor.
+    Waits till the group coordination timeout and verifies that the final JobModel contains only one processor.
+    """
+    processors = {}
+    try:
+        __setup_zk_client()
+        __pump_messages_into_input_topic()
+        processors = __setup_processors()
+
+        leader_processor_id = zk_client.get_leader_processor_id()
+
+        ## Validations before killing the leader and follower.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model)
+
+        killed_processors = [leader_processor_id]
+        processors.pop(leader_processor_id).kill()
+
+        for processor_id in processors.keys():
+            follower = processors.pop(processor_id)
+            killed_processors.append(processor_id)
+            follower.kill()
+            break
+
+        ## Validations after killing the leader and follower.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model, killed_processors)
+    except:
+        ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+        logger.error(traceback.format_exc(sys.exc_info()))
+        raise
+    finally:
+        __tear_down_processors(processors)
+        __purge_zk_data()
+        __teardown_zk_client()
+
+def test_pause_resume_leader():
+    """
+    Launches three processors. Pauses the leader processor. Waits till the group coordination timeout and verifies
+    that the JobModel doesn't contain the leader processor. Resumes the leader processor, waits till the group
+    coordination timeout again, and verifies that the new JobModel contains the previously paused leader processor.
+    """
+    processors = {}
+    try:
+        __setup_zk_client()
+        __pump_messages_into_input_topic()
+        processors = __setup_processors()
+
+        ## Validations before pausing the leader.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model)
+
+        leader_processor_id = zk_client.get_leader_processor_id()
+        leader = processors.pop(leader_processor_id)
+
+        logger.info("Pausing the leader processor: {0}.".format(leader_processor_id))
+        leader.pause()
+
+        ## Validations after pausing the leader.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model, [leader_processor_id])
+
+        logger.info("Resuming the leader processor: {0}.".format(leader_processor_id))
+        leader.resume()
+
+        ## Validations after resuming the leader.
+        job_model = __get_job_model(expected_processors=processors.keys())
+        __validate_job_model(job_model)
+
+        leader.kill()
+    except:
+        ## Explicitly logging exception, since zopkio doesn't log complete stacktrace.
+        logger.error(traceback.format_exc(sys.exc_info()))
+        raise
+    finally:
+        __tear_down_processors(processors)
+        __purge_zk_data()
+        __teardown_zk_client()
diff --git a/samza-test/src/main/python/tests/zk_client.py b/samza-test/src/main/python/tests/zk_client.py
new file mode 100644 (file)
index 0000000..2a11a80
--- /dev/null
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+from kazoo.client import KazooClient
+import logging
+import sys
+import traceback
+
+logger = logging.getLogger(__name__)
+
+class ZkClient:
+
+    """
+    Wrapper class over KazooClient. Provides utility methods for standalone failure tests to get details about
+    processor group state stored in zookeeper.
+
+    Instantiates a kazoo client to connect to zookeeper server at :param zookeeper_host::param zookeeper_port.
+    """
+    def __init__(self, zookeeper_host, zookeeper_port, app_name, app_id):
+        self.kazoo_client = KazooClient(hosts='{0}:{1}'.format(zookeeper_host, zookeeper_port))
+        self.zk_base_node = 'app-{0}-{1}/{2}-{3}-coordinationData'.format(app_name, app_id, app_name, app_id)
+
+    def start(self):
+        """
+        Establishes connection with the zookeeper server at self.host_name:self.port.
+        """
+        self.kazoo_client.start()
+
+    def stop(self):
+        """
+        Closes and releases the connection held with the zookeeper server.
+        """
+        self.kazoo_client.stop()
+
+    def watch_job_model(self, watch_function):
+        self.kazoo_client.ensure_path('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+        self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node), watch=watch_function)
+
+    def get_latest_job_model(self):
+        """
+        Reads and returns the latest JobModel from zookeeper.
+        """
+        job_model_dict = {}
+        try:
+            childZkNodes = self.kazoo_client.get_children('{0}/JobModelGeneration/jobModels/'.format(self.zk_base_node))
+            if len(childZkNodes) > 0:
+                childZkNodes.sort()
+                childZkNodes.reverse()
+
+                job_model_generation_path = '{0}/JobModelGeneration/jobModels/{1}/'.format(self.zk_base_node, childZkNodes[0])
+                job_model, _ = self.kazoo_client.get(job_model_generation_path)
+
+                """
+                ZkClient java library stores the data in the following format in zookeeper:
+                        class_name, data_length, actual_data
+
+                JobModel json manipulation: Delete all the characters before first occurrence of '{' in jobModel json string.
+
+                Normal json deserialization without the above custom string massaging fails. This will be removed after SAMZA-1749.
+                """
+
+                first_curly_brace_index = job_model.find('{')
+                job_model = job_model[first_curly_brace_index: ]
+                job_model_dict = json.loads(job_model)
+                logger.info("Recent JobModel in zookeeper: {0}".format(job_model_dict))
+        except:
+            logger.error(traceback.format_exc(sys.exc_info()))
+        return job_model_dict
+
+    def get_leader_processor_id(self):
+        """
+        Determines the processorId of the current leader in a processors group.
+
+        Returns the processorId of the leader if leader exists.
+        Returns None otherwise.
+        """
+        leader_processor_id = None
+        try:
+            processors_path =  '{0}/processors'.format(self.zk_base_node)
+            childZkNodes = self.kazoo_client.get_children(processors_path)
+            childZkNodes.sort()
+            child_processor_path = '{0}/{1}'.format(processors_path, childZkNodes[0])
+            processor_data, _ = self.kazoo_client.get(child_processor_path)
+            host, leader_processor_id = processor_data.split(" ")
+        except:
+            logger.error(traceback.format_exc(sys.exc_info()))
+        return leader_processor_id
+
+    def purge_all_nodes(self):
+        """
+        Recursively delete all zookeeper nodes from the base node: self.zk_base_node.
+        """
+        try:
+            self.kazoo_client.delete(path=self.zk_base_node, version=-1, recursive=True)
+        except:
+            logger.error(traceback.format_exc(sys.exc_info()))
+
+    def get_active_processors(self):
+        """
+        Determines the processor ids that are active in zookeeper.
+        """
+        processor_ids = []
+        try:
+            processors_path =  '{0}/processors'.format(self.zk_base_node)
+            childZkNodes = self.kazoo_client.get_children(processors_path)
+            childZkNodes.sort()
+
+            for childZkNode in childZkNodes:
+                child_processor_path = '{0}/{1}'.format(processors_path, childZkNode)
+                processor_data, _ = self.kazoo_client.get(child_processor_path)
+                host, processor_id = processor_data.split(" ")
+                processor_ids.append(processor_id)
+        except:
+            logger.error(traceback.format_exc(sys.exc_info()))
+        return processor_ids
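
As a side note on the payload massaging done in get_latest_job_model() above: the Java ZkClient library writes the serialized class name and data length ahead of the JobModel JSON, so everything before the first '{' is dropped before deserialization (to be removed once SAMZA-1749 lands). A small illustration with a hypothetical raw payload:

    import json

    # Hypothetical raw bytes read from the jobModels znode: "<class_name> <length> <json>".
    raw = 'org.apache.samza.job.model.JobModel 58 {"containers": {"standalone-processor-1": {"tasks": {}}}}'
    job_model = json.loads(raw[raw.find('{'):])
    assert 'standalone-processor-1' in job_model['containers']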