10elements opened a new issue, #1720: URL: https://github.com/apache/camel-kafka-connector/issues/1720
I'm trying to use the CamelAwsddbsinkSinkConnector v3.20.3 in MSK Connect(v2.7.1) to load AVRO messages from a kafka topic into a Dynamodb table, the AVRO schema is stored in a confluent schema registry. I noticed no matter what I tried, the connector seemed to failed to find the correct fields in the message after the deserialization and hence failed to put the message into the Dynamodb table. This is my connector config: ``` { "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector", "transforms.tojson.type": "org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform", "transforms.tojson.converter.type": "value", "topics": "test_orders", "tasks.max": "1", "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": "true", "transforms": "tojson", "camel.sink.contentLogLevel": "DEBUG", "value.converter.schema.registry.url": "https://svcs--schemaregistry.euc1.prvw.ktdev.io", "camel.kamelet.aws-ddb-sink.operation": "PutItem", "camel.kamelet.aws-ddb-sink.table": "etl--prvw--euc1--test-orders", "value.converter": "io.confluent.connect.avro.AvroConverter", "camel.kamelet.aws-ddb-sink.region": "eu-central-1", "key.converter": "org.apache.kafka.connect.storage.StringConverter" } ``` I read from https://camel.apache.org/camel-kamelets/4.10.x/aws-ddb-sink.html#_expected_data_format_for_sink that the expected input data should be a JSON, so I tried to add the `SchemaAndStructToJsonTransform` SMT that I found out from this [issue](https://github.com/apache/camel-kafka-connector/issues/843) after the Avro Converter to convert the kafka connect struct after the deserialization into a JSON before pass it to the sink connector, but still, the connector failed with an error like this: ``` [Worker-0c1825974767d8637] software.amazon.awssdk.services.dynamodb.model.DynamoDbException: One or more parameter values were invalid: Missing the key order_id in the item (Service: DynamoDb, Status Code: 400, Request ID: V2V0KK8U36SVSQ8EE9GSK75ARJVV4KQNSO5AEMVJF66Q9ASUAAJG) -- | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) | [Worker-0c1825974767d8637] at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) | [Worker-0c1825974767d8637] at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) | [Worker-0c1825974767d8637] at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.putItem(DefaultDynamoDbClient.java:4243) | [Worker-0c1825974767d8637] at org.apache.camel.component.aws2.ddb.PutItemCommand.execute(PutItemCommand.java:32) | [Worker-0c1825974767d8637] at org.apache.camel.component.aws2.ddb.Ddb2Producer.process(Ddb2Producer.java:55) | [Worker-0c1825974767d8637] at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66) | [Worker-0c1825974767d8637] at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172) | [Worker-0c1825974767d8637] at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59) | [Worker-0c1825974767d8637] at org.apache.camel.processor.Pipeline.process(Pipeline.java:165) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392) | [Worker-0c1825974767d8637] at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108) | [Worker-0c1825974767d8637] at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176) | [Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148) | [Worker-0c1825974767d8637] at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191) | [Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240) | [Worker-0c1825974767d8637] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) | [Worker-0c1825974767d8637] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) | [Worker-0c1825974767d8637] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) | [Worker-0c1825974767d8637] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) | [Worker-0c1825974767d8637] at java.base/java.lang.Thread.run(Thread.java:829) ``` So to me it looks like the converted JSON is malformed and don't have the correct field that's supposed to exist, I then consumed the topic and deserialized and print out the message in a separate consumer, and I can confirm that the AVRO messages are correct and have all the required fields such as `order_id`: ``` 2025-06-10 15:00:46,632 - __main__ - INFO - Connected to Schema Registry at http://localhost:8081 2025-06-10 15:00:47,154 - httpx - INFO - HTTP Request: GET http://localhost:8081/subjects/test_orders-value/versions/latest "HTTP/1.1 200 OK" 2025-06-10 15:00:47,156 - __main__ - INFO - Retrieved schema for subject 'test_orders-value', version 1 from registry. 2025-06-10 15:00:47,240 - __main__ - INFO - Subscribed to topic: test_orders 2025-06-10 15:00:47,240 - __main__ - INFO - Starting to consume messages. Press Ctrl+C to stop. 2025-06-10 15:00:47,285 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials 2025-06-10 15:00:55,138 - httpx - INFO - HTTP Request: GET http://localhost:8081/schemas/ids/399 "HTTP/1.1 200 OK" 2025-06-10 15:00:55,140 - __main__ - INFO - Message 1: Partition 0, Offset 0 {'order_id': 256795, 'ordered_at': '2025-05-22T01:11:12.953333', 'product_id': 3351, 'quantity': 9, 'customer_id': 99025, 'customer_name': 'Metro Shipping Co'} ``` Can someone confirm if the CamelAwsddbsinkSinkConnector is even able to work with AVRO data? If it does, am I missing anything in my connector config in order to get it work correctly? P.S I can't use the latest version of CamelAwsddbsinkSinkConnector because I'm using MSK connect which is still on 2.7.1 and uses java 11. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org