SAMZA-1291: StandAlone config
authorBoris Shkolnik <boryas@apache.org>
Mon, 22 May 2017 16:36:12 +0000 (09:36 -0700)
committerXinyu Liu <xiliu@xiliu-ld.linkedin.biz>
Mon, 22 May 2017 16:36:12 +0000 (09:36 -0700)
Author: Boris Shkolnik <boryas@apache.org>

Reviewers: Xinyu Liu <xinyu@apache.org>

Closes #192 from sborya/StandAloneConfig

docs/learn/documentation/versioned/jobs/configuration-table.html
samza-core/src/main/java/org/apache/samza/config/ZkConfig.java
samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java

index afa42f5..fe1580f 100644 (file)
                         <a href="#stores-changelog" class="property">stores.store-name.changelog</a>.
                     </td>
                 </tr>
+                 <tr>
+                    <td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
+                    <td class="default"></td>
+                    <td class="description">
+                       Class to use for job coordination. Currently available values are:
+                       <dl>
+                            <dt><code>org.apache.samza.standalone.StandaloneJobCoordinatorFactory</code></dt>
+                            <dd>Fixed partition mapping. No Zoookeeper. </dd>
+                            <dt><code>org.apache.samza.zk.ZkJobCoordinatorFactory</code></dt>
+                            <dd>Zookeeper-based coordination. </dd>
+                        </dl>
+                        Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
+                    </td>
+                </tr>
+
+                <tr>
+                                              <!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
+                <th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.connect">job.coordinator.zk.connect</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required</strong> for applications with Zookeeper-based coordination. Zookeeper coordinates (in "host:port[/znode]" format) to be used for coordination.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.session.timeout.ms">job.coordinator.zk.session.timeout.ms</td>
+                    <td class="default"> 30000 </td>
+                    <td class="description">
+                        Zookeeper session timeout for all the ZK connections in milliseconds. Session timeout controls how long zk client will wait before throwing an exception, when it cannot talk to one of ZK servers.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.connection.timeout.ms">job.coordinator.zk.connection.timeout.ms</td>
+                    <td class="default"> 60000 </td>
+                    <td class="description">
+                        Zookeeper connection timeout in milliseconds. Zk connection timeout controls how long client tries to connect to ZK server before giving up.
+                    </td>
+                </tr>
+                <tr>
+                    <td class="property" id="job.coordinator.zk.new.consensus.timeout.ms">job.coordinator.zk.consensus.timeout.ms</td>
+                    <td class="default"> 40000 </td>
+                    <td class="description">
+                       How long each processor will wait for all the processors to report acceptance of the new job model before rolling back.
+                    </td>
+                </tr>
                 <tr>
                     <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th>
                 </tr>
                         The fully-qualified name of the Java class which determines the factory class which will build the TaskNameGrouper.
                         The default configuration value if the property is not present is <code>task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerCountFactory</code>.<br>
                         The user can specify a custom implementation of the TaskNameGrouperFactory where a custom logic is implemented for grouping the tasks.
+                    <p><strong>Note:</strong> For non-cluster applications (ones using coordination service) one must use <i>org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</i>
                     </td>
                 </tr>
 
index fc483eb..34d2542 100644 (file)
@@ -21,15 +21,15 @@ package org.apache.samza.config;
 
 public class ZkConfig extends MapConfig {
   // Connection string for ZK, format: :<hostname>:<port>,..."
-  public static final String ZK_CONNECT = "coordinator.zk.connect";
-  public static final String ZK_SESSION_TIMEOUT_MS = "coordinator.zk.session-timeout-ms";
-  public static final String ZK_CONNECTION_TIMEOUT_MS = "coordinator.zk.connection-timeout-ms";
+  public static final String ZK_CONNECT = "job.coordinator.zk.connect";
+  public static final String ZK_SESSION_TIMEOUT_MS = "job.coordinator.zk.session.timeout.ms";
+  public static final String ZK_CONNECTION_TIMEOUT_MS = "job.coordinator.zk.connection.timeout.ms";
+  public static final String ZK_CONSENSUS_TIMEOUT_MS = "job.coordinator.zk.consensus.timeout.ms";
 
   public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 60000;
   public static final int DEFAULT_SESSION_TIMEOUT_MS = 30000;
-  public static final String ZK_BARRIER_TIMEOUT_MS = "coordinator.zk.barrier-timeout-ms";
-  public static final int DEFAULT_BARRIER_TIMEOUT_MS = 40000;
-
+  public static final int DEFAULT_CONSENSUS_TIMEOUT_MS = 40000;
+  
   public ZkConfig(Config config) {
     super(config);
   }
@@ -50,6 +50,6 @@ public class ZkConfig extends MapConfig {
   }
 
   public int getZkBarrierTimeoutMs() {
-    return getInt(ZK_BARRIER_TIMEOUT_MS, DEFAULT_BARRIER_TIMEOUT_MS);
+    return getInt(ZK_CONSENSUS_TIMEOUT_MS, DEFAULT_CONSENSUS_TIMEOUT_MS);
   }
 }
index 547e32b..9c91fd3 100644 (file)
@@ -55,7 +55,7 @@ public class TestZkBarrierForVersionUpgrade {
     String processorId = "p1";
     Map<String, String> map = new HashMap<>();
     map.put(ZkConfig.ZK_CONNECT, testZkConnectionString);
-    map.put(ZkConfig.ZK_BARRIER_TIMEOUT_MS, "200");
+    map.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, "200");
     Config config = new MapConfig(map);
 
     CoordinationServiceFactory serviceFactory = new ZkCoordinationServiceFactory();