mcforres commented on issue #1543:
URL: https://github.com/apache/camel-kafka-connector/issues/1543#issuecomment-2446815351

Dropping a note here on a workaround for this issue. For context, I'm not a Java developer, and I'm fairly new to Apache Camel. I'm sure it would be better to expose this as a property on the respective connectors that deal with streams (postgres, dynamodb, etc.) rather than modifying the actual source task.
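As a rough illustration of that suggestion, here is a hypothetical sketch of a connector-level option that chooses what the source task does with a stream-typed body. The property name `camel.source.stream.body.conversion` and the `StreamBodyConversion` enum are invented for this example; camel-kafka-connector does not define them.

```java
import java.util.Locale;

/**
 * Hypothetical sketch only: neither the property name nor the enum below
 * exists in camel-kafka-connector.
 */
class StreamBodyOption {

    // Hypothetical property name, for illustration only.
    static final String PROPERTY = "camel.source.stream.body.conversion";

    enum StreamBodyConversion {
        NONE,   // pass the body through unchanged (current behaviour)
        BYTES,  // read the stream fully into a byte[]
        STRING  // read the stream fully and decode it as UTF-8 text
    }

    /** Parses the property value; absent or blank means NONE, unknown values throw IllegalArgumentException. */
    static StreamBodyConversion parse(String raw) {
        if (raw == null || raw.isBlank()) {
            return StreamBodyConversion.NONE;
        }
        return StreamBodyConversion.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse(null));       // NONE
        System.out.println(parse(" string ")); // STRING
    }
}
```

Defaulting to `NONE` would keep today's behaviour for anyone who does not set the option.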
   
In my case, I was trying to use the DynamoDB Streams source connector and ran into the same issue @camwardy described above.
   
With the help of AI, I generated code that reads the stream and converts it into a byte array or a string. I validated the solution on the camel-kafka-connector 3.20.3 tag.
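The core of that conversion can be shown in isolation. This is a minimal JDK-only sketch (no Camel classes) of draining a stream into a `byte[]` and optionally decoding it as UTF-8; the class and method names are made up for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Sketch of the stream-to-bytes-to-String conversion, using only the JDK. */
class StreamBodies {

    /** Drains the stream in fixed-size chunks (equivalent to InputStream.readAllBytes() on Java 9+). */
    static byte[] readFully(InputStream is) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[16384];
        try {
            int nRead;
            while ((nRead = is.read(chunk, 0, chunk.length)) != -1) {
                buffer.write(chunk, 0, nRead);
            }
        } catch (IOException e) {
            // Rethrow unchecked so the caller decides how to handle a failed read
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Decodes the drained bytes as UTF-8; only suitable when the payload is known to be text. */
    static String readFullyAsUtf8(InputStream is) {
        return new String(readFully(is), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(readFullyAsUtf8(body)); // prints {"id":1}
    }
}
```

Whether to keep the `byte[]` or decode it depends on the payload: decoding binary data as UTF-8 replaces invalid byte sequences, so binary bodies should stay as bytes.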
   
The main change is to the `poll()` method at
https://github.com/apache/camel-kafka-connector/blob/f0e2ca924773165f322932a5d7c2eed332ae6220/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L197

I replaced that entire method with this code block:
```java
@Override
public synchronized List<SourceRecord> poll() {
    LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
    final long startPollEpochMilli = Instant.now().toEpochMilli();

    long remaining = remaining(startPollEpochMilli, maxPollDuration);
    long collectedRecords = 0L;

    List<SourceRecord> records = new ArrayList<>();
    while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
        Exchange exchange = consumer.receive(remaining);
        if (exchange == null) {
            // Nothing received, abort and return what we received so far
            break;
        }

        LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
                exchange.getMessage().getMessageId(), exchange.getFromEndpoint());

        // TODO: see if there is a better way to use sourcePartition and sourceOffset
        Map<String, String> sourcePartition = Collections.singletonMap("filename",
                exchange.getFromEndpoint().toString());
        Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());

        final Object messageHeaderKey = camelMessageHeaderKey != null
                ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;

        // Retrieve the message body
        Object messageBodyValue = exchange.getMessage().getBody();

        // Record value: stream bodies are read fully and decoded, any other body is used as-is
        Object value;

        try {
            if (messageBodyValue instanceof InputStream) {
                InputStream is = (InputStream) messageBodyValue;
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                int nRead;
                byte[] data = new byte[16384]; // Adjust buffer size as needed

                while ((nRead = is.read(data, 0, data.length)) != -1) {
                    buffer.write(data, 0, nRead);
                }

                buffer.flush();
                byte[] bytes = buffer.toByteArray();

                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use byte[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else if (messageBodyValue instanceof StreamCache) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ((StreamCache) messageBodyValue).writeTo(baos);
                byte[] bytes = baos.toByteArray();

                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use byte[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else {
                value = messageBodyValue;
            }
        } catch (IOException e) {
            LOG.error("Error reading message body", e);
            value = null;
        }

        final Schema messageKeySchema = messageHeaderKey != null
                ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
        final Schema messageBodySchema = value != null ? SchemaHelper.buildSchemaBuilderForType(value) : null;

        final long timestamp = calculateTimestamp(exchange);

        // Take into account cached Camel streams.
        // Kept from the original method: after the conversion above, value is never
        // a StreamCache, so this branch no longer runs.
        if (value instanceof StreamCache) {
            StreamCache sc = (StreamCache) value;
            // Reset to ensure the cache is ready before sending it in the record (useful for SMTs)
            sc.reset();
        }

        for (String singleTopic : topics) {
            CamelSourceRecord camelRecord = new CamelSourceRecord(
                sourcePartition,
                sourceOffset,
                singleTopic,
                null,
                messageKeySchema,
                messageHeaderKey,
                messageBodySchema,
                value, // the converted body
                timestamp
            );

            if (mapHeaders) {
                if (exchange.getMessage().hasHeaders()) {
                    setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                }
            }

            if (mapProperties) {
                if (exchange.hasProperties()) {
                    setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                }
            }

            TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);
            Integer claimCheck = freeSlots.remove();
            camelRecord.setClaimCheck(claimCheck);
            exchangesWaitingForAck[claimCheck] = exchange;
            LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}",
                camelRecord, exchange, claimCheck);
            records.add(camelRecord);
        }
        collectedRecords++;
        remaining = remaining(startPollEpochMilli, maxPollDuration);
    }

    return records.isEmpty() ? null : records;
}
```
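The `reset()` call on `StreamCache` in the method above exists because a stream body can only be consumed once: whatever reads it first leaves it positioned at the end. The plain-JDK sketch below uses `ByteArrayInputStream` as a stand-in for a cached stream to show that effect; it does not use Camel's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Plain-JDK illustration: a stream is drained by the first read unless it is reset. */
class ReadOnceDemo {

    /** Reads everything from the stream's current position to its end. */
    static String drain(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayInputStream body =
                new ByteArrayInputStream("payload".getBytes(StandardCharsets.UTF_8));

        String first = drain(body);   // "payload"
        String second = drain(body);  // "" because the stream is already at its end
        body.reset();                 // back to the mark, which is position 0 by default
        String third = drain(body);   // "payload" again

        System.out.println(first + " | " + second + " | " + third);
    }
}
```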
   
   
I added these imports to the file:

```java
import java.io.InputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
```
   
With these changes, the source connector writes the actual records to the topic instead of the InputStreamCache object.
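For contrast, here is a minimal JDK-only sketch of what a stream object looks like when it is turned into text without being read. It assumes the record value ends up stringified via `toString()` somewhere downstream, which the thread does not confirm (the converter in use is not stated); the result is a class name and hash code rather than the payload.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Shows the difference between stringifying a stream object and reading its content. */
class ToStringVsContent {

    /** What a toString()-based fallback would produce for a stream body. */
    static String stringified(Object body) {
        return String.valueOf(body);
    }

    public static void main(String[] args) throws IOException {
        InputStream body = new ByteArrayInputStream("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        // Prints something like java.io.ByteArrayInputStream@<hash>, not the JSON
        System.out.println(stringified(body));
        // Reading the stream yields the actual payload: {"id":1}
        System.out.println(new String(body.readAllBytes(), StandardCharsets.UTF_8));
    }
}
```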
   
   Hopefully somebody more familiar with the code base can take this and 
provide a better solution.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.