[
https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939739#comment-17939739
]
Donny Nadolny commented on KAFKA-19012:
---------------------------------------
Here is the code for the custom partitioner we use. An explanation of what it
does:

Normally when you repartition a topic, there is a race condition that results
in what is effectively data loss if you have consumers that use offset
reset = latest. Say you go from 4 to 8 partitions, and the producer sees the
new partitions and publishes to them before the consumer group has rebalanced
and begun consuming from them. Shortly after that, the consumer group
rebalances and begins consuming, but because it uses offset reset = latest it
skips over the messages that were already published.

To avoid that (and for some other reasons too) we have this code, which comes
into effect when we're in the process of repartitioning a topic. First we
create the new partitions (in the example above readPartitions would go from 4
to 8, but writePartitions would remain 4) and allow consumers to begin
consuming. That's when this code takes effect: even though there are 8
partitions, for the purposes of producing we only publish to the original 4.
Once all consumers have rebalanced, we update writePartitions to 8 and allow
traffic to flow through to all partitions.

Many (maybe all?) of these misroutings have occurred when we were not in the
process of repartitioning a topic, i.e. readPartitions == writePartitions for
both the original and the misrouted topic, so {{effectiveCluster}} doesn't
change the cluster at all.
{code:scala}
package com.stripe.kafkatools.kproxy.partitioner

import java.util.concurrent.atomic.AtomicReference
import java.util.{Map => JMap}

import com.stripe.kafkatools.config.{ClusterName, Reloading}
import com.stripe.kafkatools.kproxy.config.KproxyConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.internals.DefaultPartitioner
import org.apache.kafka.common.Cluster
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
import scala.collection._
import scala.reflect.ClassTag

object KproxyPartitioner {
  private val ClusterName = "CLUSTER_NAME"
  private val ReloadingConfig = "RELOADING_KAFKA_CONFIG"

  def config(cluster: ClusterName, kafkaConfig: Reloading[KproxyConfig]): Map[String, Any] =
    Map(
      KproxyPartitioner.ClusterName -> cluster.name,
      KproxyPartitioner.ReloadingConfig -> kafkaConfig,
      ProducerConfig.PARTITIONER_CLASS_CONFIG -> classOf[KproxyPartitioner].getName
    )

  // only for testing
  def apply(cluster: ClusterName, config: Reloading[KproxyConfig]): KproxyPartitioner = {
    val partitioner = new KproxyPartitioner()
    val partitionerConfig = KproxyPartitioner.config(cluster, config)
    partitioner.configure(partitionerConfig.asJava)
    partitioner
  }
}

/**
 * For records with an unspecified partition, writes to partitions in the range
 * [0, `partitions`), where `partitions` is read from `KproxyConfig`. This
 * ensures no data loss at the consumer during topic repartition.
 *
 * Partitioning Strategy:
 * <ul>
 * <li> If a partition is specified in a record, use it
 * <li> If no partition but a key is present, choose a partition in range
 *      [0, `partitions`) based on a hash of the key
 * <li> If no partition or key is present, choose the sticky partition that
 *      changes when the batch is full.
 * </ul>
 */
class KproxyPartitioner extends DefaultPartitioner {
  private[this] val logger = LoggerFactory.getLogger(getClass)

  // 2.15. SHOULD NOT use "var" as shared state
  private val kafkaConfigRef = new AtomicReference[Reloading[KproxyConfig]]()
  private val kafkaClusterNameRef = new AtomicReference[ClusterName]()

  // Returns a view of the cluster restricted to the writable partitions of
  // `topic`, or the cluster unchanged when all partitions are writable.
  private def effectiveCluster(topic: String, cluster: Cluster): Cluster = {
    val writePartitions = kafkaConfigRef
      .get()
      .current()
      .topics(topic)
      .partitions(kafkaClusterNameRef.get())
      .writable
    val effectiveCluster =
      if (writePartitions < cluster.partitionCountForTopic(topic)) {
        // Mid-repartition: hide partitions >= writePartitions from the producer.
        val updatedPartitions =
          cluster.partitionsForTopic(topic).asScala.filter(_.partition() < writePartitions)
        new Cluster(
          cluster.clusterResource.clusterId,
          cluster.nodes,
          updatedPartitions.asJavaCollection,
          cluster.unauthorizedTopics,
          cluster.invalidTopics,
          cluster.internalTopics,
          cluster.controller
        )
      } else
        cluster
    effectiveCluster
  }

  /**
   * Compute the partition for the given record.
   *
   * @param topic The topic name
   * @param key The key to partition on (or null if no key)
   * @param keyBytes serialized key to partition on (or null if no key)
   * @param value The value to partition on or null
   * @param valueBytes serialized value to partition on or null
   * @param cluster The current cluster metadata
   */
  override def partition(
      topic: String,
      key: Any,
      keyBytes: Array[Byte],
      value: Any,
      valueBytes: Array[Byte],
      cluster: Cluster
  ): Int = {
    val newCluster = effectiveCluster(topic, cluster)
    super.partition(topic, key, keyBytes, value, valueBytes, newCluster)
  }

  override def configure(configs: JMap[String, _]): Unit = {
    def get[A: ClassTag](key: String): A =
      configs.asScala.get(key) match {
        case Some(value: A) => value
        case _ => sys.error(s"No value for $key")
      }
    kafkaConfigRef.set(get[Reloading[KproxyConfig]](KproxyPartitioner.ReloadingConfig))
    kafkaClusterNameRef.set(ClusterName(get[String](KproxyPartitioner.ClusterName)))
    logger.info(s"Configs passed to KproxyPartitioner: ${kafkaConfigRef.get()}")
  }

  /**
   * Applicable in case of batched writing without a specified key
   * @param topic
   * @param cluster
   * @param prevPartition
   */
  override def onNewBatch(topic: String, cluster: Cluster, prevPartition: Int): Unit = {
    val newCluster = effectiveCluster(topic, cluster)
    super.onNewBatch(topic, newCluster, prevPartition)
    ()
  }
}
{code}
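For anyone who wants to see the repartitioning race in isolation, here is a minimal sketch that models it without any Kafka dependency. Everything in it is hypothetical and not part of our code: the {{RepartitionRaceSketch}} object, the {{Topic}} case class, and the round-robin routing (a stand-in for the real key-hash and sticky routing). It only counts how many messages land on partitions 4..7 before a consumer group using offset reset = latest starts reading them.

```scala
object RepartitionRaceSketch {
  // Hypothetical model: a topic is a vector of append-only partition logs.
  final case class Topic(logs: Vector[Vector[String]]) {
    def publish(partition: Int, msg: String): Topic =
      Topic(logs.updated(partition, logs(partition) :+ msg))
  }

  val OldPartitions = 4
  val NewPartitions = 8

  // Publish `count` messages round-robin over partitions [0, writable).
  // Assumption: round-robin stands in for the real partitioner's routing.
  def publishAll(topic: Topic, count: Int, writable: Int): Topic =
    (0 until count).foldLeft(topic)((t, i) => t.publish(i % writable, s"m$i"))

  // Counts messages skipped by a consumer group with offset reset = latest
  // that rebalances only after the producer has already published. On the
  // new partitions (4..7) it starts at the log end, so every message that
  // is already there is never consumed.
  def skippedAfterRebalance(writable: Int): Int = {
    val empty = Topic(Vector.fill(NewPartitions)(Vector.empty[String]))
    val published = publishAll(empty, count = 16, writable = writable)
    (OldPartitions until NewPartitions).map(p => published.logs(p).size).sum
  }

  def main(args: Array[String]): Unit = {
    // Producer immediately writes to all 8 partitions: messages are skipped.
    println(skippedAfterRebalance(writable = NewPartitions))
    // Producer is capped at the original 4 partitions: nothing is skipped.
    println(skippedAfterRebalance(writable = OldPartitions))
  }
}
```

With the write side capped at the original partition count, nothing is ever written to a partition the consumers have not yet picked up, which is the property the partitioner above is meant to provide.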
Here's the interceptor class, which is very rarely used; it's pretty
self-explanatory:
{code:scala}
package com.stripe.kafkatools.clients

import github.gphat.censorinus.DogStatsDClient
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}

import java.util.{Map => JMap}

class ProducerMetricsInterceptor[K, V] extends ProducerInterceptor[K, V] {
  var metricsTags: List[String] = Nil
  private val successful = "success:true"
  private val unsuccessful = "success:false"
  private val stats: DogStatsDClient = statsClient
  private val totalMetric = "produce.interceptor.total"

  override def configure(configs: JMap[String, _]): Unit = {
    val clientId = produceClientId(configs).getOrElse("no-client-id")
    metricsTags = s"client_id:$clientId" :: Nil
  }

  override def onSend(record: ProducerRecord[K, V]): ProducerRecord[K, V] = {
    stats.increment(totalMetric, tags = metricsTags)
    try {
      record.headers.add(
        TimestampHeaders.Key,
        TimestampHeaders.fromDouble(TimestampHeaders.getCurrentTimestamp())
      )
    } catch {
      case _: IllegalStateException =>
        // Records may be in a read-only state if we've already attempted
        // sending once.
        // If we don't catch this, it'll log a failure (but not fail the send)
        ()
    }
    record
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit =
    if (exception == null)
      stats.increment(totalMetric, tags = successful :: metricsTags)
    else
      stats.increment(totalMetric, tags = unsuccessful :: metricsTags)

  override def close(): Unit = ()
}
{code}
> Messages ending up on the wrong topic
> -------------------------------------
>
> Key: KAFKA-19012
> URL: https://issues.apache.org/jira/browse/KAFKA-19012
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.2.3, 3.8.1
> Reporter: Donny Nadolny
> Assignee: Kirk True
> Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic
> than what they were published to. That is, we publish a message to topicA and
> consumers of topicB see it and fail to parse it because the message contents
> are meant for topicA. This has happened for various topics.
> We've begun adding a header with the intended topic (which we get just by
> reading the topic from the record that we're about to pass to the OSS client)
> right before we call producer.send. This header shows the correct topic
> (which also matches the message contents itself). Similarly, we're
> able to use this header and compare it to the actual topic to prevent
> consuming these misrouted messages, but this is still concerning.
> Some details:
> - This happens rarely: it happened approximately once per 10 trillion
> messages for a few months, though there was a period of a week or so where it
> happened more frequently (once per 1 trillion messages or so)
> - It often happens in a small burst, e.g. 2 or 3 messages very close in time
> (but from different hosts) will be misrouted
> - It often but not always coincides with some sort of event in the cluster
> (a broker restarting or being replaced, network issues causing errors, etc).
> Also these cluster events happen quite often with no misrouted messages
> - We run many clusters, and it has happened on several of them
> - There is no pattern between intended and actual topic, other than the
> intended topic tends to be higher volume ones (but I'd attribute that to
> there being more messages published -> more occurrences affecting it rather
> than it being more likely per-message)
> - It only occurs with clients that are using a non-zero linger
> - Once it happened with two sequential messages, both were intended for
> topicA but both ended up on topicB, published by the same host (presumably
> within the same linger batch)
> - Most of our clients are 3.2.3 and it has only affected those. Most of our
> brokers are 3.2.3, but it has also happened with a cluster that's running
> 3.8.1 (I suspect a client rather than broker problem because it has never
> happened with clients that use 0 linger)