TEZ-3960. Better error handling in proto history logger and add doAs support. (harishjp)
authorHarish JP <harishjp@gmail.com>
Fri, 29 Jun 2018 17:01:15 +0000 (22:31 +0530)
committerHarish JP <harishjp@gmail.com>
Fri, 29 Jun 2018 17:01:38 +0000 (22:31 +0530)
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DagManifesFileScanner.java
tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/DatePartitionedLogger.java
tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageReader.java
tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoMessageWriter.java
tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestDagManifestFileScanner.java
tez-plugins/tez-protobuf-history-plugin/src/test/java/org/apache/tez/dag/history/logging/proto/TestProtoHistoryLoggingService.java

index 50b17b9..43014a4 100644 (file)
@@ -1474,6 +1474,17 @@ public class TezConfiguration extends Configuration {
   public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
 
   /**
+   * Long value. The amount of time in seconds to wait to ensure all events for a day is synced
+   * to disk. This should be maximum time variation b/w machines + maximum time to sync file
+   * content and metadata.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="boolean")
+  public static final String TEZ_HISTORY_LOGGING_PROTO_DOAS =
+      TEZ_PREFIX + "history.logging.proto-doas";
+  public static final boolean TEZ_HISTORY_LOGGING_PROTO_DOAS_DEFAULT = false;
+
+  /**
    * Int value. Time, in milliseconds, to wait while flushing YARN ATS data during shutdown.
    * Expert level setting.
    */
index c8ea02f..697083c 100644 (file)
@@ -19,58 +19,75 @@ package org.apache.tez.dag.history.logging.proto;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.history.logging.proto.HistoryLoggerProtos.ManifestEntryProto;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Helper class to scan all the dag manifest files to get manifest entries.
+ * Helper class to scan all the dag manifest files to get manifest entries. This class is
+ * not thread safe.
  */
 public class DagManifesFileScanner implements Closeable {
-  private static final int OFFSET_VERSION = 1;
+  private static final Logger LOG = LoggerFactory.getLogger(DagManifesFileScanner.class);
+  private static final int SCANNER_OFFSET_VERSION = 2;
+  private static final int MAX_RETRY = 3;
 
   private final ObjectMapper mapper = new ObjectMapper();
   private final DatePartitionedLogger<ManifestEntryProto> manifestLogger;
   private final long syncTime;
+  private final boolean withDoas;
 
   private String scanDir;
   private Map<String, Long> offsets;
-  private List<Path> newFiles;
+  private Map<String, Integer> retryCount;
+  private List<FileStatus> newFiles;
 
   private ProtoMessageReader<ManifestEntryProto> reader;
+  private String currentFilePath;
 
   public DagManifesFileScanner(DatePartitionedLogger<ManifestEntryProto> manifestLogger) {
     this.manifestLogger = manifestLogger;
     this.syncTime = manifestLogger.getConfig().getLong(
         TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS,
         TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT);
+    this.withDoas = manifestLogger.getConfig().getBoolean(
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS,
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_DOAS_DEFAULT);
     this.setOffset(LocalDate.ofEpochDay(0));
   }
 
+  // Update the offset version and checks below to ensure correct versions are supported.
   // All public to simplify json conversion.
   public static class DagManifestOffset {
     public int version;
     public String scanDir;
     public Map<String, Long> offsets;
+    public Map<String, Integer> retryCount;
   }
 
   public void setOffset(String offset) {
     try {
       DagManifestOffset dagOffset = mapper.readValue(offset, DagManifestOffset.class);
-      if (dagOffset.version != OFFSET_VERSION) {
+      if (dagOffset.version > SCANNER_OFFSET_VERSION) {
         throw new IllegalArgumentException("Version mismatch: " + dagOffset.version);
       }
       this.scanDir = dagOffset.scanDir;
-      this.offsets = dagOffset.offsets;
+      this.offsets = dagOffset.offsets == null ? new HashMap<>() : dagOffset.offsets;
+      this.retryCount = dagOffset.retryCount == null ? new HashMap<>() : dagOffset.retryCount;
       this.newFiles = new ArrayList<>();
     } catch (IOException e) {
       throw new IllegalArgumentException("Invalid offset", e);
@@ -80,15 +97,17 @@ public class DagManifesFileScanner implements Closeable {
   public void setOffset(LocalDate date) {
     this.scanDir = manifestLogger.getDirForDate(date);
     this.offsets = new HashMap<>();
+    this.retryCount = new HashMap<>();
     this.newFiles = new ArrayList<>();
   }
 
   public String getOffset() {
     try {
       DagManifestOffset offset = new DagManifestOffset();
-      offset.version = OFFSET_VERSION;
+      offset.version = SCANNER_OFFSET_VERSION;
       offset.scanDir = scanDir;
       offset.offsets = offsets;
+      offset.retryCount = retryCount;
       return mapper.writeValueAsString(offset);
     } catch (IOException e) {
       throw new RuntimeException("Unexpected exception while converting to json.", e);
@@ -98,17 +117,26 @@ public class DagManifesFileScanner implements Closeable {
   public ManifestEntryProto getNext() throws IOException {
     while (true) {
       if (reader != null) {
-        ManifestEntryProto evt = reader.readEvent();
+        ManifestEntryProto evt = null;
+        try {
+          evt = reader.readEvent();
+          retryCount.remove(currentFilePath);
+        } catch (IOException e) {
+          LOG.error("Error trying to read event from file: {}", currentFilePath, e);
+          incrementError(currentFilePath);
+        }
         if (evt != null) {
           offsets.put(reader.getFilePath().getName(), reader.getOffset());
           return evt;
         } else {
           IOUtils.closeQuietly(reader);
           reader = null;
+          currentFilePath = null;
         }
       }
       if (!newFiles.isEmpty()) {
-        this.reader = manifestLogger.getReader(newFiles.remove(0));
+        this.reader = getNextReader();
+        this.currentFilePath = reader != null ? reader.getFilePath().toString() : null;
       } else {
         if (!loadMore()) {
           return null;
@@ -117,6 +145,32 @@ public class DagManifesFileScanner implements Closeable {
     }
   }
 
+  private void incrementError(String path) {
+    int count = retryCount.getOrDefault(path, 0);
+    retryCount.put(path, count + 1);
+  }
+
+  private ProtoMessageReader<ManifestEntryProto> getNextReader() throws IOException {
+    FileStatus status = newFiles.remove(0);
+    PrivilegedAction<ProtoMessageReader<ManifestEntryProto>> action = () -> {
+      try {
+        return manifestLogger.getReader(status.getPath());
+      } catch (IOException e) {
+        String path = status.getPath().toString();
+        LOG.error("Error trying to open file: {}", path, e);
+        incrementError(path);
+        return null;
+      }
+    };
+    if (withDoas) {
+      UserGroupInformation proxyUser = UserGroupInformation.createProxyUser(
+          status.getOwner(), UserGroupInformation.getCurrentUser());
+      return proxyUser.doAs(action);
+    } else {
+      return action.run();
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if (reader != null) {
@@ -125,15 +179,35 @@ public class DagManifesFileScanner implements Closeable {
     }
   }
 
-  private boolean loadMore() throws IOException {
+  private void filterErrors(List<FileStatus> files) {
+    Iterator<FileStatus> iter = files.iterator();
+    while (iter.hasNext()) {
+      FileStatus status = iter.next();
+      String path = status.getPath().toString();
+      if (retryCount.getOrDefault(path, 0) > MAX_RETRY) {
+        LOG.warn("Removing file {}, too many errors", path);
+        iter.remove();
+      }
+    }
+  }
+
+  private void loadNewFiles(String todayDir) throws IOException {
     newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets);
+    if (!scanDir.equals(todayDir)) {
+      filterErrors(newFiles);
+    }
+  }
+
+  private boolean loadMore() throws IOException {
+    LocalDateTime now = manifestLogger.getNow();
+    LocalDate today = now.toLocalDate();
+    String todayDir = manifestLogger.getDirForDate(today);
+    loadNewFiles(todayDir);
     while (newFiles.isEmpty()) {
-      LocalDateTime utcNow = manifestLogger.getNow();
-      if (utcNow.getHour() * 3600 + utcNow.getMinute() * 60 + utcNow.getSecond() < syncTime) {
+      if (now.getHour() * 3600 + now.getMinute() * 60 + now.getSecond() < syncTime) {
         // We are in the delay window for today, do not advance date if we are moving from
         // yesterday.
-        String yesterDir = manifestLogger.getDirForDate(utcNow.toLocalDate().minusDays(1));
-        if (yesterDir.equals(scanDir)) {
+        if (scanDir.equals(manifestLogger.getDirForDate(today.minusDays(1)))) {
           return false;
         }
       }
@@ -143,7 +217,8 @@ public class DagManifesFileScanner implements Closeable {
       }
       scanDir = nextDir;
       offsets = new HashMap<>();
-      newFiles = manifestLogger.scanForChangedFiles(scanDir, offsets);
+      retryCount = new HashMap<>();
+      loadNewFiles(todayDir);
     }
     return true;
   }
index 8f89b2e..4ac64c6 100644 (file)
@@ -33,6 +33,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.yarn.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
@@ -43,27 +45,40 @@ import com.google.protobuf.Parser;
  * @param <T> The proto message type.
  */
 public class DatePartitionedLogger<T extends MessageLite> {
+  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
   // Everyone has permission to write, but with sticky set so that delete is restricted.
   // This is required, since the path is same for all users and everyone writes into it.
   private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);
 
+  // Since the directories have broad permissions restrict the file read access.
+  private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);
+
   private final Parser<T> parser;
   private final Path basePath;
   private final Configuration conf;
   private final Clock clock;
-  private final FileSystem fileSystem;
 
   public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
       throws IOException {
-    this.conf = conf;
+    this.conf = new Configuration(conf);
     this.clock = clock;
     this.parser = parser;
-    this.fileSystem = baseDir.getFileSystem(conf);
-    if (!fileSystem.exists(baseDir)) {
-      fileSystem.mkdirs(baseDir);
-      fileSystem.setPermission(baseDir, DIR_PERMISSION);
+    createDirIfNotExists(baseDir);
+    this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
+    FsPermission.setUMask(this.conf, FILE_UMASK);
+  }
+
+  private void createDirIfNotExists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    try {
+      if (!fileSystem.exists(path)) {
+        fileSystem.mkdirs(path);
+        fileSystem.setPermission(path, DIR_PERMISSION);
+      }
+    } catch (IOException e) {
+      // Ignore this exception, if there is a problem it'll fail when trying to read or write.
+      LOG.warn("Error while trying to set permission: ", e);
     }
-    this.basePath = fileSystem.resolvePath(baseDir);
   }
 
   /**
@@ -86,13 +101,14 @@ public class DatePartitionedLogger<T extends MessageLite> {
    */
   public Path getPathForDate(LocalDate date, String fileName) throws IOException {
     Path path = new Path(basePath, getDirForDate(date));
-    if (!fileSystem.exists(path)) {
-      fileSystem.mkdirs(path);
-      fileSystem.setPermission(path, DIR_PERMISSION);
-    }
+    createDirIfNotExists(path);
     return new Path(path, fileName);
   }
 
+  public Path getPathForSubdir(String dirName, String fileName) {
+    return new Path(new Path(basePath, dirName), fileName);
+  }
+
   /**
    * Extract the date from the directory name, this should be a directory created by this class.
    */
@@ -116,6 +132,7 @@ public class DatePartitionedLogger<T extends MessageLite> {
   public String getNextDirectory(String currentDir) throws IOException {
     // Fast check, if the next day directory exists return it.
     String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
+    FileSystem fileSystem = basePath.getFileSystem(conf);
     if (fileSystem.exists(new Path(basePath, nextDate))) {
       return nextDate;
     }
@@ -135,10 +152,11 @@ public class DatePartitionedLogger<T extends MessageLite> {
    * Returns new or changed files in the given directory. The offsets are used to find
    * changed files.
    */
-  public List<Path> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
+  public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
       throws IOException {
     Path dirPath = new Path(basePath, subDir);
-    List<Path> newFiles = new ArrayList<>();
+    FileSystem fileSystem = basePath.getFileSystem(conf);
+    List<FileStatus> newFiles = new ArrayList<>();
     if (!fileSystem.exists(dirPath)) {
       return newFiles;
     }
@@ -147,7 +165,7 @@ public class DatePartitionedLogger<T extends MessageLite> {
       Long offset = currentOffsets.get(fileName);
       // If the offset was never added or offset < fileSize.
       if (offset == null || offset < status.getLen()) {
-        newFiles.add(new Path(dirPath, fileName));
+        newFiles.add(status);
       }
     }
     return newFiles;
index 60cbda5..206b1c1 100644 (file)
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.history.logging.proto;
 
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,13 +50,15 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
   private boolean loggingDisabled = false;
 
   private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<DAGHistoryEvent>(10000);
+      new LinkedBlockingQueue<>(10000);
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
 
   private TezProtoLoggers loggers;
   private ProtoMessageWriter<HistoryEventProto> appEventsWriter;
   private ProtoMessageWriter<HistoryEventProto> dagEventsWriter;
+  private ProtoMessageWriter<ManifestEntryProto> manifestEventsWriter;
+  private LocalDate manifestDate;
   private TezDAGID currentDagId;
   private long dagSubmittedEventOffset = -1;
 
@@ -101,6 +104,7 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
     eventHandlingThread.join();
     IOUtils.closeQuietly(appEventsWriter);
     IOUtils.closeQuietly(dagEventsWriter);
+    IOUtils.closeQuietly(manifestEventsWriter);
     LOG.info("Stopped ProtoHistoryLoggingService");
   }
 
@@ -161,7 +165,8 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
       } else if (type == HistoryEventType.DAG_SUBMITTED) {
         finishCurrentDag(null);
         currentDagId = dagId;
-        dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString());
+        dagEventsWriter = loggers.getDagEventsLogger().getWriter(dagId.toString()
+            + "_" + appContext.getApplicationAttemptId().getAttemptId());
         dagSubmittedEventOffset = dagEventsWriter.getOffset();
         dagEventsWriter.writeProto(converter.convert(historyEvent));
       } else if (dagEventsWriter != null) {
@@ -174,16 +179,21 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
     if (dagEventsWriter == null) {
       return;
     }
-    ProtoMessageWriter<ManifestEntryProto> writer = null;
     try {
       long finishEventOffset = -1;
       if (event != null) {
         finishEventOffset = dagEventsWriter.getOffset();
         dagEventsWriter.writeProto(converter.convert(event));
       }
-      // Do not cache this writer, it should be created at the time of writing
-      writer = loggers.getManifestEventsLogger()
-          .getWriter(appContext.getApplicationAttemptId().toString());
+      DatePartitionedLogger<ManifestEntryProto> manifestLogger = loggers.getManifestEventsLogger();
+      if (manifestDate == null || !manifestDate.equals(manifestLogger.getNow().toLocalDate())) {
+        // The day has changed write to a new file.
+        IOUtils.closeQuietly(manifestEventsWriter);
+        manifestEventsWriter = manifestLogger.getWriter(
+            appContext.getApplicationAttemptId().toString());
+        manifestDate = manifestLogger.getDateFromDir(
+            manifestEventsWriter.getPath().getParent().getName());
+      }
       ManifestEntryProto.Builder entry = ManifestEntryProto.newBuilder()
           .setDagId(currentDagId.toString())
           .setAppId(currentDagId.getApplicationId().toString())
@@ -196,13 +206,13 @@ public class ProtoHistoryLoggingService extends HistoryLoggingService {
       if (event != null) {
         entry.setDagId(event.getDagID().toString());
       }
-      writer.writeProto(entry.build());
+      manifestEventsWriter.writeProto(entry.build());
+      manifestEventsWriter.hflush();
       appEventsWriter.hflush();
     } finally {
       // On an error, cleanup everything this will ensure, we do not use one dag's writer
       // into another dag.
       IOUtils.closeQuietly(dagEventsWriter);
-      IOUtils.closeQuietly(writer);
       dagEventsWriter = null;
       currentDagId = null;
       dagSubmittedEventOffset = -1;
index e5f5e6b..d736fea 100644 (file)
@@ -24,19 +24,22 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageReader<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Reader reader;
+  private final Reader reader;
   private final ProtoMessageWritable<T> writable;
 
   ProtoMessageReader(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
-    this.reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(filePath));
+    // The writer does not flush the length during hflush. Using length options lets us read
+    // past length in the FileStatus but it will throw EOFException during a read instead
+    // of returning null.
+    this.reader = new Reader(conf, Reader.file(filePath), Reader.length(Long.MAX_VALUE));
     this.writable = new ProtoMessageWritable<>(parser);
   }
 
index ca9ba61..869b603 100644 (file)
@@ -26,24 +26,24 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Writer;
 
 import com.google.protobuf.MessageLite;
 import com.google.protobuf.Parser;
 
 public class ProtoMessageWriter<T extends MessageLite> implements Closeable {
   private final Path filePath;
-  private final SequenceFile.Writer writer;
+  private final Writer writer;
   private final ProtoMessageWritable<T> writable;
 
   ProtoMessageWriter(Configuration conf, Path filePath, Parser<T> parser) throws IOException {
     this.filePath = filePath;
     this.writer = SequenceFile.createWriter(
         conf,
-        SequenceFile.Writer.file(filePath),
-        SequenceFile.Writer.keyClass(NullWritable.class),
-        SequenceFile.Writer.valueClass(ProtoMessageWritable.class),
-        SequenceFile.Writer.appendIfExists(true),
-        SequenceFile.Writer.compression(CompressionType.RECORD));
+        Writer.file(filePath),
+        Writer.keyClass(NullWritable.class),
+        Writer.valueClass(ProtoMessageWritable.class),
+        Writer.compression(CompressionType.RECORD));
     this.writable = new ProtoMessageWritable<>(parser);
   }
 
index fcaa315..4950522 100644 (file)
@@ -20,6 +20,9 @@ package org.apache.tez.dag.history.logging.proto;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -43,12 +46,14 @@ public class TestDagManifestFileScanner {
     clock = new MockClock();
     Configuration conf = new Configuration(false);
     conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR, basePath);
+    // LocalFileSystem does not implement truncate.
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
     TezProtoLoggers loggers = new TezProtoLoggers();
     loggers.setup(conf, clock);
     manifestLogger = loggers.getManifestEventsLogger();
   }
 
-  @Test
+  @Test(timeout=5000)
   public void testNormal() throws Exception {
     clock.setTime(0); // 0th day.
     createManifestEvents(0, 8);
@@ -85,6 +90,37 @@ public class TestDagManifestFileScanner {
     // Not able to test append since the LocalFileSystem does not implement append.
   }
 
+  private Path deleteFilePath = null;
+  @Test(timeout=5000)
+  public void testError() throws Exception {
+    clock.setTime(0); // 0th day.
+    createManifestEvents(0, 4);
+    corruptFiles();
+    clock.setTime((24 * 60 * 60 + 1) * 1000); // 1 day 1 sec.
+    createManifestEvents(24 * 3600, 1);
+
+    DagManifesFileScanner scanner = new DagManifesFileScanner(manifestLogger);
+    Assert.assertNotNull(scanner.getNext());
+    deleteFilePath.getFileSystem(manifestLogger.getConfig()).delete(deleteFilePath, false);
+    // 4 files - 1 file deleted - 1 truncated - 1 corrupted => 1 remains.
+    Assert.assertNull(scanner.getNext());
+
+    // Save offset for later use.
+    String offset = scanner.getOffset();
+
+    // Move time outside the window, it should skip files with error and give more data for
+    // next day.
+    clock.setTime((24 * 60 * 60 + 61) * 1000); // 1 day 61 sec.
+    Assert.assertNotNull(scanner.getNext());
+    Assert.assertNull(scanner.getNext());
+
+    // Reset the offset
+    scanner.setOffset(offset);
+    Assert.assertNotNull(scanner.getNext());
+    Assert.assertNull(scanner.getNext());
+    scanner.close();
+  }
+
   private void createManifestEvents(long time, int numEvents) throws IOException {
     for (int i = 0; i < numEvents; ++i) {
       ApplicationId appId = ApplicationId.newInstance(1000l, i);
@@ -103,6 +139,33 @@ public class TestDagManifestFileScanner {
     }
   }
 
+  private void corruptFiles() throws IOException {
+    int op = 0;
+    Configuration conf = manifestLogger.getConfig();
+    Path base = new Path(
+        conf.get(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_BASE_DIR) + "/dag_meta");
+    FileSystem fs = base.getFileSystem(conf);
+    for (FileStatus status : fs.listStatus(base)) {
+      if (status.isDirectory()) {
+        for (FileStatus file : fs.listStatus(status.getPath())) {
+          if (!file.getPath().getName().startsWith("application_")) {
+            continue;
+          }
+          switch (op) {
+            case 0:
+            case 1:
+              fs.truncate(file.getPath(), op == 1 ? 0 : file.getLen() - 20);
+              break;
+            case 3:
+              deleteFilePath = file.getPath();
+              break;
+          }
+          op++;
+        }
+      }
+    }
+  }
+
   private static class MockClock implements Clock {
     private long time = 0;
 
index 4bd5d4e..bc79b07 100644 (file)
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.logging.proto;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.time.LocalDate;
 import java.util.ArrayList;
@@ -86,14 +87,18 @@ public class TestProtoHistoryLoggingService {
 
     // Verify dag events are logged.
     DatePartitionedLogger<HistoryEventProto> dagLogger = loggers.getDagEventsLogger();
-    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString());
+    Path dagFilePath = dagLogger.getPathForDate(LocalDate.ofEpochDay(0), dagId.toString() + "_" + 1);
     ProtoMessageReader<HistoryEventProto> reader = dagLogger.getReader(dagFilePath);
     HistoryEventProto evt = reader.readEvent();
     int ind = 1;
     while (evt != null) {
       Assert.assertEquals(protos.get(ind), evt);
       ind++;
-      evt = reader.readEvent();
+      try {
+        evt = reader.readEvent();
+      } catch (EOFException e) {
+        evt = null;
+      }
     }
     reader.close();