[
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642261#comment-16642261
]
Ryanne Dolan commented on KAFKA-6989:
-------------------------------------
Samza faced this same problem in the past. The solution was to introduce an
AsyncTask and processAsync() API that works like this:
- in processAsync(), you get a record and a callback to call when you're done
with the record.
- processAsync() is run in a separate thread pool from the consumer
- on commit(), the framework only commits offsets up to the first outstanding
record
- the framework blocks and won't poll() if the thread pool is full
- the thread pool size is configurable with a default of 1, which essentially
means records are processed one-by-one and in-order
- if you increase the thread pool size, you can have N outstanding records, but
order is no longer guaranteed
These semantics would also work for Streams just fine.
> Support Async Processing in Streams
> -----------------------------------
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip
>
> Today Kafka Streams use a single-thread per task architecture to achieve
> embarrassing parallelism and good isolation. However there are a couple
> scenarios where async processing may be preferable:
> 1) External resource access or heavy IOs with high-latency. Suppose you need
> to access a remote REST api, read / write to an external store, or do a heavy
> disk IO operation that may result in high latency. Current threading model
> would block any other records before this record's done, waiting on the
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of
> a (non-corrupted) record fails (e.g. the user attempted to do a RPC to an
> external system, and this call failed), and failed records are moved into a
> separate "retry" topic. How can you process such failed records in a scalable
> way? For example, imagine you need to implement a retry policy such as "retry
> with exponential backoff". Here, you have the problem that 1. you can't
> really pause processing a single record because this will pause the
> processing of the full stream (bottleneck!) and 2. there is no
> straight-forward way to "sort" failed records based on their "next retry
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay
> re-processing this event for 5 minutes") as mentioned in 2), another is for
> implementing a scheduler: e.g. do some additional operations later based on
> this processed record. based on Zalando Dublin, for example, are implementing
> a distributed web crawler. Note that although this feature can be handled in
> punctuation, it is not well aligned with our current offset committing
> behavior, which always advance the offset once the record has been done
> traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable to users for them to
> implement multi-threading processing themselves: users can always do async
> processing in the Processor API by spawning a thread-poll, e.g. but the key
> is that the offset to be committed should be only advanced with such async
> processing is done. This is a light-weight approach: we provide all the
> pieces and tools, and users stack them up to build their own LEGOs.
> 2. Provide an general API to do async processing in Processor API, and take
> care of the offsets committing internally. This is a heavy-weight approach:
> the API may not cover all async scenarios, but it is a easy way to cover the
> rest majority scenarios, and users do not need to worry of internal
> implementation details such as offsets and fault tolerance.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)