TEZ-3911: Optional min/max/avg aggr. task counters reported to HistoryLoggingService...
authorVineet Garg <vgarg@apache.org>
Wed, 9 May 2018 19:02:15 +0000 (12:02 -0700)
committerGopal V <gopalv@apache.org>
Wed, 9 May 2018 19:02:15 +0000 (12:02 -0700)
Signed-off-by: Gopal V <gopalv@apache.org>
16 files changed:
tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounterGroup.java
tez-api/src/main/java/org/apache/tez/common/counters/AbstractCounters.java
tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java [new file with mode: 0644]
tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java [new file with mode: 0644]
tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java [new file with mode: 0644]
tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java [new file with mode: 0644]
tez-api/src/main/java/org/apache/tez/common/counters/CounterGroupBase.java
tez-api/src/main/java/org/apache/tez/common/counters/FileSystemCounterGroup.java
tez-api/src/main/java/org/apache/tez/common/counters/FrameworkCounterGroup.java
tez-api/src/main/java/org/apache/tez/common/counters/TezCounter.java
tez-api/src/main/java/org/apache/tez/common/counters/TezCounters.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java

index 6e07849..47d536f 100644 (file)
@@ -107,6 +107,10 @@ public class ATSConstants {
   public static final String COUNTER_NAME = "counterName";
   public static final String COUNTER_DISPLAY_NAME = "counterDisplayName";
   public static final String COUNTER_VALUE = "counterValue";
+  public static final String COUNTER_MIN_VALUE = "counterMinValue";
+  public static final String COUNTER_MAX_VALUE = "counterMaxValue";
+  public static final String COUNTER_INSTANCE_COUNT = "counterInstanceCount";
+
 
   /* Url related */
   public static final String RESOURCE_URI_BASE = "/ws/v1/timeline";
index a4b153f..1d1b56d 100644 (file)
@@ -194,10 +194,15 @@ public abstract class AbstractCounterGroup<T extends TezCounter>
 
   @Override
   public void incrAllCounters(CounterGroupBase<T> rightGroup) {
+    aggrAllCounters(rightGroup);
+  }
+
+  @Override
+  public void aggrAllCounters(CounterGroupBase<T> rightGroup) {
     try {
       for (TezCounter right : rightGroup) {
         TezCounter left = findCounter(right.getName(), right.getDisplayName());
-        left.increment(right.getValue());
+        left.aggregate(right);
       }
     } catch (LimitExceededException e) {
       counters.clear();
index 470cb78..5910164 100644 (file)
@@ -354,13 +354,22 @@ public abstract class AbstractCounters<C extends TezCounter,
    * @param other the other Counters instance
    */
   public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
+    aggrAllCounters(other);
+  }
+
+  /**
+   * Aggregates the counters of another Counters instance into this one,
+   * group by group.
+   * @param other the other Counters instance
+   */
+  public synchronized void aggrAllCounters(AbstractCounters<C, G> other) {
     for(G right : other) {
       String groupName = right.getName();
       G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
       if (left == null) {
         left = addGroup(groupName, right.getDisplayName());
       }
-      left.incrAllCounters(right);
+      left.aggrAllCounters(right);
     }
   }
 
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateFrameworkCounter.java
new file mode 100644 (file)
index 0000000..aa7d446
--- /dev/null
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import org.apache.tez.common.counters.FrameworkCounterGroup.FrameworkCounter;
+
+@SuppressWarnings("rawtypes")
+public class AggregateFrameworkCounter<T extends Enum<T>> extends FrameworkCounter implements AggregateTezCounter  {
+  
+  private long min = Long.MAX_VALUE;
+  private long max = Long.MIN_VALUE;
+  private long count = 0;
+
+  @SuppressWarnings("unchecked")
+  public AggregateFrameworkCounter(Enum<T> ref, String groupName) {
+    super(ref, groupName);
+  }
+
+  @Override
+  public void increment(long incr) {
+    throw new IllegalArgumentException("Cannot increment an aggregate counter directly");
+  }
+  
+  @Override
+  public void aggregate(TezCounter other) {
+    final long val = other.getValue();
+    final long othermax;
+    final long othermin;
+    final long othercount;
+    if (other instanceof AggregateTezCounter) {
+      othermax = ((AggregateTezCounter) other).getMax();
+      othermin = ((AggregateTezCounter) other).getMin();
+      othercount = ((AggregateTezCounter) other).getCount();
+    } else {
+      othermin = othermax = val;
+      othercount = 1;
+    }
+    this.count += othercount;
+    super.increment(val);
+    if (this.min == Long.MAX_VALUE) {
+      this.min = othermin;
+      this.max = othermax;
+      return;
+    }
+    this.min = Math.min(this.min, othermin);
+    this.max = Math.max(this.max, othermax);
+  }
+
+  @Override
+  public long getMin() {
+    return min;
+  }
+
+  @Override
+  public long getMax() {
+    return max;
+  }
+  
+  @SuppressWarnings("unchecked")
+  public FrameworkCounter<T> asFrameworkCounter() {
+    return ((FrameworkCounter<T>)this);
+  }
+
+  @Override
+  public long getCount() {
+    return count;
+  }
+
+}
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounter.java
new file mode 100644 (file)
index 0000000..bf711da
--- /dev/null
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+public interface AggregateTezCounter {
+
+  public abstract void aggregate(TezCounter other);
+
+  public abstract long getMin();
+
+  public abstract long getMax();
+  
+  public abstract long getCount();
+
+}
\ No newline at end of file
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounterDelegate.java
new file mode 100644 (file)
index 0000000..ae2ca7b
--- /dev/null
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class AggregateTezCounterDelegate<T extends TezCounter> extends AbstractCounter implements AggregateTezCounter {
+
+  private final T child;
+  private long min = Long.MAX_VALUE;
+  private long max = Long.MIN_VALUE;
+  private long count = 0;
+
+  public AggregateTezCounterDelegate(T child) {
+    this.child = child;
+  }
+  
+  @Override
+  public String getName() {
+    return child.getName(); // this is a pass-through
+  }
+
+  @Override
+  public String getDisplayName() {
+    return child.getDisplayName();
+  }
+
+  @Override
+  public long getValue() {
+    return child.getValue();
+  }
+
+  @Override
+  public void setValue(long value) {
+    this.child.setValue(value);
+  }
+
+  @Override
+  public void increment(long incr) {
+    throw new UnsupportedOperationException("Cannot increment an aggregate counter");
+  }
+  
+  /* (non-Javadoc)
+   * @see org.apache.tez.common.counters.AggregateTezCounter#aggregate(org.apache.tez.common.counters.TezCounter)
+   */
+  @Override
+  public void aggregate(TezCounter other) {
+    final long val = other.getValue();
+    final long othermax;
+    final long othermin;
+    final long othercount;
+    if (other instanceof AggregateTezCounter) {
+      othermax = ((AggregateTezCounter) other).getMax();
+      othermin = ((AggregateTezCounter) other).getMin();
+      othercount = ((AggregateTezCounter) other).getCount();
+    } else {
+      othermin = othermax = val;
+      othercount = 1;
+    }
+    this.count += othercount;
+    this.child.increment(val);
+    if (this.min == Long.MAX_VALUE) {
+      this.min = othermin;
+      this.max = othermax;
+      return;
+    }
+    this.min = Math.min(this.min, othermin);
+    this.max = Math.max(this.max, othermax);
+  }
+
+  @Override
+  public TezCounter getUnderlyingCounter() {
+    return this.child;
+  }
+
+  @Override
+  public void readFields(DataInput arg0) throws IOException {
+    throw new UnsupportedOperationException("Cannot deserialize an aggregate counter");
+  }
+
+  @Override
+  public void write(DataOutput arg0) throws IOException {
+    throw new UnsupportedOperationException("Cannot deserialize an aggregate counter");
+  }
+
+  @Override
+  public long getMin() {
+    return min;
+  }
+
+  @Override
+  public long getMax() {
+    return max;
+  }
+
+  @Override
+  public long getCount() {
+    return count;
+  }
+}
diff --git a/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java b/tez-api/src/main/java/org/apache/tez/common/counters/AggregateTezCounters.java
new file mode 100644 (file)
index 0000000..332c24a
--- /dev/null
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.counters;
+
+public class AggregateTezCounters extends TezCounters {
+  
+  private static final GroupFactory groupFactory = new GroupFactory();
+  
+  public AggregateTezCounters() {
+    super(groupFactory);
+  }
+  
+  // Mix framework group implementation into CounterGroup interface
+  private static class AggregateFrameworkGroupImpl<T extends Enum<T>>
+      extends FrameworkCounterGroup<T, TezCounter> implements CounterGroup {
+
+    AggregateFrameworkGroupImpl(Class<T> cls) {
+      super(cls);
+    }
+
+    @Override
+    protected FrameworkCounter<T> newCounter(T key) {
+      return (new AggregateFrameworkCounter<T>(key, getName()))
+          .asFrameworkCounter();
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  // Mix generic group implementation into CounterGroup interface
+  // and provide some mandatory group factory methods.
+  private static class AggregateGenericGroup extends AbstractCounterGroup<TezCounter>
+      implements CounterGroup {
+
+    AggregateGenericGroup(String name, String displayName, Limits limits) {
+      super(name, displayName, limits);
+    }
+
+    @Override
+    protected TezCounter newCounter(String name, String displayName, long value) {
+      return new AggregateTezCounterDelegate<GenericCounter>(new GenericCounter(name, displayName, value));
+    }
+
+    @Override
+    protected TezCounter newCounter() {
+      return new AggregateTezCounterDelegate<GenericCounter>(new GenericCounter());
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  // Mix file system group implementation into the CounterGroup interface
+  private static class AggregateFileSystemGroup extends FileSystemCounterGroup<TezCounter>
+      implements CounterGroup {
+
+    @Override
+    protected TezCounter newCounter(String scheme, FileSystemCounter key) {
+      return new AggregateTezCounterDelegate<FSCounter>(new FSCounter(scheme, key));
+    }
+
+    @Override
+    public CounterGroupBase<TezCounter> getUnderlyingGroup() {
+      return this;
+    }
+  }
+
+  /**
+   * Provide factory methods for counter group factory implementation.
+   * See also the group factory used by
+   *  {@link TezCounters}
+   */
+  private static class GroupFactory
+      extends CounterGroupFactory<TezCounter, CounterGroup> {
+
+    @Override
+    protected <T extends Enum<T>>
+    FrameworkGroupFactory<CounterGroup>
+        newFrameworkGroupFactory(final Class<T> cls) {
+      return new FrameworkGroupFactory<CounterGroup>() {
+        @Override public CounterGroup newGroup(String name) {
+          return new AggregateFrameworkGroupImpl<T>(cls); // impl in this package
+        }
+      };
+    }
+
+    @Override
+    protected CounterGroup newGenericGroup(String name, String displayName,
+                                           Limits limits) {
+      return new AggregateGenericGroup(name, displayName, limits);
+    }
+
+    @Override
+    protected CounterGroup newFileSystemGroup() {
+      return new AggregateFileSystemGroup();
+    }
+  }
+}
index be4bf77..216d2f4 100644 (file)
@@ -97,8 +97,17 @@ public interface CounterGroupBase<T extends TezCounter>
    * Increment all counters by a group of counters
    * @param rightGroup  the group to be added to this group
    */
+  @Deprecated
   void incrAllCounters(CounterGroupBase<T> rightGroup);
   
+  /**
+   * Aggregate all counters by a group of counters
+   * @param rightGroup  the group to be added to this group
+   */
+  public default void aggrAllCounters(CounterGroupBase<T> rightGroup) {
+    incrAllCounters(rightGroup);
+  }
+
   @Private
   /**
    * Exposes the underlying group type if a facade.
index 5024154..3ea4acd 100644 (file)
@@ -225,12 +225,17 @@ public abstract class FileSystemCounterGroup<C extends TezCounter>
   }
 
   @Override
-  public void incrAllCounters(CounterGroupBase<C> other) {
+  public void incrAllCounters(CounterGroupBase<C> rightGroup) {
+    aggrAllCounters(rightGroup);
+  }
+
+  @Override
+  public void aggrAllCounters(CounterGroupBase<C> other) {
     if (checkNotNull(other.getUnderlyingGroup(), "other group")
         instanceof FileSystemCounterGroup<?>) {
       for (TezCounter counter : other) {
         FSCounter c = (FSCounter) ((TezCounter)counter).getUnderlyingCounter();
-        findCounter(c.scheme, c.key) .increment(counter.getValue());
+        findCounter(c.scheme, c.key) .aggregate(counter);
       }
     }
   }
index 3a4aa97..bcb6454 100644 (file)
@@ -190,14 +190,20 @@ public abstract class FrameworkCounterGroup<T extends Enum<T>,
     return n;
   }
 
+  @Override
+  @SuppressWarnings("deprecation")
+  public void incrAllCounters(CounterGroupBase<C> rightGroup) {
+    aggrAllCounters(rightGroup);
+  }
+
   @SuppressWarnings("rawtypes")
   @Override
-  public void incrAllCounters(CounterGroupBase<C> other) {
+  public void aggrAllCounters(CounterGroupBase<C> other) {
     if (checkNotNull(other, "other counter group")
         instanceof FrameworkCounterGroup<?, ?>) {
       for (TezCounter counter : other) {
         findCounter(((FrameworkCounter) counter).key.name())
-            .increment(counter.getValue());
+            .aggregate(counter);
       }
     }
   }
index 2b40ed2..4cb1ae9 100644 (file)
@@ -73,6 +73,14 @@ public interface TezCounter extends Writable {
    * @param incr the value to increase this counter by
    */
   void increment(long incr);
+
+  /**
+   * Aggregate this counter with another counter
+   * @param other TezCounter to aggregate with; by default this is increment(other.getValue())
+   */
+  public default void aggregate(TezCounter other) {
+    increment(other.getValue());
+  };
  
   /**
    * Return the underlying object if this is a facade.
index ca03f41..a1205b9 100644 (file)
@@ -128,7 +128,17 @@ public class TezCounters extends AbstractCounters<TezCounter, CounterGroup> {
    * Default constructor
    */
   public TezCounters() {
-    super(groupFactory);
+    this(groupFactory);
+  }
+
+  /**
+   * Construct the Counters object using the supplied custom counter group factory
+   * @param <C> the type of counter
+   * @param <G> the type of counter group
+   */
+  public <C extends TezCounter, G extends CounterGroupBase<C>> TezCounters(
+      CounterGroupFactory<TezCounter, CounterGroup> customGroupFactory) {
+    super(customGroupFactory);
   }
 
   /**
index ecd8d17..bd5e0ff 100644 (file)
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.counters.AggregateTezCounters;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -700,7 +701,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       updateCpuCounters();
       TezCounters counters = new TezCounters();
       counters.incrAllCounters(dagCounters);
-      return incrTaskCounters(counters, vertices.values());
+      return aggrTaskCounters(counters, vertices.values());
 
     } finally {
       readLock.unlock();
@@ -732,7 +733,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       updateCpuCounters();
       TezCounters counters = new TezCounters();
       counters.incrAllCounters(dagCounters);
-      return incrTaskCounters(counters, vertices.values());
+      return aggrTaskCounters(counters, vertices.values());
 
     } finally {
       readLock.unlock();
@@ -748,10 +749,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
     return false;
   }
 
-  public static TezCounters incrTaskCounters(
+  public static TezCounters aggrTaskCounters(
       TezCounters counters, Collection<Vertex> vertices) {
     for (Vertex vertex : vertices) {
-      counters.incrAllCounters(vertex.getAllCounters());
+      counters.aggrAllCounters(vertex.getAllCounters());
     }
     return counters;
   }
@@ -1399,7 +1400,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       updateCpuCounters();
       TezCounters counters = null;
       try {
-        counters = getAllCounters();
+        counters = constructFinalFullcounters();
       } catch (LimitExceededException e) {
         addDiagnostic("Counters limit exceeded: " + e.getMessage());
         finalState = DAGState.FAILED;
@@ -1868,17 +1869,18 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         // Already constructed. Just return.
         return;
       }
-      this.constructFinalFullcounters();
+      this.fullCounters = this.constructFinalFullcounters();
     }
   }
 
   @Private
-  public void constructFinalFullcounters() {
-    this.fullCounters = new TezCounters();
-    this.fullCounters.incrAllCounters(dagCounters);
+  public TezCounters constructFinalFullcounters() {
+    final AggregateTezCounters aggregateTezCounters = new AggregateTezCounters();
+    aggregateTezCounters.aggrAllCounters(dagCounters);
     for (Vertex v : this.vertices.values()) {
-      this.fullCounters.incrAllCounters(v.getAllCounters());
+      aggregateTezCounters.aggrAllCounters(v.getAllCounters());
     }
+    return aggregateTezCounters;
   }
 
   /**
index f3fc269..0184657 100644 (file)
@@ -63,6 +63,7 @@ import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.ReflectionUtils;
 import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.common.counters.AggregateTezCounters;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.common.io.NonSyncByteArrayOutputStream;
@@ -1197,8 +1198,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
 
       TezCounters counters = new TezCounters();
-      counters.incrAllCounters(this.counters);
-      return incrTaskCounters(counters, tasks.values());
+      counters.aggrAllCounters(this.counters);
+      return aggrTaskCounters(counters, tasks.values());
 
     } finally {
       readLock.unlock();
@@ -1226,8 +1227,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       }
 
       TezCounters counters = new TezCounters();
-      counters.incrAllCounters(this.counters);
-      cachedCounters = incrTaskCounters(counters, tasks.values());
+      counters.aggrAllCounters(this.counters);
+      cachedCounters = aggrTaskCounters(counters, tasks.values());
       return cachedCounters;
     } finally {
       readLock.unlock();
@@ -1236,7 +1237,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
 
   @Override
   public void addCounters(final TezCounters tezCounters) {
-    counters.incrAllCounters(tezCounters);
+    counters.aggrAllCounters(tezCounters);
   }
 
   @Override
@@ -1335,10 +1336,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     return false;
   }
 
-  public static TezCounters incrTaskCounters(
+  public static TezCounters aggrTaskCounters(
       TezCounters counters, Collection<Task> tasks) {
     for (Task task : tasks) {
-      counters.incrAllCounters(task.getCounters());
+      counters.aggrAllCounters(task.getCounters());
     }
     return counters;
   }
@@ -2057,7 +2058,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         || !recoveryData.isVertexSucceeded()) {
       logJobHistoryVertexCompletedHelper(VertexState.SUCCEEDED, finishTime,
           logSuccessDiagnostics ? StringUtils.join(getDiagnostics(), LINE_SEPARATOR) : "",
-          getAllCounters());
+          constructFinalFullcounters());
     }
   }
 
@@ -2066,7 +2067,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         || !recoveryData.isVertexFinished()) {
       TezCounters counters = null;
       try {
-        counters = getAllCounters();
+        counters = constructFinalFullcounters();
       } catch (LimitExceededException e) {
         // Ignore as failed vertex
         addDiagnostic("Counters limit exceeded: " + e.getMessage());
@@ -3325,7 +3326,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
         // Already constructed. Just return.
         return;
       }
-      this.constructFinalFullcounters();
+      this.fullCounters = this.constructFinalFullcounters();
     }
   }
 
@@ -3334,16 +3335,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   @Private
-  public void constructFinalFullcounters() {
-    this.fullCounters = new TezCounters();
-    this.fullCounters.incrAllCounters(counters);
+  public TezCounters constructFinalFullcounters() {
+    AggregateTezCounters aggregateTezCounters = new AggregateTezCounters();
+    aggregateTezCounters.aggrAllCounters(counters);
     this.vertexStats = new VertexStats();
 
     for (Task t : this.tasks.values()) {
       vertexStats.updateStats(t.getReport());
       TezCounters counters = t.getCounters();
-      this.fullCounters.incrAllCounters(counters);
+      aggregateTezCounters.aggrAllCounters(counters);
     }
+    return aggregateTezCounters;
   }
 
   private static class RootInputInitFailedTransition implements
index dce9e52..b2622ad 100644 (file)
@@ -34,6 +34,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.tez.common.ATSConstants;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.VersionInfo;
+import org.apache.tez.common.counters.AggregateTezCounter;
+import org.apache.tez.common.counters.AggregateTezCounterDelegate;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -153,6 +155,15 @@ public class DAGUtils {
                     counter.getDisplayName());
             }
             counterMap.put(ATSConstants.COUNTER_VALUE, counter.getValue());
+            if (counter instanceof AggregateTezCounter) {
+              counterMap.put(ATSConstants.COUNTER_INSTANCE_COUNT,
+                  ((AggregateTezCounter)counter).getCount());
+              counterMap.put(ATSConstants.COUNTER_MAX_VALUE,
+                    ((AggregateTezCounter)counter).getMax());
+              counterMap.put(ATSConstants.COUNTER_MIN_VALUE,
+                  ((AggregateTezCounter)counter).getMin());
+
+            }
             counterList.add(counterMap);
           }
         }
index 6268912..859537b 100644 (file)
@@ -45,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.tez.common.counters.AggregateFrameworkCounter;
+import org.apache.tez.common.counters.AggregateTezCounterDelegate;
 import org.apache.tez.common.counters.CounterGroup;
 import org.apache.tez.common.counters.DAGCounter;
 import org.apache.tez.common.counters.TezCounters;
@@ -457,6 +459,88 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
   }
 
+  @Test
+  public void testCountersAggregation() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
+                                                null, false, false);
+    tezClient.start();
+
+    final String vAName = "A";
+    final String vBName = "B";
+    final String procCounterName = "Proc";
+    final String globalCounterName = "Global";
+    DAG dag = DAG.create("testCountersAggregation");
+    Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10);
+    Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 1);
+    dag.addVertex(vA)
+        .addVertex(vB)
+        .addEdge(
+            Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+                                                    DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+                                                    OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
+    TezCounters temp = new TezCounters();
+    temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutput out = new DataOutputStream(bos);
+    temp.write(out);
+    final byte[] payload = bos.toByteArray();
+
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    mockApp.countersDelegate = new CountersDelegate() {
+      int counterValue = 0;
+      @Override
+      public TezCounters getCounters(TaskSpec taskSpec) {
+        String vName = taskSpec.getVertexName();
+        TezCounters counters = new TezCounters();
+        final DataInputByteBuffer in  = new DataInputByteBuffer();
+        in.reset(ByteBuffer.wrap(payload));
+        try {
+          // this ensures that the serde code path is covered.
+          // the internal merges of counters covers the constructor code path.
+          counters.readFields(in);
+        } catch (IOException e) {
+          Assert.fail(e.getMessage());
+        }
+        counters.findCounter(vName, procCounterName).setValue(++counterValue);
+        for (OutputSpec output : taskSpec.getOutputs()) {
+          counters.findCounter(vName, output.getDestinationVertexName()).setValue(++counterValue);
+        }
+        for (InputSpec input : taskSpec.getInputs()) {
+          counters.findCounter(vName, input.getSourceVertexName()).setValue(++counterValue);
+        }
+        return counters;
+      }
+    };
+    mockApp.doSleep = false;
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockLauncher.startScheduling(true);
+    DAGStatus status = dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
+    TezCounters counters = dagImpl.getAllCounters();
+
+    // verify processor counters
+    VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName);
+    VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName);
+    TezCounters vACounters = vAImpl.getAllCounters();
+    TezCounters vBCounters = vBImpl.getAllCounters();
+
+    Assert.assertEquals(19, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMax());
+    Assert.assertEquals(1, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMin());
+    Assert.assertEquals(20, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMax());
+    Assert.assertEquals(2, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMin());
+
+    Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMin());
+    Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMax());
+    Assert.assertEquals(22, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMin());
+    Assert.assertEquals(22, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMax());
+
+    tezClient.stop();
+  }
 
   @Test (timeout = 10000)
   public void testBasicCounters() throws Exception {