TEZ-2076. Tez framework to extract/analyze data stored in ATS for specific dag (rbala...
authorRajesh Balamohan <rbalamohan@apache.org>
Tue, 19 May 2015 00:35:18 +0000 (06:05 +0530)
committerRajesh Balamohan <rbalamohan@apache.org>
Tue, 19 May 2015 00:35:18 +0000 (06:05 +0530)
20 files changed:
tez-plugins/pom.xml
tez-plugins/tez-history-parser/findbugs-exclude.xml [new file with mode: 0644]
tez-plugins/tez-history-parser/pom.xml [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java [new file with mode: 0644]
tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java [new file with mode: 0644]

index 9b2a4cb..535007a 100644 (file)
   <artifactId>tez-plugins</artifactId>
   <packaging>pom</packaging>
 
+  <modules>
+    <module>tez-history-parser</module>
+  </modules>
+
   <profiles>
     <profile>
       <id>hadoop24</id>
diff --git a/tez-plugins/tez-history-parser/findbugs-exclude.xml b/tez-plugins/tez-history-parser/findbugs-exclude.xml
new file mode 100644 (file)
index 0000000..5b11308
--- /dev/null
@@ -0,0 +1,16 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+<FindBugsFilter>
+
+</FindBugsFilter>
diff --git a/tez-plugins/tez-history-parser/pom.xml b/tez-plugins/tez-history-parser/pom.xml
new file mode 100644 (file)
index 0000000..f12e0b4
--- /dev/null
@@ -0,0 +1,190 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.8.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-history-parser</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-yarn-timeline-history</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <configuration>
+          <archive>
+            <manifest>
+              <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+          </descriptorRefs>
+          <archive>
+            <manifest>
+              <addClasspath>true</addClasspath>
+              <mainClass>org.apache.tez.history.ATSImportTool</mainClass>
+            </manifest>
+          </archive>
+        </configuration>
+        <executions>
+          <execution>
+            <id>assemble-all</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/ATSImportTool.java
new file mode 100644 (file)
index 0000000..3545cec
--- /dev/null
@@ -0,0 +1,474 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.json.impl.provider.entity.JSONRootElementProvider;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.MissingOptionException;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
+import org.apache.directory.api.util.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * <pre>
+ * Simple tool which imports ATS data pertaining to a DAG (Dag, Vertex, Task, Attempt)
+ * and creates a zip file out of it.
+ *
+ * usage:
+ *
+ * java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool
+ *
+ * OR
+ *
+ * HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar tez-history-parser-x.y.z.jar org.apache.tez.history.ATSImportTool
+ *
+ *
+ * --yarnTimelineAddress <yarnTimelineAddress>  Optional. Yarn Timeline Address(e.g http://clusterATSNode:8188)
+ * --batchSize <batchSize>       Optional. batch size for downloading data
+ * --dagId <dagId>               DagId that needs to be downloaded
+ * --downloadDir <downloadDir>   download directory where data needs to be downloaded
+ * --help                        print help
+ *
+ * </pre>
+ */
+@Evolving
+public class ATSImportTool extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ATSImportTool.class);
+
+  private static final String BATCH_SIZE = "batchSize";
+  private static final int BATCH_SIZE_DEFAULT = 100;
+
+  private static final String YARN_TIMELINE_SERVICE_ADDRESS = "yarnTimelineAddress";
+  private static final String DAG_ID = "dagId";
+  private static final String BASE_DOWNLOAD_DIR = "downloadDir";
+
+  private static final String HTTPS_SCHEME = "https://";
+  private static final String HTTP_SCHEME = "http://";
+
+  private static final String VERTEX_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String TASK_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String TASK_ATTEMPT_QUERY_STRING = "%s/%s?limit=%s&primaryFilter=%s:%s";
+  private static final String UTF8 = "UTF-8";
+
+  private static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+  private final int batchSize;
+  private final String baseUri;
+  private final String dagId;
+
+  private final File downloadDir;
+  private final File zipFile;
+  private final Client httpClient;
+
+  public ATSImportTool(String baseUri, String dagId, File baseDownloadDir, int batchSize)
+      throws TezException {
+    Preconditions.checkArgument(!Strings.isEmpty(dagId), "dagId can not be null or empty");
+    Preconditions.checkArgument(baseDownloadDir != null, "downloadDir can not be null");
+    Preconditions.checkArgument(TezDAGID.fromString(dagId) != null, "Not a valid DAG ID " + dagId);
+
+    this.baseUri = baseUri;
+    this.batchSize = batchSize;
+    this.dagId = dagId;
+
+    this.httpClient = getHttpClient();
+
+    this.downloadDir = new File(baseDownloadDir, dagId);
+    this.zipFile = new File(downloadDir, this.dagId + ".zip");
+
+    boolean result = downloadDir.mkdirs();
+    LOG.trace("Result of creating dir {}={}", downloadDir, result);
+    if (!downloadDir.exists()) {
+      throw new IllegalArgumentException("dir=" + downloadDir + " does not exist");
+    }
+
+    LOG.info("Using baseURL={}, dagId={}, batchSize={}, downloadDir={}", baseUri, dagId,
+        batchSize, downloadDir);
+  }
+
+  /**
+   * Download data from ATS for specific DAG
+   *
+   * @throws Exception
+   */
+  private void download() throws Exception {
+    FileOutputStream fos = null;
+    try {
+      fos = new FileOutputStream(zipFile, false);
+      ZipOutputStream zos = new ZipOutputStream(fos);
+      downloadData(zos);
+      IOUtils.closeQuietly(zos);
+    } catch (Exception e) {
+      LOG.error("Exception in download", e);
+      throw e;
+    } finally {
+      if (httpClient != null) {
+        httpClient.destroy();
+      }
+      IOUtils.closeQuietly(fos);
+    }
+  }
+
+  /**
+   * Download DAG data (DAG, Vertex, Task, TaskAttempts) from ATS and write to zip file
+   *
+   * @param zos
+   * @throws TezException
+   * @throws JSONException
+   * @throws IOException
+   */
+  private void downloadData(ZipOutputStream zos) throws TezException, JSONException, IOException {
+    JSONObject finalJson = new JSONObject();
+
+    //Download application details (TEZ_VERSION etc)
+    String tezAppId = "tez_" + (TezDAGID.fromString(dagId)).getApplicationId().toString();
+    String tezAppUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_APPLICATION, tezAppId);
+    JSONObject tezAppJson = getJsonRootEntity(tezAppUrl);
+    finalJson.put(Constants.APPLICATION, tezAppJson);
+
+    //Download dag
+    String dagUrl = String.format("%s/%s/%s", baseUri, Constants.TEZ_DAG_ID, dagId);
+    JSONObject dagRoot = getJsonRootEntity(dagUrl);
+    finalJson.put(Constants.DAG, dagRoot);
+
+    //Create a zip entry with dagId as its name.
+    ZipEntry zipEntry = new ZipEntry(dagId);
+    zos.putNextEntry(zipEntry);
+    //Write in formatted way
+    IOUtils.write(finalJson.toString(4), zos, UTF8);
+
+    //Download vertex
+    String vertexURL =
+        String.format(VERTEX_QUERY_STRING, baseUri,
+            Constants.TEZ_VERTEX_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(vertexURL, zos, Constants.VERTICES);
+
+    //Download task
+    String taskURL = String.format(TASK_QUERY_STRING, baseUri,
+        Constants.TEZ_TASK_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(taskURL, zos, Constants.TASKS);
+
+    //Download task attempts
+    String taskAttemptURL = String.format(TASK_ATTEMPT_QUERY_STRING, baseUri,
+        Constants.TEZ_TASK_ATTEMPT_ID, batchSize, Constants.TEZ_DAG_ID, dagId);
+    downloadJSONArrayFromATS(taskAttemptURL, zos, Constants.TASK_ATTEMPTS);
+  }
+
+  /**
+   * Download data from ATS in batches
+   *
+   * @param url
+   * @param zos
+   * @param tag
+   * @throws IOException
+   * @throws TezException
+   * @throws JSONException
+   */
+  private void downloadJSONArrayFromATS(String url, ZipOutputStream zos, String tag)
+      throws IOException, TezException, JSONException {
+
+    Preconditions.checkArgument(zos != null, "ZipOutputStream can not be null");
+
+    String baseUrl = url;
+    JSONArray entities;
+
+    long downloadedCount = 0;
+    while ((entities = getJsonRootEntity(url).optJSONArray(Constants.ENTITIES)) != null
+        && entities.length() > 0) {
+
+      int limit = (entities.length() >= batchSize) ? (entities.length() - 1) : entities.length();
+      LOG.debug("Limit={}, downloaded entities len={}", limit, entities.length());
+
+      //write downloaded part to zipfile.  This is done to avoid any memory pressure when
+      // downloading and writing 1000s of tasks.
+      ZipEntry zipEntry = new ZipEntry("part-" + System.currentTimeMillis() + ".json");
+      zos.putNextEntry(zipEntry);
+      JSONObject finalJson = new JSONObject();
+      finalJson.put(tag, entities);
+      IOUtils.write(finalJson.toString(4), zos, "UTF-8");
+      downloadedCount += entities.length();
+
+      if (entities.length() < batchSize) {
+        break;
+      }
+
+      //Set the last item in entities as the fromId
+      url = baseUrl + "&fromId="
+          + entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+
+      String firstItem = entities.getJSONObject(0).getString(Constants.ENTITY);
+      String lastItem = entities.getJSONObject(entities.length() - 1).getString(Constants.ENTITY);
+      LOG.info("Downloaded={}, First item={}, LastItem={}, new url={}", downloadedCount,
+          firstItem, lastItem, url);
+    }
+  }
+
+  private String logErrorMessage(ClientResponse response) throws IOException {
+    StringBuilder sb = new StringBuilder();
+    LOG.error("Response status={}", response.getClientResponseStatus().toString());
+    LineIterator it = null;
+    try {
+      it = IOUtils.lineIterator(response.getEntityInputStream(), UTF8);
+      while (it.hasNext()) {
+        String line = it.nextLine();
+        LOG.error(line);
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+    return sb.toString();
+  }
+
+  //For secure cluster, this should work as long as valid ticket is available in the node.
+  private JSONObject getJsonRootEntity(String url) throws TezException, IOException {
+    try {
+      WebResource wr = getHttpClient().resource(url);
+      ClientResponse response = wr.accept(MediaType.APPLICATION_JSON_TYPE)
+          .type(MediaType.APPLICATION_JSON_TYPE)
+          .get(ClientResponse.class);
+
+      if (response.getClientResponseStatus() != ClientResponse.Status.OK) {
+        // In the case of secure cluster, if there is any auth exception it sends the data back as
+        // a html page and JSON parsing could throw exceptions. Instead, get the stream contents
+        // completely and log it in case of error.
+        logErrorMessage(response);
+        throw new TezException("Failed to get response from YARN Timeline: url: " + url);
+      }
+      return response.getEntity(JSONObject.class);
+    } catch (ClientHandlerException e) {
+      throw new TezException("Error processing response from YARN Timeline. URL=" + url, e);
+    } catch (UniformInterfaceException e) {
+      throw new TezException("Error accessing content from YARN Timeline - unexpected response. "
+          + "URL=" + url, e);
+    } catch (IllegalArgumentException e) {
+      throw new TezException("Error accessing content from YARN Timeline - invalid url. URL=" + url,
+          e);
+    }
+  }
+
+  private Client getHttpClient() {
+    if (httpClient == null) {
+      ClientConfig config = new DefaultClientConfig(JSONRootElementProvider.App.class);
+      HttpURLConnectionFactory urlFactory = new PseudoAuthenticatedURLConnectionFactory();
+      return new Client(new URLConnectionClientHandler(urlFactory), config);
+    }
+    return httpClient;
+  }
+
+  static class PseudoAuthenticatedURLConnectionFactory implements HttpURLConnectionFactory {
+    @Override
+    public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+      String tokenString = (url.getQuery() == null ? "?" : "&") + "user.name=" +
+          URLEncoder.encode(UserGroupInformation.getCurrentUser().getShortUserName(), "UTF8");
+      return (HttpURLConnection) (new URL(url.toString() + tokenString)).openConnection();
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    try {
+      download();
+      return 0;
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOG.error("Error occurred when downloading data ", e);
+      return -1;
+    }
+  }
+
+  private static Options buildOptions() {
+    Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID)
+        .withDescription("DagId that needs to be downloaded").hasArg().isRequired(true).create();
+
+    Option downloadDirOption = OptionBuilder.withArgName(BASE_DOWNLOAD_DIR).withLongOpt
+        (BASE_DOWNLOAD_DIR)
+        .withDescription("Download directory where data needs to be downloaded").hasArg()
+        .isRequired(true).create();
+
+    Option atsAddressOption = OptionBuilder.withArgName(YARN_TIMELINE_SERVICE_ADDRESS).withLongOpt(
+        YARN_TIMELINE_SERVICE_ADDRESS)
+        .withDescription("Optional. ATS address (e.g http://clusterATSNode:8188)").hasArg()
+        .isRequired(false)
+        .create();
+
+    Option batchSizeOption = OptionBuilder.withArgName(BATCH_SIZE).withLongOpt(BATCH_SIZE)
+        .withDescription("Optional. batch size for downloading data").hasArg()
+        .isRequired(false)
+        .create();
+
+    Option help = OptionBuilder.withArgName("help").withLongOpt("help")
+        .withDescription("print help").isRequired(false).create();
+
+    Options opts = new Options();
+    opts.addOption(dagIdOption);
+    opts.addOption(downloadDirOption);
+    opts.addOption(atsAddressOption);
+    opts.addOption(batchSizeOption);
+    opts.addOption(help);
+    return opts;
+  }
+
+  static void printHelp(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.setWidth(240);
+    String help = LINE_SEPARATOR
+        + "java -cp tez-history-parser-x.y.z-jar-with-dependencies.jar org.apache.tez.history.ATSImportTool"
+        + LINE_SEPARATOR
+        + "OR"
+        + LINE_SEPARATOR
+        + "HADOOP_CLASSPATH=$TEZ_HOME/*:$TEZ_HOME/lib/*:$HADOOP_CLASSPATH hadoop jar "
+        + "tez-history-parser-x.y.z.jar " + ATSImportTool.class.getName()
+        + LINE_SEPARATOR;
+    formatter.printHelp(240, help, "Options", options, "", true);
+  }
+
+  static boolean hasHttpsPolicy(Configuration conf) {
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    return (HttpConfig.Policy.HTTPS_ONLY == HttpConfig.Policy.fromString(yarnConf
+        .get(YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT)));
+  }
+
+  static String getBaseTimelineURL(String yarnTimelineAddress, Configuration conf)
+      throws TezException {
+    boolean isHttps = hasHttpsPolicy(conf);
+
+    if (yarnTimelineAddress == null) {
+      if (isHttps) {
+        yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS_CONF_NAME);
+      } else {
+        yarnTimelineAddress = conf.get(Constants.TIMELINE_SERVICE_WEBAPP_HTTP_ADDRESS_CONF_NAME);
+      }
+      Preconditions.checkArgument(!Strings.isEmpty(yarnTimelineAddress), "Yarn timeline address can"
+          + " not be empty. Please check configurations.");
+    } else {
+      yarnTimelineAddress = yarnTimelineAddress.trim();
+      Preconditions.checkArgument(!Strings.isEmpty(yarnTimelineAddress), "Yarn timeline address can"
+          + " not be empty. Please provide valid url with --" +
+          YARN_TIMELINE_SERVICE_ADDRESS + " option");
+    }
+
+    yarnTimelineAddress = yarnTimelineAddress.toLowerCase();
+    if (!yarnTimelineAddress.startsWith(HTTP_SCHEME)
+        && !yarnTimelineAddress.startsWith(HTTPS_SCHEME)) {
+      yarnTimelineAddress = ((isHttps) ? HTTPS_SCHEME : HTTP_SCHEME) + yarnTimelineAddress;
+    }
+
+    try {
+      yarnTimelineAddress = new URI(yarnTimelineAddress).normalize().toString().trim();
+      yarnTimelineAddress = (yarnTimelineAddress.endsWith("/")) ?
+          yarnTimelineAddress.substring(0, yarnTimelineAddress.length() - 1) :
+          yarnTimelineAddress;
+    } catch (URISyntaxException e) {
+      throw new TezException("Please provide a valid URL. url=" + yarnTimelineAddress, e);
+    }
+
+    return Joiner.on("").join(yarnTimelineAddress, Constants.RESOURCE_URI_BASE);
+  }
+
+  @VisibleForTesting
+  static int process(String[] args) {
+    Options options = buildOptions();
+    int result = -1;
+    try {
+      Configuration conf = new Configuration();
+      CommandLine cmdLine = new GnuParser().parse(options, args);
+      String dagId = cmdLine.getOptionValue(DAG_ID);
+
+      File downloadDir = new File(cmdLine.getOptionValue(BASE_DOWNLOAD_DIR));
+
+      String yarnTimelineAddress = cmdLine.getOptionValue(YARN_TIMELINE_SERVICE_ADDRESS);
+      String baseTimelineURL = getBaseTimelineURL(yarnTimelineAddress, conf);
+
+      int batchSize = (cmdLine.hasOption(BATCH_SIZE)) ?
+          (Integer.parseInt(cmdLine.getOptionValue(BATCH_SIZE))) : BATCH_SIZE_DEFAULT;
+
+      result = ToolRunner.run(conf, new ATSImportTool(baseTimelineURL, dagId,
+          downloadDir, batchSize), args);
+
+      return result;
+    } catch (MissingOptionException missingOptionException) {
+      LOG.error("Error in parsing options ", missingOptionException);
+      printHelp(options);
+    } catch (ParseException e) {
+      LOG.error("Error in parsing options ", e);
+      printHelp(options);
+    } catch (Exception e) {
+      LOG.error("Error in processing ", e);
+      throw e;
+    } finally {
+      return result;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    Utils.setupRootLogger();
+    int res = process(args);
+    System.exit(res);
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSData.java
new file mode 100644 (file)
index 0000000..f504007
--- /dev/null
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.codehaus.jettison.json.JSONException;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Main interface to pull data from ATS.
+ * <p/>
+ * It is possible that to have ATS data store in any DB (e.g LevelDB or HBase).  Depending on
+ * store, there can be multiple implementations to pull data from these stores and create the
+ * DagInfo object for analysis.
+ * <p/>
+ */
+@Evolving
+public interface ATSData {
+
+  /**
+   * Get the DAG representation for processing
+   *
+   * @param dagId
+   * @return DagInfo
+   * @throws JSONException
+   * @throws TezException
+   */
+  public DagInfo getDAGData(String dagId) throws TezException;
+
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ATSFileParser.java
new file mode 100644 (file)
index 0000000..aae20eb
--- /dev/null
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.history.parser.datamodel.BaseParser;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Simple class to parse ATS zip file of a DAG and generate the relevant in-memory structure
+ * (DagInfo) necessary for processing later.
+ */
+
+@Public
+@Evolving
+public class ATSFileParser extends BaseParser implements ATSData {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ATSFileParser.class);
+
+  private final File atsZipFile;
+
+  public ATSFileParser(File atsZipFile) throws TezException {
+    super();
+    Preconditions.checkArgument(atsZipFile.exists(), "Zipfile " + atsZipFile + " does not exist");
+    this.atsZipFile = atsZipFile;
+  }
+
+  @Override
+  public DagInfo getDAGData(String dagId) throws TezException {
+    try {
+      parseATSZipFile(atsZipFile);
+
+      linkParsedContents();
+
+      return dagInfo;
+    } catch (IOException e) {
+      LOG.error("Error in reading DAG ", e);
+      throw new TezException(e);
+    } catch (JSONException e) {
+      LOG.error("Error in parsing DAG ", e);
+      throw new TezException(e);
+    } catch (InterruptedException e) {
+      throw new TezException(e);
+    }
+  }
+
+  /**
+   * Parse vertices json
+   *
+   * @param verticesJson
+   * @throws JSONException
+   */
+  private void processVertices(JSONArray verticesJson) throws JSONException {
+    //Process vertex information
+    Preconditions.checkState(verticesJson != null, "Vertex json can not be null");
+    if (verticesJson != null) {
+      LOG.info("Started parsing vertex");
+      for (int i = 0; i < verticesJson.length(); i++) {
+        VertexInfo vertexInfo = VertexInfo.create(verticesJson.getJSONObject(i));
+        vertexList.add(vertexInfo);
+      }
+      LOG.info("Finished parsing vertex");
+    }
+  }
+
+  /**
+   * Parse Tasks json
+   *
+   * @param tasksJson
+   * @throws JSONException
+   */
+  private void processTasks(JSONArray tasksJson) throws JSONException {
+    //Process Task information
+    Preconditions.checkState(tasksJson != null, "Task json can not be null");
+    if (tasksJson != null) {
+      LOG.debug("Started parsing task");
+      for (int i = 0; i < tasksJson.length(); i++) {
+        TaskInfo taskInfo = TaskInfo.create(tasksJson.getJSONObject(i));
+        taskList.add(taskInfo);
+      }
+      LOG.debug("Finished parsing task");
+    }
+  }
+
+  /**
+   * Parse TaskAttempt json
+   *
+   * @param taskAttemptsJson
+   * @throws JSONException
+   */
+  private void processAttempts(JSONArray taskAttemptsJson) throws JSONException {
+    //Process TaskAttempt information
+    Preconditions.checkState(taskAttemptsJson != null, "Attempts json can not be null");
+    if (taskAttemptsJson != null) {
+      LOG.debug("Started parsing task attempts");
+      for (int i = 0; i < taskAttemptsJson.length(); i++) {
+        TaskAttemptInfo attemptInfo = TaskAttemptInfo.create(taskAttemptsJson.getJSONObject(i));
+        attemptList.add(attemptInfo);
+      }
+      LOG.debug("Finished parsing task attempts");
+    }
+  }
+
+  /**
+   * Parse TezApplication json
+   *
+   * @param tezApplicationJson
+   * @throws JSONException
+   */
+  private void processApplication(JSONObject tezApplicationJson) throws JSONException {
+    if (tezApplicationJson != null) {
+      LOG.debug("Started parsing tez application");
+      JSONObject otherInfoNode = tezApplicationJson.optJSONObject(Constants.OTHER_INFO);
+      if (otherInfoNode != null) {
+        JSONObject tezVersion = otherInfoNode.optJSONObject(Constants.TEZ_VERSION);
+        if (tezVersion != null) {
+          String version = tezVersion.optString(Constants.VERSION);
+          String buildTime = tezVersion.optString(Constants.BUILD_TIME);
+          String revision = tezVersion.optString(Constants.REVISION);
+          this.versionInfo = new VersionInfo(version, buildTime, revision);
+        }
+        //TODO: might need to parse config info? (e.g, hive settings etc. could consume memory)
+      }
+      LOG.debug("Finished parsing tez application");
+    }
+  }
+
+  private JSONObject readJson(InputStream in) throws IOException, JSONException {
+    //Read entire content to memory
+    final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    IOUtils.copy(in, bout);
+    return new JSONObject(new String(bout.toByteArray(), "UTF-8"));
+  }
+
+  /**
+   * Read zip file contents. Every file can contain "dag", "vertices", "tasks", "task_attempts"
+   *
+   * @param atsFile
+   * @throws IOException
+   * @throws JSONException
+   */
+  private void parseATSZipFile(File atsFile)
+      throws IOException, JSONException, TezException, InterruptedException {
+    final ZipFile atsZipFile = new ZipFile(atsFile);
+    try {
+      Enumeration<? extends ZipEntry> zipEntries = atsZipFile.entries();
+      while (zipEntries.hasMoreElements()) {
+        ZipEntry zipEntry = zipEntries.nextElement();
+        LOG.info("Processing " + zipEntry.getName());
+        InputStream inputStream = atsZipFile.getInputStream(zipEntry);
+        JSONObject jsonObject = readJson(inputStream);
+
+        //This json can contain dag, vertices, tasks, task_attempts
+        JSONObject dagJson = jsonObject.optJSONObject(Constants.DAG);
+        if (dagJson != null) {
+          //TODO: support for multiple dags per ATS file later.
+          dagInfo = DagInfo.create(dagJson);
+        }
+
+        //Process vertex
+        JSONArray vertexJson = jsonObject.optJSONArray(Constants.VERTICES);
+        if (vertexJson != null) {
+          processVertices(vertexJson);
+        }
+
+        //Process task
+        JSONArray taskJson = jsonObject.optJSONArray(Constants.TASKS);
+        if (taskJson != null) {
+          processTasks(taskJson);
+        }
+
+        //Process task attempts
+        JSONArray attemptsJson = jsonObject.optJSONArray(Constants.TASK_ATTEMPTS);
+        if (attemptsJson != null) {
+          processAttempts(attemptsJson);
+        }
+
+        //Process application (mainly versionInfo)
+        JSONObject tezAppJson = jsonObject.optJSONObject(Constants.APPLICATION);
+        if (tezAppJson != null) {
+          processApplication(tezAppJson);
+        }
+      }
+    } finally {
+      atsZipFile.close();
+    }
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/AdditionalInputOutputDetails.java
new file mode 100644 (file)
index 0000000..b853d5c
--- /dev/null
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+/**
+ * Additional input/ouput information present in DAG.
+ */
+
+@Public
+@Evolving
+public class AdditionalInputOutputDetails {
+  private final String name;
+  private final String clazz;
+  private final String initializer;
+  private final String userPayloadText;
+
+  public AdditionalInputOutputDetails(String name, String clazz, String initializer,
+      String userPayloadText) {
+    this.name = name;
+    this.clazz = clazz;
+    this.initializer = initializer;
+    this.userPayloadText = userPayloadText;
+  }
+
+  public final String getName() {
+    return name;
+  }
+
+  public final String getClazz() {
+    return clazz;
+  }
+
+  public final String getInitializer() {
+    return initializer;
+  }
+
+  public final String getUserPayloadText() {
+    return userPayloadText;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("name=").append(name).append(", ");
+    sb.append("clazz=").append(clazz).append(", ");
+    sb.append("initializer=").append(initializer).append(", ");
+    sb.append("userPayloadText=").append(userPayloadText);
+    sb.append("]");
+    return sb.toString();
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java
new file mode 100644 (file)
index 0000000..8bd6bfb
--- /dev/null
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.history.parser.utils.Utils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public abstract class BaseInfo {
+
+  protected TezCounters tezCounters;
+  protected List<Event> eventList;
+
+  BaseInfo(JSONObject jsonObject) throws JSONException {
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    //parse tez counters
+    tezCounters = Utils.parseTezCountersFromJSON(
+        otherInfoNode.optJSONObject(Constants.COUNTERS));
+
+    //parse events
+    eventList = Lists.newArrayList();
+    Utils.parseEvents(jsonObject.optJSONArray(Constants.EVENTS), eventList);
+  }
+
+  public TezCounters getTezCounters() {
+    return tezCounters;
+  }
+
+  /**
+   * Get start time w.r.t DAG
+   *
+   * @return long
+   */
+  public abstract long getStartTime();
+
+  /**
+   * Get finish time w.r.t DAG
+   *
+   * @return long
+   */
+  public abstract long getFinishTime();
+
+  /**
+   * Get absolute start time
+   *
+   * @return long
+   */
+  public abstract long getAbsStartTime();
+
+  /**
+   * Get absolute finish time
+   *
+   * @return long
+   */
+  public abstract long getAbsFinishTime();
+
+  public abstract String getDiagnostics();
+
+  public List<Event> getEvents() {
+    return eventList;
+  }
+
+  /**
+   * Get counter for a specific counter group name.
+   * If counterGroupName is not mentioned, it would end up returning counter found in all
+   * groups
+   *
+   * @param counterGroupName
+   * @param counter
+   * @return Map<String, TezCounter> tez counter at every counter group level
+   */
+  public Map<String, TezCounter> getCounter(String counterGroupName, String counter) {
+    //TODO: FS, TaskCounters are directly getting added as TezCounters always pass those.  Need a
+    // way to get rid of these.
+    Map<String, TezCounter> result = Maps.newHashMap();
+    Iterator<String> iterator = tezCounters.getGroupNames().iterator();
+    boolean found = false;
+    while (iterator.hasNext()) {
+      CounterGroup counterGroup = tezCounters.getGroup(iterator.next());
+      if (counterGroupName != null) {
+        String groupName = counterGroup.getName();
+        if (groupName.equals(counterGroupName)) {
+          found = true;
+        }
+      }
+
+      //Explicitly mention that no need to create the counter if not present
+      TezCounter tezCounter = counterGroup.getUnderlyingGroup().findCounter(counter, false);
+      if (tezCounter != null) {
+        result.put(counterGroup.getName(), tezCounter);
+      }
+
+      if (found) {
+        //Retrieved counter specific to a counter group. Safe to exit.
+        break;
+      }
+
+    }
+    return result;
+  }
+
+  /**
+   * Find a counter in all counter groups
+   *
+   * @param counter
+   * @return Map of countergroup to TezCounter mapping
+   */
+  public Map<String, TezCounter> getCounter(String counter) {
+    return getCounter(null, counter);
+  }
+
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseParser.java
new file mode 100644 (file)
index 0000000..a484bd5
--- /dev/null
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+
+import java.util.List;
+
+public abstract class BaseParser {
+
+  protected DagInfo dagInfo;
+  protected VersionInfo versionInfo;
+  protected final List<VertexInfo> vertexList;
+  protected final List<TaskInfo> taskList;
+  protected final List<TaskAttemptInfo> attemptList;
+
+
+  public BaseParser() {
+    vertexList = Lists.newLinkedList();
+    taskList = Lists.newLinkedList();
+    attemptList = Lists.newLinkedList();
+  }
+
+  /**
+   * link the parsed contents, so that it becomes easier to iterate from DAG-->Task and Task--DAG.
+   * e.g Link vertex to dag, task to vertex, attempt to task etc
+   */
+  protected void linkParsedContents() {
+    //Link vertex to DAG
+    for (VertexInfo vertexInfo : vertexList) {
+      vertexInfo.setDagInfo(dagInfo);
+    }
+
+    //Link task to vertex
+    for (TaskInfo taskInfo : taskList) {
+      //Link vertex to task
+      String vertexId = TezTaskID.fromString(taskInfo.getTaskId()).getVertexID().toString();
+      VertexInfo vertexInfo = dagInfo.getVertexFromId(vertexId);
+      Preconditions.checkState(vertexInfo != null, "VertexInfo for " + vertexId + " can't be "
+          + "null");
+      taskInfo.setVertexInfo(vertexInfo);
+    }
+
+    //Link task attempt to task
+    for (TaskAttemptInfo attemptInfo : attemptList) {
+      //Link task to task attempt
+      TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(attemptInfo
+          .getTaskAttemptId());
+      VertexInfo vertexInfo = dagInfo.getVertexFromId(taskAttemptId.getTaskID()
+          .getVertexID().toString());
+      Preconditions.checkState(vertexInfo != null, "Vertex " + taskAttemptId.getTaskID()
+          .getVertexID().toString() + " is not present in DAG");
+      TaskInfo taskInfo = vertexInfo.getTask(taskAttemptId.getTaskID().toString());
+      attemptInfo.setTaskInfo(taskInfo);
+    }
+
+    //Set container details
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      for (TaskAttemptInfo taskAttemptInfo : vertexInfo.getTaskAttempts()) {
+        dagInfo.addContainerMapping(taskAttemptInfo.getContainer(), taskAttemptInfo);
+      }
+    }
+
+
+    //Set reference time for all events
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      setReferenceTime(vertexInfo.getEvents(), dagInfo.getStartTime());
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        setReferenceTime(taskInfo.getEvents(), dagInfo.getStartTime());
+        for (TaskAttemptInfo taskAttemptInfo : taskInfo.getTaskAttempts()) {
+          setReferenceTime(taskAttemptInfo.getEvents(), dagInfo.getStartTime());
+        }
+      }
+    }
+
+    dagInfo.setVersionInfo(versionInfo);
+  }
+
+  /**
+   * Set reference time to all events
+   *
+   * @param eventList
+   * @param referenceTime
+   */
+  private void setReferenceTime(List<Event> eventList, final long referenceTime) {
+    Iterables.all(eventList, new Predicate<Event>() {
+      @Override public boolean apply(Event input) {
+        input.setReferenceTime(referenceTime);
+        return false;
+      }
+    });
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Constants.java
new file mode 100644 (file)
index 0000000..3a24f15
--- /dev/null
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import org.apache.tez.common.ATSConstants;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Private;
+
+@Private
+public class Constants extends ATSConstants {
+
+  public static final String EVENT_TIME_STAMP = "timestamp";
+  public static final String TEZ_APPLICATION = "TEZ_APPLICATION";
+  public static final String TEZ_TASK_ID = "TEZ_TASK_ID";
+  public static final String TEZ_TASK_ATTEMPT_ID = "TEZ_TASK_ATTEMPT_ID";
+
+  public static final String EDGE_ID = "edgeId";
+  public static final String INPUT_VERTEX_NAME = "inputVertexName";
+  public static final String OUTPUT_VERTEX_NAME = "outputVertexName";
+  public static final String DATA_MOVEMENT_TYPE = "dataMovementType";
+  public static final String EDGE_SOURCE_CLASS = "edgeSourceClass";
+  public static final String EDGE_DESTINATION_CLASS = "edgeDestinationClass";
+  public static final String INPUT_PAYLOAD_TEXT = "inputUserPayloadAsText";
+  public static final String OUTPUT_PAYLOAD_TEXT = "outputUserPayloadAsText";
+
+  public static final String EDGES = "edges";
+  public static final String OUT_EDGE_IDS = "outEdgeIds";
+  public static final String IN_EDGE_IDS = "inEdgeIds";
+  public static final String ADDITIONAL_INPUTS = "additionalInputs";
+  public static final String ADDITIONAL_OUTPUTS = "additionalOutputs";
+
+  public static final String NAME = "name";
+  public static final String CLASS = "class";
+  public static final String INITIALIZER = "initializer";
+  public static final String USER_PAYLOAD_TEXT = "userPayloadAsText";
+
+  //constants for ATS data export
+  public static final String DAG = "dag";
+  public static final String VERTICES = "vertices";
+  public static final String TASKS = "tasks";
+  public static final String TASK_ATTEMPTS = "task_attempts";
+  public static final String APPLICATION = "application";
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Container.java
new file mode 100644 (file)
index 0000000..4e01d1b
--- /dev/null
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Objects;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Container {
+
+  private final String id;
+  private final String host;
+
+  public Container(String id, String host) {
+    this.id = id;
+    this.host = host;
+  }
+
+  public final String getId() {
+    return id;
+  }
+
+  public final String getHost() {
+    return host;
+  }
+
+  @Override public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("id=").append(id).append(", ");
+    sb.append("host=").append(host);
+    sb.append("]");
+    return sb.toString();
+  }
+
+  @Override public int hashCode() {
+    return Objects.hashCode(id, host);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    final Container other = (Container) obj;
+    return Objects.equal(this.id, other.id)
+        && Objects.equal(this.host, other.host);
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/DagInfo.java
new file mode 100644 (file)
index 0000000..0f3c3af
--- /dev/null
@@ -0,0 +1,555 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.commons.collections.BidiMap;
+import org.apache.commons.collections.bidimap.DualHashBidiMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.event.VertexState;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class DagInfo extends BaseInfo {
+
+  private static final Log LOG = LogFactory.getLog(DagInfo.class);
+
+  //Fields populated via JSON
+  private final String name;
+  private final long startTime;
+  private final long endTime;
+  private final long submitTime;
+  private final int failedTasks;
+  private final String dagId;
+  private final int numVertices;
+  private final String status;
+  private final String diagnostics;
+  private VersionInfo versionInfo;
+
+  //VertexID --> VertexName & vice versa
+  private final BidiMap vertexNameIDMapping;
+
+  //edgeId to EdgeInfo mapping
+  private final Map<Integer, EdgeInfo> edgeInfoMap;
+
+  //Only for internal parsing (vertexname mapping)
+  private Map<String, BasicVertexInfo> basicVertexInfoMap;
+
+  //VertexName --> VertexInfo
+  private Map<String, VertexInfo> vertexNameMap;
+
+  private Multimap<Container, TaskAttemptInfo> containerMapping;
+
+  DagInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    vertexNameMap = Maps.newHashMap();
+    vertexNameIDMapping = new DualHashBidiMap();
+    edgeInfoMap = Maps.newHashMap();
+    basicVertexInfoMap = Maps.newHashMap();
+    containerMapping = LinkedHashMultimap.create();
+
+    Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+        (Constants.TEZ_DAG_ID));
+
+    dagId = jsonObject.getString(Constants.ENTITY);
+
+    //Parse additional Info
+    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    //TODO: Not getting populated correctly for lots of jobs.  Verify
+    submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+    JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
+    name = (dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null;
+    if (dagPlan != null) {
+      JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
+      if (vertices != null) {
+        numVertices = vertices.length();
+      } else {
+        numVertices = 0;
+      }
+      parseDAGPlan(dagPlan);
+    } else {
+      numVertices = 0;
+    }
+    status = otherInfoNode.optString(Constants.STATUS);
+
+    //parse name id mapping
+    JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
+    if (vertexIDMappingJson != null) {
+      //get vertex name
+      for (Map.Entry<String, BasicVertexInfo> entry : basicVertexInfoMap.entrySet()) {
+        String vertexId = vertexIDMappingJson.optString(entry.getKey());
+        //vertexName --> vertexId
+        vertexNameIDMapping.put(entry.getKey(), vertexId);
+      }
+    }
+  }
+
+  public static DagInfo create(JSONObject jsonObject) throws JSONException {
+    DagInfo dagInfo = new DagInfo(jsonObject);
+    return dagInfo;
+  }
+
+  private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
+    parseEdges(dagPlan.optJSONArray(Constants.EDGES));
+
+    JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
+    parseBasicVertexInfo(verticesInfo);
+  }
+
+  private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
+    if (verticesInfo == null) {
+      LOG.info("No vertices available.");
+      return;
+    }
+
+    //Parse basic information available in DAG for vertex and edges
+    for (int i = 0; i < verticesInfo.length(); i++) {
+      BasicVertexInfo basicVertexInfo = new BasicVertexInfo();
+
+      JSONObject vJson = verticesInfo.getJSONObject(i);
+      basicVertexInfo.vertexName =
+          vJson.optString(Constants.VERTEX_NAME);
+      JSONArray inEdges = vJson.optJSONArray(Constants.IN_EDGE_IDS);
+      if (inEdges != null) {
+        String[] inEdgeIds = new String[inEdges.length()];
+        for (int j = 0; j < inEdges.length(); j++) {
+          inEdgeIds[j] = inEdges.get(j).toString();
+        }
+        basicVertexInfo.inEdgeIds = inEdgeIds;
+      }
+
+      JSONArray outEdges = vJson.optJSONArray(Constants.OUT_EDGE_IDS);
+      if (outEdges != null) {
+        String[] outEdgeIds = new String[outEdges.length()];
+        for (int j = 0; j < outEdges.length(); j++) {
+          outEdgeIds[j] = outEdges.get(j).toString();
+        }
+        basicVertexInfo.outEdgeIds = outEdgeIds;
+      }
+
+      JSONArray addInputsJson =
+          vJson.optJSONArray(Constants.ADDITIONAL_INPUTS);
+      basicVertexInfo.additionalInputs = parseAdditionalDetailsForVertex(addInputsJson);
+
+      JSONArray addOutputsJson =
+          vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS);
+      basicVertexInfo.additionalOutputs = parseAdditionalDetailsForVertex(addOutputsJson);
+
+      basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
+    }
+  }
+
+  /**
+   * get additional details available for every vertex in the dag
+   *
+   * @param jsonArray
+   * @return AdditionalInputOutputDetails[]
+   * @throws JSONException
+   */
+  private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws
+      JSONException {
+    if (jsonArray != null) {
+      AdditionalInputOutputDetails[]
+          additionalInputOutputDetails = new AdditionalInputOutputDetails[jsonArray.length()];
+      for (int j = 0; j < jsonArray.length(); j++) {
+        String name = jsonArray.getJSONObject(j).optString(
+            Constants.NAME);
+        String clazz = jsonArray.getJSONObject(j).optString(
+            Constants.CLASS);
+        String initializer =
+            jsonArray.getJSONObject(j).optString(Constants.INITIALIZER);
+        String userPayloadText = jsonArray.getJSONObject(j).optString(
+            Constants.USER_PAYLOAD_TEXT);
+
+        additionalInputOutputDetails[j] =
+            new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText);
+
+      }
+      return additionalInputOutputDetails;
+    }
+    return null;
+  }
+
+  /**
+   * Parse edge details in the DAG
+   *
+   * @param edgesArray
+   *
+   * @throws JSONException
+   */
+  private void parseEdges(JSONArray edgesArray) throws JSONException {
+    if (edgesArray == null) {
+      return;
+    }
+    for (int i = 0; i < edgesArray.length(); i++) {
+      JSONObject edge = edgesArray.getJSONObject(i);
+      Integer edgeId = edge.optInt(Constants.EDGE_ID);
+      String inputVertexName =
+          edge.optString(Constants.INPUT_VERTEX_NAME);
+      String outputVertexName =
+          edge.optString(Constants.OUTPUT_VERTEX_NAME);
+      String dataMovementType =
+          edge.optString(Constants.DATA_MOVEMENT_TYPE);
+      String edgeSourceClass =
+          edge.optString(Constants.EDGE_SOURCE_CLASS);
+      String edgeDestinationClass =
+          edge.optString(Constants.EDGE_DESTINATION_CLASS);
+      String inputUserPayloadAsText =
+          edge.optString(Constants.INPUT_PAYLOAD_TEXT);
+      String outputUserPayloadAsText =
+          edge.optString(Constants.OUTPUT_PAYLOAD_TEXT);
+      EdgeInfo edgeInfo = new EdgeInfo(inputVertexName, outputVertexName,
+          dataMovementType, edgeSourceClass, edgeDestinationClass, inputUserPayloadAsText,
+          outputUserPayloadAsText);
+      edgeInfoMap.put(edgeId, edgeInfo);
+    }
+  }
+
+  static class BasicVertexInfo {
+    String vertexName;
+    String[] inEdgeIds;
+    String[] outEdgeIds;
+    AdditionalInputOutputDetails[] additionalInputs;
+    AdditionalInputOutputDetails[] additionalOutputs;
+  }
+
+  void addVertexInfo(VertexInfo vertexInfo) {
+    BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName());
+
+    Preconditions.checkArgument(basicVertexInfo != null,
+        "VerteName " + vertexInfo.getVertexName()
+            + " not present in DAG's vertices " + basicVertexInfoMap.entrySet());
+
+    //populate additional information in VertexInfo
+    if (basicVertexInfo.additionalInputs != null) {
+      vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
+    }
+    if (basicVertexInfo.additionalOutputs != null) {
+      vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
+    }
+
+    //Populate edge information in vertex
+    if (basicVertexInfo.inEdgeIds != null) {
+      for (String edge : basicVertexInfo.inEdgeIds) {
+        EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+        Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+        vertexInfo.addInEdge(edgeInfo);
+      }
+    }
+
+    if (basicVertexInfo.outEdgeIds != null) {
+      for (String edge : basicVertexInfo.outEdgeIds) {
+        EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
+        Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
+        vertexInfo.addOutEdge(edgeInfo);
+      }
+    }
+
+    vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo);
+  }
+
+  void setVersionInfo(VersionInfo versionInfo) {
+    this.versionInfo = versionInfo;
+  }
+
+  void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
+    this.containerMapping.put(container, taskAttemptInfo);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("dagID=").append(getDagId()).append(", ");
+    sb.append("dagName=").append(getName()).append(", ");
+    sb.append("status=").append(getStatus()).append(", ");
+    sb.append("startTime=").append(getStartTime()).append(", ");
+    sb.append("submitTime=").append(getAbsoluteSubmitTime()).append(", ");
+    sb.append("endTime=").append(getFinishTime()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
+    sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+
+  public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  public final VersionInfo getVersionInfo() {
+    return versionInfo;
+  }
+
+  public final String getName() {
+    return name;
+  }
+
+  public final Collection<EdgeInfo> getEdges() {
+    return Collections.unmodifiableCollection(edgeInfoMap.values());
+  }
+
+  public final long getAbsoluteSubmitTime() {
+    return submitTime;
+  }
+
+  public final long getAbsStartTime() {
+    return startTime;
+  }
+
+  public final long getAbsFinishTime() {
+    return endTime;
+  }
+
+  /**
+   * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this.
+   * If absolute start time is needed, call getAbsStartTime().
+   *
+   * @return starting time w.r.t to dag
+   */
+  public final long getStartTime() {
+    return 0;
+  }
+
+  @Override
+  public final long getFinishTime() {
+    long dagEndTime = (endTime - startTime);
+    if (dagEndTime < 0) {
+      //probably dag is not complete or failed in middle. get the last task attempt time
+      for (VertexInfo vertexInfo : getVertices()) {
+        dagEndTime = (vertexInfo.getFinishTime() > dagEndTime) ? vertexInfo.getFinishTime() : dagEndTime;
+      }
+    }
+    return dagEndTime;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTime();
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  /**
+   * Get vertexInfo for a given vertexid
+   *
+   * @param vertexId
+   * @return VertexInfo
+   */
+  public VertexInfo getVertexFromId(String vertexId) {
+    return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
+  }
+
+  /**
+   * Get vertexInfo for a given vertex name
+   *
+   * @param vertexName
+   * @return VertexInfo
+   */
+  public final VertexInfo getVertex(String vertexName) {
+    return vertexNameMap.get(vertexName);
+  }
+
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  /**
+   * Get all vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getVertices() {
+    List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values());
+    Collections.sort(vertices, new Comparator<VertexInfo>() {
+
+      @Override public int compare(VertexInfo o1, VertexInfo o2) {
+        return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ?
+                0 : 1);
+      }
+    });
+    return Collections.unmodifiableList(vertices);
+  }
+
+  /**
+   * Get list of failed vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getFailedVertices() {
+    return getVertices(VertexState.FAILED);
+  }
+
+  /**
+   * Get list of killed vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getKilledVertices() {
+    return getVertices(VertexState.KILLED);
+  }
+
+  /**
+   * Get list of failed vertices
+   *
+   * @return List<VertexInfo>
+   */
+  public final List<VertexInfo> getSuccessfullVertices() {
+    return getVertices(VertexState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of vertices belonging to a specific state
+   *
+   * @param state
+   * @return Collection<VertexInfo>
+   */
+  public final List<VertexInfo> getVertices(final VertexState state) {
+    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+                    (vertexNameMap.values()), new Predicate<VertexInfo>() {
+                  @Override public boolean apply(VertexInfo input) {
+                    return input.getStatus() != null && input.getStatus().equals(state.toString());
+                  }
+                }
+            )
+        )
+    );
+  }
+
+  public final Map<String, VertexInfo> getVertexMapping() {
+    return Collections.unmodifiableMap(vertexNameMap);
+  }
+
+  private Ordering<VertexInfo> getVertexOrdering() {
+    return Ordering.from(new Comparator<VertexInfo>() {
+      @Override public int compare(VertexInfo o1, VertexInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get the slowest vertex in the DAG
+   *
+   * @return VertexInfo
+   */
+  public final VertexInfo getSlowestVertex() {
+    List<VertexInfo> vertexInfoList = getVertices();
+    if (vertexInfoList.size() == 0) {
+      return null;
+    }
+    return getVertexOrdering().max(vertexInfoList);
+  }
+
+  /**
+   * Get the slowest vertex in the DAG
+   *
+   * @return VertexInfo
+   */
+  public final VertexInfo getFastestVertex() {
+    List<VertexInfo> vertexInfoList = getVertices();
+    if (vertexInfoList.size() == 0) {
+      return null;
+    }
+    return getVertexOrdering().min(vertexInfoList);
+  }
+
+  /**
+   * Get node details for this DAG. Would be useful for analyzing node to tasks.
+   *
+   * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node
+   */
+  public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
+    Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
+    for (VertexInfo vertexInfo : getVertices()) {
+      Multimap<Container, TaskAttemptInfo> containerMapping = vertexInfo.getContainersMapping();
+      for (Map.Entry<Container, TaskAttemptInfo> entry : containerMapping.entries()) {
+        nodeDetails.put(entry.getKey().getHost(), entry.getValue());
+      }
+    }
+    return nodeDetails;
+  }
+
+  /**
+   * Get containers used for this DAG
+   *
+   * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
+   */
+  public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
+    List<VertexInfo> VertexInfoList = getVertices();
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+
+    for (VertexInfo vertexInfo : VertexInfoList) {
+      containerMapping.putAll(vertexInfo.getContainersMapping());
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  public final Map getVertexNameIDMapping() {
+    return vertexNameIDMapping;
+  }
+
+  public final int getNumVertices() {
+    return numVertices;
+  }
+
+  public final String getDagId() {
+    return dagId;
+  }
+
+  public final int getFailedTaskCount() {
+    return failedTasks;
+  }
+
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/EdgeInfo.java
new file mode 100644 (file)
index 0000000..ab8e831
--- /dev/null
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class EdgeInfo {
+
+  private final String inputVertexName;
+  private final String outputVertexName;
+  private final String dataMovementType;
+  private final String edgeSourceClass;
+  private final String edgeDestinationClass;
+  private final String inputUserPayloadAsText;
+  private final String outputUserPayloadAsText;
+
+  private VertexInfo sourceVertex;
+  private VertexInfo destinationVertex;
+
+  public EdgeInfo(String inputVertexName, String outputVertexName, String dataMovementType,
+      String edgeSourceClass, String edgeDestinationClass, String inputUserPayloadAsText, String
+      outputUserPayloadAsText) {
+    this.inputVertexName = inputVertexName;
+    this.outputVertexName = outputVertexName;
+    this.dataMovementType = dataMovementType;
+    this.edgeSourceClass = edgeSourceClass;
+    this.edgeDestinationClass = edgeDestinationClass;
+    this.inputUserPayloadAsText = inputUserPayloadAsText;
+    this.outputUserPayloadAsText = outputUserPayloadAsText;
+  }
+
+  public final String getInputVertexName() {
+    return inputVertexName;
+  }
+
+  public final String getOutputVertexName() {
+    return outputVertexName;
+  }
+
+  public final String getDataMovementType() {
+    return dataMovementType;
+  }
+
+  public final String getEdgeSourceClass() {
+    return edgeSourceClass;
+  }
+
+  public final String getEdgeDestinationClass() {
+    return edgeDestinationClass;
+  }
+
+  public final String getInputUserPayloadAsText() {
+    return inputUserPayloadAsText;
+  }
+
+  public final String getOutputUserPayloadAsText() {
+    return outputUserPayloadAsText;
+  }
+
+  public final VertexInfo getSourceVertex() {
+    return sourceVertex;
+  }
+
+  public final void setSourceVertex(VertexInfo sourceVertex) {
+    this.sourceVertex = sourceVertex;
+  }
+
+  public final VertexInfo getDestinationVertex() {
+    return destinationVertex;
+  }
+
+  public final void setDestinationVertex(VertexInfo destinationVertex) {
+    this.destinationVertex = destinationVertex;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("inputVertexName=").append(inputVertexName).append(", ");
+    sb.append("outputVertexName=").append(outputVertexName).append(", ");
+    sb.append("dataMovementType=").append(dataMovementType).append(", ");
+    sb.append("edgeSourceClass=").append(edgeSourceClass).append(", ");
+    sb.append("edgeDestinationClass=").append(edgeDestinationClass).append(", ");
+    sb.append("inputUserPayloadAsText=").append(inputUserPayloadAsText).append(",");
+    sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText).append(", ");
+    sb.append("sourceVertex=").append(sourceVertex.getVertexName()).append(", ");
+    sb.append("destinationVertex=").append(destinationVertex.getVertexName()).append(", ");
+    sb.append("outputUserPayloadAsText=").append(outputUserPayloadAsText);
+    sb.append("]");
+    return sb.toString();
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/Event.java
new file mode 100644 (file)
index 0000000..70310f3
--- /dev/null
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class Event {
+  private final String info;
+  private final String type;
+  private final long time;
+
+  private long refTime; //typically dag start time.
+
+  public Event(String info, String type, long time) {
+    this.time = time;
+    this.type = type;
+    this.info = info;
+  }
+
+  void setReferenceTime(long refTime) {
+    this.refTime = refTime;
+  }
+
+  public final String getInfo() {
+    return info;
+  }
+
+  public final String getType() {
+    return type;
+  }
+
+  public final long getAbsoluteTime() {
+    return time;
+  }
+
+  public final long getTime() {
+    return time - refTime;
+  }
+
+  @Override
+  public String toString() {
+    return "[info=" + info + ", type=" + type + ", time=" + time + "]";
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskAttemptInfo.java
new file mode 100644 (file)
index 0000000..4c3fa97
--- /dev/null
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+
+@Public
+@Evolving
+public class TaskAttemptInfo extends BaseInfo {
+
+  private final String taskAttemptId;
+  private final long startTime;
+  private final long endTime;
+  private final String diagnostics;
+  private final String successfulAttemptId;
+  private final long scheduledTime;
+  private final String containerId;
+  private final String nodeId;
+  private final String status;
+  private final String logUrl;
+
+  private TaskInfo taskInfo;
+
+  private Container container;
+
+  TaskAttemptInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_TASK_ATTEMPT_ID));
+
+    taskAttemptId = jsonObject.optString(Constants.ENTITY);
+
+    //Parse additional Info
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    successfulAttemptId = otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID);
+    scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+
+    containerId = otherInfoNode.optString(Constants.CONTAINER_ID);
+    String id = otherInfoNode.optString(Constants.NODE_ID);
+    nodeId = (id != null) ? (id.split(":")[0]) : "";
+    logUrl = otherInfoNode.optString(Constants.COMPLETED_LOGS_URL);
+
+    status = otherInfoNode.optString(Constants.STATUS);
+    container = new Container(containerId, nodeId);
+  }
+
+  void setTaskInfo(TaskInfo taskInfo) {
+    Preconditions.checkArgument(taskInfo != null, "Provide valid taskInfo");
+    this.taskInfo = taskInfo;
+    taskInfo.addTaskAttemptInfo(this);
+  }
+
+  @Override
+  public final long getStartTime() {
+    return startTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime());
+  }
+
+  @Override
+  public final long getFinishTime() {
+    return endTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime());
+  }
+
+  public final long getAbsStartTime() {
+    return startTime;
+  }
+
+  public final long getAbsFinishTime() {
+    return endTime;
+  }
+
+  public final long getAbsoluteScheduledTime() {
+    return scheduledTime;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTime() - getStartTime();
+  }
+
+  public final long getScheduledTime() {
+    return scheduledTime - (getTaskInfo().getVertexInfo().getDagInfo().getAbsStartTime());
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public static TaskAttemptInfo create(JSONObject taskInfoObject) throws JSONException {
+    return new TaskAttemptInfo(taskInfoObject);
+  }
+
+  public final boolean isLocalityInfoAvailable() {
+    Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.DATA_LOCAL_TASKS.toString());
+    Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.RACK_LOCAL_TASKS.toString());
+
+    Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+    if (!dataLocalTask.isEmpty() || !rackLocalTask.isEmpty() || !otherLocalTask.isEmpty()) {
+      return true;
+    }
+    return false;
+  }
+
+  public final TezCounter getLocalityInfo() {
+    Map<String, TezCounter> dataLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.DATA_LOCAL_TASKS.toString());
+    Map<String, TezCounter> rackLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.RACK_LOCAL_TASKS.toString());
+    Map<String, TezCounter> otherLocalTask = getCounter(DAGCounter.class.getName(),
+        DAGCounter.OTHER_LOCAL_TASKS.toString());
+
+    if (!dataLocalTask.isEmpty()) {
+      return dataLocalTask.get(DAGCounter.class.getName());
+    }
+
+    if (!rackLocalTask.isEmpty()) {
+      return rackLocalTask.get(DAGCounter.class.getName());
+    }
+
+    if (!otherLocalTask.isEmpty()) {
+      return otherLocalTask.get(DAGCounter.class.getName());
+    }
+    return null;
+  }
+
+  public final TaskInfo getTaskInfo() {
+    return taskInfo;
+  }
+
+  public final String getTaskAttemptId() {
+    return taskAttemptId;
+  }
+
+  public final String getSuccessfulAttemptId() {
+    return successfulAttemptId;
+  }
+
+  public final String getNodeId() {
+    return nodeId;
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final Container getContainer() {
+    return container;
+  }
+
+  public final String getLogURL() {
+    return logUrl;
+  }
+
+  /**
+   * Get merge counter per source. Available in case of reducer task
+   *
+   * @return Map<String, TezCounter> merge phase time at every counter group level
+   */
+  public final Map<String, TezCounter> getMergePhaseTime() {
+    return getCounter(null, TaskCounter.MERGE_PHASE_TIME.name());
+  }
+
+  /**
+   * Get shuffle counter per source. Available in case of shuffle
+   *
+   * @return Map<String, TezCounter> shuffle phase time at every counter group level
+   */
+  public final Map<String, TezCounter> getShufflePhaseTime() {
+    return getCounter(null, TaskCounter.SHUFFLE_PHASE_TIME.name());
+  }
+
+  /**
+   * Get OUTPUT_BYTES counter per source. Available in case of map outputs
+   *
+   * @return Map<String, TezCounter> output bytes counter at every counter group
+   */
+  public final Map<String, TezCounter> getTaskOutputBytes() {
+    return getCounter(null, TaskCounter.OUTPUT_BYTES.name());
+  }
+
+  /**
+   * Get number of spills per source.  (SPILLED_RECORDS / OUTPUT_RECORDS)
+   *
+   * @return Map<String, Long> spill count details
+   */
+  public final Map<String, Float> getSpillCount() {
+    Map<String, TezCounter> outputRecords = getCounter(null, "OUTPUT_RECORDS");
+    Map<String, TezCounter> spilledRecords = getCounter(null, "SPILLED_RECORDS");
+    Map<String, Float> result = Maps.newHashMap();
+    for (Map.Entry<String, TezCounter> entry : spilledRecords.entrySet()) {
+      String source = entry.getKey();
+      long spilledVal = entry.getValue().getValue();
+      long outputVal = outputRecords.get(source).getValue();
+      result.put(source, (spilledVal * 1.0f) / (outputVal * 1.0f));
+    }
+    return result;
+  }
+
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("taskAttemptId=").append(getTaskAttemptId()).append(", ");
+    sb.append("scheduledTime=").append(getScheduledTime()).append(", ");
+    sb.append("startTime=").append(getStartTime()).append(", ");
+    sb.append("finishTime=").append(getFinishTime()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("successfulAttempId=").append(getSuccessfulAttemptId()).append(", ");
+    sb.append("container=").append(getContainer()).append(", ");
+    sb.append("nodeId=").append(getNodeId()).append(", ");
+    sb.append("logURL=").append(getLogURL()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/TaskInfo.java
new file mode 100644 (file)
index 0000000..cb966a4
--- /dev/null
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class TaskInfo extends BaseInfo {
+
+  private final long startTime;
+  private final long endTime;
+  private final String diagnostics;
+  private final String successfulAttemptId;
+  private final long scheduledTime;
+  private final String status;
+  private final String taskId;
+
+  private VertexInfo vertexInfo;
+
+  private Map<String, TaskAttemptInfo> attemptInfoMap = Maps
+      .newHashMap();
+
+  TaskInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_TASK_ID));
+
+    taskId = jsonObject.optString(Constants.ENTITY);
+
+    //Parse additional Info
+    final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    successfulAttemptId = otherInfoNode.optString(Constants.SUCCESSFUL_ATTEMPT_ID);
+    scheduledTime = otherInfoNode.optLong(Constants.SCHEDULED_TIME);
+    status = otherInfoNode.optString(Constants.STATUS);
+  }
+
+  @Override
+  public final long getStartTime() {
+    return startTime - (vertexInfo.getDagInfo().getAbsStartTime());
+  }
+
+  public final long getAbsStartTime() {
+    return startTime;
+  }
+
+  public final long getAbsFinishTime() {
+    return endTime;
+  }
+
+  @Override
+  public final long getFinishTime() {
+    long taskFinishTime =  endTime - (vertexInfo.getDagInfo().getAbsStartTime());
+    if (taskFinishTime < 0) {
+      //probably vertex is not complete or failed in middle. get the last task attempt time
+      for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+        taskFinishTime = (attemptInfo.getFinishTime() > taskFinishTime)
+            ? attemptInfo.getFinishTime() : taskFinishTime;
+      }
+    }
+    return taskFinishTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public static TaskInfo create(JSONObject taskInfoObject) throws
+      JSONException {
+    return new TaskInfo(taskInfoObject);
+  }
+
+  void addTaskAttemptInfo(TaskAttemptInfo taskAttemptInfo) {
+    attemptInfoMap.put(taskAttemptInfo.getTaskAttemptId(), taskAttemptInfo);
+  }
+
+  void setVertexInfo(VertexInfo vertexInfo) {
+    Preconditions.checkArgument(vertexInfo != null, "Provide valid vertexInfo");
+    this.vertexInfo = vertexInfo;
+    //link it to vertex
+    vertexInfo.addTaskInfo(this);
+  }
+
+  public final VertexInfo getVertexInfo() {
+    return vertexInfo;
+  }
+
+  /**
+   * Get all task attempts
+   *
+   * @return list of task attempt info
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts() {
+    List<TaskAttemptInfo> attemptsList = Lists.newLinkedList(attemptInfoMap.values());
+    Collections.sort(attemptsList, orderingOnAttemptStartTime());
+    return Collections.unmodifiableList(attemptsList);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getFailedTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.FAILED);
+  }
+
+  /**
+   * Get list of killed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getKilledTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.KILLED);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getSuccessfulTaskAttempts() {
+    return getTaskAttempts(TaskAttemptState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of tasks belonging to a specific state
+   *
+   * @param state
+   * @return Collection<TaskAttemptInfo>
+   */
+  public final List<TaskAttemptInfo> getTaskAttempts(final TaskAttemptState state) {
+    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+                    (attemptInfoMap.values()), new Predicate<TaskAttemptInfo>() {
+                  @Override public boolean apply(TaskAttemptInfo input) {
+                    return input.getStatus() != null && input.getStatus().equals(state.toString());
+                  }
+                }
+            )
+        )
+    );
+  }
+
+  /**
+   * Get the set of containers on which the task attempts ran for this task
+   *
+   * @return Multimap<Container, TaskAttemptInfo> task attempt details at container level
+   */
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get the successful task attempt
+   *
+   * @return TaskAttemptInfo
+   */
+  public final TaskAttemptInfo getSuccessfulTaskAttempt() {
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      if (attemptInfo.getStatus().equalsIgnoreCase(TaskAttemptState.SUCCEEDED.toString())) {
+        return attemptInfo;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get last task attempt to finish
+   *
+   * @return TaskAttemptInfo
+   */
+  public final TaskAttemptInfo getLastTaskAttemptToFinish() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return null;
+    }
+
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getFinishTime() < o2.getFinishTime()) ? -1 :
+            ((o1.getFinishTime() == o2.getFinishTime()) ?
+                0 : 1);
+      }
+    }).max(attemptsList);
+  }
+
+  /**
+   * Get average task attempt duration. Includes succesful and failed tasks
+   *
+   * @return float
+   */
+  public final float getAvgTaskAttemptDuration() {
+    float totalTaskDuration = 0;
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.size() == 0) {
+      return 0;
+    }
+    for (TaskAttemptInfo attemptInfo : attemptsList) {
+      totalTaskDuration += attemptInfo.getTimeTaken();
+    }
+    return ((totalTaskDuration * 1.0f) / attemptsList.size());
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
+                0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get min task attempt duration.  This includes successful/failed task attempts as well
+   *
+   * @return long
+   */
+  public final long getMinTaskAttemptDuration() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return 0;
+    }
+
+    return orderingOnTimeTaken().min(attemptsList).getTimeTaken();
+  }
+
+  /**
+   * Get max task attempt duration.  This includes successful/failed task attempts as well
+   *
+   * @return long
+   */
+  public final long getMaxTaskAttemptDuration() {
+    List<TaskAttemptInfo> attemptsList = getTaskAttempts();
+    if (attemptsList.isEmpty()) {
+      return 0;
+    }
+
+    return orderingOnTimeTaken().max(attemptsList).getTimeTaken();
+  }
+
+  public final int getNumberOfTaskAttempts() {
+    return getTaskAttempts().size();
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final String getTaskId() {
+    return taskId;
+  }
+
+  public final long getTimeTaken() {
+    return getFinishTime() - getStartTime();
+  }
+
+  public final String getSuccessfulAttemptId() {
+    return successfulAttemptId;
+  }
+
+  public final long getAbsoluteScheduleTime() {
+    return scheduledTime;
+  }
+
+  public final long getScheduledTime() {
+    return scheduledTime - this.getVertexInfo().getDagInfo().getAbsStartTime();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("taskId=").append(getTaskId()).append(", ");
+    sb.append("scheduledTime=").append(getAbsoluteScheduleTime()).append(", ");
+    sb.append("startTime=").append(getStartTime()).append(", ");
+    sb.append("finishTime=").append(getFinishTime()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("successfulAttempId=").append(getSuccessfulAttemptId()).append(", ");
+    sb.append("status=").append(getStatus()).append(", ");
+    sb.append("vertexName=").append(getVertexInfo().getVertexName());
+    sb.append("]");
+    return sb.toString();
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VersionInfo.java
new file mode 100644 (file)
index 0000000..97d18cd
--- /dev/null
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+public class VersionInfo {
+
+  private final String buildTime;
+  private final String revision;
+  private final String version;
+
+  public VersionInfo(String buildTime, String revision, String version) {
+    this.buildTime = buildTime;
+    this.revision = revision;
+    this.version = version;
+  }
+
+  public String getBuildTime() {
+    return buildTime;
+  }
+
+  public String getRevision() {
+    return revision;
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/VertexInfo.java
new file mode 100644 (file)
index 0000000..3445adb
--- /dev/null
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.datamodel;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.LinkedHashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.Ordering;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.classification.InterfaceAudience.Public;
+import static org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+@Public
+@Evolving
+public class VertexInfo extends BaseInfo {
+
+  private final String vertexName;
+  private final long endTime;
+  private final long initTime;
+  private final String diagnostics;
+  private final String processorClass;
+
+  private final int numTasks;
+  private final int failedTasks;
+  private final int completedTasks;
+  private final int succeededTasks;
+  private final int killedTasks;
+  private final int numFailedTaskAttempts;
+
+  private final String status;
+
+  private final long startTime;
+
+  //TaskID --> TaskInfo for internal reference
+  private Map<String, TaskInfo> taskInfoMap;
+
+  private final List<EdgeInfo> inEdgeList;
+  private final List<EdgeInfo> outEdgeList;
+
+  private final List<AdditionalInputOutputDetails> additionalInputInfoList;
+  private final List<AdditionalInputOutputDetails> additionalOutputInfoList;
+
+  private DagInfo dagInfo;
+
+  VertexInfo(JSONObject jsonObject) throws JSONException {
+    super(jsonObject);
+
+    Preconditions.checkArgument(
+        jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
+            (Constants.TEZ_VERTEX_ID));
+
+    taskInfoMap = Maps.newHashMap();
+
+    inEdgeList = Lists.newLinkedList();
+    outEdgeList = Lists.newLinkedList();
+    additionalInputInfoList = Lists.newLinkedList();
+    additionalOutputInfoList = Lists.newLinkedList();
+
+    //Parse additional Info
+    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
+    startTime = otherInfoNode.optLong(Constants.START_TIME);
+    initTime = otherInfoNode.optLong(Constants.INIT_TIME);
+    endTime = otherInfoNode.optLong(Constants.FINISH_TIME);
+    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
+    numTasks = otherInfoNode.optInt(Constants.NUM_TASKS);
+    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
+    succeededTasks =
+        otherInfoNode.optInt(Constants.NUM_SUCCEEDED_TASKS);
+    completedTasks =
+        otherInfoNode.optInt(Constants.NUM_COMPLETED_TASKS);
+    killedTasks = otherInfoNode.optInt(Constants.NUM_KILLED_TASKS);
+    numFailedTaskAttempts =
+        otherInfoNode.optInt(Constants.NUM_FAILED_TASKS_ATTEMPTS);
+    vertexName = otherInfoNode.optString(Constants.VERTEX_NAME);
+    processorClass = otherInfoNode.optString(Constants.PROCESSOR_CLASS_NAME);
+    status = otherInfoNode.optString(Constants.STATUS);
+  }
+
+  public static VertexInfo create(JSONObject vertexInfoObject) throws
+      JSONException {
+    return new VertexInfo(vertexInfoObject);
+  }
+
+  /**
+   * Update edge details with source and destination vertex objects.
+   */
+  private void updateEdgeInfo() {
+    if (dagInfo.getNumVertices() == dagInfo.getVertices().size()) {
+      //We can update EdgeInfo when all vertices are parsed
+      Map<String, VertexInfo> vertexMapping = dagInfo.getVertexMapping();
+      for (EdgeInfo edge : dagInfo.getEdges()) {
+        VertexInfo sourceVertex = vertexMapping.get(edge.getInputVertexName());
+        VertexInfo destinationVertex = vertexMapping.get(edge.getOutputVertexName());
+        edge.setSourceVertex(sourceVertex);
+        edge.setDestinationVertex(destinationVertex);
+      }
+    }
+  }
+
+  void addTaskInfo(TaskInfo taskInfo) {
+    this.taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
+  }
+
+  void setAdditionalInputInfoList(List<AdditionalInputOutputDetails> additionalInputInfoList) {
+    this.additionalInputInfoList.clear();
+    this.additionalInputInfoList.addAll(additionalInputInfoList);
+  }
+
+  void setAdditionalOutputInfoList(List<AdditionalInputOutputDetails> additionalOutputInfoList) {
+    this.additionalOutputInfoList.clear();
+    this.additionalOutputInfoList.addAll(additionalOutputInfoList);
+  }
+
+  void addInEdge(EdgeInfo edgeInfo) {
+    this.inEdgeList.add(edgeInfo);
+  }
+
+  void addOutEdge(EdgeInfo edgeInfo) {
+    this.outEdgeList.add(edgeInfo);
+  }
+
+  void setDagInfo(DagInfo dagInfo) {
+    Preconditions.checkArgument(dagInfo != null, "Provide valid dagInfo");
+    this.dagInfo = dagInfo;
+    //link vertex to dagInfo
+    dagInfo.addVertexInfo(this);
+    updateEdgeInfo();
+  }
+
+  @Override
+  public final long getStartTime() {
+    return startTime - (dagInfo.getAbsStartTime());
+  }
+
+  public final long getFirstTaskStartTime() {
+    return getFirstTaskToStart().getStartTime();
+  }
+
+  public final long getLastTaskFinishTime() {
+    if (getLastTaskToFinish() == null || getLastTaskToFinish().getFinishTime() < 0) {
+        return dagInfo.getFinishTime();
+    }
+    return getLastTaskToFinish().getFinishTime();
+  }
+
+  public final long getAbsStartTime() {
+    return startTime;
+  }
+
+  public final long getAbsFinishTime() {
+    return endTime;
+  }
+
+  public final long getAbsoluteInitTime() {
+    return initTime;
+  }
+
+  @Override
+  public final long getFinishTime() {
+    long vertexEndTime = endTime - (dagInfo.getAbsStartTime());
+    if (vertexEndTime < 0) {
+      //probably vertex is not complete or failed in middle. get the last task attempt time
+      for (TaskInfo taskInfo : getTasks()) {
+        vertexEndTime = (taskInfo.getFinishTime() > vertexEndTime)
+            ? taskInfo.getFinishTime() : vertexEndTime;
+      }
+    }
+    return vertexEndTime;
+  }
+
+  @Override
+  public final String getDiagnostics() {
+    return diagnostics;
+  }
+
+  public final String getVertexName() {
+    return vertexName;
+  }
+
+  //Quite possible that getFinishTime is not yet recorded for failed vertices (or killed vertices)
+  //Start time of vertex infers that the dependencies are done and AM has inited it.
+  public final long getTimeTaken() {
+    return (getFinishTime() - getStartTime());
+  }
+
+  //Time taken for last task to finish  - time taken for first task to start
+  public final long getTimeTakenForTasks() {
+    return (getLastTaskFinishTime() - getFirstTaskStartTime());
+  }
+
+  public final long getInitTime() {
+    return initTime - dagInfo.getAbsStartTime();
+  }
+
+  public final int getNumTasks() {
+    return numTasks;
+  }
+
+  public final int getFailedTasksCount() {
+    return failedTasks;
+  }
+
+  public final int getKilledTasksCount() {
+    return killedTasks;
+  }
+
+  public final int getCompletedTasksCount() {
+    return completedTasks;
+  }
+
+  public final int getSucceededTasksCount() {
+    return succeededTasks;
+  }
+
+  public final int getNumFailedTaskAttemptsCount() {
+    return numFailedTaskAttempts;
+  }
+
+  public final String getProcessorClassName() {
+    return processorClass;
+
+  }
+
+  /**
+   * Get all tasks
+   *
+   * @return list of taskInfo
+   */
+  public final List<TaskInfo> getTasks() {
+    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
+    Collections.sort(taskInfoList, orderingOnStartTime());
+    return Collections.unmodifiableList(taskInfoList);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getFailedTasks() {
+    return getTasks(TaskState.FAILED);
+  }
+
+  /**
+   * Get list of killed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getKilledTasks() {
+    return getTasks(TaskState.KILLED);
+  }
+
+  /**
+   * Get list of failed tasks
+   *
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getSuccessfulTasks() {
+    return getTasks(TaskState.SUCCEEDED);
+  }
+
+  /**
+   * Get list of tasks belonging to a specific state
+   *
+   * @param state
+   * @return List<TaskAttemptInfo>
+   */
+  public final List<TaskInfo> getTasks(final TaskState state) {
+    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
+                    (taskInfoMap.values()), new Predicate<TaskInfo>() {
+                  @Override public boolean apply(TaskInfo input) {
+                    return input.getStatus() != null && input.getStatus().equals(state.toString());
+                  }
+                }
+            )
+        )
+    );
+  }
+
+  /**
+   * Get source vertices for this vertex
+   *
+   * @return List<VertexInfo> list of incoming vertices to this vertex
+   */
+  public final List<VertexInfo> getInputVertices() {
+    List<VertexInfo> inputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : inEdgeList) {
+      inputVertices.add(edge.getSourceVertex());
+    }
+    return Collections.unmodifiableList(inputVertices);
+  }
+
+  /**
+   * Get destination vertices for this vertex
+   *
+   * @return List<VertexInfo> list of output vertices
+   */
+  public final List<VertexInfo> getOutputVertices() {
+    List<VertexInfo> outputVertices = Lists.newLinkedList();
+    for (EdgeInfo edge : outEdgeList) {
+      outputVertices.add(edge.getDestinationVertex());
+    }
+    return Collections.unmodifiableList(outputVertices);
+  }
+
+  public List<TaskAttemptInfo> getTaskAttempts() {
+    List<TaskAttemptInfo> taskAttemptInfos = Lists.newLinkedList();
+    for (TaskInfo taskInfo : getTasks()) {
+      taskAttemptInfos.addAll(taskInfo.getTaskAttempts());
+    }
+    Collections.sort(taskAttemptInfos, orderingOnAttemptStartTime());
+    return Collections.unmodifiableList(taskAttemptInfos);
+  }
+
+  public final TaskInfo getTask(String taskId) {
+    return taskInfoMap.get(taskId);
+  }
+
+  /**
+   * Get incoming edge information for a specific vertex
+   *
+   * @return List<EdgeInfo> list of input edges on this vertex
+   */
+  public final List<EdgeInfo> getInputEdges() {
+    return Collections.unmodifiableList(inEdgeList);
+  }
+
+  /**
+   * Get outgoing edge information for a specific vertex
+   *
+   * @return List<EdgeInfo> list of output edges on this vertex
+   */
+  public final List<EdgeInfo> getOutputEdges() {
+    return Collections.unmodifiableList(outEdgeList);
+  }
+
+  public final Multimap<Container, TaskAttemptInfo> getContainersMapping() {
+    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();
+    for (TaskAttemptInfo attemptInfo : getTaskAttempts()) {
+      containerMapping.put(attemptInfo.getContainer(), attemptInfo);
+    }
+    return Multimaps.unmodifiableMultimap(containerMapping);
+  }
+
+  /**
+   * Get first task to start
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getFirstTaskToStart() {
+    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    Collections.sort(taskInfoList, new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ?
+                0 : 1);
+      }
+    });
+    return taskInfoList.get(0);
+  }
+
+  /**
+   * Get last task to finish
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getLastTaskToFinish() {
+    List<TaskInfo> taskInfoList = Lists.newLinkedList(taskInfoMap.values());
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    Collections.sort(taskInfoList, new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getFinishTime() > o2.getFinishTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ?
+                0 : 1);
+      }
+    });
+    return taskInfoList.get(0);
+  }
+
+  /**
+   * Get average task duration
+   *
+   * @return long
+   */
+  public final float getAvgTaskDuration() {
+    float totalTaskDuration = 0;
+    List<TaskInfo> tasksList = getTasks();
+    if (tasksList.size() == 0) {
+      return 0;
+    }
+    for (TaskInfo taskInfo : tasksList) {
+      totalTaskDuration += taskInfo.getTimeTaken();
+    }
+    return ((totalTaskDuration * 1.0f) / tasksList.size());
+  }
+
+  /**
+   * Get min task duration in vertex
+   *
+   * @return long
+   */
+  public final long getMinTaskDuration() {
+    TaskInfo taskInfo = getMinTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  /**
+   * Get max task duration in vertex
+   *
+   * @return long
+   */
+  public final long getMaxTaskDuration() {
+    TaskInfo taskInfo = getMaxTaskDurationTask();
+    return (taskInfo != null) ? taskInfo.getTimeTaken() : 0;
+  }
+
+  private Ordering<TaskInfo> orderingOnTimeTaken() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
+            ((o1.getTimeTaken() == o2.getTimeTaken()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskInfo> orderingOnStartTime() {
+    return Ordering.from(new Comparator<TaskInfo>() {
+      @Override public int compare(TaskInfo o1, TaskInfo o2) {
+        return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1);
+      }
+    });
+  }
+
+  private Ordering<TaskAttemptInfo> orderingOnAttemptStartTime() {
+    return Ordering.from(new Comparator<TaskAttemptInfo>() {
+      @Override public int compare(TaskAttemptInfo o1, TaskAttemptInfo o2) {
+        return (o1.getStartTime() < o2.getStartTime()) ? -1 :
+            ((o1.getStartTime() == o2.getStartTime()) ? 0 : 1);
+      }
+    });
+  }
+
+  /**
+   * Get min task duration in vertex
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getMinTaskDurationTask() {
+    List<TaskInfo> taskInfoList = getTasks();
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+
+    return orderingOnTimeTaken().min(taskInfoList);
+  }
+
+  /**
+   * Get max task duration in vertex
+   *
+   * @return TaskInfo
+   */
+  public final TaskInfo getMaxTaskDurationTask() {
+    List<TaskInfo> taskInfoList = getTasks();
+    if (taskInfoList.size() == 0) {
+      return null;
+    }
+    return orderingOnTimeTaken().max(taskInfoList);
+  }
+
+  public final String getStatus() {
+    return status;
+  }
+
+  public final DagInfo getDagInfo() {
+    return dagInfo;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("[");
+    sb.append("vertexName=").append(getVertexName()).append(", ");
+    sb.append("events=").append(getEvents()).append(", ");
+    sb.append("initTime=").append(getInitTime()).append(", ");
+    sb.append("startTime=").append(getStartTime()).append(", ");
+    sb.append("endTime=").append(getFinishTime()).append(", ");
+    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
+    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
+    sb.append("numTasks=").append(getNumTasks()).append(", ");
+    sb.append("processorClassName=").append(getProcessorClassName()).append(", ");
+    sb.append("numCompletedTasks=").append(getCompletedTasksCount()).append(", ");
+    sb.append("numFailedTaskAttempts=").append(getNumFailedTaskAttemptsCount()).append(", ");
+    sb.append("numSucceededTasks=").append(getSucceededTasksCount()).append(", ");
+    sb.append("numFailedTasks=").append(getFailedTasks()).append(", ");
+    sb.append("numKilledTasks=").append(getKilledTasks()).append(", ");
+    sb.append("tasksCount=").append(taskInfoMap.size()).append(", ");
+    sb.append("status=").append(getStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+}
diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java
new file mode 100644 (file)
index 0000000..16ef4c9
--- /dev/null
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history.parser.utils;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.tez.history.parser.datamodel.Constants;
+import org.apache.tez.history.parser.datamodel.Event;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.common.counters.TezCounters;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.List;
+
+@InterfaceAudience.Private
+public class Utils {
+
+  private static final String LOG4J_CONFIGURATION = "log4j.configuration";
+
+  /**
+   * Parse tez counters from json
+   *
+   * @param jsonObject
+   * @return TezCounters
+   * @throws JSONException
+   */
+  public static TezCounters parseTezCountersFromJSON(JSONObject jsonObject)
+      throws JSONException {
+    TezCounters counters = new TezCounters();
+
+    if (jsonObject == null) {
+      return counters; //empty counters.
+    }
+
+    final JSONArray counterGroupNodes = jsonObject.optJSONArray(Constants.COUNTER_GROUPS);
+    if (counterGroupNodes != null) {
+      for (int i = 0; i < counterGroupNodes.length(); i++) {
+        JSONObject counterGroupNode = counterGroupNodes.optJSONObject(i);
+        final String groupName = counterGroupNode.optString(Constants.COUNTER_GROUP_NAME);
+        final String groupDisplayName = counterGroupNode.optString(
+            Constants.COUNTER_GROUP_DISPLAY_NAME);
+
+        CounterGroup group = counters.addGroup(groupName, groupDisplayName);
+
+        final JSONArray counterNodes = counterGroupNode.optJSONArray(Constants.COUNTERS);
+
+        //Parse counter nodes
+        for (int j = 0; j < counterNodes.length(); j++) {
+          JSONObject counterNode = counterNodes.optJSONObject(j);
+          final String counterName = counterNode.getString(Constants.COUNTER_NAME);
+          final String counterDisplayName =
+              counterNode.getString(Constants.COUNTER_DISPLAY_NAME);
+          final long counterValue = counterNode.getLong(Constants.COUNTER_VALUE);
+          TezCounter counter = group.findCounter(
+              counterName,
+              counterDisplayName);
+          counter.setValue(counterValue);
+        }
+      }
+    }
+    return counters;
+  }
+
+  /**
+   * Parse events from json
+   *
+   * @param eventNodes
+   * @param eventList
+   * @throws JSONException
+   */
+  public static void parseEvents(JSONArray eventNodes, List<Event> eventList) throws
+      JSONException {
+    if (eventNodes == null) {
+      return;
+    }
+    for (int i = 0; i < eventNodes.length(); i++) {
+      JSONObject eventNode = eventNodes.optJSONObject(i);
+      final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
+      final String eventType = eventNode.optString(Constants.EVENT_TYPE);
+      final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
+
+      Event event = new Event(eventInfo, eventType, time);
+
+      eventList.add(event);
+
+    }
+  }
+
+  public static void setupRootLogger() {
+    if (Strings.isEmpty(System.getProperty(LOG4J_CONFIGURATION))) {
+      //By default print to console with INFO level
+      Logger.getRootLogger().
+          addAppender(new ConsoleAppender(new PatternLayout(PatternLayout.TTCC_CONVERSION_PATTERN)));
+      Logger.getRootLogger().setLevel(Level.INFO);
+    }
+  }
+
+}
diff --git a/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java b/tez-plugins/tez-history-parser/src/test/java/org/apache/tez/history/TestATSFileParser.java
new file mode 100644 (file)
index 0000000..faff182
--- /dev/null
@@ -0,0 +1,545 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.history;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.counters.DAGCounter;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.StatusGetOpts;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
+import org.apache.tez.dag.api.oldrecords.TaskState;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.examples.WordCount;
+import org.apache.tez.history.parser.ATSFileParser;
+import org.apache.tez.history.parser.datamodel.DagInfo;
+import org.apache.tez.history.parser.datamodel.EdgeInfo;
+import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
+import org.apache.tez.history.parser.datamodel.TaskInfo;
+import org.apache.tez.history.parser.datamodel.VersionInfo;
+import org.apache.tez.history.parser.datamodel.VertexInfo;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.mapreduce.output.MROutput;
+import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
+import org.apache.tez.runtime.api.ProcessorContext;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
+import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
+import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
+import org.apache.tez.runtime.library.partitioner.HashPartitioner;
+import org.apache.tez.tests.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestATSFileParser {
+
+  private static MiniDFSCluster miniDFSCluster;
+  private static MiniTezClusterWithTimeline miniTezCluster;
+
+  //location within miniHDFS cluster's hdfs
+  private static Path inputLoc = new Path("/tmp/sample.txt");
+
+  private final static String INPUT = "Input";
+  private final static String OUTPUT = "Output";
+  private final static String TOKENIZER = "Tokenizer";
+  private final static String SUMMATION = "Summation";
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem fs;
+  private static String TEST_ROOT_DIR =
+      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tmpDir";
+  private static String TEZ_BASE_DIR =
+      "target" + Path.SEPARATOR + TestATSFileParser.class.getName() + "-tez";
+  private static String DOWNLOAD_DIR = TEST_ROOT_DIR + Path.SEPARATOR + "download";
+
+  private static String timelineAddress;
+  private static TezClient tezClient;
+
+  private static int dagNumber;
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH, false);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+    miniDFSCluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
+    fs = miniDFSCluster.getFileSystem();
+    conf.set("fs.defaultFS", fs.getUri().toString());
+
+    setupTezCluster();
+  }
+
+  @AfterClass
+  public static void shutdownCluster() {
+    try {
+      if (tezClient != null) {
+        try {
+          tezClient.stop();
+        } catch (TezException e) {
+          //ignore
+        } catch (IOException e) {
+          //ignore
+        }
+      }
+      if (miniDFSCluster != null) {
+        miniDFSCluster.shutdown();
+      }
+      if (miniTezCluster != null) {
+        miniTezCluster.stop();
+      }
+    } finally {
+      try {
+        FileUtils.deleteDirectory(new File(TEST_ROOT_DIR));
+        FileUtils.deleteDirectory(new File(TEZ_BASE_DIR));
+      } catch (IOException e) {
+        //safe to ignore
+      }
+    }
+  }
+
+  // @Before
+  public static void setupTezCluster() throws Exception {
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3 * 1000);
+    conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT, 2);
+
+    //Enable per edge counters
+    conf.setBoolean(TezConfiguration.TEZ_TASK_GENERATE_COUNTERS_PER_IO, true);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, ATSHistoryLoggingService
+        .class.getName());
+
+    miniTezCluster =
+        new MiniTezClusterWithTimeline(TEZ_BASE_DIR, 1, 1, 1, true);
+
+    miniTezCluster.init(conf);
+    miniTezCluster.start();
+
+    createSampleFile(inputLoc);
+
+    TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig());
+    tezConf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    tezConf.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "0.0.0.0:8188");
+    tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+    tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+        ATSHistoryLoggingService.class.getName());
+
+    tezClient = TezClient.create("WordCount", tezConf, true);
+    tezClient.start();
+    tezClient.waitTillReady();
+  }
+
+
+  /**
+   * Run a word count example in mini cluster and check if it is possible to download
+   * data from ATS and parse it.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob() throws Exception {
+    //Run basic word count example.
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount");
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo);
+
+    //Job specific
+    assertTrue(dagInfo.getNumVertices() == 2);
+    assertTrue(dagInfo.getName().equals("WordCount"));
+    assertTrue(dagInfo.getVertex(TOKENIZER).getProcessorClassName().equals(
+        WordCount.TokenProcessor.class.getName()));
+    assertTrue(dagInfo.getVertex(SUMMATION).getProcessorClassName()
+        .equals(WordCount.SumProcessor.class.getName()));
+    assertTrue(dagInfo.getEdges().size() == 1);
+    EdgeInfo edgeInfo = dagInfo.getEdges().iterator().next();
+    assertTrue(edgeInfo.getDataMovementType().
+        equals(EdgeProperty.DataMovementType.SCATTER_GATHER.toString()));
+    assertTrue(edgeInfo.getSourceVertex().getVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getDestinationVertex().getVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getInputVertexName().equals(TOKENIZER));
+    assertTrue(edgeInfo.getOutputVertexName().equals(SUMMATION));
+    assertTrue(edgeInfo.getEdgeSourceClass().equals(OrderedPartitionedKVOutput.class.getName()));
+    assertTrue(edgeInfo.getEdgeDestinationClass().equals(OrderedGroupedKVInput.class.getName()));
+    assertTrue(dagInfo.getVertices().size() == 2);
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      assertTrue(vertexInfo.getKilledTasksCount() == 0);
+      for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+        assertTrue(taskInfo.getNumberOfTaskAttempts() == 1);
+        assertTrue(taskInfo.getMaxTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getMinTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getAvgTaskAttemptDuration() >= 0);
+        assertTrue(taskInfo.getLastTaskAttemptToFinish() != null);
+        assertTrue(taskInfo.getContainersMapping().size() > 0);
+        assertTrue(taskInfo.getSuccessfulTaskAttempts().size() > 0);
+        assertTrue(taskInfo.getFailedTaskAttempts().size() == 0);
+        assertTrue(taskInfo.getKilledTaskAttempts().size() == 0);
+      }
+      assertTrue(vertexInfo.getLastTaskToFinish() != null);
+      if (vertexInfo.getVertexName().equals(TOKENIZER)) {
+        assertTrue(vertexInfo.getInputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputVertices().size() == 1);
+        assertTrue(vertexInfo.getInputVertices().size() == 0);
+      } else {
+        assertTrue(vertexInfo.getInputEdges().size() == 1);
+        assertTrue(vertexInfo.getOutputEdges().size() == 0);
+        assertTrue(vertexInfo.getOutputVertices().size() == 0);
+        assertTrue(vertexInfo.getInputVertices().size() == 1);
+      }
+    }
+  }
+
+  /**
+   * Run a word count example in mini cluster.
+   * Provide invalid URL for ATS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testParserWithSuccessfulJob_InvalidATS() throws Exception {
+    //Run basic word count example.
+    String dagId =  runWordCount(WordCount.TokenProcessor.class.getName(),
+        WordCount.SumProcessor.class.getName(), "WordCount-With-WrongATS-URL");
+
+    //Export the data from ATS
+    String atsAddress = "--atsAddress=http://atsHost:8188";
+    String[] args = { "--dagId=" + dagId,
+        "--downloadDir=" + DOWNLOAD_DIR,
+        atsAddress
+      };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == -1);
+  }
+
+  /**
+   * Run a failed job and parse the data from ATS
+   */
+  @Test
+  public void testParserWithFailedJob() throws Exception {
+    //Run a job which would fail
+    String dagId = runWordCount(WordCount.TokenProcessor.class.getName(), FailProcessor.class
+        .getName(), "WordCount-With-Exception");
+
+    //Export the data from ATS
+    String[] args = { "--dagId=" + dagId, "--downloadDir=" + DOWNLOAD_DIR };
+
+    int result = ATSImportTool.process(args);
+    assertTrue(result == 0);
+
+    //Parse ATS data
+    DagInfo dagInfo = getDagInfo(dagId);
+
+    //Verify DAGInfo. Verifies vertex, task, taskAttempts in recursive manner
+    verifyDagInfo(dagInfo);
+
+    //Dag specific
+    VertexInfo summationVertex = dagInfo.getVertex(SUMMATION);
+    assertTrue(summationVertex.getFailedTasks().size() == 1); //1 task, 4 attempts failed
+    assertTrue(summationVertex.getFailedTasks().get(0).getFailedTaskAttempts().size() == 4);
+    assertTrue(summationVertex.getStatus().equals(VertexState.FAILED.toString()));
+
+    assertTrue(dagInfo.getFailedVertices().size() == 1);
+    assertTrue(dagInfo.getFailedVertices().get(0).getVertexName().equals(SUMMATION));
+    assertTrue(dagInfo.getSuccessfullVertices().size() == 1);
+    assertTrue(dagInfo.getSuccessfullVertices().get(0).getVertexName().equals(TOKENIZER));
+
+    assertTrue(dagInfo.getStatus().equals(DAGState.FAILED.toString()));
+
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_FAILED_TASKS.toString()), null, 4);
+    verifyCounter(dagInfo.getCounter(DAGCounter.NUM_SUCCEEDED_TASKS.toString()), null, 1);
+    verifyCounter(dagInfo.getCounter(DAGCounter.TOTAL_LAUNCHED_TASKS.toString()), null, 5);
+
+    verifyCounter(dagInfo.getCounter(TaskCounter.INPUT_RECORDS_PROCESSED.toString()),
+        "TaskCounter_Tokenizer_INPUT_Input", 10);
+    verifyCounter(dagInfo.getCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 0);
+    verifyCounter(dagInfo.getCounter(TaskCounter.OUTPUT_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation",
+        20); //Every line has 2 words. 10 lines x 2 words = 20
+    verifyCounter(dagInfo.getCounter(TaskCounter.SPILLED_RECORDS.toString()),
+        "TaskCounter_Tokenizer_OUTPUT_Summation", 20); //Same as above
+
+    //TODO: Need to check for SUMMATION vertex counters. Since all attempts are failed, counters are not getting populated.
+    //TaskCounter.REDUCE_INPUT_RECORDS
+
+    //Verify if the processor exception is given in diagnostics
+    assertTrue(dagInfo.getDiagnostics().contains("Failing this processor for some reason"));
+
+  }
+
+  /**
+   * Create sample file for wordcount program
+   *
+   * @param inputLoc
+   * @throws IOException
+   */
+  private static void createSampleFile(Path inputLoc) throws IOException {
+    fs.deleteOnExit(inputLoc);
+    FSDataOutputStream out = fs.create(inputLoc);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out));
+    for (int i = 0; i < 10; i++) {
+      writer.write("Sample " + RandomStringUtils.randomAlphanumeric(5));
+      writer.newLine();
+    }
+    writer.close();
+  }
+
+  private DagInfo getDagInfo(String dagId) throws TezException {
+    //Parse downloaded contents
+    File downloadedFile = new File(DOWNLOAD_DIR
+        + Path.SEPARATOR + dagId
+        + Path.SEPARATOR + dagId + ".zip");
+    ATSFileParser parser = new ATSFileParser(downloadedFile);
+    DagInfo dagInfo = parser.getDAGData(dagId);
+    assertTrue(dagInfo.getDagId().equals(dagId));
+    return dagInfo;
+  }
+
+  private void verifyCounter(Map<String, TezCounter> counterMap,
+      String counterGroupName, long expectedVal) {
+    //Iterate through group-->tezCounter
+    for (Map.Entry<String, TezCounter> entry : counterMap.entrySet()) {
+      if (counterGroupName != null) {
+        if (entry.getKey().equals(counterGroupName)) {
+          assertTrue(entry.getValue().getValue() == expectedVal);
+        }
+      } else {
+        assertTrue(entry.getValue().getValue() == expectedVal);
+      }
+    }
+  }
+
+  private String runWordCount(String tokenizerProcessor, String summationProcessor,
+      String dagName)
+      throws Exception {
+    dagNumber++;
+
+    //HDFS path
+    Path outputLoc = new Path("/tmp/outPath_" + System.currentTimeMillis());
+
+    DataSourceDescriptor dataSource = MRInput.createConfigBuilder(conf,
+        TextInputFormat.class, inputLoc.toString()).build();
+
+    DataSinkDescriptor dataSink =
+        MROutput.createConfigBuilder(conf, TextOutputFormat.class, outputLoc.toString()).build();
+
+    Vertex tokenizerVertex = Vertex.create(TOKENIZER, ProcessorDescriptor.create(
+        tokenizerProcessor)).addDataSource(INPUT, dataSource);
+
+    OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName()).build();
+
+    Vertex summationVertex = Vertex.create(SUMMATION,
+        ProcessorDescriptor.create(summationProcessor), 1).addDataSink(OUTPUT, dataSink);
+
+    // Create DAG and add the vertices. Connect the producer and consumer vertices via the edge
+    DAG dag = DAG.create(dagName);
+    dag.addVertex(tokenizerVertex).addVertex(summationVertex).addEdge(
+        Edge.create(tokenizerVertex, summationVertex, edgeConf.createDefaultEdgeProperty()));
+
+    DAGClient client = tezClient.submitDAG(dag);
+    client.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+    TezDAGID tezDAGID = TezDAGID.getInstance(tezClient.getAppMasterApplicationId(), dagNumber);
+
+    return tezDAGID.toString();
+  }
+
+  /**
+   * Processor which would just throw exception.
+   */
+  public static class FailProcessor extends SimpleMRProcessor {
+    public FailProcessor(ProcessorContext context) {
+      super(context);
+    }
+
+    @Override
+    public void run() throws Exception {
+      throw new Exception("Failing this processor for some reason");
+    }
+  }
+
+  private void verifyDagInfo(DagInfo dagInfo) {
+    VersionInfo versionInfo = dagInfo.getVersionInfo();
+    assertTrue(versionInfo != null); //should be present post 0.5.4
+    assertTrue(versionInfo.getVersion() != null);
+    assertTrue(versionInfo.getRevision() != null);
+    assertTrue(versionInfo.getBuildTime() != null);
+
+    assertTrue(dagInfo.getAbsStartTime() > 0);
+    assertTrue(dagInfo.getFinishTime() > 0);
+    assertTrue(dagInfo.getStartTime() == 0);
+    assertTrue(dagInfo.getAbsStartTime() > 0);
+    if (dagInfo.getStatus().equalsIgnoreCase(DAGState.SUCCEEDED.toString())) {
+      assertTrue(dagInfo.getAbsFinishTime() >= dagInfo.getAbsStartTime());
+    }
+    assertTrue(dagInfo.getFinishTime() > dagInfo.getStartTime());
+
+    assertTrue(dagInfo.getAbsStartTime() > dagInfo.getAbsoluteSubmitTime());
+    assertTrue(dagInfo.getTimeTaken() > 0);
+
+    //Verify all vertices
+    for (VertexInfo vertexInfo : dagInfo.getVertices()) {
+      verifyVertex(vertexInfo, vertexInfo.getFailedTasksCount() > 0);
+    }
+
+    VertexInfo fastestVertex = dagInfo.getFastestVertex();
+    assertTrue(fastestVertex != null);
+
+    if (dagInfo.getStatus().equals(DAGState.SUCCEEDED)) {
+      assertTrue(dagInfo.getSlowestVertex() != null);
+    }
+  }
+
+  private void verifyVertex(VertexInfo vertexInfo, boolean hasFailedTasks) {
+    assertTrue(vertexInfo != null);
+    if (hasFailedTasks) {
+      assertTrue(vertexInfo.getFailedTasksCount() > 0);
+    }
+    assertTrue(vertexInfo.getStartTime() > 0);
+    assertTrue(vertexInfo.getAbsStartTime() > 0);
+    assertTrue(vertexInfo.getFinishTime() > 0);
+    assertTrue(vertexInfo.getStartTime() < vertexInfo.getFinishTime());
+    assertTrue(vertexInfo.getVertexName() != null);
+    if (!hasFailedTasks) {
+      assertTrue(vertexInfo.getAbsFinishTime() > 0);
+      assertTrue(vertexInfo.getFailedTasks().size() == 0);
+      assertTrue(vertexInfo.getSucceededTasksCount() == vertexInfo.getSuccessfulTasks().size());
+      assertTrue(vertexInfo.getFailedTasksCount() == 0);
+      assertTrue(vertexInfo.getAvgTaskDuration() > 0);
+      assertTrue(vertexInfo.getMaxTaskDuration() > 0);
+      assertTrue(vertexInfo.getMinTaskDuration() > 0);
+      assertTrue(vertexInfo.getTimeTaken() > 0);
+      assertTrue(vertexInfo.getStatus().equalsIgnoreCase(VertexState.SUCCEEDED.toString()));
+      assertTrue(vertexInfo.getCompletedTasksCount() > 0);
+      assertTrue(vertexInfo.getFirstTaskToStart() != null);
+      assertTrue(vertexInfo.getSucceededTasksCount() > 0);
+      assertTrue(vertexInfo.getTasks().size() > 0);
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getTasks()) {
+      if (taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString())) {
+        verifyTask(taskInfo, false);
+      }
+    }
+
+    for (TaskInfo taskInfo : vertexInfo.getFailedTasks()) {
+      verifyTask(taskInfo, true);
+    }
+
+    assertTrue(vertexInfo.getProcessorClassName() != null);
+    assertTrue(vertexInfo.getStatus() != null);
+    assertTrue(vertexInfo.getDagInfo() != null);
+    assertTrue(vertexInfo.getInitTime() > 0);
+    assertTrue(vertexInfo.getNumTasks() > 0);
+  }
+
+  private void verifyTask(TaskInfo taskInfo, boolean hasFailedAttempts) {
+    assertTrue(taskInfo != null);
+    assertTrue(taskInfo.getStatus() != null);
+    assertTrue(taskInfo.getStartTime() > 0);
+
+    //Not testing for killed attempts. So if there are no failures, it should succeed
+    if (!hasFailedAttempts) {
+      assertTrue(taskInfo.getStatus().equals(TaskState.SUCCEEDED.toString()));
+      assertTrue(taskInfo.getFinishTime() > 0 && taskInfo.getAbsFinishTime() > taskInfo
+          .getFinishTime());
+      assertTrue(
+          taskInfo.getStartTime() > 0 && taskInfo.getAbsStartTime() > taskInfo.getStartTime());
+      assertTrue(taskInfo.getSuccessfulAttemptId() != null);
+      assertTrue(taskInfo.getSuccessfulTaskAttempt() != null);
+    }
+    assertTrue(taskInfo.getTaskId() != null);
+
+    for (TaskAttemptInfo attemptInfo : taskInfo.getTaskAttempts()) {
+      verifyTaskAttemptInfo(attemptInfo);
+    }
+  }
+
+  private void verifyTaskAttemptInfo(TaskAttemptInfo attemptInfo) {
+    if (attemptInfo.getStatus() != null && attemptInfo.getStatus()
+        .equals(TaskAttemptState.SUCCEEDED)) {
+      assertTrue(attemptInfo.getStartTime() > 0);
+      assertTrue(attemptInfo.getFinishTime() > 0);
+      assertTrue(attemptInfo.getAbsStartTime() > 0);
+      assertTrue(attemptInfo.getAbsFinishTime() > 0);
+      assertTrue(attemptInfo.getAbsFinishTime() > attemptInfo.getAbsStartTime());
+      assertTrue(attemptInfo.getAbsFinishTime() > attemptInfo.getFinishTime());
+      assertTrue(attemptInfo.getAbsStartTime() > attemptInfo.getStartTime());
+      assertTrue(attemptInfo.getNodeId() != null);
+      assertTrue(attemptInfo.getTimeTaken() != -1);
+      assertTrue(attemptInfo.getEvents() != null);
+      assertTrue(attemptInfo.getTezCounters() != null);
+      assertTrue(attemptInfo.getContainer() != null);
+    }
+    assertTrue(attemptInfo.getTaskInfo() != null);
+  }
+}