[
https://issues.apache.org/jira/browse/KAFKA-15190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744472#comment-17744472
]
Joe Wreschnig edited comment on KAFKA-15190 at 7/19/23 7:49 AM:
----------------------------------------------------------------
[~mjsax], setting {{client.id}} doesn't seem to be sufficient: I still observe
lots of task shuffling when using deterministic client IDs, whereas when I
inject a fake process ID via the file the assignment is perfectly stable. (In
both cases I've also set {{acceptable.recovery.lag}} to MAX_VALUE so the
assignments should otherwise be as "stateless" as possible, as far as I
understand the algorithm.) I'm not too familiar with the code, but although
{{StreamsPartitionAssignor}} sometimes calls it a client ID and sometimes a
process ID, it's a {{UUID}}, so I assume it really is the process ID. I'm also
not sure the {{client.id}} *must* be unique, so trying to reuse it as-is for
this may not work.
[~ableegoldman] I'm reluctant to put together a KIP as I'm not sure how much
time I will have for "getting the work done" reasonably soon. The meat of the
proposal would be:
{quote}One new configuration option is added and another's behavior is modified:
- A new streams configuration option {{process.id}} is added which may be set
to a UUID; if provided it overrides the use of {{group.instance.id}}
and the generation of a random process ID.
- If {{process.id}} is not set and a static membership ID has been set (i.e.
using {{StreamsConfig.consumerPrefix}} with {{group.instance.id}}), the process
ID is generated deterministically using the application ID and that
{{group.instance.id}}.
A persisted {{kafka-streams-process-metadata}} file overrides both of these
options. If neither option is provided and no file is found, a new random
process ID is generated.
{quote}
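The precedence described in the proposal can be sketched as below. This is only an illustration of the ordering (persisted file, then {{process.id}}, then an ID derived from {{group.instance.id}}, then a random one); {{ProcessIdResolver}} and its parameters are hypothetical names, {{process.id}} is the proposed option rather than an existing one, and none of this is existing Kafka Streams code.

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper illustrating the proposed precedence; not Kafka Streams API.
final class ProcessIdResolver {
    private ProcessIdResolver() {}

    /**
     * @param persisted  ID read from the kafka-streams-process-metadata file, if one exists
     * @param configured value of the proposed process.id option, if set
     * @param derived    ID derived from the application ID and group.instance.id, if static membership is used
     */
    static UUID resolve(Optional<UUID> persisted,
                        Optional<UUID> configured,
                        Optional<UUID> derived) {
        if (persisted.isPresent()) {
            return persisted.get();   // a persisted file overrides both options
        }
        if (configured.isPresent()) {
            return configured.get();  // explicit process.id comes next
        }
        if (derived.isPresent()) {
            return derived.get();     // then the ID derived from group.instance.id
        }
        return UUID.randomUUID();     // nothing available: fall back to a random ID
    }
}
```

With this ordering, an instance that already has a persisted file keeps its current process ID even after one of the options is set.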
If it's only the introduction of {{process.id}} that requires a KIP, then
reusing {{group.instance.id}} alone is really all we need to solve our specific
issue. The workaround I have at the moment is:
{code:java}
public static void injectProcessID(String stateDir, Properties config) {
    String appID = config.getProperty(StreamsConfig.APPLICATION_ID_CONFIG);
    String instanceID = config.getProperty(
        StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
    if (appID == null || instanceID == null) {
        logger.warn("application and instance IDs must be set for a stable process ID");
        return;
    }
    // Name-based UUID: the same application and instance IDs always give the same process ID.
    // The NUL separator keeps the two parts unambiguous; UTF-8 makes the bytes platform-independent.
    UUID processID = UUID.nameUUIDFromBytes(
        (appID + "\000" + instanceID).getBytes(StandardCharsets.UTF_8));
    // ... write the file ...
}
{code}
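To show why this derivation is stable across restarts, and why the NUL separator matters, here is a small self-contained check. The class name and the sample application and instance IDs are made up for illustration; the only library call is {{java.util.UUID.nameUUIDFromBytes}}, which produces a name-based (version 3) UUID.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Illustrative only: the class name and sample IDs are not from the workaround above.
final class DerivedProcessIdDemo {
    private DerivedProcessIdDemo() {}

    // Same derivation as the workaround: application ID, NUL, instance ID.
    static UUID withSeparator(String appId, String instanceId) {
        return UUID.nameUUIDFromBytes(
            (appId + "\000" + instanceId).getBytes(StandardCharsets.UTF_8));
    }

    // Plain concatenation, shown only to demonstrate the ambiguity it causes.
    static UUID withoutSeparator(String appId, String instanceId) {
        return UUID.nameUUIDFromBytes(
            (appId + instanceId).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Identical inputs give an identical ID, so a restarted instance keeps its ID.
        System.out.println(withSeparator("orders", "pod-0").equals(withSeparator("orders", "pod-0")));

        // Without a separator, ("app1", "0") and ("app", "10") both hash the bytes of "app10".
        System.out.println(withoutSeparator("app1", "0").equals(withoutSeparator("app", "10")));

        // With the separator the two pairs hash different byte sequences.
        System.out.println(withSeparator("app1", "0").equals(withSeparator("app", "10")));
    }
}
```

The separator only matters when one application ID plus instance ID can be split in more than one way, but it costs nothing to include.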
> Allow configuring a streams process ID
> --------------------------------------
>
> Key: KAFKA-15190
> URL: https://issues.apache.org/jira/browse/KAFKA-15190
> Project: Kafka
> Issue Type: Wish
> Components: streams
> Reporter: Joe Wreschnig
> Priority: Major
> Labels: needs-kip
>
> We run our Kafka Streams applications in containers with no persistent
> storage, and therefore the mitigation from KAFKA-10716 of persisting the
> process ID in the state directory does not help us avoid shuffling lots of
> tasks during restarts.
> However, we do have a persistent container ID (from a Kubernetes
> StatefulSet). Would it be possible to expose a configuration option to let us
> set the streams process ID ourselves?
> We are already using this ID as our group.instance.id - would it make sense
> to have the process ID be automatically derived from this (plus
> application/client IDs) if it's set? The two IDs seem to have overlapping
> goals of identifying "this consumer" across restarts.