SAMZA-1858: Public APIs for shared context
authorCameron Lee <calee@linkedin.com>
Mon, 1 Oct 2018 22:45:38 +0000 (15:45 -0700)
committerPrateek Maheshwari <pmaheshwari@apache.org>
Mon, 1 Oct 2018 22:45:38 +0000 (15:45 -0700)
This is just the API classes and their implementations, in an attempt to keep individual PRs more manageable. An upcoming PR will actually integrate them and remove the classes that these are intended to replace.
SEP: https://cwiki.apache.org/confluence/display/SAMZA/SEP-15%3A+New+Runtime+Context+API

Author: Cameron Lee <calee@linkedin.com>

Reviewers: Yi Pan <nickpan47@gmail.com>, Prateek Maheshwari <pmaheshwari@apache.org>, Shanthoosh Venkatraman <svenkatr@linkedin.com>

Closes #626 from cameronlee314/shared_context_apis

21 files changed:
samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/ContainerContext.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/Context.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/JobContext.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/context/TaskContext.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/job/model/ContainerModel.java [moved from samza-core/src/main/java/org/apache/samza/job/model/ContainerModel.java with 81% similarity]
samza-api/src/main/java/org/apache/samza/job/model/TaskModel.java [moved from samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java with 84% similarity]
samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java [new file with mode: 0644]
samza-api/src/main/java/org/apache/samza/task/TaskContext.java
samza-core/src/main/java/org/apache/samza/container/TaskContextImpl.java
samza-core/src/main/java/org/apache/samza/context/ContainerContextImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/context/ContextImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java [new file with mode: 0644]
samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java [new file with mode: 0644]
samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java [new file with mode: 0644]

diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContext.java
new file mode 100644 (file)
index 0000000..08e0b60
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+/**
+ * An application should implement this to contain any runtime objects required by processing logic which can be shared
+ * across all tasks in a container. A single instance of this will be created in each container.
+ * <p>
+ * This needs to be created by an implementation of {@link ApplicationContainerContextFactory}. The factory should
+ * create the runtime objects contained within this context.
+ * <p>
+ * If it is necessary to have a separate instance per task, then use {@link ApplicationTaskContext} instead.
+ * <p>
+ * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ */
+public interface ApplicationContainerContext {
+  /**
+   * Lifecycle logic which will run after tasks in the container are initialized but before processing begins.
+   * <p>
+   * If this throws an exception, then the container will fail to start.
+   */
+  void start();
+
+  /**
+   * Lifecycle logic which will run after processing ends but before tasks in the container are closed.
+   * <p>
+   * If this throws an exception, then the container will fail to fully shut down.
+   */
+  void stop();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationContainerContextFactory.java
new file mode 100644 (file)
index 0000000..fbc2eef
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import java.io.Serializable;
+
+
+/**
+ * An application should implement this if it has a {@link ApplicationContainerContext} that is needed for
+ * initialization.
+ * <p>
+ * This will be called to create an instance of {@link ApplicationContainerContext} during the container initialization
+ * stage. At that stage, the framework-provided job-level and container-level contexts are available for creating the
+ * {@link ApplicationContainerContext}.
+ * <p>
+ * This is {@link Serializable} because it is specified in {@link org.apache.samza.application.ApplicationDescriptor}.
+ * @param <T> concrete type of {@link ApplicationContainerContext} returned by this factory
+ */
+public interface ApplicationContainerContextFactory<T extends ApplicationContainerContext> extends Serializable {
+  /**
+   * Create an instance of the application-defined {@link ApplicationContainerContext}.
+   *
+   * @param jobContext framework-provided job context used for building {@link ApplicationContainerContext}
+   * @param containerContext framework-provided container context used for building {@link ApplicationContainerContext}
+   * @return new instance of the application-defined {@link ApplicationContainerContext}
+   */
+  T create(JobContext jobContext, ContainerContext containerContext);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContext.java
new file mode 100644 (file)
index 0000000..ffc5383
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+/**
+ * An application should implement this to contain any runtime objects required by processing logic which cannot be
+ * shared across tasks. A new instance of this will be created for each task.
+ * <p>
+ * This needs to be created by an implementation of {@link ApplicationTaskContextFactory}. The factory should create
+ * the runtime objects contained within this context.
+ * <p>
+ * If it is possible to share an instance of this across tasks in a container, then use
+ * {@link ApplicationContainerContext} instead.
+ * <p>
+ * This class does not need to be {@link java.io.Serializable} and instances are not persisted across deployments.
+ */
+public interface ApplicationTaskContext {
+  /**
+   * Lifecycle logic which will run after tasks are initialized but before processing begins.
+   * <p>
+   * If this throws an exception, then the container will fail to start.
+   */
+  void start();
+
+  /**
+   * Lifecycle logic which will run after processing ends but before tasks are closed.
+   * <p>
+   * If this throws an exception, then the container will fail to fully shut down.
+   */
+  void stop();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java b/samza-api/src/main/java/org/apache/samza/context/ApplicationTaskContextFactory.java
new file mode 100644 (file)
index 0000000..af9ad68
--- /dev/null
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import java.io.Serializable;
+
+
+/**
+ * An application should implement this if it has a {@link ApplicationTaskContext} that is needed for
+ * initialization. This will be used to create instance(s) of that {@link ApplicationTaskContext}.
+ * <p>
+ * This will be called to create an instance of {@link ApplicationTaskContext} during the initialization stage of each
+ * task. At that stage, the framework-provided job-level, container-level, and task-level contexts are available for
+ * creating the {@link ApplicationTaskContext}. Also, the application-defined container-level context is available.
+ * <p>
+ * This is {@link Serializable} because it is specified in {@link org.apache.samza.application.ApplicationDescriptor}.
+ * @param <T> concrete type of {@link ApplicationTaskContext} returned by this factory
+ */
+public interface ApplicationTaskContextFactory<T extends ApplicationTaskContext> extends Serializable {
+  /**
+   * Create an instance of the application-defined {@link ApplicationTaskContext}.
+   *
+   * @param jobContext framework-provided job context used for building {@link ApplicationTaskContext}
+   * @param containerContext framework-provided container context used for building {@link ApplicationTaskContext}
+   * @param taskContext framework-provided task context used for building {@link ApplicationTaskContext}
+   * @param applicationContainerContext application-provided container context used for building
+   * {@link ApplicationTaskContext}
+   * @return new instance of the application-defined {@link ApplicationContainerContext}
+   */
+  T create(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
+      ApplicationContainerContext applicationContainerContext);
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java b/samza-api/src/main/java/org/apache/samza/context/ContainerContext.java
new file mode 100644 (file)
index 0000000..51d7918
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+/**
+ * Contains information at container granularity, provided by the Samza framework, to be used to instantiate an
+ * application at runtime.
+ * <p>
+ * Note that application-defined container-level context is accessible through
+ * {@link ApplicationContainerContext}.
+ */
+public interface ContainerContext {
+  /**
+   * Returns the {@link ContainerModel} associated with this container. This contains information like the id and the
+   * associated {@link org.apache.samza.job.model.TaskModel}s.
+   * @return {@link ContainerModel} associated with this container
+   */
+  ContainerModel getContainerModel();
+
+  /**
+   * Returns the {@link MetricsRegistry} for this container. Metrics built using this registry will be associated with
+   * the container.
+   * @return {@link MetricsRegistry} for this container
+   */
+  MetricsRegistry getContainerMetricsRegistry();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/Context.java b/samza-api/src/main/java/org/apache/samza/context/Context.java
new file mode 100644 (file)
index 0000000..bfe66d3
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+/**
+ * Container object for all context provided to instantiate an application at runtime.
+ */
+public interface Context {
+  /**
+   * Returns the framework-provided context for the overall job that is being run.
+   * @return framework-provided job context
+   */
+  JobContext getJobContext();
+
+  /**
+   * Returns the framework-provided context for the container that this is in.
+   * <p>
+   * Note that this is not the application-defined container context. Use
+   * {@link Context#getApplicationContainerContext()} to get the application-defined container context.
+   * @return framework-provided container context
+   */
+  ContainerContext getContainerContext();
+
+  /**
+   * Returns the framework-provided context for the task that that this is in.
+   * <p>
+   * Note that this is not the application-defined task context. Use {@link Context#getApplicationTaskContext()}
+   * to get the application-defined task context.
+   * @return framework-provided task context
+   */
+  TaskContext getTaskContext();
+
+  /**
+   * Returns the application-defined container context object specified by the
+   * {@link ApplicationContainerContextFactory}. This is shared across all tasks in the container, but not across
+   * containers.
+   * <p>
+   * In order to use this in application code, it should be casted to the concrete type that corresponds to the
+   * {@link ApplicationContainerContextFactory}.
+   * <p>
+   * Note that this is not the framework-provided container context. Use {@link Context#getContainerContext()} to get
+   * the framework-provided container context.
+   * @return application-defined container context
+   * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+   */
+  ApplicationContainerContext getApplicationContainerContext();
+
+  /**
+   * Returns the application-defined task context object specified by the {@link ApplicationTaskContextFactory}.
+   * Each task will have a separate instance of this.
+   * <p>
+   * In order to use this in application code, it should be casted to the concrete type that corresponds to the
+   * {@link ApplicationTaskContextFactory}.
+   * <p>
+   * Note that this is not the framework-provided task context. Use {@link Context#getTaskContext()} to get the
+   * framework-provided task context.
+   * @return application-defined task context
+   * @throws IllegalStateException if no context could be built (e.g. no factory provided)
+   */
+  ApplicationTaskContext getApplicationTaskContext();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/JobContext.java b/samza-api/src/main/java/org/apache/samza/context/JobContext.java
new file mode 100644 (file)
index 0000000..9b09fa9
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.apache.samza.config.Config;
+
+
+/**
+ * Contains information at job granularity, provided by the Samza framework, to be used to instantiate an application at
+ * runtime.
+ */
+public interface JobContext {
+  /**
+   * Returns the final configuration for this job.
+   * @return configuration for this job
+   */
+  Config getConfig();
+
+  /**
+   * Returns the name of the job.
+   * @return name of the job
+   */
+  String getJobName();
+
+  /**
+   * Returns the instance id for this instance of this job.
+   * @return instance id for the job
+   */
+  String getJobId();
+}
diff --git a/samza-api/src/main/java/org/apache/samza/context/TaskContext.java b/samza-api/src/main/java/org/apache/samza/context/TaskContext.java
new file mode 100644 (file)
index 0000000..d29f6a5
--- /dev/null
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.scheduler.CallbackScheduler;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
+
+
+/**
+ * Contains information at task granularity, provided by the Samza framework, to be used to instantiate an application
+ * at runtime.
+ * <p>
+ * Note that application-defined task-level context is accessible through {@link ApplicationTaskContext}.
+ */
+public interface TaskContext {
+  /**
+   * Returns the {@link TaskModel} associated with this task. This contains information like the task name and
+   * associated {@link SystemStreamPartition}s.
+   * @return {@link TaskModel} associated with this task
+   */
+  TaskModel getTaskModel();
+
+  /**
+   * Returns the {@link MetricsRegistry} for this task. Metrics built using this registry will be associated with the
+   * task.
+   * @return {@link MetricsRegistry} for this task
+   */
+  MetricsRegistry getTaskMetricsRegistry();
+
+  /**
+   * Returns the {@link KeyValueStore} corresponding to the {@code storeName}. In application code, it is recommended to
+   * cast the resulting stores to {@link KeyValueStore}s with the correct concrete type parameters.
+   * @param storeName name of the {@link KeyValueStore} to get
+   * @return {@link KeyValueStore} corresponding to the {@code storeName}
+   * @throws IllegalArgumentException if there is no store associated with {@code storeName}
+   */
+  KeyValueStore<?, ?> getStore(String storeName);
+
+  /**
+   * Returns the {@link Table} corresponding to the {@code tableId}. In application code, it is recommended to cast this
+   * to the resulting tables to {@link Table}s with the correct concrete type parameters.
+   * @param tableId id of the {@link Table} to get
+   * @return {@link Table} corresponding to the {@code tableId}
+   * @throws IllegalArgumentException if there is no table associated with {@code tableId}
+   */
+  Table<?> getTable(String tableId);
+
+  /**
+   * Returns a task-level {@link CallbackScheduler} which can be used to delay execution of some logic.
+   * @return {@link CallbackScheduler} for this task
+   */
+  CallbackScheduler getCallbackScheduler();
+
+  /**
+   * Set the starting offset for the given {@link SystemStreamPartition}. Offsets can only be set for a
+   * {@link SystemStreamPartition} assigned to this task. The {@link SystemStreamPartition}s assigned to this task can
+   * be accessed through {@link TaskModel#getSystemStreamPartitions()} for the {@link TaskModel} obtained by calling
+   * {@link #getTaskModel()}. Trying to set the offset for any other partition will have no effect.
+   *
+   * NOTE: this feature is experimental, and the API may change in a future release.
+   *
+   * @param systemStreamPartition {@link org.apache.samza.system.SystemStreamPartition} whose offset should be set
+   * @param offset to set for the given {@link org.apache.samza.system.SystemStreamPartition}
+   */
+  @InterfaceStability.Evolving
+  void setStartingOffset(SystemStreamPartition systemStreamPartition, String offset);
+}
\ No newline at end of file
@@ -24,20 +24,9 @@ import java.util.Map;
 import org.apache.samza.container.TaskName;
 
 /**
+ * This contains metadata about a Samza container, such as which tasks a Samza container should process.
  * <p>
- * The data model is used to define which TaskModels a SamzaContainer should
- * process. The model is used in the job coordinator and SamzaContainer to
- * determine how to execute Samza jobs.
- * </p>
- *
- * <p>
- * The hierarchy for a Samza's job data model is that jobs have containers, and
- * containers have tasks. Each data model contains relevant information, such as
- * an id, partition information, etc.
- * </p>
- * <p>
- * <b>Note</b>: This class has a natural ordering that is inconsistent with equals.
- * </p>
+ * The hierarchy for a Samza's job data model is that jobs have containers, and containers have tasks.
  */
 public class ContainerModel {
   private final String id;
@@ -48,10 +37,19 @@ public class ContainerModel {
     this.tasks = Collections.unmodifiableMap(tasks);
   }
 
+  /**
+   * Returns the id for the container associated with this model.
+   * @return id for the container
+   */
   public String getId() {
     return id;
   }
 
+  /**
+   * Returns a map for all tasks in this container. The keys are the names of the tasks in the container and the values
+   * are the corresponding {@link TaskModel}s.
+   * @return map from {@link TaskName} to {@link TaskModel}
+   */
   public Map<TaskName, TaskModel> getTasks() {
     return tasks;
   }
@@ -27,16 +27,9 @@ import org.apache.samza.system.SystemStreamPartition;
 
 
 /**
+ * This contains metadata about a Samza task, such as the stream partitions that it is consuming.
  * <p>
- * The data model used to represent a task. The model is used in the job
- * coordinator and SamzaContainer to determine how to execute Samza jobs.
- * </p>
- *
- * <p>
- * The hierarchy for a Samza's job data model is that jobs have containers, and
- * containers have tasks. Each data model contains relevant information, such as
- * an id, partition information, etc.
- * </p>
+ * The hierarchy for a Samza's job data model is that jobs have containers, and containers have tasks.
  */
 public class TaskModel implements Comparable<TaskModel> {
   private final TaskName taskName;
@@ -49,14 +42,26 @@ public class TaskModel implements Comparable<TaskModel> {
     this.changelogPartition = changelogPartition;
   }
 
+  /**
+   * Returns the name of the task.
+   * @return name of the task
+   */
   public TaskName getTaskName() {
     return taskName;
   }
 
+  /**
+   * Returns the {@link SystemStreamPartition}s that this task is responsible for consuming.
+   * @return {@link SystemStreamPartition}s for this task
+   */
   public Set<SystemStreamPartition> getSystemStreamPartitions() {
     return systemStreamPartitions;
   }
 
+  /**
+   * Returns the {@link Partition} used for all changelogs for this task.
+   * @return changelog partition for this task
+   */
   public Partition getChangelogPartition() {
     return changelogPartition;
   }
diff --git a/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java b/samza-api/src/main/java/org/apache/samza/scheduler/CallbackScheduler.java
new file mode 100644 (file)
index 0000000..e230304
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.scheduler;
+
+/**
+ * Provides a way for applications to register some logic to be executed at a future time.
+ */
+public interface CallbackScheduler {
+  /**
+   * Schedule the {@code callback} for the provided {@code key} to be invoked at epoch-time {@code timestamp}.
+   * The callback will be invoked exclusively with any other operations for this task, e.g. processing, windowing, and
+   * commit.
+   * @param key callback key
+   * @param timestamp epoch time when the callback will be fired, in milliseconds
+   * @param callback callback to run
+   * @param <K> type of the key
+   */
+  <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback);
+
+  /**
+   * Delete the scheduled {@code callback} for the {@code key}.
+   * Deletion only happens if the callback hasn't been fired. Otherwise it will not interrupt.
+   * @param key callback key
+   * @param <K> type of the key
+   */
+  <K> void deleteCallback(K key);
+}
index eccedba..007028a 100644 (file)
@@ -32,6 +32,7 @@ import org.apache.samza.table.Table;
 /**
  * A TaskContext provides resources about the {@link org.apache.samza.task.StreamTask}, particularly during
  * initialization in an {@link org.apache.samza.task.InitableTask}.
+ * TODO this will be replaced by {@link org.apache.samza.context.TaskContext} in the near future by SAMZA-1714
  */
 public interface TaskContext {
   MetricsRegistry getMetricsRegistry();
index bea6373..25ffe8f 100644 (file)
@@ -40,6 +40,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 
+
+/**
+ * TODO this will be replaced by {@link org.apache.samza.context.TaskContextImpl} in the near future by SAMZA-1714
+ */
 public class TaskContextImpl implements TaskContext {
   private static final Logger LOG = LoggerFactory.getLogger(TaskContextImpl.class);
 
diff --git a/samza-core/src/main/java/org/apache/samza/context/ContainerContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/ContainerContextImpl.java
new file mode 100644 (file)
index 0000000..1fc2ddd
--- /dev/null
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.metrics.MetricsRegistry;
+
+
+public class ContainerContextImpl implements ContainerContext {
+  private final ContainerModel containerModel;
+  private final MetricsRegistry containerMetricsRegistry;
+
+  public ContainerContextImpl(ContainerModel containerModel, MetricsRegistry containerMetricsRegistry) {
+    this.containerModel = containerModel;
+    this.containerMetricsRegistry = containerMetricsRegistry;
+  }
+
+  @Override
+  public ContainerModel getContainerModel() {
+    return this.containerModel;
+  }
+
+  @Override
+  public MetricsRegistry getContainerMetricsRegistry() {
+    return this.containerMetricsRegistry;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/ContextImpl.java
new file mode 100644 (file)
index 0000000..c2c182f
--- /dev/null
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+public class ContextImpl implements Context {
+  private final JobContext jobContext;
+  private final ContainerContext containerContext;
+  private final TaskContext taskContext;
+  private final ApplicationContainerContext applicationContainerContext;
+  private final ApplicationTaskContext applicationTaskContext;
+
+  /**
+   * @param jobContext non-null job context
+   * @param containerContext non-null framework container context
+   * @param taskContext non-null framework task context
+   * @param applicationContainerContext nullable application-defined container context
+   * @param applicationTaskContext nullable application-defined task context
+   */
+  public ContextImpl(JobContext jobContext, ContainerContext containerContext, TaskContext taskContext,
+      ApplicationContainerContext applicationContainerContext, ApplicationTaskContext applicationTaskContext) {
+    this.jobContext = jobContext;
+    this.containerContext = containerContext;
+    this.taskContext = taskContext;
+    this.applicationContainerContext = applicationContainerContext;
+    this.applicationTaskContext = applicationTaskContext;
+  }
+
+  @Override
+  public JobContext getJobContext() {
+    return this.jobContext;
+  }
+
+  @Override
+  public ContainerContext getContainerContext() {
+    return this.containerContext;
+  }
+
+  @Override
+  public TaskContext getTaskContext() {
+    return this.taskContext;
+  }
+
+  @Override
+  public ApplicationContainerContext getApplicationContainerContext() {
+    if (this.applicationContainerContext == null) {
+      throw new IllegalStateException("No application-defined container context exists");
+    }
+    return this.applicationContainerContext;
+  }
+
+  @Override
+  public ApplicationTaskContext getApplicationTaskContext() {
+    if (this.applicationTaskContext == null) {
+      throw new IllegalStateException("No application-defined task context exists");
+    }
+    return this.applicationTaskContext;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/JobContextImpl.java
new file mode 100644 (file)
index 0000000..8fe44e4
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.apache.samza.config.Config;
+
+
+public class JobContextImpl implements JobContext {
+  private final Config config;
+  private final String jobName;
+  private final String jobId;
+
+  public JobContextImpl(Config config, String jobName, String jobId) {
+    this.config = config;
+    this.jobName = jobName;
+    this.jobId = jobId;
+  }
+
+  @Override
+  public Config getConfig() {
+    return this.config;
+  }
+
+  @Override
+  public String getJobName() {
+    return this.jobName;
+  }
+
+  @Override
+  public String getJobId() {
+    return this.jobId;
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java b/samza-core/src/main/java/org/apache/samza/context/TaskContextImpl.java
new file mode 100644 (file)
index 0000000..e975dcd
--- /dev/null
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import java.util.function.Function;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.scheduler.CallbackScheduler;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.Table;
+import org.apache.samza.table.TableManager;
+
+
+public class TaskContextImpl implements TaskContext {
+  private final TaskModel taskModel;
+  private final MetricsRegistry taskMetricsRegistry;
+  private final Function<String, KeyValueStore> keyValueStoreProvider;
+  private final TableManager tableManager;
+  private final CallbackScheduler callbackScheduler;
+  private final OffsetManager offsetManager;
+
+  public TaskContextImpl(TaskModel taskModel,
+      MetricsRegistry taskMetricsRegistry,
+      Function<String, KeyValueStore> keyValueStoreProvider,
+      TableManager tableManager,
+      CallbackScheduler callbackScheduler,
+      OffsetManager offsetManager) {
+    this.taskModel = taskModel;
+    this.taskMetricsRegistry = taskMetricsRegistry;
+    this.keyValueStoreProvider = keyValueStoreProvider;
+    this.tableManager = tableManager;
+    this.callbackScheduler = callbackScheduler;
+    this.offsetManager = offsetManager;
+  }
+
+  @Override
+  public TaskModel getTaskModel() {
+    return this.taskModel;
+  }
+
+  @Override
+  public MetricsRegistry getTaskMetricsRegistry() {
+    return this.taskMetricsRegistry;
+  }
+
+  @Override
+  public KeyValueStore getStore(String storeName) {
+    KeyValueStore store = this.keyValueStoreProvider.apply(storeName);
+    if (store == null) {
+      throw new IllegalArgumentException(String.format("No store found for storeName: %s", storeName));
+    }
+    return store;
+  }
+
+  @Override
+  public Table getTable(String tableId) {
+    return this.tableManager.getTable(tableId);
+  }
+
+  @Override
+  public CallbackScheduler getCallbackScheduler() {
+    return this.callbackScheduler;
+  }
+
+  @Override
+  public void setStartingOffset(SystemStreamPartition systemStreamPartition, String offset) {
+    this.offsetManager.setStartingOffset(this.taskModel.getTaskName(), systemStreamPartition, offset);
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java b/samza-core/src/main/java/org/apache/samza/scheduler/CallbackSchedulerImpl.java
new file mode 100644 (file)
index 0000000..1f87c7c
--- /dev/null
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.scheduler;
+
+import org.apache.samza.task.EpochTimeScheduler;
+
+
+/**
+ * Delegates to {@link EpochTimeScheduler}. This is useful because it provides a write-only interface for user-facing
+ * purposes.
+ */
+public class CallbackSchedulerImpl implements CallbackScheduler {
+  private final EpochTimeScheduler epochTimeScheduler;
+
+  public CallbackSchedulerImpl(EpochTimeScheduler epochTimeScheduler) {
+    this.epochTimeScheduler = epochTimeScheduler;
+  }
+
+  @Override
+  public <K> void scheduleCallback(K key, long timestamp, ScheduledCallback<K> callback) {
+    this.epochTimeScheduler.setTimer(key, timestamp, callback);
+  }
+
+  @Override
+  public <K> void deleteCallback(K key) {
+    this.epochTimeScheduler.deleteTimer(key);
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestContextImpl.java
new file mode 100644 (file)
index 0000000..33ad3a5
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestContextImpl {
+  /**
+   * Given a concrete context, getApplicationContainerContext should return it.
+   */
+  @Test
+  public void testGetApplicationContainerContext() {
+    MockApplicationContainerContext applicationContainerContext = new MockApplicationContainerContext();
+    Context context = buildWithApplicationContainerContext(applicationContainerContext);
+    assertEquals(applicationContainerContext, context.getApplicationContainerContext());
+  }
+
+  /**
+   * Given no concrete context, getApplicationContainerContext should throw an exception.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testGetMissingApplicationContainerContext() {
+    Context context = buildWithApplicationContainerContext(null);
+    context.getApplicationContainerContext();
+  }
+
+  /**
+   * Given a concrete context, getApplicationTaskContext should return it.
+   */
+  @Test
+  public void testGetApplicationTaskContext() {
+    MockApplicationTaskContext applicationTaskContext = new MockApplicationTaskContext();
+    Context context = buildWithApplicationTaskContext(applicationTaskContext);
+    assertEquals(applicationTaskContext, context.getApplicationTaskContext());
+  }
+
+  /**
+   * Given no concrete context, getApplicationTaskContext should throw an exception.
+   */
+  @Test(expected = IllegalStateException.class)
+  public void testGetMissingApplicationTaskContext() {
+    Context context = buildWithApplicationTaskContext(null);
+    context.getApplicationTaskContext();
+  }
+
+  private static Context buildWithApplicationContainerContext(ApplicationContainerContext applicationContainerContext) {
+    return new ContextImpl(null, null, null, applicationContainerContext, null);
+  }
+
+  private static Context buildWithApplicationTaskContext(ApplicationTaskContext applicationTaskContext) {
+    return new ContextImpl(null, null, null, null, applicationTaskContext);
+  }
+
+  /**
+   * Simple empty implementation for testing.
+   */
+  private class MockApplicationContainerContext implements ApplicationContainerContext {
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
+
+  /**
+   * Simple empty implementation for testing.
+   */
+  private class MockApplicationTaskContext implements ApplicationTaskContext {
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java b/samza-core/src/test/java/org/apache/samza/context/TestTaskContextImpl.java
new file mode 100644 (file)
index 0000000..78f886c
--- /dev/null
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.context;
+
+import java.util.function.Function;
+import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.OffsetManager;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.scheduler.CallbackScheduler;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.table.TableManager;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+public class TestTaskContextImpl {
+  private static final TaskName TASK_NAME = new TaskName("myTaskName");
+
+  @Mock
+  private TaskModel taskModel;
+  @Mock
+  private MetricsRegistry taskMetricsRegistry;
+  @Mock
+  private Function<String, KeyValueStore> keyValueStoreProvider;
+  @Mock
+  private TableManager tableManager;
+  @Mock
+  private CallbackScheduler callbackScheduler;
+  @Mock
+  private OffsetManager offsetManager;
+
+  private TaskContextImpl taskContext;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    taskContext =
+        new TaskContextImpl(taskModel, taskMetricsRegistry, keyValueStoreProvider, tableManager, callbackScheduler,
+            offsetManager);
+    when(this.taskModel.getTaskName()).thenReturn(TASK_NAME);
+  }
+
+  /**
+   * Given that there is a store corresponding to the storeName, getStore should return the store.
+   */
+  @Test
+  public void testGetStore() {
+    KeyValueStore store = mock(KeyValueStore.class);
+    when(keyValueStoreProvider.apply("myStore")).thenReturn(store);
+    assertEquals(store, taskContext.getStore("myStore"));
+  }
+
+  /**
+   * Given that there is not a store corresponding to the storeName, getStore should throw an exception.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void testGetMissingStore() {
+    KeyValueStore store = mock(KeyValueStore.class);
+    when(keyValueStoreProvider.apply("myStore")).thenReturn(null);
+    assertEquals(store, taskContext.getStore("myStore"));
+  }
+
+  /**
+   * Given an SSP and offset, setStartingOffset should delegate to the offset manager.
+   */
+  @Test
+  public void testSetStartingOffset() {
+    SystemStreamPartition ssp = new SystemStreamPartition("mySystem", "myStream", new Partition(0));
+    taskContext.setStartingOffset(ssp, "123");
+    verify(offsetManager).setStartingOffset(TASK_NAME, ssp, "123");
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestCallbackSchedulerImpl.java
new file mode 100644 (file)
index 0000000..649a4e4
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.scheduler;
+
+import org.apache.samza.task.EpochTimeScheduler;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+
+public class TestCallbackSchedulerImpl {
+  @Mock
+  private EpochTimeScheduler epochTimeScheduler;
+
+  private CallbackSchedulerImpl scheduler;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    scheduler = new CallbackSchedulerImpl(epochTimeScheduler);
+  }
+
+  /**
+   * scheduleCallback should delegate to the inner scheduler
+   */
+  @Test
+  public void testScheduleCallback() {
+    @SuppressWarnings("unchecked")
+    ScheduledCallback<String> stringCallback = mock(ScheduledCallback.class);
+    scheduler.scheduleCallback("string_key", 123, stringCallback);
+    verify(epochTimeScheduler).setTimer("string_key", 123, stringCallback);
+
+    // check some other type of key
+    @SuppressWarnings("unchecked")
+    ScheduledCallback<Integer> intCallback = mock(ScheduledCallback.class);
+    scheduler.scheduleCallback(777, 456, intCallback);
+    verify(epochTimeScheduler).setTimer(777, 456, intCallback);
+  }
+
+  /**
+   * deleteCallback should delegate to the inner scheduler
+   */
+  @Test
+  public void testDeleteCallback() {
+    scheduler.deleteCallback("string_key");
+    verify(epochTimeScheduler).deleteTimer("string_key");
+
+    // check some other type of key
+    scheduler.deleteCallback(777);
+    verify(epochTimeScheduler).deleteTimer(777);
+  }
+}
\ No newline at end of file