[
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yazgoo updated KAFKA-17049:
---------------------------
Description:
This follows https://issues.apache.org/jira/browse/KAFKA-10413
I run the following script, which:
1. runs one worker
2. declares two connectors
3. adds two more workers
{code:bash}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch MinIO (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start a Kafka Connect worker with the S3 sink connector installed
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a 120-partition Kafka topic and fill it with test messages
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic "$topic" --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

# Wait until the first worker's REST API is up
while true
do
  sleep 5
  curl http://localhost:8081/ || continue
  break
done
sleep 10

for i in {1..2}
do
  # Register the two S3 sink connectors, 120 tasks each
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

# Add two more workers to the cluster
launch_kafka_connect 2
launch_kafka_connect 3
{code}
When the script ends, all tasks for both connectors are still running on the first worker:
{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
120 "worker_id": "k1:8081"
{code}
{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
120 "worker_id": "k1:8081"
{code}
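The same per-connector check can be run for both connectors in one loop; this is just a convenience wrapper around the status endpoint used above:
{code:bash}
# Summarize task placement per worker, one connector at a time
for c in s3-connector1 s3-connector2; do
  echo "== $c =="
  curl -s "http://localhost:8081/connectors/$c/status" \
    | jq -r '.tasks[].worker_id' | sort | uniq -c
done
{code}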
Then I wait 3 minutes, and I get the final state:
{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
60 "worker_id": "k2:8082"
60 "worker_id": "k3:8083"
{code}
{code:bash}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
80 "worker_id": "k1:8081"
20 "worker_id": "k2:8082"
20 "worker_id": "k3:8083"
{code}
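Aggregating both connectors' tasks per worker from the outputs above gives the overall totals (same status endpoint, summed across connectors):
{code:bash}
# Total task count per worker, across both connectors
for c in s3-connector1 s3-connector2; do
  curl -s "http://localhost:8081/connectors/$c/status" \
    | jq -r '.tasks[].worker_id'
done | sort | uniq -c
# With the numbers above this yields:
#   80 k1:8081
#   80 k2:8082
#   80 k3:8083
{code}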
In the end each worker does hold 80 tasks, but I think the distribution should be (40, 40, 40) for each connector: tasks from different connectors don't do the same amount of work, so grouping tasks by connector leads to an overall processing/network imbalance.
In my tests I always get the same outcome.
This is consistent with what I see in production.
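To make the expected distribution concrete, here is a minimal sketch of a per-connector balanced placement over the three workers. Round-robin is used purely for illustration, not as a claim about how the Connect incremental assignor actually works:
{code:bash}
# Hypothetical placement: spread each connector's 120 tasks evenly
# over the 3 workers, independently of the other connector.
workers=(k1:8081 k2:8082 k3:8083)
for c in 1 2; do
  for t in $(seq 0 119); do
    echo "s3-connector$c ${workers[$((t % 3))]}"
  done
done | sort | uniq -c
# Prints 40 for every (connector, worker) pair, i.e. (40, 40, 40)
# per connector instead of the observed (80, 20, 20) / (0, 60, 60).
{code}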
> unbalanced connectors
> ---------------------
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: yazgoo
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)