SAMZA-905: Host Affinity - State restore doesn't work if the previous shutdown was...
authorJacob Maes <jacob.maes@gmail.com>
Mon, 4 Apr 2016 20:35:37 +0000 (13:35 -0700)
committerYi Pan (Data Infrastructure) <nickpan47@gmail.com>
Mon, 4 Apr 2016 20:35:37 +0000 (13:35 -0700)
docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala

index 1d9c29e..986236a 100644 (file)
@@ -21,7 +21,7 @@ title: Host Affinity & YARN
 
 In Samza, containers are the units of physical parallelism that runs on a set of machines. Each container is essentially a process that runs one or more stream tasks. Each task instance consumes one or more partitions of the input streams and is associated with its own durable data store. 
 
-We define a *Stateful Samza Job* as the Samza job that uses a key-value store in its implementation, alone with an associated changelog stream. In stateful samza jobs, there is a 1:1 mapping between the task instance and the data store. Since the allocation of containers to machines in the Yarn cluster is completely left to Yarn, Samza does not guarantee that a container (and hence, its associated task(s)) gets deployed on the same machine. Containers can get shuffled in any of the following cases:
+We define a *Stateful Samza Job* as the Samza job that uses a key-value store in its implementation, along with an associated changelog stream. In stateful samza jobs, a task may be configured to use multiple stores. For each store there is a 1:1 mapping between the task instance and the data store. Since the allocation of containers to machines in the Yarn cluster is completely left to Yarn, Samza does not guarantee that a container (and hence, its associated task(s)) gets deployed on the same machine. Containers can get shuffled in any of the following cases:
 
 1. When a job is upgraded by pointing <code>yarn.package.path</code> to the new package path and re-submitted.
 2. When a job is simply restarted by Yarn or the user
@@ -29,7 +29,7 @@ We define a *Stateful Samza Job* as the Samza job that uses a key-value store in
 
 In any of the above cases, the task's co-located data needs to be restored every time a container starts-up. Restoring data each time can be expensive, especially for applications that have a large data set. This behavior slows the start-up time for the job so much that the job is no longer "near realtime". Furthermore, if multiple stateful samza jobs restart around the same time in the cluster and they all share the same changelog system, then it is possible to quickly saturate the changelog system's network and cause a DDoS.
 
-For instance, consider a Samza job performing a Stream-Table join. Typically, such a job requires the dataset to be available on all processors before they begin processing the input stream. The dataset is usually large (order > 1TB) read-only data that will be used to join or add attributes to incoming messages. The job may initialize this cache by populated with data directly from a remote store or changelog stream. This cache initialization happens each time the container is restarted. This causes significant latency during job start-up.
+For instance, consider a Samza job performing a Stream-Table join. Typically, such a job requires the dataset to be available on all processors before they begin processing the input stream. The dataset is usually large (order > 1TB) read-only data that will be used to join or add attributes to incoming messages. The job may initialize this cache by populating it with data directly from a remote store or changelog stream. This cache initialization happens each time the container is restarted. This causes significant latency during job start-up.
 
 The solution, then, is to simply persist the state store on the machine in which the container process is executing and re-allocate the same host for the container each time the job is restarted, in order to re-use the persisted state. Thus, the ability of Samza to allocate a container to the same machine across job restarts is referred to as ***host-affinity***. Samza leverages host-affinity to enhance our support for local state re-use.
 
@@ -47,20 +47,19 @@ This allows the Node Manager's (NM) DeletionService to clean-up the working dire
 
 ![samza-host-affinity](/img/{{site.version}}/learn/documentation/yarn/samza-host-affinity.png)
 
-When a container is *cleanly shutdown*, Samza also writes the last materialized offset from the changelog stream to the checksumed file on disk. Thus, there is an *OFFSET* file associated with each state stores' changelog partitions, that is consumed by the tasks in the container.
+Each time a task commits, Samza writes the last materialized offset from the changelog stream to the checksumed file on disk. This is also done on container shutdown. Thus, there is an *OFFSET* file associated with each state stores' changelog partitions, that is consumed by the tasks in the container.
 
 {% highlight bash %}
 ${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}/OFFSET
 {% endhighlight %}
 
-Now, when a container restarts on the same machine after a clean shutdown and the OFFSET file exists, the Samza container:
+Now, when a container restarts on the same machine after the OFFSET file exists, the Samza container:
 
 1. Opens the persisted store on disk
 2. Reads the OFFSET file
-3. Deletes the OFFSET file
-4. Restores the state store from the OFFSET value
+3. Restores the state store from the OFFSET value
 
-If the OFFSET file doesn't exist, it creates the state store and consumes from the oldest offset in the changelog to re-create the state. Note that Samza optimistically deletes the OFFSET file in step 3 to prevent data from getting corrupted due to any kind of failure during state restoration. This significantly reduces the state restoration time on container start-up as we no longer consume from the beginning of the changelog stream.
+This significantly reduces the state restoration time on container start-up as we no longer consume from the beginning of the changelog stream. If the OFFSET file doesn't exist, it creates the state store and consumes from the oldest offset in the changelog to re-create the state. Since the OFFSET file is written on each commit after flushing the store, the recorded offset is guaranteed to correspond to the current contents of the store or some older point, but never newer. This gives at least once semantics for state restore. Therefore, the changelog entries must be idempotent.
 
 It is necessary to periodically clean-up unused or orphaned state stores on the machines to manage disk-space. This feature is being worked on in [SAMZA-656](https://issues.apache.org/jira/browse/SAMZA-656).
 
@@ -116,6 +115,6 @@ Enabling this feature for a stateless Samza job should not have any adverse effe
 ## Host-affinity Guarantees
 As you have observed, host-affinity cannot be guaranteed all the time due to varibale load distribution in the Yarn cluster. Hence, this is a best-effort policy that Samza provides. However, certain scenarios are worth calling out where these guarantees may be hard to achieve or are not applicable.
 
-1. _When the number of containers and/or container-task assignment changes across successive application runs_ - We may be able to re-use local state for a subset of partitions. Currently, there is no logic in the Job Coordinator to handle partitioning of tasks among containers intelligently. Handling this is more involved as relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers.
+1. _When the number of containers and/or container-task assignment changes across successive application runs_ - We may be able to re-use local state for a subset of partitions. Currently, there is no logic in the Job Coordinator to handle partitioning of tasks among containers intelligently. Handling this is more involved as relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers. However, with [task-container mapping](https://issues.apache.org/jira/browse/SAMZA-906), this will work better for typical container count adjustments.
 2. _When SystemStreamPartitionGrouper changes across successive application runs_ - When the grouper logic used to distribute the partitions across containers changes, the data in the Coordinator Stream (for changelog-task partition assignment etc) and the data stores becomes invalid. Thus, to be safe, we should flush out all state-related data from the Coordinator Stream. An alternative is to overwrite the Task-ChangelogPartition assignment message and the Container Locality message in the Coordinator Stream, before starting up the job again.
 
index 422799a..c7b0520 100644 (file)
@@ -90,7 +90,7 @@ class TaskStorageManager(
 
     taskStores.keys.foreach(storeName => {
       val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-      info("Got logged storage partition directory as %s" format storagePartitionDir.toPath.toString)
+      info("Got storage partition directory as %s" format storagePartitionDir.toPath.toString)
 
       if(storagePartitionDir.exists()) {
         debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
@@ -100,28 +100,36 @@ class TaskStorageManager(
       val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
       info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString)
 
-      var deleteLoggedStoragePartitionDir = false
-
-      val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
-      if(offsetFileRef.exists()) {
-        debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString)
-        val offset = Util.readDataFromFile(offsetFileRef)
-        if(offset != null && !offset.isEmpty) {
-          fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
-        } else {
-          deleteLoggedStoragePartitionDir = true
-        }
-        offsetFileRef.delete()
-      } else {
-        info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
-        deleteLoggedStoragePartitionDir = true
-      }
-      if(deleteLoggedStoragePartitionDir && loggedStoragePartitionDir.exists()) {
-        Util.rm(loggedStoragePartitionDir)
+      // If we find valid offsets s.t. we can restore the state, keep the disk files. Otherwise, delete them.
+      if(!readOffsetFile(storeName, loggedStoragePartitionDir) && loggedStoragePartitionDir.exists()) {
+          Util.rm(loggedStoragePartitionDir)
       }
     })
   }
 
+  /**
+    * Attempts to read the offset file and returns {@code true} if the offsets were read successfully.
+    *
+    * @param storeName                  the name of the store for which the offsets are needed
+    * @param loggedStoragePartitionDir  the directory for the store
+    * @return                           true if the offsets were read successfully, false otherwise.
+    */
+  private def readOffsetFile(storeName: String, loggedStoragePartitionDir: File): Boolean = {
+    var offsetsRead = false
+    val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
+    if(offsetFileRef.exists()) {
+      debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString)
+      val offset = Util.readDataFromFile(offsetFileRef)
+      if(offset != null && !offset.isEmpty) {
+        fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
+        offsetsRead = true
+      }
+    } else {
+      info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
+    }
+    offsetsRead
+  }
+
   private def validateChangelogStreams = {
     info("Validating change log streams")
 
@@ -186,6 +194,7 @@ class TaskStorageManager(
     debug("Flushing stores.")
 
     taskStores.values.foreach(_.flush)
+    flushChangelogOffsetFiles()
   }
 
   def stopStores() {
@@ -196,24 +205,41 @@ class TaskStorageManager(
   def stop() {
     stopStores()
 
+    flushChangelogOffsetFiles()
+  }
+
+  /**
+    * Writes the offset files for each changelog to disk.
+    * These files are used when stores are restored from disk to determine whether
+    * there is any new information in the changelog that is not reflected in the disk
+    * copy of the store. If there is any delta, it is replayed from the changelog
+    * e.g. This can happen if the job was run on this host, then another
+    * host and back to this host.
+    */
+  private def flushChangelogOffsetFiles() {
     debug("Persisting logged key value stores")
     changeLogSystemStreams.foreach { case (store, systemStream) => {
       val streamToMetadata = systemAdmins(systemStream.getSystem)
-                              .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
+              .getSystemStreamMetadata(JavaConversions.setAsJavaSet(Set(systemStream.getStream)))
       val sspMetadata = streamToMetadata
-                          .get(systemStream.getStream)
-                          .getSystemStreamPartitionMetadata
-                          .get(partition)
+              .get(systemStream.getStream)
+              .getSystemStreamPartitionMetadata
+              .get(partition)
       val newestOffset = sspMetadata.getNewestOffset
 
       if (newestOffset != null) {
         val offsetFile = new File(TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, store, taskName), offsetFileName)
-        Util.writeDataToFile(offsetFile, newestOffset)
-        info("Successfully stored offset %s for store %s in OFFSET file " format (newestOffset, store))
+
+        try {
+          Util.writeDataToFile(offsetFile, newestOffset)
+          debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, store))
+        } catch {
+          case e: Exception => error("Exception storing offset %s for store %s" format(newestOffset, store), e)
+        }
       }
       else {
         //if newestOffset is null, then it means the store is empty. No need to persist the offset file
-        info("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId))
+        debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, store, systemStream.getStream, partition.getPartitionId))
       }
     }}
   }
index f08e8dd..c8ea64c 100644 (file)
@@ -80,8 +80,8 @@ class TestTaskStorageManager extends MockitoSugar {
 
   @Test
   def testCleanBaseDirsWithOffsetFileForLoggedStore() {
-    val checkFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
-    Util.writeDataToFile(checkFilePath, "100")
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
+    Util.writeDataToFile(offsetFilePath, "100")
 
     val taskStorageManager = new TaskStorageManagerBuilder()
       .addStore(loggedStore)
@@ -92,7 +92,7 @@ class TestTaskStorageManager extends MockitoSugar {
     cleanDirMethod.setAccessible(true)
     cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
 
-    assertTrue("Offset file was not removed. Clean up failed!", !checkFilePath.exists())
+    assertTrue("Offset file was removed. Clean up failed!", offsetFilePath.exists())
     assertEquals("Offset read does not match what was in the file", "100", taskStorageManager.fileOffset.get(new SystemStreamPartition("kafka", "testStream", new Partition(0))))
   }
 
@@ -115,9 +115,7 @@ class TestTaskStorageManager extends MockitoSugar {
       .build
 
     //Invoke test method
-    val stopMethod = taskStorageManager.getClass.getDeclaredMethod("stop", new Array[java.lang.Class[_]](0):_*)
-    stopMethod.setAccessible(true)
-    stopMethod.invoke(taskStorageManager, new Array[Object](0):_*)
+    taskStorageManager.stop()
 
     //Check conditions
     assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
@@ -125,6 +123,76 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 
   @Test
+  def testFlushCreatesOffsetFileForLoggedStore() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    val mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "100", "101")))))
+    val myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
+    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+            .addStore(loggedStore)
+            .setSystemAdmin("kafka", mockSystemAdmin)
+            .setPartition(partition)
+            .build
+
+    //Invoke test method
+    taskStorageManager.flush()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "100", Util.readDataFromFile(offsetFilePath))
+  }
+
+  @Test
+  def testFlushOverwritesOffsetFileForLoggedStore() {
+    val partition = new Partition(0)
+
+    val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName) + File.separator + "OFFSET")
+    Util.writeDataToFile(offsetFilePath, "100")
+
+    val mockSystemAdmin = mock[SystemAdmin]
+    var mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "139", "140")))))
+    var myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
+    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Build TaskStorageManager
+    val taskStorageManager = new TaskStorageManagerBuilder()
+            .addStore(loggedStore)
+            .setSystemAdmin("kafka", mockSystemAdmin)
+            .setPartition(partition)
+            .build
+
+    //Invoke test method
+    taskStorageManager.flush()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "139", Util.readDataFromFile(offsetFilePath))
+
+    // Flush again
+    mockSspMetadata = Map("testStream" -> new SystemStreamMetadata("testStream" , JavaConversions.mapAsJavaMap[Partition, SystemStreamPartitionMetadata](Map(partition -> new SystemStreamPartitionMetadata("20", "193", "194")))))
+    myMap = JavaConversions.mapAsJavaMap[String, SystemStreamMetadata](mockSspMetadata)
+    when(mockSystemAdmin.getSystemStreamMetadata(any(JavaConversions.setAsJavaSet(Set("")).getClass))).thenReturn(myMap)
+
+    //Invoke test method
+    taskStorageManager.flush()
+
+    //Check conditions
+    assertTrue("Offset file doesn't exist!", offsetFilePath.exists())
+    assertEquals("Found incorrect value in offset file!", "193", Util.readDataFromFile(offsetFilePath))
+  }
+
+  @Test
+  def testFlushOffsetFileExceptionsHandledGracefully(): Unit = {
+
+  }
+
+  @Test
   def testStopShouldNotCreateOffsetFileForEmptyStore() {
     val partition = new Partition(0)
 
@@ -143,9 +211,7 @@ class TestTaskStorageManager extends MockitoSugar {
       .build
 
     //Invoke test method
-    val stopMethod = taskStorageManager.getClass.getDeclaredMethod("stop", new Array[java.lang.Class[_]](0):_*)
-    stopMethod.setAccessible(true)
-    stopMethod.invoke(taskStorageManager, new Array[Object](0):_*)
+    taskStorageManager.stop()
 
     //Check conditions
     assertTrue("Offset file should not exist!", !offsetFilePath.exists())