davsclaus commented on a change in pull request #4091:
URL: https://github.com/apache/camel/pull/4091#discussion_r470942687



##########
File path: 
components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsProducer.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import 
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import 
org.apache.camel.component.azure.eventhubs.operations.EventHubsProducerOperations;
+import org.apache.camel.support.DefaultAsyncProducer;
+
+public class EventHubsProducer extends DefaultAsyncProducer {
+
+    private EventHubProducerAsyncClient producerAsyncClient;
+    private EventHubsProducerOperations producerOperations;
+
+    public EventHubsProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // create the client
+        producerAsyncClient = 
EventHubsClientFactory.createEventHubProducerAsyncClient(getEndpoint().getConfiguration());
+
+        // create our operations
+        producerOperations = new 
EventHubsProducerOperations(producerAsyncClient, getConfiguration());
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            return producerOperations.sendEvents(exchange, callback);
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
+        }
+
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // shutdown async client
+        producerAsyncClient.close();

Review comment:
       The != null here

##########
File path: 
components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsEndpoint.java
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.models.ErrorContext;
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
+
+/**
+ * The azure-eventhubs component that integrates Azure Event Hubs using AMQP 
protocol. Azure EventHubs is a highly scalable publish-subscribe service that
+ * can ingest millions of events per second and stream them to multiple 
consumers.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "azure-eventhubs", title = 
"Azure Event Hubs", syntax = "azure-eventhubs:namespace/eventHubName", category 
= {
+        Category.CLOUD, Category.MESSAGING })
+public class EventHubsEndpoint extends DefaultEndpoint {
+
+    @UriParam
+    private EventHubsConfiguration configuration;
+
+    public EventHubsEndpoint(final String uri, final Component component, 
final EventHubsConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        return new EventHubsProducer(this);
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new EventHubsConsumer(this, processor);

Review comment:
       Call configureConsumer(consumer) also, see other components how-to

##########
File path: 
components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import com.azure.messaging.eventhubs.EventProcessorClient;
+import com.azure.messaging.eventhubs.models.ErrorContext;
+import com.azure.messaging.eventhubs.models.EventContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import 
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.DefaultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EventHubsConsumer extends DefaultConsumer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsConsumer.class);
+
+    // we use the EventProcessorClient as recommended by Azure docs to consume 
from all partitions
+    private EventProcessorClient processorClient;
+
+    public EventHubsConsumer(final EventHubsEndpoint endpoint, final Processor 
processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // create the client
+        processorClient = 
EventHubsClientFactory.createEventProcessorClient(getConfiguration(),
+                this::onEventListener, this::onErrorListener);
+
+        // start the client but we will rely on the Azure Client Scheduler for 
thread management
+        processorClient.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // shutdown the client
+        processorClient.stop();

Review comment:
       guard with != null as if camel fails to startup then this start method 
may not have been invoked and camel will call stop 

##########
File path: 
components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs.operations;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
+import 
org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+public class EventHubsProducerOperations {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsProducerOperations.class);
+
+    private final EventHubProducerAsyncClient producerAsyncClient;
+    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;
+
+    public EventHubsProducerOperations(final EventHubProducerAsyncClient 
producerAsyncClient, final EventHubsConfiguration configuration) {
+        ObjectHelper.notNull(producerAsyncClient, "client cannot be null");
+
+        this.producerAsyncClient = producerAsyncClient;
+        configurationOptionsProxy = new 
EventHubsConfigurationOptionsProxy(configuration);
+    }
+
+    public boolean sendEvents(final Exchange exchange, final AsyncCallback 
callback) {
+        ObjectHelper.notNull(exchange, "exchange cannot be null");
+        ObjectHelper.notNull(callback, "callback cannot be null");
+
+        final SendOptions sendOptions = 
createSendOptions(configurationOptionsProxy.getPartitionKey(exchange), 
configurationOptionsProxy.getPartitionId(exchange));
+        final Iterable<EventData> eventData = createEventData(exchange);
+
+        return sendAsyncEvents(eventData, sendOptions, exchange, callback);
+    }
+
+    private boolean sendAsyncEvents(final Iterable<EventData> eventData, final 
SendOptions sendOptions, final Exchange exchange, final AsyncCallback 
asyncCallback) {
+        final AtomicBoolean done = new AtomicBoolean(false);

Review comment:
       The asyncCallback must only be invoked once, not sure if the 3 branches 
below can cause it to be invoked twice etc.
   
   So you should essentially only return true if the method exits early and its 
the same thread that called this method that is calling the callback.
   
   So I would remove the atomic boolean, and make this method return false. And 
then in the sendAsyncEventsWithSuitableMethod you call the callback with 
(false) as parameter as it must match what this method returned.

##########
File path: 
components/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/operations/EventHubsProducerOperations.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs.operations;
+
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.azure.eventhubs.EventHubsConfiguration;
+import 
org.apache.camel.component.azure.eventhubs.EventHubsConfigurationOptionsProxy;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+public class EventHubsProducerOperations {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventHubsProducerOperations.class);
+
+    private final EventHubProducerAsyncClient producerAsyncClient;
+    private final EventHubsConfigurationOptionsProxy configurationOptionsProxy;
+
+    public EventHubsProducerOperations(final EventHubProducerAsyncClient 
producerAsyncClient, final EventHubsConfiguration configuration) {
+        ObjectHelper.notNull(producerAsyncClient, "client cannot be null");
+
+        this.producerAsyncClient = producerAsyncClient;
+        configurationOptionsProxy = new 
EventHubsConfigurationOptionsProxy(configuration);
+    }
+
+    public boolean sendEvents(final Exchange exchange, final AsyncCallback 
callback) {
+        ObjectHelper.notNull(exchange, "exchange cannot be null");
+        ObjectHelper.notNull(callback, "callback cannot be null");
+
+        final SendOptions sendOptions = 
createSendOptions(configurationOptionsProxy.getPartitionKey(exchange), 
configurationOptionsProxy.getPartitionId(exchange));
+        final Iterable<EventData> eventData = createEventData(exchange);
+
+        return sendAsyncEvents(eventData, sendOptions, exchange, callback);
+    }
+
+    private boolean sendAsyncEvents(final Iterable<EventData> eventData, final 
SendOptions sendOptions, final Exchange exchange, final AsyncCallback 
asyncCallback) {
+        final AtomicBoolean done = new AtomicBoolean(false);

Review comment:
       Also does the subcribe event below make any sense to react on? I would 
assume its either only the error or the completion.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to