bframke opened a new issue #743: Kafka Component Writing Messages with 
ProducerTemplate
URL: https://github.com/apache/camel-quarkus/issues/743
 
 
   We are currently trying to send a high load of messages from a test through a 
Camel application that uses the Kafka component. We use the ProducerTemplate to 
send the body and headers over the given route for the topic, but most of the 
time it gets stuck. It is as if the ProducerTemplate never finishes its task. 
   
   We also tried moving the sendBodyAndHeaders call into separate threads so 
that we don't block a single thread, but we still got stuck after some messages.
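
A minimal sketch of what such offloading could look like, using only the JDK. It is not the code from the application: `OffloadSketch`, `sendAll`, the pool size of 4 and the timeout are hypothetical, and the `sender` callback merely stands in for the real `sendBodyAndHeaders` call. Waiting on each `Future` with a timeout at least makes a stuck send visible instead of hanging silently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class OffloadSketch {

    // Submits one task per body to a bounded pool and returns how many tasks
    // finished. Each task is waited on for at most timeoutSeconds.
    // `sender` is a placeholder for the real send call (assumption).
    static int sendAll(List<String> bodies, Consumer<String> sender, long timeoutSeconds)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // pool size is arbitrary
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String body : bodies) {
                pending.add(pool.submit(() -> sender.accept(body)));
            }
            int completed = 0;
            for (Future<?> f : pending) {
                try {
                    f.get(timeoutSeconds, TimeUnit.SECONDS);
                    completed++;
                } catch (TimeoutException | ExecutionException e) {
                    // A send that hangs or fails is counted as not completed.
                    f.cancel(true);
                }
            }
            return completed;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> bodies = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            bodies.add("{\"Event\": \"Event " + i + "\"}");
        }
        // No-op sender; in the application this would be the send call.
        int done = sendAll(bodies, body -> { }, 5);
        System.out.println(done + " of " + bodies.size() + " sends completed");
    }
}
```

If the returned count is lower than the number of bodies, some sends timed out or threw, which narrows the hang down to the send call itself rather than to the calling thread.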
   
   **Test-CURL**
   `for i in {1..100}; do echo "$i"; curl -v -d '{"Event": "Event $i" }' -H 
"content-type: application/json" http://localhost:8081/topic/curltopic3/event; 
done`
   
   Could the problem also be that we use just one route with an ever-changing 
list of topics?
   
   **Camel-Route Building**
   ``` 
       public void onInit(@Observes StartupEvent evt) {
           _currentContext = _camelMain.getCamelContext();
           _logger.info("context : {}", _currentContext);
       }
   
       public void camelStarted(@Observes CamelMainEvents.BeforeStart evt) throws Exception {
           _currentContext = _camelMain.getCamelContext();
   
           RoutesBuilder builder = new RouteBuilder() {
               @Override
               public void configure() {
                   from("direct:start").
                           id("toKafka").
                           log("${body}").
                           to("kafka:"+_topicList+"?brokers="+_brokerList);
               }
           };
   
           _currentContext.addRoutes(builder);
       }
   ```
   
   **REST-Endpoint we call to send Events**
   ```
       @POST
       @Path("topic/{topicId}/event")
       @Consumes(MediaType.APPLICATION_JSON)
       public Response sendEventToTopic(String body,
                            @PathParam("topicId") String topic,
                            @Context HttpHeaders httpHeaders) {
           ProducerTemplate producerTemplate = _currentContext.createProducerTemplate();
           Map<String, Object> headers = new HashMap<>();
           Map<String, Object> innerHeaders = new HashMap<>();
   
           filterKafkaHeaders(httpHeaders, innerHeaders);
           createStandardHeadersIfNeeded(innerHeaders);
   
           headers.put(KafkaConstants.PARTITION_KEY, "0");
           headers.put(KafkaConstants.KEY, "1");
           headers.put(KafkaConstants.OVERRIDE_TOPIC, topic);
   
           _logger.info("headers for kafka: {}", headers);
           _logger.info("headers for payload: {}", innerHeaders);
   
           producerTemplate.sendBodyAndHeaders("direct:start",
                   new MessageEnvelope(innerHeaders, body), headers);
   
           return Response.ok(Response.Status.OK.toString()).build();
       }
   ```
   
   

