ruchirvaninasdaq commented on issue #834: URL: https://github.com/apache/camel-kafka-connector/issues/834#issuecomment-756393630
Adding Serializer code also if that helps: ``` public class S3ObjectAvroSerializer implements Serializer<ResponseInputStream<GetObjectResponse>> { private static final Logger LOG = LoggerFactory.getLogger(S3ObjectAvroSerializer.class); private Schema schema; private GenericRecordSerializer recordSerializer; private MessageFactory MessageFactory; public S3ObjectAvroSerializer(Schema schema){ this.schema=schema; this.recordSerializer = new GenericRecordSerializer(this.schema); this.MessageFactory = new MessageFactory(); } public S3ObjectAvroSerializer() throws IOException { Schema.Parser parser = new Schema.Parser(); this.schema= parser.parse(getClass().getResourceAsStream("/avro/schema.avsc")); this.recordSerializer = new GenericRecordSerializer(this.schema); this.MessageFactory = new MessageFactory(); } /** * Create a Kafka serializer for control schema messages. */ public Serializer<GenericRecord> getSerializer() { return new GenericRecordSerializer(this.schema); } @Override public void configure(Map<String, ?> configs, boolean isKey) { } @Override public byte[] serialize(String topic, ResponseInputStream<GetObjectResponse> inputStream) { GenericRecord record = null; try{ record = MessageFactory.parseMessage(inputStream); } catch (Exception e){ LOG.error("Error in Serializer: "+ e ); e.printStackTrace(); } return recordSerializer.doSerialize(topic, record); } @Override public void close() { } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org