mcforres commented on issue #1543: URL: https://github.com/apache/camel-kafka-connector/issues/1543#issuecomment-2446815351
Dropping a note here on a workaround for this issue. For context: I'm not a Java developer, and I'm fairly new to Apache Camel. I'm sure it would be better to expose this as a property on the respective connectors that deal with streams (postgres, dynamodb, etc.) rather than modifying the actual source task.

In my case, I am trying to use the DynamoDB Streams source connector and experienced the same issue @camwardy described above. With the help of AI, I generated code that reads the stream and converts it into a byte array and/or a string. I validated the solution on the camel-kafka-connector 3.20.3 tag.

The main changes were made to the `poll()` method at https://github.com/apache/camel-kafka-connector/blob/f0e2ca924773165f322932a5d7c2eed332ae6220/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L197

I replaced that entire method with this code block:

```java
@Override
public synchronized List<SourceRecord> poll() {
    LOG.debug("Number of records waiting an ack: {}", freeSlots.capacity() - freeSlots.size());
    final long startPollEpochMilli = Instant.now().toEpochMilli();

    long remaining = remaining(startPollEpochMilli, maxPollDuration);
    long collectedRecords = 0L;

    List<SourceRecord> records = new ArrayList<>();
    while (collectedRecords < maxBatchPollSize && freeSlots.size() >= topics.length && remaining > 0) {
        Exchange exchange = consumer.receive(remaining);
        if (exchange == null) {
            // Nothing received, abort and return what we received so far
            break;
        }

        LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(),
                exchange.getMessage().getMessageId(), exchange.getFromEndpoint());

        // TODO: see if there is a better way to use sourcePartition and sourceOffset
        Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
        Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());

        final Object messageHeaderKey = camelMessageHeaderKey != null
                ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;

        // Retrieve the message body
        Object messageBodyValue = exchange.getMessage().getBody();

        // Declare the 'value' variable
        Object value;
        try {
            if (messageBodyValue instanceof InputStream) {
                InputStream is = (InputStream) messageBodyValue;
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                int nRead;
                byte[] data = new byte[16384]; // Adjust buffer size as needed
                while ((nRead = is.read(data, 0, data.length)) != -1) {
                    buffer.write(data, 0, nRead);
                }
                buffer.flush();
                byte[] bytes = buffer.toByteArray();
                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use byte[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else if (messageBodyValue instanceof StreamCache) {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                ((StreamCache) messageBodyValue).writeTo(baos);
                byte[] bytes = baos.toByteArray();
                // Decide whether to use bytes or convert to String based on your data
                //value = bytes; // Use byte[] for binary data
                value = new String(bytes, StandardCharsets.UTF_8); // Use String for text data
            } else {
                value = messageBodyValue;
            }
        } catch (IOException e) {
            LOG.error("Error reading message body", e);
            value = null;
        }

        final Schema messageKeySchema = messageHeaderKey != null
                ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
        final Schema messageBodySchema = value != null
                ? SchemaHelper.buildSchemaBuilderForType(value) : null;

        final long timestamp = calculateTimestamp(exchange);

        // Take into account cached Camel streams
        // (kept from the original method; after the conversion above, 'value' is no longer a StreamCache)
        if (value instanceof StreamCache) {
            StreamCache sc = (StreamCache) value;
            // Reset to ensure the cache is ready before sending it in the record (useful for SMTs)
            sc.reset();
        }

        for (String singleTopic : topics) {
            CamelSourceRecord camelRecord = new CamelSourceRecord(
                    sourcePartition,
                    sourceOffset,
                    singleTopic,
                    null,
                    messageKeySchema,
                    messageHeaderKey,
                    messageBodySchema,
                    value, // Use 'value' here
                    timestamp
            );

            if (mapHeaders) {
                if (exchange.getMessage().hasHeaders()) {
                    setAdditionalHeaders(camelRecord, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
                }
            }

            if (mapProperties) {
                if (exchange.hasProperties()) {
                    setAdditionalHeaders(camelRecord, exchange.getProperties(), PROPERTY_CAMEL_PREFIX);
                }
            }

            TaskHelper.logRecordContent(LOG, loggingLevel, camelRecord);

            Integer claimCheck = freeSlots.remove();
            camelRecord.setClaimCheck(claimCheck);
            exchangesWaitingForAck[claimCheck] = exchange;
            LOG.debug("Record: {}, containing data from exchange: {}, is associated with claim check number: {}",
                    camelRecord, exchange, claimCheck);
            records.add(camelRecord);
        }
        collectedRecords++;
        remaining = remaining(startPollEpochMilli, maxPollDuration);
    }

    return records.isEmpty() ? null : records;
}
```

I added these imports to the file:

```java
import java.io.InputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
```

These changes allowed the source connector to produce the actual records on the topic, rather than the InputStreamCache object. Hopefully somebody more familiar with the code base can take this and provide a better solution.
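For anyone who wants to try the stream-draining step from the workaround above outside of `CamelSourceTask`, here is a minimal standalone sketch. The class `StreamBodySketch` and its methods `drain` and `toRecordValue` are hypothetical names made up for illustration; they are not part of camel-kafka-connector. It also assumes the payload is UTF-8 text, which may not hold for every connector.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of camel-kafka-connector.
final class StreamBodySketch {

    private StreamBodySketch() {
    }

    // Reads the stream to its end and returns every byte it produced.
    static byte[] drain(InputStream in) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[16384];
        int n;
        try {
            while ((n = in.read(chunk, 0, chunk.length)) != -1) {
                buffer.write(chunk, 0, n);
            }
        } catch (IOException e) {
            // Rethrow unchecked so callers are not forced to handle IOException
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Stream bodies become UTF-8 strings (assumption: text payloads);
    // any other body is passed through unchanged.
    static Object toRecordValue(Object body) {
        if (body instanceof InputStream) {
            return new String(drain((InputStream) body), StandardCharsets.UTF_8);
        }
        return body;
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream(
                "{\"eventName\":\"INSERT\"}".getBytes(StandardCharsets.UTF_8));
        System.out.println(toRecordValue(body)); // stream body, printed as text
        System.out.println(toRecordValue(42));   // non-stream body, passed through
    }
}
```

On Java 9 and later, the manual loop in `drain` could probably be replaced with `InputStream.readAllBytes()`. For binary payloads, returning the `byte[]` directly instead of building a `String` would likely be the safer choice.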