SAMZA-2018: State restore improvements using RocksDB writebatch API
authorRay Matharu <rmatharu@linkedin.com>
Tue, 18 Dec 2018 21:06:04 +0000 (13:06 -0800)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Tue, 18 Dec 2018 21:06:04 +0000 (13:06 -0800)
This PR enables the RocksDbKeyValueStore to use the writeBatch API.

Author: Ray Matharu <rmatharu@linkedin.com>

Reviewers: Jacob Maes <jmakes@apache.org>, Prateek Maheshwari <pmaheshwari@apache.org>

Closes #864 from rmatharu/writebatch

samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala

index b7baede..c5a89d9 100644 (file)
@@ -172,25 +172,27 @@ class RocksDbKeyValueStore(
     }
   }
 
-  // Write batch from RocksDB API is not used currently because of: https://github.com/facebook/rocksdb/issues/262
   def putAll(entries: java.util.List[Entry[Array[Byte], Array[Byte]]]): Unit = ifOpen {
     metrics.putAlls.inc()
     val iter = entries.iterator
     var wrote = 0
     var deletes = 0
+    val writeBatch = new WriteBatch()
     while (iter.hasNext) {
       val curr = iter.next()
       if (curr.getValue == null) {
         deletes += 1
-        db.delete(writeOptions, curr.getKey)
+        writeBatch.remove(curr.getKey)
       } else {
         wrote += 1
         val key = curr.getKey
         val value = curr.getValue
         metrics.bytesWritten.inc(key.length + value.length)
-        db.put(writeOptions, key, value)
+        writeBatch.put(key, value)
       }
     }
+    db.write(writeOptions, writeBatch)
+    writeBatch.close()
     metrics.puts.inc(wrote)
     metrics.deletes.inc(deletes)
   }