arthurdm edited a comment on issue #2116:
URL: https://github.com/apache/camel-k/issues/2116#issuecomment-797047908


   Thanks @oscerd!   I think a very useful sample would show how to create a 
`DataSet` that took into consideration the incoming payload.  Here's the 
snippet I was trying to get to work [1].  I couldn't get my `DataSet` to be 
registered.  Keep getting this error [2].
   
   **[1]**
   ```java
   public class FlinkSample extends RouteBuilder implements Processor {
   
       @Override
       public void configure() throws Exception {
   
           // Receive a msg from the configured Kafka topic
           from("kafka:sample.topic")
               .log("Received body: ${body}")
               .split(body().tokenize("\n"))
               .process(this)
           
.to("flink:dataSet?dataSet=#myDataSet&dataSetCallback=#myDataSetCallback")
               .log("Response body: ${body}");
   
       }
   
       @BindToRegistry
       public  DataSetCallback<String> myDataSetCallback () {
           return new DataSetCallback<String>() {
               @Override
               public String onDataSet(DataSet ds, Object... payloads) {
                   return "this is the result";
               }
            };
       }
   
       @Override
       public void process(Exchange exchange) throws Exception {
           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
           DataSet<String> myDataSet = 
env.fromElements(exchange.getIn().toString());
           exchange.getContext().getRegistry().bind("myDataSet", DataSet.class, 
myDataSet);
       }
   
   }
   ```
   
   **[2]**
   ```
   Failed to start application (with profile prod): 
org.apache.camel.NoSuchBeanException: No bean could be found in the registry 
for: myDataSet of type: org.apache.flink.api.java.DataSet
        at 
org.apache.camel.support.CamelContextHelper.mandatoryLookupAndConvert(CamelContextHelper.java:253)
        at 
org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:290)
        at 
org.apache.camel.support.EndpointHelper.resolveReferenceParameter(EndpointHelper.java:250)
        at 
org.apache.camel.support.component.PropertyConfigurerSupport.property(PropertyConfigurerSupport.java:53)
        at 
org.apache.camel.component.flink.FlinkEndpointConfigurer.configure(FlinkEndpointConfigurer.java:26)
        at 
org.apache.camel.support.PropertyBindingSupport.setSimplePropertyViaConfigurer(PropertyBindingSupport.java:703)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to