SAMZA-394; fix legacy integration tests
authorNavina Ramesh <navi.trinity@gmail.com>
Thu, 26 Feb 2015 22:41:17 +0000 (14:41 -0800)
committerChris Riccomini <criccomini@apache.org>
Thu, 26 Feb 2015 22:41:17 +0000 (14:41 -0800)
23 files changed:
bin/setup-int-test.sh [new file with mode: 0755]
build.gradle
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
samza-test/src/main/config/join/README [new file with mode: 0644]
samza-test/src/main/config/join/checker.samza [moved from samza-test/src/main/config/join/checker.samsa with 73% similarity]
samza-test/src/main/config/join/common.properties [new file with mode: 0644]
samza-test/src/main/config/join/emitter.samza [moved from samza-test/src/main/config/join/emitter.samsa with 79% similarity]
samza-test/src/main/config/join/joiner.samza [moved from samza-test/src/main/config/join/joiner.samsa with 77% similarity]
samza-test/src/main/config/join/reset.sh [new file with mode: 0644]
samza-test/src/main/config/join/watcher.samza [moved from samza-test/src/main/config/join/watcher.samsa with 81% similarity]
samza-test/src/main/config/negate-number.properties
samza-test/src/main/java/org/apache/samza/test/integration/SimpleStatefulTask.java
samza-test/src/main/java/org/apache/samza/test/integration/StatePerfTestTask.java
samza-test/src/main/java/org/apache/samza/test/integration/join/Checker.java
samza-test/src/main/java/org/apache/samza/test/integration/join/Emitter.java
samza-test/src/main/java/org/apache/samza/test/integration/join/EpochPartitioner.java
samza-test/src/main/java/org/apache/samza/test/integration/join/Joiner.java
samza-test/src/main/java/org/apache/samza/test/integration/join/Watcher.java
samza-test/src/main/python/samza_failure_testing.py [new file with mode: 0755]
samza-test/src/main/resources/hello-stateful-world.samza [moved from samza-test/src/main/config/hello-stateful-world.samsa with 100% similarity]
samza-test/src/main/resources/log4j.xml

diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
new file mode 100755 (executable)
index 0000000..112bda6
--- /dev/null
@@ -0,0 +1,49 @@
+#!/bin/bash -e
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This script will setup the environment for integration test and kick-start the samza jobs
+
+if [ $# -eq 0 ] || [ -z "$1" ] ; then
+    echo "Usage: ./bin/setup-int-test.sh <DEPLOY_DIR>"
+    exit -1
+fi
+
+DEPLOY_ROOT_DIR=$1
+KAFKA_DIR=$1/kafka
+SAMZA_DIR=$1/samza
+
+# Setup the deployment Grid
+# $BASE_DIR/bin/grid.sh bootstrap
+
+# sleep 10
+
+# Setup the topics
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic epoch
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitter-state
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitted
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic joiner-state
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic completed-keys
+$KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic checker-state
+
+# Start the jobs
+for job in checker joiner emitter watcher
+do
+    $SAMZA_DIR/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$SAMZA_DIR/config/join/common.properties --config-path=file://$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
+done
+
+
index 9ca6cdc..97d848a 100644 (file)
@@ -80,6 +80,7 @@ rat {
     'samza-test/state/mystore/**',
     'README.md',
     'RELEASE.md',
+    'samza-test/src/main/resources/**'
   ]
 }
 
@@ -382,6 +383,7 @@ project(":samza-test_$scalaVersion") {
     from(file("$projectDir/src/main/config")) { into "config/" }
     from(file("$projectDir/src/main/resources")) { into "lib/" }
     from(project(':samza-shell').file("src/main/bash")) { into "bin/" }
+    from(file("$projectDir/src/main/python/ghostface_killah.py")) { into "bin/"}
     from(project(':samza-shell').file("src/main/resources")) { into "lib/" }
     from(project(':samza-shell').file("src/main/resources/log4j-console.xml")) { into "bin/" }
     from '../LICENSE'
index 2fc6c65..e3b9d30 100644 (file)
@@ -79,6 +79,7 @@ object SamzaContainer extends Logging {
     // exception in the main method.
     val containerId = System.getenv(ShellCommandConfig.ENV_CONTAINER_ID).toInt
     val coordinatorUrl = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL)
+    logger.info("######### Coordinator URL in SafeMain() - " + coordinatorUrl)
     val jobModel = readJobModel(coordinatorUrl)
     val containerModel = jobModel.getContainers()(containerId.toInt)
     val config = jobModel.getConfig
index 16345cd..0b720ec 100644 (file)
@@ -67,7 +67,7 @@ class JobRunner(config: Config) extends Logging with Runnable {
     Option(job.waitForStatus(Running, 500)) match {
       case Some(appStatus) => {
         if (Running.equals(appStatus)) {
-          info("job started successfully")
+          info("job started successfully - " + appStatus)
         } else {
           warn("unable to start job successfully. job has status %s" format (appStatus))
         }
index 6985af6..b80d349 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.samza.coordinator.JobCoordinator
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def   getJob(config: Config): StreamJob = {
     val coordinator = JobCoordinator(config, 1)
     val containerModel = coordinator.jobModel.getContainers.get(0)
 
@@ -53,16 +53,16 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
           }
         }
       }
+      coordinator.start
 
       commandBuilder
         .setConfig(config)
         .setId(0)
-
-      coordinator.start
+        .setUrl(coordinator.server.getUrl)
 
       new ProcessJob(commandBuilder)
     } finally {
-      coordinator.stop
+//      coordinator.stop
     }
   }
 }
diff --git a/samza-test/src/main/config/join/README b/samza-test/src/main/config/join/README
new file mode 100644 (file)
index 0000000..27bda8b
--- /dev/null
@@ -0,0 +1,77 @@
+INTEGRATION TEST
+
+* What does the test do? *
+This is a correctness test that attempts to do partitioned messaging and use state. It is meant to be run while killing samsa and kafka machines to test fault-tolerance.
+It runs in iterations and each iteration has a correctness criteria that is checked before launching the next iteration. Here are the jobs and their function
+
+emitter.samza:
+  This job takes input from the "epoch" topic. Epochs are number 0, 1, 2,...
+  For each new epoch each emitter task does something like the following:
+     for i = 0...count:
+       send("emitted", i, partition)
+  where partition is the task partition id.
+  
+joiner.samza:
+  This job takes in the emitted values from emitter and joins them together by key.
+  When it has received an emitted value from each partition it outputs the key to the topic "completed".
+  To track which partitions have emitted their value it keeps a store with | seperated numbers. 
+  The first entry is the epoch and the remaining entries are partitions that have emitted the key.
+  
+checker.samza:
+  This job has a single partition and stores all the completed keys. When all the keys are completed it sends an incremented epoch to the epoch topic, kicking off a new round.
+  
+watcher.samza:
+  This job watches the epoch topic. If the epoch doesn't advance within some SLA this job sends an alert email.
+  
+The state maintained by some of these jobs is slightly complex because of the need to make everything idempotent. So, for example, instead of keeping the partition count
+in the joiner job we keep the set of partitions so that double counting can not occur.
+
+To run, simply start all four jobs at once.
+
+* How to setup test ? *
+
+NOTE: You will need to have Paramiko installed in order to run these tests.
+
+This test is meant to be used with hello-samza's bin/grid script. If you use hello-samza, and bin/grid bootstrap, then DEPLOY_DIR can be set to samza-hello-samza/deploy/samza,
+and you can run the integration tests with ./bin/setup-int-test.sh path/to/samza-hello-samza/deploy.
+
+The steps to setup the integration tests is similar to the samza-hello-samza setup, using the grid script.
+Once you deploy Zookeeper, YARN and Kafka, you have to generate the tar ball with the tests.
+Before generating the tar ball, update the "yarn.package.path" in $SAMZA_SRC/src/config/join/common.properties file to the path where the published tar will be made available.
+
+--> Release the tar
+cd $SAMZA_SRC
+./gradlew releaseTestJobs
+cp ~/samza-test/build/distributions/samza-test_*.tgz $DEPLOY_DIR
+tar -xvf $DEPLOY_DIR/samza-test_*.tgz -C $DEPLOY_DIR/samza
+
+--> Create the topics and start the samza jobs
+./bin/setup-int-test.sh $DEPLOY_DIR
+
+Now, you should be able to view all 4 jobs running in the Yarn UI.
+
+FAILURE TESTING:
+
+* What does the test do? *
+This is used to test the resilience of the system. It periodically brings down a random container or kafka broker in the system and waits to see if it recovers correctly.
+
+* How to setup test? *
+Ensure that the 4 jobs are running via the YARN UI.
+In order to trigger the failure testing, run the python script: $SAMZA_SRC/samza-test/src/main/python/samza_failure_testing.py
+
+Usage: samza_failure_testing.py [options]
+
+Options:
+  -h, --help            show this help message and exit
+  --node-list=nodes.txt
+                        A list of nodes in the YARN cluster
+  --kill-time=s         The time in seconds to sleep between
+  --kafka-dir=dir       The directory in which to find kafka
+  --kafka-host=localhost
+                        Host on which Kafka is installed
+  --yarn-dir=dir        The directory in which to find yarn
+  --kill-kafka          Should we kill Kafka?
+  --kill-container      Should we kill Application Container?
+  --yarn-host=localhost
+                        Host that will respond to Yarn REST queries
+
 # Job
 job.name=checker
 
-systems.kafka.partitioner.class=samza.test.integration.join.EpochPartitioner
+systems.kafka.partitioner.class=org.apache.samza.test.integration.join.EpochPartitioner
 
 # Task
-task.class=samza.test.integration.join.Checker
+task.class=org.apache.samza.test.integration.join.Checker
 task.inputs=kafka.completed-keys
 
-stores.checker-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
+stores.checker-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.checker-state.key.serde=string
 stores.checker-state.msg.serde=string
 stores.checker-state.changelog=kafka.checker-state
+stores.checker-state.changelog.replication.factor=1
 
-task.window.ms=300000
+task.window.ms=30000
+
+num.partitions=2
+expected.keys=5000
 
-num.partitions=4
-expected.keys=100000
diff --git a/samza-test/src/main/config/join/common.properties b/samza-test/src/main/config/join/common.properties
new file mode 100644 (file)
index 0000000..ad10aac
--- /dev/null
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+####################
+### UPDATE THIS! ###
+####################
+yarn.package.path=<YARN.PACKAGE.PATH>
+
+job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.system=kafka-checkpoints
+task.checkpoint.replication.factor=1
+
+serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
+
+ # Kafka System
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181
+systems.kafka.consumer.auto.offset.reset=smallest
+systems.kafka.producer.bootstrap.servers=localhost:9092
+systems.kafka.samza.key.serde=string
+systems.kafka.samza.msg.serde=string
+
+ # Checkpoints System
+systems.kafka-checkpoints.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka-checkpoints.producer.bootstrap.servers=localhost:9092
+systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
+
+yarn.container.retry.count=-1
+yarn.container.retry.window.ms=60000
+
 job.name=emitter
 
 # Task
-task.class=samza.test.integration.join.Emitter
+task.class=org.apache.samza.test.integration.join.Emitter
 task.inputs=kafka.epoch
+task.commit.ms=1000
+task.window.ms=5
 
-stores.emitter-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
+
+stores.emitter-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.emitter-state.key.serde=string
 stores.emitter-state.msg.serde=string
 stores.emitter-state.changelog=kafka.emitter-state
+stores.emitter-state.changelog.replication.factor=1
 
-task.window.ms=0
-
-count=100000
+count=5000
similarity index 77%
rename from samza-test/src/main/config/join/joiner.samsa
rename to samza-test/src/main/config/join/joiner.samza
index 4ecee2b..a138e9e 100644 (file)
 job.name=joiner
 
 # Task
-task.class=samza.test.integration.join.Joiner
+task.class=org.apache.samza.test.integration.join.Joiner
 task.inputs=kafka.emitted
 
-stores.joiner-state.factory=samza.storage.kv.KeyValueStorageEngineFactory
+stores.joiner-state.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
 stores.joiner-state.key.serde=string
 stores.joiner-state.msg.serde=string
-stores.joiner-state.changelog=kafka.checker-state
+stores.joiner-state.changelog=kafka.joiner-state
+stores.joiner-state.changelog.replication.factor=1
 
-num.partitions=4
+num.partitions=2
diff --git a/samza-test/src/main/config/join/reset.sh b/samza-test/src/main/config/join/reset.sh
new file mode 100644 (file)
index 0000000..f94bded
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#!/bin/bash
+
+###############################################################################################
+## setup script for join test--delete all kafka and zk data and restart them in a fresh state
+###############################################################################################
+
+echo "Shutting down and cleaning up..."
+pkill -f kafka.Kafka
+pkill -f org.apache.zookeeper.server.quorum.QuorumPeerMain
+sleep 5
+rm -rf /tmp/kafka-logs /tmp/zookeeper
+sleep 2
+
+echo "Starting zk and kafka..."
+bin/zookeeper-server-start.sh config/zookeeper.properties &
+sleep 5
+bin/kafka-server-start.sh config/server.properties &
+sleep 5
+
+echo "Creating topics..."
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic epoch
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitter-state
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic emitted
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --create --topic joiner-state
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic completed-keys
+bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --create --topic checker-state
+
+echo "all done"
\ No newline at end of file
 job.name=watcher
 
 # Task
-task.class=samza.test.integration.join.Joiner
+task.class=org.apache.samza.test.integration.join.Watcher
 task.inputs=kafka.epoch
 
-task.window.ms=300000
+task.window.ms=40000
 
-max.time.between.epochs.ms=600000
-mail.smtp.host=TODO
-mail.to=dev@samza.apache.org
-mail.from=gregor@incubator.apache.org
+max.time.between.epochs.ms=60000
index b9f898c..2d2f75a 100644 (file)
@@ -27,6 +27,8 @@ task.class=org.apache.samza.test.integration.NegateNumberTask
 task.inputs=kafka.samza-test-topic
 task.max.messages=50
 task.outputs=kafka.samza-test-topic-output
+task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
+task.checkpoint.replication.factor=1
 
 # Serializers
 serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
index 4dbcb75..67e56e0 100644 (file)
@@ -31,6 +31,11 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 
+/**
+ * This is a simple task that writes each message to a state store and prints them all out on reload.
+ * 
+ * It is useful for command line testing with the kafka console producer and consumer and text messages.
+ */
 public class SimpleStatefulTask implements StreamTask, InitableTask {
   
   private KeyValueStore<String, String> store;
index 77f770e..d7fecd8 100644 (file)
@@ -29,6 +29,9 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 
+/**
+ * A simple performance test that just reads in messages and writes them to a state store as quickly as possible and periodically prints out throughput numbers
+ */
 public class StatePerfTestTask implements StreamTask, InitableTask {
   
   private KeyValueStore<String, String> store;
index 2a2177a..1fef1ea 100644 (file)
@@ -20,6 +20,7 @@
 package org.apache.samza.test.integration.join;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueIterator;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -31,9 +32,17 @@ import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
-import static java.lang.System.out;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 
 public class Checker implements StreamTask, WindowableTask, InitableTask {
+  
+  private static Logger logger = LoggerFactory.getLogger(Checker.class);
 
   private static String CURRENT_EPOCH = "current-epoch";
   private KeyValueStore<String, String> store;
@@ -52,40 +61,63 @@ public class Checker implements StreamTask, WindowableTask, InitableTask {
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
     String key = (String) envelope.getKey();
     String epoch = (String) envelope.getMessage();
+    logger.info("Got key=" + key + ", epoch = " + epoch + " in checker...");
     checkEpoch(epoch);
     this.store.put(key, epoch);
   }
   
   @Override
   public void window(MessageCollector collector, TaskCoordinator coordinator) {
-    KeyValueIterator<String, String> iter = this.store.all();
     String currentEpoch = this.store.get(CURRENT_EPOCH);
-    out.println("Checking if epoch " + currentEpoch + " is complete.");
+    logger.info("Checking if epoch " + currentEpoch + " is complete.");
     int count = 0;
+    KeyValueIterator<String, String> iter = this.store.all();
+
     while(iter.hasNext()) {
-      String foundEpoch = iter.next().getValue();
-      if(foundEpoch.equals(currentEpoch))
-        count += 1;
+      Entry<String, String> entry= iter.next();
+      String foundEpoch = entry.getValue();
+      if(foundEpoch.equals(currentEpoch)) {
+          count += 1;
+      } else {
+          logger.info("####### Found a different epoch! - " + foundEpoch + " Current epoch is " + currentEpoch);
+      }
     }
     iter.close();
     if(count == expectedKeys + 1) {
-      out.println("Epoch " + currentEpoch + " is complete.");
+      logger.info("Epoch " + currentEpoch + " is complete.");
       int nextEpoch = Integer.parseInt(currentEpoch) + 1;
-      for(int i = 0; i < numPartitions; i++)
-        collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), i, Integer.toString(nextEpoch))); 
+      for(int i = 0; i < numPartitions; i++) {
+          logger.info("Emitting next epoch - " + Integer.toString(i) + " -> " + Integer.toString(nextEpoch));
+          collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "epoch"), Integer.toString(i), Integer.toString(nextEpoch)));
+      }
+      this.store.put(CURRENT_EPOCH, Integer.toString(nextEpoch));
     } else if(count > expectedKeys + 1) {
       throw new IllegalStateException("Got " + count + " keys, which is more than the expected " + (expectedKeys + 1));
     } else {
-      out.println("Only found " + count + " valid keys, try again later.");
+      logger.info("Only found " + count + " valid keys, try again later.");
     }
   }
   
-  private void checkEpoch(String epoch) {
+/*  private void checkEpoch(String epoch) {
     String curr = this.store.get(CURRENT_EPOCH);
     if(curr == null)
       this.store.put(CURRENT_EPOCH, epoch);
-    else if(!curr.equals(epoch))
+    else if(!curr.equals(epoch)) // should have curr > epoch
       throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
-  }
-
+  }*/
+    private void checkEpoch(String epoch) {
+        String curr = this.store.get(CURRENT_EPOCH);
+        if(curr == null)
+            this.store.put(CURRENT_EPOCH, epoch);
+        else {
+            int currentEpochInStore = Integer.parseInt(curr);
+            int currentEpochInMsg = Integer.parseInt(epoch);
+            if (currentEpochInMsg <= currentEpochInStore) {
+                if(currentEpochInMsg < currentEpochInStore)
+                    logger.info("#### Ignoring received epoch = " + epoch + " less than what is in store " + curr);
+            } else { // should have curr > epoch
+                throw new IllegalArgumentException("Got epoch " + epoch + " but have not yet completed " + curr);
+            }
+        }
+    }
 }
index f20bb7f..e958b51 100644 (file)
@@ -32,15 +32,15 @@ import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.TaskCoordinator.RequestScope;
 import org.apache.samza.task.WindowableTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 
-/**
- * This job takes input from "epoch" and for each epoch emits "max" records of form
- *   (key = counter, value = epoch-partition)
- *   
- */
 @SuppressWarnings("unchecked")
 public class Emitter implements StreamTask, InitableTask, WindowableTask {
   
+  private static Logger logger = LoggerFactory.getLogger(Emitter.class);
+  
   private static String EPOCH = "the-epoch";
   private static String COUNT = "the-count";
   
@@ -59,6 +59,8 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
     if(envelope.getSystemStreamPartition().getStream().equals("epoch")) {
       int newEpoch = Integer.parseInt((String) envelope.getMessage());
+      logger.info("New epoch in message - " + newEpoch);
+
       Integer epoch = getInt(EPOCH);
       if(epoch == null || newEpoch == epoch)
         return;
@@ -66,7 +68,8 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
         throw new IllegalArgumentException("Got new epoch " + newEpoch + " which is less than current epoch " + epoch);
       
       // it's a new era, reset current epoch and count
-      this.state.put(EPOCH, Integer.toString(epoch));
+      logger.info("Epoch: " + newEpoch);
+      this.state.put(EPOCH, Integer.toString(newEpoch));
       this.state.put(COUNT, "0");
       coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
     }
@@ -80,15 +83,19 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
     }
     int counter = getInt(COUNT);
     if(counter < max) {
-      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName);
+      logger.info("Emitting: " + counter + ", epoch = " + epoch + ", task = " + taskName);
+      OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(new SystemStream("kafka", "emitted"), Integer.toString(counter), epoch + "-" + taskName.toString());
       collector.send(envelope);
       this.state.put(COUNT, Integer.toString(getInt(COUNT) + 1));
-    } else {
-      trySleep(100);
     }
+/*    if(counter == max) {
+        logger.info("###### Committing because we finished emitting counter in this epoch");
+        coordinator.commit(RequestScope.ALL_TASKS_IN_CONTAINER);
+    }*/
   }
   
   private void resetEpoch() {
+    logger.info("Resetting epoch to 0");
     state.put(EPOCH, "0");
     state.put(COUNT, "0");
   }
@@ -97,13 +104,5 @@ public class Emitter implements StreamTask, InitableTask, WindowableTask {
     String value = this.state.get(key);
     return value == null? null : Integer.parseInt(value);
   }
-  
-  private void trySleep(long ms) {
-    try {
-      Thread.sleep(ms);
-    } catch(Exception e) {
-      e.printStackTrace();
-    }
-  }
 
 }
index 670ccf9..438d77c 100644 (file)
@@ -27,6 +27,9 @@ public class EpochPartitioner implements Partitioner {
   public EpochPartitioner(VerifiableProperties p){}
   
   public int partition(Object key, int numParts) {
-    return Integer.parseInt((String) key);
+    if(key instanceof Integer)
+      return (Integer) key;
+    else
+      return Integer.parseInt((String) key);
   }
 }
index 299c1fa..cb30838 100644 (file)
 
 package org.apache.samza.test.integration.join;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.samza.config.Config;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.storage.kv.KeyValueStore;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.OutgoingMessageEnvelope;
@@ -32,17 +34,24 @@ import org.apache.samza.task.MessageCollector;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
 
 @SuppressWarnings("unchecked")
 public class Joiner implements StreamTask, InitableTask {
   
+  private static Logger logger = LoggerFactory.getLogger(Joiner.class);
+  
   private KeyValueStore<String, String> store;
   private int expected;
+  private TaskName taskName;
 
   @Override
   public void init(Config config, TaskContext context) {
     this.store = (KeyValueStore<String, String>) context.getStore("joiner-state");
     this.expected = config.getInt("num.partitions");
+    this.taskName = context.getTaskName();
   }
 
   @Override
@@ -51,22 +60,31 @@ public class Joiner implements StreamTask, InitableTask {
     String value = (String) envelope.getMessage();
     String[] pieces = value.split("-");
     int epoch = Integer.parseInt(pieces[0]);
-    int partition = Integer.parseInt(pieces[1]);
+
+    int partition = Integer.parseInt(pieces[1].split(" ")[1]);
     Partitions partitions = loadPartitions(epoch, key);
-    if(partitions.epoch != epoch) {
+    logger.info("Joiner got epoch = " + epoch + ", partition = " + partition + ", parts = " + partitions);
+    if(partitions.epoch < epoch) {
       // we are in a new era
       if(partitions.partitions.size() != expected)
         throw new IllegalArgumentException("Should have " + expected + " partitions when new epoch starts.");
+      logger.info("Reseting epoch to " + epoch);
       this.store.delete(key);
       partitions.epoch = epoch;
       partitions.partitions.clear();
       partitions.partitions.add(partition);
+    } else if(partitions.epoch > epoch){
+      logger.info("Ignoring message for epoch " + epoch);
     } else {
       partitions.partitions.add(partition);
-      if(partitions.partitions.size() == expected)
+      if(partitions.partitions.size() == expected) {
+        logger.info("Completed: " + key + " -> " + Integer.toString(epoch));
         collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "completed-keys"), key, Integer.toString(epoch)));
+//        logger.info("Completed key " + key + " for epoch " + epoch);
+      }
     }
     this.store.put(key, partitions.toString());
+    logger.info("Join store in Task " + this.taskName + " " + key + " -> " + partitions.toString());
   }
   
   private Partitions loadPartitions(int epoch, String key) {
index b3efac5..7c82e0a 100644 (file)
 
 package org.apache.samza.test.integration.join;
 
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-
-import javax.mail.Message;
-import javax.mail.MessagingException;
-import javax.mail.Session;
-import javax.mail.Transport;
-import javax.mail.internet.InternetAddress;
-import javax.mail.internet.MimeMessage;
-
 import org.apache.samza.config.Config;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.task.InitableTask;
@@ -38,29 +27,28 @@ import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.TaskContext;
 import org.apache.samza.task.TaskCoordinator;
 import org.apache.samza.task.WindowableTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Watcher implements StreamTask, WindowableTask, InitableTask {
+  
+  private static Logger logger = LoggerFactory.getLogger(Watcher.class);
 
   private boolean inError = false;
   private long lastEpochChange = System.currentTimeMillis();
   private long maxTimeBetweenEpochsMs;
   private int currentEpoch = 0;
-  private String smtpHost;
-  private String to;
-  private String from;
   
   @Override
   public void init(Config config, TaskContext context) {
     this.maxTimeBetweenEpochsMs = config.getLong("max.time.between.epochs.ms");
-    this.smtpHost = config.get("mail.smtp.host");
-    this.to = config.get("mail.to");
-    this.from = config.get("mail.from");
   }
   
   @Override
   public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
     int epoch = Integer.parseInt((String) envelope.getMessage());
     if(epoch > currentEpoch) {
+      logger.info("Epoch changed to " + epoch + " from " + currentEpoch);
       this.currentEpoch = epoch;
       this.lastEpochChange = System.currentTimeMillis();
       this.inError = false;
@@ -72,24 +60,8 @@ public class Watcher implements StreamTask, WindowableTask, InitableTask {
     boolean isLagging = System.currentTimeMillis() - lastEpochChange > maxTimeBetweenEpochsMs;
     if(!inError && isLagging) {
       this.inError = true;
-      sendEmail(from, to, "Job failed to make progress!", String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000)));
-    }
-  }
-  
-  private void sendEmail(String from, String to, String subject, String body) {
-    Properties props = new Properties();
-    props.put("mail.smtp.host", smtpHost);
-    Session session = Session.getInstance(props, null);
-    try {
-        MimeMessage msg = new MimeMessage(session);
-        msg.setFrom(new InternetAddress(from));
-        msg.addRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
-        msg.setSubject(subject);
-        msg.setSentDate(new Date());
-        msg.setText(body);
-        Transport.send(msg);
-    } catch (MessagingException e) {
-        throw new RuntimeException(e);
+      logger.info("Error state detected, alerting...");
+      logger.error("Job failed to make progress!" + String.format("No epoch change for %d minutes.", this.maxTimeBetweenEpochsMs / (60*1000)));
     }
   }
   
diff --git a/samza-test/src/main/python/samza_failure_testing.py b/samza-test/src/main/python/samza_failure_testing.py
new file mode 100755 (executable)
index 0000000..198db26
--- /dev/null
@@ -0,0 +1,166 @@
+#################################################################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#################################################################################################################################
+
+import sys
+import paramiko
+import os.path
+import random
+import requests
+import logging
+from time import sleep
+from optparse import OptionParser
+
+#################################################################################################################################
+# A "chaos monkey"-like script that periodically kills parts of Samza, including YARN (RM, NM), Kafka, and Samza (AM, container)
+# This script depends on paramiko, an ssh library
+#################################################################################################################################
+
+# Create an ssh connection to the given host
+def connect(host):
+    client = paramiko.SSHClient()
+    client.load_system_host_keys()
+    client.set_missing_host_key_policy(paramiko.WarningPolicy())
+    client.connect(host)
+    return client
+
+# Run the given command line using the ssh connection (throw an error if anything on stderr)
+def execute(conn, cmd):
+    logging.info("Executing command {0}".format(cmd))
+    stdin, stdout, stderr = conn.exec_command(cmd)
+    output = stdout.read()
+    err = stderr.read()
+    if output:
+        logging.info(output)
+        return output
+    if err:
+        logging.error(err)
+        raise Exception("Error executing command: %s" %err)
+
+# Unfortunately pkill on mac seems to have a length limit which prevents it working with out epic java command line arguments
+def pkill(pid):
+    return "kill {0}".format(pid)
+
+def get_pid(pattern):
+    return "ps aux | grep " + pattern + " | grep -v 'grep' | awk -F' *' '{print $2}'"
+
+# Kill the kafka broker
+def kill_kafka(options):
+    connection = connect(options.kafka_host)
+    logging.info("Killing Kafka Broker ...")
+    kafka_pid = execute(connection, get_pid("kafka.Kafka"))
+    if kafka_pid is not None:
+        execute(connection, pkill(kafka_pid))
+        sleep(20)
+        logging.info("Restarting Kafka Broker...")
+        execute(connection, "nohup " + options.kafka_dir + "/bin/kafka-server-start.sh " + options.kafka_dir + "/config/server.properties > " + options.kafka_dir + "/logs/kafka.log 2>&1 &")
+        connection.close()
+    else:
+        logging.info("Could not determine Kafka broker process. Not doing anything")
+
+
+def query_yarn(host, port, query):
+    return requests.get("http://{0}:{1}/{2}".format(host, port, query))
+
+def get_app_ids_running(host, port):
+    logging.info("Querying RM for RUNNING application information")
+    list_apps_command = "ws/v1/cluster/apps?status=RUNNING"
+    response = query_yarn(host, port, list_apps_command).json()
+    if len(response) == 0 or not response['apps']:
+      raise Exception("Got an empty apps response back. Can't run kill script without Samza jobs running.")
+    apps = reduce(list.__add__, map(lambda x: list(x), response['apps'].values()))
+    appInfo = []
+    for app in apps:
+        appInfo.append((app['id'], app['name']))
+    return appInfo
+
+# Kill the samza container instances
+def kill_containers(hosts, app_id):
+    for host in hosts:
+        connection = connect(host)
+        pid = execute(connection, get_pid("samza-container-0 | grep {0}".format(app_id)))
+        if pid:
+            logging.info("Killing samza container on {0} with pid {1}".format(host, pid))
+            execute(connection, pkill(pid))
+        connection.close()
+    if pid is None:
+        logging.info("Couldn't find any container on the list of hosts. Nothing to kill :(")
+
+def require_arg(options, name):
+    if not hasattr(options, name) or getattr(options, name) is None:
+        print >> sys.stderr, "Missing required property:", name
+        sys.exit(1)
+
+# Command line options
+parser = OptionParser()
+parser.add_option("--node-list", dest="filename", help="A list of nodes in the YARN cluster", metavar="nodes.txt")
+parser.add_option("--kill-time", dest="kill_time", help="The time in seconds to sleep between", metavar="s", default=90)
+parser.add_option("--kafka-dir", dest="kafka_dir", help="The directory in which to find kafka", metavar="dir")
+parser.add_option("--kafka-host", dest="kafka_host", help="Host on which Kafka is installed", metavar="localhost")
+parser.add_option("--yarn-dir", dest="yarn_dir", help="The directory in which to find yarn", metavar="dir")
+parser.add_option("--kill-kafka", action="store_true", dest="kill_kafka", default=False, help="Should we kill Kafka?")
+parser.add_option("--kill-container", action="store_true", dest="kill_container", default=False, help="Should we kill Application Container?")
+parser.add_option("--yarn-host", dest="yarn_host", help="Host that will respond to Yarn REST queries ", metavar="localhost")
+
+(options, args) = parser.parse_args()
+
+kill_script_log_path = '/tmp/samza-kill-log.log'
+logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S', filename=kill_script_log_path, level=logging.INFO)
+console = logging.StreamHandler()
+formatter = logging.Formatter(fmt='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S')
+console.setFormatter(formatter)
+logging.getLogger('').addHandler(console)
+
+components = []
+app_info = {}
+if options.kill_container:
+    require_arg(options, 'filename')
+    require_arg(options, 'yarn_host')
+    require_arg(options, 'yarn_dir')
+    components.append("container")
+    hosts = [line.strip() for line in open(options.filename).readlines()]
+    if len(hosts) < 1:
+        print >> sys.stderr, "No hosts in host file."
+        sys.exit(1)
+    app_info = get_app_ids_running(options.yarn_host, 8088)
+
+if options.kill_kafka:
+    require_arg(options, 'kafka_host')
+    require_arg(options, 'kafka_dir')
+    components.append("kafka")
+
+if len(components) == 0:
+    parser.print_help()
+else:
+    while True:
+        kill_time = int(options.kill_time)
+        component_id = 0
+        if len(components) > 1:
+            component_id = random.randint(0, len(components) - 1)
+        kill_component = components[component_id]
+        if kill_component == "kafka":
+            logging.info("Choosing Kafka broker on {0}".format(options.kafka_host))
+            kill_kafka(options)
+            logging.info("Sleeping for {0}".format(kill_time * 2))
+            sleep(kill_time * 2)
+        elif kill_component == "container":
+            app_id_to_kill = random.randint(0, len(app_info) - 1)
+            logging.info("Choosing a Samza Container for {0}".format(app_info[app_id_to_kill]))
+            kill_containers(hosts, app_info[app_id_to_kill][0])
+            logging.info("Sleeping for {0}".format(kill_time))
+            sleep(kill_time)
index 2e54cc2..f93e4dd 100644 (file)
 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+        <param name="DatePattern" value="'.'yyyy-MM-dd" />
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+        </layout>
+    </appender>
+
   <appender name="console" class="org.apache.log4j.ConsoleAppender">
     <param name="Target" value="System.out" />
     <layout class="org.apache.log4j.PatternLayout">
@@ -25,7 +33,8 @@
 
   <root>
     <priority value="info" />
-    <appender-ref ref="console" />
+    <appender-ref ref="console" /> 
+    <appender-ref ref="RollingAppender" />
   </root>
 
 </log4j:configuration>