[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBacke...
authorJungtaek Lim <kabhwan@gmail.com>
Wed, 13 Jun 2018 04:36:20 +0000 (12:36 +0800)
committerhyukjinkwon <gurwls223@apache.org>
Wed, 13 Jun 2018 04:36:20 +0000 (12:36 +0800)
## What changes were proposed in this pull request?

This patch measures and logs elapsed time for each operation which communicate with file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider to help investigating any latency issue.

## How was this patch tested?

Manually tested.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21506 from HeartSaVioR/SPARK-24485.

core/src/main/scala/org/apache/spark/util/Utils.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

index 7428db2..c139db4 100644 (file)
@@ -31,6 +31,7 @@ import java.nio.file.Files
 import java.security.SecureRandom
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
+import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.zip.GZIPInputStream
 
@@ -434,7 +435,7 @@ private[spark] object Utils extends Logging {
     new URI("file:///" + rawFileName).getPath.substring(1)
   }
 
-    /**
+  /**
    * Download a file or directory to target directory. Supports fetching the file in a variety of
    * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
    * on the URL parameter. Fetching directories is only supported from Hadoop-compatible
@@ -507,6 +508,14 @@ private[spark] object Utils extends Logging {
     targetFile
   }
 
+  /** Records the duration of running `body`. */
+  def timeTakenMs[T](body: => T): (T, Long) = {
+    val startTime = System.nanoTime()
+    val result = body
+    val endTime = System.nanoTime()
+    (result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0))
+  }
+
   /**
    * Download `in` to `tempFile`, then move it to `destFile`.
    *
index df722b9..118c82a 100644 (file)
 package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
-import java.nio.channels.ClosedChannelException
 import java.util.Locale
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.google.common.io.ByteStreams
@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }
 
-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed..." +
+      "Note that this is normal for the first batch of starting query.")
 
-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val snapshotCurrentVersionMap = readSnapshotFile(version)
+      if (snapshotCurrentVersionMap.isDefined) {
+        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+        return snapshotCurrentVersionMap.get
+      }
+
+      // Find the most recent map before this version that we can.
+      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+      var lastAvailableVersion = version
+      var lastAvailableMap: Option[MapType] = None
+      while (lastAvailableMap.isEmpty) {
+        lastAvailableVersion -= 1
+
+        if (lastAvailableVersion <= 0) {
+          // Use an empty map for versions 0 or less.
+          lastAvailableMap = Some(new MapType)
+        } else {
+          lastAvailableMap =
+            synchronized { loadedMaps.get(lastAvailableVersion) }
+              .orElse(readSnapshotFile(lastAvailableVersion))
+        }
+      }
+
+      // Load all the deltas from the version after the last available one up to the target version.
+      // The last available version is the one with a full snapshot, so it doesn't need deltas.
+      val resultMap = new MapType(lastAvailableMap.get)
+      for (deltaVersion <- lastAvailableVersion + 1 to version) {
+        updateFromDeltaFile(deltaVersion, resultMap)
       }
-    }
 
-    // Load all the deltas from the version after the last available one up to the target version.
-    // The last available version is the one with a full snapshot, so it doesn't need deltas.
-    val resultMap = new MapType(lastAvailableMap.get)
-    for (deltaVersion <- lastAvailableVersion + 1 to version) {
-      updateFromDeltaFile(deltaVersion, resultMap)
+      synchronized { loadedMaps.put(version, resultMap) }
+      resultMap
     }
 
-    synchronized { loadedMaps.put(version, resultMap) }
-    resultMap
+    logDebug(s"Loading state for $version takes $elapsedMs ms.")
+
+    result
   }
 
   private def writeUpdateToDeltaFile(
@@ -490,7 +499,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Perform a snapshot of the store to allow delta files to be consolidated */
   private def doSnapshot(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val lastVersion = files.last.version
         val deltaFilesForLastVersion =
@@ -498,7 +509,8 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         synchronized { loadedMaps.get(lastVersion) } match {
           case Some(map) =>
             if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
-              writeSnapshotFile(lastVersion, map)
+              val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
+              logDebug(s"writeSnapshotFile() took $e2 ms.")
             }
           case None =>
             // The last map is not loaded, probably some other instance is in charge
@@ -517,7 +529,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
    */
   private[state] def cleanup(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
         if (earliestVersionToRetain > 0) {
@@ -527,9 +541,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
             mapsToRemove.foreach(loadedMaps.remove)
           }
           val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
-          filesToDelete.foreach { f =>
-            fm.delete(f.path)
+          val (_, e2) = Utils.timeTakenMs {
+            filesToDelete.foreach { f =>
+              fm.delete(f.path)
+            }
           }
+          logDebug(s"deleting files took $e2 ms.")
           logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
             filesToDelete.mkString(", "))
         }
index 1691a63..6759fb4 100644 (file)
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{CompletionIterator, NextIterator}
+import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}
 
 
 /** Used to identify the state store for a given operator. */
@@ -97,12 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   }
 
   /** Records the duration of running `body` for the next query progress update. */
-  protected def timeTakenMs(body: => Unit): Long = {
-    val startTime = System.nanoTime()
-    val result = body
-    val endTime = System.nanoTime()
-    math.max(NANOSECONDS.toMillis(endTime - startTime), 0)
-  }
+  protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2
 
   /**
    * Set the SQL metrics related to the state store.