SAMZA-1161: Adding metrics into LocalStoreMonitor.
authorShanthoosh Venkataraman <svenkataraman@linkedin.com>
Wed, 29 Mar 2017 21:07:34 +0000 (14:07 -0700)
committerJacob Maes <jmaes@linkedin.com>
Wed, 29 Mar 2017 21:07:34 +0000 (14:07 -0700)
Rocksdb LocalStoreMonitor is responsible for clearing up unused local task partition stores. Metrics are required to understand the behavior of this monitor in production(especially when it's clearing up unused rocksdb local stores).

The following two metrics will be emitted from this monitor:
a) Total disk space cleared in bytes.
b) Total number of rocksdb task partition stores cleared.

Author: Shanthoosh Venkataraman <svenkataraman@linkedin.com>

Reviewers: Jacob Maes <jmaes@linkedin.com>

Closes #95 from shanthoosh/metrics_into_local_store_monitor

samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorConfig.java
samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorFactory.java
samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorMetrics.java [new file with mode: 0644]
samza-rest/src/test/java/org/apache/samza/monitor/TestLocalStoreMonitor.java

index f1e333c..2c4ac95 100644 (file)
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
 import org.apache.commons.io.FileUtils;
 import org.apache.samza.container.TaskName;
-import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.rest.model.JobStatus;
 import org.apache.samza.rest.model.Task;
 import org.apache.samza.rest.proxy.job.JobInstance;
@@ -56,15 +55,16 @@ public class LocalStoreMonitor implements Monitor {
 
   private final LocalStoreMonitorConfig config;
 
-  // MetricsRegistry should be used in the future to send metrics from this monitor.
-  // Metrics from the monitor is a way to know if the monitor is alive.
+  private final LocalStoreMonitorMetrics localStoreMonitorMetrics;
+
   public LocalStoreMonitor(LocalStoreMonitorConfig config,
-                           MetricsRegistry metricsRegistry,
+                           LocalStoreMonitorMetrics localStoreMonitorMetrics,
                            JobsClient jobsClient) {
     Preconditions.checkState(!Strings.isNullOrEmpty(config.getLocalStoreBaseDir()),
                              String.format("%s is not set in config.", LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR));
     this.config = config;
     this.jobsClient = jobsClient;
+    this.localStoreMonitorMetrics = localStoreMonitorMetrics;
   }
 
   /**
@@ -81,8 +81,6 @@ public class LocalStoreMonitor implements Monitor {
     for (JobInstance jobInstance : getHostAffinityEnabledJobs(localStoreDir)) {
       File jobDir = new File(localStoreDir,
                              String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
-      Preconditions.checkState(jobDir.exists(), "JobDir is null");
-      Preconditions.checkNotNull(jobDir , "JobDir is null");
       JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
       for (Task task : jobsClient.getTasks(jobInstance)) {
         for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
@@ -146,7 +144,10 @@ public class LocalStoreMonitor implements Monitor {
     File offsetFile = new File(taskStoreDir, OFFSET_FILE_NAME);
     if (!offsetFile.exists()) {
       LOG.info("Deleting the task store : {}, since it has no offset file.", taskStorePath);
+      long taskStoreSizeInBytes = taskStoreDir.getTotalSpace();
       FileUtils.deleteDirectory(taskStoreDir);
+      localStoreMonitorMetrics.diskSpaceFreedInBytes.inc(taskStoreSizeInBytes);
+      localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
       LOG.info("Deleting the offset file from the store : {}, since the last modified timestamp : {} "
                    + "of the offset file is older than config file ttl : {}.",
index 7601dd8..5e50f7d 100644 (file)
@@ -33,14 +33,14 @@ public class LocalStoreMonitorConfig extends MapConfig {
   /**
    * Defines the local store directory of the job.
    */
-  static final String CONFIG_LOCAL_STORE_DIR = "jobs.local.store.dir";
+  static final String CONFIG_LOCAL_STORE_DIR = "job.local.store.dir";
 
   /**
    * Defines the ttl of the offset file in milliseconds.
    * This must not be larger than delete.retention.ms(slightly lower is better).
    * For instance, if the delete.retention.ms is 24 hrs, this should be set to 23.5 hrs.
    */
-  private static final String CONFIG_OFFSET_FILE_TTL = "jobs.offset.ttl.ms";
+  private static final String CONFIG_OFFSET_FILE_TTL = "job.offset.ttl.ms";
 
   /**
    * Defines the comma separated list of job status servers of the form
index bc09e65..5e81a79 100644 (file)
@@ -28,7 +28,8 @@ public class LocalStoreMonitorFactory implements MonitorFactory {
   @Override
   public Monitor getMonitorInstance(String monitorName, MonitorConfig config, MetricsRegistry metricsRegistry) throws Exception {
     LocalStoreMonitorConfig monitorConfig = new LocalStoreMonitorConfig(config);
+    LocalStoreMonitorMetrics localStoreMonitorMetrics = new LocalStoreMonitorMetrics(String.format("%s-", monitorName), metricsRegistry);
     JobsClient jobsClient = new JobsClient(monitorConfig.getJobStatusServers());
-    return new LocalStoreMonitor(monitorConfig, metricsRegistry, jobsClient);
+    return new LocalStoreMonitor(monitorConfig, localStoreMonitorMetrics, jobsClient);
   }
 }
diff --git a/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorMetrics.java b/samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitorMetrics.java
new file mode 100644 (file)
index 0000000..204f3dd
--- /dev/null
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.monitor;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+/**
+ * Contains all the metrics published by {@link LocalStoreMonitor}.
+ */
+public class LocalStoreMonitorMetrics extends MetricsBase {
+
+  /** Total number of task partition stores deleted by the LocalStoreMonitor. */
+  public final Counter noOfDeletedTaskPartitionStores;
+
+  /** Total disk space cleared by the LocalStoreMonitor. */
+  public final Counter diskSpaceFreedInBytes;
+
+  public LocalStoreMonitorMetrics(String prefix, MetricsRegistry registry) {
+    super(prefix, registry);
+    diskSpaceFreedInBytes = newCounter("diskSpaceFreedInBytes");
+    noOfDeletedTaskPartitionStores = newCounter("noOfDeletedTaskPartitionStores");
+  }
+}
index 0e9b866..bd666ad 100644 (file)
@@ -33,6 +33,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertTrue;
 
 public class TestLocalStoreMonitor {
@@ -50,10 +51,15 @@ public class TestLocalStoreMonitor {
   // Create mock for jobs client.
   private JobsClient jobsClientMock = Mockito.mock(JobsClient.class);
 
+  private LocalStoreMonitorMetrics localStoreMonitorMetrics;
+
+  private long taskStoreSize;
+
   @Before
   public void setUp() throws Exception {
     // Make scaffold directories for testing.
     FileUtils.forceMkdir(taskStoreDir);
+    taskStoreSize = taskStoreDir.getTotalSpace();
 
     // Set default return values for methods.
     Mockito.when(jobsClientMock.getJobStatus(Mockito.any()))
@@ -63,9 +69,11 @@ public class TestLocalStoreMonitor {
     Mockito.when(jobsClientMock.getTasks(Mockito.any()))
            .thenReturn(ImmutableList.of(task));
 
+    localStoreMonitorMetrics = new LocalStoreMonitorMetrics("TestMonitorName", new NoOpMetricsRegistry());
+
     // Initialize the local store monitor with mock and config
     localStoreMonitor = new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(config)),
-                                              new NoOpMetricsRegistry(),
+                                              localStoreMonitorMetrics,
                                               jobsClientMock);
   }
 
@@ -79,6 +87,8 @@ public class TestLocalStoreMonitor {
   public void shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile() throws Exception {
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
+    assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
+    assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
   }
 
   @Test
@@ -88,6 +98,7 @@ public class TestLocalStoreMonitor {
     offsetFile.setLastModified(0);
     localStoreMonitor.monitor();
     assertTrue("Offset file should not exist.", !offsetFile.exists());
+    assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
   }
 
   @Test
@@ -96,8 +107,11 @@ public class TestLocalStoreMonitor {
     FileUtils.forceMkdir(inActiveStoreDir);
     File inActiveTaskDir = new File(inActiveStoreDir, "test-task");
     FileUtils.forceMkdir(inActiveTaskDir);
+    long inActiveTaskDirSize = inActiveTaskDir.getTotalSpace();
     localStoreMonitor.monitor();
     assertTrue("Inactive task store directory should not exist.", !inActiveTaskDir.exists());
+    assertEquals(taskStoreSize + inActiveTaskDirSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
+    assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
   }
 
   @Test
@@ -105,6 +119,7 @@ public class TestLocalStoreMonitor {
     File offsetFile = createOffsetFile(taskStoreDir);
     localStoreMonitor.monitor();
     assertTrue("Offset file should exist.", offsetFile.exists());
+    assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
   }
 
   @Test
@@ -114,6 +129,7 @@ public class TestLocalStoreMonitor {
     File offsetFile = createOffsetFile(taskStoreDir);
     localStoreMonitor.monitor();
     assertTrue("Offset file should exist.", offsetFile.exists());
+    assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
   }
 
   @Test
@@ -124,6 +140,8 @@ public class TestLocalStoreMonitor {
            .thenReturn(ImmutableList.of(task));
     localStoreMonitor.monitor();
     assertTrue("Task store directory should not exist.", !taskStoreDir.exists());
+    assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
+    assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
   }
 
   private static File createOffsetFile(File taskStoreDir) throws Exception {