Yogesh created KAFKA-16201:
------------------------------
Summary: Kafka exception - org.apache.kafka.common.errors.NotLeaderOrFollowerException
Key: KAFKA-16201
URL: https://issues.apache.org/jira/browse/KAFKA-16201
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 3.6.1
Environment: AWS EKS
Reporter: Yogesh
I am deploying Kafka inside a Kubernetes cluster in HA mode (multiple brokers).
The deployment consists of:
* Kubernetes
* Kafka 3.6.1
Refer to the following files used in the deployment.
Dockerfile
{code:java}
FROM eclipse-temurin:17.0.9_9-jdk-jammy
ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin
LABEL name="kafka" version=${KAFKA_VERSION}
RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
    && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
    && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
    && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
    && rm -rf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz
COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"] {code}
entrypoint.sh
{code:java}
#!/bin/bash
NODE_ID=${HOSTNAME:6}
LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"
ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 $REPLICAS); do
  if [[ $i != $REPLICAS ]]; then
    CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
  else
    CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
  fi
done
mkdir -p $SHARE_DIR/$NODE_ID
if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
  CLUSTER_ID=$(kafka-storage.sh random-uuid)
  echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
  CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi
sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
    -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
    -e "s+^listeners=.*+listeners=$LISTENERS+" \
    -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
    -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
    /opt/kafka/config/kraft/server.properties > server.properties.updated \
    && mv server.properties.updated /opt/kafka/config/kraft/server.properties
JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"
echo -e "\nlistener.name.sasl.plain.sasl.jaas.config=${JAAS}" >> /opt/kafka/config/kraft/server.properties
echo -e "\nsasl.enabled.mechanisms=PLAIN" >> /opt/kafka/config/kraft/server.properties
echo -e "\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /opt/kafka/config/kraft/server.properties
echo -e "\ninter.broker.listener.name=INTERNAL" >> /opt/kafka/config/kraft/server.properties
kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties
exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties {code}
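For reference, here is a standalone sketch of what the NODE_ID and CONTROLLER_QUORUM_VOTERS logic in entrypoint.sh evaluates to. The hostname kafka-2 and REPLICAS=3 are example values chosen for illustration; SERVICE and NAMESPACE match Kafka.yaml.

```shell
#!/bin/bash
# Standalone sketch of the entrypoint.sh logic with example values
# (HOSTNAME=kafka-2 and REPLICAS=3 are assumed for illustration).
HOSTNAME=kafka-2
SERVICE=kafka-svc
NAMESPACE=kafka-kraft
REPLICAS=3

# ${HOSTNAME:6} strips the 6-character "kafka-" prefix, leaving the pod ordinal.
NODE_ID=${HOSTNAME:6}
echo "NODE_ID=$NODE_ID"

# The loop runs REPLICAS+1 times; every iteration but the last appends a
# voter entry, and the final iteration only strips the trailing comma.
CONTROLLER_QUORUM_VOTERS=""
for i in $(seq 0 $REPLICAS); do
  if [[ $i != $REPLICAS ]]; then
    CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
  else
    CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
  fi
done
echo "$CONTROLLER_QUORUM_VOTERS"
```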
Kafka.yaml
{code:java}
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            - containerPort: 9092
            - containerPort: 9093
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka {code}
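As context for the entrypoint script, the headless Service (clusterIP: None) together with the StatefulSet gives each pod a stable DNS name of the form pod.service.namespace.svc.cluster.local, which is what the INTERNAL advertised listener relies on. A minimal sketch of how that name is assembled (pod ordinal 0 and the values from Kafka.yaml are assumed for illustration):

```shell
#!/bin/bash
# Sketch of the per-pod DNS name used in ADVERTISED_LISTENERS;
# pod ordinal 0 and the Service/Namespace from Kafka.yaml are assumed.
POD=kafka-0
SERVICE=kafka-svc
NAMESPACE=kafka-kraft
INTERNAL_HOST="$POD.$SERVICE.$NAMESPACE.svc.cluster.local"
echo "$INTERNAL_HOST"
```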
After the deployment, all the containers are up and running. I then connect to the brokers using the following command:
{code:java}
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5 {code}
producer.properties
{code:java}
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
metadata.max.age.ms=1000 {code}
A prompt is displayed to enter a message. Upon sending a sample text, it throws the following error.
{code:java}
[Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now {code}
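NotLeaderOrFollowerException indicates that the producer's cached metadata pointed it at a broker that is not currently the leader for that partition. As a diagnostic sketch (assuming it is run from inside one of the broker pods, where the internal PLAINTEXT listener on 29092 needs no SASL settings), current leadership can be inspected with the stock tooling:

```shell
# Show the leader, replicas, and ISR for each partition of the topic.
# A partition whose Leader shows "none" would explain the exception.
kafka-topics.sh --bootstrap-server localhost:29092 --describe --topic hello
```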
What I have tried so far:
* Tried both ZooKeeper and KRaft mode
* Tried deleting and recreating the topics (this works only intermittently)
Unfortunately, the problem persists and I am not able to produce messages.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)