[ https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936813#comment-17936813 ]

Donny Nadolny commented on KAFKA-19012:
---------------------------------------

Here's our client config:


{code:java}
partitioner.class = <custom partitioner>
max.request.size = 50331648
request.timeout.ms = 500
delivery.timeout.ms = 10000
max.block.ms = 500
max.in.flight.requests.per.connection = 1
linger.ms = 10
batch.size = 524288 {code}
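One sanity check on the timing values above: the producer requires {{delivery.timeout.ms}} to be at least {{linger.ms}} + {{request.timeout.ms}}, which this config satisfies. A minimal sketch of that check (class and method names are illustrative):

```java
public class TimeoutCheck {
    // Kafka's producer rejects configs where delivery.timeout.ms is smaller
    // than linger.ms + request.timeout.ms, since a batch must be allowed at
    // least one linger interval plus one request attempt before expiring.
    static boolean isConsistent(int lingerMs, int requestTimeoutMs, int deliveryTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from the client config above: 10 + 500 <= 10000
        System.out.println(isConsistent(10, 500, 10000)); // prints "true"
    }
}
```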
Our custom partitioner extends 
{{org.apache.kafka.clients.producer.internals.DefaultPartitioner}}; in its 
{{partition}} and {{onNewBatch}} methods it may replace the {{Cluster}} object 
(more precisely, pass a modified copy of it) to artificially reduce the number 
of partitions available in certain situations.
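As an illustration of the effect (not the actual implementation), narrowing the candidate partition set can be modelled as hashing the key into a reduced range; the real partitioner achieves this indirectly, by handing {{DefaultPartitioner}} a rebuilt {{Cluster}} that advertises fewer partitions. The class and the hash are hypothetical:

```java
import java.util.Arrays;

public class NarrowedPartitioner {
    // Hypothetical sketch: restrict the candidate set to the first `allowed`
    // partitions, then hash the key into that reduced range. This mirrors
    // what handing the partitioner a Cluster copy with fewer partitions does.
    static int partition(byte[] keyBytes, int numPartitions, int allowed) {
        int effective = Math.min(allowed, numPartitions);
        // Mask to a non-negative hash (the real client uses murmur2)
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;
        return hash % effective;
    }
}
```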

Here's our broker config:
{code:java}
listeners=CONTROLLER://:9091,PLAINTEXT://:9092,SSL://:9093,INTERNAL_PLAINTEXT://:9094,INTERNAL_SSL://:9095
listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,INTERNAL_PLAINTEXT:PLAINTEXT,INTERNAL_SSL:SSL
advertised.listeners=CONTROLLER://xx.xx.xx.xx:9091,PLAINTEXT://xx.xx.xx.xx:9092,SSL://xx.xx.xx.xx:9093,INTERNAL_PLAINTEXT://xx.xx.xx.xx:9094,INTERNAL_SSL://xx.xx.xx.xx:9095
control.plane.listener.name=CONTROLLER
inter.broker.listener.name=INTERNAL_SSL
#value omitted
host.name=xxx
broker.rack=xxx
num.network.threads=16
num.io.threads=16
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
queued.max.requests=500
log.dirs=xxx
num.partitions=8
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
log.cleaner.enable=true
offsets.retention.minutes=10080
replica.fetch.wait.max.ms=100
replica.socket.timeout.ms=1000
replica.lag.time.max.ms=5000
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
num.replica.fetchers=18
default.replication.factor=3
offsets.topic.replication.factor=3
zookeeper.connect=xxx
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
inter.broker.protocol.version=3.3
log.message.format.version=3.3
auto.create.topics.enable=false
auto.leader.rebalance.enable=true
leader.imbalance.per.broker.percentage=20
delete.topic.enable=true
min.insync.replicas=2
max.connections.per.ip=2048
unclean.leader.election.enable=false
quota.consumer.default=20971520
quota.producer.default=20971520
replica.fetch.max.bytes=16780000
log.message.timestamp.type=LogAppendTime
transaction.max.timeout.ms=3600000
transaction.remove.expired.transaction.cleanup.interval.ms=86400000
metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter
cruise.control.metrics.topic=_cruise-control.metrics
cruise.control.metrics.reporter.bootstrap.servers=localhost:9093
cruise.control.metrics.reporter.security.protocol=SSL
cruise.control.metrics.reporter.ssl.keystore.location=xxx
cruise.control.metrics.reporter.ssl.keystore.password=xxx
cruise.control.metrics.reporter.ssl.keystore.type=JKS
cruise.control.metrics.reporter.ssl.truststore.location=xxx
cruise.control.metrics.reporter.ssl.truststore.password=xxx
cruise.control.metrics.reporter.ssl.truststore.type=JKS
cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=
ssl.keystore.location=xxx
ssl.keystore.password=xxx
ssl.keystore.type=JKS
ssl.key.password=xxx
ssl.truststore.location=xxx
ssl.truststore.password=xxx
ssl.truststore.type=JKS
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2
ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
authorizer.class.name=<custom authorizer class>
principal.builder.class=<custom principal builder class>
super.users=xxx
ssl.endpoint.identification.algorithm=
group.min.session.timeout.ms=500 {code}
Some values are redacted with x's.

> Messages ending up on the wrong topic
> -------------------------------------
>
>                 Key: KAFKA-19012
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19012
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.2.3, 3.8.1
>            Reporter: Donny Nadolny
>            Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic 
> than the one they were published to. That is, we publish a message to topicA 
> and consumers of topicB see it and fail to parse it because the message 
> contents are meant for topicA. This has happened for various topics. 
> We've begun adding a header with the intended topic (which we get simply by 
> reading the topic from the record we're about to pass to the OSS client) 
> right before we call producer.send; this header shows the correct topic 
> (which also matches the message contents). Similarly, we're able to compare 
> this header to the actual topic to prevent consuming these misrouted 
> messages, but this is still concerning.
> Some details:
>  - This happens rarely: it happened approximately once per 10 trillion 
> messages for a few months, though there was a period of a week or so where it 
> happened more frequently (once per 1 trillion messages or so)
>  - It often happens in a small burst, e.g. 2 or 3 messages very close in 
> time (but from different hosts) will be misrouted
>  - It often but not always coincides with some sort of event in the cluster 
> (a broker restarting or being replaced, network issues causing errors, etc.). 
> Also, these cluster events happen quite often with no misrouted messages
>  - We run many clusters; it has happened on several of them
>  - There is no pattern between intended and actual topic, other than the 
> intended topic tends to be higher volume ones (but I'd attribute that to 
> there being more messages published -> more occurrences affecting it rather 
> than it being more likely per-message)
>  - It only occurs with clients that are using a non-zero linger
>  - Once it happened with two sequential messages, both were intended for 
> topicA but both ended up on topicB, published by the same host (presumably 
> within the same linger batch)
>  - Most of our clients are 3.2.3 and it has only affected those; most of 
> our brokers are 3.2.3, but it has also happened on a cluster running 3.8.1 
> (though I suspect a client rather than a broker problem, since it never 
> happens with clients that use a linger of 0)
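
The header-based guard the report describes can be sketched in plain Java. The header name and helper methods below are illustrative; in the real client the header is added to the {{ProducerRecord}} immediately before {{producer.send}}, and consumers compare it against the topic the record actually arrived on:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class TopicGuard {
    static final String HEADER = "intended-topic"; // illustrative header name

    // Producer side: stamp the topic the record is about to be sent to.
    static Map<String, byte[]> stampHeaders(String topic, Map<String, byte[]> headers) {
        Map<String, byte[]> out = new HashMap<>(headers);
        out.put(HEADER, topic.getBytes(StandardCharsets.UTF_8));
        return out;
    }

    // Consumer side: drop records whose stamped topic disagrees with the
    // topic they actually arrived on. Records without the header pass through.
    static boolean isMisrouted(String actualTopic, Map<String, byte[]> headers) {
        byte[] intended = headers.get(HEADER);
        return intended != null
                && !actualTopic.equals(new String(intended, StandardCharsets.UTF_8));
    }
}
```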



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
