bframke opened a new issue #743: Kafka Component Writing Messages with ProducerTemplate
URL: https://github.com/apache/camel-quarkus/issues/743

We are currently trying to send a high load of messages from a test through a Camel application that uses the Kafka component. We use the ProducerTemplate to send the body and headers over the given route for the topic, but most of the time it gets stuck. It is as if the ProducerTemplate never finishes its task. We also tried moving the sendBodyAndHeaders calls into extra threads so that we don't clog a single thread, but we still got stuck after some messages.

**Test-CURL**
`for i in {1..100}; do echo "$i"; curl -v -d '{"Event": "Event $i" }' -H "content-type: application/json" http://localhost:8081/topic/curltopic3/event; done`

Maybe the problem is that we use just one route with an ever-changing list of topics?

**Camel-Route Building**
```
public void onInit(@Observes StartupEvent evt) {
    _currentContext = _camelMain.getCamelContext();
    _logger.info("context : {}", _currentContext);
}

public void camelStarted(@Observes CamelMainEvents.BeforeStart evt) throws Exception {
    _currentContext = _camelMain.getCamelContext();
    RoutesBuilder builder = new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:start")
                .id("toKafka")
                .log("${body}")
                .to("kafka:" + _topicList + "?brokers=" + _brokerList);
        }
    };
    _currentContext.addRoutes(builder);
}
```

**REST-Endpoint we call to send Events**
```
@POST
@Path("topic/{topicId}/event")
@Consumes(MediaType.APPLICATION_JSON)
public Response sendEventToTopic(String body, @PathParam("topicId") String topic, @Context HttpHeaders httpHeaders) {
    ProducerTemplate producerTemplate = _currentContext.createProducerTemplate();
    Map<String, Object> headers = new HashMap<>();
    Map<String, Object> innerHeaders = new HashMap<>();
    filterKafkaHeaders(httpHeaders, innerHeaders);
    createStandardHeadersIfNeeded(innerHeaders);
    headers.put(KafkaConstants.PARTITION_KEY, "0");
    headers.put(KafkaConstants.KEY, "1");
    headers.put(KafkaConstants.OVERRIDE_TOPIC, topic);
    _logger.info("keaders for kafka: {}", headers);
    _logger.info("headers for payload: {}", innerHeaders);
    producerTemplate.sendBodyAndHeaders("direct:start", new MessageEnvelope(innerHeaders, body), headers);
    return Response.ok(Response.Status.OK.toString()).build();
}
```
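
To reproduce the load without a shell, the curl loop above could be replayed from Java roughly as follows. This is only a sketch under assumptions: the class `EventLoadTest` and its method names are hypothetical and not part of our application, and it relies on the `java.net.http` client from Java 11+. Unlike the curl form, where the single quotes keep `$i` literal, the sketch substitutes the counter into the payload. The request timeout of 10 seconds is an arbitrary choice so that a stuck call fails visibly instead of hanging.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper (not part of the application): replays the curl loop from Java.
class EventLoadTest {

    // JSON payload for iteration i, shaped like the body in the curl command.
    static String eventBody(int i) {
        return "{\"Event\": \"Event " + i + "\" }";
    }

    // POST to <baseUrl>/topic/<topic>/event with a timeout, so a stuck
    // request raises an HttpTimeoutException instead of blocking forever.
    static HttpRequest buildRequest(String baseUrl, String topic, int i) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/topic/" + topic + "/event"))
                .timeout(Duration.ofSeconds(10))
                .header("content-type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(eventBody(i)))
                .build();
    }

    // Sends `count` events sequentially and prints the HTTP status of each.
    static void sendAll(String baseUrl, String topic, int count)
            throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        for (int i = 1; i <= count; i++) {
            HttpResponse<String> response = client.send(
                    buildRequest(baseUrl, topic, i),
                    HttpResponse.BodyHandlers.ofString());
            System.out.println(i + " -> HTTP " + response.statusCode());
        }
    }
}
```

Calling `EventLoadTest.sendAll("http://localhost:8081", "curltopic3", 100)` would issue the same 100 requests as the curl loop, and the iteration at which the timeout fires shows after how many messages the endpoint stops responding.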
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services