43 hours agoSAMZA-1659: Serializable OperatorSpec master
Yi Pan (Data Infrastructure) [Fri, 25 May 2018 16:37:55 +0000 (09:37 -0700)] 
SAMZA-1659: Serializable OperatorSpec

This change is to make the user supplied functions serializable. Hence, making the full user defined DAG serializable.

Author: Yi Pan (Data Infrastructure) <>
Author: Yi Pan (Data Infrastructure) <>
Author: Xinyu Liu <>

Reviewers: Jagadish <>, Prateek Maheshwari <>

Closes #475 from nickpan47/serializable-opspec-only-Jan-24-18 and squashes the following commits:

db0dea73 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix intermittent TestZkLocalApplicationRunner failure due to StreamProcessor#stop()
34716d42 [Yi Pan (Data Infrastructure)] SAMZA-1659: fix a comment on OperatorSpec#isClone()
37d4e6ae [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing latest round of review comments
68674a14 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
d3a7826c [Yi Pan (Data Infrastructure)] SAMZA-1659: addressing review comments
f83e8dd0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
acca418b [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
842a73d6 [Yi Pan (Data Infrastructure)] SAMZA-1659: making user-defined functions in high-level API serializable
ad85a2cb [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
c1567116 [Yi Pan (Data Infrastructure)] SAMZA-1659: Before re-merge with master. Still need to fix unit tests (moving OperatorSpec clone tests to OperatorSpecGraph.clone)
f2563f8e [Yi Pan (Data Infrastructure)] SAMZA-1659: serialize the whole DAG instead of each individual OperatorSpec.
24d33496 [Yi Pan (Data Infrastructure)] SAMZA-1659: updated according to review comments. Need to merge again with master.
3f643f8b [Yi Pan (Data Infrastructure)] SAMZA-1659: serialiable OperatorSpec
ed7d8c0e [Yi Pan (Data Infrastructure)] Fixed some javadoc and test files
94de218b [Yi Pan (Data Infrastructure)] Remove public access from StreamGraphImpl#getIntermediateStream(String, Serde)
8f4e9dd4 [Yi Pan (Data Infrastructure)] Serialization of StreamGraph in a wrapper class SerializedStreamGraph
f3bb1958 [Yi Pan (Data Infrastructure)] Fix some comments
c15246f5 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
e981967d [Yi Pan (Data Infrastructure)] WIP: fixing unit test for SamzaSQL translators w/ serialization of operator functions
40583051 [Yi Pan (Data Infrastructure)] WIP: update the serialization of user functions after the merge
18ba924f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
93951c5f [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
54a28801 [Yi Pan (Data Infrastructure)] WIP: broadcast, sendtotable, and streamtotablejoin serialization and unit tests
45eb1fb0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
7c8d1591 [Yi Pan (Data Infrastructure)] WIP: working on unit tests for trigger, broadcast, join, table, and SQL UDF function serialization
b973b105 [Yi Pan (Data Infrastructure)] Merge branch 'master' into serializable-opspec-only-Jan-24-18
aca42308 [Yi Pan (Data Infrastructure)] WIP: Serialize OperatorSpec only w/o StreamApplication interface change. Passed all build and tests.
0ebebfc3 [Yi Pan (Data Infrastructure)] WIP: serialization only change
1670aff0 [Yi Pan (Data Infrastructure)] WIP: class-loading of user program logic and main() method based user program logic are both included in ThreadJobFactory/ProcessJobFactory/YarnJobFactory. ThreadJobFactory test suite to be fixed.
4102aa8c [Yi Pan (Data Infrastructure)] WIP: continued working on potential offspring integration
dc7da87e [Yi Pan (Data Infrastructure)] WIP: unit tests for serialization
475a46bc [Yi Pan (Data Infrastructure)] WIP: fixed TestZkLocalApplicationRunner. Debugging issues w/ TestRepartitionWindowApp (i.e. missing changelog creation step when directly running LocalApplicationRunner)
6a14b2af [Yi Pan (Data Infrastructure)] WIP: fixed unit test failure for Windows
d4640329 [Yi Pan (Data Infrastructure)] WIP: fixing unit tests after merge
bf1ce907 [Yi Pan (Data Infrastructure)] WIP: removing StreamDescriptor first
50201728 [Yi Pan (Data Infrastructure)] Merge branch 'experiment-new-api-v2' into new-api-v2-0.14
dde1ab14 [Yi Pan (Data Infrastructure)] WIP: first end-to-end test
d7df6ed0 [Yi Pan (Data Infrastructure)] WIP: added all unit test for OperatorSpec#copy methods.
6fc6d4c0 [Yi Pan (Data Infrastructure)] WIP: experiment code to implement an end-to-end working example for new APIs
525d8bc1 [Yi Pan (Data Infrastructure)] Merge branch '0.14.0' into new-api-v2
e6fb96e5 [Yi Pan (Data Infrastructure)] WIP: merged all application types into StreamApplications
f227380f [Yi Pan (Data Infrastructure)] WIP: update the app runner classes
256155ad [Yi Pan (Data Infrastructure)] WIP: new API user code examples
4a6a58dc [Yi Pan (Data Infrastructure)] WIP: updated w/ low-level task API and global var ingestion/metrics reporter
3c50629e [Yi Pan (Data Infrastructure)] WIP: adding support for low-level task APIs
51541e13 [Yi Pan (Data Infrastructure)] WIP: cleanup StreamDescriptor
0bc7ee7b [Yi Pan (Data Infrastructure)] WIP: update the user code example on new APIs
cd528c1c [Yi Pan (Data Infrastructure)] WIP: updated spec and user DAG API
b898e6c0 [Yi Pan (Data Infrastructure)] WIP: new-api-v2
91f364f1 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
ae3dc6ff [Yi Pan (Data Infrastructure)] WIP: new api revision
8bb97520 [Yi Pan (Data Infrastructure)] WIP: proto-type of input/output stream/system specs
5573a069 [Yi Pan (Data Infrastructure)] WIP: new api revision
aeb45730 [Xinyu Liu] SAMZA-1321: Propagate end-of-stream and watermark messages

3 days agoSAMZA-1725: Set travis build idle time out to 20 minutes.
Shanthoosh Venkataraman [Thu, 24 May 2018 02:27:48 +0000 (19:27 -0700)] 
SAMZA-1725: Set travis build idle time out to 20 minutes.

Currently, average build time of samza codebase is 15 to 20 minutes. However, travis has a build idle timeout of 10 minutes and fails the build if the gradle build command doesn't log anything to console
for 10 minutes(occurs when running tests in samza-test module).

Sample travis build failure error:
No output has been received in the last 10m0s, this potentially indicates a stalled build or something wrong with the build itself.
Check the details on how to adjust your build configuration on:

Increasing the build idle-wait timeout value to 20 minutes.

Author: Shanthoosh Venkataraman <>

Reviewers: Daniel Nishimura <>

Closes #530 from shanthoosh/increase_travis_build_wait_time

3 days agoSAMZA-1540: SystemProducer instance for StreamAppender should have task.drop.producer...
Pawas Chhokra [Thu, 24 May 2018 00:59:51 +0000 (17:59 -0700)] 
SAMZA-1540: SystemProducer instance for StreamAppender should have task.drop.producer.errors==true

vjagadish1989 Kindly take a look, thanks.

Author: Pawas Chhokra <>

Reviewers: Jagadish <>

Closes #522 from PawasChhokra/SAMZA-1540_ChangesToStreamAppender

4 days agoSAMZA-1621: Delete the ephemeral processor node in StreamProcessor shutdown phase.
Shanthoosh Venkataraman [Tue, 22 May 2018 22:19:55 +0000 (15:19 -0700)] 
SAMZA-1621: Delete the ephemeral processor node in StreamProcessor shutdown phase.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #451 from shanthoosh/SAMZA-1621

5 days agoSAMZA-1722: Upgrading avro from 1.7.0 to 1.7.1
sanil15 [Tue, 22 May 2018 00:15:31 +0000 (17:15 -0700)] 
SAMZA-1722: Upgrading avro from 1.7.0 to 1.7.1

Jira: SAMZA-1722:

Author: sanil15 <>

Reviewers: Prateek Maheshwari <>

Closes #529 from Sanil15/SAMZA-1722

5 days agoSAMZA-1723: Schedule barrier changes on the debounce thread
Shanthoosh Venkataraman [Mon, 21 May 2018 18:15:51 +0000 (11:15 -0700)] 
SAMZA-1723: Schedule barrier changes on the debounce thread

In existing implementation, `ZkBarrierChangeHandler` is executed from the `ZkEventThread` and has following drawbacks:
* `ZkWatch` events are buffered into a in-memory queue(maintained by ZkClient) and delivered one at a time to ZkClient listener implementations. If the exeuction of a delivered `ZkWatch` event is in progress, then no other `ZkWatch` event will be delivered to the listeners. If `ZkBarrierChangeHandler` is executed from `ZkEventThread`,  any increase in processing latency will delay the delivery of other `ZkWatch` events(buffered in in-memory queue of ZkClient).
* During session expiration(zkConnection error scenario), buffering all events into `ScheduleAfterDebounceTime` helps us to garbage collect older generation events(to ensure correctness and not execute older generation `ZkWatch` events).

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #526 from shanthoosh/schedule_barrier_change_in_debounce_thread

5 days agoSAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor.
Shanthoosh Venkataraman [Mon, 21 May 2018 18:09:44 +0000 (11:09 -0700)] 
SAMZA-1647: Fix NPE in onJobModelExpired handler in StreamProcessor.

* Switching to using explicit lock in StreamProcessor to make things simpler on state updation.
* Switch from using synchronized in ZkJobCoordinator to prevent any potential deadlocks
between two threads (where one thread holds the StreamProcessor and other thread has ZkJobCoordinator lock).
* Misc cleanups in StreamProcessor: Remove volatile qualifiers from state variables in StreamProcessor. Remove reinstantiating the
executorService in onNewJobModel.
* ZkJobCoordinator cleanups: Make some state variables as immutable.

**NOTE**: The classes in which these changes were made were aynonymous inner classes,
so to add proper unit tests we need to do big haul of refactor.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #493 from shanthoosh/fix_npe_in_jobmodel_expired_handler

8 days agoSAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.
Shanthoosh Venkataraman [Sat, 19 May 2018 00:29:00 +0000 (17:29 -0700)] 
SAMZA-1712: Tear down ZookeeperServer connections in ZkClient on interrupts.


If a thread executing zkClient.close is interrupted, currently we swallow the ZkInterruptedException and proceed without closing the zookeeper connection.

This leads to ephemeral nodes of StreamProcessor lurking around in zookeeper after StreamProcessor shutdown.

Users had to wait till zookeeper server session timeout for the ephemeral nodes to get deleted.


Retry once on InterruptedException when closing the zkClient.

Misc changes:
* Remove unnecessary null checks.
* Remove unnecessary typecasts.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #519 from shanthoosh/handle_interrupted_exception_in_zkclient_close

8 days agoSAMZA-1508: JobRunner should not return success until the job is healthy
Jacob Maes [Fri, 18 May 2018 21:23:07 +0000 (14:23 -0700)] 
SAMZA-1508: JobRunner should not return success until the job is healthy

Author: Jacob Maes <>
Author: Jacob Maes <>

Reviewers: Prateek Maheshwari <>

Closes #367 from jmakes/samza-1508

8 days agoImplementing the fetchSinkInfo in ConfigBasedIOResolver
Srinivasulu Punuru [Fri, 18 May 2018 19:31:44 +0000 (12:31 -0700)] 
Implementing the fetchSinkInfo in ConfigBasedIOResolver

1. I think we missed implementing the fetchSinkInfo method in the ConfigBasedResolver when the API was introduced which is breaking the samza sql console tool. This fixes it.
2. latest release of mac removed realpath so the command line tools are broken. Removed the usage of realpath to fix these tools.

Thanks to nickpan47 for identifying these problems.

Author: Srinivasulu Punuru <>

Reviewers: Yi Pan <>

Closes #528 from srinipunuru/release-fix.1

8 days agoSAMZA-1720: Remove javafx.util dependency from samza-sql tests.
Shanthoosh Venkataraman [Fri, 18 May 2018 19:29:24 +0000 (12:29 -0700)] 
SAMZA-1720: Remove javafx.util dependency from samza-sql tests.

In samza-sql module, currently few test classes(`TestSamzaSqlRelMessageSerde` and `TestSamzaSqlRelRecordSerde`) are dependent upon `javafx.util.Pair` class(coming from `javafx` module). `javafx.util.Pair` is not supported by default in all JDK builds(example; open-jdk java-8 doesn't support `javafx` module) and it belongs to `javafx` package which is primarily used for developing GUI applications. This dependency is removed and replaced with `Pair` class from `apache-commons`.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish V <>

Closes #527 from shanthoosh/SAMZA-1720

10 days agoSAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail
Yi Pan (Data Infrastructure) [Thu, 17 May 2018 05:19:37 +0000 (22:19 -0700)] 
SAMZA-1715: Unit test for Kafka admin deletedMessagesCalled() fail

Test locally and works.

Author: Yi Pan (Data Infrastructure) <>

Reviewers: Jagadish <>

Closes #523 from nickpan47/fix-unittest-deleted-messages

12 days agoSAMZA-1711: Re-enable existing standalone integration tests.
Shanthoosh Venkataraman [Mon, 14 May 2018 17:28:52 +0000 (10:28 -0700)] 
SAMZA-1711: Re-enable existing standalone integration tests.


* Enable all existing standalone integration tests except `TestZkStreamProcessorSession`(`TestZkStreamProcessorSession` is flaky. It spawns `x` StreamProcessors and kills one StreamProcessor through zookeeper session expiration. Sleeps for 5 seconds and proceeds to do validation. If the rebalancing phase takes longer the sleep time, validation fails).
* Remove zookeeper unavailable unit test from LocalApplicationRunner(Race condition in zookeeper shutdown fails other tests). The deleted test will be added back in a separate test class.
* Increase zookeeper server minimum session timeout from 6 seconds to 120 seconds.
* Add assertions to validate if kafka topics setup were successful before the unit tests.


Verified by running the following script on top of this patch in master branch.

while [ $i -lt 50 ]; do
    i=`expr $i + 1`
    echo "Run " +$i
    ./gradlew clean :samza-test:test -Dtest.single="TestZkLocalApplicationRunner" --debug --stacktrace >> ~/test-logs_10
    ./gradlew clean :samza-test:test -Dtest.single="TestZkStreamProcessor" --debug --stacktrace >> ~/test-logs_10
    ./gradlew clean :samza-test:test -Dtest.single="TestStreamProcessor" --debug --stacktrace >> ~/test-logs_10
    ./gradlew clean :samza-test:test -Dtest.single="TestZkStreamProcessorFailures" --debug --stacktrace >> ~/test-logs_10


[svenkatasvenkata-ld2 samza]$ grep 'BUILD SUCCESS' ~/test-logs_10  | wc -l
[svenkatasvenkata-ld2 samza]$ grep 'BUILD FAIL' ~/test-logs_10  | wc -l

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #515 from shanthoosh/turn_all_integration_tests_on

2 weeks agoSAMZA-1696: Delete TestKeyValueStores flaky test
Ahmed Abdul Hamid [Fri, 11 May 2018 17:55:02 +0000 (10:55 -0700)] 
SAMZA-1696: Delete TestKeyValueStores flaky test

`testParallelReadWriteSameKey` is a flaky test that was failing due to a race condition between the main test thread and the thread manipulating the key-value store under test. Specifically, the main test thread could assert the store has received a value before another thread gets to set it. The hard-coded wait durations did not guarantee the main test thread would wait for all other threads to complete, causing it to assert prematurely.

This change deletes this test since the underlying `RocksDB` store wrapped within `RocksDbKeyValueStore` is thread-safe by design, which renders this test redundant and unnecessary.

Author: Ahmed Abdul Hamid <>

Reviewers: Shanthoosh Venkataraman <>

Closes #500 from ahmedahamid/dev/fix-1696

2 weeks agoRemove the iterable interface from KeyValueSnapshot
xinyuiscool [Thu, 10 May 2018 21:19:08 +0000 (14:19 -0700)] 
Remove the iterable interface from KeyValueSnapshot

The iterable interface makes it hard for the users to close it after using.

Author: xinyuiscool <>

Reviewers: Prateek M <>

Closes #516 from xinyuiscool/kv-snapshot

2 weeks agoFixed test failure for TestRocksDbKeyValueStoreJava#testPerf
Prateek Maheshwari [Thu, 10 May 2018 19:16:33 +0000 (12:16 -0700)] 
Fixed test failure for TestRocksDbKeyValueStoreJava#testPerf

Iterators (incl. those obtained from snapshots) must be closed before store close.

Author: Prateek Maheshwari <>

Reviewers: Xinyu Liu <>

Closes #514 from prateekm/rocksdb-test-fi

2 weeks agoSAMZA-1700: Clean up SystemAdmins instance creation flows
Cameron Lee [Thu, 10 May 2018 17:45:41 +0000 (10:45 -0700)] 
SAMZA-1700: Clean up SystemAdmins instance creation flows

The SystemAdmins class has a special "test" constructor for building SystemAdmins instances for unit tests. However, that test constructor has leaked into non-test code.
This makes it harder to manage all flows which use SystemAdmins. An upcoming change is to fix lifecycle management for SystemAdmins in the ApplicationRunner classes, so doing some clean up will help that future change.

Author: Cameron Lee <>

Reviewers: Prateek Maheshwari <>

Closes #509 from cameronlee314/systemadmins_cleanup

2 weeks agoFixed test failure for TestRocksDbKeyValueStoreJava#testIterate
Prateek Maheshwari [Thu, 10 May 2018 16:21:12 +0000 (09:21 -0700)] 
Fixed test failure for TestRocksDbKeyValueStoreJava#testIterate

RocksDB Snapshots and any iterators obtained from them need to be closed before the store is closed.

Otherwise the process aborts with the following message (at least on OSX):
`Assertion failed: (is_last_reference), function ~ColumnFamilyData, file db/, line 457.`

Author: Prateek Maheshwari <>

Reviewers: Jagadish V <>

Closes #513 from prateekm/rocksdb-test-fi

2 weeks agoSAMZA-1706: lazy initialization for eventhub system producer
Hai Lu [Wed, 9 May 2018 22:32:44 +0000 (15:32 -0700)] 
SAMZA-1706: lazy initialization for eventhub system producer

We are seeing slow shutdown issue for eventhub system producers for users who only use eventhub consumer (but then Samza system creates both consumer and producer together no matter what). As a workaround, add lazy initialization to the producer to avoid the slow shutdown

Author: Hai Lu <>

Reviewers: Jagadish <>

Closes #511 from lhaiesp/master

2 weeks agoSAMZA-1705: Switch to use snapshot in iterable impl of RocksDb
xinyuiscool [Wed, 9 May 2018 00:48:55 +0000 (17:48 -0700)] 
SAMZA-1705: Switch to use snapshot in iterable impl of RocksDb

We should use rocksDb.snapshot() method to keep the snapshot and creates a new iterator with it all the time. The perf shows a little bit more expensive but mostly on par with range iterator query.

Author: xinyuiscool <>

Reviewers: Jagadish V <>

Closes #510 from xinyuiscool/SAMZA-1705

2 weeks agoSAMZA-1695: Clear events in ScheduleAfterDebounceTime on session expiration
Shanthoosh Venkataraman [Tue, 8 May 2018 02:40:03 +0000 (19:40 -0700)] 
SAMZA-1695: Clear events in ScheduleAfterDebounceTime on session expiration

Let's assume there're three processors in the group [P1, P2, P3] and P1 is the leader.

1. Leader processor(P1) loses connectivity with a zookeeper server in the ensemble and it's ephemeral processor node is deleted(due to session expiration).
2. Immediate successor(P2) to the leader(P1) finds out that the leader is dead and declares itself as leader. Processor P2 Schedules onProcessorChange to publish JobModel.
3. ZkClient connection retry logic helps the Leader(P1) to reconnect to another zkServer in the ensemble and it joins as follower.
4. Processor P1 acts on the stale buffered event in the debounce queue(which it received when it's a leader) and acts as leader. At this point, there're two processors acting as leader(P1 & P2). If P1 proceeds to execute leader actions before P2, P2 will fail(and in worst case can cause state corruption).

Sample exception logs:

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #499 from shanthoosh/remove_events_from_debounce_queue_on_session_expiry

2 weeks agoSAMZA-1704: Fix compatibility issues with scala 2.12
xiliu [Tue, 8 May 2018 01:14:29 +0000 (18:14 -0700)] 
SAMZA-1704: Fix compatibility issues with scala 2.12

Need to add override keyword for overriding a method in scala 2.12.

Author: xiliu <>

Reviewers: Prateek M <>

Closes #508 from xinyuiscool/SAMZA-1704

2 weeks agoSAMZA-1703: Disable flaky test TestEmbeddedTaggedRateLimiter.testAcquireWithTimeout
xiliu [Tue, 8 May 2018 00:06:35 +0000 (17:06 -0700)] 
SAMZA-1703: Disable flaky test TestEmbeddedTaggedRateLimiter.testAcquireWithTimeout

Author: xiliu <>

Reviewers: Boris S <>

Closes #507 from xinyuiscool/SAMZA-1703

2 weeks agoSAMZA-1702: Prepare 0.14.1 release on the master branch
xiliu [Mon, 7 May 2018 23:13:27 +0000 (16:13 -0700)] 
SAMZA-1702: Prepare 0.14.1 release on the master branch

Author: xiliu <>

Reviewers: Prateek M <>

Closes #506 from xinyuiscool/SAMZA-1702-master

2 weeks agoSAMZA-1261: Fix TestProcessJob flaky test
Ahmed Abdul Hamid [Mon, 7 May 2018 20:45:53 +0000 (13:45 -0700)] 
SAMZA-1261: Fix TestProcessJob flaky test

- Fix flaky test, `TestProcessJob` `testProcessJobKillShouldWork`, which was failing intermittently due to a race condition. In particular, the thread running the test could assert `jobModelManager.stopped` before another thread, enclosed within `ProcessJob.submit`, could actually invoke `jobModelManager.stop`.

+ Refactor `ProcessJob` to improve its overall robustness
  + Handle corner cases, e.g.
     + Fail gracefully if starting process within `ProcessJob.submit` throws
     + Ignore attempts to kill a job before it is submitted
     + Ensure job status is always set appropriately
  + Remove unnecessary stdout/stderr piping code
  + Employ `wait`/`notify` instead of `Thread.sleep`
  + Eliminate all artificial wait method invocations intended to influence inter-thread execution order in unit tests
  + Add more unit tests

Author: Ahmed Abdul Hamid <>

Reviewers: Boris S<>, Shanthoosh V<>

Closes #485 from ahmedahamid/master

2 weeks agoSAMZA-1692: Standalone stability fixes.
Shanthoosh Venkataraman [Mon, 7 May 2018 20:43:05 +0000 (13:43 -0700)] 
SAMZA-1692: Standalone stability fixes.

- Currently, on session expiration processorListener with incorrect generationId is registered with zookeeper(ZkUtils generationId is incremented on reconnect but the generationId in processorListener is zero all the time). When a session reconnect happens to a processor successive to leader, leader expiration event will be skipped. This will prevent leader re-election on a current leader death and will stall the processors group. Fix is to reinstantiate and then register processorChangeListener on session expiration.
- Add processorId to debounce thread name (this can aid debugging when multiple processors are running within a jvm).
- After ScheduleAfterDebounceTime queue is shutdown, don't accept new schedule requests. Current ZkJobCoordinator shutdown sequence comprise of the following steps
     - Shutdown the ScheduleAfterDebounceTime queue.
     - Stop the zkClient  and relinquish it's resources.

After we shutdown ScheduleAfterDebounceTime and before zkclient is stopped, any new operations can be scheduled in ScheduleAfterDebounceTime queue. This will result in RejectedExecutionException, since executorService is stopped.

Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask23f962a8 rejected from java.util.concurrent.ScheduledThreadPoolExecutor43408be8

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #496 from shanthoosh/master

2 weeks agoSAMZA-1563: Make RocksDB store base directory configurable
Bharath Kumarasubramanian [Mon, 7 May 2018 19:34:26 +0000 (12:34 -0700)] 
SAMZA-1563: Make RocksDB store base directory configurable

xinyuiscool ^^

Author: Bharath Kumarasubramanian <>

Reviewers: Prateek M <>

Closes #491 from bharathkk/samza-1563

2 weeks agoSAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish
Bharath Kumarasubramanian [Mon, 7 May 2018 18:11:01 +0000 (11:11 -0700)] 
SAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish

Added the following APIs to ApplicationRunner
 `void waitForFinish()`
 `boolean waitForFinish(Duration timeout)`

Implemented the wait for finish methods in remote application runner. Note currently, there is disparity in the APIs in terms of associating runners with stream application. Ideally, we want to decide on the cardinal relation between them and change the APIs accordingly.

The goal of the PR is limited to introduce API (waitForFinish) parity between runners in the current setup.

Author: Bharath Kumarasubramanian <>

Reviewers: Xinyu Liu <>

Closes #503 from bharathkk/samza-1653

2 weeks agoSAMZA-1691: Support get iterable from KeyValueStore
xinyuiscool [Mon, 7 May 2018 16:51:28 +0000 (09:51 -0700)] 
SAMZA-1691: Support get iterable from KeyValueStore

Right now for KeyValueStore we have a range query to return an iterator. For usage in BEAM, we need a iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function in KeyValueStore to support it. It's implemented as follows:

1) for rocksDb, it will create the iterator when it's called, which will has a snapshot of the elements. Then every time when the iterator is needed, we will seek the iterator from beginning;

2) for inMemoryDb, it will create the snapshot submap when iterate() is called. The submap is an iterable and it can return a new iterator when needed.

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #492 from xinyuiscool/SAMZA-1691

3 weeks agoSAMZA-1699: Fix NPE in ClusterResourceManager
Jagadish [Fri, 4 May 2018 21:02:35 +0000 (14:02 -0700)] 
SAMZA-1699: Fix NPE in ClusterResourceManager

When the ClusterResourcedManager receives a notification that a container is started, it moves the container from the "pending queue" to its "running queue".
In the meanwhile, it's possible for another thread to remove the mapping for the key. Here's an example:


for (String key : pendingYarnContainers.keySet()) {
  yarnContainer = pendingYarnContainers.get(key); <-- could be null depending on whether the removal happened before it.

Author: Jagadish <>

Reviewers: Prateek M<>

Closes #504 from vjagadish/npe-fix-async

3 weeks agoSAMZA-1698: Update appStatus on failures in
Shanthoosh Venkataraman [Fri, 4 May 2018 00:37:49 +0000 (17:37 -0700)] 
SAMZA-1698: Update appStatus on failures in

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #502 from shanthoosh/local_application_runner_set_exception_in_finish

3 weeks agoSAMZA-1243: Fix flaky tests in TestLocalStoreMonitor
Ahmed Abdul Hamid [Tue, 1 May 2018 20:49:21 +0000 (13:49 -0700)] 
SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Having attempted to reproduce this issue without success, careful analysis of the code indicates the underlying issue is most likely incomplete cleanup of file system directories after test execution possibly due to incomplete or manual test runs. To address this issue, this fix generates a different local store directory for every test to isolate file system side-effects across different tests and test runs.

Author: Ahmed Abdul Hamid <>

Reviewers: Shanthoosh Venkataraman <>

Closes #497 from ahmedahamid/dev/fix-1243

3 weeks agoSAMZA-1476: Fix TestStatefulTask flaky test
Ahmed Abdul Hamid [Tue, 1 May 2018 20:00:38 +0000 (13:00 -0700)] 
SAMZA-1476: Fix TestStatefulTask flaky test

Unable to reproduce the issue even after constraining CPU/memory resources available to the JVM, this fix addresses a potential root cause — a `CountDownLatch` shared between 2 threads, main test thread and Kafka producer thread, but not marked volatile even though it is reinitialized by the main test thread. This could cause the reported issue since each of the 2 threads could end up invoking `countDown`/`await` on a different `CountDownLatch` object.

It is worthwhile to mention that without this fix, a different test, `TestShutdownStatefulTask`, should have also exhibited some flakiness since it shares the same `TestTask` base with `TestStatefulTask` and exercises exactly the same portion of code that includes the failing assertion. Since no such flakiness was reported for `TestShutdownStatefulTask` however, the assumption made by this fix is that it might have not been encountered or reported.

Author: Ahmed Abdul Hamid <>

Reviewers: Prateek Maheshwari <>

Closes #498 from ahmedahamid/dev/fix-1476

3 weeks agoSAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes for...
Aditya Toomula [Mon, 30 Apr 2018 23:51:37 +0000 (16:51 -0700)] 
SAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes for Avro and Rel conversion.

Adding Serde for rel record, as calcite expects the keys to be in string format. Rel converters are always expected to provide keys as strings. If key is an avro record, it is expected that the rel converter changes the avro record to rel record and serializes it and deserializes it when conerting rel message to samza message.

Author: Aditya Toomula <>

Reviewers: Srini P<>

Closes #495 from atoomula/rel1

4 weeks agoSAMZA-1689: Add validations before state transitions in ZkBarrierForVersionUpgrade.
Shanthoosh Venkataraman [Sat, 28 Apr 2018 01:55:15 +0000 (18:55 -0700)] 
SAMZA-1689: Add validations before state transitions in ZkBarrierForVersionUpgrade.

Prevent invalid state updations on barrier.
* Introduced a additional barrier state NEW.
* Add state validations before updating the barrier.
* Fix existing TestZkBarrier tests that are disabled and add new tests
  to verify the intended behavior.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #490 from shanthoosh/fix_barrier_state_transitions

4 weeks agoSAMZA-1582: removed the yajl-ruby dependency for security reasons
Cameron Lee [Fri, 27 Apr 2018 18:59:33 +0000 (11:59 -0700)] 
SAMZA-1582: removed the yajl-ruby dependency for security reasons

The original ticket suggested updating the yajl-ruby dependency for security reasons.
Many of the ruby dependencies for building the docs site are years old. After upgrading the dependencies, the yajl-ruby dependency just went away.
The dependencies were upgraded to the highest version that was compatible with ruby 2.0.0 (jekyll was the only dependency which has newer versions that are only compatible with ruby versions greater than 2.0.0).

Testing done:
bundle exec jekyll serve --watch --baseurl ""
sanity checked a couple of links

Author: Cameron Lee <>

Reviewers: Jagadish <>

Closes #486 from cameronlee314/yajlruby

4 weeks agoSAMZA-1688: use per partition eventhubs client
Hai Lu [Fri, 27 Apr 2018 17:22:57 +0000 (10:22 -0700)] 
SAMZA-1688: use per partition eventhubs client

Use per partition eventhubs client to improve throughput. As each eventhub client maintains only one single TCP connection.
Also reduce the time spent on getPartitionRuntimeInfo on starting up, by making the calls run in parallels in system admin and also completely removing it from system consumer.

See 8x improvement on consumption throughput from ~3Mb/s (or 1.5k QPS) to ~21Mb/s (or 10.5k QPS). Memory footprint doesn't seem to get affected. CPU usage also increases by 8x. Essentially, with per partition client, we are able to saturate the CPU usages. The benchmark I did was on a machine with 8 cores. If the machine has say 16 CPU cores, I expect the the throughput to improve by around 16x.

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #489 from lhaiesp/master

4 weeks agoMinor-fix: Fix a white-space in ResourceRequestState
Jagadish [Thu, 26 Apr 2018 00:20:01 +0000 (17:20 -0700)] 
Minor-fix: Fix a white-space in ResourceRequestState

4 weeks agoSAMZA-1687: Prioritize preferred host requests over ANY-HOST requests
Jagadish [Thu, 26 Apr 2018 00:17:34 +0000 (17:17 -0700)] 
SAMZA-1687: Prioritize preferred host requests over ANY-HOST requests

Working on a documentation that describes this better, but a TL;DR summary is that we should prioritize preferred-host requests over ANY_HOST requests.

Yarn enforces these two checks:
1. ANY_HOST requests should always be made with relax-locality = true
2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true

Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than any-host requests since data-locality is critical.

Author: Jagadish <>

Closes #488 from vjagadish/priority-host-affinity

4 weeks agoSAMZA-1681: Samza-sql - Add support for handling older record schema versions in...
Aditya Toomula [Wed, 25 Apr 2018 16:48:11 +0000 (09:48 -0700)] 
SAMZA-1681: Samza-sql - Add support for handling older record schema versions in AvroRelConverter

In addition to handling older record schema versions in AvroRelConverter, this change also handles Avro enum and fixed types and also handles the proper conversion of samza message key to rel message.

Author: Aditya Toomula <>

Reviewers: Srini P <>

Closes #481 from atoomula/rel

4 weeks agoSAMZA-1686: Set finite operation timeout when creating zkClient.
Shanthoosh Venkataraman [Tue, 24 Apr 2018 22:56:05 +0000 (15:56 -0700)] 
SAMZA-1686: Set finite operation timeout when creating zkClient.

Currently zkClient is created with operationRetryTimeOut of -1. This causes zkClient to retry indefinitely in case of irrecoverable exceptions thereby delaying the StreamProcessor shutdown.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish Venkatraman <>

Closes #487 from shanthoosh/master

4 weeks agoSAMZA-1676: miscellaneous fix and improvement for eventhubs system
Hai Lu [Tue, 24 Apr 2018 19:26:14 +0000 (12:26 -0700)] 
SAMZA-1676: miscellaneous fix and improvement for eventhubs system

Including these changes:
- Log the metadata that we are fetching from the event hubs
- Rename readLatency to consumptionLagMs
- fix the issue that readLatency metric returns negative value

Author: Hai Lu <>

Reviewers: Srini P<>, Jagadish <>

Closes #484 from lhaiesp/master

4 weeks agoSAMZA-1685: Need to resolve redirects for virtualenv download location for running...
Cameron Lee [Tue, 24 Apr 2018 03:28:43 +0000 (20:28 -0700)] 
SAMZA-1685: Need to resolve redirects for virtualenv download location for running integration tests

Testing Done: ran bin/ locally from mac/linux which did not run the integration tests before

Author: Cameron Lee <>

Reviewers: Jagadish <>

Closes #482 from cameronlee314/inttest

4 weeks agoSAMZA-1656: EventHubSystemAdmin does not fetch metadata for valid streams.
Shanthoosh Venkataraman [Mon, 23 Apr 2018 20:24:54 +0000 (13:24 -0700)] 
SAMZA-1656: EventHubSystemAdmin does not fetch metadata for valid streams.

Currently successive invocation of EventHubSystemAdmin.getSystemStreamMetadata with the same stream collection returns empty results.

This is an implementation bug, where the streams requested as a part of prior invocations of EventHubSystemAdmin.getSystemStreamMetadata are ignored(due to stale caching).

This bug causes the JobModel generation phase to fail and kills the StreamProcessor.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>, Srini P<>

Closes #474 from shanthoosh/EventHubSystemAdminBug

4 weeks agoSAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.
Shanthoosh Venkataraman [Mon, 23 Apr 2018 20:20:45 +0000 (13:20 -0700)] 
SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.

In general, jobModel configuration contains service access tokens and certs. It's a common practice to run zookeeper in a non-ACL environment. Hence for security purposes, it's essential not to store configuration as a part of JobModel in zookeeper.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #479 from shanthoosh/nuke_configuration_stored_in_JobModel and squashes the following commits:

b8d2196 [Shanthoosh Venkataraman] Address review comments.
7876a44 [Shanthoosh Venkataraman] Nuke JobModel configuration in ZkJobCoordinator.

5 weeks agoUpdate PMC membership on the Samza web-page
Jagadish [Sat, 21 Apr 2018 02:52:35 +0000 (19:52 -0700)] 
Update PMC membership on the Samza web-page

Author: Jagadish <>

Reviewers: Jagadish <>

Closes #480 from vjagadish/pmc

5 weeks agoSAMZA-1671: sql: initial insert support for table destination
Peng Du [Fri, 20 Apr 2018 16:18:31 +0000 (09:18 -0700)] 
SAMZA-1671: sql: initial insert support for table destination

Table is another main type of IO abstraction in Samza which supports
both read and write (optional). For the tables that do support writes, we
should be able to allow Samza SQL users to write a query to do that. One
example is to insert into a database. The current code only supports
inserting into a stream. This change adds the initial support for table
insert operation.

Author: Peng Du <>

Reviewers: Jagadish <>, Srini P<>

Closes #465 from pdu-mn1/sql-insert-table

5 weeks agoSAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit
Dong Lin [Wed, 18 Apr 2018 22:31:06 +0000 (15:31 -0700)] 
SAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit

Author: Dong Lin <>
Author: Dong Lin <>

Reviewers: Prateek Maheshwari <>, Jacob Maes <>, Yi Pan <>

Closes #347 from lindong28/SAMZA-1478

5 weeks agoMisc. Util cleanup
Prateek Maheshwari [Wed, 18 Apr 2018 20:32:15 +0000 (13:32 -0700)] 
Misc. Util cleanup

Major changes:
1. Broke up 'Util' class into multiple classes: 'FileUtil', 'HttpUtil', 'CoordinatorStreamUtil'.
2. Consolidated some Util classes: MathUtil, ScalaJavaUtil
3. Removed redundant Util classes: ClassloaderUtil, ScalaToJavaUtil
4. Renamed some Util classes for consistency: TimerUtils -> TimerUtil.
5. Inlined some util classes and methods to where they're used: LexicographicComparator to RocksDBKeyValueStore, defaultSerdeFactoryFromSerdeName to SerdeManager, etc.

Rest of the changes are updates to use the new classes and methods.

Testing: Local build and test works. Tested with a locally deployed Samza job.

Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>, Jagadish Venkatraman <>

Closes #455 from prateekm/util-cleanup

5 weeks agoSAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.
Shanthoosh Venkataraman [Wed, 18 Apr 2018 17:00:56 +0000 (10:00 -0700)] 
SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #466 from shanthoosh/SAMZA-1640

5 weeks agoSAMZA-1651: Samza-sql - Implement GROUP BY SQL operator
Aditya Toomula [Tue, 17 Apr 2018 23:02:49 +0000 (16:02 -0700)] 
SAMZA-1651: Samza-sql - Implement GROUP BY SQL operator

Author: Aditya Toomula <>

Reviewers: Srini P<>

Closes #478 from atoomula/groupby1

5 weeks agoSAMZA-1664: ZkJobCoordinator stability fixes.
Shanthoosh Venkataraman [Tue, 17 Apr 2018 23:02:16 +0000 (16:02 -0700)] 
SAMZA-1664: ZkJobCoordinator stability fixes.

Issues fixed:
* Handle job coordinator shutdown gracefully in case of unclean container shutdowns.
* Fix the zookeeper session handling logic.
* Fix the forever retry timeout in ZkClient re-connect.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #476 from shanthoosh/pullInK2Changes

5 weeks agoSAMZA-1666: Benchmark SystemProducer and SystemConsumer
Srinivasulu Punuru [Tue, 17 Apr 2018 22:40:59 +0000 (15:40 -0700)] 
SAMZA-1666: Benchmark SystemProducer and SystemConsumer

* Tests to benchmark the performance of the system consumers and producers.
* Config to test the benchmark for the event hub system producer and consumer.

SystemConsumerBench and SystemProducerBench provides base generic implementation to test the benchmark for the system producers and consumers. Any new system that needs benchmark test needs a properties file.

The benchmark test itself is single threaded in the way it consumes and produces events. Scaling the benchmark tests right now involves running multiple processes of these tests in parallel.

Right now we just calculate the event rate, But in future we could create a logging metrics registry to hookup other metrics and log them in console along with event rate while the benchmark tests are being run.

Author: Srinivasulu Punuru <>

Reviewers: Jagadish <>, Wei Song<>

Closes #473 from srinipunuru/benchmark.1

5 weeks agoRevert "SAMZA-1645: A few issues found by BEAM stress test"
xiliu [Tue, 17 Apr 2018 18:50:44 +0000 (11:50 -0700)] 
Revert "SAMZA-1645: A few issues found by BEAM stress test"

This reverts commit 26294151642283c1cfb51590b51a86d2eaedd11f.

5 weeks agoInitial version for in memory system.
Bharath Kumarasubramanian [Tue, 17 Apr 2018 16:09:01 +0000 (09:09 -0700)] 
Initial version for in memory system.

Getting the PR out to unblock sanil.
Pending tasks
 1. Add documentation
 2. Add more tests
 3. Currently initialization of the stream happens using createStream on admin. We need to find a way to expose the same functionality to low level users.
 4. To populate the initial stream, clients need to get a handle on producer and produce the messages. Alternatively, we can support serialization of source data and pass it as config to the system to initialize. We have a hook in place for this but not implemented it completely.
  5. Clean up consumed data on the buffer based on the lowest offsets of the consumers.

I will create JIRAs for all the pending tasks and fix them iteratively.

xinyuiscool ^^

Author: Bharath Kumarasubramanian <>

Reviewers: Xinyu Liu <>

Closes #459 from bharathkk/single-node-testing

5 weeks agoSAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message
Aditya Toomula [Mon, 16 Apr 2018 17:26:51 +0000 (10:26 -0700)] 
SAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message

Added support for serialization of nested samza sql rel message and accordingly fixed the conversion of nested avro records to rel message. Please note that we still do not have support for the sql queries that point to fields in nested records (beyond the top level record).

Author: Aditya Toomula <>
Author: Aditya Toomula <>

Reviewers: Srinivasulu P<>

Closes #464 from atoomula/serde

6 weeks agoSAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job if...
Prateek Maheshwari [Fri, 13 Apr 2018 15:37:20 +0000 (08:37 -0700)] 
SAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job if partition count increases

As an aside, also update the gauge to report current number of partitions instead of the change, since that's what its name indicates.

Author: Prateek Maheshwari <>

Reviewers: Jagadish  V <>

Closes #468 from prateekm/partition-monitor-fixes

6 weeks agoSAMZA-1649: Improve host-aware allocation to account for strict locality
Jagadish [Fri, 13 Apr 2018 02:23:44 +0000 (19:23 -0700)] 
SAMZA-1649: Improve host-aware allocation to account for strict locality

Currently working on a doc for the behavior of the CapacityScheduler and further testing is needed on an actual cluster - but here's a summary of why we should set relax-locality = false:
 - Node-local requests are honored only when relax-locality = false
 - With relax-locality = true, the scheduler biases interactivity over data-locality for requests that ask for few resources relative to the size of the cluster.

In addition to the above change, this PR also modifies the allocator algorithm to fallback to "ANY_HOST" requests so that we make progress when the node is unavailable.

Author: Jagadish <>

Reviewers: Prateek Maheshwari <>

Closes #471 from vjagadish/relax-locality-fix

6 weeks agoSAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window
Aditya Toomula [Thu, 12 Apr 2018 23:50:30 +0000 (16:50 -0700)] 
SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window

Author: Aditya Toomula <>

Reviewers: Jagadish <>

Closes #472 from atoomula/window

6 weeks agoSAMZA-1584: Improve logging in StreamProcessor.
Shanthoosh Venkataraman [Thu, 12 Apr 2018 21:34:45 +0000 (14:34 -0700)] 
SAMZA-1584: Improve logging in StreamProcessor.

Add the processorID in the log lines wherever necessary(since we support running multiple stream applications in a JVM) and improving logging in general in StreamProcessor.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Prateek Maheshwari <>

Closes #441 from shanthoosh/SAMZA-1584

6 weeks agominor fix on eventhubs size limit for event body and partition key
Hai Lu [Thu, 12 Apr 2018 20:31:39 +0000 (13:31 -0700)] 
minor fix on eventhubs size limit for event body and partition key

Author: Hai Lu <>

Reviewers: Jagadish <>, Srinivasulu Punuru<>

Closes #470 from lhaiesp/master

6 weeks agoSAMZA-1183: Fix TestAsyncRunLoop flaky tests
Shanthoosh Venkataraman [Wed, 11 Apr 2018 22:34:43 +0000 (15:34 -0700)] 
SAMZA-1183: Fix TestAsyncRunLoop flaky tests

A. Fix TestProcessInOrder and TestProcessOutOfOrder failures.

Both the tests has two tasks(T1, T2) and the following task to message assignments:
- Task T1: [M1, M2] (Task T1 is expected to process the messages M1 and M2).
- Task T2: [M3] (Task T2 is expected to process the message M3 and stop the runloop).

In some cases, before the task T1 completes processing all it’s messages, T2 gets scheduled and stops the runloop(Stopping all tasks).

We wait in both tests through a CountdownLatch, expecting the tasks to process all the messages, which will never reach zero in the above scenario(there by causing indefinite wait in tests).

Fix: Remove the manual shutdown invocation from Task T2 and use EOS messages to stop the runloop.

B.  Fix TestCommitSingleTask and TestCommitMultipleTask failures.

Both the tests had two tasks(T1, T2).

Task T2 is expected to stop the runloop and task T1 marks the commit flag through taskCoordinator.commit(requestScope).

Failure occurs in some scenarios when the task T2 stops the runloop before the runLoop commits both the tasks.

Fix: Trigger the runloop shutdown sequence after we’ve committed the tasks.

C. Combine the duplicate tests and unify their assertions.

D. Verification: Ran the following script to verify the fixes.

`root88cf6be1c11a:~/samza# i=0`
`root88cf6be1c11a:~/samza# while [ $i -lt 100 ]; do`
       ` i=expr $i + 1`
       ` ./gradlew clean :samza-core:test -Dtest.single="TestAsyncRunLoop" --debug --stacktrace >> test-logs`
  ` done;`

There were no failures.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #463 from shanthoosh/re_enable_async_run_loop_tests_2

6 weeks agoSAMZA-1645: A few issues found by BEAM stress test
xinyuiscool [Wed, 11 Apr 2018 20:21:36 +0000 (13:21 -0700)] 
SAMZA-1645: A few issues found by BEAM stress test

1. Revert the priority set to intermediate streams.
2. Fix a watermark propagation condition

Author: xinyuiscool <>

Reviewers: Prateek M <>

Closes #469 from xinyuiscool/SAMZA-1645

6 weeks agoUpgrade to latest event hub version (1.0.1)
Srinivasulu Punuru [Tue, 10 Apr 2018 22:39:21 +0000 (15:39 -0700)] 
Upgrade to latest event hub version (1.0.1)

* Upgrade to the latest event hub version 1.0.1
* Adding configs for prefetchCount and maxEventPerPoll
* Fix the high cpu usage issue in SamzaHistogram
* Fixing a race condition in event hub system producer where the future was getting removed while it was being checked for completion resulting in NPE.

Author: Srinivasulu Punuru <>

Reviewers: Prateek Maheshwari <>

Closes #467 from srinipunuru/upgrade-eh-1.0.1

8 weeks agoRefactoring the EventHub System Producer into reusable AsyncSystemProducer
Srinivasulu Punuru [Sat, 31 Mar 2018 00:45:41 +0000 (17:45 -0700)] 
Refactoring the EventHub System Producer into reusable AsyncSystemProducer

Refactoring eventhub system producer into common reusable components
1. AsyncSystemProducer : All the system producers that have real async send API with call backs can use this
2. NoFlushAsyncSystemProducer: All the system producers that have real async call back based send API and doesn't provide flush semantics can use this.

The system producers that implement AsyncSystemproducers can be used across Samza and Brooklin.

TODO: AsyncSystemProducer needs to be moved to the api layer so that it can be used across different system producers (i.e. eventhub and kinesis)

Author: Srinivasulu Punuru <>

Reviewers: Boris S <>

Closes #458 from srinipunuru/async-prod.3

8 weeks agoSAMZA-1630: Move thread dump from stdout to logs
Prateek Maheshwari [Sat, 31 Mar 2018 00:11:49 +0000 (17:11 -0700)] 
SAMZA-1630: Move thread dump from stdout to logs

Author: Prateek Maheshwari <>

Reviewers: Jagadish Venkatraman <>

Closes #462 from prateekm/thread-dump-on-timeout

8 weeks agoSAMZA-1632: KinesisConfig: Making getProxyHost and getProxyPort APIs protected
Aditya Toomula [Thu, 29 Mar 2018 22:57:58 +0000 (15:57 -0700)] 
SAMZA-1632: KinesisConfig: Making getProxyHost and getProxyPort APIs protected

Author: Aditya Toomula <>

Reviewers: Jagadish<>

Closes #457 from atoomula/kconfig

8 weeks agoSD-1599: Improve the efficiency of the AsynRunLoop when some partitio…
James Lent [Thu, 29 Mar 2018 22:26:28 +0000 (15:26 -0700)] 
SD-1599: Improve the efficiency of the AsynRunLoop when some partitio…

…ns are empty.

Author: James Lent <>

Reviewers: Yi Pan <>, Xinyu Liu <>

Closes #436 from jwlent55/SD-1599-improve-async-run-loop-efficiency and squashes the following commits:

ef0e53bb [James Lent] SD-1599: Remove incorrect call to containerMetrics left by previous update.
3899216e [James Lent] SD-1599: Combine the blockIfBusy and blockIfNoWork logic inside one method and a common latch.
e3837809 [James Lent] SD-1599: Mark runLoopResumedSinceLastChecked as volatile.
a7c0ac4c [James Lent] SD-1599: Explicitly set the timeout value to either 'noNewMessagesTimeout' or 0.
b2dd61e2 [James Lent] SD-1599: Address the first set of code inspection comments.
cc34518a [James Lent] SD-1599: Improve the efficiency of the AsynRunLoop when some partitions are empty.

8 weeks agoSAMZA-1630: Log a thread dump on timeouts
Prateek Maheshwari [Thu, 29 Mar 2018 01:14:34 +0000 (18:14 -0700)] 
SAMZA-1630: Log a thread dump on timeouts

Would be useful to get a thread dump on timeouts, e.g. for AsyncStreamTask callback timeout, container shutdown timeout, heartbeat monitor graceful shutdown timeout etc.

Author: Prateek Maheshwari <>
Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>

Closes #460 from prateekm/thread-dump-on-timeout

8 weeks agoSAMZA-1631: Improve logging on Task callback timeout
Prateek Maheshwari [Thu, 29 Mar 2018 00:52:58 +0000 (17:52 -0700)] 
SAMZA-1631: Improve logging on Task callback timeout

Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>

Closes #461 from prateekm/task-callback-logging

8 weeks agoSAMZA-1627: Watermark broadcast enhancements
xinyuiscool [Wed, 28 Mar 2018 17:25:15 +0000 (10:25 -0700)] 
SAMZA-1627: Watermark broadcast enhancements

Currently each upstream task needs to broadcast to every single partition of intermediate streams in order to aggregate watermarks in the consumers. A better way to do this is to have only one downstream consumer doing the aggregation, and then broadcast to all the partitions. This is safe as we can prove the broadcast watermark message is after all the upstream tasks finished producing the events whose event time are before the watermark. This reduced the full message count from O(n^2) to O(n).

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #456 from xinyuiscool/SAMZA-1627

8 weeks agoSkipping large messages in the EventHub system producer
Srinivasulu Punuru [Tue, 27 Mar 2018 18:39:52 +0000 (11:39 -0700)] 
Skipping large messages in the EventHub system producer

EventHubs have restriction on maximum message sizes that can be allowed. Adding a `systems.%s.eventhubs.skipMessagesLargerThanBytes` that can be used in the event hub system to make it skip messages that are larger than specific bytes so that we don't even try to send those large messages because EventHubs will reject them anyways.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #454 from srinipunuru/skip.2

2 months agoScheduleAfterDebounceTime should catch all Throwable to avoid losing unhandled Errors
thunderstumpges [Mon, 26 Mar 2018 23:08:00 +0000 (16:08 -0700)] 
ScheduleAfterDebounceTime should catch all Throwable to avoid losing unhandled Errors

If an Action executed in the scheduler throws an Error (or other Throwable?) besides Exception, it is silently lost since the Action/Runnable wrapper only catches Exception, not Throwable. This made my troubleshooting of an issue very difficult. Made it seem like the code was "hung" when really it had thrown a "NoSuchMethodError" (Error instead of Exception) due to a simple dependency issue on my side.

Catching Throwable instead ensures this is handled and propagated properly.

Author: thunderstumpges <>

Reviewers: Prateek Maheshwari <>

Closes #450 from thunderstumpges/scheduler-catch-throwable

2 months agoMaking event hub configs Samza compliant
Srinivasulu Punuru [Fri, 23 Mar 2018 00:08:11 +0000 (17:08 -0700)] 
Making event hub configs Samza compliant

Fixes for Bugs

- SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone.
- SAMZA-1624 EventHub system should prefix the configs with senstive for SasKey and SasToken
- SAMZA-1625 EventHub systemAdmin is swallowing exceptions
- SAMZA-1626 EventHub system admin is not returning the metadata for all the ssps requested for


1. Right now event hub doesn't follow the samza's config convention of naming the secrets as "sensitive" so that they are masked before they are logged.
2. Event hub configs uses the old system.<systemName>.streams.<streamName> which is blacklisted in Samza standalone. So moving these configs to newer <streams>.<streamid>
3. Wrapping the underlying exception properly in the SamzaException in EventHubSystemAdmin
4. Porting Bharat's fix to return the metadata for all the ssps requested for in EventHubSystemAdmin

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #453 from srinipunuru/eh.1

2 months agoAdd stream-table join support for samza sql
Aditya Toomula [Thu, 22 Mar 2018 00:58:07 +0000 (17:58 -0700)] 
Add stream-table join support for samza sql

Author: Aditya Toomula <>

Reviewers: Yi Pan <>

Closes #425 from atoomula/join

2 months agoSAMZA-1623: include avro as the file suffix for hdfs producer
Hai Lu [Wed, 21 Mar 2018 20:13:16 +0000 (13:13 -0700)] 
SAMZA-1623: include avro as the file suffix for hdfs producer

AvroDataFileHdfsWriter should include avro as the file suffix as some pig jobs couldn't read the avro files if they don't come with the proper suffix

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #452 from lhaiesp/master

2 months agoSAMZA-1608 : Add hidden config to enable explicit stream creation in StreamAppender...
Daniel Nishimura [Fri, 16 Mar 2018 20:10:56 +0000 (13:10 -0700)] 
SAMZA-1608 : Add hidden config to enable explicit stream creation in StreamAppender due to bug.

Due to a intermittent bug that causes the explicit stream creation in `StreamAppender` to hang, a hidden config is added to enable/disable explicit stream creation. By default this is disabled, which reverts to the previous behavior.

When the intermittent hang bug is fixed, the config will either be removed or made public.

Author: Daniel Nishimura <>

Reviewers: Prateek Maheshwari <>

Closes #442 from dnishimura/samza-1608-disable-streamappender-create-stream

2 months agoSAMZA-1622: avro writer to support generic record
Hai Lu [Fri, 16 Mar 2018 16:22:01 +0000 (09:22 -0700)] 
SAMZA-1622: avro writer to support generic record

avro writer in HDFS system producer to support generic record

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #449 from lhaiesp/master

2 months agoSAMZA-1615: Fix a couple of issues in ControlMessageSender
Xinyu Liu [Thu, 15 Mar 2018 23:47:22 +0000 (16:47 -0700)] 
SAMZA-1615: Fix a couple of issues in ControlMessageSender

Two issues I found during testing: 1) medaDataCache.getSystemStreamMetadata(): if we pass in partitionOnly to be true, it will actually not store the metadata into the cache, resulting every call being another query to kafka. I turned off the partitionOnly in order to make it in the cache. 2) change the log for info to debug.

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #444 from xinyuiscool/SAMZA-1615

2 months agoSAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compa...
Aditya Toomula [Wed, 14 Mar 2018 00:43:45 +0000 (17:43 -0700)] 
SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets

Author: Aditya Toomula <>

Reviewers: Jagadish <>

Closes #443 from atoomula/bootstrap

2 months agoSAMZA-1618: fix HdfsFileSystemAdapter to get files recursively
Hai Lu [Tue, 13 Mar 2018 19:28:11 +0000 (12:28 -0700)] 
SAMZA-1618: fix HdfsFileSystemAdapter to get files recursively

fix HdfsFileSystemAdapter to get files recursively

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #447 from lhaiesp/master

2 months agoSAMZA-1610: Implementation of remote table provider
Peng Du [Fri, 9 Mar 2018 22:57:56 +0000 (14:57 -0800)] 
SAMZA-1610: Implementation of remote table provider

Please see commit messages for detailed descriptions.

Author: Peng Du <>

Reviewers: Jagadish <>, Wei Song <>

Closes #432 from pdu-mn1/remote-table-0222

2 months agoInfinite loop when trying to use SamzaSqlApplicationRunner in yarn mode
Srinivasulu Punuru [Fri, 9 Mar 2018 19:01:07 +0000 (11:01 -0800)] 
Infinite loop when trying to use SamzaSqlApplicationRunner in yarn mode

Right now when we try to use the SamzaSqlApplicationRunner in yarn mode it goes into the infinite loop because the appRunnerConfig that we try to set is being overwritten by the appRunnerConfig passed by the job and SamzaSqlApplicationRunner keeps creating it again and again in an infinite loop.

Fix : Userconfig for app.runner.class should not override the computed one. Added a test case to validate this.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #440 from srinipunuru/bug-fix.1

2 months agoSAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint
Shanthoosh Venkataraman [Fri, 9 Mar 2018 02:00:15 +0000 (18:00 -0800)] 
SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #438 from shanthoosh/SAMZA-1589

2 months agoSAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData
Shanthoosh Venkataraman [Wed, 7 Mar 2018 21:24:54 +0000 (13:24 -0800)] 
SAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #437 from shanthoosh/fix_zkutils_get_processor_data

2 months agoSAMZA-1602: Moving class StreamAssert from src/test to src/main
sanil15 [Wed, 7 Mar 2018 21:22:40 +0000 (13:22 -0800)] 
SAMZA-1602: Moving class StreamAssert from src/test to src/main

Tested with ./gradlew clean build, which is passing

Author: sanil15 <>

Reviewers: Xinyu Liu <>

Closes #439 from Sanil15/SAMZA-1602

2 months agoSAMZA-1498: Support arbitrary system clock timer in operators
xiliu [Wed, 7 Mar 2018 02:20:15 +0000 (18:20 -0800)] 
SAMZA-1498: Support arbitrary system clock timer in operators

This patch adds the capability to register arbitrary timers for both high-level and low-level api.
For high-level, InitableFunction will pass the TimerRegistry to user through the new OpContext, and user will implement the TimerFunction to get timer notifications.
For low-level api, user can register timer in the TaskContext, and then implements the TimerCallback for specific timer actions.

Author: xiliu <>
Author: xinyuiscool <>
Author: xinyuiscool <>

Reviewers: Pateek M <>

Closes #419 from xinyuiscool/SAMZA-1498

2 months agoSAMZA-1600: remove the combination of cleanup policy "compact,delete"…
Yi Pan (Data Infrastructure) [Tue, 6 Mar 2018 03:40:45 +0000 (19:40 -0800)] 
SAMZA-1600: remove the combination of cleanup policy "compact,delete"…

… in changelog topic properties

Author: Yi Pan (Data Infrastructure) <>

Reviewers: Jagadish <>

Closes #435 from nickpan47/SAMZA-1600

2 months agoSAMZA-1460: StreamAppender does not explicitly create logging topic
Daniel Nishimura [Tue, 6 Mar 2018 00:02:49 +0000 (16:02 -0800)] 
SAMZA-1460: StreamAppender does not explicitly create logging topic

Creates the StreamAppender stream explicitly instead of relying on auto stream creation.

Author: Daniel Nishimura <>

Reviewers: Prateek M <>

Closes #423 from dnishimura/samza-1460-streamappender-create-logging-topic

2 months agoMisc. minor cleanup.
Prateek Maheshwari [Sun, 4 Mar 2018 00:30:59 +0000 (16:30 -0800)] 
Misc. minor cleanup.

1. Added a meaningful name for the container thread pool threads.
2. Made the thread names for framework threads consistent.
3. Made a couple of monitoring/metrics threads daemon.
4. Fixed a few checkstyle warning about missing param/throws documentation.

Author: Prateek Maheshwari <>

Reviewers: Jagadish <>, Jacob M <>

Closes #433 from prateekm/container-thread-pool-name

2 months agoSAMZA-1598: Cleanup file
Daniel Nishimura [Sun, 4 Mar 2018 00:29:44 +0000 (16:29 -0800)] 
SAMZA-1598: Cleanup file

Update old links. Remove Jenkins build badge, in favor of Travis-CI.

Author: Daniel Nishimura <>

Reviewers: Jagdish <>

Closes #434 from dnishimura/samza-1598-clean-readme

2 months agoSAMZA-1596: Staging directory name has to be formatted in config
Akim Akimov [Thu, 1 Mar 2018 18:58:06 +0000 (10:58 -0800)] 
SAMZA-1596: Staging directory name has to be formatted in config

When we instantiate a HDFS config staging directory we missing a formatter for getStagingDirectory so systems.hdfs-system-name.stagingDirectory does not parse from config, but only systems.%s.stagingDirectory only parses instead.

Solution is to add formatter to getStagingDirectory method.

Author: Akim Akimov <>

Reviewers: Jagadish <>, Hai L<>

Closes #431 from Zedmor/FixStagingDirHDFS

2 months agoSAMZA-1597: Expose an interface for throttling
Wei Song [Tue, 27 Feb 2018 19:47:51 +0000 (11:47 -0800)] 
SAMZA-1597: Expose an interface for throttling

3 months agoSAMZA-1595: Fix scalacCompileOptions format to build with zinc scala compiler.
Shanthoosh Venkataraman [Fri, 23 Feb 2018 00:44:00 +0000 (16:44 -0800)] 
SAMZA-1595: Fix scalacCompileOptions format to build with zinc scala compiler.

Zinc scala compiler(part of gradle version >= 3.0) expects the scala compilation arguments as a list(where each compilation argument is an element of the list).

In samza, the compilation arguments are concatenated into a single string and passed to the compiler.

This causes build failures when samza is built with Zinc scala compiler.

Existing ant scala compiler used to build samza in open source accepts the compilation arguments both as list and string.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #430 from shanthoosh/master

3 months agoSAMZA-1594: Remove ScalaCompileOptions to make samza codebase build with gradle versi...
Shanthoosh Venkataraman [Thu, 22 Feb 2018 23:58:39 +0000 (15:58 -0800)] 
SAMZA-1594: Remove ScalaCompileOptions to make samza codebase build with gradle version > 3.0.

When samza repository is built with the gradle version greater than 3.0, we notice the following build failure.

No such property: useAnt for class: org.gradle.api.tasks.scala.ScalaCompileOptions

This needs to be fixed to build samza with  gradle version >= 3.0.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #427 from shanthoosh/fix_scalac_compile_options

3 months agoSAMZA-1593: Upgrade gradle nexus plugin.
Shanthoosh Venkataraman [Thu, 22 Feb 2018 20:39:10 +0000 (12:39 -0800)] 
SAMZA-1593: Upgrade gradle nexus plugin.

Current nexus plugin which samza repository is using is not compatible with the gradle versions greater than 3.0.

For doing the ligradle migration at linkedin, we need to update the nexus gradle plugin to the latest version.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #426 from shanthoosh/master

3 months agoFixes to get the build working with Scala 2.10 build
Srinivasulu Punuru [Wed, 21 Feb 2018 19:45:23 +0000 (11:45 -0800)] 
Fixes to get the build working with Scala 2.10 build

The fixes needed to get the build working with the Scala 2.10.

Author: Srinivasulu Punuru <>

Reviewers: Jagadish <>, Jacob M<>, Bharath K<>, Xinyu L<>

Closes #424 from srinipunuru/rel-fixes.1

3 months agoSAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators
Daniel Nishimura [Fri, 16 Feb 2018 22:01:06 +0000 (14:01 -0800)] 
SAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators

The purpose of this PR is to consolidate the creation of the changelog and checkpoint streams into the JobCoordinators. In the current state, the changelog stream is created from the JobModelManager and the checkpoint stream is created within the OffsetManager. The issue with creating the checkpoint in the OffsetManager is that the first call happens from the first SamzaContainer that runs and each subsequent SamzaContainer run will attempt to create the checkpoint stream.

There are three driving forces for this refactoring. The first motivation is to assign the creation of the changelog and checkpoint streams to the JobCoordinators where it is most appropriate. This was discussed in more detail with nickpan47  . The second motivation is to have any potential failure to stream creation happen no later than during job coordination. The third motivation is to accommodate future security work to provide a robust way to set ACLs on streams.

Follow on to this PR will be:

Author: Daniel Nishimura <>

Reviewers: Yi Pan <>, Prateek Maheshwari <>

Closes #413 from dnishimura/samza-1555-move-changelog-checkpoint-creation and squashes the following commits:

1102314 [Daniel Nishimura] Fix a comment change that Intellij inadvertently refactored.
2b2eb16 [Daniel Nishimura] Trigger CI again
366b7b7 [Daniel Nishimura] Trigger CI
dce36b6 [Daniel Nishimura] Add isBootstrapped flag back to CoordinatorStreamSystemConsumer.
2efcfd4 [Daniel Nishimura] Trigger CI
6b2d912 [Daniel Nishimura] Changes related to Yi's latest review.
9d02e7a [Daniel Nishimura] Change createChangeLogStream to a static method.
effec24 [Daniel Nishimura] Cleanup from latest code review.
1bdfda7 [Daniel Nishimura] CoordinatorStreamManager lifecycle refactoring.
5178ca5 [Daniel Nishimura] Refactor AbstractCoordinatorStreamManager as a concrete class.
894af9c [Daniel Nishimura] Merge from master branch.
4009a0b [Daniel Nishimura] Changes from code review.
3a12a75 [Daniel Nishimura] Separation of changelog manager and jobmodel manager. Create CoordinatorStream class to encapsulate creation and management of coordinator stream consumer and producer.
c188adb [Daniel Nishimura] Merge from master branch.
971fa91 [Daniel Nishimura] Move the responsibility of changelog and checkpoint stream creation to the job coordinators.

3 months agoSAMZA-1588: Add random jitter to monitor’s scheduling interval.
Shanthoosh Venkataraman [Fri, 16 Feb 2018 18:34:21 +0000 (10:34 -0800)] 
SAMZA-1588: Add random jitter to monitor’s scheduling interval.

We’ve observed in LinkedIn execution environments that, all the monitors running on the YARN node-manager machines hitting an external service at the same time based upon the configured monitor scheduling interval.

To eliminate unnecessary monitor execution spike and congestion caused to an external service at the same time, it’s essential to add a random jitter to the monitor scheduling interval.

Random jitter will be added to monitor scheduling interval based upon a boolean configuration.

Author: Shanthoosh Venkataraman <>

Reviewers: Prateek Maheshwari <>

Closes #422 from shanthoosh/SAMZA-1588

3 months agoSupport source types where the last part of the source is not the streamName
Srinivasulu Punuru [Thu, 15 Feb 2018 23:46:17 +0000 (15:46 -0800)] 
Support source types where the last part of the source is not the streamName

Contains following fixes

1. Right now Samza SQL framework assumes that the last part of the source is the stream Name, removed the assumption
2. Made consoleLoggingSystemFactory to log formatted json so that it's easily readable.
3. Added support in SamzaSqlRelMessage where the key may not be present.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #421 from srinipunuru/four-part.1