SAMZA-1563: Make RocksDB store base directory configurable
authorBharath Kumarasubramanian <bkumaras@linkedin.com>
Mon, 7 May 2018 19:34:26 +0000 (12:34 -0700)
committerxiliu <xiliu@linkedin.com>
Mon, 7 May 2018 19:34:26 +0000 (12:34 -0700)
xinyuiscool ^^

Author: Bharath Kumarasubramanian <bkumaras@linkedin.com>

Reviewers: Prateek M <prateekm@apache.org>

Closes #491 from bharathkk/samza-1563

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
samza-core/src/test/scala/org/apache/samza/storage/TestTaskStorageManager.scala
samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java

index 86ac427..8557480 100644 (file)
                             <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
                             <dd>Azure based coordination utils.</dd>
                         These coordination utils are currently used for intermediate stream creation.
+                        </dl>
                     </td>
                 </tr>
+
+                <tr>
+                    <td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
+                    <td class="default">
+                        <i>user.dir</i> environment property if set, else current working directory of the process
+                    </td>
+                    <td class="description">
+                        The base directory for changelog stores used by Samza application. Another way to configure the base directory is by setting environment variable <i>LOGGED_STORE_BASE_DIR</i>.
+                        <b>Note:</b> The environment variable takes precedence over <i>job.logged.store.base.dir</i>.
+
+                        <br>By opting in, users are responsible for cleaning up the store directories if necessary. Jobs using host affinity should ensure that the stores are persisted across application/container restarts.
+                        This means that the location and cleanup of this directory should be separate from the container lifecycle and resource cleanup.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="job.non-logged.store.base.dir">job.non-logged.store.base.dir</td>
+                    <td class="default">
+                        <i>user.dir</i> environment property if set, else current working directory of the process
+                    </td>
+                    <td class="description">
+                        The base directory for non-changelog stores used by Samza application.
+
+                        <br>In YARN, the default behaviour without the configuration is to create non-changelog store directories in CWD which happens to be the YARN container directory.
+                        This gets cleaned up periodically as part of NodeManager's deletion service, which is controlled by the YARN config <i>yarn.nodemanager.delete.debug-delay-sec</i>.
+
+                        <br>In non-YARN deployment models or when using a different directory other than YARN container directory, stores need to be cleaned up periodically.
+                    </td>
+                </tr>
+
                 <tr>
                                               <!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
                 <th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
index de83919..75e8005 100644 (file)
@@ -81,6 +81,13 @@ object JobConfig {
   val PROCESSOR_ID = "processor.id"
   val PROCESSOR_LIST = "processor.list"
 
+  // Represents the store path for non-changelog stores.
+  val JOB_NON_LOGGED_STORE_BASE_DIR = "job.non-logged.store.base.dir"
+
+  // Represents the store path for stores with changelog enabled. Typically the stores are not cleaned up
+  // across application restarts
+  val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -175,4 +182,8 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   }
 
   def getDebounceTimeMs = getInt(JobConfig.JOB_DEBOUNCE_TIME_MS, JobConfig.DEFAULT_DEBOUNCE_TIME_MS)
+
+  def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)
+
+  def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
 }
index 7aec8e1..ad5cb9a 100644 (file)
@@ -76,6 +76,47 @@ object SamzaContainer extends Logging {
         classOf[JobModel])
   }
 
+  // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories
+  def getNonLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File) = {
+    config.getNonLoggedStorePath match {
+      case Some(nonLoggedStorePath) =>
+        new File(nonLoggedStorePath)
+      case None =>
+        defaultStoreBaseDir
+    }
+  }
+
+  // TODO: SAMZA-1701 SamzaContainer should not contain any logic related to store directories
+  def getLoggedStorageBaseDir(config: Config, defaultStoreBaseDir: File) = {
+    val defaultLoggedStorageBaseDir = config.getLoggedStorePath match {
+      case Some(durableStorePath) =>
+        new File(durableStorePath)
+      case None =>
+        defaultStoreBaseDir
+    }
+
+    var loggedStorageBaseDir:File = null
+    if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
+      val jobNameAndId = (
+        config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
+        config.getJobId.getOrElse("1")
+      )
+
+      loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
+        + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
+    } else {
+      if (config.getLoggedStorePath.isEmpty) {
+        warn("No override was provided for logged store base directory. This disables local state re-use on " +
+          "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
+          "variable in all machines running the Samza container or configure job.logged.store.base.dir for your application")
+      }
+
+      loggedStorageBaseDir = defaultLoggedStorageBaseDir
+    }
+
+    loggedStorageBaseDir
+  }
+
   def apply(
     containerId: String,
     jobModel: JobModel,
@@ -431,10 +472,6 @@ object SamzaContainer extends Logging {
       .toSet
     val containerContext = new SamzaContainerContext(containerId, config, taskNames.asJava, samzaContainerMetrics.registry)
 
-    // TODO not sure how we should make this config based, or not. Kind of
-    // strange, since it has some dynamic directories when used with YARN.
-    val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
-    info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
 
     val storeWatchPaths = new util.HashSet[Path]()
 
@@ -468,21 +505,13 @@ object SamzaContainer extends Logging {
 
       info("Got store consumers: %s" format storeConsumers)
 
-      var loggedStorageBaseDir: File = null
-      if(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR) != null) {
-        val jobNameAndId = (
-          config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")),
-          config.getJobId.getOrElse("1")
-        )
-        loggedStorageBaseDir = new File(System.getenv(ShellCommandConfig.ENV_LOGGED_STORE_BASE_DIR)
-          + File.separator + jobNameAndId._1 + "-" + jobNameAndId._2)
-      } else {
-        warn("No override was provided for logged store base directory. This disables local state re-use on " +
-          "application restart. If you want to enable this feature, set LOGGED_STORE_BASE_DIR as an environment " +
-          "variable in all machines running the Samza container")
-        loggedStorageBaseDir = defaultStoreBaseDir
-      }
+      val defaultStoreBaseDir = new File(System.getProperty("user.dir"), "state")
+      info("Got default storage engine base directory: %s" format defaultStoreBaseDir)
+
+      val nonLoggedStorageBaseDir = getNonLoggedStorageBaseDir(config, defaultStoreBaseDir)
+      info("Got base directory for non logged data stores: %s" format nonLoggedStorageBaseDir)
 
+      var loggedStorageBaseDir = getLoggedStorageBaseDir(config, defaultStoreBaseDir)
       info("Got base directory for logged data stores: %s" format loggedStorageBaseDir)
 
       val taskStores = storageEngineFactories
@@ -509,7 +538,7 @@ object SamzaContainer extends Logging {
             val storeDir = if (changeLogSystemStreamPartition != null) {
               TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
             } else {
-              TaskStorageManager.getStorePartitionDir(defaultStoreBaseDir, storeName, taskName)
+              TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName)
             }
 
             storeWatchPaths.add(storeDir.toPath)
@@ -535,7 +564,7 @@ object SamzaContainer extends Logging {
         changeLogSystemStreams = changeLogSystemStreams,
         maxChangeLogStreamPartitions,
         streamMetadataCache = streamMetadataCache,
-        storeBaseDir = defaultStoreBaseDir,
+        nonLoggedStoreBaseDir = nonLoggedStorageBaseDir,
         loggedStoreBaseDir = loggedStorageBaseDir,
         partition = taskModel.getChangelogPartition,
         systemAdmins = systemAdmins,
index 00dc20f..09744cf 100644 (file)
@@ -51,7 +51,7 @@ class TaskStorageManager(
   changeLogSystemStreams: Map[String, SystemStream] = Map(),
   changeLogStreamPartitions: Int,
   streamMetadataCache: StreamMetadataCache,
-  storeBaseDir: File = new File(System.getProperty("user.dir"), "state"),
+  nonLoggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   loggedStoreBaseDir: File = new File(System.getProperty("user.dir"), "state"),
   partition: Partition,
   systemAdmins: SystemAdmins,
@@ -84,12 +84,12 @@ class TaskStorageManager(
     debug("Cleaning base directories for stores.")
 
     taskStores.keys.foreach(storeName => {
-      val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-      info("Got default storage partition directory as %s" format storePartitionDir.toPath.toString)
+      val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName)
+      info("Got non logged storage partition directory as %s" format nonLoggedStorePartitionDir.toPath.toString)
 
-      if(storePartitionDir.exists()) {
-        info("Deleting default storage partition directory %s" format storePartitionDir.toPath.toString)
-        FileUtil.rm(storePartitionDir)
+      if(nonLoggedStorePartitionDir.exists()) {
+        info("Deleting non logged storage partition directory %s" format nonLoggedStorePartitionDir.toPath.toString)
+        FileUtil.rm(nonLoggedStorePartitionDir)
       }
 
       val loggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
@@ -179,9 +179,9 @@ class TaskStorageManager(
           info("Using logged storage partition directory: %s for store: %s." format(loggedStorePartitionDir.toPath.toString, storeName))
           if (!loggedStorePartitionDir.exists()) loggedStorePartitionDir.mkdirs()
         } else {
-          val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)
-          info("Using storage partition directory: %s for store: %s." format(storePartitionDir.toPath.toString, storeName))
-          storePartitionDir.mkdirs()
+          val nonLoggedStorePartitionDir = TaskStorageManager.getStorePartitionDir(nonLoggedStoreBaseDir, storeName, taskName)
+          info("Using non logged storage partition directory: %s for store: %s." format(nonLoggedStorePartitionDir.toPath.toString, storeName))
+          nonLoggedStorePartitionDir.mkdirs()
         }
     }
   }
index bbdb819..31d3ef6 100644 (file)
@@ -715,7 +715,7 @@ class TaskStorageManagerBuilder extends MockitoSugar {
       changeLogSystemStreams = changeLogSystemStreams,
       changeLogStreamPartitions = changeLogStreamPartitions,
       streamMetadataCache = streamMetadataCache,
-      storeBaseDir = storeBaseDir,
+      nonLoggedStoreBaseDir = storeBaseDir,
       loggedStoreBaseDir = loggedStoreBaseDir,
       partition = partition,
       systemAdmins = new SystemAdmins(systemAdmins.asJava),
index 8413194..11806f3 100644 (file)
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.commons.lang.StringUtils;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 
 
@@ -32,6 +33,7 @@ public class LocalStoreMonitorConfig extends MapConfig {
 
   /**
    * Defines the local store directory of the job.
+   * @deprecated in favor of {@link org.apache.samza.config.JobConfig#JOB_LOGGED_STORE_BASE_DIR}
    */
   static final String CONFIG_LOCAL_STORE_DIR = "job.local.store.dir";
 
@@ -68,7 +70,7 @@ public class LocalStoreMonitorConfig extends MapConfig {
    * @return the location of the job's local directory.
    */
   public String getLocalStoreBaseDir() {
-    return get(CONFIG_LOCAL_STORE_DIR);
+    return get(JobConfig.JOB_LOGGED_STORE_BASE_DIR(), get(CONFIG_LOCAL_STORE_DIR));
   }
 
   /**