8 months agoSAMZA-1261: Fix TestProcessJob flaky test
Ahmed Abdul Hamid [Mon, 7 May 2018 20:45:53 +0000 (13:45 -0700)] 
SAMZA-1261: Fix TestProcessJob flaky test

- Fix flaky test, `TestProcessJob` `testProcessJobKillShouldWork`, which was failing intermittently due to a race condition. In particular, the thread running the test could assert `jobModelManager.stopped` before another thread, enclosed within `ProcessJob.submit`, could actually invoke `jobModelManager.stop`.

+ Refactor `ProcessJob` to improve its overall robustness
  + Handle corner cases, e.g.
     + Fail gracefully if starting process within `ProcessJob.submit` throws
     + Ignore attempts to kill a job before it is submitted
     + Ensure job status is always set appropriately
  + Remove unnecessary stdout/stderr piping code
  + Employ `wait`/`notify` instead of `Thread.sleep`
  + Eliminate all artificial wait method invocations intended to influence inter-thread execution order in unit tests
  + Add more unit tests

Author: Ahmed Abdul Hamid <>

Reviewers: Boris S<>, Shanthoosh V<>

Closes #485 from ahmedahamid/master

8 months agoSAMZA-1692: Standalone stability fixes.
Shanthoosh Venkataraman [Mon, 7 May 2018 20:43:05 +0000 (13:43 -0700)] 
SAMZA-1692: Standalone stability fixes.

- Currently, on session expiration processorListener with incorrect generationId is registered with zookeeper(ZkUtils generationId is incremented on reconnect but the generationId in processorListener is zero all the time). When a session reconnect happens to a processor successive to leader, leader expiration event will be skipped. This will prevent leader re-election on a current leader death and will stall the processors group. Fix is to reinstantiate and then register processorChangeListener on session expiration.
- Add processorId to debounce thread name (this can aid debugging when multiple processors are running within a jvm).
- After ScheduleAfterDebounceTime queue is shutdown, don't accept new schedule requests. Current ZkJobCoordinator shutdown sequence comprise of the following steps
     - Shutdown the ScheduleAfterDebounceTime queue.
     - Stop the zkClient  and relinquish it's resources.

After we shutdown ScheduleAfterDebounceTime and before zkclient is stopped, any new operations can be scheduled in ScheduleAfterDebounceTime queue. This will result in RejectedExecutionException, since executorService is stopped.

Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask23f962a8 rejected from java.util.concurrent.ScheduledThreadPoolExecutor43408be8

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #496 from shanthoosh/master

8 months agoSAMZA-1563: Make RocksDB store base directory configurable
Bharath Kumarasubramanian [Mon, 7 May 2018 19:34:26 +0000 (12:34 -0700)] 
SAMZA-1563: Make RocksDB store base directory configurable

xinyuiscool ^^

Author: Bharath Kumarasubramanian <>

Reviewers: Prateek M <>

Closes #491 from bharathkk/samza-1563

8 months agoSAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish
Bharath Kumarasubramanian [Mon, 7 May 2018 18:11:01 +0000 (11:11 -0700)] 
SAMZA-1653: Support waitForFinish in remote application runner and add waitForFinish

Added the following APIs to ApplicationRunner
 `void waitForFinish()`
 `boolean waitForFinish(Duration timeout)`

Implemented the wait for finish methods in remote application runner. Note currently, there is disparity in the APIs in terms of associating runners with stream application. Ideally, we want to decide on the cardinal relation between them and change the APIs accordingly.

The goal of the PR is limited to introduce API (waitForFinish) parity between runners in the current setup.

Author: Bharath Kumarasubramanian <>

Reviewers: Xinyu Liu <>

Closes #503 from bharathkk/samza-1653

8 months agoSAMZA-1691: Support get iterable from KeyValueStore
xinyuiscool [Mon, 7 May 2018 16:51:28 +0000 (09:51 -0700)] 
SAMZA-1691: Support get iterable from KeyValueStore

Right now for KeyValueStore we have a range query to return an iterator. For usage in BEAM, we need a iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function in KeyValueStore to support it. It's implemented as follows:

1) for rocksDb, it will create the iterator when it's called, which will has a snapshot of the elements. Then every time when the iterator is needed, we will seek the iterator from beginning;

2) for inMemoryDb, it will create the snapshot submap when iterate() is called. The submap is an iterable and it can return a new iterator when needed.

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #492 from xinyuiscool/SAMZA-1691

8 months agoSAMZA-1699: Fix NPE in ClusterResourceManager
Jagadish [Fri, 4 May 2018 21:02:35 +0000 (14:02 -0700)] 
SAMZA-1699: Fix NPE in ClusterResourceManager

When the ClusterResourcedManager receives a notification that a container is started, it moves the container from the "pending queue" to its "running queue".
In the meanwhile, it's possible for another thread to remove the mapping for the key. Here's an example:


for (String key : pendingYarnContainers.keySet()) {
  yarnContainer = pendingYarnContainers.get(key); <-- could be null depending on whether the removal happened before it.

Author: Jagadish <>

Reviewers: Prateek M<>

Closes #504 from vjagadish/npe-fix-async

8 months agoSAMZA-1698: Update appStatus on failures in
Shanthoosh Venkataraman [Fri, 4 May 2018 00:37:49 +0000 (17:37 -0700)] 
SAMZA-1698: Update appStatus on failures in

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #502 from shanthoosh/local_application_runner_set_exception_in_finish

8 months agoSAMZA-1243: Fix flaky tests in TestLocalStoreMonitor
Ahmed Abdul Hamid [Tue, 1 May 2018 20:49:21 +0000 (13:49 -0700)] 
SAMZA-1243: Fix flaky tests in TestLocalStoreMonitor

Having attempted to reproduce this issue without success, careful analysis of the code indicates the underlying issue is most likely incomplete cleanup of file system directories after test execution possibly due to incomplete or manual test runs. To address this issue, this fix generates a different local store directory for every test to isolate file system side-effects across different tests and test runs.

Author: Ahmed Abdul Hamid <>

Reviewers: Shanthoosh Venkataraman <>

Closes #497 from ahmedahamid/dev/fix-1243

8 months agoSAMZA-1476: Fix TestStatefulTask flaky test
Ahmed Abdul Hamid [Tue, 1 May 2018 20:00:38 +0000 (13:00 -0700)] 
SAMZA-1476: Fix TestStatefulTask flaky test

Unable to reproduce the issue even after constraining CPU/memory resources available to the JVM, this fix addresses a potential root cause — a `CountDownLatch` shared between 2 threads, main test thread and Kafka producer thread, but not marked volatile even though it is reinitialized by the main test thread. This could cause the reported issue since each of the 2 threads could end up invoking `countDown`/`await` on a different `CountDownLatch` object.

It is worthwhile to mention that without this fix, a different test, `TestShutdownStatefulTask`, should have also exhibited some flakiness since it shares the same `TestTask` base with `TestStatefulTask` and exercises exactly the same portion of code that includes the failing assertion. Since no such flakiness was reported for `TestShutdownStatefulTask` however, the assumption made by this fix is that it might have not been encountered or reported.

Author: Ahmed Abdul Hamid <>

Reviewers: Prateek Maheshwari <>

Closes #498 from ahmedahamid/dev/fix-1476

8 months agoSAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes for...
Aditya Toomula [Mon, 30 Apr 2018 23:51:37 +0000 (16:51 -0700)] 
SAMZA-1693: Samza-sql - Adding Serde for rel record and few other minor fixes for Avro and Rel conversion.

Adding Serde for rel record, as calcite expects the keys to be in string format. Rel converters are always expected to provide keys as strings. If key is an avro record, it is expected that the rel converter changes the avro record to rel record and serializes it and deserializes it when conerting rel message to samza message.

Author: Aditya Toomula <>

Reviewers: Srini P<>

Closes #495 from atoomula/rel1

8 months agoSAMZA-1689: Add validations before state transitions in ZkBarrierForVersionUpgrade.
Shanthoosh Venkataraman [Sat, 28 Apr 2018 01:55:15 +0000 (18:55 -0700)] 
SAMZA-1689: Add validations before state transitions in ZkBarrierForVersionUpgrade.

Prevent invalid state updations on barrier.
* Introduced a additional barrier state NEW.
* Add state validations before updating the barrier.
* Fix existing TestZkBarrier tests that are disabled and add new tests
  to verify the intended behavior.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #490 from shanthoosh/fix_barrier_state_transitions

8 months agoSAMZA-1582: removed the yajl-ruby dependency for security reasons
Cameron Lee [Fri, 27 Apr 2018 18:59:33 +0000 (11:59 -0700)] 
SAMZA-1582: removed the yajl-ruby dependency for security reasons

The original ticket suggested updating the yajl-ruby dependency for security reasons.
Many of the ruby dependencies for building the docs site are years old. After upgrading the dependencies, the yajl-ruby dependency just went away.
The dependencies were upgraded to the highest version that was compatible with ruby 2.0.0 (jekyll was the only dependency which has newer versions that are only compatible with ruby versions greater than 2.0.0).

Testing done:
bundle exec jekyll serve --watch --baseurl ""
sanity checked a couple of links

Author: Cameron Lee <>

Reviewers: Jagadish <>

Closes #486 from cameronlee314/yajlruby

8 months agoSAMZA-1688: use per partition eventhubs client
Hai Lu [Fri, 27 Apr 2018 17:22:57 +0000 (10:22 -0700)] 
SAMZA-1688: use per partition eventhubs client

Use per partition eventhubs client to improve throughput. As each eventhub client maintains only one single TCP connection.
Also reduce the time spent on getPartitionRuntimeInfo on starting up, by making the calls run in parallels in system admin and also completely removing it from system consumer.

See 8x improvement on consumption throughput from ~3Mb/s (or 1.5k QPS) to ~21Mb/s (or 10.5k QPS). Memory footprint doesn't seem to get affected. CPU usage also increases by 8x. Essentially, with per partition client, we are able to saturate the CPU usages. The benchmark I did was on a machine with 8 cores. If the machine has say 16 CPU cores, I expect the the throughput to improve by around 16x.

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #489 from lhaiesp/master

8 months agoMinor-fix: Fix a white-space in ResourceRequestState
Jagadish [Thu, 26 Apr 2018 00:20:01 +0000 (17:20 -0700)] 
Minor-fix: Fix a white-space in ResourceRequestState

8 months agoSAMZA-1687: Prioritize preferred host requests over ANY-HOST requests
Jagadish [Thu, 26 Apr 2018 00:17:34 +0000 (17:17 -0700)] 
SAMZA-1687: Prioritize preferred host requests over ANY-HOST requests

Working on a documentation that describes this better, but a TL;DR summary is that we should prioritize preferred-host requests over ANY_HOST requests.

Yarn enforces these two checks:
1. ANY_HOST requests should always be made with relax-locality = true
2. A request with relax-locality = false should not be in the same priority as another with relax-locality = true

Since the Samza AM makes preferred-host requests with relax-locality = false, it follows that ANY_HOST requests should specify a different priority-level. We can safely set priority of preferred-host requests to be higher than any-host requests since data-locality is critical.

Author: Jagadish <>

Closes #488 from vjagadish/priority-host-affinity

8 months agoSAMZA-1681: Samza-sql - Add support for handling older record schema versions in...
Aditya Toomula [Wed, 25 Apr 2018 16:48:11 +0000 (09:48 -0700)] 
SAMZA-1681: Samza-sql - Add support for handling older record schema versions in AvroRelConverter

In addition to handling older record schema versions in AvroRelConverter, this change also handles Avro enum and fixed types and also handles the proper conversion of samza message key to rel message.

Author: Aditya Toomula <>

Reviewers: Srini P <>

Closes #481 from atoomula/rel

8 months agoSAMZA-1686: Set finite operation timeout when creating zkClient.
Shanthoosh Venkataraman [Tue, 24 Apr 2018 22:56:05 +0000 (15:56 -0700)] 
SAMZA-1686: Set finite operation timeout when creating zkClient.

Currently zkClient is created with operationRetryTimeOut of -1. This causes zkClient to retry indefinitely in case of irrecoverable exceptions thereby delaying the StreamProcessor shutdown.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish Venkatraman <>

Closes #487 from shanthoosh/master

8 months agoSAMZA-1676: miscellaneous fix and improvement for eventhubs system
Hai Lu [Tue, 24 Apr 2018 19:26:14 +0000 (12:26 -0700)] 
SAMZA-1676: miscellaneous fix and improvement for eventhubs system

Including these changes:
- Log the metadata that we are fetching from the event hubs
- Rename readLatency to consumptionLagMs
- fix the issue that readLatency metric returns negative value

Author: Hai Lu <>

Reviewers: Srini P<>, Jagadish <>

Closes #484 from lhaiesp/master

8 months agoSAMZA-1685: Need to resolve redirects for virtualenv download location for running...
Cameron Lee [Tue, 24 Apr 2018 03:28:43 +0000 (20:28 -0700)] 
SAMZA-1685: Need to resolve redirects for virtualenv download location for running integration tests

Testing Done: ran bin/ locally from mac/linux which did not run the integration tests before

Author: Cameron Lee <>

Reviewers: Jagadish <>

Closes #482 from cameronlee314/inttest

9 months agoSAMZA-1656: EventHubSystemAdmin does not fetch metadata for valid streams.
Shanthoosh Venkataraman [Mon, 23 Apr 2018 20:24:54 +0000 (13:24 -0700)] 
SAMZA-1656: EventHubSystemAdmin does not fetch metadata for valid streams.

Currently successive invocation of EventHubSystemAdmin.getSystemStreamMetadata with the same stream collection returns empty results.

This is an implementation bug, where the streams requested as a part of prior invocations of EventHubSystemAdmin.getSystemStreamMetadata are ignored(due to stale caching).

This bug causes the JobModel generation phase to fail and kills the StreamProcessor.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>, Srini P<>

Closes #474 from shanthoosh/EventHubSystemAdminBug

9 months agoSAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.
Shanthoosh Venkataraman [Mon, 23 Apr 2018 20:20:45 +0000 (13:20 -0700)] 
SAMZA-1667: Skip storing configuration as a part of JobModel in zookeeper data nodes.

In general, jobModel configuration contains service access tokens and certs. It's a common practice to run zookeeper in a non-ACL environment. Hence for security purposes, it's essential not to store configuration as a part of JobModel in zookeeper.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #479 from shanthoosh/nuke_configuration_stored_in_JobModel and squashes the following commits:

b8d2196 [Shanthoosh Venkataraman] Address review comments.
7876a44 [Shanthoosh Venkataraman] Nuke JobModel configuration in ZkJobCoordinator.

9 months agoUpdate PMC membership on the Samza web-page
Jagadish [Sat, 21 Apr 2018 02:52:35 +0000 (19:52 -0700)] 
Update PMC membership on the Samza web-page

Author: Jagadish <>

Reviewers: Jagadish <>

Closes #480 from vjagadish/pmc

9 months agoSAMZA-1671: sql: initial insert support for table destination
Peng Du [Fri, 20 Apr 2018 16:18:31 +0000 (09:18 -0700)] 
SAMZA-1671: sql: initial insert support for table destination

Table is another main type of IO abstraction in Samza which supports
both read and write (optional). For the tables that do support writes, we
should be able to allow Samza SQL users to write a query to do that. One
example is to insert into a database. The current code only supports
inserting into a stream. This change adds the initial support for table
insert operation.

Author: Peng Du <>

Reviewers: Jagadish <>, Srini P<>

Closes #465 from pdu-mn1/sql-insert-table

9 months agoSAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit
Dong Lin [Wed, 18 Apr 2018 22:31:06 +0000 (15:31 -0700)] 
SAMZA-1478: Delete unneeded data from intermediate Kafka topic on offset commit

Author: Dong Lin <>
Author: Dong Lin <>

Reviewers: Prateek Maheshwari <>, Jacob Maes <>, Yi Pan <>

Closes #347 from lindong28/SAMZA-1478

9 months agoMisc. Util cleanup
Prateek Maheshwari [Wed, 18 Apr 2018 20:32:15 +0000 (13:32 -0700)] 
Misc. Util cleanup

Major changes:
1. Broke up 'Util' class into multiple classes: 'FileUtil', 'HttpUtil', 'CoordinatorStreamUtil'.
2. Consolidated some Util classes: MathUtil, ScalaJavaUtil
3. Removed redundant Util classes: ClassloaderUtil, ScalaToJavaUtil
4. Renamed some Util classes for consistency: TimerUtils -> TimerUtil.
5. Inlined some util classes and methods to where they're used: LexicographicComparator to RocksDBKeyValueStore, defaultSerdeFactoryFromSerdeName to SerdeManager, etc.

Rest of the changes are updates to use the new classes and methods.

Testing: Local build and test works. Tested with a locally deployed Samza job.

Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>, Jagadish Venkatraman <>

Closes #455 from prateekm/util-cleanup

9 months agoSAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.
Shanthoosh Venkataraman [Wed, 18 Apr 2018 17:00:56 +0000 (10:00 -0700)] 
SAMZA-1640: JobModel Json deserialization error in ZkJobCoordinator.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #466 from shanthoosh/SAMZA-1640

9 months agoSAMZA-1651: Samza-sql - Implement GROUP BY SQL operator
Aditya Toomula [Tue, 17 Apr 2018 23:02:49 +0000 (16:02 -0700)] 
SAMZA-1651: Samza-sql - Implement GROUP BY SQL operator

Author: Aditya Toomula <>

Reviewers: Srini P<>

Closes #478 from atoomula/groupby1

9 months agoSAMZA-1664: ZkJobCoordinator stability fixes.
Shanthoosh Venkataraman [Tue, 17 Apr 2018 23:02:16 +0000 (16:02 -0700)] 
SAMZA-1664: ZkJobCoordinator stability fixes.

Issues fixed:
* Handle job coordinator shutdown gracefully in case of unclean container shutdowns.
* Fix the zookeeper session handling logic.
* Fix the forever retry timeout in ZkClient re-connect.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #476 from shanthoosh/pullInK2Changes

9 months agoSAMZA-1666: Benchmark SystemProducer and SystemConsumer
Srinivasulu Punuru [Tue, 17 Apr 2018 22:40:59 +0000 (15:40 -0700)] 
SAMZA-1666: Benchmark SystemProducer and SystemConsumer

* Tests to benchmark the performance of the system consumers and producers.
* Config to test the benchmark for the event hub system producer and consumer.

SystemConsumerBench and SystemProducerBench provides base generic implementation to test the benchmark for the system producers and consumers. Any new system that needs benchmark test needs a properties file.

The benchmark test itself is single threaded in the way it consumes and produces events. Scaling the benchmark tests right now involves running multiple processes of these tests in parallel.

Right now we just calculate the event rate, But in future we could create a logging metrics registry to hookup other metrics and log them in console along with event rate while the benchmark tests are being run.

Author: Srinivasulu Punuru <>

Reviewers: Jagadish <>, Wei Song<>

Closes #473 from srinipunuru/benchmark.1

9 months agoRevert "SAMZA-1645: A few issues found by BEAM stress test"
xiliu [Tue, 17 Apr 2018 18:50:44 +0000 (11:50 -0700)] 
Revert "SAMZA-1645: A few issues found by BEAM stress test"

This reverts commit 26294151642283c1cfb51590b51a86d2eaedd11f.

9 months agoInitial version for in memory system.
Bharath Kumarasubramanian [Tue, 17 Apr 2018 16:09:01 +0000 (09:09 -0700)] 
Initial version for in memory system.

Getting the PR out to unblock sanil.
Pending tasks
 1. Add documentation
 2. Add more tests
 3. Currently initialization of the stream happens using createStream on admin. We need to find a way to expose the same functionality to low level users.
 4. To populate the initial stream, clients need to get a handle on producer and produce the messages. Alternatively, we can support serialization of source data and pass it as config to the system to initialize. We have a hook in place for this but not implemented it completely.
  5. Clean up consumed data on the buffer based on the lowest offsets of the consumers.

I will create JIRAs for all the pending tasks and fix them iteratively.

xinyuiscool ^^

Author: Bharath Kumarasubramanian <>

Reviewers: Xinyu Liu <>

Closes #459 from bharathkk/single-node-testing

9 months agoSAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message
Aditya Toomula [Mon, 16 Apr 2018 17:26:51 +0000 (10:26 -0700)] 
SAMZA-1619: Samza-sql: Support serialization of nested samza sql relational message

Added support for serialization of nested samza sql rel message and accordingly fixed the conversion of nested avro records to rel message. Please note that we still do not have support for the sql queries that point to fields in nested records (beyond the top level record).

Author: Aditya Toomula <>
Author: Aditya Toomula <>

Reviewers: Srinivasulu P<>

Closes #464 from atoomula/serde

9 months agoSAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job if...
Prateek Maheshwari [Fri, 13 Apr 2018 15:37:20 +0000 (08:37 -0700)] 
SAMZA-1643: StreamPartitionCountMonitor should only restart/shut down the job if partition count increases

As an aside, also update the gauge to report current number of partitions instead of the change, since that's what its name indicates.

Author: Prateek Maheshwari <>

Reviewers: Jagadish  V <>

Closes #468 from prateekm/partition-monitor-fixes

9 months agoSAMZA-1649: Improve host-aware allocation to account for strict locality
Jagadish [Fri, 13 Apr 2018 02:23:44 +0000 (19:23 -0700)] 
SAMZA-1649: Improve host-aware allocation to account for strict locality

Currently working on a doc for the behavior of the CapacityScheduler and further testing is needed on an actual cluster - but here's a summary of why we should set relax-locality = false:
 - Node-local requests are honored only when relax-locality = false
 - With relax-locality = true, the scheduler biases interactivity over data-locality for requests that ask for few resources relative to the size of the cluster.

In addition to the above change, this PR also modifies the allocator algorithm to fallback to "ANY_HOST" requests so that we make progress when the node is unavailable.

Author: Jagadish <>

Reviewers: Prateek Maheshwari <>

Closes #471 from vjagadish/relax-locality-fix

9 months agoSAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window
Aditya Toomula [Thu, 12 Apr 2018 23:50:30 +0000 (16:50 -0700)] 
SAMZA-1650: Fix for firing trigger at the end of trigger interval for tumbling window

Author: Aditya Toomula <>

Reviewers: Jagadish <>

Closes #472 from atoomula/window

9 months agoSAMZA-1584: Improve logging in StreamProcessor.
Shanthoosh Venkataraman [Thu, 12 Apr 2018 21:34:45 +0000 (14:34 -0700)] 
SAMZA-1584: Improve logging in StreamProcessor.

Add the processorID in the log lines wherever necessary(since we support running multiple stream applications in a JVM) and improving logging in general in StreamProcessor.

Author: Shanthoosh Venkataraman <>
Author: Shanthoosh Venkataraman <>

Reviewers: Prateek Maheshwari <>

Closes #441 from shanthoosh/SAMZA-1584

9 months agominor fix on eventhubs size limit for event body and partition key
Hai Lu [Thu, 12 Apr 2018 20:31:39 +0000 (13:31 -0700)] 
minor fix on eventhubs size limit for event body and partition key

Author: Hai Lu <>

Reviewers: Jagadish <>, Srinivasulu Punuru<>

Closes #470 from lhaiesp/master

9 months agoSAMZA-1183: Fix TestAsyncRunLoop flaky tests
Shanthoosh Venkataraman [Wed, 11 Apr 2018 22:34:43 +0000 (15:34 -0700)] 
SAMZA-1183: Fix TestAsyncRunLoop flaky tests

A. Fix TestProcessInOrder and TestProcessOutOfOrder failures.

Both the tests has two tasks(T1, T2) and the following task to message assignments:
- Task T1: [M1, M2] (Task T1 is expected to process the messages M1 and M2).
- Task T2: [M3] (Task T2 is expected to process the message M3 and stop the runloop).

In some cases, before the task T1 completes processing all it’s messages, T2 gets scheduled and stops the runloop(Stopping all tasks).

We wait in both tests through a CountdownLatch, expecting the tasks to process all the messages, which will never reach zero in the above scenario(there by causing indefinite wait in tests).

Fix: Remove the manual shutdown invocation from Task T2 and use EOS messages to stop the runloop.

B.  Fix TestCommitSingleTask and TestCommitMultipleTask failures.

Both the tests had two tasks(T1, T2).

Task T2 is expected to stop the runloop and task T1 marks the commit flag through taskCoordinator.commit(requestScope).

Failure occurs in some scenarios when the task T2 stops the runloop before the runLoop commits both the tasks.

Fix: Trigger the runloop shutdown sequence after we’ve committed the tasks.

C. Combine the duplicate tests and unify their assertions.

D. Verification: Ran the following script to verify the fixes.

`root88cf6be1c11a:~/samza# i=0`
`root88cf6be1c11a:~/samza# while [ $i -lt 100 ]; do`
       ` i=expr $i + 1`
       ` ./gradlew clean :samza-core:test -Dtest.single="TestAsyncRunLoop" --debug --stacktrace >> test-logs`
  ` done;`

There were no failures.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #463 from shanthoosh/re_enable_async_run_loop_tests_2

9 months agoSAMZA-1645: A few issues found by BEAM stress test
xinyuiscool [Wed, 11 Apr 2018 20:21:36 +0000 (13:21 -0700)] 
SAMZA-1645: A few issues found by BEAM stress test

1. Revert the priority set to intermediate streams.
2. Fix a watermark propagation condition

Author: xinyuiscool <>

Reviewers: Prateek M <>

Closes #469 from xinyuiscool/SAMZA-1645

9 months agoUpgrade to latest event hub version (1.0.1)
Srinivasulu Punuru [Tue, 10 Apr 2018 22:39:21 +0000 (15:39 -0700)] 
Upgrade to latest event hub version (1.0.1)

* Upgrade to the latest event hub version 1.0.1
* Adding configs for prefetchCount and maxEventPerPoll
* Fix the high cpu usage issue in SamzaHistogram
* Fixing a race condition in event hub system producer where the future was getting removed while it was being checked for completion resulting in NPE.

Author: Srinivasulu Punuru <>

Reviewers: Prateek Maheshwari <>

Closes #467 from srinipunuru/upgrade-eh-1.0.1

9 months agoRefactoring the EventHub System Producer into reusable AsyncSystemProducer
Srinivasulu Punuru [Sat, 31 Mar 2018 00:45:41 +0000 (17:45 -0700)] 
Refactoring the EventHub System Producer into reusable AsyncSystemProducer

Refactoring eventhub system producer into common reusable components
1. AsyncSystemProducer : All the system producers that have real async send API with call backs can use this
2. NoFlushAsyncSystemProducer: All the system producers that have real async call back based send API and doesn't provide flush semantics can use this.

The system producers that implement AsyncSystemproducers can be used across Samza and Brooklin.

TODO: AsyncSystemProducer needs to be moved to the api layer so that it can be used across different system producers (i.e. eventhub and kinesis)

Author: Srinivasulu Punuru <>

Reviewers: Boris S <>

Closes #458 from srinipunuru/async-prod.3

9 months agoSAMZA-1630: Move thread dump from stdout to logs
Prateek Maheshwari [Sat, 31 Mar 2018 00:11:49 +0000 (17:11 -0700)] 
SAMZA-1630: Move thread dump from stdout to logs

Author: Prateek Maheshwari <>

Reviewers: Jagadish Venkatraman <>

Closes #462 from prateekm/thread-dump-on-timeout

9 months agoSAMZA-1632: KinesisConfig: Making getProxyHost and getProxyPort APIs protected
Aditya Toomula [Thu, 29 Mar 2018 22:57:58 +0000 (15:57 -0700)] 
SAMZA-1632: KinesisConfig: Making getProxyHost and getProxyPort APIs protected

Author: Aditya Toomula <>

Reviewers: Jagadish<>

Closes #457 from atoomula/kconfig

9 months agoSD-1599: Improve the efficiency of the AsynRunLoop when some partitio…
James Lent [Thu, 29 Mar 2018 22:26:28 +0000 (15:26 -0700)] 
SD-1599: Improve the efficiency of the AsynRunLoop when some partitio…

…ns are empty.

Author: James Lent <>

Reviewers: Yi Pan <>, Xinyu Liu <>

Closes #436 from jwlent55/SD-1599-improve-async-run-loop-efficiency and squashes the following commits:

ef0e53bb [James Lent] SD-1599: Remove incorrect call to containerMetrics left by previous update.
3899216e [James Lent] SD-1599: Combine the blockIfBusy and blockIfNoWork logic inside one method and a common latch.
e3837809 [James Lent] SD-1599: Mark runLoopResumedSinceLastChecked as volatile.
a7c0ac4c [James Lent] SD-1599: Explicitly set the timeout value to either 'noNewMessagesTimeout' or 0.
b2dd61e2 [James Lent] SD-1599: Address the first set of code inspection comments.
cc34518a [James Lent] SD-1599: Improve the efficiency of the AsynRunLoop when some partitions are empty.

9 months agoSAMZA-1630: Log a thread dump on timeouts
Prateek Maheshwari [Thu, 29 Mar 2018 01:14:34 +0000 (18:14 -0700)] 
SAMZA-1630: Log a thread dump on timeouts

Would be useful to get a thread dump on timeouts, e.g. for AsyncStreamTask callback timeout, container shutdown timeout, heartbeat monitor graceful shutdown timeout etc.

Author: Prateek Maheshwari <>
Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>

Closes #460 from prateekm/thread-dump-on-timeout

9 months agoSAMZA-1631: Improve logging on Task callback timeout
Prateek Maheshwari [Thu, 29 Mar 2018 00:52:58 +0000 (17:52 -0700)] 
SAMZA-1631: Improve logging on Task callback timeout

Author: Prateek Maheshwari <>

Reviewers: Jacob Maes <>

Closes #461 from prateekm/task-callback-logging

9 months agoSAMZA-1627: Watermark broadcast enhancements
xinyuiscool [Wed, 28 Mar 2018 17:25:15 +0000 (10:25 -0700)] 
SAMZA-1627: Watermark broadcast enhancements

Currently each upstream task needs to broadcast to every single partition of intermediate streams in order to aggregate watermarks in the consumers. A better way to do this is to have only one downstream consumer doing the aggregation, and then broadcast to all the partitions. This is safe as we can prove the broadcast watermark message is after all the upstream tasks finished producing the events whose event time are before the watermark. This reduced the full message count from O(n^2) to O(n).

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #456 from xinyuiscool/SAMZA-1627

9 months agoSkipping large messages in the EventHub system producer
Srinivasulu Punuru [Tue, 27 Mar 2018 18:39:52 +0000 (11:39 -0700)] 
Skipping large messages in the EventHub system producer

EventHubs have restriction on maximum message sizes that can be allowed. Adding a `systems.%s.eventhubs.skipMessagesLargerThanBytes` that can be used in the event hub system to make it skip messages that are larger than specific bytes so that we don't even try to send those large messages because EventHubs will reject them anyways.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #454 from srinipunuru/skip.2

9 months agoScheduleAfterDebounceTime should catch all Throwable to avoid losing unhandled Errors
thunderstumpges [Mon, 26 Mar 2018 23:08:00 +0000 (16:08 -0700)] 
ScheduleAfterDebounceTime should catch all Throwable to avoid losing unhandled Errors

If an Action executed in the scheduler throws an Error (or other Throwable?) besides Exception, it is silently lost since the Action/Runnable wrapper only catches Exception, not Throwable. This made my troubleshooting of an issue very difficult. Made it seem like the code was "hung" when really it had thrown a "NoSuchMethodError" (Error instead of Exception) due to a simple dependency issue on my side.

Catching Throwable instead ensures this is handled and propagated properly.

Author: thunderstumpges <>

Reviewers: Prateek Maheshwari <>

Closes #450 from thunderstumpges/scheduler-catch-throwable

10 months agoMaking event hub configs Samza compliant
Srinivasulu Punuru [Fri, 23 Mar 2018 00:08:11 +0000 (17:08 -0700)] 
Making event hub configs Samza compliant

Fixes for Bugs

- SAMZA-1571 Make Eventhubs system configs compatible with Samza standalone.
- SAMZA-1624 EventHub system should prefix the configs with senstive for SasKey and SasToken
- SAMZA-1625 EventHub systemAdmin is swallowing exceptions
- SAMZA-1626 EventHub system admin is not returning the metadata for all the ssps requested for


1. Right now event hub doesn't follow the samza's config convention of naming the secrets as "sensitive" so that they are masked before they are logged.
2. Event hub configs uses the old system.<systemName>.streams.<streamName> which is blacklisted in Samza standalone. So moving these configs to newer <streams>.<streamid>
3. Wrapping the underlying exception properly in the SamzaException in EventHubSystemAdmin
4. Porting Bharat's fix to return the metadata for all the ssps requested for in EventHubSystemAdmin

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #453 from srinipunuru/eh.1

10 months agoAdd stream-table join support for samza sql
Aditya Toomula [Thu, 22 Mar 2018 00:58:07 +0000 (17:58 -0700)] 
Add stream-table join support for samza sql

Author: Aditya Toomula <>

Reviewers: Yi Pan <>

Closes #425 from atoomula/join

10 months agoSAMZA-1623: include avro as the file suffix for hdfs producer
Hai Lu [Wed, 21 Mar 2018 20:13:16 +0000 (13:13 -0700)] 
SAMZA-1623: include avro as the file suffix for hdfs producer

AvroDataFileHdfsWriter should include avro as the file suffix as some pig jobs couldn't read the avro files if they don't come with the proper suffix

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #452 from lhaiesp/master

10 months agoSAMZA-1608 : Add hidden config to enable explicit stream creation in StreamAppender...
Daniel Nishimura [Fri, 16 Mar 2018 20:10:56 +0000 (13:10 -0700)] 
SAMZA-1608 : Add hidden config to enable explicit stream creation in StreamAppender due to bug.

Due to a intermittent bug that causes the explicit stream creation in `StreamAppender` to hang, a hidden config is added to enable/disable explicit stream creation. By default this is disabled, which reverts to the previous behavior.

When the intermittent hang bug is fixed, the config will either be removed or made public.

Author: Daniel Nishimura <>

Reviewers: Prateek Maheshwari <>

Closes #442 from dnishimura/samza-1608-disable-streamappender-create-stream

10 months agoSAMZA-1622: avro writer to support generic record
Hai Lu [Fri, 16 Mar 2018 16:22:01 +0000 (09:22 -0700)] 
SAMZA-1622: avro writer to support generic record

avro writer in HDFS system producer to support generic record

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #449 from lhaiesp/master

10 months agoSAMZA-1615: Fix a couple of issues in ControlMessageSender
Xinyu Liu [Thu, 15 Mar 2018 23:47:22 +0000 (16:47 -0700)] 
SAMZA-1615: Fix a couple of issues in ControlMessageSender

Two issues I found during testing: 1) medaDataCache.getSystemStreamMetadata(): if we pass in partitionOnly to be true, it will actually not store the metadata into the cache, resulting every call being another query to kafka. I turned off the partitionOnly in order to make it in the cache. 2) change the log for info to debug.

Author: xinyuiscool <>

Reviewers: Boris S <>

Closes #444 from xinyuiscool/SAMZA-1615

10 months agoSAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compa...
Aditya Toomula [Wed, 14 Mar 2018 00:43:45 +0000 (17:43 -0700)] 
SAMZA-1611: BootstrappingChooser should use systemAdmin offsetComparator API to compare the offsets

Author: Aditya Toomula <>

Reviewers: Jagadish <>

Closes #443 from atoomula/bootstrap

10 months agoSAMZA-1618: fix HdfsFileSystemAdapter to get files recursively
Hai Lu [Tue, 13 Mar 2018 19:28:11 +0000 (12:28 -0700)] 
SAMZA-1618: fix HdfsFileSystemAdapter to get files recursively

fix HdfsFileSystemAdapter to get files recursively

Author: Hai Lu <>

Reviewers: Xinyu Liu <>

Closes #447 from lhaiesp/master

10 months agoSAMZA-1610: Implementation of remote table provider
Peng Du [Fri, 9 Mar 2018 22:57:56 +0000 (14:57 -0800)] 
SAMZA-1610: Implementation of remote table provider

Please see commit messages for detailed descriptions.

Author: Peng Du <>

Reviewers: Jagadish <>, Wei Song <>

Closes #432 from pdu-mn1/remote-table-0222

10 months agoInfinite loop when trying to use SamzaSqlApplicationRunner in yarn mode
Srinivasulu Punuru [Fri, 9 Mar 2018 19:01:07 +0000 (11:01 -0800)] 
Infinite loop when trying to use SamzaSqlApplicationRunner in yarn mode

Right now when we try to use the SamzaSqlApplicationRunner in yarn mode it goes into the infinite loop because the appRunnerConfig that we try to set is being overwritten by the appRunnerConfig passed by the job and SamzaSqlApplicationRunner keeps creating it again and again in an infinite loop.

Fix : Userconfig for app.runner.class should not override the computed one. Added a test case to validate this.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #440 from srinipunuru/bug-fix.1

10 months agoSAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint
Shanthoosh Venkataraman [Fri, 9 Mar 2018 02:00:15 +0000 (18:00 -0800)] 
SAMZA-1589: Reduce failure retry duration in KafkaCheckpointManager.writeCheckpoint

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #438 from shanthoosh/SAMZA-1589

10 months agoSAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData
Shanthoosh Venkataraman [Wed, 7 Mar 2018 21:24:54 +0000 (13:24 -0800)] 
SAMZA-1607: Handle ZkNoNodeExistsException in zkUtils.readProcessorData

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #437 from shanthoosh/fix_zkutils_get_processor_data

10 months agoSAMZA-1602: Moving class StreamAssert from src/test to src/main
sanil15 [Wed, 7 Mar 2018 21:22:40 +0000 (13:22 -0800)] 
SAMZA-1602: Moving class StreamAssert from src/test to src/main

Tested with ./gradlew clean build, which is passing

Author: sanil15 <>

Reviewers: Xinyu Liu <>

Closes #439 from Sanil15/SAMZA-1602

10 months agoSAMZA-1498: Support arbitrary system clock timer in operators
xiliu [Wed, 7 Mar 2018 02:20:15 +0000 (18:20 -0800)] 
SAMZA-1498: Support arbitrary system clock timer in operators

This patch adds the capability to register arbitrary timers for both high-level and low-level api.
For high-level, InitableFunction will pass the TimerRegistry to user through the new OpContext, and user will implement the TimerFunction to get timer notifications.
For low-level api, user can register timer in the TaskContext, and then implements the TimerCallback for specific timer actions.

Author: xiliu <>
Author: xinyuiscool <>
Author: xinyuiscool <>

Reviewers: Pateek M <>

Closes #419 from xinyuiscool/SAMZA-1498

10 months agoSAMZA-1600: remove the combination of cleanup policy "compact,delete"…
Yi Pan (Data Infrastructure) [Tue, 6 Mar 2018 03:40:45 +0000 (19:40 -0800)] 
SAMZA-1600: remove the combination of cleanup policy "compact,delete"…

… in changelog topic properties

Author: Yi Pan (Data Infrastructure) <>

Reviewers: Jagadish <>

Closes #435 from nickpan47/SAMZA-1600

10 months agoSAMZA-1460: StreamAppender does not explicitly create logging topic
Daniel Nishimura [Tue, 6 Mar 2018 00:02:49 +0000 (16:02 -0800)] 
SAMZA-1460: StreamAppender does not explicitly create logging topic

Creates the StreamAppender stream explicitly instead of relying on auto stream creation.

Author: Daniel Nishimura <>

Reviewers: Prateek M <>

Closes #423 from dnishimura/samza-1460-streamappender-create-logging-topic

10 months agoMisc. minor cleanup.
Prateek Maheshwari [Sun, 4 Mar 2018 00:30:59 +0000 (16:30 -0800)] 
Misc. minor cleanup.

1. Added a meaningful name for the container thread pool threads.
2. Made the thread names for framework threads consistent.
3. Made a couple of monitoring/metrics threads daemon.
4. Fixed a few checkstyle warning about missing param/throws documentation.

Author: Prateek Maheshwari <>

Reviewers: Jagadish <>, Jacob M <>

Closes #433 from prateekm/container-thread-pool-name

10 months agoSAMZA-1598: Cleanup file
Daniel Nishimura [Sun, 4 Mar 2018 00:29:44 +0000 (16:29 -0800)] 
SAMZA-1598: Cleanup file

Update old links. Remove Jenkins build badge, in favor of Travis-CI.

Author: Daniel Nishimura <>

Reviewers: Jagdish <>

Closes #434 from dnishimura/samza-1598-clean-readme

10 months agoSAMZA-1596: Staging directory name has to be formatted in config
Akim Akimov [Thu, 1 Mar 2018 18:58:06 +0000 (10:58 -0800)] 
SAMZA-1596: Staging directory name has to be formatted in config

When we instantiate a HDFS config staging directory we missing a formatter for getStagingDirectory so systems.hdfs-system-name.stagingDirectory does not parse from config, but only systems.%s.stagingDirectory only parses instead.

Solution is to add formatter to getStagingDirectory method.

Author: Akim Akimov <>

Reviewers: Jagadish <>, Hai L<>

Closes #431 from Zedmor/FixStagingDirHDFS

10 months agoSAMZA-1597: Expose an interface for throttling
Wei Song [Tue, 27 Feb 2018 19:47:51 +0000 (11:47 -0800)] 
SAMZA-1597: Expose an interface for throttling

10 months agoSAMZA-1595: Fix scalacCompileOptions format to build with zinc scala compiler.
Shanthoosh Venkataraman [Fri, 23 Feb 2018 00:44:00 +0000 (16:44 -0800)] 
SAMZA-1595: Fix scalacCompileOptions format to build with zinc scala compiler.

Zinc scala compiler(part of gradle version >= 3.0) expects the scala compilation arguments as a list(where each compilation argument is an element of the list).

In samza, the compilation arguments are concatenated into a single string and passed to the compiler.

This causes build failures when samza is built with Zinc scala compiler.

Existing ant scala compiler used to build samza in open source accepts the compilation arguments both as list and string.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #430 from shanthoosh/master

10 months agoSAMZA-1594: Remove ScalaCompileOptions to make samza codebase build with gradle versi...
Shanthoosh Venkataraman [Thu, 22 Feb 2018 23:58:39 +0000 (15:58 -0800)] 
SAMZA-1594: Remove ScalaCompileOptions to make samza codebase build with gradle version > 3.0.

When samza repository is built with the gradle version greater than 3.0, we notice the following build failure.

No such property: useAnt for class: org.gradle.api.tasks.scala.ScalaCompileOptions

This needs to be fixed to build samza with  gradle version >= 3.0.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #427 from shanthoosh/fix_scalac_compile_options

10 months agoSAMZA-1593: Upgrade gradle nexus plugin.
Shanthoosh Venkataraman [Thu, 22 Feb 2018 20:39:10 +0000 (12:39 -0800)] 
SAMZA-1593: Upgrade gradle nexus plugin.

Current nexus plugin which samza repository is using is not compatible with the gradle versions greater than 3.0.

For doing the ligradle migration at linkedin, we need to update the nexus gradle plugin to the latest version.

Author: Shanthoosh Venkataraman <>

Reviewers: Xinyu Liu <>

Closes #426 from shanthoosh/master

11 months agoFixes to get the build working with Scala 2.10 build
Srinivasulu Punuru [Wed, 21 Feb 2018 19:45:23 +0000 (11:45 -0800)] 
Fixes to get the build working with Scala 2.10 build

The fixes needed to get the build working with the Scala 2.10.

Author: Srinivasulu Punuru <>

Reviewers: Jagadish <>, Jacob M<>, Bharath K<>, Xinyu L<>

Closes #424 from srinipunuru/rel-fixes.1

11 months agoSAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators
Daniel Nishimura [Fri, 16 Feb 2018 22:01:06 +0000 (14:01 -0800)] 
SAMZA-1555: Move creation of checkpoint and changelog streams to the Job Coordinators

The purpose of this PR is to consolidate the creation of the changelog and checkpoint streams into the JobCoordinators. In the current state, the changelog stream is created from the JobModelManager and the checkpoint stream is created within the OffsetManager. The issue with creating the checkpoint in the OffsetManager is that the first call happens from the first SamzaContainer that runs and each subsequent SamzaContainer run will attempt to create the checkpoint stream.

There are three driving forces for this refactoring. The first motivation is to assign the creation of the changelog and checkpoint streams to the JobCoordinators where it is most appropriate. This was discussed in more detail with nickpan47  . The second motivation is to have any potential failure to stream creation happen no later than during job coordination. The third motivation is to accommodate future security work to provide a robust way to set ACLs on streams.

Follow on to this PR will be:

Author: Daniel Nishimura <>

Reviewers: Yi Pan <>, Prateek Maheshwari <>

Closes #413 from dnishimura/samza-1555-move-changelog-checkpoint-creation and squashes the following commits:

1102314 [Daniel Nishimura] Fix a comment change that Intellij inadvertently refactored.
2b2eb16 [Daniel Nishimura] Trigger CI again
366b7b7 [Daniel Nishimura] Trigger CI
dce36b6 [Daniel Nishimura] Add isBootstrapped flag back to CoordinatorStreamSystemConsumer.
2efcfd4 [Daniel Nishimura] Trigger CI
6b2d912 [Daniel Nishimura] Changes related to Yi's latest review.
9d02e7a [Daniel Nishimura] Change createChangeLogStream to a static method.
effec24 [Daniel Nishimura] Cleanup from latest code review.
1bdfda7 [Daniel Nishimura] CoordinatorStreamManager lifecycle refactoring.
5178ca5 [Daniel Nishimura] Refactor AbstractCoordinatorStreamManager as a concrete class.
894af9c [Daniel Nishimura] Merge from master branch.
4009a0b [Daniel Nishimura] Changes from code review.
3a12a75 [Daniel Nishimura] Separation of changelog manager and jobmodel manager. Create CoordinatorStream class to encapsulate creation and management of coordinator stream consumer and producer.
c188adb [Daniel Nishimura] Merge from master branch.
971fa91 [Daniel Nishimura] Move the responsibility of changelog and checkpoint stream creation to the job coordinators.

11 months agoSAMZA-1588: Add random jitter to monitor’s scheduling interval.
Shanthoosh Venkataraman [Fri, 16 Feb 2018 18:34:21 +0000 (10:34 -0800)] 
SAMZA-1588: Add random jitter to monitor’s scheduling interval.

We’ve observed in LinkedIn execution environments that, all the monitors running on the YARN node-manager machines hitting an external service at the same time based upon the configured monitor scheduling interval.

To eliminate unnecessary monitor execution spike and congestion caused to an external service at the same time, it’s essential to add a random jitter to the monitor scheduling interval.

Random jitter will be added to monitor scheduling interval based upon a boolean configuration.

Author: Shanthoosh Venkataraman <>

Reviewers: Prateek Maheshwari <>

Closes #422 from shanthoosh/SAMZA-1588

11 months agoSupport source types where the last part of the source is not the streamName
Srinivasulu Punuru [Thu, 15 Feb 2018 23:46:17 +0000 (15:46 -0800)] 
Support source types where the last part of the source is not the streamName

Contains following fixes

1. Right now Samza SQL framework assumes that the last part of the source is the stream Name, removed the assumption
2. Made consoleLoggingSystemFactory to log formatted json so that it's easily readable.
3. Added support in SamzaSqlRelMessage where the key may not be present.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #421 from srinipunuru/four-part.1

11 months agoSAMZA-1568: Handle ZkInterruptedException in zkclient.close.
Shanthoosh Venkataraman [Wed, 14 Feb 2018 03:30:03 +0000 (19:30 -0800)] 
SAMZA-1568: Handle ZkInterruptedException in zkclient.close.

When zookeeper session failures occur in a stream processor,   leaves the group(zkClient is closed) and joins the group again.

The last step in that shutdown sequence is zkClient.close(). In some scenarios, it throws the following exception,

    org.I0Itec.zkclient.exception.ZkInterruptedException: java.lang.InterruptedException
    at org.I0Itec.zkclient.ZkClient.close(
    at org.apache.samza.zk.ZkControllerImpl.stop(

    at org.apache.samza.zk.ZkJobCoordinator.stop(
In existing implementation this is not handled, there by killing the stream processor.  The following codepath triggers this exception:

`StreamProcessor.stop -> ZkJobCoordinator.stop() ->  zkController.stop() -> zkUtils.close`

This exception causes the integration test to fail occasionally  and can cause LocalApplicationRunner.waitForFinish method call to block indefinitely(since this callback event success, updates the latch state required for waitForFinish to end).

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish <>

Closes #416 from shanthoosh/zk_utils_close

11 months agoSAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager.
Shanthoosh Venkataraman [Tue, 13 Feb 2018 19:22:34 +0000 (11:22 -0800)] 
SAMZA-1572: Add fixed retries on failure in KafkaCheckpointManager.

KafkaCheckpointManager.writeCheckpoint currently goes into a infinite loop when an irrecoverable failure happens, this indefinitely blocks the commit phase (there by preventing processing). Added finite retries (50), which would retry for fixed time in case of failure before giving up.

Author: Shanthoosh Venkataraman <>

Reviewers: Prateek M<>

Closes #420 from shanthoosh/add_fixed_retries_in_kafka_checkpoint_manager

11 months agoSAMZA-1489: TaskInstance should commit offset before it closes() if auto commit is...
Dong Lin [Thu, 1 Feb 2018 20:07:56 +0000 (12:07 -0800)] 
SAMZA-1489: TaskInstance should commit offset before it closes() if auto commit is enabled

Author: Dong Lin <>

Reviewers: Jagadish<>

Closes #417 from lindong28/SAMZA-1489

11 months agoSAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources
Jagadish [Thu, 1 Feb 2018 05:38:14 +0000 (21:38 -0800)] 
SAMZA-1552: Host affinity improvements - Improve matching of hosts to allocated resources

Author: Jagadish <>

Reviewers: Prateek <>

Closes #401 from vjagadish1989/host-affinity-fix

11 months agoSAMZA-1578: Fix watermark bug found by BEAM tests
Xinyu Liu [Fri, 26 Jan 2018 22:02:42 +0000 (14:02 -0800)] 
SAMZA-1578: Fix watermark bug found by BEAM tests

The problem is getOutputWatermark() does not return the real outputWatermark. This caused problem in user override watermark function.

Author: xiliu <>

Reviewers: Jagadish <>

Closes #415 from xinyuiscool/SAMZA-1578

11 months agoAdded some logging to stdout for easier parsing by tools.
Prateek Maheshwari [Fri, 26 Jan 2018 20:04:29 +0000 (12:04 -0800)] 
Added some logging to stdout for easier parsing by tools.

Author: Prateek Maheshwari <>

Reviewers: Jagadish<>

Closes #414 from prateekm/print-container-info

11 months agoSAMZA-1548; Add start() and stop() to SystemAdmin
Dong Lin [Fri, 26 Jan 2018 01:28:13 +0000 (17:28 -0800)] 
SAMZA-1548; Add start() and stop() to SystemAdmin

This patch adds start() and stop() to SystemAdmin interface. This can be useful for e.g. kafka.admin.AdminClient which needs to be started before it can be used.

Since we add this method in interface and expect AdminClient to be stateful and probably has its own thread, there will be higher cost to instantiate a new SystemAdmin. Thus we probably want to re-use the SystemAdmin instances instead of creating SystemAdmin on demand when needed. Therefore, this patch also adds SystemAdmins class to help manage a map from system to SystemAdmin, similar to the existing SystemProducers class in Samza.

Author: Dong Lin <>

Reviewers: Prateek Maheshwari <>

Closes #397 from lindong28/SAMZA-1548

11 months agoSAMZA-1557: Broadcast operator
Xinyu Liu [Wed, 24 Jan 2018 22:27:39 +0000 (14:27 -0800)] 
SAMZA-1557: Broadcast operator

This patch adds Broadcast operator that allows broadcasting messages to all tasks. It's the counterpart of the Samza broadcast stream in low level api, and will be used by BEAM runner to broadcast views as side input to other part of the pipeline.

Author: xiliu <>

Reviewers: Jagadish V <>

Closes #410 from xinyuiscool/SAMZA-1557

11 months agofix the bug containsValue method
michaelwong [Mon, 22 Jan 2018 22:11:25 +0000 (14:11 -0800)] 
fix the bug containsValue method

containsValue method should invoke map.containsValue not map.containsKey

Author: michaelwong <>

Reviewers: Jagadish <>

Closes #12 from jwongo/master

11 months agoupTime should be field not method
zhangyijun [Mon, 22 Jan 2018 22:05:51 +0000 (14:05 -0800)] 
upTime should be field not method

Author: zhangyijun <>

Reviewers: Jagadish <>

Closes #9 from zhangyijun/patch-1

11 months agoSAMZA-1561: Fix inconsistency problem in JobModel publish.
Shanthoosh Venkataraman [Mon, 22 Jan 2018 19:31:08 +0000 (11:31 -0800)] 
SAMZA-1561: Fix inconsistency problem in JobModel publish.

Author: Shanthoosh Venkataraman <>

Reviewers: Jagadish V<>, Xinyu Liu<>

Closes #409 from shanthoosh/master

12 months agoSAMZA-1407: upgrade junit version to 4.12
Fred Ji [Thu, 18 Jan 2018 02:35:34 +0000 (18:35 -0800)] 
SAMZA-1407: upgrade junit version to 4.12

"./gradlew clean check" passed

Author: Fred Ji <>

Reviewers: Jagadish <>

Closes #406 from fredji97/junit4_12_new

12 months agoSAMZA-1560: Handle key-serde errors in KafkaCheckpointManager
Jagadish [Thu, 18 Jan 2018 02:27:40 +0000 (18:27 -0800)] 
SAMZA-1560: Handle key-serde errors in KafkaCheckpointManager

Author: Jagadish <>

Reviewers: Xinyu Liu<>

Closes #408 from vjagadish1989/kcm-fix

12 months agoSAMZA-1558: State restore metrics should be duplicated and deprecated…
Jacob Maes [Wed, 17 Jan 2018 22:26:53 +0000 (14:26 -0800)] 
SAMZA-1558: State restore metrics should be duplicated and deprecated…

… to avoid type conflicts

Author: Jacob Maes <>

Reviewers: Prateek Maheshwari <>

Closes #407 from jmakes/samza-1558

12 months agoSAMZA-1523: Cleanup table entries before shutting down the processor
navina [Wed, 17 Jan 2018 18:11:40 +0000 (10:11 -0800)] 
SAMZA-1523: Cleanup table entries before shutting down the processor

Modified the `TableUtils#deleteProcessorEntity` to provide an option to disable optimistic locking during a call to Azure Table Storage service.

sborya PawasChhokra nickpan47   Review please?

Author: navina <>

Reviewers: Shanthoosh V<>, Boris S<>

Closes #379 from navina/azure-etag-fix

12 months agoSAMZA-1556: Adding support for multi level sources in queries
Srinivasulu Punuru [Wed, 17 Jan 2018 18:06:50 +0000 (10:06 -0800)] 
SAMZA-1556: Adding support for multi level sources in queries

Right now Samza SQL supports queries with just two levels i.e. `select * from`. But there can be sources that are identified though multiple levels. for e.g. `select * from kafka.clusterName.topicName`.

This change adds the support for sql queries with sources that have more than two levels.

Author: Srinivasulu Punuru <>

Reviewers: Miguel S<>, Aditya T<>

Closes #405 from srinipunuru/multi-level.1

12 months agoSAMZA-1500: Added metrics for RocksDB state store memory usage
Prateek Maheshwari [Wed, 17 Jan 2018 17:39:26 +0000 (09:39 -0800)] 
SAMZA-1500: Added metrics for RocksDB state store memory usage

Approximate RocksDB memory usage = Configured Block Cache size + MemTable size + Indexes and Bloom Filters size =
rocksdb.block-cache-size + rocksdb.size-all-mem-tables + rocksdb.estimate-table-readers-mem

Author: Prateek Maheshwari <>

Reviewers: Jagadish <>

Closes #404 from prateekm/rocksdb-memory

12 months agoFix for the TestSamzaSqlApplicationConfig.testConfigInit
Srinivasulu Punuru [Thu, 11 Jan 2018 01:14:03 +0000 (17:14 -0800)] 
Fix for the TestSamzaSqlApplicationConfig.testConfigInit

Currently testConfigInit checks for a hardcoded number for udfs. Whenever a new UDF is added, This test is going to fail if it is not updated. Changed the test to validate the number of udfs based on the config that is passed.

Author: Srinivasulu Punuru <>

Reviewers: Prateek Maheshwari <>

Closes #403 from srinipunuru/testfix.1

12 months agoSAMZA-1535: Support for UDFs in where clauses
Srinivasulu Punuru [Wed, 10 Jan 2018 19:17:39 +0000 (11:17 -0800)] 
SAMZA-1535: Support for UDFs in where clauses

The existing version of the udf implementation doesn't seem to support udfs in the where clauses because the Type of the object returned is "ANY" and when you do a
`select * from kafka.topic where regexMatch('.*foo', Name)` it fails in the query validation, because calcite doesn't know the type of regexMatch.

To solve the problem, We made the scalarUdf generic with a strongly typed return type.

This PR can be merged into trunk not the 0.14.

Author: Srinivasulu Punuru <>

Reviewers: Xinyu Liu <>

Closes #386 from srinipunuru/udf-where.1

12 months agoSAMZA-1530; Bump up Kafka dependency to 0.11
Dong Lin [Wed, 10 Jan 2018 18:52:38 +0000 (10:52 -0800)] 
SAMZA-1530; Bump up Kafka dependency to 0.11

Author: Dong Lin <>

Reviewers: Xinyu Liu <>

Closes #395 from lindong28/SAMZA-1530

12 months agoRevert "SAMZA-1553: Add log4j for latest Kafka build"
xiliu [Wed, 10 Jan 2018 18:33:23 +0000 (10:33 -0800)] 
Revert "SAMZA-1553: Add log4j for latest Kafka build"

This reverts commit 5238aaa6cee81a87079c9d432204422ececea793.

12 months agoSAMZA-1550: Update samza 0.14 version in tests
xiliu [Tue, 9 Jan 2018 23:53:40 +0000 (15:53 -0800)] 
SAMZA-1550: Update samza 0.14 version in tests

12 months agoSAMZA-1553: Add log4j for latest Kafka build
Xinyu Liu [Tue, 9 Jan 2018 18:48:10 +0000 (10:48 -0800)] 
SAMZA-1553: Add log4j for latest Kafka build

Add it so Samza compiles with the latest kafka.

Author: xiliu <>

Reviewers: Boris Shkolnik <>

Closes #402 from xinyuiscool/SAMZA-1553

12 months agoFix a link in
xiliu [Thu, 4 Jan 2018 17:51:20 +0000 (09:51 -0800)] 
Fix a link in