[
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-10005:
----------------------------------
Description:
In Kafka Streams we have two restoration callbacks:
* RestoreCallback (BatchingRestoreCallback): specified per-store via
registration to specify the logic of applying a batch of records read from the
changelog to the store. Used for both updating standby tasks and restoring
active tasks.
* RestoreListener: specified per-instance via `setRestoreListener`, to specify
the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
As we can see these two callbacks are for quite different purposes, however
today we allow user's to register a per-store RestoreCallback which is also
implementing the RestoreListener. Such weird mixing is actually motivated by
Streams internal usage to enable / disable bulk loading inside RocksDB. For
user's however this is less meaningful to specify a callback to be a listener
since the `onRestoreStart / End` has the storeName passed in, so that users can
just define different listening logic if needed for different stores.
On the other hand, this mixing of two callbacks enforces Streams to check
internally if the passed in per-store callback is also implementing listener,
and if yes trigger their calls, which increases the complexity. Besides, toggle
rocksDB for bulk loading requires us to open / close / reopen / reclose 4 times
during the restoration which could also be costly.
Given that we have KIP-441 in place, I think we should consider different ways
other than toggle bulk loading during restoration for Streams (e.g. using
different threads for restoration).
The proposal for this ticket is to completely decouple the listener from
callback -- i.e. we would not presume users passing in a callback function that
implements both RestoreCallback and RestoreListener, and also for RocksDB we
replace the bulk loading mechanism with other ways of optimization:
https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
was:
In Kafka Streams we have two restoration callbacks:
* RestoreCallback (BatchingRestoreCallback): specified per-store via
registration to specify the logic of applying a batch of records read from the
changelog to the store. Used for both updating standby tasks and restoring
active tasks.
* RestoreListener: specified per-instance via `setRestoreListener`, to specify
the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
As we can see these two callbacks are for quite different purposes, however
today we allow user's to register a per-store RestoreCallback which is also
implementing the RestoreListener. Such weird mixing is actually motivated by
Streams internal usage to enable / disable bulk loading inside RocksDB. For
user's however this is less meaningful to specify a callback to be a listener
since the `onRestoreStart / End` has the storeName passed in, so that users can
just define different listening logic if needed for different stores.
On the other hand, this mixing of two callbacks enforces Streams to check
internally if the passed in per-store callback is also implementing listener,
and if yes trigger their calls, which increases the complexity.
Given that we have KIP-441 in place, I think we should consider different ways
other than toggle bulk loading during restoration for Streams (e.g. using
different threads for restoration).
The proposal for this ticket is to completely decouple the listener from
callback -- i.e. we would not presume users passing in a callback function that
implements both RestoreCallback and RestoreListener, and also for RocksDB we
replace the bulk loading mechanism with other ways of optimization:
https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
> Decouple RestoreListener from RestoreCallback and not enable bulk loading for
> RocksDB
> -------------------------------------------------------------------------------------
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
> Issue Type: Improvement
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via
> registration to specify the logic of applying a batch of records read from
> the changelog to the store. Used for both updating standby tasks and
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see these two callbacks are for quite different purposes, however
> today we allow user's to register a per-store RestoreCallback which is also
> implementing the RestoreListener. Such weird mixing is actually motivated by
> Streams internal usage to enable / disable bulk loading inside RocksDB. For
> user's however this is less meaningful to specify a callback to be a listener
> since the `onRestoreStart / End` has the storeName passed in, so that users
> can just define different listening logic if needed for different stores.
> On the other hand, this mixing of two callbacks enforces Streams to check
> internally if the passed in per-store callback is also implementing listener,
> and if yes trigger their calls, which increases the complexity. Besides,
> toggle rocksDB for bulk loading requires us to open / close / reopen /
> reclose 4 times during the restoration which could also be costly.
> Given that we have KIP-441 in place, I think we should consider different
> ways other than toggle bulk loading during restoration for Streams (e.g.
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from
> callback -- i.e. we would not presume users passing in a callback function
> that implements both RestoreCallback and RestoreListener, and also for
> RocksDB we replace the bulk loading mechanism with other ways of
> optimization: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
--
This message was sent by Atlassian Jira
(v8.3.4#803005)