[
https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936813#comment-17936813
]
Donny Nadolny commented on KAFKA-19012:
---------------------------------------
Here's our client config:
{code:java}
partitioner.class = <custom partitioner>
max.request.size = 50331648
request.timeout.ms = 500
delivery.timeout.ms = 10000
max.block.ms = 500
max.in.flight.requests.per.connection = 1
linger.ms = 10
batch.size = 524288 {code}
Our custom partitioner extends
{{org.apache.kafka.clients.producer.internals.DefaultPartitioner}}; in its
{{partition}} and {{onNewBatch}} methods it may substitute a modified copy of
the {{Cluster}} object to artificially reduce the number of partitions
available in certain situations.
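For illustration, here's a minimal sketch of the kind of partitioner described above, assuming the Kafka 3.2 client API. The class name, the {{REDUCED_COUNT}} threshold, and the filtering rule are all hypothetical; our actual partitioner's logic for deciding when and how to shrink the partition set differs.
{code:java}
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical sketch: delegate to DefaultPartitioner, but hand it a
// Cluster copy that exposes fewer partitions for the topic.
public class ReducedPartitionsPartitioner extends DefaultPartitioner {
    private static final int REDUCED_COUNT = 4; // illustrative threshold

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return super.partition(topic, key, keyBytes, value, valueBytes,
                               restrict(topic, cluster));
    }

    @Override
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        super.onNewBatch(topic, restrict(topic, cluster), prevPartition);
    }

    // Build a copy of the Cluster containing only the first REDUCED_COUNT
    // partitions of the given topic (illustrative selection rule).
    private Cluster restrict(String topic, Cluster cluster) {
        List<PartitionInfo> kept = cluster.partitionsForTopic(topic).stream()
                .filter(p -> p.partition() < REDUCED_COUNT)
                .collect(Collectors.toList());
        return new Cluster(cluster.clusterResource().clusterId(),
                           cluster.nodes(), kept,
                           Set.of(), Set.of());
    }
} {code}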
Here's our broker config:
{code:java}
listeners=CONTROLLER://:9091,PLAINTEXT://:9092,SSL://:9093,INTERNAL_PLAINTEXT://:9094,INTERNAL_SSL://:9095
listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,INTERNAL_PLAINTEXT:PLAINTEXT,INTERNAL_SSL:SSL
advertised.listeners=CONTROLLER://xx.xx.xx.xx:9091,PLAINTEXT://xx.xx.xx.xx:9092,SSL://xx.xx.xx.xx:9093,INTERNAL_PLAINTEXT://xx.xx.xx.xx:9094,INTERNAL_SSL://xx.xx.xx.xx:9095
control.plane.listener.name=CONTROLLER
inter.broker.listener.name=INTERNAL_SSL
#value omitted
host.name=xxx
broker.rack=xxx
num.network.threads=16
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
queued.max.requests=500
log.dirs=xxx
num.partitions=8
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
log.cleaner.enable=true
offsets.retention.minutes=10080
replica.fetch.wait.max.ms=100
replica.socket.timeout.ms=1000
replica.lag.time.max.ms=5000
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
num.replica.fetchers=18
default.replication.factor=3
offsets.topic.replication.factor=3
zookeeper.connect=xxx
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
inter.broker.protocol.version=3.3
log.message.format.version=3.3
auto.create.topics.enable=false
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=20
delete.topic.enable=true
min.insync.replicas=2
max.connections.per.ip=2048
unclean.leader.election.enable=false
quota.consumer.default=20971520
quota.producer.default=20971520
replica.fetch.max.bytes=16780000
log.message.timestamp.type=LogAppendTime
transaction.max.timeout.ms=3600000
transaction.remove.expired.transaction.cleanup.interval.ms=86400000
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
cruise.control.metrics.topic=_cruise-control.metrics
cruise.control.metrics.reporter.bootstrap.servers=localhost:9093
cruise.control.metrics.reporter.security.protocol=SSL
cruise.control.metrics.reporter.ssl.keystore.location=xxx
cruise.control.metrics.reporter.ssl.keystore.password=xxx
cruise.control.metrics.reporter.ssl.keystore.type=JKS
cruise.control.metrics.reporter.ssl.truststore.location=xxx
cruise.control.metrics.reporter.ssl.truststore.password=xxx
cruise.control.metrics.reporter.ssl.truststore.type=JKS
cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=
ssl.keystore.location=xxx
ssl.keystore.password=xxx
ssl.keystore.type=JKS
ssl.key.password=xxx
ssl.truststore.location=xxx
ssl.truststore.password=xxx
ssl.truststore.type=JKS
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
authorizer.class.name=<custom authorizer class>
principal.builder.class=<custom principal builder class>
super.users=xxx
ssl.endpoint.identification.algorithm=
group.min.session.timeout.ms=500 {code}
Some values are redacted with x's.
> Messages ending up on the wrong topic
> -------------------------------------
>
> Key: KAFKA-19012
> URL: https://issues.apache.org/jira/browse/KAFKA-19012
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.2.3, 3.8.1
> Reporter: Donny Nadolny
> Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic
> than what they were published to. That is, we publish a message to topicA and
> consumers of topicB see it and fail to parse it because the message contents
> are meant for topicA. This has happened for various topics.
> We've begun adding a header with the intended topic (obtained simply by
> reading the topic from the record we're about to pass to the OSS client)
> right before we call producer.send. This header shows the correct topic
> (which also matches the message contents). Similarly, we're able to compare
> this header against the actual topic to avoid consuming misrouted messages,
> but the misrouting itself is still concerning.
> Some details:
> - This happens rarely: it happened approximately once per 10 trillion
> messages for a few months, though there was a period of a week or so where it
> happened more frequently (once per 1 trillion messages or so)
> - It often happens in a small burst, e.g. 2 or 3 messages very close in time
> (but from different hosts) will be misrouted
> - It often but not always coincides with some sort of event in the cluster
> (a broker restarting or being replaced, network issues causing errors, etc).
> Also these cluster events happen quite often with no misrouted messages
> - We run many clusters, it has happened for several of them
> - There is no pattern between intended and actual topic, other than that the
> intended topic tends to be a higher-volume one (but I'd attribute that to
> more messages published -> more occurrences affecting it, rather than it
> being more likely per-message)
> - It only occurs with clients that are using a non-zero linger
> - Once it happened with two sequential messages, both were intended for
> topicA but both ended up on topicB, published by the same host (presumably
> within the same linger batch)
> - Most of our clients are 3.2.3 and it has only affected those; most of our
> brokers are 3.2.3, but it has also happened on a cluster running 3.8.1
> (though I suspect a client rather than a broker problem, since it never
> happens with clients that use a linger of 0)
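The header-based guard described in the issue could be sketched as below, assuming the standard Kafka clients API. The header name "intended-topic" and the class/method names are illustrative, not the reporter's actual code.
{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;

// Hypothetical sketch: stamp the intended topic into a header right before
// send(), and on the consumer side detect records whose header disagrees
// with the topic they actually arrived on.
public final class IntendedTopicGuard {
    static final String HEADER = "intended-topic";

    // Producer side: the header value is read back off the record itself,
    // immediately before handing it to the OSS client.
    static void sendWithGuard(KafkaProducer<String, String> producer,
                              ProducerRecord<String, String> record) {
        record.headers().add(HEADER,
                record.topic().getBytes(StandardCharsets.UTF_8));
        producer.send(record);
    }

    // Consumer side: true if the record landed on the topic it was meant for.
    static boolean isCorrectlyRouted(ConsumerRecord<String, String> record) {
        Header h = record.headers().lastHeader(HEADER);
        if (h == null) {
            return true; // record from a producer that predates the header
        }
        return record.topic().equals(
                new String(h.value(), StandardCharsets.UTF_8));
    }
} {code}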
--
This message was sent by Atlassian Jira
(v8.20.10#820010)