10elements opened a new issue, #1720:
URL: https://github.com/apache/camel-kafka-connector/issues/1720

   I'm trying to use the CamelAwsddbsinkSinkConnector v3.20.3 in MSK 
Connect (v2.7.1) to load Avro messages from a Kafka topic into a DynamoDB 
table. The Avro schema is stored in a Confluent Schema Registry. No matter 
what I try, the connector fails to find the expected fields in the message 
after deserialization, and therefore fails to put the message into the 
DynamoDB table.
   
   This is my connector config:
   ```
   {
     "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
     "transforms.tojson.type": "org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform",
     "transforms.tojson.converter.type": "value",
     "topics": "test_orders",
     "tasks.max": "1",
     "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": "true",
     "transforms": "tojson",
     "camel.sink.contentLogLevel": "DEBUG",
     "value.converter.schema.registry.url": "https://svcs--schemaregistry.euc1.prvw.ktdev.io",
     "camel.kamelet.aws-ddb-sink.operation": "PutItem",
     "camel.kamelet.aws-ddb-sink.table": "etl--prvw--euc1--test-orders",
     "value.converter": "io.confluent.connect.avro.AvroConverter",
     "camel.kamelet.aws-ddb-sink.region": "eu-central-1",
     "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
   ```
   I read at 
https://camel.apache.org/camel-kamelets/4.10.x/aws-ddb-sink.html#_expected_data_format_for_sink
 that the expected input data is JSON, so I added the 
`SchemaAndStructToJsonTransform` SMT, which I found in this 
[issue](https://github.com/apache/camel-kafka-connector/issues/843), after the 
Avro converter. It should convert the deserialized Kafka Connect struct into 
JSON before passing it to the sink connector. The connector still fails with 
an error like this:
   ```
   
   [Worker-0c1825974767d8637] 
software.amazon.awssdk.services.dynamodb.model.DynamoDbException: One or more 
parameter values were invalid: Missing the key order_id in the item (Service: 
DynamoDb, Status Code: 400, Request ID: 
V2V0KK8U36SVSQ8EE9GSK75ARJVV4KQNSO5AEMVJF66Q9ASUAAJG)
   --
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
     | [Worker-0c1825974767d8637] at 
software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.putItem(DefaultDynamoDbClient.java:4243)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.component.aws2.ddb.PutItemCommand.execute(PutItemCommand.java:32)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.component.aws2.ddb.Ddb2Producer.process(Ddb2Producer.java:55)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.processor.Pipeline.process(Pipeline.java:165)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
     | [Worker-0c1825974767d8637] at 
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
     | [Worker-0c1825974767d8637] at 
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
     | [Worker-0c1825974767d8637] at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
     | [Worker-0c1825974767d8637] at 
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
     | [Worker-0c1825974767d8637] at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     | [Worker-0c1825974767d8637] at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     | [Worker-0c1825974767d8637] at 
java.base/java.lang.Thread.run(Thread.java:829)
   ```
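
The `Missing the key order_id in the item` error means the item sent to DynamoDB had no top-level `order_id` attribute. One way to narrow this down is to take the body that `camel.sink.contentLogLevel=DEBUG` logs and check where `order_id` actually sits in it. The sketch below is a hypothetical helper (`locate_key` is not part of any library), and the `schema`/`payload` envelope in the second sample is only an assumption about what a struct-to-JSON step might produce, not something confirmed for this connector.

```python
import json


def locate_key(body: str, key: str) -> str:
    """Report where `key` appears in a JSON object body (hypothetical helper)."""
    doc = json.loads(body)
    if not isinstance(doc, dict):
        return "not-an-object"
    if key in doc:
        return "top-level"
    # Look one level down, e.g. under an envelope field such as "payload".
    for name, value in doc.items():
        if isinstance(value, dict) and key in value:
            return f"nested under '{name}'"
    return "missing"


# Sample bodies; the wrapped shape is an assumed illustration only.
flat = '{"order_id": 256795, "quantity": 9}'
wrapped = '{"schema": {"type": "struct"}, "payload": {"order_id": 256795}}'

print(locate_key(flat, "order_id"))
print(locate_key(wrapped, "order_id"))
```

If the logged body reports anything other than `top-level`, the JSON reaching the sink is not shaped the way `PutItem` needs it, even though the Avro record itself is fine.
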
   So it looks to me like the converted JSON is malformed and doesn't have the 
field that is supposed to exist. I then consumed the topic with a separate 
consumer, deserialized and printed the messages, and I can confirm that the 
Avro messages are correct and contain all the required fields, such as 
`order_id`:
   ```
   2025-06-10 15:00:46,632 - __main__ - INFO - Connected to Schema Registry at 
http://localhost:8081
   2025-06-10 15:00:47,154 - httpx - INFO - HTTP Request: GET 
http://localhost:8081/subjects/test_orders-value/versions/latest "HTTP/1.1 200 
OK"
   2025-06-10 15:00:47,156 - __main__ - INFO - Retrieved schema for subject 
'test_orders-value', version 1 from registry.
   2025-06-10 15:00:47,240 - __main__ - INFO - Subscribed to topic: test_orders
   2025-06-10 15:00:47,240 - __main__ - INFO - Starting to consume messages. 
Press Ctrl+C to stop.
   2025-06-10 15:00:47,285 - botocore.credentials - INFO - Found credentials in 
shared credentials file: ~/.aws/credentials
   2025-06-10 15:00:55,138 - httpx - INFO - HTTP Request: GET 
http://localhost:8081/schemas/ids/399 "HTTP/1.1 200 OK"
   2025-06-10 15:00:55,140 - __main__ - INFO - Message 1: Partition 0, Offset 0
   {'order_id': 256795, 'ordered_at': '2025-05-22T01:11:12.953333', 
'product_id': 3351, 'quantity': 9, 'customer_id': 99025, 'customer_name': 
'Metro Shipping Co'}
   ```
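
For comparison with the body the connector logs at DEBUG, the record printed above can be re-serialized as a flat JSON object. This is a minimal sketch that only converts the printed Python dict into JSON; it assumes, per the Kamelet documentation linked earlier, that top-level JSON fields are what get mapped to DynamoDB attribute names for `PutItem`.

```python
import ast
import json

# Record exactly as printed by the standalone consumer (Python dict repr).
printed = (
    "{'order_id': 256795, 'ordered_at': '2025-05-22T01:11:12.953333', "
    "'product_id': 3351, 'quantity': 9, 'customer_id': 99025, "
    "'customer_name': 'Metro Shipping Co'}"
)

record = ast.literal_eval(printed)  # safely parse the literal dict
body = json.dumps(record)           # flat JSON object with order_id at the top level
print(body)
```
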
   
   Can someone confirm whether the CamelAwsddbsinkSinkConnector can work with 
Avro data at all? If it can, am I missing anything in my connector config to 
get it working correctly?
   
   P.S. I can't use the latest version of CamelAwsddbsinkSinkConnector because 
I'm using MSK Connect, which is still on 2.7.1 and uses Java 11.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
