SAMZA-1733: Populating ListGauge metric using DiagnosticsAppender for exceptions
authorRay Matharu <rmatharu@linkedin.com>
Fri, 27 Jul 2018 18:53:41 +0000 (11:53 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Fri, 27 Jul 2018 18:53:41 +0000 (11:53 -0700)
This PR shows how the ListGauge can be used to emit exceptions using a DiagnosticsAppender.
1. DiagnosticsAppender is enabled using a config (diagnostics.appender.enable)
2. DiagnosticsAppender adds exception-events to a listgauge which is a samza container metric
2. This ListGauge uses a time-and-count based eviction policy, so that exception-events are not emitted to Kafka(SnapshotReporter) forever.

Author: Ray Matharu <rmatharu@linkedin.com>

Reviewers: Yi Pan <nickpan47@gmail.com>, Jagadish Venkatraman <jvenkatr1989@gmail.com>, Shanthoosh Venkatraman <svenkatr@linkedin.com>

Closes #543 from rayman7718/diagnosticsappender

samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/metrics/reporter/Metrics.scala
samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java [new file with mode: 0644]
samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java [new file with mode: 0644]
samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java [new file with mode: 0644]

index 75e8005..7cebcc6 100644 (file)
@@ -88,6 +88,13 @@ object JobConfig {
   // across application restarts
   val JOB_LOGGED_STORE_BASE_DIR = "job.logged.store.base.dir"
 
+  // Enables diagnostic appender for logging exception events
+  val JOB_DIAGNOSTICS_ENABLED = "job.diagnostics.enabled"
+
+  // Specify DiagnosticAppender class
+  val DIAGNOSTICS_APPENDER_CLASS = "job.diagnostics.appender.class"
+  val DEFAULT_DIAGNOSTICS_APPENDER_CLASS = "org.apache.samza.logging.log4j.SimpleDiagnosticsAppender"
+
   implicit def Config2Job(config: Config) = new JobConfig(config)
 
   /**
@@ -186,4 +193,10 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getNonLoggedStorePath = getOption(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR)
 
   def getLoggedStorePath = getOption(JobConfig.JOB_LOGGED_STORE_BASE_DIR)
+
+  def getDiagnosticsEnabled = { getBoolean(JobConfig.JOB_DIAGNOSTICS_ENABLED, false) }
+
+  def getDiagnosticsAppenderClass = {
+    getOrDefault(JobConfig.DIAGNOSTICS_APPENDER_CLASS, JobConfig.DEFAULT_DIAGNOSTICS_APPENDER_CLASS)
+  }
 }
index bb1b1cf..47b73c1 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.io.File
 import java.lang.management.ManagementFactory
+import java.lang.reflect.InvocationTargetException
 import java.net.{URL, UnknownHostException}
 import java.nio.file.Path
 import java.time.Duration
@@ -53,8 +54,7 @@ import org.apache.samza.system.chooser.{DefaultChooser, MessageChooserFactory, R
 import org.apache.samza.table.TableManager
 import org.apache.samza.table.utils.SerdeUtils
 import org.apache.samza.task._
-import org.apache.samza.util.Util
-import org.apache.samza.util._
+import org.apache.samza.util.{Util, _}
 import org.apache.samza.{SamzaContainerStatus, SamzaException}
 
 import scala.collection.JavaConverters._
@@ -798,6 +798,7 @@ class SamzaContainer(
       jmxServer = new JmxServer()
 
       startMetrics
+      startDiagnostics
       startAdmins
       startOffsetManager
       startLocalityManager
@@ -934,6 +935,24 @@ class SamzaContainer(
     })
   }
 
+  def startDiagnostics {
+    if (containerContext.config.getDiagnosticsEnabled) {
+      info("Starting diagnostics.")
+
+      try {
+        val diagnosticsAppender = Class.forName(containerContext.config.getDiagnosticsAppenderClass).
+          getDeclaredConstructor(classOf[SamzaContainerMetrics]).newInstance(this.metrics);
+      }
+      catch {
+        case e@(_: ClassNotFoundException | _: InstantiationException | _: InvocationTargetException) => {
+          error("Failed to instantiate diagnostic appender", e)
+          throw new ConfigException("Failed to instantiate diagnostic appender class " +
+            containerContext.config.getDiagnosticsAppenderClass, e)
+        }
+      }
+    }
+  }
+
   def startOffsetManager {
     info("Registering task instances with offsets.")
 
index a26e666..d5cf6c6 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.samza.container
 
 import java.util
 
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent
 import org.apache.samza.metrics.{Gauge, ReadableMetricsRegistry, MetricsRegistryMap, MetricsHelper}
 
 class SamzaContainerMetrics(
@@ -48,7 +49,7 @@ class SamzaContainerMetrics(
 
   val taskStoreRestorationMetrics: util.Map[TaskName, Gauge[Long]] = new util.HashMap[TaskName, Gauge[Long]]()
 
-  val exceptions = newListGauge[String]("exceptions")
+  val exceptions = newListGauge[DiagnosticsExceptionEvent]("exceptions")
 
   def addStoreRestorationGauge(taskName: TaskName, storeName: String) {
     taskStoreRestorationMetrics.put(taskName, newGauge("%s-%s-restore-time" format(taskName.toString, storeName), -1L))
diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsExceptionEvent.java
new file mode 100644 (file)
index 0000000..d87249e
--- /dev/null
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.diagnostics;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+
+/**
+ * This class encapsulates information related to an exception event that is useful for diagnostics.
+ * It used to define container, task, and other metrics as
+ * {@link org.apache.samza.metrics.ListGauge} of type {@link DiagnosticsExceptionEvent}.
+ */
+public class DiagnosticsExceptionEvent {
+
+  private long timestamp; // the timestamp associated with this exception
+  private Class exceptionType; // store the exception type separately
+  private Throwable throwable;
+  private Map mdcMap;
+  // the MDC map associated with this exception, used to store/obtain any context associated with the throwable
+
+  public DiagnosticsExceptionEvent() {
+  }
+
+  public DiagnosticsExceptionEvent(long timestampMillis, Throwable throwable, Map mdcMap) {
+    this.throwable = throwable;
+    this.exceptionType = throwable.getClass();
+    this.timestamp = timestampMillis;
+    this.mdcMap = new HashMap(mdcMap);
+  }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public Throwable getThrowable() {
+    return this.throwable;
+  }
+
+  public Class getExceptionType() {
+    return this.exceptionType;
+  }
+
+  public Map getMdcMap() {
+    return mdcMap;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DiagnosticsExceptionEvent that = (DiagnosticsExceptionEvent) o;
+
+    // Throwable provides no equals impl, so we assume message & stacktrace equality suffices
+    return timestamp == that.timestamp && this.exceptionType.equals(that.exceptionType) && mdcMap.equals(that.mdcMap)
+        && this.throwable.getMessage().equals(that.throwable.getMessage()) && Arrays.equals(
+        this.throwable.getStackTrace(), that.throwable.getStackTrace());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(timestamp, exceptionType, throwable, mdcMap);
+  }
+}
\ No newline at end of file
index 8a58cd2..218157e 100644 (file)
@@ -19,9 +19,7 @@
 
 package org.apache.samza.metrics.reporter
 
-import java.util.Collections
-import java.util.HashMap
-import java.util.Map
+import java.util.{Collections, HashMap, Map}
 import scala.collection.JavaConverters._
 
 object Metrics {
@@ -52,4 +50,9 @@ class Metrics(metrics: Map[String, Map[String, Object]]) {
   def get(group: String) = immutableMetrics.get(group)
 
   def getAsMap(): Map[String, Map[String, Object]] = Collections.unmodifiableMap(immutableMetrics)
+
+  // default constructor to enable deserialization by MetricsSnapshotSerdeV2
+  def this() {
+    this(new HashMap[String, Map[String, Object]]())
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2.java
new file mode 100644 (file)
index 0000000..6ab7ce8
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MetricsSnapshotSerdeV2 implements Serde<MetricsSnapshot> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsSnapshotSerdeV2.class);
+  private final ObjectMapper objectMapper;
+
+  public MetricsSnapshotSerdeV2() {
+    objectMapper = new ObjectMapper();
+    objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.OBJECT_AND_NON_CONCRETE);
+  }
+
+  @Override
+  public MetricsSnapshot fromBytes(byte[] bytes) {
+    try {
+      return MetricsSnapshot.fromMap(
+          objectMapper.readValue(bytes, new HashMap<String, Map<String, Object>>().getClass()));
+    } catch (IOException e) {
+      LOG.info("Exception while deserializing", e);
+    }
+    return null;
+  }
+
+  @Override
+  public byte[] toBytes(MetricsSnapshot metricsSnapshot) {
+    try {
+      return objectMapper.writeValueAsString(convertMap(metricsSnapshot.getAsMap())).getBytes("UTF-8");
+    } catch (IOException e) {
+      LOG.info("Exception while serializing", e);
+    }
+    return null;
+  }
+
+  /** Metrics returns an UnmodifiableMap.
+   * Unmodifiable maps should not be serialized with type, because UnmodifiableMap cannot be deserialized.
+   * So we convert to HashMap.
+   */
+  private HashMap convertMap(Map<String, Object> map) {
+    HashMap retVal = new HashMap(map);
+    for (Map.Entry<String, Object> entry : map.entrySet()) {
+      if (entry.getValue() instanceof Map) {
+        retVal.put(entry.getKey(), convertMap((Map<String, Object>) entry.getValue()));
+      }
+    }
+    return retVal;
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerdeV2Factory.java
new file mode 100644 (file)
index 0000000..49e0770
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+
+
+public class MetricsSnapshotSerdeV2Factory implements SerdeFactory<MetricsSnapshot> {
+  @Override
+  public Serde<MetricsSnapshot> getSerde(String name, Config config) {
+    return new MetricsSnapshotSerdeV2();
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java b/samza-core/src/test/java/org/apache/samza/serializers/model/serializers/TestMetricsSnapshotSerdeV2.java
new file mode 100644 (file)
index 0000000..e4255a7
--- /dev/null
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers.model.serializers;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.apache.samza.metrics.reporter.Metrics;
+import org.apache.samza.metrics.reporter.MetricsHeader;
+import org.apache.samza.metrics.reporter.MetricsSnapshot;
+import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestMetricsSnapshotSerdeV2 {
+
+  @Test
+  public void testSerde() {
+    MetricsHeader metricsHeader =
+        new MetricsHeader("jobName", "i001", "container 0", "source", "300.14.25.1", "1", "1", 1, 1);
+
+    ListGauge listGauge = new ListGauge<DiagnosticsExceptionEvent>("exceptions");
+    DiagnosticsExceptionEvent diagnosticsExceptionEvent1 =
+        new DiagnosticsExceptionEvent(1, new SamzaException("this is a samza exception", new RuntimeException("cause")),
+            new HashMap());
+
+    listGauge.add(diagnosticsExceptionEvent1);
+
+    String samzaContainerMetricsGroupName = "org.apache.samza.container.SamzaContainerMetrics";
+    Map<String, Map<String, Object>> metricMessage = new HashMap<>();
+    metricMessage.put(samzaContainerMetricsGroupName, new HashMap<>());
+    metricMessage.get(samzaContainerMetricsGroupName).put("exceptions", listGauge.getValues());
+    metricMessage.get(samzaContainerMetricsGroupName).put("commit-calls", 0);
+
+    MetricsSnapshot metricsSnapshot = new MetricsSnapshot(metricsHeader, new Metrics(metricMessage));
+
+    MetricsSnapshotSerdeV2 metricsSnapshotSerde = new MetricsSnapshotSerdeV2();
+    byte[] serializedBytes = metricsSnapshotSerde.toBytes(metricsSnapshot);
+
+    MetricsSnapshot deserializedMetricsSnapshot = metricsSnapshotSerde.fromBytes(serializedBytes);
+
+    Assert.assertTrue("Headers map should be equal",
+        metricsSnapshot.getHeader().getAsMap().equals(deserializedMetricsSnapshot.getHeader().getAsMap()));
+
+    Assert.assertTrue("Metrics map should be equal",
+        metricsSnapshot.getMetrics().getAsMap().equals(deserializedMetricsSnapshot.getMetrics().getAsMap()));
+  }
+}
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/SimpleDiagnosticsAppender.java
new file mode 100644 (file)
index 0000000..31f0d47
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.logging.log4j;
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.diagnostics.DiagnosticsExceptionEvent;
+import org.apache.samza.metrics.ListGauge;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Provides an in-memory appender that parses LoggingEvents to filter events relevant to diagnostics.
+ * Currently, filters exception related events and update an exception metric ({@link ListGauge}) in
+ * {@link SamzaContainerMetrics}.
+ *
+ * When used inconjunction with {@link org.apache.samza.metrics.reporter.MetricsSnapshotReporter} provides a
+ * stream of diagnostics-related events.
+ */
+public class SimpleDiagnosticsAppender extends AppenderSkeleton {
+  private static final Logger LOG = LoggerFactory.getLogger(SimpleDiagnosticsAppender.class);
+
+  // simple object to synchronize root logger attachment
+  private static final Object SYNCHRONIZATION_OBJECT = new Object();
+  protected final ListGauge<DiagnosticsExceptionEvent> samzaContainerExceptionMetric;
+
+  /**
+   * A simple log4j1.2.* appender, which attaches itself to the root logger.
+   * Attachment to the root logger is thread safe.
+   */
+  public SimpleDiagnosticsAppender(SamzaContainerMetrics samzaContainerMetrics) {
+    this.samzaContainerExceptionMetric = samzaContainerMetrics.exceptions();
+    this.setName(SimpleDiagnosticsAppender.class.getName());
+
+    synchronized (SYNCHRONIZATION_OBJECT) {
+      this.attachAppenderToRootLogger();
+    }
+  }
+
+  private void attachAppenderToRootLogger() {
+    // ensure appender is attached only once per JVM (regardless of #containers)
+    if (org.apache.log4j.Logger.getRootLogger().getAppender(SimpleDiagnosticsAppender.class.getName()) == null) {
+      LOG.info("Attaching diagnostics appender to root logger");
+      org.apache.log4j.Logger.getRootLogger().addAppender(this);
+    }
+  }
+
+  @Override
+  protected void append(LoggingEvent loggingEvent) {
+
+    try {
+      // if an event with a non-null throwable is received => exception event
+      if (loggingEvent.getThrowableInformation() != null) {
+        DiagnosticsExceptionEvent diagnosticsExceptionEvent =
+            new DiagnosticsExceptionEvent(loggingEvent.timeStamp, loggingEvent.getThrowableInformation().getThrowable(),
+                loggingEvent.getProperties());
+
+        samzaContainerExceptionMetric.add(diagnosticsExceptionEvent);
+        LOG.debug("Received DiagnosticsExceptionEvent " + diagnosticsExceptionEvent);
+      } else {
+        LOG.debug("Received non-exception event with message " + loggingEvent.getMessage());
+      }
+    } catch (Exception e) {
+      // blanket catch of all exceptions so as to not impact any job
+      LOG.error("Exception in logging event parsing", e);
+    }
+  }
+
+  @Override
+  public void close() {
+    // Do nothing.
+  }
+
+  /**
+   * Returns false since this appender requires no layout.
+   * @return false
+   */
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+}