SAMZA-1083 Do not load task stores which are older than delete tombstones during...
authorShanthoosh Venkataraman <santhoshvenkat1988@gmail.com>
Thu, 9 Feb 2017 00:07:27 +0000 (16:07 -0800)
committerJacob Maes <jmaes@linkedin.com>
Thu, 9 Feb 2017 00:07:27 +0000 (16:07 -0800)
samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala

index 9329edf..9471a23 100644 (file)
@@ -30,6 +30,7 @@ import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaStorageConfig;
 import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.config.StorageConfig;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.job.model.ContainerModel;
@@ -238,7 +239,6 @@ public class StorageRecovery extends CommandLine {
             taskStores.put(storeName, storageEngine);
           }
         }
-
         TaskStorageManager taskStorageManager = new TaskStorageManager(
             taskModel.getTaskName(),
             Util.javaMapAsScalaMap(taskStores),
@@ -247,8 +247,11 @@ public class StorageRecovery extends CommandLine {
             maxPartitionNumber,
             streamMetadataCache,
             storeBaseDir,
-            storeBaseDir, taskModel.getChangelogPartition(),
-            Util.javaMapAsScalaMap(systemAdmins));
+            storeBaseDir,
+            taskModel.getChangelogPartition(),
+            Util.javaMapAsScalaMap(systemAdmins),
+            new StorageConfig(jobConfig).getChangeLogDeleteRetentionsInMs(),
+            new SystemClock());
 
         taskStorageManagers.add(taskStorageManager);
       }
index a3587d0..3785011 100644 (file)
@@ -20,8 +20,8 @@
 package org.apache.samza.config
 
 
+import java.util.concurrent.TimeUnit
 import org.apache.samza.SamzaException
-
 import scala.collection.JavaConversions._
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
@@ -33,6 +33,8 @@ object StorageConfig {
   val MSG_SERDE = "stores.%s.msg.serde"
   val CHANGELOG_STREAM = "stores.%s.changelog"
   val CHANGELOG_SYSTEM = "job.changelog.system"
+  val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms"
+  val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1)
 
   implicit def Config2Storage(config: Config) = new StorageConfig(config)
 }
@@ -42,6 +44,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
   def getStorageFactoryClassName(name: String) = getOption(FACTORY.format(name))
   def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name)
   def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
+
   def getChangelogStream(name: String) = {
     // If the config specifies 'stores.<storename>.changelog' as '<system>.<stream>' combination - it will take precedence.
     // If this config only specifies <astream> and there is a value in job.changelog.system=<asystem> -
@@ -63,12 +66,24 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
     systemStreamRes
   }
 
+  def getChangeLogDeleteRetentionInMs(storeName: String) = {
+    getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
+  }
+
   def getStoreNames: Seq[String] = {
     val conf = config.subset("stores.", true)
     conf.keys.filter(k => k.endsWith(".factory")).map(k => k.substring(0, k.length - ".factory".length)).toSeq
   }
 
   /**
+    * Build a map of storeName to changeLogDeleteRetention for all of the stores.
+    * @return a map from storeName to the changeLogDeleteRetention of the store in ms.
+    */
+  def getChangeLogDeleteRetentionsInMs: Map[String, Long] = {
+    Map(getStoreNames map {storeName => (storeName, getChangeLogDeleteRetentionInMs(storeName))} : _*)
+  }
+
+  /**
    * Helper method to check if a system has a changelog attached to it.
    */
   def isChangelogSystem(systemName: String) = {
index c3308bf..89522dc 100644 (file)
@@ -30,7 +30,7 @@ import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.config.MetricsConfig.Config2Metrics
 import org.apache.samza.config.SerializerConfig.Config2Serializer
-import org.apache.samza.config.{Config, ShellCommandConfig}
+import org.apache.samza.config.{Config, ShellCommandConfig, StorageConfig}
 import org.apache.samza.config.StorageConfig.Config2Storage
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.config.SystemConfig.Config2System
@@ -76,6 +76,8 @@ import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Throttleable
 import org.apache.samza.util.MetricsReporterLoader
+import org.apache.samza.util.ThrottlingExecutor
+import org.apache.samza.util.SystemClock
 import org.apache.samza.util.Util
 import org.apache.samza.util.Util.asScalaClock
 
@@ -582,7 +584,9 @@ object SamzaContainer extends Logging {
         storeBaseDir = defaultStoreBaseDir,
         loggedStoreBaseDir = loggedStorageBaseDir,
         partition = taskModel.getChangelogPartition,
-        systemAdmins = systemAdmins)
+        systemAdmins = systemAdmins,
+        new StorageConfig(config).getChangeLogDeleteRetentionsInMs,
+        new SystemClock)
 
       val systemStreamPartitions = taskModel
         .getSystemStreamPartitions
index 0b7bcdd..695f53a 100644 (file)
@@ -22,10 +22,20 @@ package org.apache.samza.storage
 import java.io._
 import java.util
 
+import org.apache.samza.config.StorageConfig
 import org.apache.samza.{Partition, SamzaException}
 import org.apache.samza.container.TaskName
-import org.apache.samza.system._
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.system.StreamMetadataCache
+import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.system.SystemStreamPartition
+import org.apache.samza.system.SystemStreamPartitionIterator
+import org.apache.samza.system.ExtendedSystemAdmin
+import org.apache.samza.system.SystemStreamMetadata
+import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
+import org.apache.samza.util.Clock
 
 import scala.collection.{JavaConversions, Map}
 
@@ -53,7 +63,9 @@ class TaskStorageManager(
   storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   partition: Partition,
-  systemAdmins: Map[String, SystemAdmin]) extends Logging {
+  systemAdmins: Map[String, SystemAdmin],
+  changeLogDeleteRetentionsInMs: Map[String, Long],
+  clock: Clock) extends Logging {
 
   var taskStoresToRestore = taskStores.filter{
     case (storeName, storageEngine) => storageEngine.getStoreProperties.isLoggedStore
@@ -82,57 +94,121 @@ class TaskStorageManager(
 
     taskStores.keys.foreach(storeName => {
       val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-      info("Got storage partition directory as %s" format storagePartitionDir.toPath.toString)
+      info("Got default storage partition directory as %s" format storagePartitionDir.toPath.toString)
 
       if(storagePartitionDir.exists()) {
-        debug("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
+        info("Deleting default storage partition directory %s" format storagePartitionDir.toPath.toString)
         Util.rm(storagePartitionDir)
       }
 
-      val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
-      info("Got logged storage partition directory as %s" format loggedStoragePartitionDir.toPath.toString)
-      // If we find valid offsets s.t. we can restore the state, keep the disk files. Otherwise, delete them.
-      if (!persistedStores.contains(storeName) ||
-        (loggedStoragePartitionDir.exists() && !readOffsetFile(storeName, loggedStoragePartitionDir))) {
-        Util.rm(loggedStoragePartitionDir)
+      val loggedStoreDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+      info("Got logged storage partition directory as %s" format loggedStoreDir.toPath.toString)
+
+      // Delete the logged store if it is not valid.
+      if (!isLoggedStoreValid(storeName, loggedStoreDir)) {
+        info("Deleting logged storage partition directory %s." format loggedStoreDir.toPath.toString)
+        Util.rm(loggedStoreDir)
+      } else {
+        val offset = readOffsetFile(loggedStoreDir)
+        info("Read offset %s for the store %s from logged storage partition directory %s." format(offset, storeName, loggedStoreDir))
+        fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
       }
     })
   }
 
+  /**
+    * Directory {@code loggedStoreDir} associated with the logged store {@code storeName} is valid,
+    * if all of the following conditions are true.
+    * a) If the store has to be persisted to disk.
+    * b) If there is a valid offset file associated with the logged store.
+    * c) If the logged store has not gone stale.
+    *
+    * @return true if the logged store is valid, false otherwise.
+    */
+  private def isLoggedStoreValid(storeName: String, loggedStoreDir: File): Boolean = {
+    val changeLogDeleteRetentionInMs = changeLogDeleteRetentionsInMs.getOrElse(storeName,
+                                                                               StorageConfig.DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
+    persistedStores.contains(storeName) && isOffsetFileValid(loggedStoreDir) &&
+      !isStaleLoggedStore(loggedStoreDir, changeLogDeleteRetentionInMs)
+  }
+
+  /**
+    * Determines if the logged store directory {@code loggedStoreDir} is stale. A store is stale if the following condition is true.
+    *
+    *  ((CurrentTime) - (LastModifiedTime of the Offset file) is greater than the changelog's tombstone retention).
+    *
+    * @param loggedStoreDir the base directory of the local change-logged store.
+    * @param changeLogDeleteRetentionInMs the delete retention of the changelog in milli seconds.
+    * @return true if the store is stale, false otherwise.
+    *
+    */
+  private def isStaleLoggedStore(loggedStoreDir: File, changeLogDeleteRetentionInMs: Long): Boolean = {
+    var isStaleStore = false
+    val storePath = loggedStoreDir.toPath.toString
+    if (loggedStoreDir.exists()) {
+      val offsetFileRef = new File(loggedStoreDir, offsetFileName)
+      val offsetFileLastModifiedTime = offsetFileRef.lastModified()
+      if ((clock.currentTimeMillis() - offsetFileLastModifiedTime) >= changeLogDeleteRetentionInMs) {
+        info ("Store: %s is stale since lastModifiedTime of offset file: %s, " +
+          "is older than changelog deleteRetentionMs: %s." format(storePath, offsetFileLastModifiedTime, changeLogDeleteRetentionInMs))
+        isStaleStore = true
+      }
+    } else {
+      info("Logged storage partition directory: %s does not exist." format storePath)
+    }
+    isStaleStore
+  }
+
+  /**
+    * An offset file associated with logged store {@code loggedStoreDir} is valid if it exists and is not empty.
+    *
+    * @return true if the offset file is valid. false otherwise.
+    */
+  private def isOffsetFileValid(loggedStoreDir: File): Boolean = {
+    var hasValidOffsetFile = false
+    if (loggedStoreDir.exists()) {
+      val offsetContents = readOffsetFile(loggedStoreDir)
+      if (offsetContents != null && !offsetContents.isEmpty) {
+        hasValidOffsetFile = true
+      } else {
+        info("Offset file is not valid for store: %s." format loggedStoreDir.toPath.toString)
+      }
+    }
+    hasValidOffsetFile
+  }
+
   private def setupBaseDirs() {
     debug("Setting up base directories for stores.")
     taskStores.foreach {
       case (storeName, storageEngine) =>
         if (storageEngine.getStoreProperties.isLoggedStore) {
           val loggedStoragePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
+          info("Using logged storage partition directory: %s for store: %s." format(loggedStoragePartitionDir.toPath.toString, storeName))
           if (!loggedStoragePartitionDir.exists()) loggedStoragePartitionDir.mkdirs()
         } else {
-          TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName).mkdirs()
+          val storagePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
+          info("Using storage partition directory: %s for store: %s." format(storagePartitionDir.toPath.toString, storeName))
+          storagePartitionDir.mkdirs()
         }
     }
   }
 
   /**
-    * Attempts to read the offset file and returns {@code true} if the offsets were read successfully.
+    * Read and return the contents of the offset file.
     *
-    * @param storeName                  the name of the store for which the offsets are needed
-    * @param loggedStoragePartitionDir  the directory for the store
-    * @return                           true if the offsets were read successfully, false otherwise.
+    * @param loggedStoragePartitionDir the base directory of the store
+    * @return the content of the offset file if it exists for the store, null otherwise.
     */
-  private def readOffsetFile(storeName: String, loggedStoragePartitionDir: File): Boolean = {
-    var offsetsRead = false
+  private def readOffsetFile(loggedStoragePartitionDir: File): String = {
+    var offset : String = null
     val offsetFileRef = new File(loggedStoragePartitionDir, offsetFileName)
-    if(offsetFileRef.exists()) {
-      debug("Found offset file in partition directory: %s" format offsetFileRef.toPath.toString)
-      val offset = Util.readDataFromFile(offsetFileRef)
-      if(offset != null && !offset.isEmpty) {
-        fileOffset.put(new SystemStreamPartition(changeLogSystemStreams(storeName), partition), offset)
-        offsetsRead = true
-      }
+    if (offsetFileRef.exists()) {
+      info("Found offset file in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
+      offset = Util.readDataFromFile(offsetFileRef)
     } else {
       info("No offset file found in logged storage partition directory: %s" format loggedStoragePartitionDir.toPath.toString)
     }
-    offsetsRead
+    offset
   }
 
   private def validateChangelogStreams() = {
@@ -256,7 +332,7 @@ class TaskStorageManager(
           if (offsetFile.exists()) {
             Util.rm(offsetFile)
           }
-          debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic : %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
+          debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
         }
       } catch {
         case e: Exception => error("Exception storing offset for store %s. Skipping." format(storeName), e)
index 4d40f52..c82e6b1 100644 (file)
@@ -24,10 +24,13 @@ import java.io.File
 import java.util
 
 import org.apache.samza.Partition
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.StorageConfig
 import org.apache.samza.container.TaskName
 import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
+import org.apache.samza.util.SystemClock
 import org.apache.samza.util.Util
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -307,6 +310,29 @@ class TestTaskStorageManager extends MockitoSugar {
   }
 
   @Test
+  def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() {
+    // This test ensures that store gets deleted when lastModifiedTime of the offset file
+    // is older than deletionRetention of the changeLog.
+    val storeDirectory = TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName)
+    val offsetFile = new File(storeDirectory, "OFFSET")
+    offsetFile.createNewFile()
+    Util.writeDataToFile(offsetFile, "Test Offset Data")
+    offsetFile.setLastModified(0)
+    val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false)
+      .addStore(loggedStore, true)
+      .build
+
+    val cleanDirMethod = taskStorageManager.getClass
+      .getDeclaredMethod("cleanBaseDirs",
+        new Array[java.lang.Class[_]](0):_*)
+    cleanDirMethod.setAccessible(true)
+    cleanDirMethod.invoke(taskStorageManager, new Array[Object](0):_*)
+
+    assertTrue("Offset file was found in store partition directory. Clean up failed!", !offsetFile.exists())
+    assertTrue("Store directory exists. Clean up failed!", !storeDirectory.exists())
+  }
+
+  @Test
   def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() {
     val offsetFilePath = new File(TaskStorageManager.getStorePartitionDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName), "OFFSET")
     Util.writeDataToFile(offsetFilePath, "100")
@@ -556,7 +582,9 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       storeBaseDir = storeBaseDir,
       loggedStoreBaseDir = loggedStoreBaseDir,
       partition = partition,
-      systemAdmins = systemAdmins
+      systemAdmins = systemAdmins,
+      new StorageConfig(new MapConfig()).getChangeLogDeleteRetentionsInMs,
+      SystemClock.instance
     )
   }
 }
index 9320cf7..770220c 100644 (file)
@@ -146,6 +146,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val kafkaChangeLogProperties = new Properties
     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
+    kafkaChangeLogProperties.setProperty("delete.retention.ms", String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
     filteredConfigs.foreach{kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2)}
     kafkaChangeLogProperties
   }