SAMZA-1389: Fix ZkProcessorLatch await(timeout, TimeUnit) api.
[samza.git] / samza-core / src / main / java / org / apache / samza / zk / ZkProcessorLatch.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.samza.zk;
20
21 import java.util.concurrent.TimeUnit;
22
23 import org.apache.samza.coordinator.Latch;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 /*
28 * Latch of the sizeN is open when countDown() was called N times.
29 * In this implementation a sequential node is created on every call of countDown().
30 * When Nth node is created await() call returns.
31 */
32 public class ZkProcessorLatch implements Latch {
33 private static final Logger LOGGER = LoggerFactory.getLogger(ZkProcessorLatch.class);
34
35 private final ZkUtils zkUtils;
36 private final String participantId;
37 private final String latchPath;
38 private final String targetPath;
39
40 public final static String LATCH_PATH = "latch";
41
42 public ZkProcessorLatch(int size, String latchId, String participantId, ZkUtils zkUtils) {
43 this.zkUtils = zkUtils;
44 this.participantId = participantId;
45 ZkKeyBuilder keyBuilder = this.zkUtils.getKeyBuilder();
46
47 latchPath = String.format("%s/%s", keyBuilder.getRootPath(), LATCH_PATH + "_" + latchId);
48 // TODO: Verify that makeSurePersistentPathsExists doesn't fail with exceptions
49 zkUtils.makeSurePersistentPathsExists(new String[] {latchPath});
50 targetPath = String.format("%s/%010d", latchPath, size - 1);
51
52 LOGGER.debug("ZkProcessorLatch targetPath " + targetPath);
53 }
54
55 @Override
56 public void await(long timeout, TimeUnit timeUnit) {
57 zkUtils.getZkClient().waitUntilExists(targetPath, timeUnit, timeout);
58 }
59
60 @Override
61 public void countDown() {
62 // create persistent (should be ephemeral? Probably not)
63 String path = zkUtils.getZkClient().createPersistentSequential(latchPath + "/", participantId);
64 LOGGER.debug("ZKProcessorLatch countDown created " + path);
65 }
66 }