SAMZA-1174; Profiling state store performance
authorjmehar2 <jmehar2@illinois.edu>
Thu, 11 May 2017 19:56:31 +0000 (12:56 -0700)
committervjagadish1989 <jvenkatr@linkedin.com>
Thu, 11 May 2017 19:56:31 +0000 (12:56 -0700)
This is a commit for [SAMZA-1174](https://issues.apache.org/jira/browse/SAMZA-1174). This commit involves gathering a log of operations (read, write, delete, etc.) happening on the state store and publishing them into a Kafka topic. It is named the "Access Log"; it behaves similarly to the changelog, but gathers access-log information instead.

Author: jmehar2 <jmehar2@illinois.edu>
Author: Jayasi Mehar <jayasi05@gmail.com>
Author: s-noghabi <abdolla2@illinois.edu>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>, Jagadish V <jagadish@apache.org>

Closes #132 from s-noghabi/master

samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala
samza-core/src/main/scala/org/apache/samza/serializers/SerdeManager.scala
samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala [new file with mode: 0644]
samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala [new file with mode: 0644]
samza-kv/src/main/scala/org/apache/samza/storage/kv/BaseKeyValueStorageEngineFactory.scala

index 8dbf739..0e3d568 100644 (file)
@@ -35,6 +35,11 @@ object StorageConfig {
   val CHANGELOG_SYSTEM = "job.changelog.system"
   val CHANGELOG_DELETE_RETENTION_MS = "stores.%s.changelog.delete.retention.ms"
   val DEFAULT_CHANGELOG_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1)
+  // Suffix joined with "-" to a changelog stream name to form the access-log stream name.
+  val ACCESSLOG_STREAM_SUFFIX = "access-log"
+  // Per-store config key (formatted with the store name) for the access-log sampling ratio.
+  val ACCESSLOG_SAMPLING_RATIO = "stores.%s.accesslog.sampling.ratio"
+  // Per-store config key (formatted with the store name) that turns access logging on.
+  val ACCESSLOG_ENABLED = "stores.%s.accesslog.enabled"
+  // Sampling ratio used when the per-store sampling-ratio key is not set.
+  val DEFAULT_ACCESSLOG_SAMPLING_RATIO = 50
+
 
   implicit def Config2Storage(config: Config) = new StorageConfig(config)
 }
@@ -45,11 +50,24 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
   def getStorageKeySerde(name: String) = getOption(StorageConfig.KEY_SERDE format name)
   def getStorageMsgSerde(name: String) = getOption(StorageConfig.MSG_SERDE format name)
 
+  /**
+   * Tells whether access logging is switched on for a store.
+   *
+   * @param storeName store whose `stores.<name>.accesslog.enabled` key is read
+   * @return the configured flag, or `false` when the key is not set
+   */
+  def getAccessLogEnabled(storeName: String) = {
+    val enabledKey = ACCESSLOG_ENABLED.format(storeName)
+    getBoolean(enabledKey, false)
+  }
+
   def getChangelogStream(name: String) = {
     val javaStorageConfig = new JavaStorageConfig(config)
     Option(javaStorageConfig.getChangelogStream(name))
   }
 
+  /**
+   * Derives the access-log stream name from a changelog stream name.
+   *
+   * @param changeLogStream name of the changelog stream
+   * @return the changelog stream name followed by "-" and the access-log suffix
+   */
+  def getAccessLogStream(changeLogStream: String) =
+    s"${changeLogStream}-${ACCESSLOG_STREAM_SUFFIX}"
+
+  /**
+   * Reads the access-log sampling ratio configured for a store.
+   *
+   * @param storeName store whose `stores.<name>.accesslog.sampling.ratio` key is read
+   * @return the configured ratio, or the default sampling ratio when the key is not set
+   */
+  def getAccessLogSamplingRatio(storeName: String) = {
+    val samplingRatioKey = ACCESSLOG_SAMPLING_RATIO.format(storeName)
+    getInt(samplingRatioKey, DEFAULT_ACCESSLOG_SAMPLING_RATIO)
+  }
+
   def getChangeLogDeleteRetentionInMs(storeName: String) = {
     getLong(CHANGELOG_DELETE_RETENTION_MS format storeName, DEFAULT_CHANGELOG_DELETE_RETENTION_MS)
   }
index dda0b6b..353e297 100644 (file)
@@ -48,6 +48,7 @@ import org.apache.samza.system.SystemFactory
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.SystemStreamPartitionMatcher
 import org.apache.samza.system.SystemAdmin
+import org.apache.samza.system.StreamSpec
 import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import org.apache.samza.Partition
@@ -138,6 +139,7 @@ object JobModelManager extends Logging {
     changelogManager.writeChangeLogPartitionMapping(newChangelogPartitionMapping.asJava)
 
     createChangeLogStreams(config, jobModel.maxChangeLogStreamPartitions)
+    createAccessLogStreams(config, jobModel.maxChangeLogStreamPartitions)
 
     jobModelManager
   }
@@ -298,6 +300,28 @@ object JobModelManager extends Logging {
     }
   }
 
+  /**
+   * Creates an access-log stream for every store that both defines a changelog
+   * stream and has access logging enabled.
+   *
+   * The access-log stream is created in the changelog's system, named after the
+   * changelog stream via `config.getAccessLogStream`.
+   *
+   * @param config job configuration viewed as a storage config
+   * @param changeLogPartitions partition count given to each access-log stream
+   * @throws SamzaException if the changelog's system has no factory configured
+   */
+  private def createAccessLogStreams(config: StorageConfig, changeLogPartitions: Int): Unit = {
+    // Store name -> changelog stream name, restricted to stores that define a changelog.
+    val changelogNamesByStore = config
+      .getStoreNames
+      .flatMap(name => config.getChangelogStream(name).map(changelog => (name, changelog)))
+      .toMap
+
+    changelogNamesByStore.foreach { case (storeName, changelogName) =>
+      val changelogStream = Util.getSystemStreamFromNames(changelogName)
+      if (config.getAccessLogEnabled(storeName)) {
+        val systemName = changelogStream.getSystem
+        val factoryClassName = config
+          .getSystemFactory(systemName)
+          .getOrElse(throw new SamzaException("A stream uses system %s, which is missing from the configuration." format systemName))
+        val systemAdmin = Util.getObj[SystemFactory](factoryClassName).getAdmin(systemName, config)
+
+        // The same derived name is passed as the first two StreamSpec arguments.
+        val accessLogName = config.getAccessLogStream(changelogStream.getStream)
+        val accessLogSpec = new StreamSpec(accessLogName, accessLogName, systemName, changeLogPartitions)
+        systemAdmin.createStream(accessLogSpec)
+      }
+    }
+  }
+
   private def getSystemNames(config: Config) = config.getSystemNames.toSet
 
 }
index 066d894..4540bce 100644 (file)
@@ -24,6 +24,7 @@ import org.apache.samza.config.SerializerConfig
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.OutgoingMessageEnvelope
 import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.config.StorageConfig
 
 class SerdeManager(
   serdes: Map[String, Serde[Object]] = Map(),
@@ -38,7 +39,8 @@ class SerdeManager(
     .toBytes(obj)
 
   def toBytes(envelope: OutgoingMessageEnvelope): OutgoingMessageEnvelope = {
-    val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+    val key = if (changeLogSystemStreams.contains(envelope.getSystemStream)
+      || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
       // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
       envelope.getKey
     } else if (envelope.getKeySerializerName != null) {
@@ -55,7 +57,8 @@ class SerdeManager(
       envelope.getKey
     }
 
-    val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)) {
+    val message = if (changeLogSystemStreams.contains(envelope.getSystemStream)
+      || envelope.getSystemStream.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
       // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
       envelope.getMessage
     } else if (envelope.getMessageSerializerName != null) {
@@ -90,7 +93,8 @@ class SerdeManager(
     .fromBytes(bytes)
 
   def fromBytes(envelope: IncomingMessageEnvelope) = {
-    val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+    val key = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
+      || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX) ) {
       // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
       envelope.getKey
     } else if (systemStreamKeySerdes.contains(envelope.getSystemStreamPartition)) {
@@ -104,7 +108,8 @@ class SerdeManager(
       envelope.getKey
     }
 
-    val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)) {
+    val message = if (changeLogSystemStreams.contains(envelope.getSystemStreamPartition.getSystemStream)
+      || envelope.getSystemStreamPartition.getStream.endsWith(StorageConfig.ACCESSLOG_STREAM_SUFFIX)) {
       // If the stream is a change log stream, don't do any serde. It is up to storage engines to handle serde.
       envelope.getMessage
     } else if (systemStreamMessageSerdes.contains(envelope.getSystemStreamPartition)) {
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLogMessage.scala
new file mode 100644 (file)
index 0000000..dde5599
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+import java.io.Serializable
+import java.io.ByteArrayOutputStream
+import java.io.ObjectOutputStream
+import java.util.ArrayList
+
+/**
+ * One access-log record: which operation ran, how long it took, and the keys involved.
+ *
+ * @param DBOperation integer code of the operation
+ * @param duration measured duration of the operation
+ * @param keys serialized keys the operation touched
+ * @param timestamp creation time in epoch milliseconds; defaults to the current time
+ */
+class AccessLogMessage(val DBOperation: Int,
+    val duration: Long,
+    val keys: ArrayList[Array[Byte]],
+    val timestamp: Long = System.currentTimeMillis()
+   ) extends Serializable {
+
+  /**
+   * Serializes this message with Java object serialization.
+   *
+   * @return the bytes written by an `ObjectOutputStream` for this object
+   */
+  def serialize(): Array[Byte] = {
+    val buffer = new ByteArrayOutputStream()
+    val objectStream = new ObjectOutputStream(buffer)
+    objectStream.writeObject(this)
+    // Closing the object stream flushes any buffered data into the byte buffer.
+    objectStream.close()
+    val serialized = buffer.toByteArray
+    buffer.close()
+    serialized
+  }
+
+}
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/AccessLoggedStore.scala
new file mode 100644 (file)
index 0000000..c21c9a6
--- /dev/null
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.kv
+
+
+import java.util
+import org.apache.samza.config.StorageConfig
+import org.apache.samza.task.MessageCollector
+import org.apache.samza.util.Logging
+import org.apache.samza.system.{OutgoingMessageEnvelope, SystemStream, SystemStreamPartition}
+import org.apache.samza.serializers._
+
+/**
+ * A [[KeyValueStore]] wrapper that times operations on the underlying store and
+ * sends a sample of them, as serialized [[AccessLogMessage]]s, to an access-log stream.
+ *
+ * The access-log stream lives in the same system as the changelog; its name is derived
+ * from the changelog stream name via `storageConfig.getAccessLogStream`, and each record
+ * is sent with the changelog partition id. `all()`, `flush()` and `close()` are
+ * delegated to the underlying store without being logged.
+ *
+ * @param store the store that actually serves every operation
+ * @param collector used to send access-log records
+ * @param changelogSystemStreamPartition changelog partition from which the access-log
+ *                                       system, stream name and partition id are derived
+ * @param storageConfig source of the access-log stream name and the sampling ratio
+ * @param storeName store name used to look up the sampling ratio
+ * @param keySerde serializes keys into the bytes recorded in each access-log record
+ */
+class AccessLoggedStore[K, V](
+    val store: KeyValueStore[K, V],
+    val collector: MessageCollector,
+    val changelogSystemStreamPartition: SystemStreamPartition,
+    val storageConfig: StorageConfig,
+    val storeName: String,
+    val keySerde: Serde[K]) extends KeyValueStore[K, V] with Logging {
+
+  // Integer codes written into AccessLogMessage to identify the logged operation.
+  object DBOperation extends Enumeration {
+    type DBOperation = Int
+    val READ = 1
+    val WRITE = 2
+    val DELETE = 3
+    val RANGE = 4
+  }
+
+  val streamName = storageConfig.getAccessLogStream(changelogSystemStreamPartition.getSystemStream.getStream)
+  val systemStream = new SystemStream(changelogSystemStreamPartition.getSystemStream.getSystem, streamName)
+  val partitionId: Int = changelogSystemStreamPartition.getPartition.getPartitionId
+  val serializer = new LongSerde()
+  val samplingRatio = storageConfig.getAccessLogSamplingRatio(storeName)
+  val rng = scala.util.Random
+
+  def get(key: K): V = {
+    logAccess(DBOperation.READ, singleKey(key), store.get(key))
+  }
+
+  def getAll(keys: util.List[K]): util.Map[K, V] = {
+    logAccess(DBOperation.READ, serializeKeys(keys), store.getAll(keys))
+  }
+
+  def put(key: K, value: V): Unit = {
+    logAccess(DBOperation.WRITE, singleKey(key), store.put(key, value))
+  }
+
+  def putAll(entries: util.List[Entry[K, V]]): Unit = {
+    logAccess(DBOperation.WRITE, serializeKeysFromEntries(entries), store.putAll(entries))
+  }
+
+  def delete(key: K): Unit = {
+    logAccess(DBOperation.DELETE, singleKey(key), store.delete(key))
+  }
+
+  def deleteAll(keys: util.List[K]): Unit = {
+    logAccess(DBOperation.DELETE, serializeKeys(keys), store.deleteAll(keys))
+  }
+
+  def range(from: K, to: K): KeyValueIterator[K, V] = {
+    // A range access is recorded by its two bounds, in (from, to) order.
+    val bounds = new util.ArrayList[Array[Byte]]
+    bounds.add(toBytesOrNull(from))
+    bounds.add(toBytesOrNull(to))
+    logAccess(DBOperation.RANGE, bounds, store.range(from, to))
+  }
+
+  // Full scans are delegated as-is; no access-log record is produced for them.
+  def all(): KeyValueIterator[K, V] = {
+    store.all()
+  }
+
+  def close(): Unit = {
+    trace("Closing accessLogged store.")
+
+    store.close()
+  }
+
+  def flush(): Unit = {
+    trace("Flushing store.")
+
+    store.flush()
+    trace("Flushed store.")
+  }
+
+  /**
+   * Serializes every key of the list, in iteration order.
+   *
+   * @param keys keys to serialize; null elements are kept as null entries
+   * @return a new list holding the serialized form of each key
+   */
+  def serializeKeys(keys: util.List[K]): util.ArrayList[Array[Byte]] = {
+    val keysInBytes = new util.ArrayList[Array[Byte]]
+    val iter = keys.iterator
+    while (iter.hasNext) {
+      keysInBytes.add(toBytesOrNull(iter.next()))
+    }
+    keysInBytes
+  }
+
+  /**
+   * Serializes the key of every entry of the list, in iteration order.
+   *
+   * @param list entries whose keys are serialized; null keys are kept as null entries
+   * @return a new list holding the serialized form of each entry's key
+   */
+  def serializeKeysFromEntries(list: util.List[Entry[K, V]]): util.ArrayList[Array[Byte]] = {
+    val keysInBytes = new util.ArrayList[Array[Byte]]
+    val iter = list.iterator
+    while (iter.hasNext) {
+      keysInBytes.add(toBytesOrNull(iter.next().getKey))
+    }
+    keysInBytes
+  }
+
+  // Wraps one key into the list form expected by logAccess.
+  private def singleKey(key: K): util.ArrayList[Array[Byte]] = {
+    val keyBytes = new util.ArrayList[Array[Byte]]
+    keyBytes.add(toBytesOrNull(key))
+    keyBytes
+  }
+
+  /**
+   * Runs `block`, measures how long it took in nanoseconds, and for a sample of
+   * calls sends an access-log record keyed by the current time in milliseconds.
+   *
+   * @param dBOperation one of the DBOperation codes
+   * @param keys serialized keys touched by the operation
+   * @param block the store operation to run; evaluated exactly once
+   * @return whatever `block` returns
+   */
+  private def logAccess[R](dBOperation: Int, keys: util.ArrayList[Array[Byte]],
+                           block: => R): R = {
+    val startTimeNs = System.nanoTime()
+    val result = block
+    val duration = System.nanoTime() - startTimeNs
+    // Draw from [0, 100) so that samplingRatio acts as a percentage (the default is 50).
+    // An unbounded nextInt() is negative about half the time, which would log roughly
+    // 50% of operations no matter what ratio is configured.
+    if (rng.nextInt(100) < samplingRatio) {
+      val timeStamp = System.currentTimeMillis()
+      // Use the same timestamp for the envelope key and the message body.
+      val message = new AccessLogMessage(dBOperation, duration, keys, timeStamp)
+      collector.send(new OutgoingMessageEnvelope(systemStream, partitionId, serializer.toBytes(timeStamp), message.serialize()))
+    }
+
+    result
+  }
+
+  /**
+   * Serializes a key with the key serde.
+   *
+   * @param key key to serialize; may be null
+   * @return the serialized key, or null when `key` is null
+   */
+  def toBytesOrNull(key: K): Array[Byte] = {
+    if (key == null) null else keySerde.toBytes(key)
+  }
+
+}
index 8ffc817..e3a2970 100644 (file)
@@ -82,6 +82,7 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
     val storageConfig = containerContext.config.subset("stores." + storeName + ".", true)
     val storeFactory = storageConfig.get("factory")
     var storePropertiesBuilder = new StoreProperties.StorePropertiesBuilder()
+    val accessLog = storageConfig.getBoolean("accesslog.enabled", false)
 
     if (storeFactory == null) {
       throw new SamzaException("Store factory not defined. Cannot proceed with KV store creation!")
@@ -129,8 +130,14 @@ trait BaseKeyValueStorageEngineFactory[K, V] extends StorageEngineFactory[K, V]
       serialized
     }
 
+    val maybeAccessLoggedStore = if (accessLog) {
+      new AccessLoggedStore(maybeCachedStore, collector, changeLogSystemStreamPartition, storageConfig, storeName, keySerde)
+    } else {
+      maybeCachedStore
+    }
+
     // wrap with null value checking
-    val nullSafeStore = new NullSafeKeyValueStore(maybeCachedStore)
+    val nullSafeStore = new NullSafeKeyValueStore(maybeAccessLoggedStore)
 
     // create the storage engine and return
     // TODO: Decide if we should use raw bytes when restoring