SAMZA-1880: Rename non-metrics classes which use Timer in their name
authorCameron Lee <calee@linkedin.com>
Wed, 26 Sep 2018 21:08:20 +0000 (14:08 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Wed, 26 Sep 2018 21:08:20 +0000 (14:08 -0700)
Summary of API changes:
1. TimerRegistry -> KeyScheduler; _register_ -> _schedule_
2. TimerFunction -> SchedulingFunction; _registerTimer_ -> _schedulingInit_, _onTimer_ -> _executeForKey_
3. TimerCallback -> SchedulingCallback _onTimer_ -> _execute_
4. TaskContext: _registerTimer_ -> _scheduleCallback_, _deleteTimer_ -> _deleteScheduledCallback_

Only terminology changes are intended (e.g. classes, var names, logs). No functionality change is intended.
An upcoming PR will further update TaskContext and the access to the scheduling logic.

Author: Cameron Lee <calee@linkedin.com>

Reviewers: Prateek Maheshwari <pmaheshwari@apaapache.org>

Closes #644 from cameronlee314/rename_timer

36 files changed:
samza-api/src/main/java/org/apache/samza/operators/Scheduler.java [moved from samza-api/src/main/java/org/apache/samza/operators/TimerRegistry.java with 61% similarity]
samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java [deleted file]
samza-api/src/main/java/org/apache/samza/scheduler/ScheduledCallback.java [moved from samza-api/src/main/java/org/apache/samza/task/TimerCallback.java with 64% similarity]
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
samza-core/src/main/java/org/apache/samza/operators/spec/BroadcastOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/SinkOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/StreamTableJoinOperatorSpec.java
samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
samza-core/src/main/java/org/apache/samza/task/EpochTimeScheduler.java [moved from samza-core/src/main/java/org/apache/samza/task/SystemTimerScheduler.java with 86% similarity]
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
samza-core/src/test/java/org/apache/samza/operators/TestOperatorSpecGraph.java
samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
samza-core/src/test/java/org/apache/samza/operators/spec/OperatorSpecTestUtils.java
samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpec.java
samza-core/src/test/java/org/apache/samza/operators/spec/TestPartitionByOperatorSpec.java
samza-core/src/test/java/org/apache/samza/operators/spec/TestWindowOperatorSpec.java
samza-core/src/test/java/org/apache/samza/task/TestEpochTimeScheduler.java [moved from samza-core/src/test/java/org/apache/samza/task/TestSystemTimerScheduler.java with 87% similarity]
samza-sql/src/test/java/org/apache/samza/sql/translator/TestProjectTranslator.java
samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java [moved from samza-test/src/test/java/org/apache/samza/test/framework/TimerTest.java with 91% similarity]
samza-test/src/test/java/org/apache/samza/test/framework/TestSchedulingApp.java [moved from samza-test/src/test/java/org/apache/samza/test/framework/TestTimerApp.java with 79% similarity]

 package org.apache.samza.operators;
 
 /**
- * Allows registering epoch-time timer callbacks from the operators.
- * See {@link org.apache.samza.operators.functions.TimerFunction} for details.
- * @param <K> type of the timer key
+ * Allows scheduling {@link org.apache.samza.operators.functions.ScheduledFunction} callbacks to be invoked later.
+ * @param <K> type of the key to schedule
  */
-public interface TimerRegistry<K> {
-
+public interface Scheduler<K> {
   /**
-   * Register a epoch-time timer with key.
-   * @param key unique timer key
-   * @param timestamp epoch time when the timer will be fired, in milliseconds
+   * Schedule a callback for the {@code key} to be invoked at {@code timestamp}.
+   * @param key unique key associated with the callback to schedule
+   * @param timestamp epoch time when the callback for the key will be invoked, in milliseconds
    */
-  void register(K key, long timestamp);
+  void schedule(K key, long timestamp);
 
   /**
-   * Delete the timer for the provided key.
-   * @param key key for the timer to delete
+   * Delete the scheduled callback for the provided {@code key}.
+   * @param key key to delete
    */
   void delete(K key);
 }
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/ScheduledFunction.java
new file mode 100644 (file)
index 0000000..952948c
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+import org.apache.samza.operators.Scheduler;
+
+import java.util.Collection;
+
+
+/**
+ * Allows scheduling a callback for a specific epoch-time.
+ * Key must be a unique identifier for its corresponding logic to execute, and is provided in the callback when the
+ * corresponding schedule time occurs.
+ *
+ * <p>
+ * Example of a {@link FlatMapFunction} with {@link ScheduledFunction}:
+ * <pre>{@code
+ *    public class ExampleScheduledFn implements FlatMapFunction<String, String>, ScheduledFunction<String, String> {
+ *      // for recurring callbacks, keep track of the scheduler from "schedule"
+ *      private Scheduler scheduler;
+ *
+ *      public void schedule(Scheduler scheduler) {
+ *        // save the scheduler for recurring callbacks
+ *        this.scheduler = scheduler;
+ *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
+ *        scheduler.schedule("do-delayed-logic", time);
+ *      }
+ *      public Collection<String> apply(String s) {
+ *        ...
+ *      }
+ *      public Collection<String> onCallback(String key, long timestamp) {
+ *        // do some logic for key "do-delayed-logic"
+ *        ...
+ *        // for recurring callbacks, call the saved scheduler again
+ *        this.scheduler.schedule("example-process", System.currentTimeMillis() + 5000);
+ *      }
+ *    }
+ * }</pre>
+ * @param <K> type of the key
+ * @param <OM> type of the output
+ */
+public interface ScheduledFunction<K, OM> {
+  /**
+   * Allows scheduling the initial callback(s) and saving the {@code scheduler} for later use for recurring callbacks.
+   * @param scheduler used to specify the schedule time(s) and key(s)
+   */
+  void schedule(Scheduler<K> scheduler);
+
+  /**
+   * Returns the output from the scheduling logic corresponding to the key that was triggered.
+   * @param key key corresponding to the callback that got invoked
+   * @param timestamp schedule time that was set for the callback for the key, in milliseconds since epoch
+   * @return {@link Collection} of output elements
+   */
+  Collection<OM> onCallback(K key, long timestamp);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/TimerFunction.java
deleted file mode 100644 (file)
index 01825c6..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.functions;
-
-import org.apache.samza.operators.TimerRegistry;
-
-import java.util.Collection;
-
-/**
- * Allows timer registration with a key and is invoked when the timer is fired.
- * Key must be a unique identifier for this timer, and is provided in the callback when the timer fires.
- *
- * <p>
- * Example of a {@link FlatMapFunction} with timer:
- * <pre>{@code
- *    public class ExampleTimerFn implements FlatMapFunction<String, String>, TimerFunction<String, String> {
- *      public void registerTimer(TimerRegistry timerRegistry) {
- *        long time = System.currentTimeMillis() + 5000; // fire after 5 sec
- *        timerRegistry.register("example-timer", time);
- *      }
- *      public Collection<String> apply(String s) {
- *        ...
- *      }
- *      public Collection<String> onTimer(String key, long timestamp) {
- *        // example-timer fired
- *        ...
- *      }
- *    }
- * }</pre>
- * @param <K> type of the key
- * @param <OM> type of the output
- */
-public interface TimerFunction<K, OM> {
-
-  /**
-   * Registers any epoch-time timers using the registry
-   * @param timerRegistry a keyed {@link TimerRegistry}
-   */
-  void registerTimer(TimerRegistry<K> timerRegistry);
-
-  /**
-   * Returns the output after the timer with key fires.
-   * @param key timer key
-   * @param timestamp time of the epoch-time timer fired, in milliseconds
-   * @return {@link Collection} of output elements
-   */
-  Collection<OM> onTimer(K key, long timestamp);
-}
  * under the License.
  */
 
-package org.apache.samza.task;
+package org.apache.samza.scheduler;
+
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+
 
 /**
- * The callback that is invoked when its corresponding timer registered via {@link TaskContext} fires.
- * @param <K> type of the timer key
+ * The callback that is invoked when its corresponding schedule time registered via
+ * {@link org.apache.samza.task.TaskContext} is reached.
+ * @param <K> type of the callback key
  */
-public interface TimerCallback<K> {
+public interface ScheduledCallback<K> {
   /**
-   * Invoked when the timer of key fires.
-   * @param key timer key
+   * Invoked when the corresponding schedule time is reached.
+   * @param key key for callback
    * @param collector contains the means of sending message envelopes to the output stream.
    * @param coordinator manages execution of tasks.
    */
-  void onTimer(K key, MessageCollector collector, TaskCoordinator coordinator);
+  void onCallback(K key, MessageCollector collector, TaskCoordinator coordinator);
 }
index ea2a3bc..eccedba 100644 (file)
@@ -24,6 +24,7 @@ import java.util.Set;
 import org.apache.samza.container.SamzaContainerContext;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 
@@ -76,21 +77,21 @@ public interface TaskContext {
   }
 
   /**
-   * Register a keyed timer with a callback of {@link TimerCallback} in this task.
+   * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
    * The callback will be invoked exclusively with any other operations for this task,
    * e.g. processing, windowing and commit.
-   * @param key timer key
-   * @param timestamp epoch time when the timer will be fired, in milliseconds
-   * @param callback callback when the timer is fired
+   * @param key key for the callback
+   * @param timestamp epoch time when the callback will be fired, in milliseconds
+   * @param callback callback to call when the {@code timestamp} is reached
    * @param <K> type of the key
    */
-  <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback);
+  <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);
 
   /**
-   * Delete the keyed timer in this task.
-   * Deletion only happens if the timer hasn't been fired. Otherwise it will not interrupt.
-   * @param key timer key
+   * Delete the scheduled {@code callback} for the {@code key}.
+   * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
+   * @param key callback key
    * @param <K> type of the key
    */
-  <K> void deleteTimer(K key);
+  <K> void deleteScheduledCallback(K key);
 }
index d65be4c..bea6373 100644 (file)
@@ -29,9 +29,9 @@ import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.table.Table;
 import org.apache.samza.table.TableManager;
-import org.apache.samza.task.SystemTimerScheduler;
+import org.apache.samza.task.EpochTimeScheduler;
 import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TimerCallback;
+import org.apache.samza.scheduler.ScheduledCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +53,7 @@ public class TaskContextImpl implements TaskContext {
   private final JobModel jobModel;
   private final StreamMetadataCache streamMetadataCache;
   private final Map<String, Object> objectRegistry = new HashMap<>();
-  private final SystemTimerScheduler timerScheduler;
+  private final EpochTimeScheduler timerScheduler;
 
   private Object userContext = null;
 
@@ -76,7 +76,7 @@ public class TaskContextImpl implements TaskContext {
     this.tableManager = tableManager;
     this.jobModel = jobModel;
     this.streamMetadataCache = streamMetadataCache;
-    this.timerScheduler = SystemTimerScheduler.create(timerExecutor);
+    this.timerScheduler = EpochTimeScheduler.create(timerExecutor);
   }
 
   @Override
@@ -134,12 +134,12 @@ public class TaskContextImpl implements TaskContext {
   }
 
   @Override
-  public <K> void registerTimer(K key, long timestamp, TimerCallback<K> callback) {
+  public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
     timerScheduler.setTimer(key, timestamp, callback);
   }
 
   @Override
-  public <K> void deleteTimer(K key) {
+  public <K> void deleteScheduledCallback(K key) {
     timerScheduler.deleteTimer(key);
   }
 
@@ -159,7 +159,7 @@ public class TaskContextImpl implements TaskContext {
     return streamMetadataCache;
   }
 
-  public SystemTimerScheduler getTimerScheduler() {
+  public EpochTimeScheduler getTimerScheduler() {
     return timerScheduler;
   }
 }
index eedf45d..5cafd26 100644 (file)
@@ -25,8 +25,8 @@ import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.metrics.Counter;
@@ -439,19 +439,19 @@ public abstract class OperatorImpl<M, RM> {
 
   /**
    * Returns a registry which allows registering arbitrary system-clock timer with K-typed key.
-   * The user-defined function in the operator spec needs to implement {@link TimerFunction#onTimer(Object, long)}
+   * The user-defined function in the operator spec needs to implement {@link ScheduledFunction#onCallback(Object, long)}
    * for timer notifications.
    * @param <K> key type for the timer.
-   * @return an instance of {@link TimerRegistry}
+   * @return an instance of {@link Scheduler}
    */
-  <K> TimerRegistry<K> createOperatorTimerRegistry() {
-    return new TimerRegistry<K>() {
+  <K> Scheduler<K> createOperatorScheduler() {
+    return new Scheduler<K>() {
       @Override
-      public void register(K key, long time) {
-        taskContext.registerTimer(key, time, (k, collector, coordinator) -> {
-            final TimerFunction<K, RM> timerFn = getOperatorSpec().getTimerFn();
-            if (timerFn != null) {
-              final Collection<RM> output = timerFn.onTimer(key, time);
+      public void schedule(K key, long time) {
+        taskContext.scheduleCallback(key, time, (k, collector, coordinator) -> {
+            final ScheduledFunction<K, RM> scheduledFn = getOperatorSpec().getScheduledFn();
+            if (scheduledFn != null) {
+              final Collection<RM> output = scheduledFn.onCallback(key, time);
 
               if (!output.isEmpty()) {
                 output.forEach(rm ->
@@ -460,7 +460,7 @@ public abstract class OperatorImpl<M, RM> {
               }
             } else {
               throw new SamzaException(
-                  String.format("Operator %s id %s (created at %s) must implement TimerFunction to use system timer.",
+                  String.format("Operator %s id %s (created at %s) must implement ScheduledFunction to use system timer.",
                       getOperatorSpec().getOpCode().name(), getOpImplId(), getOperatorSpec().getSourceLocation()));
             }
           });
@@ -468,7 +468,7 @@ public abstract class OperatorImpl<M, RM> {
 
       @Override
       public void delete(K key) {
-        taskContext.deleteTimer(key);
+        taskContext.deleteScheduledCallback(key);
       }
     };
   }
index 367576a..d76c7de 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.PartialJoinFunction;
 import org.apache.samza.util.TimestampedValue;
@@ -172,9 +172,9 @@ public class OperatorImplGraph {
       operatorImpl.init(config, context);
       operatorImpl.registerInputStream(inputStream);
 
-      if (operatorSpec.getTimerFn() != null) {
-        final TimerRegistry timerRegistry = operatorImpl.createOperatorTimerRegistry();
-        operatorSpec.getTimerFn().registerTimer(timerRegistry);
+      if (operatorSpec.getScheduledFn() != null) {
+        final Scheduler scheduler = operatorImpl.createOperatorScheduler();
+        operatorSpec.getScheduledFn().schedule(scheduler);
       }
 
       // Note: The key here is opImplId, which may not equal opId for some impls (e.g. PartialJoinOperatorImpl).
index 82dc0bf..b175671 100644 (file)
@@ -183,7 +183,7 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
 
   @Override
   public Collection<WindowPane<K, Object>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
-    LOG.trace("Processing timer.");
+    LOG.trace("Processing time triggers");
     List<WindowPane<K, Object>> results = new ArrayList<>();
     List<TriggerKey<K>> keys = triggerScheduler.runPendingCallbacks();
 
index 2c76e60..fb6515a 100644 (file)
@@ -20,7 +20,7 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> {
@@ -43,7 +43,7 @@ public class BroadcastOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index a5cdb82..4e640dc 100644 (file)
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FilterFunction;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -68,7 +68,7 @@ class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.filterFn instanceof TimerFunction ? (TimerFunction) this.filterFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.filterFn instanceof ScheduledFunction ? (ScheduledFunction) this.filterFn : null;
   }
 }
index a93a221..160f432 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -41,7 +41,7 @@ class FlatMapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.transformFn instanceof TimerFunction ? (TimerFunction) this.transformFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.transformFn instanceof ScheduledFunction ? (ScheduledFunction) this.transformFn : null;
   }
 }
index c49443d..1af4806 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.InputTransformer;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -99,7 +99,7 @@ public class InputOperatorSpec extends OperatorSpec<IncomingMessageEnvelope, Obj
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index 1b55784..bb6ed59 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimestampedValueSerde;
 import org.apache.samza.util.TimestampedValue;
@@ -105,8 +105,8 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
   public OperatorSpec getLeftInputOpSpec() {
index 1e2190b..6ce522f 100644 (file)
@@ -23,7 +23,7 @@ import java.util.Collection;
 import org.apache.samza.config.Config;
 import org.apache.samza.operators.functions.FlatMapFunction;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.task.TaskContext;
 
@@ -71,7 +71,7 @@ class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return this.mapFn instanceof TimerFunction ? (TimerFunction) this.mapFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return this.mapFn instanceof ScheduledFunction ? (ScheduledFunction) this.mapFn : null;
   }
 }
index 987f72c..3685c5f 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import java.util.ArrayList;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -45,7 +45,7 @@ class MergeOperatorSpec<M> extends StreamOperatorSpec<M, M> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index 1e021f5..0442f7c 100644 (file)
@@ -25,7 +25,7 @@ import java.util.LinkedHashSet;
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -143,5 +143,5 @@ public abstract class OperatorSpec<M, OM> implements Serializable {
 
   abstract public WatermarkFunction getWatermarkFn();
 
-  abstract public TimerFunction getTimerFn();
+  abstract public ScheduledFunction getScheduledFn();
 }
index 40a5c0e..d6238b8 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 /**
@@ -59,7 +59,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index d6bf3d9..069c867 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -55,10 +55,10 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
       MapFunction<? super M, ? extends K> keyFunction,
       MapFunction<? super M, ? extends V> valueFunction, String opId) {
     super(OpCode.PARTITION_BY, opId);
-    checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction),
-        "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
-    checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction),
-        "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction.");
+    checkArgument(!(keyFunction instanceof ScheduledFunction || keyFunction instanceof WatermarkFunction),
+        "keyFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
+    checkArgument(!(valueFunction instanceof ScheduledFunction || valueFunction instanceof WatermarkFunction),
+        "valueFunction for partitionBy should not implement ScheduledFunction or WatermarkFunction.");
     this.outputStream = outputStream;
     this.keyFunction = keyFunction;
     this.valueFunction = valueFunction;
@@ -86,7 +86,7 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index 22f393e..bf032a2 100644 (file)
@@ -20,7 +20,7 @@ package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -58,7 +58,7 @@ public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void>
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     return null;
   }
 }
index aa0f066..91e2775 100644 (file)
@@ -19,7 +19,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 
 
@@ -57,7 +57,7 @@ public class SinkOperatorSpec<M> extends OperatorSpec<M, Void> {
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return sinkFn instanceof TimerFunction ? (TimerFunction) sinkFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return sinkFn instanceof ScheduledFunction ? (ScheduledFunction) sinkFn : null;
   }
 }
index c7735c6..1849c64 100644 (file)
@@ -19,8 +19,8 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.table.TableSpec;
 
@@ -66,8 +66,8 @@ public class StreamTableJoinOperatorSpec<K, M, R, JM> extends OperatorSpec<M, JM
   }
 
   @Override
-  public TimerFunction getTimerFn() {
-    return joinFn instanceof TimerFunction ? (TimerFunction) joinFn : null;
+  public ScheduledFunction getScheduledFn() {
+    return joinFn instanceof ScheduledFunction ? (ScheduledFunction) joinFn : null;
   }
 
 }
index 8d1ad29..ede16a5 100644 (file)
@@ -20,7 +20,7 @@
 package org.apache.samza.operators.spec;
 
 import org.apache.samza.operators.functions.FoldLeftFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
 import org.apache.samza.operators.triggers.AnyTrigger;
@@ -64,14 +64,14 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
   WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) {
     super(OpCode.WINDOW, opId);
     checkArgument(window.getInitializer() == null ||
-        !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer.");
+        !(window.getInitializer() instanceof ScheduledFunction || window.getInitializer() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the initializer.");
     checkArgument(window.getKeyExtractor() == null ||
-        !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor.");
+        !(window.getKeyExtractor() instanceof ScheduledFunction || window.getKeyExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the keyExtractor.");
     checkArgument(window.getEventTimeExtractor() == null ||
-        !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
-        "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor.");
+        !(window.getEventTimeExtractor() instanceof ScheduledFunction || window.getEventTimeExtractor() instanceof WatermarkFunction),
+        "A window does not accepts a user-defined ScheduledFunction or WatermarkFunction as the eventTimeExtractor.");
     this.window = window;
   }
 
@@ -88,21 +88,21 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
    * @return the default triggering interval
    */
   public long getDefaultTriggerMs() {
-    List<TimeBasedTrigger> timerTriggers = new ArrayList<>();
+    List<TimeBasedTrigger> timeBasedTriggers = new ArrayList<>();
 
     if (window.getDefaultTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getDefaultTrigger()));
     }
     if (window.getEarlyTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getEarlyTrigger()));
     }
     if (window.getLateTrigger() != null) {
-      timerTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger()));
+      timeBasedTriggers.addAll(getTimeBasedTriggers(window.getLateTrigger()));
     }
 
-    LOG.info("Got {} timer triggers", timerTriggers.size());
+    LOG.info("Got {} time-based triggers", timeBasedTriggers.size());
 
-    List<Long> candidateDurations = timerTriggers.stream()
+    List<Long> candidateDurations = timeBasedTriggers.stream()
         .map(timeBasedTrigger -> timeBasedTrigger.getDuration().toMillis())
         .collect(Collectors.toList());
 
@@ -135,9 +135,9 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK
   }
 
   @Override
-  public TimerFunction getTimerFn() {
+  public ScheduledFunction getScheduledFn() {
     FoldLeftFunction fn = window.getFoldLeftFunction();
-    return fn instanceof TimerFunction ? (TimerFunction) fn : null;
+    return fn instanceof ScheduledFunction ? (ScheduledFunction) fn : null;
   }
 
   @Override
index 3b3e008..111869c 100644 (file)
@@ -330,7 +330,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
     COMMIT,
     PROCESS,
     END_OF_STREAM,
-    TIMER,
+    SCHEDULER,
     NO_OP
   }
 
@@ -374,10 +374,10 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         }, commitMs, commitMs, TimeUnit.MILLISECONDS);
       }
 
-      final SystemTimerScheduler timerFactory = task.context().getTimerScheduler();
-      if (timerFactory != null) {
-        timerFactory.registerListener(() -> {
-            state.needTimer();
+      final EpochTimeScheduler epochTimeScheduler = task.context().getTimerScheduler();
+      if (epochTimeScheduler != null) {
+        epochTimeScheduler.registerListener(() -> {
+            state.needScheduler();
           });
       }
     }
@@ -409,8 +409,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         case WINDOW:
           window();
           break;
-        case TIMER:
-          timer();
+        case SCHEDULER:
+          scheduler();
           break;
         case COMMIT:
           commit();
@@ -551,8 +551,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       }
     }
 
-    private void timer() {
-      state.startTimer();
+    private void scheduler() {
+      state.startScheduler();
       Runnable timerWorker = new Runnable() {
         @Override
         public void run() {
@@ -560,26 +560,26 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
 
             long startTime = clock.nanoTime();
-            task.timer(coordinator);
+            task.scheduler(coordinator);
             containerMetrics.timerNs().update(clock.nanoTime() - startTime);
 
             coordinatorRequests.update(coordinator);
-            state.doneTimer();
+            state.doneScheduler();
           } catch (Throwable t) {
-            log.error("Task {} timer failed", task.taskName(), t);
+            log.error("Task {} scheduler failed", task.taskName(), t);
             abort(t);
           } finally {
-            log.trace("Task {} timer completed", task.taskName());
+            log.trace("Task {} scheduler completed", task.taskName());
             resume();
           }
         }
       };
 
       if (threadPool != null) {
-        log.trace("Task {} timer runs on the thread pool", task.taskName());
+        log.trace("Task {} scheduler runs on the thread pool", task.taskName());
         threadPool.submit(timerWorker);
       } else {
-        log.trace("Task {} timer runs on the run loop thread", task.taskName());
+        log.trace("Task {} scheduler runs on the run loop thread", task.taskName());
         timerWorker.run();
       }
     }
@@ -655,12 +655,12 @@ public class AsyncRunLoop implements Runnable, Throttleable {
   private final class AsyncTaskState {
     private volatile boolean needWindow = false;
     private volatile boolean needCommit = false;
-    private volatile boolean needTimer = false;
+    private volatile boolean needScheduler = false;
     private volatile boolean complete = false;
     private volatile boolean endOfStream = false;
     private volatile boolean windowInFlight = false;
     private volatile boolean commitInFlight = false;
-    private volatile boolean timerInFlight = false;
+    private volatile boolean schedulerInFlight = false;
     private final AtomicInteger messagesInFlight = new AtomicInteger(0);
     private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue;
 
@@ -706,28 +706,28 @@ public class AsyncRunLoop implements Runnable, Throttleable {
         needCommit = true;
       }
 
-      boolean opInFlight = windowInFlight || commitInFlight || timerInFlight;
+      boolean opInFlight = windowInFlight || commitInFlight || schedulerInFlight;
       /*
        * A task is ready to commit, when task.commit(needCommit) is requested either by user or commit thread
        * and either of the following conditions are true.
-       * a) When process, window, commit and timer are not in progress.
+       * a) When process, window, commit and scheduler are not in progress.
        * b) When task.async.commit is true and window, commit are not in progress.
        */
       if (needCommit) {
         return (messagesInFlight.get() == 0 || isAsyncCommitEnabled) && !opInFlight;
-      } else if (needWindow || needTimer || endOfStream) {
+      } else if (needWindow || needScheduler || endOfStream) {
         /*
-         * A task is ready for window, timer or end-of-stream operation.
+         * A task is ready for window, scheduler or end-of-stream operation.
          */
         return messagesInFlight.get() == 0 && !opInFlight;
       } else {
         /*
          * A task is ready to process new message, when number of task.process calls in progress < task.max.concurrency
          * and either of the following conditions are true.
-         * a) When window, commit and timer are not in progress.
-         * b) When task.async.commit is true and window and timer are not in progress.
+         * a) When window, commit and scheduler are not in progress.
+         * b) When task.async.commit is true and window and scheduler are not in progress.
          */
-        return messagesInFlight.get() < maxConcurrency && !windowInFlight && !timerInFlight && (isAsyncCommitEnabled || !commitInFlight);
+        return messagesInFlight.get() < maxConcurrency && !windowInFlight && !schedulerInFlight && (isAsyncCommitEnabled || !commitInFlight);
       }
     }
 
@@ -741,7 +741,7 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       if (isReady()) {
         if (needCommit) return WorkerOp.COMMIT;
         else if (needWindow) return WorkerOp.WINDOW;
-        else if (needTimer) return WorkerOp.TIMER;
+        else if (needScheduler) return WorkerOp.SCHEDULER;
         else if (endOfStream && pendingEnvelopeQueue.isEmpty()) return WorkerOp.END_OF_STREAM;
         else if (!pendingEnvelopeQueue.isEmpty()) return WorkerOp.PROCESS;
       }
@@ -756,8 +756,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       needCommit = true;
     }
 
-    private void needTimer() {
-      needTimer = true;
+    private void needScheduler() {
+      needScheduler = true;
     }
 
     private void startWindow() {
@@ -775,9 +775,9 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
-    private void startTimer() {
-      needTimer = false;
-      timerInFlight = true;
+    private void startScheduler() {
+      needScheduler = false;
+      schedulerInFlight = true;
     }
 
     private void doneCommit() {
@@ -793,8 +793,8 @@ public class AsyncRunLoop implements Runnable, Throttleable {
       taskMetrics.messagesInFlight().set(count);
     }
 
-    private void doneTimer() {
-      timerInFlight = false;
+    private void doneScheduler() {
+      schedulerInFlight = false;
     }
 
     /**
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.samza.scheduler.ScheduledCallback;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -35,7 +36,7 @@ import static com.google.common.base.Preconditions.checkState;
  * 2) keeps track of the timers created and timers that are ready.
  * 3) triggers listener whenever a timer fires.
  */
-public class SystemTimerScheduler {
+public class EpochTimeScheduler {
 
   /**
    * For run loop to listen to timer firing so it can schedule the callbacks.
@@ -46,18 +47,18 @@ public class SystemTimerScheduler {
 
   private final ScheduledExecutorService executor;
   private final Map<Object, ScheduledFuture> scheduledFutures = new ConcurrentHashMap<>();
-  private final Map<TimerKey<?>, TimerCallback> readyTimers = new ConcurrentHashMap<>();
+  private final Map<TimerKey<?>, ScheduledCallback> readyTimers = new ConcurrentHashMap<>();
   private TimerListener timerListener;
 
-  public static SystemTimerScheduler create(ScheduledExecutorService executor) {
-    return new SystemTimerScheduler(executor);
+  public static EpochTimeScheduler create(ScheduledExecutorService executor) {
+    return new EpochTimeScheduler(executor);
   }
 
-  private SystemTimerScheduler(ScheduledExecutorService executor) {
+  private EpochTimeScheduler(ScheduledExecutorService executor) {
     this.executor = executor;
   }
 
-  public <K> void setTimer(K key, long timestamp, TimerCallback<K> callback) {
+  public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
     checkState(!scheduledFutures.containsKey(key),
         String.format("Duplicate key %s registration for the same timer", key));
 
@@ -84,8 +85,8 @@ public class SystemTimerScheduler {
     timerListener = listener;
   }
 
-  public Map<TimerKey<?>, TimerCallback> removeReadyTimers() {
-    final Map<TimerKey<?>, TimerCallback> timers = new TreeMap<>(readyTimers);
+  public Map<TimerKey<?>, ScheduledCallback> removeReadyTimers() {
+    final Map<TimerKey<?>, ScheduledCallback> timers = new TreeMap<>(readyTimers);
     readyTimers.keySet().removeAll(timers.keySet());
     return timers;
   }
index d85f10f..9f4fd17 100644 (file)
@@ -28,6 +28,7 @@ import org.apache.samza.config.Config
 import org.apache.samza.config.StreamConfig.Config2Stream
 import org.apache.samza.job.model.JobModel
 import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.scheduler.ScheduledCallback
 import org.apache.samza.storage.kv.KeyValueStore
 import org.apache.samza.storage.{TaskSideInputStorageManager, TaskStorageManager}
 import org.apache.samza.system._
@@ -221,12 +222,12 @@ class TaskInstance(
     }
   }
 
-  def timer(coordinator: ReadableCoordinator) {
-    trace("Timer for taskName: %s" format taskName)
+  def scheduler(coordinator: ReadableCoordinator) {
+    trace("Scheduler for taskName: %s" format taskName)
 
     exceptionHandler.maybeHandle {
       context.getTimerScheduler.removeReadyTimers().entrySet().foreach { entry =>
-        entry.getValue.asInstanceOf[TimerCallback[Any]].onTimer(entry.getKey.getKey, collector, coordinator)
+        entry.getValue.asInstanceOf[ScheduledCallback[Any]].onCallback(entry.getKey.getKey, collector, coordinator)
       }
     }
   }
index 57ae6d8..abbbd3b 100644 (file)
@@ -28,7 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import org.apache.samza.SamzaException;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -176,7 +176,7 @@ public class TestOperatorSpecGraph {
     }
 
     @Override
-    public TimerFunction getTimerFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }
index 249ff09..6d12d99 100644 (file)
@@ -28,7 +28,7 @@ import org.apache.samza.metrics.Counter;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.metrics.Timer;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.task.MessageCollector;
@@ -220,7 +220,7 @@ public class TestOperatorImpl {
     }
 
     @Override
-    public TimerFunction getTimerFn() {
+    public ScheduledFunction getScheduledFn() {
       return null;
     }
   }
index a34fdc3..454a661 100644 (file)
@@ -28,7 +28,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.TableImpl;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.SerializableSerde;
 import org.apache.samza.table.TableSpec;
@@ -71,7 +71,7 @@ public class OperatorSpecTestUtils {
     assertNotEquals(oOpSpec, nOpSpec);
     assertEquals(oOpSpec.getOpId(), nOpSpec.getOpId());
     assertEquals(oOpSpec.getOpCode(), nOpSpec.getOpCode());
-    assertTimerFnsNotEqual(oOpSpec.getTimerFn(), nOpSpec.getTimerFn());
+    assertScheduledFnsNotEqual(oOpSpec.getScheduledFn(), nOpSpec.getScheduledFn());
     assertWatermarkFnNotEqual(nOpSpec.getWatermarkFn(), nOpSpec.getWatermarkFn());
     assertAllOperators(oOpSpec.getRegisteredOperatorSpecs(), nOpSpec.getRegisteredOperatorSpecs());
   }
@@ -83,11 +83,11 @@ public class OperatorSpecTestUtils {
     assertNotEquals(watermarkFn, watermarkFn1);
   }
 
-  private static void assertTimerFnsNotEqual(TimerFunction timerFn, TimerFunction timerFn1) {
-    if (timerFn == timerFn1 && timerFn == null) {
+  private static void assertScheduledFnsNotEqual(ScheduledFunction scheduledFn, ScheduledFunction scheduledFn1) {
+    if (scheduledFn == scheduledFn1 && scheduledFn == null) {
       return;
     }
-    assertNotEquals(timerFn, timerFn1);
+    assertNotEquals(scheduledFn, scheduledFn1);
   }
 
   private static void assertClonedTables(Map<TableSpec, TableImpl> originalTables, Map<TableSpec, TableImpl> clonedTables) {
index a9ccd12..860e630 100644 (file)
@@ -24,7 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.operators.KV;
-import org.apache.samza.operators.TimerRegistry;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.data.TestMessageEnvelope;
 import org.apache.samza.operators.data.TestOutputMessageEnvelope;
 import org.apache.samza.operators.functions.FilterFunction;
@@ -33,7 +33,7 @@ import org.apache.samza.operators.functions.JoinFunction;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.serializers.KVSerde;
@@ -74,7 +74,8 @@ public class TestOperatorSpec {
     }
   }
 
-  private static class MapWithTimerFn implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>, TimerFunction<String, TestOutputMessageEnvelope> {
+  private static class MapWithScheduledFn implements MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope>,
+                                                     ScheduledFunction<String, TestOutputMessageEnvelope> {
 
     @Override
     public TestOutputMessageEnvelope apply(TestMessageEnvelope m) {
@@ -82,12 +83,12 @@ public class TestOperatorSpec {
     }
 
     @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
+    public void schedule(Scheduler<String> scheduler) {
 
     }
 
     @Override
-    public Collection<TestOutputMessageEnvelope> onTimer(String key, long timestamp) {
+    public Collection<TestOutputMessageEnvelope> onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -164,8 +165,8 @@ public class TestOperatorSpec {
     assertTrue(cloneOperatorSpec.getTransformFn() instanceof FlatMapFunction);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -187,8 +188,8 @@ public class TestOperatorSpec {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -209,8 +210,8 @@ public class TestOperatorSpec {
     assertNotEquals(userFn, clonedUserFn);
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
@@ -360,13 +361,13 @@ public class TestOperatorSpec {
     assertEquals(streamOperatorSpec.getWatermarkFn(), testMapFn);
     assertNotNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertNull(streamOperatorSpec.getTimerFn());
-    assertNull(cloneOperatorSpec.getTimerFn());
+    assertNull(streamOperatorSpec.getScheduledFn());
+    assertNull(cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
-  public void testMapStreamOperatorSpecWithTimer() {
-    MapWithTimerFn testMapFn = new MapWithTimerFn();
+  public void testMapStreamOperatorSpecWithScheduledFunction() {
+    MapWithScheduledFn testMapFn = new MapWithScheduledFn();
 
     StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> streamOperatorSpec =
         OperatorSpecs.createMapOperatorSpec(testMapFn, "op0");
@@ -378,9 +379,9 @@ public class TestOperatorSpec {
     assertNull(streamOperatorSpec.getWatermarkFn());
     assertNull(cloneOperatorSpec.getWatermarkFn());
     assertNotEquals(cloneOperatorSpec.getTransformFn(), cloneOperatorSpec.getWatermarkFn());
-    assertEquals(streamOperatorSpec.getTimerFn(), testMapFn);
-    assertNotNull(cloneOperatorSpec.getTimerFn());
-    assertNotEquals(streamOperatorSpec.getTimerFn(), cloneOperatorSpec.getTimerFn());
+    assertEquals(streamOperatorSpec.getScheduledFn(), testMapFn);
+    assertNotNull(cloneOperatorSpec.getScheduledFn());
+    assertNotEquals(streamOperatorSpec.getScheduledFn(), cloneOperatorSpec.getScheduledFn());
   }
 
   @Test
index db7079c..1d98580 100644 (file)
@@ -23,13 +23,13 @@ import java.util.Map;
 import org.apache.samza.application.StreamApplicationDescriptorImpl;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.OperatorSpecGraph;
-import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.descriptors.GenericInputDescriptor;
 import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
@@ -58,7 +58,7 @@ public class TestPartitionByOperatorSpec {
   private final String testJobId = "1";
   private final String testRepartitionedStreamName = "parByKey";
 
-  class TimerMapFn implements MapFunction<Object, String>, TimerFunction<String, Object> {
+  class ScheduledMapFn implements MapFunction<Object, String>, ScheduledFunction<String, Object> {
 
     @Override
     public String apply(Object message) {
@@ -66,12 +66,12 @@ public class TestPartitionByOperatorSpec {
     }
 
     @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
+    public void schedule(Scheduler<String> scheduler) {
 
     }
 
     @Override
-    public Collection<Object> onTimer(String key, long timestamp) {
+    public Collection<Object> onCallback(String key, long timestamp) {
       return null;
     }
   }
@@ -117,7 +117,7 @@ public class TestPartitionByOperatorSpec {
     assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getTimerFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = inputOpSpecs.get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -126,7 +126,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getTimerFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -144,7 +144,7 @@ public class TestPartitionByOperatorSpec {
     assertNull(inputOpSpec.getKeySerde());
     assertNull(inputOpSpec.getValueSerde());
     assertTrue(inputOpSpec.isKeyed());
-    assertNull(inputOpSpec.getTimerFn());
+    assertNull(inputOpSpec.getScheduledFn());
     assertNull(inputOpSpec.getWatermarkFn());
     InputOperatorSpec originInputSpec = streamAppDesc.getInputOperators().get(testinputDescriptor.getStreamId());
     assertTrue(originInputSpec.getRegisteredOperatorSpecs().toArray()[0] instanceof PartitionByOperatorSpec);
@@ -153,7 +153,7 @@ public class TestPartitionByOperatorSpec {
     assertEquals(reparOpSpec.getKeyFunction(), keyFn);
     assertEquals(reparOpSpec.getValueFunction(), valueFn);
     assertEquals(reparOpSpec.getOutputStream().getStreamId(), reparOpSpec.getOpId());
-    assertNull(reparOpSpec.getTimerFn());
+    assertNull(reparOpSpec.getScheduledFn());
     assertNull(reparOpSpec.getWatermarkFn());
   }
 
@@ -169,8 +169,8 @@ public class TestPartitionByOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testTimerFunctionAsKeyFn() {
-    TimerMapFn keyFn = new TimerMapFn();
+  public void testScheduledFunctionAsKeyFn() {
+    ScheduledMapFn keyFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(keyFn, m -> m, "parByKey");
@@ -187,8 +187,8 @@ public class TestPartitionByOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testTimerFunctionAsValueFn() {
-    TimerMapFn valueFn = new TimerMapFn();
+  public void testScheduledFunctionAsValueFn() {
+    ScheduledMapFn valueFn = new ScheduledMapFn();
     new StreamApplicationDescriptorImpl(appDesc -> {
         MessageStream<Object> inputStream = appDesc.getInputStream(testinputDescriptor);
         inputStream.partitionBy(m -> m.toString(), valueFn, "parByKey");
index 0a2214b..41973b2 100644 (file)
@@ -19,8 +19,8 @@
 
 package org.apache.samza.operators.spec;
 
-import org.apache.samza.operators.TimerRegistry;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.Scheduler;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.operators.functions.FoldLeftFunction;
@@ -90,8 +90,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsInitializer() {
-    class TimedSupplierFunction implements SupplierFunction<Collection>, TimerFunction<Object, Collection> {
+  public void testIllegalScheduledFunctionAsInitializer() {
+    class TimedSupplierFunction implements SupplierFunction<Collection>, ScheduledFunction<Object, Collection> {
 
       @Override
       public Collection get() {
@@ -99,12 +99,12 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Collection> onTimer(Object key, long timestamp) {
+      public Collection<Collection> onCallback(Object key, long timestamp) {
         return null;
       }
     }
@@ -138,8 +138,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsKeyFn() {
-    class TimerMapFunction implements MapFunction<Object, Object>, TimerFunction<Object, Object> {
+  public void testIllegalScheduledFunctionAsKeyFn() {
+    class ScheduledMapFunction implements MapFunction<Object, Object>, ScheduledFunction<Object, Object> {
 
       @Override
       public Object apply(Object message) {
@@ -147,16 +147,16 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Object> onTimer(Object key, long timestamp) {
+      public Collection<Object> onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    keyFn = new TimerMapFunction();
+    keyFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -186,8 +186,8 @@ public class TestWindowOperatorSpec {
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testIllegalTimerFunctionAsEventTimeFn() {
-    class TimerMapFunction implements MapFunction<Object, Long>, TimerFunction<Object, Object> {
+  public void testIllegalScheduledFunctionAsEventTimeFn() {
+    class ScheduledMapFunction implements MapFunction<Object, Long>, ScheduledFunction<Object, Object> {
 
       @Override
       public Long apply(Object message) {
@@ -195,16 +195,16 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Object> onTimer(Object key, long timestamp) {
+      public Collection<Object> onCallback(Object key, long timestamp) {
         return null;
       }
     }
-    timeFn = new TimerMapFunction();
+    timeFn = new ScheduledMapFunction();
 
     getWindowOperatorSpec("w0");
   }
@@ -234,8 +234,9 @@ public class TestWindowOperatorSpec {
   }
 
   @Test
-  public void testTimerFunctionAsFoldLeftFn() {
-    class TimerFoldLeftFunction implements FoldLeftFunction<Object, Collection>, TimerFunction<Object, Collection> {
+  public void testScheduledFunctionAsFoldLeftFn() {
+    class ScheduledFoldLeftFunction
+        implements FoldLeftFunction<Object, Collection>, ScheduledFunction<Object, Collection> {
 
       @Override
       public Collection apply(Object message, Collection oldValue) {
@@ -244,19 +245,19 @@ public class TestWindowOperatorSpec {
       }
 
       @Override
-      public void registerTimer(TimerRegistry<Object> timerRegistry) {
+      public void schedule(Scheduler<Object> scheduler) {
 
       }
 
       @Override
-      public Collection<Collection> onTimer(Object key, long timestamp) {
+      public Collection<Collection> onCallback(Object key, long timestamp) {
         return null;
       }
     }
 
-    foldFn = new TimerFoldLeftFunction();
+    foldFn = new ScheduledFoldLeftFunction();
     WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0");
-    assertEquals(windowSpec.getTimerFn(), foldFn);
+    assertEquals(windowSpec.getScheduledFn(), foldFn);
     assertNull(windowSpec.getWatermarkFn());
   }
 
@@ -284,7 +285,7 @@ public class TestWindowOperatorSpec {
     foldFn = new WatermarkFoldLeftFunction();
     WindowOperatorSpec<Object, Object, Collection> windowSpec = getWindowOperatorSpec("w0");
     assertEquals(windowSpec.getWatermarkFn(), foldFn);
-    assertNull(windowSpec.getTimerFn());
+    assertNull(windowSpec.getScheduledFn());
   }
 
   @Test
@@ -36,7 +36,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class TestSystemTimerScheduler {
+public class TestEpochTimeScheduler {
 
   private ScheduledExecutorService createExecutorService() {
     ScheduledExecutorService service = mock(ScheduledExecutorService.class);
@@ -49,15 +49,15 @@ public class TestSystemTimerScheduler {
     return service;
   }
 
-  private void fireTimers(SystemTimerScheduler factory) {
+  private void fireTimers(EpochTimeScheduler factory) {
     factory.removeReadyTimers().entrySet().forEach(entry -> {
-        entry.getValue().onTimer(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class));
+        entry.getValue().onCallback(entry.getKey().getKey(), mock(MessageCollector.class), mock(TaskCoordinator.class));
       });
   }
 
   @Test
   public void testSingleTimer() {
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     List<String> results = new ArrayList<>();
     scheduler.setTimer("single-timer", 1, (key, collector, coordinator) -> {
         results.add(key);
@@ -71,7 +71,7 @@ public class TestSystemTimerScheduler {
 
   @Test
   public void testMultipleTimers() {
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     List<String> results = new ArrayList<>();
     scheduler.setTimer("multiple-timer-3", 3, (key, collector, coordinator) -> {
         results.add(key + ":3");
@@ -97,7 +97,7 @@ public class TestSystemTimerScheduler {
     Object key2 = new Object();
     List<String> results = new ArrayList<>();
 
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     scheduler.setTimer(key1, 2, (key, collector, coordinator) -> {
         assertEquals(key, key1);
         results.add("key1:2");
@@ -120,7 +120,7 @@ public class TestSystemTimerScheduler {
     Long key2 = Long.MAX_VALUE;
     List<String> results = new ArrayList<>();
 
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     scheduler.setTimer(key1, 1, (key, collector, coordinator) -> {
         assertEquals(key, key1);
         results.add("key:1");
@@ -144,7 +144,7 @@ public class TestSystemTimerScheduler {
     when(future.cancel(anyBoolean())).thenReturn(true);
     when(service.schedule((Runnable) anyObject(), anyLong(), anyObject())).thenAnswer(invocation -> future);
 
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(service);
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(service);
     List<String> results = new ArrayList<>();
     scheduler.setTimer("timer", 1, (key, collector, coordinator) -> {
         results.add(key);
@@ -160,7 +160,7 @@ public class TestSystemTimerScheduler {
 
   @Test
   public void testTimerListener() {
-    SystemTimerScheduler scheduler = SystemTimerScheduler.create(createExecutorService());
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     List<String> results = new ArrayList<>();
     scheduler.registerListener(() -> {
         results.add("timer-listener");
index 1acfc47..3046c1f 100644 (file)
@@ -39,7 +39,7 @@ import org.apache.samza.container.TaskName;
 import org.apache.samza.operators.MessageStream;
 import org.apache.samza.operators.MessageStreamImpl;
 import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.StreamOperatorSpec;
@@ -192,7 +192,7 @@ public class TestProjectTranslator extends TranslatorTestBase {
       }
 
       @Override
-      public TimerFunction getTimerFn() {
+      public ScheduledFunction getScheduledFn() {
         return null;
       }
     };
@@ -26,9 +26,9 @@ import org.apache.samza.config.JobCoordinatorConfig;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.apache.samza.test.framework.TestTimerApp.*;
+import static org.apache.samza.test.framework.TestSchedulingApp.*;
 
-public class TimerTest extends StreamApplicationIntegrationTestHarness {
+public class SchedulingTest extends StreamApplicationIntegrationTestHarness {
 
   @Before
   public void setup() {
@@ -55,6 +55,6 @@ public class TimerTest extends StreamApplicationIntegrationTestHarness {
     configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
 
-    runApplication(new TestTimerApp(), "TimerTest", configs);
+    runApplication(new TestSchedulingApp(), "SchedulingTest", configs);
   }
 }
@@ -26,16 +26,16 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.application.StreamApplicationDescriptor;
+import org.apache.samza.operators.Scheduler;
 import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TimerRegistry;
 import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.TimerFunction;
+import org.apache.samza.operators.functions.ScheduledFunction;
 import org.apache.samza.serializers.JsonSerdeV2;
 import org.apache.samza.system.kafka.KafkaInputDescriptor;
 import org.apache.samza.system.kafka.KafkaSystemDescriptor;
 import org.apache.samza.test.operator.data.PageView;
 
-public class TestTimerApp implements StreamApplication {
+public class TestSchedulingApp implements StreamApplication {
   public static final String PAGE_VIEWS = "page-views";
 
   @Override
@@ -44,9 +44,9 @@ public class TestTimerApp implements StreamApplication {
     KafkaSystemDescriptor ksd = new KafkaSystemDescriptor("kafka");
     KafkaInputDescriptor<PageView> isd = ksd.getInputDescriptor(PAGE_VIEWS, serde);
     final MessageStream<PageView> pageViews = appDesc.getInputStream(isd);
-    final MessageStream<PageView> output = pageViews.flatMap(new FlatmapTimerFn());
+    final MessageStream<PageView> output = pageViews.flatMap(new FlatmapScheduledFn());
 
-    MessageStreamAssert.that("Output from timer function should container all complete messages", output, serde)
+    MessageStreamAssert.that("Output from scheduling function should container all complete messages", output, serde)
         .containsInAnyOrder(
             Arrays.asList(
                 new PageView("v1-complete", "p1", "u1"),
@@ -56,14 +56,15 @@ public class TestTimerApp implements StreamApplication {
             ));
   }
 
-  private static class FlatmapTimerFn implements FlatMapFunction<PageView, PageView>, TimerFunction<String, PageView> {
+  private static class FlatmapScheduledFn
+      implements FlatMapFunction<PageView, PageView>, ScheduledFunction<String, PageView> {
 
     private transient List<PageView> pageViews;
-    private transient TimerRegistry<String> timerRegistry;
+    private transient Scheduler<String> scheduler;
 
     @Override
-    public void registerTimer(TimerRegistry<String> timerRegistry) {
-      this.timerRegistry = timerRegistry;
+    public void schedule(Scheduler<String> scheduler) {
+      this.scheduler = scheduler;
       this.pageViews = new ArrayList<>();
     }
 
@@ -75,13 +76,13 @@ public class TestTimerApp implements StreamApplication {
       if (pageViews.size() == 2) {
         //got all messages for this task
         final long time = System.currentTimeMillis() + 100;
-        timerRegistry.register("CompleteTimer", time);
+        scheduler.schedule("CompleteScheduler", time);
       }
       return Collections.emptyList();
     }
 
     @Override
-    public Collection<PageView> onTimer(String key, long time) {
+    public Collection<PageView> onCallback(String key, long time) {
       return pageViews;
     }
   }