5 months agoAuto update partition for pulsar kafka producer (#28) master
Rajan Dhabalia [Thu, 31 Mar 2022 17:19:19 +0000 (10:19 -0700)] 
Auto update partition for pulsar kafka producer (#28)

6 months agoImport dependencies from Pulsar's parent POM (#35)
Christophe Bornet [Tue, 8 Mar 2022 10:02:40 +0000 (11:02 +0100)] 
Import dependencies from Pulsar's parent POM (#35)

7 months agoRemoving Flink in favor of https://github.com/apache/flink/tree/master/flink-connecto...
Andrey Yegorov [Thu, 3 Feb 2022 21:59:01 +0000 (13:59 -0800)] 
Removing Flink in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar (#34)

### Motivation

Removing Flink adapter in favor of https://github.com/apache/flink/tree/master/flink-connectors/flink-connector-pulsar
This one is based on an old version of Flink which brings in dependencies with various CVEs, since that version Flink added pulsar connector in their project.

### Modifications

Removed Flink adapter, tests, examples, and dependencies.

8 months agoUpgrade Log4J2 to 2.17.1 (#32)
Lari Hotari [Thu, 20 Jan 2022 17:34:54 +0000 (19:34 +0200)] 
Upgrade Log4J2 to 2.17.1 (#32)

9 months agoUpgrade Log4J2 to 2.15.0 to mitigate CVE-2021-44228 (#30)
Lari Hotari [Fri, 10 Dec 2021 11:43:30 +0000 (13:43 +0200)] 
Upgrade Log4J2 to 2.15.0 to mitigate CVE-2021-44228 (#30)

14 months ago[pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)
Rajan Dhabalia [Tue, 20 Jul 2021 20:05:37 +0000 (13:05 -0700)] 
[pulsar-kafka] Support encryption for pulsar-kafka producer/consumer (#26)

* [pulsar-kafka] Support encryption for pulsar-kafka producer/consumer

* add properties param for getEncryptionKey method

15 months agoAdd build instructions to README (#24)
Enrico Olivelli [Sat, 26 Jun 2021 12:07:25 +0000 (14:07 +0200)] 
Add build instructions to README (#24)

Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
15 months agoFixed integration tests (#22)
Andrey Yegorov [Fri, 18 Jun 2021 08:09:50 +0000 (01:09 -0700)] 
Fixed integration tests (#22)

### Motivation

KafkaApiTest is failing

### Modifications

1. conversion between Pulsar topic name and Kafka TopicPartition ended up with TopicPartition using name with "-partition-<partition idx>"

2. Seek was not working correctly:

PulsarKafkaConsumer seeks to beginning, as asked.
Clears lastReceivedOffset in the process.

on poll it checks
            if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                 log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
seek didn't update unpolledPartitions - reset offset uses default strategy to reset => seeks to the end

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is already covered by existing tests, such as KafkaApiTest.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*


### Documentation

  - Does this pull request introduce a new feature? NO

15 months ago[Build] Add GitHub Actions build for pulsar-adapters (#19)
Lari Hotari [Thu, 17 Jun 2021 10:49:10 +0000 (13:49 +0300)] 
[Build] Add GitHub Actions build for pulsar-adapters (#19)

15 months agoAdd guava dependency to modules that need it (#21)
Lari Hotari [Thu, 17 Jun 2021 10:43:35 +0000 (13:43 +0300)] 
Add guava dependency to modules that need it (#21)

- guava dependency came via buildtools previously

15 months ago[Build] Fix leakage of buildtools dependency to all libraries (#20)
Lari Hotari [Wed, 16 Jun 2021 19:59:36 +0000 (22:59 +0300)] 
[Build] Fix leakage of buildtools dependency to all libraries (#20)

15 months agoPulsar Log4j2 Appender: allow configuration of the client (#17)
Lari Hotari [Wed, 16 Jun 2021 07:33:40 +0000 (10:33 +0300)] 
Pulsar Log4j2 Appender: allow configuration of the client (#17)

Fixes #15, #16

### Motivation

When Pulsar security is enabled, the current implementation is unusable. This PR also covers the feature request #16 to allow configuring most client and producer configuration properties.
The issue #15 makes the Pulsar Log4j2 Appender unusable in many use cases. This PR fixes the issue by removing the hard coded producer name.
The current build for Pulsar Log4j2 Appender is broken. This PR fixes the build by properly configuring Log4j2 plugin annotation processor for the maven build. There were broken tests which were unnecessary and testing Log4j2 internals instead of the appender. Those tests have been removed

### Modifications

- add support for configuring Pulsar client and producer configuration properties (#16)
- fix maven build (Log4j2 plugin annotation processor)
- remove broken tests which were unnecessary
- remove the hard coded producer name which prevented using the appender in many use cases (#15)

15 months ago[Build] Upgrade Pulsar dependency to 2.8.0 and fix build errors (#18)
Lari Hotari [Wed, 16 Jun 2021 07:29:27 +0000 (10:29 +0300)] 
[Build] Upgrade Pulsar dependency to 2.8.0 and fix build errors (#18)

16 months ago[pulsar-kafka] add auto update partition support to producer/consumer (#13)
Rajan Dhabalia [Fri, 14 May 2021 06:24:29 +0000 (23:24 -0700)] 
[pulsar-kafka] add auto update partition support to producer/consumer (#13)

17 months agoFeature: Run Kafka streams app with Pulsar (#10)
Andrey Yegorov [Sun, 11 Apr 2021 08:30:50 +0000 (01:30 -0700)] 
Feature: Run Kafka streams app with Pulsar (#10)

17 months agobuild pulsar-adaptors with pulsar-2.8-snapshot (#9)
Andrey Yegorov [Thu, 8 Apr 2021 16:02:33 +0000 (09:02 -0700)] 
build pulsar-adaptors with pulsar-2.8-snapshot (#9)

22 months agoMove examples and tests from main repo here (#3)
Sijie Guo [Thu, 12 Nov 2020 00:45:46 +0000 (17:45 -0700)] 
Move examples and tests from main repo here (#3)

22 months agoFix checkstyle issue on pulsar-spark module (#2)
Sijie Guo [Wed, 11 Nov 2020 16:57:58 +0000 (09:57 -0700)] 
Fix checkstyle issue on pulsar-spark module (#2)


Enable checkstyle plugin on pulsar-spark module and fix all the issues.

22 months agoBuild pulsar-adapters (#1)
Sijie Guo [Fri, 6 Nov 2020 17:32:50 +0000 (10:32 -0700)] 
Build pulsar-adapters (#1)

* Build pulsar-adapters

22 months agoInitialize pulsar-adapters project
Sijie Guo [Sun, 12 Apr 2020 20:20:45 +0000 (13:20 -0700)] 
Initialize pulsar-adapters project

22 months agoAllow building Apache Pulsar on JDK15+ - upgrade Maven Assembly Plugin (#8360)
Enrico Olivelli [Thu, 29 Oct 2020 07:24:07 +0000 (08:24 +0100)] 
Allow building Apache Pulsar on JDK15+ - upgrade Maven Assembly Plugin (#8360)

Currently Pulsar does not build on JDK15

update Maven Assembly plugin
move maven-git-commit-id plugin execution only inside Pulsar Common (in order to speed up the build)
upgrading the Maven Assembly Plugin resulted in a different output for the "binaries", I had to update license files
forced version of Jersey to 2.31 in "Presto" distribution (otherwise the license check failed, because it does not handle different versions between main pulsar binaries and presto distribution inside lib/presto/lib, and this makes sense to me)
Verifying this change
This change is already covered by existing tests

* Allow to build Pulsar on JDK14+
- update Apache Maven Assembly Plugin
- use explicit import for Record class (prevent clash with java.lang.Record)

* revert Apache pom update

* fix

* Fix presto distribution - due to maven assembly plugin upgrade

* fix licenses

* Fix kafka clients


Co-authored-by: Enrico Olivelli <enrico.olivelli@diennea.com>
Co-authored-by: Enrico Olivelli <eolivelli@apache.org>
23 months agoRemove bouncy castle shaded module to avoid bring error of verifySingleJar (#7453)
Jia Zhai [Fri, 23 Oct 2020 06:36:13 +0000 (14:36 +0800)] 
Remove bouncy castle shaded module to avoid bring error of verifySingleJar (#7453)

### Motivation

shade bouncy castle will cause some signature errors, this PR tries to remove the bouncy castle shaded module.

Here is the related error stack:
10:01:34.257 [pulsar-client-io-33-1] ERROR org.apache.pulsar.client.impl.ConsumerImpl - MessageCryptoBc may not included in the jar. e:
java.lang.SecurityException: JCE cannot authenticate the provider BC
at javax.crypto.Cipher.getInstance(Cipher.java:657) ~[?:1.8.0_121]
at javax.crypto.Cipher.getInstance(Cipher.java:596) ~[?:1.8.0_121]
at org.apache.pulsar.client.impl.crypto.MessageCryptoBc.<init>(MessageCryptoBc.java:147) ~[classes/:?]
at org.apache.pulsar.client.impl.ConsumerImpl.<init>(ConsumerImpl.java:270) ~[classes/:?]
at org.apache.pulsar.client.impl.ConsumerImpl.newConsumerImpl(ConsumerImpl.java:209) ~[classes/:?]
at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doSingleTopicSubscribeAsync$5(PulsarClientImpl.java:364) ~[classes/:?]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]

Caused by: java.util.jar.JarException: file:/Users/jia/.m2/repository/org/apache/pulsar/bouncy-castle-bc-shaded/2.7.0-SNAPSHOT/bouncy-castle-bc-shaded-2.7.0-SNAPSHOT.jar has unsigned entries - org/bouncycastle/cert/AttributeCertificateHolder.class
at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:500) ~[?:1.8.0_121]
at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:361) ~[?:1.8.0_121]
at javax.crypto.JarVerifier.verify(JarVerifier.java:289) ~[?:1.8.0_121]
at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:159) ~[?:1.8.0_121]
at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:185) ~[?:1.8.0_121]
at javax.crypto.Cipher.getInstance(Cipher.java:653) ~[?:1.8.0_121]

### Modifications

- Remove bouncy castle shaded module, avoid package bouncy castle into a dependency jar.
- enhance test case to identify this error.

### Verifying this change

ut passed.

* remove dep of bc-shaded from other module

* remove bc-shaded module

* enhance testECDSAEncryption and testRSAEncryption to cover error case

* fix license check

* remove bc-shaded module

* build a jar in jar to avoid break bc signature

* use new bc dependency by classifier in maven

* build pulsar-all docker image instead of pull from dockerhub in integration tests

* remove nar

* fix licence, fix error brings in #7640

* add bc when broker/client is referenced in pom

* add missing bc reference in pom

* change ci back to not build docker image

23 months ago[pulsar-admin] allow to get ledger metadata along with topic stats-internal (#8180)
Rajan Dhabalia [Fri, 2 Oct 2020 21:34:29 +0000 (14:34 -0700)] 
[pulsar-admin] allow to get ledger metadata along with topic stats-internal (#8180)

### Motivation

Pulsar-admin api and CLI utility is a tool used for operational and system debugging purpose. `pulsar-admin topics stats-internal` command gives internals of the topic and it also shows list of ledgers where data has been written. However, sometimes operation team wants to know more details about ledgers such as : number of ensembles and list of bookies which own the ledger to find out issue with faulty bookie. right now, we have to use zookeeper-shell utility and read znode value to fetch such information.
So, it would be useful if ledger-info also has ledger-metadata and we can get it from pulsar-admin CLI.

### Modification
- Added ledger metadata into ledger-info response of the `topics stats-internal` command.
- user can pass additional flag to add ledger metadata else user will not see any change in output of `topics stats-internal`

### Result
./bin/pulsar-admin persistent stats-internal <topic> -m
    "ledgerId" : 23,
    "entries" : 200,
    "size" : 11100,
    "offloaded" : false,
    "metadata" : "LedgerMetadata{formatVersion=3, ensembleSize=2, writeQuorumSize=2, ackQuorumSize=2, state=CLOSED, length=11100, lastEntryId=199, digestType=CRC32C, password=OMITTED, ensembles={0=[,], 100=[,]}, customMetadata={component=base64:bWFuYWdlZC1sZWRnZXI=, pulsar/managed-ledger=base64:c2FtcGxlL3N0YW5kYWxvbmUvbnMxL3BlcnNpc3RlbnQvdDE=}}"

23 months agoFix JavaDoc issues. Spotted by IntelliJ (#8054)
Frank J Kelly [Wed, 30 Sep 2020 03:35:32 +0000 (23:35 -0400)] 
Fix JavaDoc issues. Spotted by IntelliJ (#8054)

Issues spotted by IntelliJ

### Motivation
IntelliJ complains and highlights these issues

### Modifications
Sync JavaDoc with code

### Verifying this change
This change is a trivial rework / code cleanup without any test coverage.

2 years agoSupport acknowledging a list of messages (#7688)
feynmanlin [Mon, 17 Aug 2020 13:08:37 +0000 (21:08 +0800)] 
Support acknowledging a list of messages (#7688)

Fixes #7626

### Motivation
Expose `MessagesImpl` ,so that we can ack list of messages by using `CompletableFuture<Void> acknowledgeAsync(Messages<?> messages)`

### Modifications
Change the visibility level of the method from protect to public

### Verifying this change
unit test:

2 years agoAvoid dependency on Apache HTTP client for TLS hostname verifier (#7612) (#7664)
Matteo Merli [Mon, 27 Jul 2020 16:36:39 +0000 (09:36 -0700)] 
Avoid dependency on Apache HTTP client for TLS hostname verifier (#7612) (#7664)

2 years ago#5922 - Update poms to use project.groupId instead of org.apache.pulsar where applica...
Varghese Cottagiri [Thu, 16 Jul 2020 05:24:26 +0000 (01:24 -0400)] 
#5922 - Update poms to use project.groupId instead of org.apache.pulsar where applicable (#7548)

2 years agoBumped version to 2.7.0-SNAPSHOT (#7233)
lipenghui [Thu, 11 Jun 2020 08:30:26 +0000 (16:30 +0800)] 
Bumped version to 2.7.0-SNAPSHOT (#7233)

* Bumped version to 2.7.0-SNAPSHOT

2 years agoFix apache rat check (#7225)
lipenghui [Tue, 9 Jun 2020 23:13:41 +0000 (07:13 +0800)] 
Fix apache rat check (#7225)

2 years agoFix kafka_0_9 Consumer partition topic throw error (#7179)
liudezhi [Mon, 8 Jun 2020 05:26:04 +0000 (13:26 +0800)] 
Fix kafka_0_9 Consumer partition topic throw error (#7179)

Master Issue: #7178
## Motivation

When using pulsar-client-kafka_0_9 version to consume partition topic, will throw java.lang.ClassCastException,
org.apache.pulsar.client.impl.TopicMessageIdImpl cannot be cast to org.apache.pulsar.client.impl.MessageIdImpl
## Modifications
 public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();

        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                // Create individual subscription on each partition, that way we can keep using the
                // acknowledgeCumulative()
                int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                if (numberOfPartitions > 1) {
                    // Subscribe to each partition
                    for (int i = 0; i < numberOfPartitions; i++) {
                        String partitionName = TopicName.get(topic).getPartition(i).toString();
                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                        int partitionIndex = i;
                        TopicPartition tp = new TopicPartition(
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
We should remove consumerBuilder.topics(topics)。

2 years ago[Issue 4803][client] return null if the message value/data is not set by producer...
Neng Lu [Tue, 19 May 2020 06:11:42 +0000 (23:11 -0700)] 
[Issue 4803][client] return null if the message value/data is not set by producer (#6379)

Fixes #4803

### Motivation
Allow the typed consumer receive messages with `null` value if the producer sends message without payload.

### Modifications
- add a flag in `MessageMetadata` to indicate if the payload is set when the message is created
- check and return `null` if the flag is not set when reading data from a message

2 years ago[pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)
Rajan Dhabalia [Tue, 19 May 2020 04:53:36 +0000 (21:53 -0700)] 
[pulsar-storm] Fix: Authentication is failing with storm adapter (#6782)

### Motivation
In #4284, made [Authentication](https://github.com/apache/pulsar/blob/master/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Authentication.java#L33) a transient under ClientConfigurationData. `Authentication` is a serializable class because ClientConfig is used in Storm and Spark adapter to pass client configuration and Storm serializes and deserializes spout and bolt while executing them in topology. Now, after making `Authentication` transient variable storm always deserializes it as null and authentication fails. Also `Authentication` is a serializable class so, any auth-implementation must be serializable.

### Modification
Keep Authentication param serializable.

### Result
It fixes pulsar-storm with authentication enabled.

2 years agoMake messageReceiveTimeoutMs in the PulsarConsumerSource configurable (#6783)
luceneReader [Sat, 25 Apr 2020 06:47:56 +0000 (14:47 +0800)] 
Make messageReceiveTimeoutMs in the PulsarConsumerSource configurable (#6783)

The messageReceiveTimeoutMs value in the PulsarConsumerSource class is hardcoded to 100ms and cannot be altered through configuration at present.

2 years agoUpdate the status of pulsar-flink connector (#6657)
Sijie Guo [Thu, 9 Apr 2020 19:06:52 +0000 (12:06 -0700)] 
Update the status of pulsar-flink connector (#6657)

2 years agoSupport Consumers Set Custom Retry Delay (#6449)
liudezhi [Mon, 6 Apr 2020 18:17:01 +0000 (02:17 +0800)] 
Support Consumers Set Custom Retry Delay (#6449)

### Contribution Checklist

  - Name the pull request in the form "[Issue XYZ][component] Title of the pull request", where *XYZ* should be replaced by the actual issue number.
    Skip *Issue XYZ* if there is no associated github issue for this pull request.
    Skip *component* if you are unsure about which is the best component. E.g. `[docs] Fix typo in produce method`.

  - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.

  - Each pull request should address only one issue, not mix up code from multiple issues.

  - Each commit in the pull request has a meaningful commit message

  - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**

Master Issue: #6448

### Motivation

For many online business systems, various exceptions usually occur in business logic processing, so the message needs to be re-consumed, but users hope that this delay time can be controlled flexibly. The current user's processing method is usually to send this message to a special retry topic, because production can specify any delay, so consumers subscribe the business topic and retry topic at the same time. I think this logic can be supported by pulsar itself, making it easier for users to use, and it looks like this is a very common requirement.

### Modifications

This change can be supported on the client side,  need to add a set of interfaces to org.apache.pulsar.client.api.Consumer
void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException;
CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit);
CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, int delayLevel);
DeadLetterPolicy add retry topic
public class DeadLetterPolicy {

     * Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
    private int maxRedeliverCount;

     * Name of the retry topic where the failing messages will be sent.
    private String retryLetterTopic;

     * Name of the dead topic where the failing messages will be sent.
    private String deadLetterTopic;


org.apache.pulsar.client.impl.ConsumerImpl add a retry producer
  private volatile Producer<T> deadLetterProducer;

  private volatile Producer<T> retryLetterProducer;
Can specify whether to enable retry when creating a consumer,default unenable
    public ConsumerBuilder<T> enableRetry(boolean retryEnable) {
        return this;

2 years agoNumber of placeholder doesn't match number of arguments in logging (#6618)
Yijie Shen [Fri, 27 Mar 2020 11:31:06 +0000 (19:31 +0800)] 
Number of placeholder doesn't match number of arguments in logging (#6618)

2 years agoISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer...
duli559 [Thu, 26 Mar 2020 11:00:25 +0000 (19:00 +0800)] 
ISSUE-5934: Support read/write properties from/to Message in flink pulsar consumer/producer (#5955)

Fix #5934

Support read/write properties from/to Message in flink pulsar consumer/producer, and you can override it in your derived class


1. modify `PulsarConsumerSource.deserialize` access right from 'private' to 'protected'.
2. add method `protected Map<String, String> generateProperties(T value)` in class `FlinkPulsarProducer`, and invoked in `TypedMessageBuilder.properties()` to add it in pulsar Message.

* fix unit test failure

Co-authored-by: herodu <herodu@tencent.com>
Co-authored-by: Sijie Guo <sijie@apache.org>
Co-authored-by: duli <554979476@163.com>
2 years ago[Maven Cleanup] Remove managed-ledger and zk-utils test-jar dependencies when possibl...
Yijie Shen [Wed, 11 Mar 2020 01:31:00 +0000 (09:31 +0800)] 
[Maven Cleanup] Remove managed-ledger and zk-utils test-jar dependencies when possible (#6513)

Currently, many modules depend on `managed-ledger-test.jar` just because they want to use MockBookkeeper and MockZookeeper.  This made module dependencies hard to understand and track.

This PR introduces a new `testmocks` module and pulls all mocks from managed-ledger tests into the new module.

2 years agoIndependent schema is set for each consumer generated by topic (#6356)
congbo [Fri, 6 Mar 2020 06:28:30 +0000 (14:28 +0800)] 
Independent schema is set for each consumer generated by topic (#6356)

### Motivation

Master Issue: #5454

When one Consumer subscribe multi topic, setSchemaInfoPorvider() will be covered by the consumer generated by the last topic.

### Modification
clone schema for each consumer generated by topic.
### Verifying this change
Add the schemaTest for it.

2 years ago[Flink-Connector]Get PulsarClient from cache should always return an open instance...
Yijie Shen [Sun, 1 Mar 2020 09:28:56 +0000 (17:28 +0800)] 
[Flink-Connector]Get PulsarClient from cache should always return an open instance (#6436)

2 years ago[Pulsar-Client] Stop shade snappy-java in pulsar-client-shaded (#6375)
Yijie Shen [Fri, 21 Feb 2020 06:20:14 +0000 (14:20 +0800)] 
[Pulsar-Client] Stop shade snappy-java in pulsar-client-shaded (#6375)

Fixes #6260

Snappy, like other compressions (LZ4, ZSTD), depends on native libraries to do the real encode/decode stuff. When we shade them in a fat jar, only the java implementations of snappy class are shaded, however, left the JNI incompatible with the underlying c++ code.

We should just remove the shade for snappy, and let maven import its lib as a dependency.

I've tested the shaded jar locally generated by this pr, it works for all compression codecs.

2 years ago[pulsar-kafka] support block-producer on queue-full with sendTimeOut configuration...
Rajan Dhabalia [Mon, 10 Feb 2020 01:31:11 +0000 (17:31 -0800)] 
[pulsar-kafka] support block-producer on queue-full with sendTimeOut configuration (#6139)

### Motivation
Right now, pulsar-kafka producer block the publish when queue is full if `sendTimeOut > 0`. However, we have multiple users who want to configure `sendTimeOut` but doesn't want to block the thread and need immediate failure.
So, add option to configure `BLOCK_IF_PRODUCER_QUEUE_FULL` and it will not impact existing behavior because if the `BLOCK_IF_PRODUCER_QUEUE_FULL` doesn't exist then it will fallback to existing behavior.

2 years ago[Issue 6024][pulsar_storm] PulsarSpout emit to multiple streams (#6039)
David Willcox [Mon, 3 Feb 2020 18:03:19 +0000 (12:03 -0600)] 
[Issue 6024][pulsar_storm] PulsarSpout emit to multiple streams (#6039)

Fixes #6024

### Motivation
This is all described in detail in https://github.com/apache/pulsar/issues/6024, but in short, an insurmountable obstacle to using Pulsar in our storm topology is the fact that `PulsarSpout` only emits to the "default" stream. In our environment, we need to emit on different streams based on the content of each received message. This change extends `PulsarSpout` to recognize a `Values` extension that specifies an alternate output stream, and uses that stream when given.

### Modifications
A new `PulsarTuple` class is added. It extends `Values` and adds a method to return the output stream.

When emitting a tuple after calling `toValues(msg)`, `PulsarSpout` checks if the returned `Values` is a `PulsarTuple`. If so, it emits to the designated stream, otherwise it emits as before.

2 years agoSimplify templating in log messages (#6133)
Sergii Zhevzhyk [Fri, 24 Jan 2020 21:03:22 +0000 (22:03 +0100)] 
Simplify templating in log messages (#6133)

2 years ago[build] Skip javadoc task for pulsar-client-kafka-compact modules (#5836)
Sijie Guo [Tue, 14 Jan 2020 13:13:39 +0000 (21:13 +0800)] 
[build] Skip javadoc task for pulsar-client-kafka-compact modules (#5836)


pulsar-client-kafka-compact depends on pulsar-client implementation hence it pulls in
protobuf dependencies. This results in `class file for com.google.protobuf.GeneratedMessageV3 not found`
errors when generating javadoc for those modules.


Skip javadoc tasks for these modules. Because:

- pulsar-client-kafka-compact is a kafka wrapper. Kafka already provides javadoc for this API.
- we didn't publish the javadoc for this module.

2 years agoUpgrade Avro to 1.9.1 (#5938)
Masahiro Sakamoto [Mon, 6 Jan 2020 04:06:05 +0000 (13:06 +0900)] 
Upgrade Avro to 1.9.1 (#5938)

### Motivation

Currently, Pulsar uses Avro 1.8.2, a version released two years ago. The latest version of Avro is 1.9.1, which uses FasterXML's Jackson 2.x instead of Codehaus's Jackson 1.x. Jackson is prone to security issues, so we should not keep using older versions.

### Modifications

Avro 1.9 has some major changes:

- The library used to handle logical datetime values has changed from Joda-Time to JSR-310 (https://github.com/apache/avro/pull/631)
- Namespaces no longer include "$" when generating schemas containing inner classes using ReflectData (https://github.com/apache/avro/pull/283)
- Validation of default values has been enabled (https://github.com/apache/avro/pull/288). This results in a validation error when parsing the following schema:
  "name": "fieldName",
  "type": [
  "default": "defaultValue"
The default value of a nullable field must be null (cf. https://issues.apache.org/jira/browse/AVRO-1803), and the default value of the field as above is actually null. However, this PR disables the validation in order to maintain the traditional behavior.

2 years ago[pulsar-flink]Cache Pulsar client to make it shared among tasks in a process (#5900)
Yijie Shen [Fri, 3 Jan 2020 13:00:28 +0000 (21:00 +0800)] 
[pulsar-flink]Cache Pulsar client to make it shared among tasks in a process (#5900)

* Cache Pulsar client to make it shared among tasks in a process

* code format & add tests

* fix style

Co-authored-by: Sijie Guo <guosijie@gmail.com>
2 years agoAdded catch for NoClassDefFoundError wherever there was a ClassNotFoundException...
Fred Eisele [Fri, 20 Dec 2019 05:28:49 +0000 (23:28 -0600)] 
Added catch for NoClassDefFoundError wherever there was a ClassNotFoundException (#5870)

Fixes #5726

### Motivation

When running pulsar-io connectors and functions from the Intellij IDE some actions fail
due to uncaught class-not-found throwables.
The expectation being that the class is being dynamically loaded and only the ClassNotFoundException will occur if the class is not found.
When the function is created or run with https://pulsar.apache.org/docs/en/functions-deploying/#local-run-mode this is indeed the case.
When running under the control https://pulsar.apache.org/docs/en/functions-debug/#debug-with-localrun-mode as a gradle plugin the class may already be known and throw a NoClassDefFoundError.
It seems to me that any time ClassNotFoundException is handled then NoClassDefFoundError should also be caught.

### Modifications

Wherever there was a `catch (ClassNotFoundException ` I replaced it with
`catch (ClassNotFoundException | NoClassDefFoundError ` .
There were multiple cases where the ClassNotFoundException were handled e.g. the jar loader failed so the nar loader was used to handle the jar loader's failure.

2 years agoBump version to 2.6.0 (#5820)
Sijie Guo [Mon, 9 Dec 2019 16:50:21 +0000 (08:50 -0800)] 
Bump version to 2.6.0 (#5820)


Bump the development version to 2.6.0-SNAPSHOT

2 years ago[PIP-38] Support batch receive in java client. (#4621)
lipenghui [Tue, 19 Nov 2019 07:30:10 +0000 (15:30 +0800)] 
[PIP-38] Support batch receive in java client. (#4621)

Support messages batch receiving, some application scenarios can be made simpler. Users often increase application throughput through batch operations. For example, batch insert or update database.

At present, we provide the ability to receive a single message. If users want to take advantage of batch operating advantages, need to implement a message collector him self. So this proposal aims to provide a universal interface and mechanism for batch receiving messages.

For example:
Messages messages = consumer.batchReceive();
Verifying this change
Added new UT to verify this change.

2 years agoUse StandardCharsets.UTF_8 for converting String to bytes (#5372)
Like [Wed, 6 Nov 2019 12:33:28 +0000 (20:33 +0800)] 
Use StandardCharsets.UTF_8 for converting String to bytes (#5372)

Use StandardCharsets.UTF_8 for converting String to bytes to avoid potential encoding error.

2 years agoexpose new message with different schema (#5517)
Yi Tang [Tue, 5 Nov 2019 03:07:07 +0000 (11:07 +0800)] 
expose new message with different schema (#5517)

Master Issue: #5141

Expose new message with different schema interface, which not required same parameterized type with the producer.
Since the producer and messages sent by it may have different inner types, it's better to have a type agnostic producer interceptor with a checkin method.

2 years ago[Test] Migrate deprecated Matchers to ArgumentMatchers (#5423)
Like [Thu, 31 Oct 2019 20:09:51 +0000 (04:09 +0800)] 
[Test] Migrate deprecated Matchers to ArgumentMatchers (#5423)

* Migrate deprecated Matchers to ArgumentMatchers

* Add ServerCnxTest

* add PulsarKafkaProducerTest

2 years agoUse individual netty-* artifacts instead of netty-all (#3613)
Matteo Merli [Fri, 25 Oct 2019 16:43:45 +0000 (09:43 -0700)] 
Use individual netty-* artifacts instead of netty-all (#3613)

3 years agoRemoved managed-ledger-shaded artifact (#4200)
Matteo Merli [Thu, 26 Sep 2019 04:01:59 +0000 (21:01 -0700)] 
Removed managed-ledger-shaded artifact (#4200)

* Removed managed-ledger-shaded artifact

* Fixed dep artifact renaming error

3 years agoFix client backoff (#5261)
Boyang Jerry Peng [Tue, 24 Sep 2019 20:48:37 +0000 (13:48 -0700)] 
Fix client backoff (#5261)

* fix client backoff

* fix tests

* cleaning up

3 years agoFixed spark receiver to account for all the consumer config options (#5152)
Matteo Merli [Wed, 11 Sep 2019 21:19:27 +0000 (14:19 -0700)] 
Fixed spark receiver to account for all the consumer config options (#5152)

3 years agoMake client keepalive interval configurable on pulsar-client-kafka (#5131)
Rikuo Takahama [Fri, 6 Sep 2019 15:36:07 +0000 (00:36 +0900)] 
Make client keepalive interval configurable on pulsar-client-kafka (#5131)

3 years agoexpose getLastMessageId method in ConsumerImpl (#4911)
Jia Zhai [Sun, 18 Aug 2019 07:39:37 +0000 (15:39 +0800)] 
expose getLastMessageId method in ConsumerImpl (#4911)

Fixes #4909
### Motivation

It would be good to expose method `getLastMessageId` in `ConsumerImpl` to a public method.
eg. some times user would like to know the lag messages; or only consume messages before current time.

### Modifications

- expose method `getLastMessageId` in consumer api.
- add unit test.

### Verifying this change
Ut passed

3 years agoAdd support of pulsar-kafka-adapter for kafka-0.8 api (#4797)
Rajan Dhabalia [Tue, 6 Aug 2019 16:53:56 +0000 (09:53 -0700)] 
Add support of pulsar-kafka-adapter for kafka-0.8 api (#4797)

* Add support pulsar-kafka-adapter for kafka-0.8 api

clean up pulsar-kafka adapter

add tests

add low level consumer

add simple consumer

corrected pulsar-client-kafka_0_8

fix the module name

add batch and partitioned-topic support

fix headers

add getOffset api support

added pulsarOffset request/response

 clean up

* add pulsar-kafka integration test

* use earliestTime offset

* add default serializer/de

3 years agoAdd support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
Xiaobing Fang [Tue, 6 Aug 2019 03:10:43 +0000 (11:10 +0800)] 
Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)

Fixes #4791


Currently the Pulsar Kafka wrapper is using Kafka 0.10.x version. However, there are users who use legacy-kafka version in their system and willing to move to pulsar. This PR provides pulsar-kafka adapter for kafka-api-version 0.9.X. So, this adapter can help users in their migration process from kafka-0.9 to pulsar.

3 years agoPerform Checkstyle analysis in the pulsar-flink module (#4832)
vzhikserg [Mon, 29 Jul 2019 09:05:57 +0000 (11:05 +0200)] 
Perform Checkstyle analysis in the pulsar-flink module (#4832)

* Add maven plugin for style checking. Fix some style violations.

* Fix issues shown by the style checker in the pulsar-flink module

3 years agoTest cleanup and simplification (#4799)
vzhikserg [Thu, 25 Jul 2019 16:14:02 +0000 (18:14 +0200)] 
Test cleanup and simplification (#4799)

* Simplified assert statements in the tests. Switch to usage of static imports in tests. (Part 1)

* Simplify assert statements in the tests and use the appropriate assert statements. Switch to usage of static imports in tests. Remove unused imports (Part 2)

3 years agoFix:PulsarKafkaProducer is not thread safe (#4745)
Xiaobing Fang [Wed, 24 Jul 2019 01:51:38 +0000 (09:51 +0800)] 
Fix:PulsarKafkaProducer is not thread safe (#4745)

fix #4707

3 years agoUpgrade to Mockito 2.x (#4671)
Matteo Merli [Wed, 10 Jul 2019 06:17:07 +0000 (23:17 -0700)] 
Upgrade to Mockito 2.x (#4671)

Upgrading to Mockito 2.28 and PowerMock 2.0. This a pre-step to be able to run CI with Java 11 / 12

3 years agoCleanup in the pulsar-log4j-appender project (#4681)
vzhikserg [Mon, 8 Jul 2019 15:08:56 +0000 (17:08 +0200)] 
Cleanup in the pulsar-log4j-appender project (#4681)

3 years agoSupport Pulsar schema for pulsar kafka client wrapper (#4534)
tuteng [Sun, 7 Jul 2019 16:02:26 +0000 (00:02 +0800)] 
Support Pulsar schema for pulsar kafka client wrapper (#4534)

Fixes https://github.com/apache/pulsar/issues/4228

Master Issue: https://github.com/apache/pulsar/issues/4228

### Motivation

Use Pulsar schema in pulsar kafka client.

### Modifications

Support schema of pulsar for pulsar kafka client

### Verifying this change

Add Unit test

3 years agoIssue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)
Guillaume Rosauro [Tue, 2 Jul 2019 15:51:39 +0000 (17:51 +0200)] 
Issue #4638 : Update Kafka connect-api to version 2.3.0 (#4650)

* Issue #4638: Update Kafka connect-api to version 2.3.0

* remove 'block.on.buffer.full' property (already removed from kafka)

3 years agoBumped version to 2.5.0-SNAPSHOT (#4581)
lipenghui [Tue, 25 Jun 2019 11:59:01 +0000 (19:59 +0800)] 
Bumped version to 2.5.0-SNAPSHOT (#4581)

3 years agoIntroduce batch message container framework and support key based batching container...
lipenghui [Wed, 19 Jun 2019 20:47:48 +0000 (04:47 +0800)] 
Introduce batch message container framework and support key based batching container (#4435)

### Motivation

Introduce batch message container framework to support multiple ways to do message batch.
Currently, pulsar support a most basic batch message container, use the batch message container framework can quickly implement other types batch message container, even users can customize their own batch message container.

Add a new batch message container named BatchMessageKeyBasedContainer to support batching message in key_shared subscription mode.

3 years ago[pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerializer (#4549)
Sijie Guo [Tue, 18 Jun 2019 09:36:44 +0000 (02:36 -0700)] 
[pulsar-kafka] Fix KafkaProducerInterceptorWrapper handles LongSerializer (#4549)


KafkaProducerInterceptorWrapper uses a LongDeserializer for retrieve deserializer


Fix the bug

*Verify this change*

Add unit test to cover the convertion

3 years ago[pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)
Rajan Dhabalia [Mon, 10 Jun 2019 17:03:22 +0000 (10:03 -0700)] 
[pulsar-storm] add option to pass consumerConfiguration to PulsarSpout (#4494)

3 years ago[pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)
Rajan Dhabalia [Sun, 9 Jun 2019 18:07:55 +0000 (11:07 -0700)] 
[pulsar-storm] pulsar-bolt: add option to pass producer-configuration (#4495)

3 years ago[Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl ...
Shivji Kumar Jha [Thu, 6 Jun 2019 15:47:00 +0000 (21:17 +0530)] 
[Issue 4379] [Java Client] Build auth from class and params in PulsarClientImpl (#4381)

* Flink client to accept all pulsar client conf

In this patch, we provide handles for flink connecotr to accept ClientConfigurationData, ProducerConfigurationData, ConsumerConfigurationData so flink client can:
1. accept all params of client, producer and consumer
2. Keep pace with pulsar-client

* Flink client to accept all pulsar client conf

Added test cases

* Removing commented code

* flink: construct auth when building pulsarsource

* fixed failing tests

* removed unused import

* Added builder defaults for lombok builder
Set Auth from class and params (if set) in PulsarClientImpl.java

* Remove @BUilder.default from attributes where no defaults exist

* Added tests for ClientConfiguration Data builders

* cosmetic changes in code

Co-Authored-By: Sijie Guo <guosijie@gmail.com>
* fixing typo

* Removed test, not true anymore

* Removed lombok builders

* fixed the failing tests

* Because the authentication field is transient, it is not serialized. On desirialization then its null and desirialization crashes with NPE

3 years ago[pulsar-storm] Upgrade storm version to 2.0.0 (#4486)
Rajan Dhabalia [Thu, 6 Jun 2019 06:58:47 +0000 (23:58 -0700)] 
[pulsar-storm] Upgrade storm version to 2.0.0 (#4486)

### Motivation
[Storm-2.0](http://storm.apache.org/2019/05/30/storm200-released.html) has been released now and it has performance improvements. So, upgrading storm version in pulsar-storm so, user can use upgraded pulsar-storm in their topology.

3 years agoAdd missed serialization schema argument for PulsarOutputFormat constructor (#4373)
Like [Wed, 29 May 2019 20:09:30 +0000 (04:09 +0800)] 
Add missed serialization schema argument for PulsarOutputFormat constructor (#4373)

The constructor of PulsarOutputFormat expects a serializationSchema passed in otherwise it's member serializationSchema will be assigned by itself.

3 years agoConfigure static PulsarByteBufAllocator to handle OOM errors (#4196)
Matteo Merli [Wed, 29 May 2019 15:31:47 +0000 (08:31 -0700)] 
Configure static PulsarByteBufAllocator to handle OOM errors (#4196)

* Configure static PulsarByteBufAllocator to handle OOM errors

* Always specify `pulsar.allocator.exit_on_oom` when starting pulsar services

* Reverted metrics back

* Fixed compression tests

* Explicitely set the underlying allocator to netty default

* Fixed shading

3 years ago[pulsar-storm] support reader for pulsar-spout (#4236)
Rajan Dhabalia [Mon, 20 May 2019 20:22:23 +0000 (13:22 -0700)] 
[pulsar-storm] support reader for pulsar-spout (#4236)

* [pulsar-storm] pulsar-spout can use reader to read message without durable subscription

* fix test

3 years ago[Issue 4283][flink] construct auth when building pulsar source (#4284)
Shivji Kumar Jha [Sat, 18 May 2019 12:27:04 +0000 (17:57 +0530)] 
[Issue 4283][flink] construct auth when building pulsar source (#4284)

Fixes #4283

### Motivation

pulsar flink connector now uses ClientConfigData to instantiate pulsar client. Pulsar client composes Authentication which can not be serialized. In an environment with Auth there is no way to set auth in pulsar client.

### Modifications

Keep auth params away from persistence and serialization. Construct auth when building pulsar source

3 years ago[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)
Rajan Dhabalia [Wed, 15 May 2019 10:05:37 +0000 (03:05 -0700)] 
[pulsar-storm] add more metrics to troubleshoot spout throughput (#4280)

### Motivation

Many time user sees lower throughput in pulsar-spout even though standalone consumer can consume such msgRate easily. It would be hard to debug user's topology without enough information so, adding two metrics which can impact spout throughput.
- number of message filed: spout sleeps when it sees failed message so, it's important to have visibility of that count
- number of times spout-thread not found the message in queue: spout topology internally sleeps if it doesn't see any emitted tuple in collector after triggering `nextTuple()` api.

This metrics gives more visibility about consumer throughput.

3 years ago[pulsar-storm] allow option to configure queue size of pulsar-spout-consumer (#4239)
Rajan Dhabalia [Wed, 15 May 2019 06:44:39 +0000 (23:44 -0700)] 
[pulsar-storm] allow option to configure queue size of pulsar-spout-consumer (#4239)

3 years ago[issue 3954] [flink] Use (client/consumer/producer) pojo to pass config to pulsar...
Shivji Kumar Jha [Thu, 9 May 2019 08:31:38 +0000 (14:01 +0530)] 
[issue 3954] [flink] Use (client/consumer/producer) pojo to pass config to pulsar client (#4232)

* Flink client to accept all pulsar client conf

In this patch, we provide handles for flink connecotr to accept ClientConfigurationData, ProducerConfigurationData, ConsumerConfigurationData so flink client can:
1. accept all params of client, producer and consumer
2. Keep pace with pulsar-client

* Flink client to accept all pulsar client conf

Added test cases

* Removing commented code

3 years ago[pulsar-storm] provide auto-unsubscribe option in pulsar-spout (#4238)
Rajan Dhabalia [Thu, 9 May 2019 01:36:56 +0000 (18:36 -0700)] 
[pulsar-storm] provide auto-unsubscribe option in pulsar-spout (#4238)

3 years ago[pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add...
wpl [Mon, 29 Apr 2019 09:57:35 +0000 (04:57 -0500)] 
[pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example (#4143)

### Motivation

upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example

### Modifications

1. upgrade  SparkStreamingPulsarReceiver.java use pulsar-client, remove pulsar-client-1x pom
2. add  simple spark example

3 years ago[pulsar-flink] Add subscription initial position (#4129)
lipenghui [Fri, 26 Apr 2019 15:38:17 +0000 (23:38 +0800)] 
[pulsar-flink] Add subscription initial position (#4129)

### Motivation

Allow user to specify the initial position for consumer source builder.

### Modifications

Add initial position for PulsarConsumerSource.

3 years ago[pulsar-storm] Fix NPE while emitting next tuple (#3991)
Rajan Dhabalia [Tue, 9 Apr 2019 02:09:46 +0000 (19:09 -0700)] 
[pulsar-storm] Fix NPE while emitting next tuple (#3991)

### Motivation

[PulsarSpout] removes messages from [pendingMessageRetries](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L126) but it doesn't remove from the `failedMessages` queue because of that PulsarSpout throws NPE while [emitting next tuple](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L181)

stack-trace with old pulsar-storm lib: 1.20
2019-04-05 18:49:58.240 b.s.util CmsSpout_[1 1] [INFO] Async loop Stacktrace is: {} java.lang.NullPointerException
    at org.apache.pulsar.storm.PulsarSpout.emitNextAvailableTuple(PulsarSpout.java:176)
    at org.apache.pulsar.storm.PulsarSpout.nextTuple(PulsarSpout.java:160)
    at backtype.storm.daemon.executor$fn__7365$fn__7380$fn__7411.invoke(executor.clj:577)
    at backtype.storm.util$async_loop$fn__551.invoke(util.clj:491)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)

3 years ago[Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)
Richard Yu [Tue, 2 Apr 2019 16:41:04 +0000 (09:41 -0700)] 
[Issue-2122] [pulsar-client] Adding configuration for backoff strategy (#3848)

Fixes #2122

### Motivation

Current backoff strategy is set by default and is too aggressive. What we should do is allow it to be configurable by the user.

### Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (not sure)

3 years agoissue#3939 : Allow client authentication from pulsar-flink package (#3949)
Shivji Kumar Jha [Tue, 2 Apr 2019 16:02:01 +0000 (21:32 +0530)] 
issue#3939 : Allow client authentication from pulsar-flink package (#3949)

pulsar-flink module (aka flink connector) internally uses pulsar-client. Though the pulsar client allows setting tokens in the client builder, the flink connector does not provide a way to pass authentication token to the pulsar client it uses internally.

Accept authetication information as an input in pulsar-flink module. Pass this authentication information to pulsar-client.

3 years ago[Flink] Allow to customize PulsarProducer (#3955)
Cristian [Tue, 2 Apr 2019 04:15:44 +0000 (21:15 -0700)] 
[Flink] Allow to customize PulsarProducer (#3955)

This is an improvement over #3894.

Because of how Flink instantiates functions, instead of passing a
custom `PulsarProducer` client we need to pass an object that is
serializable. The current implementation will default to always
call `createProducer()` because `producer` is `transient`, so it will
always be null when Flink creates new instances of the sink.

3 years agoThrow error explicitly on param value not in range (#3950)
Shivji Kumar Jha [Mon, 1 Apr 2019 16:07:35 +0000 (21:37 +0530)] 
Throw error explicitly on param value not in range (#3950)

3 years agoSupport Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer...
Marvin Cai [Thu, 28 Mar 2019 20:33:58 +0000 (13:33 -0700)] 
Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer. (#3911)

* Support Kafka's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG in PulsarKafkaConsumer.
Apply onConsume in poll() before returning the ConsumerRecords,
apply onCommit in doCommitOffsets() before committing all offsets.

Also apply doc to reflect support for ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG.

* Update error message for applying onConsume and onCommit for interceptors to include the specific interceptor name.

3 years agoSupport Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to config max number of messag...
Marvin Cai [Tue, 26 Mar 2019 17:49:57 +0000 (10:49 -0700)] 
Support Kafka's ConsumerConfig.MAX_POLL_RECORDS_CONFIG to config max number of message will return in a single poll. (#3887)

Also update doc to reflect that we already supporting earlist and latest strategy for ConsumerConfig.AUTO_OFFSET_RESET_CONFIG.

3 years ago[flink] Allow to specify a custom Pulsar producer (#3894)
Cristian [Mon, 25 Mar 2019 03:28:38 +0000 (20:28 -0700)] 
[flink] Allow to specify a custom Pulsar producer (#3894)

This is necessary in pretty much any non-trivial use-case. The ability
to control the settings of the Pulsar producer is paramount to
building real-life applications

3 years agoAdd a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig...
Marvin Cai [Wed, 20 Mar 2019 03:27:30 +0000 (20:27 -0700)] 
Add a wrapper around Kafka's ProducerInterceptor to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090 (#3843)

Add a wrapper around Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor` to support Kafka's ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. #1090

The wrapper will try to delegate all call to underlying instance of Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor`  it holds.

When `PulsarKafkaProducer` convert a Kafka's `ProducerRecord` to Pulsar's `Message`, the schema(fixed to type of Schema<byte[]>), key, value, eventTimestamp and partitionID is set.
When doing the delegation, we'll do
Pulsar`Message` -> Kafka's `ProducerRecord` -> invoke underlying Kafka's `org.apache.kafka.clients.producer.ProducerInterceptor#onSend`  -> Pulsar`Message`
It'll try to preserve all the information. Verified through unit test.
For `org.apache.kafka.clients.producer.ProducerInterceptor#onSendAcknowledgement` it'll call `org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement` only partitionID, eventTimestamp, key byte lenth, value byte length will be pass in.

3 years agoFix the loop of consumer poll, so the consumer can cache more than one record in...
se7enkings [Tue, 19 Mar 2019 17:37:01 +0000 (01:37 +0800)] 
Fix the loop of consumer poll, so the consumer can cache more than one record in signal poll. (#3852)

3 years agoSupport Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)
Marvin Cai [Mon, 11 Mar 2019 10:16:58 +0000 (03:16 -0700)] 
Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG. (#3797)


Support Kafka's ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG  #1090

Previously `ProducerBuilder.sendTimeout` was set by parsing Kafka's `ProducerConfig.MAX_BLOCK_MS_CONFIG`.
According to Kafka's [document](https://kafka.apache.org/20/documentation.html) it's for

> Controlling how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block either because the buffer is full or metadata unavailable.

But `ProducerBuilder.sendTimeout`  is for

> If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.

And Kafka's `ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG`, according to the document is for:
> Controlling the maximum amount of time the client will wait for the response of a request.

Which I think would be better fit purpose of Pulsar's `ProducerBuilder.sendTimeout`.

3 years agoAdd support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG (#1090) (#3753)
Marvin Cai [Sat, 9 Mar 2019 01:34:25 +0000 (17:34 -0800)] 
Add support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG (#1090) (#3753)

Add support for Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in PulsarKafkaProducer by utilizing pulsar's ClientBuilder.keepAliveInterval

3 years agoAdded support for "negative acks" in Java client (#3703)
Matteo Merli [Thu, 7 Mar 2019 23:11:10 +0000 (15:11 -0800)] 
Added support for "negative acks" in Java client (#3703)

* Added support for "negative acks" in Java client

* Fixed redelivery delay to be >= than configured

* Fixed redelivery after timeout

* Fixed timeout interval calculation

* Removed the 1.1 nonsense

* Fixed test cleanup

* Avoid failure when passing empty set of msg ids

3 years agoadd reset cousor to a specific publish time (#3622)
冉小龙 [Thu, 28 Feb 2019 14:48:37 +0000 (22:48 +0800)] 
add reset cousor to a specific publish time (#3622)

Signed-off-by: xiaolong.ran ranxiaolong716@gmail.com
Fixes #3446 #3565

Reset the subscription associated with this consumer to a specific publish time.

3 years agoReplace Junit with testng (#3675)
Like [Mon, 25 Feb 2019 16:23:01 +0000 (00:23 +0800)] 
Replace Junit with testng (#3675)

Closes #3663