TEZ-3902. Upgrade to netty-3.10.5.Final.jar (Jason Lowe via jeagles)
authorJonathan Eagles <jeagles@yahoo-inc.com>
Tue, 22 May 2018 22:03:55 +0000 (17:03 -0500)
committerJonathan Eagles <jeagles@yahoo-inc.com>
Tue, 22 May 2018 22:03:55 +0000 (17:03 -0500)
pom.xml
tez-dist/src/main/assembly/tez-dist-minimal.xml
tez-ext-service-tests/pom.xml
tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java

diff --git a/pom.xml b/pom.xml
index 1c8caeb..917edb8 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -39,7 +39,7 @@
     <clover.license>${user.home}/clover.license</clover.license>
     <hadoop.version>2.7.2</hadoop.version>
     <jetty.version>9.3.22.v20171030</jetty.version>
-    <netty.version>3.6.2.Final</netty.version>
+    <netty.version>3.10.5.Final</netty.version>
     <pig.version>0.13.0</pig.version>
     <javac.version>1.8</javac.version>
     <slf4j.version>1.7.10</slf4j.version>
       <dependency>
         <groupId>com.ning</groupId>
              <artifactId>async-http-client</artifactId>
-             <version>1.8.16</version>
+             <version>1.9.40</version>
       </dependency>
       <dependency>
         <groupId>org.slf4j</groupId>
index 80633ff..fbd1782 100644 (file)
@@ -24,6 +24,7 @@
       <useAllReactorProjects>true</useAllReactorProjects>
       <excludes>
         <exclude>org.apache.tez:tez-aux-services</exclude>
+        <exclude>org.apache.tez:tez-ext-service-tests</exclude>
       </excludes>
       <binaries>
         <outputDirectory>/</outputDirectory>
index d6d8573..e123a7a 100644 (file)
@@ -29,7 +29,6 @@
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
-      <version>3.6.2.Final</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
index ebaf9fe..47ac900 100644 (file)
@@ -367,9 +367,9 @@ public class ShuffleHandler {
       }
       // Check whether the shuffle version is compatible
       if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+          request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
           || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+              request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
       }
       final Map<String,List<String>> q =
@@ -551,12 +551,12 @@ public class ShuffleHandler {
         boolean keepAliveParam, long contentLength) {
       if (!connectionKeepAliveEnabled && !keepAliveParam) {
         LOG.info("Setting connection close header...");
-        response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+        response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
       } else {
-        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
           String.valueOf(contentLength));
-        response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+        response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout="
             + connectionKeepAliveTimeOut);
         LOG.info("Content Length in shuffle : " + contentLength);
       }
@@ -584,7 +584,7 @@ public class ShuffleHandler {
       String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
       // hash from the fetcher
       String urlHashStr =
-        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+        request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
       if (urlHashStr == null) {
         LOG.info("Missing header hash for " + appid);
         throw new IOException("fetcher cannot be authenticated");
@@ -600,11 +600,11 @@ public class ShuffleHandler {
       String reply =
         SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), 
             tokenSecret);
-      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
       // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       if (LOG.isDebugEnabled()) {
         int len = reply.length();
@@ -654,11 +654,11 @@ public class ShuffleHandler {
     protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       response.setContent(
         ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
index d48cc01..e22928e 100644 (file)
@@ -992,9 +992,9 @@ public class ShuffleHandler extends AuxiliaryService {
       }
       // Check whether the shuffle version is compatible
       if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
-          request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+          request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
           || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
-              request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+              request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
         sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
       }
       final Map<String,List<String>> q =
@@ -1280,9 +1280,9 @@ public class ShuffleHandler extends AuxiliaryService {
 
     protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
       if (connectionKeepAliveEnabled || keepAliveParam) {
-        response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
-        response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-        response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
+        response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
+        response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+        response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
         if (LOG.isDebugEnabled()) {
           LOG.debug("Content Length in shuffle : " + contentLength);
         }
@@ -1290,7 +1290,7 @@ public class ShuffleHandler extends AuxiliaryService {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Setting connection close header...");
         }
-        response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+        response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
       }
     }
 
@@ -1316,7 +1316,7 @@ public class ShuffleHandler extends AuxiliaryService {
       String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
       // hash from the fetcher
       String urlHashStr =
-        request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+        request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
       if (urlHashStr == null) {
         LOG.info("Missing header hash for " + appid);
         throw new IOException("fetcher cannot be authenticated");
@@ -1332,11 +1332,11 @@ public class ShuffleHandler extends AuxiliaryService {
       String reply =
         SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
             tokenSecret);
-      response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+      response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
       // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       if (LOG.isDebugEnabled()) {
         int len = reply.length();
@@ -1420,11 +1420,11 @@ public class ShuffleHandler extends AuxiliaryService {
     protected void sendError(ChannelHandlerContext ctx, String message,
         HttpResponseStatus status) {
       HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-      response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+      response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
       // Put shuffle version into http header
-      response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
           ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
           ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
       response.setContent(
         ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
index b9fd0d2..11c92fb 100644 (file)
@@ -82,6 +82,7 @@ import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.socket.SocketChannel;
 import org.jboss.netty.channel.MessageEvent;
 import org.jboss.netty.channel.AbstractChannel;
+import org.jboss.netty.handler.codec.http.DefaultHttpHeaders;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpHeaders;
 import org.jboss.netty.handler.codec.http.HttpRequest;
@@ -1236,6 +1237,7 @@ public class TestShuffleHandler {
   public HttpRequest createMockHttpRequest() {
     HttpRequest mockHttpRequest = mock(HttpRequest.class);
     Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
+    Mockito.doReturn(new DefaultHttpHeaders()).when(mockHttpRequest).headers();
     Mockito.doAnswer(new Answer() {
       @Override
       public Object answer(InvocationOnMock invocation) throws Throwable {
index 735bb46..9243e97 100644 (file)
@@ -31,7 +31,6 @@ import org.apache.tez.http.BaseHttpConnection;
 import org.apache.tez.http.HttpConnectionParams;
 import org.apache.tez.http.SSLFactory;
 import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
 import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
 import org.apache.tez.util.StopWatch;
@@ -92,15 +91,14 @@ public class AsyncHttpConnection extends BaseHttpConnection {
            * setMaxConnections & addRequestFilter.
            */
           builder
-              .setAllowPoolingConnection(httpConnParams.isKeepAlive())
-              .setAllowSslConnectionPool(httpConnParams.isKeepAlive())
-              .setCompressionEnabled(false)
+              .setAllowPoolingConnections(httpConnParams.isKeepAlive())
+              .setAllowPoolingSslConnections(httpConnParams.isKeepAlive())
+              .setCompressionEnforced(false)
               //.setExecutorService(applicationThreadPool)
               //.addRequestFilter(new ThrottleRequestFilter())
-              .setMaximumConnectionsPerHost(1)
-              .setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout())
-              .setRequestTimeoutInMs(httpConnParams.getReadTimeout())
-              .setUseRawUrl(true)
+              .setMaxConnectionsPerHost(1)
+              .setConnectTimeout(httpConnParams.getConnectionTimeout())
+              .setDisableUrlEncodingForBoundedRequests(true)
               .build();
             httpAsyncClient = new AsyncHttpClient(builder.build());
         }