[
https://issues.apache.org/jira/browse/KAFKA-13601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anil Dasari updated KAFKA-13601:
--------------------------------
Description:
Exactly once in s3 connector with scheduled rotation and field partitioner can
be achieved with consumer offset sync' commit.
Currently, WorkerSinkTask committing the consumer offsets asynchronously.
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean
closing, final int seqno) \{
log.info("{} Committing offsets", this);
if (closing) \{
doCommitSync(offsets, seqno);
} else \{
OffsetCommitCallback cb = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception error) {
lastCommittedOffsets = offsets;
onCommitCompleted(error, seqno);
}
};
consumer.commitAsync(offsets, cb);
}
}
Add config to sink to chose sync' offset commit
was:
Exactly once in s3 connector with scheduled rotation and field partitioner can
be achieved with consumer offset sync' commit.
Currently, WorkerSinkTask committing the consumer offsets asynchronously.
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean
closing, final int seqno) \{
log.info("{} Committing offsets", this);
if (closing) \{
doCommitSync(offsets, seqno);
} else \{
OffsetCommitCallback cb = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception error) {
lastCommittedOffsets = offsets;
onCommitCompleted(error, seqno);
}
};
consumer.commitAsync(offsets, cb);
}
}
Add config to sink to chose sync' offset commit
> Add option to support sync offset commit in Kafka Connect Sink
> --------------------------------------------------------------
>
> Key: KAFKA-13601
> URL: https://issues.apache.org/jira/browse/KAFKA-13601
> Project: Kafka
> Issue Type: New Feature
> Components: KafkaConnect
> Reporter: Anil Dasari
> Priority: Major
>
> Exactly once in s3 connector with scheduled rotation and field partitioner
> can be achieved with consumer offset sync' commit.
> Currently, WorkerSinkTask committing the consumer offsets asynchronously.
> private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean
> closing, final int seqno) \{
> log.info("{} Committing offsets", this);
> if (closing) \{
> doCommitSync(offsets, seqno);
> } else \{
> OffsetCommitCallback cb = new OffsetCommitCallback() {
> @Override
> public void onComplete(Map<TopicPartition, OffsetAndMetadata>
> offsets, Exception error) {
> lastCommittedOffsets = offsets;
> onCommitCompleted(error, seqno);
> }
> };
> consumer.commitAsync(offsets, cb);
> }
> }
> Add config to sink to chose sync' offset commit
--
This message was sent by Atlassian Jira
(v8.20.1#820001)