DRILL-235: Add configuration parameters to control number of threads and width of...
author Steven Phillips <sphillips@maprtech.com>
Fri, 6 Sep 2013 03:00:02 +0000 (20:00 -0700)
committer Steven Phillips <sphillips@maprtech.com>
Fri, 20 Sep 2013 19:10:07 +0000 (12:10 -0700)
13 files changed:
distribution/src/resources/drill-override.conf
exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java
exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
exec/java-exec/src/main/resources/drill-module.conf
exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
exec/java-exec/src/test/resources/drill-module.conf

index fb1b627..c2ed9df 100644 (file)
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
-       user.port : 31010,
-       bit.port : 32011
+    user: {
+      server: {
+        port: 31010
+        threads: 1
+      }
+      client: {
+        threads: 1
+      }
+    },
+    bit: {
+      server: {
+        port : 31011,
+        retry:{
+          count: 7200,
+          delay: 500
+        },
+        threads: 1
+      }
+    },
+       use.ip : false
   },
   operator: {
     packages += "org.apache.drill.exec.physical.config"
@@ -32,20 +50,25 @@ drill.exec: {
        packages += "org.apache.drill.exec.store"
   }
   metrics : {
-       context: "drillbit"
+    context: "drillbit"
   },
   zk: {
        connect: "localhost:2181",
        root: "/drill",
        refresh: 500,
        timeout: 5000,
-       retry: {
-         count: 7200,
-         delay: 500
-       }
-  }
-
+       retry: {
+         count: 7200,
+         delay: 500
+       }
+  },
+  functions: ["org.apache.drill.expr.fn.impl"],
   network: {
     start: 35000
+  },
+  work: {
+    max.width.per.endpoint: 5,
+    global.max.width: 100,
+    executor.threads: 4
   }
-}
+}
\ No newline at end of file
index 42abf54..72776d1 100644 (file)
@@ -24,17 +24,23 @@ public interface ExecConstants {
   public static final String ZK_TIMEOUT = "drill.exec.zk.timeout";
   public static final String ZK_ROOT = "drill.exec.zk.root";
   public static final String ZK_REFRESH = "drill.exec.zk.refresh";
-  public static final String BIT_RETRY_TIMES = "drill.exec.bit.retry.count";
-  public static final String BIT_RETRY_DELAY = "drill.exec.bit.retry.delay";
+  public static final String BIT_RETRY_TIMES = "drill.exec.rpc.bit.server.retry.count";
+  public static final String BIT_RETRY_DELAY = "drill.exec.rpc.bit.server.retry.delay";
   public static final String BIT_TIMEOUT = "drill.exec.bit.timeout" ;
   public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages";
   public static final String SERVICE_NAME = "drill.exec.cluster-id";
-  public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.port";
-  public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.port";
+  public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.server.port";
+  public static final String INITIAL_USER_PORT = "drill.exec.rpc.user.server.port";
   public static final String METRICS_CONTEXT_NAME = "drill.exec.metrics.context";
   public static final String FUNCTION_PACKAGES = "drill.exec.functions";
   public static final String USE_IP_ADDRESS = "drill.exec.rpc.use.ip";
   public static final String METRICS_JMX_OUTPUT_ENABLED = "drill.exec.metrics.jmx.enabled";
   public static final String METRICS_LOG_OUTPUT_ENABLED = "drill.exec.metrics.log.enabled";
   public static final String METRICS_LOG_OUTPUT_INTERVAL = "drill.exec.metrics.log.interval";
+  public static final String GLOBAL_MAX_WIDTH = "drill.exec.work.global.max.width";
+  public static final String MAX_WIDTH_PER_ENDPOINT = "drill.exec.work.max.width.per.endpoint";
+  public static final String EXECUTOR_THREADS = "drill.exec.work.executor.threads";
+  public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads";
+  public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads";
+  public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads";
 }
index 1c05e14..3dadb0c 100644 (file)
@@ -106,7 +106,7 @@ public class DrillClient implements Closeable{
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
     DrillbitEndpoint endpoint = endpoints.iterator().next();
-    this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+    this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(config.getInt(ExecConstants.CLIENT_RPC_THREADS), new NamedThreadFactory("Client-")));
     logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
     connect(endpoint);
     connected = true;
index 78c455d..dff2dd3 100644 (file)
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.planner;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -46,8 +47,11 @@ public class SimpleExecPlanner implements ExecPlanner{
     
     // generate a planning set and collect stats.
     PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot);
+
+    int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT);
     
-    return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(), context.getPlanReader(), fragmentRoot, planningSet, maxWidth);
+    return parallelizer.getFragments(context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(),
+            context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint);
     
     
   }
index fd1984c..30a3d5a 100644 (file)
@@ -60,12 +60,13 @@ public class SimpleParallelizer {
    * @param globalMaxWidth  The maximum level of parallelization any stage of the query can do. Note that while this
    *                        might be the number of active Drillbits, realistically, this could be well beyond that
    *                        number if we want to do things like speed results return.
+   * @param maxWidthPerEndpoint Limits the maximum level of parallelization to this factor times the number of Drillbits
    * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
    * @throws ExecutionSetupException
    */
   public QueryWorkUnit getFragments(DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
-                                    int globalMaxWidth) throws ExecutionSetupException {
-    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth);
+                                    int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException {
+    assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint);
     return generateWorkUnit(foremanNode, queryId, reader, rootNode, planningSet);
   }
 
@@ -142,7 +143,7 @@ public class SimpleParallelizer {
   }
 
   private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet,
-                               int globalMaxWidth) throws PhysicalOperatorSetupException {
+                               int globalMaxWidth, int maxWidthPerEndpoint) throws PhysicalOperatorSetupException {
     // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on
     // cost. (Later could also be based on cluster operation.) Then we decide endpoints based on affinity (later this
     // could be based on endpoint load).
@@ -161,6 +162,8 @@ public class SimpleParallelizer {
         width = (int) diskCost;
       }
 
+      width = Math.min(width, maxWidthPerEndpoint*allNodes.size());
+
       if (width < 1) width = 1;
 //      logger.debug("Setting width {} on fragment {}", width, wrapper);
       wrapper.setWidth(width);
index 8b0cc91..8c1487c 100644 (file)
@@ -152,7 +152,6 @@ public class Wrapper {
         endpoints.add(all.get(i % div));
       }
     } else {
-      width = Math.min(width, values.size()*5);
       // get nodes with highest affinity.
       Collections.sort(values);
       values = Lists.reverse(values);
index 1b10ef0..12f42c2 100644 (file)
@@ -39,7 +39,7 @@ public class BootStrapContext implements Closeable{
   public BootStrapContext(DrillConfig config) {
     super();
     this.config = config;
-    this.loop = new NioEventLoopGroup(1, new NamedThreadFactory("BitServer-"));
+    this.loop = new NioEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), new NamedThreadFactory("BitServer-"));
     this.metrics = new MetricRegistry(config.getString(ExecConstants.METRICS_CONTEXT_NAME));
     this.allocator = BufferAllocator.getAllocator(config);
   }
index bc0d171..8406deb 100644 (file)
@@ -47,7 +47,8 @@ public class ServiceEngine implements Closeable{
   boolean useIP = false;
   
   public ServiceEngine(BitComHandler bitComWorker, UserWorker userWorker, BootStrapContext context){
-    this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("UserServer-")), userWorker);
+    this.userServer = new UserServer(context.getAllocator().getUnderlyingAllocator(), new NioEventLoopGroup(context.getConfig().getInt(ExecConstants.USER_SERVER_RPC_THREADS),
+            new NamedThreadFactory("UserServer-")), userWorker);
     this.bitCom = new BitComImpl(context, bitComWorker);
     this.config = context.getConfig();
   }
index ab9c16b..9a82c62 100644 (file)
@@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.cache.DistributedCache;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -63,7 +64,7 @@ public class WorkManager implements Closeable{
   private final BitComHandler bitComWorker;
   private final UserWorker userWorker;
   private final WorkerBee bee;
-  private Executor executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("WorkManager-"));
+  private Executor executor;
   private final EventThread eventThread;
   
   public WorkManager(BootStrapContext context){
@@ -72,11 +73,11 @@ public class WorkManager implements Closeable{
     this.bitComWorker = new BitComHandlerImpl(bee);
     this.userWorker = new UserWorker(bee);
     this.eventThread = new EventThread();
-    
   }
   
   public void start(DrillbitEndpoint endpoint, DistributedCache cache, BitCom com, ClusterCoordinator coord){
     this.dContext = new DrillbitContext(endpoint, bContext, coord, com, cache);
+    executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS), new NamedThreadFactory("WorkManager-"));
     eventThread.start();
   }
   
index 5e3302e..726539a 100644 (file)
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.LogicalPlan;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -209,7 +210,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
     SimpleParallelizer parallelizer = new SimpleParallelizer();
 
     try {
-      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet, 10);
+      QueryWorkUnit work = parallelizer.getFragments(context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(),
+              context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH),
+              context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT));
 
       this.context.getBitCom().getListeners().addFragmentStatusListener(work.getRootFragment().getHandle(), fragmentManager);
       List<PlanFragment> leafFragments = Lists.newArrayList();
index 41282e5..7e40a05 100644 (file)
@@ -7,8 +7,25 @@ drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
-       user.port : 31010,
-       bit.port : 31011,
+    user: {
+      server: {
+        port: 31010
+        threads: 1
+      }
+      client: {
+        threads: 1
+      }
+    },
+    bit: {
+      server: {
+        port : 31011,
+        retry:{
+          count: 7200,
+          delay: 500
+        },
+        threads: 1
+      }
+    },
        use.ip : false
   },
   operator: {
@@ -20,7 +37,7 @@ drill.exec: {
   functions: ["org.apache.drill.expr.fn.impl"],
   storage: {
     packages += "org.apache.drill.exec.store"  
-  }
+  },
   metrics : { 
     context: "drillbit",
     jmx: {
@@ -42,13 +59,12 @@ drill.exec: {
        }    
   },
   functions: ["org.apache.drill.expr.fn.impl"],
-  bit: {
-    retry:{
-       count: 7200,
-       delay: 500
-    }
-  } ,  
   network: {
     start: 35000
+  },
+  work: {
+    max.width.per.endpoint: 5,
+    global.max.width: 100,
+    executor.threads: 4
   }
 }
\ No newline at end of file
index 7bb90ee..6e0bfa3 100644 (file)
@@ -61,7 +61,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
     }
     
     
-    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10);
+    QueryWorkUnit qwu = par.getFragments(localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5);
     System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
     
     System.out.print(qwu.getRootFragment().getFragmentJson());
index 744b786..99dd863 100644 (file)
@@ -1,11 +1,32 @@
-//  This file tells Drill to consider this module when class path scanning.  
-//  This file can also include any supplementary configuration information.  
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
 //  This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+
+drill.logical.function.packages += "org.apache.drill.exec.expr.fn.impl"
+
 drill.exec: {
   cluster-id: "drillbits1"
   rpc: {
-       user.port : 31010,
-       bit.port : 32010
+    user: {
+      server: {
+        port: 31010
+        threads: 1
+      }
+      client: {
+        threads: 1
+      }
+    },
+    bit: {
+      server: {
+        port : 31011,
+        retry:{
+          count: 7200,
+          delay: 500
+        },
+        threads: 1
+      }
+    },
+       use.ip : false
   },
   operator: {
     packages += "org.apache.drill.exec.physical.config"
@@ -13,30 +34,30 @@ drill.exec: {
   optimizer: {
     implementation: "org.apache.drill.exec.opt.IdentityOptimizer"
   },
+  functions: ["org.apache.drill.expr.fn.impl"],
   storage: {
-       packages += "org.apache.drill.exec.store"  
-  }
-  metrics : { 
-       context: "drillbit"
+    packages += "org.apache.drill.exec.store"
+  },
+  metrics : {
+    context: "drillbit"
   },
   zk: {
-       connect: "10.10.30.52:5181",
+       connect: "localhost:2181",
        root: "/drill",
        refresh: 500,
        timeout: 5000,
-       retry: {
-         count: 7200,
-         delay: 500
-       }    
-  }
-  bit: {
-    retry:{
-       count: 7200,
-       delay: 500
-    }
-  } ,
-
+       retry: {
+         count: 7200,
+         delay: 500
+       }
+  },
+  functions: ["org.apache.drill.expr.fn.impl"],
   network: {
     start: 35000
+  },
+  work: {
+    max.width.per.endpoint: 5,
+    global.max.width: 100,
+    executor.threads: 4
   }
 }