[
https://issues.apache.org/jira/browse/KAFKA-18986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Brutschy updated KAFKA-18986:
-----------------------------------
Component/s: streams
> Properly handle configured topology soft-state using KIP-1101
> --------------------------------------------------------------
>
> Key: KAFKA-18986
> URL: https://issues.apache.org/jira/browse/KAFKA-18986
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Lucas Brutschy
> Priority: Major
>
> In the topology metadata sent from the client to the broker (similar to
> subscribe in consumer groups) in KIP-1071, the number of partitions of the
> input topics and some internal topics is left parametric, so that a single
> application can be deployed with a varying number of partitions. The process
> of turning a 'parametric' topology into a topology where the number of
> partitions is defined for all topics and the presence of all internal topics
> is validated is called "configuring the topology". The result of configuring
> the topology is either a configured topology, or a validation error,
> potentially with a set of internal topics that need to be created, or a set
> of topics that are not co-partitioned but must be (e.g. because they are
> used in a join).
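> The co-partitioning validation mentioned above can be sketched as follows
> (a minimal illustration in plain Java; the class and method names are
> hypothetical, not the actual group-coordinator code):

```java
import java.util.*;

// Hypothetical sketch of the co-partitioning check performed while
// "configuring the topology": every topic in a co-partition group (e.g. the
// inputs of a join) must resolve to the same partition count, and every
// topic must be present in the current partition metadata.
public class CopartitionCheck {

    /** Returns the topics violating the check; empty means validation passed. */
    static Set<String> validate(Map<String, Integer> partitionMetadata,
                                List<Set<String>> copartitionGroups) {
        Set<String> violations = new TreeSet<>();
        for (Set<String> group : copartitionGroups) {
            Set<Integer> counts = new HashSet<>();
            for (String topic : group) {
                Integer partitions = partitionMetadata.get(topic);
                if (partitions == null) {
                    violations.add(topic); // e.g. an internal topic not yet created
                } else {
                    counts.add(partitions);
                }
            }
            if (counts.size() > 1) {
                violations.addAll(group); // partition counts differ within the group
            }
        }
        return violations;
    }
}
```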
> The configured topology in streams groups is derived from the "partition
> metadata" (current state of all the topics involved in a topology) and the
> topology metadata sent by the client, both of which are persisted in the
> consumer offset topic and can be restored on fail-over. However, the
> configured topology itself is not stored (since it can be derived). We still
> want to keep it in memory, to avoid recomputing it all the time. Right now,
> during a heartbeat RPC that either detects a change in partition metadata or
> initializes a topology, we configure the topology and store the configured
> topology as "soft state". Also, when partition metadata / topology records
> are replayed from the offset topic, the configured topology will be computed.
> This is improper for the following reasons:
> 1. When a heartbeat is handled and the configured topology is initialized,
> the write operations for the topology and partition metadata records may
> fail, which means we end up in an inconsistent state: an outdated topology /
> partition metadata, but a new configured topology.
> 2. The topology may be configured several times: once in the heartbeat
> handler, and again when replaying the new records from the consumer offset
> topic.
> 3. The topology configuration is a non-trivial operation, both in time
> complexity and generally in the amount of logic that is being executed.
> Ideally, we should not execute this on the replay-path.
> To solve these problems, the idea is to key the configured topology by a
> hash of the partition metadata and a topology epoch (or topology hash). That
> means:
> 1. When the heartbeat handler initializes a new topology or detects changed
> partition metadata, we compute the new partition metadata hash (if
> necessary) and store the configured topology as soft state in the streams
> group's in-memory representation, keyed by the topology epoch and the
> partition metadata hash.
> 2. On the replay path, the configured topology is not touched at all.
> 3. When using the configured topology, in either the describe handler or
> the heartbeat handler, we compare the topology epoch and the partition
> metadata hash to check whether the current configured topology (soft state)
> matches the partition metadata and topology epoch (hard state). If not, we
> reconfigure the topology.
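> The keyed soft-state scheme in steps 1-3 can be sketched as follows (a
> minimal illustration; the names are hypothetical, and the epoch/hash types
> merely stand in for whatever KIP-1101 actually produces):

```java
import java.util.function.Supplier;

// Hypothetical sketch of keeping the configured topology as soft state keyed
// by (topology epoch, partition metadata hash). The replay path never touches
// this cache; readers compare the key against the hard state and recompute
// only on a mismatch.
public class ConfiguredTopologyCache<T> {

    private record Key(int topologyEpoch, long partitionMetadataHash) {}

    private Key key;      // key of the currently cached configured topology
    private T configured; // the cached configured topology itself

    /** Returns the cached configured topology, reconfiguring it only if the
     *  hard state (epoch + metadata hash) no longer matches the soft state. */
    public T get(int topologyEpoch, long partitionMetadataHash, Supplier<T> configure) {
        Key wanted = new Key(topologyEpoch, partitionMetadataHash);
        if (!wanted.equals(key)) {
            configured = configure.get(); // runs the expensive configuration
            key = wanted;
        }
        return configured;
    }
}
```

> A heartbeat or describe handler would call get(...) with the current
> topology epoch and partition metadata hash; replaying records from the
> consumer offset topic simply never calls it.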
> Fortunately, KIP-1101 introduces the means to generate partition metadata
> hashes, and proposes saving only the hash to the consumer offset topic. We
> can leverage these changes to fix, rather trivially, the handling of
> configured topologies in streams groups.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)