SAMZA-1335: Improve logging for LocalStoreMonitor
authorJacob Maes <jmaes@linkedin.com>
Thu, 15 Jun 2017 20:40:43 +0000 (13:40 -0700)
committerJacob Maes <jmaes@linkedin.com>
Thu, 15 Jun 2017 20:40:43 +0000 (13:40 -0700)
Author: Jacob Maes <jmaes@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshw@linkedin.com>

Closes #226 from jmakes/samza-1335

samza-rest/src/main/java/org/apache/samza/monitor/LocalStoreMonitor.java
samza-rest/src/main/java/org/apache/samza/rest/model/Task.java

index 8195491..8b25636 100644 (file)
@@ -83,9 +83,10 @@ public class LocalStoreMonitor implements Monitor {
                              String.format("%s-%s", jobInstance.getJobName(), jobInstance.getJobId()));
       try {
         JobStatus jobStatus = jobsClient.getJobStatus(jobInstance);
+        LOG.info("Job: {} has the status: {}.", jobInstance, jobStatus);
         for (Task task : jobsClient.getTasks(jobInstance)) {
+          LOG.info("Evaluating stores for task: {}", task);
           for (String storeName : jobDir.list(DirectoryFileFilter.DIRECTORY)) {
-            LOG.info("Job: {} has the running status: {} with preferred host: {}.", jobInstance, jobStatus, task.getPreferredHost());
             /**
              *  A task store is active if all of the following conditions are true:
              *  a) If the store is amongst the active stores of the task.
@@ -95,9 +96,9 @@ public class LocalStoreMonitor implements Monitor {
             if (jobStatus.hasBeenStarted()
                 && task.getStoreNames().contains(storeName)
                 && task.getPreferredHost().equals(localHostName)) {
-              LOG.info(String.format("Store %s is actively used by the task: %s.", storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s is actively used by the task: %s.", storeName, task.getTaskName()));
             } else {
-              LOG.info(String.format("Store %s not used by the task: %s.", storeName, task.getTaskName()));
+              LOG.info(String.format("Local store: %s not used by the task: %s.", storeName, task.getTaskName()));
               markSweepTaskStore(TaskStorageManager.getStorePartitionDir(jobDir, storeName, new TaskName(task.getTaskName())));
             }
           }
@@ -135,7 +136,7 @@ public class LocalStoreMonitor implements Monitor {
    * Role of this method is to garbage collect(mark-sweep) the task store.
    * @param taskStoreDir store directory of the task to perform garbage collection.
    *
-   *  This method cleans up each of the task store directory in two phases.
+   * This method cleans up each of the task store directory in two phases.
    *
    *  Phase 1:
    *  Delete the offset file in the task store if (curTime - lastModifiedTimeOfOffsetFile) > offsetTTL.
@@ -143,8 +144,13 @@ public class LocalStoreMonitor implements Monitor {
    *  Phase 2:
    *  Delete the task store directory if the offsetFile does not exist in task store directory.
    *
-   *  Time interval between the two phases is controlled by this monitor scheduling
-   *  interval in milli seconds.
+   * The separate phases are a safety precaution to prevent deleting a store that is currently being used.
+   * A running task will recreate the deleted offset file on the next commit. If a task is not running or
+   * running on a different host and gets moved to this host, it will not use a store without the offset file.
+   *
+   * Time interval between the two phases is controlled by this monitor scheduling
+   * interval in milli seconds.
+   *
    * @throws IOException if there is an exception during the clean up of the task store files.
    */
   private void markSweepTaskStore(File taskStoreDir) throws IOException {
@@ -158,7 +164,7 @@ public class LocalStoreMonitor implements Monitor {
       localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.inc();
     } else if ((CLOCK.currentTimeMillis() - offsetFile.lastModified()) >= config.getOffsetFileTTL()) {
       LOG.info("Deleting the offset file from the store: {}, since the last modified timestamp: {} "
-                   + "of the offset file is older than config file ttl: {}.",
+                   + "is older than the configured ttl: {}.",
                   taskStorePath, offsetFile.lastModified(), config.getOffsetFileTTL());
       offsetFile.delete();
     }
index 94e8370..bb3c46c 100644 (file)
@@ -134,4 +134,9 @@ public class Task {
     result = 31 * result + storeNames.hashCode();
     return result;
   }
+
+  @Override
+  public String toString() {
+    return String.format("taskName:%s container:%s preferredHost:%s stores:%s", taskName, containerId, preferredHost, storeNames.toString());
+  }
 }