[
https://issues.apache.org/jira/browse/KAFKA-18986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Brutschy updated KAFKA-18986:
-----------------------------------
Description:
In the topology metadata sent from the client to the broker in KIP-1071
(analogous to the subscription in consumer groups), the number of partitions of
the input topics and of some internal topics is left parametric, so that a
single application can be deployed with a varying number of partitions. The
process of turning a 'parametric' topology into one where the number of
partitions is defined for all topics and the presence of all internal topics is
validated is called "configuring the topology". The result of configuring the
topology is either a configured topology, or a validation error, potentially
with a set of internal topics that need to be created, or a set of topics that
are not co-partitioned but must be (e.g. because they are used in a join).
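The possible outcomes described above can be sketched as a small result type. This is a hypothetical illustration only; the names below are not the actual Kafka classes:

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: either a fully configured topology (topic -> resolved
// partition count), or the validation problems found while configuring.
record ConfigureResult(
        Optional<Map<String, Integer>> configuredTopology,
        Set<String> internalTopicsToCreate,   // internal topics missing on the broker
        Set<String> copartitionViolations) {  // topics that must be co-partitioned but are not

    boolean isValid() {
        return configuredTopology.isPresent()
                && internalTopicsToCreate.isEmpty()
                && copartitionViolations.isEmpty();
    }
}
```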
The configured topology in streams groups is the result of combining the
"partition metadata" (the current state of all the topics involved in a
topology) with the topology itself, both of which are persisted in the consumer
offset topic and can be restored on fail-over. The configured topology itself,
however, is not stored (since it can be derived). We still want to keep it in
memory, to avoid recomputing it all the time. Right now, during a heartbeat RPC
that either detects a change in topic metadata or initializes a topology, we
configure the topology and store the result as "soft state". Also, when
partition metadata / topology records are replayed from the offset topic, the
configured topology is derived again.
This is problematic for the following reasons:
- When a heartbeat is handled and the configured topology is initialized, the
write operations of the topology and partition metadata records may fail, which
means we end up in an inconsistent state: an outdated topology / partition
metadata but a new configured topology.
- The topology may be configured several times: once in the heartbeat handler,
and then again when replaying the new records from the consumer offset topic.
- Topology configuration is a non-trivial operation, both in time complexity
and in the amount of logic executed. Ideally, we should not run it on the
replay path.
To solve these problems, the idea is to correctly key the configured topology
by a hash of the partition metadata and a topology epoch (or topology hash).
That means:
1. When the heartbeat handler initializes a new topology or detects changed
partition metadata, it computes a new partition metadata hash (if necessary),
and we store the configured topology as soft state in the streams group's
in-memory representation, keyed by the topology epoch and the partition
metadata hash.
2. On the replay path, the configured topology is not touched at all.
3. When using the configured topology, whether in the describe handler or the
heartbeat handler, we can compare the topology epoch and the partition metadata
hash to check whether the current configured topology (soft state) matches the
partition metadata and topology epoch (hard state). If not, we reconfigure
the topology.
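The three steps above amount to a small cache keyed by (topology epoch, partition metadata hash). A minimal sketch with hypothetical names — the real group coordinator code is considerably more involved:

```java
import java.util.Objects;
import java.util.function.Supplier;

// Soft-state holder: the configured topology is cached together with the
// (epoch, hash) key it was derived from, so readers can detect staleness
// against the hard state without recomputing anything on the replay path.
final class ConfiguredTopologyCache<T> {
    private int topologyEpoch = -1;
    private String metadataHash = null;
    private T configuredTopology = null;

    // Step 3: compare the cached key against the hard state; reconfigure only
    // on a mismatch (step 1), never during record replay (step 2).
    T getOrConfigure(int epoch, String hash, Supplier<T> configure) {
        if (configuredTopology == null
                || topologyEpoch != epoch
                || !Objects.equals(metadataHash, hash)) {
            configuredTopology = configure.get();
            topologyEpoch = epoch;
            metadataHash = hash;
        }
        return configuredTopology;
    }
}
```

The replay path never calls `getOrConfigure`, so replaying records stays cheap; the first describe or heartbeat after a change pays the configuration cost.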
Fortunately, KIP-1101 introduces the means to generate partition metadata
hashes and proposes saving only the hash to the consumer offset topic. We can
build on these changes to fix the handling of configured topologies in streams
groups rather trivially.
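A deterministic partition metadata hash is what makes the staleness check cheap. KIP-1101 defines the actual hashing scheme; the following is only an illustrative sketch of the idea (a stable encoding of topic names and partition counts, so equal metadata always yields an equal key):

```java
import java.util.Map;
import java.util.TreeMap;

final class PartitionMetadataHasher {
    // Illustrative only: KIP-1101 specifies the real scheme. The key property
    // is determinism: iterate topics in sorted name order so that equal
    // metadata always produces an equal hash, regardless of map ordering.
    static long hash(Map<String, Integer> partitionCounts) {
        long h = 17L;
        for (Map.Entry<String, Integer> e : new TreeMap<>(partitionCounts).entrySet()) {
            h = 31L * h + e.getKey().hashCode();
            h = 31L * h + e.getValue();
        }
        return h;
    }
}
```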
was:
In KIP-1071, the number of partitions of the input topics and of some internal
topics is left parametric, so that a single application can be deployed with a
varying number of
partitions. The process of turning a 'parametric' topology into a topology
where the number of partitions is defined for all topics and the presence of
all internal topics is validated is called "configuring the topology". The
result of configuring the topology is either a configured topology, or a
validation error with potentially a set of internal topics that need to be
created.
The configured topology in streams groups is a result of taking the "partition
metadata" (current state of all the topics involved in a topology) and the
topology, both of which are persisted in the consumer offset topic and can be
restored on fail-over. However, the configured topology itself is not stored
(since it can be derived). We still want to keep it in memory, to avoid
recomputing it all the time. Right now, during a heartbeat RPC that either
detects a change in topic metadata or initializes a topology, we configure the
topology and store the topology as "soft state". Also, when partition metadata
/ topology records are replayed from the offset topic, the configured topology
will be derived.
This is problematic for the following reasons:
- When a heartbeat is handled and the configured topology is initialized, the
write operations of the topology and partition metadata records may fail, which
means we end up in an inconsistent state: an outdated topology / partition
metadata but a new configured topology.
- The topology may be configured several times: once in the heartbeat handler,
and then again when replaying the new records from the consumer offset topic.
- The topology configuration is a non-trivial operation, both in time
complexity and generally in the amount of logic that is being executed.
Ideally, we should not execute this on the replay-path.
To solve these problems, the idea is to correctly key the configured topology
by a hash of the partition metadata and a topology epoch (or topology hash).
That means:
1. When the heartbeat handler initializes a new topology or detects changed
partition metadata, it computes a new partition metadata hash (if necessary),
and we store the configured topology as soft state in the streams group's
in-memory representation, keyed by the topology epoch and the partition
metadata hash.
2. On the replay path, the configured topology is not touched at all.
3. When using the configured topology, whether in the describe handler or the
heartbeat handler, we can compare the topology epoch and the partition metadata
hash to check whether the current configured topology (soft state) matches the
partition metadata and topology epoch (hard state). If not, we reconfigure
the topology.
Fortunately, KIP-1101 introduces the means to generate partition metadata
hashes and proposes saving only the hash to the consumer offset topic. We can
build on these changes to fix the handling of configured topologies in streams
groups rather trivially.
> Properly handle configured topology soft-state using KIP-1101
> --------------------------------------------------------------
>
> Key: KAFKA-18986
> URL: https://issues.apache.org/jira/browse/KAFKA-18986
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Lucas Brutschy
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)