TEZ-1696. Make Tez use the domain-based timeline ACLs. (hitesh)
authorHitesh Shah <hitesh@apache.org>
Fri, 12 Dec 2014 00:35:20 +0000 (16:35 -0800)
committerHitesh Shah <hitesh@apache.org>
Fri, 12 Dec 2014 00:35:20 +0000 (16:35 -0800)
30 files changed:
BUILDING.txt
CHANGES.txt
pom.xml
tez-api/src/main/java/org/apache/tez/client/TezClient.java
tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
tez-api/src/main/java/org/apache/tez/common/security/ACLConfigurationParser.java
tez-api/src/main/java/org/apache/tez/common/security/ACLManager.java
tez-api/src/main/java/org/apache/tez/common/security/ACLType.java
tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java [new file with mode: 0644]
tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
tez-dist/pom.xml
tez-plugins/pom.xml
tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history/pom.xml
tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/ATSHistoryLoggingService.java
tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryLoggingService.java
tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java [new file with mode: 0644]
tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java [new file with mode: 0644]
tez-tests/src/main/java/org/apache/tez/mapreduce/examples/TestOrderedWordCount.java
tez-tests/src/test/java/org/apache/tez/test/MiniTezCluster.java

index 0d7df9b..e27bedf 100644 (file)
@@ -65,20 +65,26 @@ Building against a specific version of hadoop:
 
 Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher.
 
-By default, it can be compiled against hadoop versions 2.4.0 and higher by just
+By default, it can be compiled against hadoop versions 2.6.0 and higher by just
 specifying the hadoop.version. For example, to build tez against hadoop 3.0.0-SNAPSHOT 
 
  $ mvn package -Dhadoop.version=3.0.0-SNAPSHOT
  
-However, to compile against hadoop versions lower than 2.4.0, the hadoop24 profile needs
-to be disabled
-
- $ mvn package  -Dhadoop.version=2.2.0 -P\!hadoop24
-
 To skip Tests and java docs
 
  $ mvn package -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests -Dmaven.javadoc.skip=true
 
+However, to build against hadoop versions lower than 2.6.0, you will need to do the
+following:
+
+For Hadoop version X where 2.4.0 <= X < 2.6.0
+
+ $ mvn package  -Dhadoop.version=${X} -Phadoop24 -P\!hadoop26
+
+For Hadoop version X where X < 2.4.0
+
+ $ mvn package  -Dhadoop.version=${X} -P\!hadoop24 -P\!hadoop26
+
 ----------------------------------------------------------------------------------
 Protocol Buffer compiler:
 
@@ -94,7 +100,6 @@ You can also specify the path to protoc while building using -Dprotoc.path
 
  $ mvn package -DskipTests -Dprotoc.path=/usr/local/bin/protoc
 
-
 ----------------------------------------------------------------------------------
 Building the docs:
 
index 4f19ba0..52507f8 100644 (file)
@@ -6,6 +6,7 @@ Release 0.6.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1696. Make Tez use the domain-based timeline ACLs.
   TEZ-1835. TestFaultTolerance#testRandomFailingTasks is timing out
   TEZ-1832. TestSecureShuffle fails with NoClassDefFoundError: org/bouncycastle/x509/X509V1CertificateGenerator
   TEZ-1672. Update jetty to use stable 7.x version - 7.6.16.v20140903.
diff --git a/pom.xml b/pom.xml
index 8d1e037..fa50b17 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
   <properties>
     <maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
     <clover.license>${user.home}/clover.license</clover.license>
-    <hadoop.version>2.4.0</hadoop.version>
+    <hadoop.version>2.6.0</hadoop.version>
     <jetty.version>7.6.16.v20140903</jetty.version>
     <distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
     <distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
       </dependency>
       <dependency>
         <groupId>org.apache.tez</groupId>
+        <artifactId>tez-tests</artifactId>
+        <version>${project.version}</version>
+        <type>test-jar</type>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.tez</groupId>
         <artifactId>tez-dag</artifactId>
         <version>${project.version}</version>
       </dependency>
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>com.sun.jersey</groupId>
+        <artifactId>jersey-json</artifactId>
+        <version>1.9</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-hdfs</artifactId>
         <type>test-jar</type>
index fc70b48..ab6e12d 100644 (file)
@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DAGSubmissionTimedOut;
@@ -48,6 +50,7 @@ import org.apache.tez.dag.api.SessionNotRunning;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolBlockingPB;
 import org.apache.tez.dag.api.client.rpc.DAGClientAMProtocolRPC.GetAMStatusRequestProto;
@@ -107,9 +110,14 @@ public class TezClient {
       new JobTokenSecretManager();
   private Map<String, LocalResource> additionalLocalResources = Maps.newHashMap();
   private TezApiVersionInfo apiVersionInfo;
+  private HistoryACLPolicyManager historyACLPolicyManager;
 
   private int preWarmDAGCounter = 0;
 
+  private static final String atsHistoryLoggingServiceClassName =
+      "org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService";
+  private static final String atsHistoryACLManagerClassName =
+      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
 
   private TezClient(String name, TezConfiguration tezConf) {
     this(name, tezConf, tezConf.getBoolean(
@@ -139,6 +147,7 @@ public class TezClient {
     tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, isSession);
     this.amConfig = new AMConfiguration(tezConf, localResources, credentials);
     this.apiVersionInfo = new TezApiVersionInfo();
+
     LOG.info("Tez Client Version: " + apiVersionInfo.toString());
   }
 
@@ -288,7 +297,27 @@ public class TezClient {
 
     frameworkClient = createFrameworkClient();
     frameworkClient.init(amConfig.getTezConfiguration(), amConfig.getYarnConfiguration());
-    frameworkClient.start();    
+    frameworkClient.start();
+
+    if (this.amConfig.getTezConfiguration().get(
+        TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS, "")
+        .equals(atsHistoryLoggingServiceClassName)) {
+      LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+      try {
+        historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+            atsHistoryACLManagerClassName);
+        historyACLPolicyManager.setConf(this.amConfig.getYarnConfiguration());
+      } catch (TezUncheckedException e) {
+        LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+            + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+        if (!amConfig.getTezConfiguration().getBoolean(
+            TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+            TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
+          throw e;
+        }
+        historyACLPolicyManager = null;
+      }
+    }
 
     if (isSession) {
       LOG.info("Session mode. Starting session.");
@@ -314,7 +343,8 @@ public class TezClient {
             TezClientUtils.createApplicationSubmissionContext(
                 sessionAppId,
                 null, clientName, amConfig,
-                tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo);
+                tezJarResources, sessionCredentials, usingTezArchiveDeploy, apiVersionInfo,
+                historyACLPolicyManager);
   
         // Set Tez Sessions to not retry on AM crashes if recovery is disabled
         if (!amConfig.getTezConfiguration().getBoolean(
@@ -373,10 +403,16 @@ public class TezClient {
             + lr.getType() + " is not supported, only " + LocalResourceType.FILE + " is supported");
       }
     }
-    
+
+    Map<String, String> aclConfigs = null;
+    if (historyACLPolicyManager != null) {
+      aclConfigs = historyACLPolicyManager.setupSessionDAGACLs(
+          amConfig.getTezConfiguration(), sessionAppId, dag.getName(), dag.getDagAccessControls());
+    }
+
     Map<String, LocalResource> tezJarResources = getTezJarResources(sessionCredentials);
     DAGPlan dagPlan = TezClientUtils.prepareAndCreateDAGPlan(dag, amConfig, tezJarResources,
-        usingTezArchiveDeploy, sessionCredentials);
+        usingTezArchiveDeploy, sessionCredentials, aclConfigs);
 
     SubmitDAGRequestProto.Builder requestBuilder = SubmitDAGRequestProto.newBuilder();
     requestBuilder.setDAGPlan(dagPlan).build();
@@ -690,7 +726,7 @@ public class TezClient {
       ApplicationSubmissionContext appContext = TezClientUtils
           .createApplicationSubmissionContext( 
               appId, dag, dag.getName(), amConfig, tezJarResources, credentials,
-              usingTezArchiveDeploy, apiVersionInfo);
+              usingTezArchiveDeploy, apiVersionInfo, historyACLPolicyManager);
       LOG.info("Submitting DAG to YARN"
           + ", applicationId=" + appId
           + ", dagName=" + dag.getName());
index de4bdd0..f5f8548 100644 (file)
@@ -78,6 +78,7 @@ import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezYARNUtils;
 import org.apache.tez.common.VersionInfo;
 import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.JobTokenSecretManager;
 import org.apache.tez.common.security.TokenCache;
@@ -398,6 +399,7 @@ public class TezClientUtils {
    * @param amConfig AM Configuration
    * @param tezJarResources Resources to be used by the AM
    * @param sessionCreds the credential object which will be populated with session specific
+   * @param historyACLPolicyManager
    * @return an ApplicationSubmissionContext to launch a Tez AM
    * @throws IOException
    * @throws YarnException
@@ -406,7 +408,8 @@ public class TezClientUtils {
       ApplicationId appId, DAG dag, String amName,
       AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
       Credentials sessionCreds, boolean tezLrsAsArchive,
-      TezApiVersionInfo apiVersionInfo) throws IOException, YarnException{
+      TezApiVersionInfo apiVersionInfo, HistoryACLPolicyManager historyACLPolicyManager)
+      throws IOException, YarnException {
 
     Preconditions.checkNotNull(sessionCreds);
     TezConfiguration conf = amConfig.getTezConfiguration();
@@ -512,8 +515,24 @@ public class TezClientUtils {
     }
     amLocalResources.putAll(tezJarResources);
 
+    // Setup Session ACLs and update conf as needed
+    Map<String, String> aclConfigs = null;
+    if (historyACLPolicyManager != null) {
+      if (dag == null) {
+        aclConfigs = historyACLPolicyManager.setupSessionACLs(amConfig.getTezConfiguration(),
+            appId);
+      } else {
+        // Non-session mode
+        // As only a single DAG is support, we should combine AM and DAG ACLs under the same
+        // acl management layer
+        aclConfigs = historyACLPolicyManager.setupNonSessionACLs(amConfig.getTezConfiguration(),
+            appId, dag.getDagAccessControls());
+      }
+    }
+
     // emit conf as PB file
-    ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration());
+    ConfigurationProto finalConfProto = createFinalConfProtoForApp(amConfig.getTezConfiguration(),
+        aclConfigs);
     
     FSDataOutputStream amConfPBOutBinaryStream = null;
     try {
@@ -635,9 +654,17 @@ public class TezClientUtils {
   static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
       Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
       Credentials credentials) throws IOException {
-    Credentials dagCredentials = setupDAGCredentials(dag, credentials, amConfig.getTezConfiguration());
+    return prepareAndCreateDAGPlan(dag, amConfig, tezJarResources, tezLrsAsArchive, credentials,
+        null);
+  }
+
+  static DAGPlan prepareAndCreateDAGPlan(DAG dag, AMConfiguration amConfig,
+      Map<String, LocalResource> tezJarResources, boolean tezLrsAsArchive,
+      Credentials credentials, Map<String, String> additionalDAGConfigs) throws IOException {
+    Credentials dagCredentials = setupDAGCredentials(dag, credentials,
+        amConfig.getTezConfiguration());
     return dag.createDag(amConfig.getTezConfiguration(), dagCredentials, tezJarResources,
-        amConfig.getBinaryConfLR(), tezLrsAsArchive);
+        amConfig.getBinaryConfLR(), tezLrsAsArchive, additionalDAGConfigs);
   }
   
   static void maybeAddDefaultLoggingJavaOpts(String logLevel, List<String> vargs) {
@@ -699,6 +726,11 @@ public class TezClientUtils {
   }
 
   static ConfigurationProto createFinalConfProtoForApp(Configuration amConf) {
+    return createFinalConfProtoForApp(amConf, null);
+  }
+
+  static ConfigurationProto createFinalConfProtoForApp(Configuration amConf,
+    Map<String, String> additionalConfigs) {
     assert amConf != null;
     ConfigurationProto.Builder builder = ConfigurationProto.newBuilder();
     for (Entry<String, String> entry : amConf) {
@@ -707,9 +739,18 @@ public class TezClientUtils {
       kvp.setValue(entry.getValue());
       builder.addConfKeyValues(kvp);
     }
+    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
+      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        builder.addConfKeyValues(kvp);
+      }
+    }
     return builder.build();
   }
 
+
   /**
    * Helper function to create a YARN LocalResource
    * @param fs FileSystem object
index ff35af8..e6c4101 100644 (file)
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -128,4 +129,24 @@ public class ACLConfigurationParser {
     return Collections.unmodifiableMap(allowedGroups);
   }
 
+  public void addAllowedUsers(Map<ACLType, Set<String>> additionalAllowedUsers) {
+    for (Entry<ACLType, Set<String>> entry : additionalAllowedUsers.entrySet()) {
+      if (allowedUsers.containsKey(entry.getKey())) {
+        allowedUsers.get(entry.getKey()).addAll(entry.getValue());
+      } else {
+        allowedUsers.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
+  public void addAllowedGroups(Map<ACLType, Set<String>> additionalAllowedGroups) {
+    for (Entry<ACLType, Set<String>> entry : additionalAllowedGroups.entrySet()) {
+      if (allowedGroups.containsKey(entry.getKey())) {
+        allowedGroups.get(entry.getKey()).addAll(entry.getValue());
+      } else {
+        allowedGroups.put(entry.getKey(), entry.getValue());
+      }
+    }
+  }
+
 }
index f83678b..c6a8f26 100644 (file)
@@ -44,7 +44,7 @@ import com.google.common.annotations.VisibleForTesting;
 public class ACLManager {
 
   private static final Log LOG = LogFactory.getLog(ACLManager.class);
-  static final String WILDCARD_ACL_VALUE = "*";
+  public static final String WILDCARD_ACL_VALUE = "*";
 
   private final String dagUser;
   private final String amUser;
@@ -218,7 +218,7 @@ public class ACLManager {
     return acls;
   }
 
-  static String toCommaSeparatedString(Collection<String> collection) {
+  public static String toCommaSeparatedString(Collection<String> collection) {
     StringBuilder sb = new StringBuilder();
     boolean first = true;
     for (String s : collection) {
index 854e928..fd00f22 100644 (file)
@@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
  * ACL Types
  */
 @Private
-enum ACLType {
+public enum ACLType {
   /** View permissions on the Application Master */
   AM_VIEW_ACL,
   /** Modify permissions on the Application Master */
diff --git a/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java b/tez-api/src/main/java/org/apache/tez/common/security/HistoryACLPolicyManager.java
new file mode 100644 (file)
index 0000000..a3b62ec
--- /dev/null
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.security;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * ACL Policy Manager
+ * An instance of this implements any ACL related activity when starting a session or
+ * submitting a DAG
+ */
+@Unstable
+@Private
+public interface HistoryACLPolicyManager extends Configurable {
+
+  /**
+   * Take any necessary steps for setting up Session ACLs
+   * @param conf Configuration
+   * @param applicationId Application ID for the session
+   * @throws Exception
+   */
+  public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
+      throws IOException;
+
+  /**
+   * Take any necessary steps for setting up ACLs for an AM which is running in non-session mode
+   * @param conf Configuration
+   * @param applicationId Application ID for the AM
+   * @param dagAccessControls ACLs defined for the DAG being submitted
+   * @throws Exception
+   */
+  public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId applicationId,
+      DAGAccessControls dagAccessControls) throws IOException;
+
+  /**
+   * Take any necessary steps for setting up ACLs for a DAG that is submitted to a Session
+   * @param conf Configuration
+   * @param applicationId Application ID for the AM
+   * @param dagAccessControls ACLs defined for the DAG being submitted
+   * @throws Exception
+   */
+  public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
+      String dagName, DAGAccessControls dagAccessControls) throws IOException;
+
+
+  public void updateTimelineEntityDomain(Object timelineEntity, String domainId);
+
+
+}
index 91b468d..b4fdbd2 100644 (file)
@@ -84,6 +84,7 @@ public class DAG {
   Credentials credentials = new Credentials();
   Set<VertexGroup> vertexGroups = Sets.newHashSet();
   Set<GroupInputEdge> groupInputEdges = Sets.newHashSet();
+
   private DAGAccessControls dagAccessControls;
   Map<String, LocalResource> commonTaskLocalFiles = Maps.newHashMap();
   String dagInfo;
@@ -198,6 +199,11 @@ public class DAG {
     return this;
   }
 
+  @Private
+  public synchronized DAGAccessControls getDagAccessControls() {
+    return dagAccessControls;
+  }
+
   /**
    * One of the methods that can be used to provide information about required
    * Credentials when running on a secure cluster. A combination of this and
@@ -606,10 +612,18 @@ public class DAG {
   }
 
   // create protobuf message describing DAG
+  public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
+                           Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
+                           boolean tezLrsAsArchive) {
+    return createDag(dagConf, extraCredentials, tezJarResources, binaryConfig, tezLrsAsArchive,
+        null);
+  }
+
+  // create protobuf message describing DAG
   @Private
   public DAGPlan createDag(Configuration dagConf, Credentials extraCredentials,
       Map<String, LocalResource> tezJarResources, LocalResource binaryConfig,
-      boolean tezLrsAsArchive) {
+      boolean tezLrsAsArchive, Map<String, String> additionalConfigs) {
     verify(true);
 
     DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
@@ -829,6 +843,14 @@ public class DAG {
         confProtoBuilder.addConfKeyValues(kvp);
       }
     }
+    if (additionalConfigs != null && !additionalConfigs.isEmpty()) {
+      for (Entry<String, String> entry : additionalConfigs.entrySet()) {
+        PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
+        kvp.setKey(entry.getKey());
+        kvp.setValue(entry.getValue());
+        confProtoBuilder.addConfKeyValues(kvp);
+      }
+    }
     dagBuilder.setDagKeyValues(confProtoBuilder); // This does not seem to be used anywhere
     // should this replace BINARY_PB_CONF???
 
index 06c7008..9f717a7 100644 (file)
@@ -791,6 +791,16 @@ public class TezConfiguration extends Configuration {
       + "yarn.ats.max.polling.time.per.event.millis";
   public static final int YARN_ATS_MAX_POLLING_TIME_PER_EVENT_DEFAULT = 10;
 
+
+  public static final String YARN_ATS_ACL_DOMAINS_AUTO_CREATE = TEZ_PREFIX
+      + "yarn.ats.acl.domains.auto-create";
+  public static final boolean YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT = true;
+
+  public static final String YARN_ATS_ACL_SESSION_DOMAIN_ID = TEZ_PREFIX
+      + "yarn.ats.acl.session.domain.id";
+  public static final String YARN_ATS_ACL_DAG_DOMAIN_ID = TEZ_PREFIX
+      + "yarn.ats.acl.dag.domain.id";
+
   /**
    * Boolean value. Enable recovery of DAGs. This allows a restarted app master to recover the 
    * incomplete DAGs from the previous instance of the app master.
@@ -900,6 +910,12 @@ public class TezConfiguration extends Configuration {
       + "disable.client-version-check";
   public static final boolean TEZ_AM_DISABLE_CLIENT_VERSION_CHECK_DEFAULT = false;
 
-
+  /**
+   * Boolean value.
+   * Allow disabling of Timeline Domains even if Timeline is being used.
+   */
+  public static final String TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS = TEZ_PREFIX
+      + "allow.disabled.timeline-domains";
+  public static final boolean TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT = false;
 
 }
index 789de24..35483a6 100644 (file)
@@ -1922,7 +1922,7 @@ public class DAGAppMaster extends AbstractService {
     // for an app later
     DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
         submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
-        newDAG.getUserName());
+        newDAG.getUserName(), newDAG.getConf());
     try {
       historyEventHandler.handleCriticalEvent(
           new DAGHistoryEvent(newDAG.getID(), submittedEvent));
index 7f0fab3..ab122c8 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -55,13 +56,15 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
   private String user;
   private Map<String, LocalResource> cumulativeAdditionalLocalResources;
 
+  private Configuration conf;
+
   public DAGSubmittedEvent() {
   }
 
   public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
       DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
       Map<String, LocalResource> cumulativeAdditionalLocalResources,
-      String user) {
+      String user, Configuration conf) {
     this.dagID = dagID;
     this.dagName = dagPlan.getName();
     this.submitTime = submitTime;
@@ -69,6 +72,7 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     this.applicationAttemptId = applicationAttemptId;
     this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
     this.user = user;
+    this.conf = conf;
   }
 
   @Override
@@ -183,4 +187,8 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
     return user;
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
 }
index cd770a3..3b6c3ef 100644 (file)
@@ -167,7 +167,7 @@ public class TestHistoryEventsProtoConversion {
         ApplicationId.newInstance(0, 1), 1), 1001l,
         DAGPlan.newBuilder().setName("foo").build(),
         ApplicationAttemptId.newInstance(
-            ApplicationId.newInstance(0, 1), 1), null, "");
+            ApplicationId.newInstance(0, 1), 1), null, "", null);
     DAGSubmittedEvent deserializedEvent = (DAGSubmittedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getApplicationAttemptId(),
index e0f8c21..db8add7 100644 (file)
@@ -116,7 +116,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user);
+              null, user, null);
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
index 85738b8..955e5dd 100644 (file)
@@ -54,7 +54,7 @@
     <profile>
       <id>hadoop24</id>
       <activation>
-         <activeByDefault>true</activeByDefault>
+        <activeByDefault>false</activeByDefault>
       </activation>
       <dependencies>
         <dependency>
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>hadoop26</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <build>
index b6ce081..1b0ebbe 100644 (file)
     <profile>
       <id>hadoop24</id>
       <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <modules>
+        <module>tez-yarn-timeline-history</module>
+      </modules>
+    </profile>
+    <profile>
+      <id>hadoop26</id>
+      <activation>
         <property>
           <name>!skipATS</name>
         </property>
       </activation>
       <modules>
-        <module>tez-yarn-timeline-history</module>
+        <module>tez-yarn-timeline-history-with-acls</module>
       </modules>
     </profile>
     <profile>
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml b/tez-plugins/tez-yarn-timeline-history-with-acls/pom.xml
new file mode 100644 (file)
index 0000000..f277d97
--- /dev/null
@@ -0,0 +1,139 @@
+<!--
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.tez</groupId>
+    <artifactId>tez-plugins</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>tez-yarn-timeline-history-with-acls</artifactId>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-dag</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
+
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/ats/acls/ATSHistoryACLPolicyManager.java
new file mode 100644 (file)
index 0000000..e11643f
--- /dev/null
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats.acls;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.tez.common.security.ACLConfigurationParser;
+import org.apache.tez.common.security.ACLManager;
+import org.apache.tez.common.security.ACLType;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class ATSHistoryACLPolicyManager implements HistoryACLPolicyManager {
+
+  private final static Log LOG = LogFactory.getLog(ATSHistoryACLPolicyManager.class);
+
+  TimelineClient timelineClient;
+  Configuration conf;
+  String user;
+  final static String DOMAIN_ID_PREFIX = "Tez_ATS_";
+
+  private void initializeTimelineClient() {
+    if (this.conf == null) {
+      throw new TezUncheckedException("ATSACLManager not configured");
+    }
+    if (timelineClient != null) {
+      this.timelineClient.stop();
+      this.timelineClient = null;
+    }
+    this.timelineClient = TimelineClient.createTimelineClient();
+    this.timelineClient.init(this.conf);
+    this.timelineClient.start();
+    try {
+      this.user = UserGroupInformation.getCurrentUser().getShortUserName();
+    } catch (IOException e) {
+      throw new TezUncheckedException("Unable to get Current User UGI", e);
+    }
+  }
+
+  private String getMergedViewACLs(ACLConfigurationParser parser,
+      DAGAccessControls dagAccessControls) {
+    Map<ACLType, Set<String>> allowedUsers = parser.getAllowedUsers();
+    Map<ACLType, Set<String>> allowedGroups = parser.getAllowedGroups();
+
+    Set<String> viewUsers = new HashSet<String>();
+    viewUsers.add(user);
+    if (allowedUsers.containsKey(ACLType.AM_VIEW_ACL)) {
+      viewUsers.addAll(allowedUsers.get(ACLType.AM_VIEW_ACL));
+    }
+    if (dagAccessControls != null && dagAccessControls.getUsersWithViewACLs() != null) {
+      viewUsers.addAll(dagAccessControls.getUsersWithViewACLs());
+    }
+
+    if (viewUsers.contains(ACLManager.WILDCARD_ACL_VALUE)) {
+      return ACLManager.WILDCARD_ACL_VALUE;
+    }
+
+    Set<String> viewGroups = new HashSet<String>();
+    if (allowedGroups.containsKey(ACLType.AM_VIEW_ACL)) {
+      viewGroups.addAll(allowedGroups.get(ACLType.AM_VIEW_ACL));
+    }
+    if (dagAccessControls != null && dagAccessControls.getGroupsWithViewACLs() != null) {
+      viewGroups.addAll(dagAccessControls.getGroupsWithViewACLs());
+    }
+
+    return ACLManager.toCommaSeparatedString(viewUsers) + " " +
+        ACLManager.toCommaSeparatedString(viewGroups);
+  }
+
+  private void createTimelineDomain(String domainId, Configuration tezConf,
+      DAGAccessControls dagAccessControls) throws IOException {
+    TimelineDomain timelineDomain = new TimelineDomain();
+    timelineDomain.setId(domainId);
+
+    ACLConfigurationParser parser = new ACLConfigurationParser(tezConf, false);
+    timelineDomain.setReaders(getMergedViewACLs(parser, dagAccessControls));
+    timelineDomain.setWriters(user);
+
+    try {
+      timelineClient.putDomain(timelineDomain);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+
+  private Map<String, String> createSessionDomain(Configuration tezConf,
+      ApplicationId applicationId, DAGAccessControls dagAccessControls)
+      throws IOException {
+    String domainId =
+        tezConf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+    if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
+        TezConfiguration.TEZ_AM_ACLS_ENABLED_DEFAULT)) {
+      if (domainId != null) {
+        throw new TezUncheckedException("ACLs disabled but DomainId is specified"
+            + ", aclsEnabled=true, domainId=" + domainId);
+      }
+      return null;
+    }
+
+    boolean autoCreateDomain = tezConf.getBoolean(TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE,
+        TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT);
+
+    if (domainId != null) {
+      // do nothing
+      LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
+      return null;
+    } else {
+      if (!autoCreateDomain) {
+        // Error - Cannot fallback to default as that leaves ACLs open
+        throw new TezUncheckedException("Timeline DomainId is not specified and auto-create"
+            + " Domains is disabled");
+      }
+      domainId = DOMAIN_ID_PREFIX + applicationId.toString();
+      createTimelineDomain(domainId, tezConf, dagAccessControls);
+      LOG.info("Created Timeline Domain for History ACLs, domainId=" + domainId);
+      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID, domainId);
+    }
+  }
+
+  private Map<String, String> createDAGDomain(Configuration tezConf,
+      ApplicationId applicationId, String dagName, DAGAccessControls dagAccessControls)
+      throws IOException {
+    if (dagAccessControls == null) {
+      // No DAG specific ACLs
+      return null;
+    }
+
+    String domainId =
+        tezConf.get(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+    if (!tezConf.getBoolean(TezConfiguration.TEZ_AM_ACLS_ENABLED,
+        TezConfiguration.TEZ_AM_ACLS_ENABLED_DEFAULT)) {
+      if (domainId != null) {
+        throw new TezUncheckedException("ACLs disabled but domainId for DAG is specified"
+            + ", aclsEnabled=true, domainId=" + domainId);
+      }
+      return null;
+    }
+
+    boolean autoCreateDomain = tezConf.getBoolean(TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE,
+        TezConfiguration.YARN_ATS_ACL_DOMAINS_AUTO_CREATE_DEFAULT);
+
+    if (domainId != null) {
+      // do nothing
+      LOG.info("Using specified domainId with Timeline, domainId=" + domainId);
+      return null;
+    } else {
+      if (!autoCreateDomain) {
+        // Error - Cannot fallback to default as that leaves ACLs open
+        throw new TezUncheckedException("Timeline DomainId is not specified and auto-create"
+            + " Domains is disabled");
+      }
+
+      domainId = DOMAIN_ID_PREFIX + applicationId.toString() + "_" + dagName;
+      createTimelineDomain(domainId, tezConf, dagAccessControls);
+      LOG.info("Created Timeline Domain for DAG-specific History ACLs, domainId=" + domainId);
+      return Collections.singletonMap(TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID, domainId);
+    }
+  }
+
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    initializeTimelineClient();
+  }
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public Map<String, String> setupSessionACLs(Configuration conf, ApplicationId applicationId)
+      throws IOException {
+    return createSessionDomain(conf, applicationId, null);
+  }
+
+  @Override
+  public Map<String, String> setupNonSessionACLs(Configuration conf, ApplicationId applicationId,
+      DAGAccessControls dagAccessControls) throws IOException {
+    return createSessionDomain(conf, applicationId, dagAccessControls);
+  }
+
+  @Override
+  public Map<String, String> setupSessionDAGACLs(Configuration conf, ApplicationId applicationId,
+      String dagName, DAGAccessControls dagAccessControls) throws IOException {
+    return createDAGDomain(conf, applicationId, dagName, dagAccessControls);
+  }
+
+  @Override
+  public void updateTimelineEntityDomain(Object timelineEntity, String domainId) {
+    if (!(timelineEntity instanceof TimelineEntity)) {
+      throw new UnsupportedOperationException("Invalid object provided of type"
+          + timelineEntity.getClass().getName());
+    }
+    TimelineEntity entity = (TimelineEntity) timelineEntity;
+    entity.setDomainId(domainId);
+  }
+
+}
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats b/tez-plugins/tez-yarn-timeline-history-with-acls/src/main/java/org/apache/tez/dag/history/logging/ats
new file mode 100644 (file)
index 0000000..9778443
--- /dev/null
@@ -0,0 +1 @@
+../../../../../../../../../../tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats
\ No newline at end of file
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/dag/history/ats/acls/TestATSHistoryWithACLs.java
new file mode 100644 (file)
index 0000000..11e726a
--- /dev/null
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.ats.acls;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.common.security.DAGAccessControls;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+public class TestATSHistoryWithACLs {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithACLs.class);
+
+  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  private static String timelineAddress;
+  private Random random = new Random();
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestATSHistoryWithACLs.class.getName() + "-tmpDir";
+
+  private static String user;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithACLs.class.getName(),
+            1, 1, 1, true);
+        Configuration conf = new Configuration();
+        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(conf);
+        mrrTezCluster.start();
+      } catch (Throwable e) {
+        LOG.info("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    user = UserGroupInformation.getCurrentUser().getShortUserName();
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    Thread.sleep(10000);
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  // To be replaced after Timeline has java APIs for domains
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    WebResource resource = client.resource(url);
+
+    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(200, response.getStatus());
+    assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+    K entity = response.getEntity(clazz);
+    assertNotNull(entity);
+    return entity;
+  }
+
+  private TimelineDomain getDomain(String domainId) {
+    assertNotNull(timelineAddress);
+    String url = "http://" + timelineAddress + "/ws/v1/timeline/domain/" + domainId;
+    LOG.info("Getting timeline domain: " + url);
+    TimelineDomain domain = getTimelineData(url, TimelineDomain.class);
+    assertNotNull(domain);
+    assertNotNull(domain.getOwner());
+    assertNotNull(domain.getReaders());
+    assertNotNull(domain.getWriters());
+    LOG.info("TimelineDomain for id " + domainId
+        + ", owner=" + domain.getOwner()
+        + ", readers=" + domain.getReaders()
+        + ", writers=" + domain.getWriters());
+    return domain;
+  }
+
+  private void verifyDomainACLs(TimelineDomain timelineDomain,
+    Collection<String> users, Collection<String> groups) {
+    String readers = timelineDomain.getReaders();
+    int pos = readers.indexOf(" ");
+    String readerUsers = readers.substring(0, pos);
+    String readerGroups = readers.substring(pos+1);
+
+    assertTrue(readerUsers.contains(user));
+    for (String s : users) {
+      assertTrue(readerUsers.contains(s));
+    }
+    for (String s : groups) {
+      assertTrue(readerGroups.contains(s));
+    }
+
+    if (!user.equals("nobody1") && !users.contains("nobody1")) {
+      assertFalse(readerUsers.contains("nobody1"));
+    }
+
+  }
+
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    String viewAcls = "nobody nobody_group";
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+    TimelineDomain timelineDomain = getDomain(
+        ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString());
+    verifyDomainACLs(timelineDomain,
+        Collections.singleton("nobody"), Collections.singleton("nobody_group"));
+
+    verifyEntityDomains(applicationId, true);
+  }
+
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    String viewAcls = "nobody nobody_group";
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+      DAGAccessControls accessControls = new DAGAccessControls();
+      accessControls.setUsersWithViewACLs(Collections.singleton("nobody2"));
+      accessControls.setGroupsWithViewACLs(Collections.singleton("nobody_group2"));
+      dag.setAccessControls(accessControls);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.set(TezConfiguration.TEZ_AM_VIEW_ACLS, viewAcls);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+    TimelineDomain timelineDomain = getDomain(
+        ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString());
+    verifyDomainACLs(timelineDomain,
+        Collections.singleton("nobody"), Collections.singleton("nobody_group"));
+
+    timelineDomain = getDomain(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX
+        + applicationId.toString() + "_TezSleepProcessor");
+    verifyDomainACLs(timelineDomain,
+        Sets.newHashSet("nobody", "nobody2"),
+        Sets.newHashSet("nobody_group", "nobody_group2"));
+
+    verifyEntityDomains(applicationId, false);
+  }
+
+  private void verifyEntityDomains(ApplicationId applicationId, boolean sameDomain) {
+    assertNotNull(timelineAddress);
+
+    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString();
+    LOG.info("Getting timeline entity for tez application: " + appUrl);
+    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
+
+    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
+    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + tezDAGID.toString();
+    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
+    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
+
+    // App and DAG entities should have different domains
+    assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString(),
+        appEntity.getDomainId());
+    if (!sameDomain) {
+      assertEquals(ATSHistoryACLPolicyManager.DOMAIN_ID_PREFIX + applicationId.toString()
+          + "_TezSleepProcessor", dagEntity.getDomainId());
+    } else {
+      assertEquals(appEntity.getDomainId(), dagEntity.getDomainId());
+    }
+  }
+
+
+}
diff --git a/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests b/tez-plugins/tez-yarn-timeline-history-with-acls/src/test/java/org/apache/tez/tests
new file mode 100644 (file)
index 0000000..526cdf5
--- /dev/null
@@ -0,0 +1 @@
+../../../../../../../tez-yarn-timeline-history/src/test/java/org/apache/tez/tests
\ No newline at end of file
index f53a5e8..9b77c29 100644 (file)
       <artifactId>tez-dag</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tez</groupId>
+      <artifactId>tez-tests</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
     </dependency>
       <artifactId>hadoop-yarn-client</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-tests</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-common</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>org.codehaus.jettison</groupId>
       <artifactId>jettison</artifactId>
     </dependency>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-json</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
     </plugins>
   </build>
 
+  <profiles>
+    <profile>
+      <id>hadoop26</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-yarn-timeline-history</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
 </project>
index ce09a3f..c68d395 100644 (file)
 
 package org.apache.tez.dag.history.logging.ats;
 
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,8 +34,11 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.tez.common.ReflectionUtils;
+import org.apache.tez.common.security.HistoryACLPolicyManager;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -59,12 +64,18 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
   TimelineClient timelineClient;
 
   private HashSet<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
+  private Map<TezDAGID, String> dagDomainIdMap = new HashMap<TezDAGID, String>();
   private long maxTimeToWaitOnShutdown;
   private boolean waitForeverOnShutdown = false;
 
   private int maxEventsPerBatch;
   private long maxPollingTimeMillis;
 
+  private String sessionDomainId;
+  private static final String atsHistoryACLManagerClassName =
+      "org.apache.tez.dag.history.ats.acls.ATSHistoryACLPolicyManager";
+  private HistoryACLPolicyManager historyACLPolicyManager;
+
   public ATSHistoryLoggingService() {
     super(ATSHistoryLoggingService.class.getName());
   }
@@ -86,6 +97,23 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     if (maxTimeToWaitOnShutdown < 0) {
       waitForeverOnShutdown = true;
     }
+    sessionDomainId = conf.get(TezConfiguration.YARN_ATS_ACL_SESSION_DOMAIN_ID);
+
+    LOG.info("Using " + atsHistoryACLManagerClassName + " to manage Timeline ACLs");
+    try {
+      historyACLPolicyManager = ReflectionUtils.createClazzInstance(
+          atsHistoryACLManagerClassName);
+      historyACLPolicyManager.setConf(conf);
+    } catch (TezUncheckedException e) {
+      LOG.warn("Could not instantiate object for " + atsHistoryACLManagerClassName
+          + ". ACLs cannot be enforced correctly for history data in Timeline", e);
+      if (!conf.getBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS,
+          TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS_DEFAULT)) {
+        throw e;
+      }
+      historyACLPolicyManager = null;
+    }
+
   }
 
   @Override
@@ -223,6 +251,13 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
         skippedDAGs.add(dagId);
         return false;
       }
+      if (historyACLPolicyManager != null) {
+        String dagDomainId = dagSubmittedEvent.getConf().get(
+            TezConfiguration.YARN_ATS_ACL_DAG_DOMAIN_ID);
+        if (dagDomainId != null) {
+          dagDomainIdMap.put(dagId, dagDomainId);
+        }
+      }
     }
     if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
       // Remove from set to keep size small
@@ -240,13 +275,25 @@ public class ATSHistoryLoggingService extends HistoryLoggingService {
     return true;
   }
 
-
-
   private void handleEvents(List<DAGHistoryEvent> events) {
     TimelineEntity[] entities = new TimelineEntity[events.size()];
     for (int i = 0; i < events.size(); ++i) {
-      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(
-          events.get(i).getHistoryEvent());
+      DAGHistoryEvent event = events.get(i);
+      String domainId = sessionDomainId;
+      TezDAGID dagId = event.getDagID();
+
+      if (historyACLPolicyManager != null && dagId != null) {
+        if (dagDomainIdMap.containsKey(dagId)) {
+          domainId = dagDomainIdMap.get(dagId);
+        }
+      }
+
+      entities[i] = HistoryEventTimelineConversion.convertToTimelineEntity(event.getHistoryEvent());
+      if (historyACLPolicyManager != null) {
+        if (domainId != null && !domainId.isEmpty()) {
+          historyACLPolicyManager.updateTimelineEntityDomain(entities[i], domainId);
+        }
+      }
     }
 
     if (LOG.isDebugEnabled()) {
index a9e00c1..18ec43e 100644 (file)
@@ -61,12 +61,14 @@ public class TestATSHistoryLoggingService {
     conf.setLong(TezConfiguration.YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS,
         1000l);
     conf.setInt(TezConfiguration.YARN_ATS_MAX_EVENTS_PER_BATCH, 2);
+    conf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
     atsInvokeCounter = 0;
     atsEntitiesCounter = 0;
     atsHistoryLoggingService.init(conf);
     atsHistoryLoggingService.timelineClient = mock(TimelineClient.class);
     atsHistoryLoggingService.start();
     when(appContext.getClock()).thenReturn(clock);
+    when(appContext.getCurrentDAGID()).thenReturn(null);
     when(atsHistoryLoggingService.timelineClient.putEntities(
         Matchers.<TimelineEntity[]>anyVararg())).thenAnswer(
         new Answer<Object>() {
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestATSHistoryWithMiniCluster.java
new file mode 100644 (file)
index 0000000..f3c18cf
--- /dev/null
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.logging.ats;
+
+import java.io.IOException;
+import java.util.Random;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.runtime.library.processor.SleepProcessor;
+import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
+import org.apache.tez.test.MiniTezClusterWithTimeline;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+
+public class TestATSHistoryWithMiniCluster {
+
+  private static final Log LOG = LogFactory.getLog(TestATSHistoryWithMiniCluster.class);
+
+  protected static MiniTezClusterWithTimeline mrrTezCluster = null;
+  protected static MiniDFSCluster dfsCluster = null;
+  private static String timelineAddress;
+  private Random random = new Random();
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+
+  private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+      + TestATSHistoryWithMiniCluster.class.getName() + "-tmpDir";
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null)
+          .build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    if (mrrTezCluster == null) {
+      try {
+        mrrTezCluster = new MiniTezClusterWithTimeline(TestATSHistoryWithMiniCluster.class.getName(),
+            1, 1, 1, true);
+        Configuration conf = new Configuration();
+        conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+        conf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
+        conf.setInt("yarn.nodemanager.delete.debug-delay-sec", 20000);
+        mrrTezCluster.init(conf);
+        mrrTezCluster.start();
+      } catch (Throwable e) {
+        LOG.info("Failed to start Mini Tez Cluster", e);
+      }
+    }
+    timelineAddress = mrrTezCluster.getConfig().get(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS);
+    if (timelineAddress != null) {
+      // Hack to handle bug in MiniYARNCluster handling of webapp address
+      timelineAddress = timelineAddress.replace("0.0.0.0", "localhost");
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
+    LOG.info("Shutdown invoked");
+    Thread.sleep(10000);
+    if (mrrTezCluster != null) {
+      mrrTezCluster.stop();
+      mrrTezCluster = null;
+    }
+    if (dfsCluster != null) {
+      dfsCluster.shutdown();
+      dfsCluster = null;
+    }
+  }
+
+  // To be replaced after Timeline has java APIs for domains
+  private <K> K getTimelineData(String url, Class<K> clazz) {
+    Client client = new Client();
+    WebResource resource = client.resource(url);
+
+    ClientResponse response = resource.accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
+
+    K entity = response.getEntity(clazz);
+    Assert.assertNotNull(entity);
+    return entity;
+  }
+
+  @Test (timeout=50000)
+  public void testSimpleAMACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+//    verifyEntityExistence(applicationId);
+  }
+
+  @Test (timeout=50000)
+  public void testDAGACls() throws Exception {
+    TezClient tezSession = null;
+    ApplicationId applicationId;
+    try {
+      SleepProcessorConfig spConf = new SleepProcessorConfig(1);
+
+      DAG dag = DAG.create("TezSleepProcessor");
+      Vertex vertex = Vertex.create("SleepVertex", ProcessorDescriptor.create(
+              SleepProcessor.class.getName()).setUserPayload(spConf.toUserPayload()), 1,
+          Resource.newInstance(256, 1));
+      dag.addVertex(vertex);
+
+      TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+      tezConf.setBoolean(TezConfiguration.TEZ_AM_ALLOW_DISABLED_TIMELINE_DOMAINS, true);
+      tezConf.set(TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+          ATSHistoryLoggingService.class.getName());
+      Path remoteStagingDir = remoteFs.makeQualified(new Path("/tmp", String.valueOf(random
+          .nextInt(100000))));
+      remoteFs.mkdirs(remoteStagingDir);
+      tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, remoteStagingDir.toString());
+
+      tezSession = TezClient.create("TezSleepProcessor", tezConf, true);
+      tezSession.start();
+
+      applicationId = tezSession.getAppMasterApplicationId();
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+//    verifyEntityExistence(applicationId);
+  }
+
+  private void verifyEntityExistence(ApplicationId applicationId) {
+    Assert.assertNotNull(timelineAddress);
+
+    String appUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_APPLICATION/"
+        + "tez_" + applicationId.toString()  + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez application: " + appUrl);
+    TimelineEntity appEntity = getTimelineData(appUrl, TimelineEntity.class);
+    Assert.assertNotNull(appEntity);
+
+    TezDAGID tezDAGID = TezDAGID.getInstance(applicationId, 1);
+    String dagUrl = "http://" + timelineAddress + "/ws/v1/timeline/TEZ_DAG_ID/"
+        + tezDAGID.toString() + "?fields=otherinfo";
+    LOG.info("Getting timeline entity for tez dag: " + dagUrl);
+    TimelineEntity dagEntity = getTimelineData(dagUrl, TimelineEntity.class);
+    Assert.assertNotNull(dagEntity);
+  }
+
+
+}
index 0f2942c..3ba2f86 100644 (file)
@@ -118,7 +118,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case DAG_SUBMITTED:
           event = new DAGSubmittedEvent(tezDAGID, random.nextInt(), dagPlan, applicationAttemptId,
-              null, user);
+              null, user, null);
           break;
         case DAG_INITIALIZED:
           event = new DAGInitializedEvent(tezDAGID, random.nextInt(), user, dagPlan.getName(), null);
@@ -264,7 +264,7 @@ public class TestHistoryEventTimelineConversion {
     long submitTime = random.nextLong();
 
     DAGSubmittedEvent event = new DAGSubmittedEvent(tezDAGID, submitTime, dagPlan,
-        applicationAttemptId, null, user);
+        applicationAttemptId, null, user, null);
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_DAG_ID.name(), timelineEntity.getEntityType());
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/tests/MiniTezClusterWithTimeline.java
new file mode 100644 (file)
index 0000000..cdb990a
--- /dev/null
@@ -0,0 +1,253 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.app.DAGAppMaster;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+
+/**
+ * Configures and starts the Tez-specific components in the YARN cluster.
+ *
+ * When using this mini cluster, the user is expected to
+ */
+public class MiniTezClusterWithTimeline extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder.getJar(DAGAppMaster.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTezClusterWithTimeline.class);
+
+  private static final String YARN_CLUSTER_CONFIG = "yarn-site.xml";
+
+  private Path confFilePath;
+
+  public MiniTezClusterWithTimeline(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 4, 4);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+      int numLocalDirs, int numLogDirs)  {
+    super(testName, noOfNMs, numLocalDirs, numLogDirs);
+  }
+
+  public MiniTezClusterWithTimeline(String testName, int noOfNMs,
+                        int numLocalDirs, int numLogDirs, boolean enableAHS)  {
+    super(testName, 1, noOfNMs, numLocalDirs, numLogDirs, enableAHS);
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_TEZ_FRAMEWORK_NAME);
+    // Use libs from cluster since no build is available
+    conf.setBoolean(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, true);
+    // blacklisting disabled to prevent scheduling issues
+    conf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir" + Path.SEPARATOR).getAbsolutePath());
+    }
+    
+    if (conf.get(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC) == null) {
+      // nothing defined. set quick delete value
+      conf.setLong(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0l);
+    }
+    
+    File appJarLocalFile = new File(MiniTezClusterWithTimeline.APPJAR);
+
+    if (!appJarLocalFile.exists()) {
+      String message = "TezAppJar " + MiniTezClusterWithTimeline.APPJAR
+          + " not found. Exiting.";
+      LOG.info(message);
+      throw new TezUncheckedException(message);
+    } else {
+      LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath());
+    }
+    
+    FileSystem fs = FileSystem.get(conf);
+    Path testRootDir = fs.makeQualified(new Path("target", getName() + "-tmpDir"));
+    Path appRemoteJar = new Path(testRootDir, "TezAppJar.jar");
+    // Copy AppJar and make it public.
+    Path appMasterJar = new Path(MiniTezClusterWithTimeline.APPJAR);
+    fs.copyFromLocalFile(appMasterJar, appRemoteJar);
+    fs.setPermission(appRemoteJar, new FsPermission("777"));
+
+    conf.set(TezConfiguration.TEZ_LIB_URIS, appRemoteJar.toUri().toString());
+    LOG.info("Set TEZ-LIB-URI to: " + conf.get(TezConfiguration.TEZ_LIB_URIS));
+
+    // VMEM monitoring disabled, PMEM monitoring enabled.
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, false);
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      /*
+       * Re-configure the staging path on Windows if the file system is localFs.
+       * We need to use a absolute path that contains the drive letter. The unit
+       * test could run on a different drive than the AM. We can run into the
+       * issue that job files are localized to the drive where the test runs on,
+       * while the AM starts on a different drive and fails to find the job
+       * metafiles. Using absolute path can avoid this ambiguity.
+       */
+      if (Path.WINDOWS) {
+        if (LocalFileSystem.class.isInstance(stagingPath.getFileSystem(conf))) {
+          conf.set(MRJobConfig.MR_AM_STAGING_DIR,
+              new File(conf.get(MRJobConfig.MR_AM_STAGING_DIR))
+                  .getAbsolutePath());
+        }
+      }
+      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      fc.mkdir(stagingPath, null, true);
+
+      //mkdir done directory as well
+      String doneDir =
+          JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new TezUncheckedException("Could not create staging directory. ", e);
+    }
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    //configure the shuffle service in NM
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES,
+        new String[] { ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID });
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID), ShuffleHandler.class,
+        Service.class);
+
+    // Non-standard shuffle port
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // TestMRJobs is for testing non-uberized operation only; see TestUberAM
+    // for corresponding uberized tests.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    LOG.info("Starting MiniTezClusterWithTimeline");
+    super.serviceStart();
+    File workDir = super.getTestWorkDir();
+    Configuration conf = super.getConfig();
+
+    confFilePath = new Path(workDir.getAbsolutePath(), YARN_CLUSTER_CONFIG);
+    File confFile = new File(confFilePath.toString());
+    try {
+      confFile.createNewFile();
+      conf.writeXml(new FileOutputStream(confFile));
+      confFile.deleteOnExit();
+    } catch (IOException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    confFilePath = new Path(confFile.getAbsolutePath());
+    conf.setStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        workDir.getAbsolutePath(), System.getProperty("java.class.path"));
+    LOG.info("Setting yarn-site.xml via YARN-APP-CP at: "
+        + conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH));
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    waitForAppsToFinish();
+    super.serviceStop();
+  }
+
+  private void waitForAppsToFinish() {
+    YarnClient yarnClient = YarnClient.createYarnClient(); 
+    yarnClient.init(getConfig());
+    yarnClient.start();
+    try {
+      while(true) {
+        List<ApplicationReport> appReports = yarnClient.getApplications();
+        Collection<ApplicationReport> unCompletedApps = Collections2.filter(appReports, new Predicate<ApplicationReport>(){
+          @Override
+          public boolean apply(ApplicationReport appReport) {
+            return EnumSet.of(YarnApplicationState.NEW, YarnApplicationState.NEW_SAVING,
+            YarnApplicationState.SUBMITTED, YarnApplicationState.ACCEPTED, YarnApplicationState.RUNNING)
+            .contains(appReport.getYarnApplicationState());
+          }
+        });
+        if (unCompletedApps.size()==0){
+          break;
+        }
+        LOG.info("wait for applications to finish in MiniTezClusterWithTimeline");
+        Thread.sleep(1000);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      yarnClient.stop();
+    }
+  }
+  
+  public Path getConfigFilePath() {
+    return confFilePath;
+  }
+
+}
index ec6b429..3efc5d8 100644 (file)
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.security.DAGAccessControls;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSourceDescriptor;
 import org.apache.tez.dag.api.Edge;
@@ -95,6 +96,10 @@ public class TestOrderedWordCount extends Configured implements Tool {
 
   private static Log LOG = LogFactory.getLog(TestOrderedWordCount.class);
 
+  private static final String DAG_VIEW_ACLS = "tez.testorderedwordcount.view-acls";
+  private static final String DAG_MODIFY_ACLS = "tez.testorderedwordcount.modify-acls";
+
+
   public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{
 
@@ -291,9 +296,32 @@ public class TestOrderedWordCount extends Configured implements Tool {
         Edge.create(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
             edgeConf2.createDefaultEdgeProperty()));
 
+    updateDAGACls(conf, dag, dagIndex);
+
     return dag;
   }
 
+  private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
+    LOG.info("Checking DAG specific ACLS");
+    DAGAccessControls accessControls = null;
+    String suffix = "." + dagIndex;
+    if (conf.get(DAG_VIEW_ACLS + suffix) != null
+        || conf.get(DAG_MODIFY_ACLS + suffix) != null) {
+      accessControls = new DAGAccessControls(
+          conf.get(DAG_VIEW_ACLS + suffix), conf.get(DAG_MODIFY_ACLS + suffix));
+
+    } else if (conf.get(DAG_VIEW_ACLS) != null
+      || conf.get(DAG_MODIFY_ACLS) != null) {
+      accessControls = new DAGAccessControls(
+          conf.get(DAG_VIEW_ACLS), conf.get(DAG_MODIFY_ACLS));
+    }
+    if (accessControls != null) {
+      LOG.info("Setting DAG specific ACLS");
+      dag.setAccessControls(accessControls);
+    }
+  }
+
+
   private static void printUsage() {
     String options = " [-generateSplitsInClient true/<false>]";
     System.err.println("Usage: testorderedwordcount <in> <out>" + options);
index ff6c5cd..390ff32 100644 (file)
@@ -106,6 +106,8 @@ public class MiniTezCluster extends MiniYARNCluster {
           + " not found. Exiting.";
       LOG.info(message);
       throw new TezUncheckedException(message);
+    } else {
+      LOG.info("Using Tez AppJar: " + appJarLocalFile.getAbsolutePath());
     }
     
     FileSystem fs = FileSystem.get(conf);
@@ -182,6 +184,7 @@ public class MiniTezCluster extends MiniYARNCluster {
 
   @Override
   public void serviceStart() throws Exception {
+    LOG.info("Starting MiniTezCluster");
     super.serviceStart();
     File workDir = super.getTestWorkDir();
     Configuration conf = super.getConfig();