SAMZA-1768: Handle corrupted OFFSET file
authorXinyu Liu <xinyu@apache.org>
Mon, 30 Jul 2018 18:46:39 +0000 (11:46 -0700)
committerxiliu <xiliu@linkedin.com>
Mon, 30 Jul 2018 18:46:39 +0000 (11:46 -0700)
This patch addresses the following tickets:

SAMZA-1778: SIGSEGV when reading properties (metrics) on a closed RocksDB store
SAMZA-1777: Logged store OFFSET file write during flush should be atomic
SAMZA-1768: Handle corrupted OFFSET file elegantly

Author: xinyuiscool <xiliu@linkedin.com>

Reviewers: Prateek M <prateek@apache.org>

Closes #588 from xinyuiscool/SAMZA-1768

samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
samza-core/src/main/scala/org/apache/samza/util/FileUtil.scala
samza-core/src/test/scala/org/apache/samza/util/TestFileUtil.scala
samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala

index 731a84d..e7301ea 100644 (file)
@@ -128,7 +128,11 @@ public class StorageManagerUtil {
 
     if (offsetFileRef.exists()) {
       LOG.info("Found offset file in storage partition directory: {}", storePath);
-      offset = FileUtil.readWithChecksum(offsetFileRef);
+      try {
+        offset = FileUtil.readWithChecksum(offsetFileRef);
+      } catch (Exception e) {
+        LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e);
+      }
     } else {
       LOG.info("No offset file found in storage partition directory: {}", storePath);
     }
index 46a2089..0845b5c 100644 (file)
@@ -22,6 +22,7 @@
 package org.apache.samza.util
 
 import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+import java.nio.file._
 import java.util.zip.CRC32
 
 import org.apache.samza.util.Util.info
@@ -35,10 +36,12 @@ object FileUtil {
     * */
   def writeWithChecksum(file: File, data: String): Unit = {
     val checksum = getChecksum(data)
+    val tmpFilePath = file.getAbsolutePath + ".tmp"
+    val tmpFile = new File(tmpFilePath)
     var oos: ObjectOutputStream = null
     var fos: FileOutputStream = null
     try {
-      fos = new FileOutputStream(file)
+      fos = new FileOutputStream(tmpFile)
       oos = new ObjectOutputStream(fos)
       oos.writeLong(checksum)
       oos.writeUTF(data)
@@ -46,6 +49,14 @@ object FileUtil {
       oos.close()
       fos.close()
     }
+
+    //atomic swap of tmp and real offset file
+    try {
+      Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING)
+    } catch {
+      case e: AtomicMoveNotSupportedException =>
+        Files.move(tmpFile.toPath, file.toPath, StandardCopyOption.REPLACE_EXISTING)
+    }
   }
 
   /**
index 5bb6da7..ddda268 100644 (file)
@@ -49,6 +49,28 @@ class TestFileUtil {
   }
 
   @Test
+  def testWriteDataToFileWithExistingOffsetFile() {
+    // Invoke test
+    val file = new File(System.getProperty("java.io.tmpdir"), "test2")
+    // write the same file three times
+    FileUtil.writeWithChecksum(file, data)
+    FileUtil.writeWithChecksum(file, data)
+    FileUtil.writeWithChecksum(file, data)
+
+    // Check that file exists
+    assertTrue("File was not created!", file.exists())
+    val fis = new FileInputStream(file)
+    val ois = new ObjectInputStream(fis)
+
+    // Check content of the file is as expected
+    assertEquals(checksum, ois.readLong())
+    assertEquals(data, ois.readUTF())
+    ois.close()
+    fis.close()
+  }
+
+
+  @Test
   def testReadDataFromFile() {
     // Setup
     val fos = new FileOutputStream(file)
index f25097c..836dab4 100644 (file)
@@ -91,7 +91,14 @@ object RocksDbKeyValueStore extends Logging {
         .toSet
 
       (configuredMetrics ++ rocksDbMetrics)
-        .foreach(property => metrics.newGauge(property, () => rocksDb.getProperty(property)))
+        .foreach(property => metrics.newGauge(property, () =>
+          // Check isOwningHandle flag. The db is open iff the flag is true.
+          if (rocksDb.isOwningHandle) {
+            rocksDb.getProperty(property)
+          } else {
+            "0"
+          }
+        ))
 
       rocksDb
     } catch {