thjaeckle opened a new issue, #11703:
URL: https://github.com/apache/iceberg/issues/11703

   ### Query engine
   
   _No response_
   
   ### Question
   
   Hi Iceberg community.
   
   I am currently trying out the Kafka Connect connector that was moved into the Iceberg project.
   I got it working to the point that, when I start Kafka Connect in distributed mode and create an Iceberg sink, it consumes the messages from the configured Kafka topic and writes them into the configured AWS S3 bucket, updating the configured AWS Glue catalog with the schema.  
   It basically writes one big Parquet file containing all of the messages in the topic.
   
   So far so good. After that, however, it seems to do nothing more.  
   When I restart Kafka Connect, or just the Iceberg sink connector, it re-processes all messages in the topic again.
   
   ACLs are configured in the Confluent Kafka cluster we use, so all topic names and group IDs must carry a specific prefix.  
   I therefore created the Kafka topics manually.  
   
   I also see that the Kafka "control" topic for the Iceberg connector does not receive any messages.  
   The logs indicate no problem, but I also never see this log message:
   
https://github.com/apache/iceberg/blob/36140b819c60743b39176e28a63ab588df445329/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Coordinator.java#L265-L270
   
   There are also no other INFO/WARN logs that would indicate a misconfiguration.
   
   My Iceberg sink configuration is:
   ```json
   {
     "config.providers": "env",
     "config.providers.env.class": "org.apache.kafka.common.config.provider.EnvVarConfigProvider",
     "allow.auto.create.topics": "false",
     "name": "iceberg-sink-connector",
     "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
     "tasks.max": "2",
     "topics": "${env:DITTO_EVENTS_TOPIC}",
     "errors.tolerance": "all",
     "errors.log.enable": "true",
     "errors.log.include.messages": "true",
     "errors.deadletterqueue.topic.name": "${env:DITTO_EVENTS_DLT_TOPIC}",
     "errors.deadletterqueue.context.headers.enable": "true",
     "transforms": "dropContext",
     "transforms.dropContext.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
     "transforms.dropContext.exclude": "_context",
     "iceberg.tables": "bronze_foo_model_v2,bronze_foo_model_v3",
     "iceberg.tables.dynamic-enabled": "false",
     "iceberg.tables.route-field": "definition",
     "iceberg.tables.default-id-columns": "thingId",
     "iceberg.tables.auto-create-enabled": "true",
     "iceberg.tables.evolve-schema-enabled": "true",
     "iceberg.tables.schema-force-optional": "true",
     "iceberg.table.bronze_foo.model_v2.route-regex": "^.*/model-2\\.\\d\\.\\d\\.tm\\.jsonld$",
     "iceberg.table.bronze_foo.model_v3.route-regex": "^.*/model-3\\.\\d\\.\\d\\.tm\\.jsonld$",
     "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
     "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
     "iceberg.catalog.warehouse": "${env:ICEBERG_CATALOG_WAREHOUSE}",
     "iceberg.catalog.client.region": "eu-central-1",
     "iceberg.connect.group-id": "${env:DITTO_EVENTS_ICEBERG_CONNECT_GROUPID}",
     "iceberg.control.topic": "${env:DITTO_EVENTS_ICEBERG_CONTROL_TOPIC}",
     "iceberg.control.group-id-prefix": "${env:DITTO_EVENTS_ICEBERG_CONTROL_GROUPID_PREFIX}",
     "iceberg.control.commit.interval-ms": "300000",
     "iceberg.kafka.security.protocol": "${env:CONNECT_SECURITY_PROTOCOL}",
     "iceberg.kafka.ssl.endpoint.identification.algorithm": "${env:CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM}",
     "iceberg.kafka.sasl.mechanism": "${env:CONNECT_SASL_MECHANISM}",
     "iceberg.kafka.sasl.jaas.config": "${env:CONNECT_SASL_JAAS_CONFIG}",
     "value.converter.schemas.enable": "false",
     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
     "schemas.enable": "false"
   }
   ```
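
One thing that can be checked mechanically in a configuration like the one above is whether every per-table key refers to a table that is actually listed in `iceberg.tables`. The following is a minimal sketch, not part of the connector: the helper name `unmatched_table_keys` is hypothetical, and it assumes per-table keys have the form `iceberg.table.<table name>.<property>` with the property being the last dot-separated segment.

```python
TABLE_PREFIX = "iceberg.table."


def unmatched_table_keys(config: dict) -> dict:
    """Map each per-table key to its table name if that name is not in `iceberg.tables`.

    Assumption: keys look like `iceberg.table.<table name>.<property>`, and the
    property name itself contains no dots.
    """
    listed = {
        name.strip()
        for name in config.get("iceberg.tables", "").split(",")
        if name.strip()
    }
    unmatched = {}
    for key in config:
        # `iceberg.tables...` keys do not start with `iceberg.table.` and are skipped.
        if not key.startswith(TABLE_PREFIX):
            continue
        # Everything before the last dot is treated as the table name.
        table_name, _, _prop = key[len(TABLE_PREFIX):].rpartition(".")
        if table_name and table_name not in listed:
            unmatched[key] = table_name
    return unmatched


if __name__ == "__main__":
    # Reduced copy of the relevant keys from the configuration above.
    sample = {
        "iceberg.tables": "bronze_foo_model_v2,bronze_foo_model_v3",
        "iceberg.tables.dynamic-enabled": "false",
        "iceberg.table.bronze_foo.model_v2.route-regex": "^.*/model-2\\.\\d\\.\\d\\.tm\\.jsonld$",
        "iceberg.table.bronze_foo.model_v3.route-regex": "^.*/model-3\\.\\d\\.\\d\\.tm\\.jsonld$",
    }
    for key, table in unmatched_table_keys(sample).items():
        print(f"{key}: table '{table}' is not listed in iceberg.tables")
```

Run against the configuration as posted, this flags both `route-regex` keys, because they name `bronze_foo.model_v2` and `bronze_foo.model_v3` while `iceberg.tables` lists `bronze_foo_model_v2` and `bronze_foo_model_v3`. Whether that difference is real or only a side effect of anonymizing the names cannot be told from the post, and it may be unrelated to the missing control-topic messages.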
   
   I tried enabling TRACE logs, but did not find anything suspicious there either.
   
   Do you have any suggestions on how to troubleshoot this, or any idea what I might be doing wrong?  
   Thanks a lot in advance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

