This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.18.x by this push:
new 7375c059b824 CAMEL-23291: Add ShutdownAware support to EventHubs and
Google PubSub consumers (#23831)
7375c059b824 is described below
commit 7375c059b824ec220a0d789e9a9f40b954c65ac0
Author: Claus Ibsen <[email protected]>
AuthorDate: Mon Jun 8 10:46:44 2026 +0200
CAMEL-23291: Add ShutdownAware support to EventHubs and Google PubSub
consumers (#23831)
Implement ShutdownAware in EventHubsConsumer and GooglePubsubConsumer to
prevent message loss during graceful shutdown. Track in-flight exchanges
with an AtomicInteger counter, and split the lifecycle so that doStop()
stops accepting new messages while doShutdown() releases resources after
in-flight exchanges complete. This follows the pattern from CAMEL-23260
(ServiceBusConsumer).
Closes #23806
---
.../azure/eventhubs/EventHubsConsumer.java | 58 +++++++--
.../eventhubs/EventHubsConsumerShutdownTest.java | 145 +++++++++++++++++++++
.../google/pubsub/GooglePubsubConsumer.java | 76 ++++++++---
.../pubsub/consumer/CamelMessageReceiver.java | 34 +++--
.../pubsub/GooglePubsubConsumerShutdownTest.java | 83 ++++++++++++
5 files changed, 355 insertions(+), 41 deletions(-)
diff --git
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
index 58dc787dd88c..f465326fbe86 100644
---
a/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
+++
b/components/camel-azure/camel-azure-eventhubs/src/main/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumer.java
@@ -28,7 +28,9 @@ import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
import
org.apache.camel.component.azure.eventhubs.client.EventHubsClientFactory;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
@@ -37,7 +39,7 @@ import org.slf4j.LoggerFactory;
import static
org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_SIZE;
import static
org.apache.camel.component.azure.eventhubs.EventHubsConstants.COMPLETED_BY_TIMEOUT;
-public class EventHubsConsumer extends DefaultConsumer {
+public class EventHubsConsumer extends DefaultConsumer implements
ShutdownAware {
private static final Logger LOG =
LoggerFactory.getLogger(EventHubsConsumer.class);
@@ -45,6 +47,7 @@ public class EventHubsConsumer extends DefaultConsumer {
private EventProcessorClient processorClient;
private final AtomicInteger processedEvents;
+ private final AtomicInteger pendingExchanges = new AtomicInteger();
private ScheduledExecutorService scheduledExecutorService;
private ScheduledFuture<?> lastScheduledTask;
private EventHubsCheckpointUpdaterTask lastTask;
@@ -74,19 +77,44 @@ public class EventHubsConsumer extends DefaultConsumer {
@Override
protected void doStop() throws Exception {
if (processorClient != null) {
- // shutdown the client
+ // stop accepting new messages but keep the connection open
+ // so that in-flight exchanges can still complete
processorClient.stop();
- processorClient = null;
}
- // shutdown scheduled executor
+ // shutdown camel consumer
+ super.doStop();
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
+ processorClient = null;
+
+ // shutdown scheduled executor after all in-flight exchanges have
completed
if (scheduledExecutorService != null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(scheduledExecutorService);
scheduledExecutorService = null;
}
- // shutdown camel consumer
- super.doStop();
+ super.doShutdown();
+ }
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ if (processorClient != null) {
+ processorClient.stop();
+ }
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return pendingExchanges.get();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // noop
}
public EventHubsConfiguration getConfiguration() {
@@ -133,20 +161,30 @@ public class EventHubsConsumer extends DefaultConsumer {
}
private void onEventListener(final EventContext eventContext) {
+ pendingExchanges.incrementAndGet();
+
final Exchange exchange = createAzureEventHubExchange(eventContext);
// add exchange callback
exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
@Override
public void onComplete(Exchange exchange) {
- // we update the consumer offsets
- processCommit(exchange, eventContext);
+ try {
+ // we update the consumer offsets
+ processCommit(exchange, eventContext);
+ } finally {
+ pendingExchanges.decrementAndGet();
+ }
}
@Override
public void onFailure(Exchange exchange) {
- // we do nothing here
- processRollback(exchange);
+ try {
+ // no offset commit on failure; only the rollback handling runs
+ processRollback(exchange);
+ } finally {
+ pendingExchanges.decrementAndGet();
+ }
}
});
// use default consumer callback
diff --git
a/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
new file mode 100644
index 000000000000..b52c7054ff2c
--- /dev/null
+++
b/components/camel-azure/camel-azure-eventhubs/src/test/java/org/apache/camel/component/azure/eventhubs/EventHubsConsumerShutdownTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.azure.eventhubs;
+
+import java.lang.reflect.Field;
+
+import com.azure.messaging.eventhubs.EventProcessorClient;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class EventHubsConsumerShutdownTest {
+
+ private final EventHubsEndpoint endpoint = mock();
+ private final EventHubsConfiguration configuration = mock();
+ private final AsyncProcessor processor = mock();
+ private final CamelContext context = mock();
+ private final ExtendedCamelContext ecc = mock();
+ private final ExchangeFactory ef = mock();
+ private final EventProcessorClient processorClient = mock();
+
+ @BeforeEach
+ void setUp() {
+ when(endpoint.getCamelContext()).thenReturn(context);
+ when(context.getCamelContextExtension()).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
+ when(ef.create(any(Endpoint.class), anyBoolean()))
+ .thenAnswer(invocation ->
DefaultExchange.newFromEndpoint(invocation.getArgument(0)));
+ when(endpoint.getConfiguration()).thenReturn(configuration);
+ }
+
+ @Test
+ void deferShutdownReturnsTrueAndStopsClient() throws Exception {
+ EventHubsConsumer consumer = new EventHubsConsumer(endpoint,
processor);
+ setProcessorClient(consumer, processorClient);
+
+ boolean deferred =
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+ assertTrue(deferred);
+ verify(processorClient).stop();
+ }
+
+ @Test
+ void pendingExchangesSizeIsZeroInitially() {
+ EventHubsConsumer consumer = new EventHubsConsumer(endpoint,
processor);
+
+ assertEquals(0, consumer.getPendingExchangesSize());
+ }
+
+ @Test
+ void pendingExchangesSizeTracksInflightMessages() throws Exception {
+ EventHubsConsumer consumer = new EventHubsConsumer(endpoint,
processor);
+ setProcessorClient(consumer, processorClient);
+
+ assertEquals(0, consumer.getPendingExchangesSize());
+
+ // simulate onEventListener by getting the completion callback and
invoking it
+ Exchange exchange = DefaultExchange.newFromEndpoint(endpoint);
+ // manually increment to simulate what onEventListener does
+ incrementPendingExchanges(consumer);
+
+ assertEquals(1, consumer.getPendingExchangesSize());
+
+ // simulate completion
+ decrementPendingExchanges(consumer);
+
+ assertEquals(0, consumer.getPendingExchangesSize());
+ }
+
+ @Test
+ void doStopStopsClientWithoutNullingIt() throws Exception {
+ EventHubsConsumer consumer = new EventHubsConsumer(endpoint,
processor);
+ setProcessorClient(consumer, processorClient);
+
+ consumer.doStop();
+
+ verify(processorClient).stop();
+ // processorClient should still be available for in-flight exchanges
+ assertEquals(processorClient, getProcessorClient(consumer));
+ }
+
+ @Test
+ void doShutdownNullsClient() throws Exception {
+ EventHubsConsumer consumer = new EventHubsConsumer(endpoint,
processor);
+ setProcessorClient(consumer, processorClient);
+
+ consumer.doShutdown();
+
+ assertEquals(null, getProcessorClient(consumer));
+ }
+
+ private static void setProcessorClient(EventHubsConsumer consumer,
EventProcessorClient client) throws Exception {
+ Field field =
EventHubsConsumer.class.getDeclaredField("processorClient");
+ field.setAccessible(true);
+ field.set(consumer, client);
+ }
+
+ private static EventProcessorClient getProcessorClient(EventHubsConsumer
consumer) throws Exception {
+ Field field =
EventHubsConsumer.class.getDeclaredField("processorClient");
+ field.setAccessible(true);
+ return (EventProcessorClient) field.get(consumer);
+ }
+
+ private static void incrementPendingExchanges(EventHubsConsumer consumer)
throws Exception {
+ Field field =
EventHubsConsumer.class.getDeclaredField("pendingExchanges");
+ field.setAccessible(true);
+ ((java.util.concurrent.atomic.AtomicInteger)
field.get(consumer)).incrementAndGet();
+ }
+
+ private static void decrementPendingExchanges(EventHubsConsumer consumer)
throws Exception {
+ Field field =
EventHubsConsumer.class.getDeclaredField("pendingExchanges");
+ field.setAccessible(true);
+ ((java.util.concurrent.atomic.AtomicInteger)
field.get(consumer)).decrementAndGet();
+ }
+}
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index ab01edd20f21..e6fed0419d68 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -26,6 +26,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiFuture;
@@ -44,11 +45,13 @@ import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.Subscription;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeCompletion;
import org.apache.camel.component.google.pubsub.consumer.AcknowledgeSync;
import org.apache.camel.component.google.pubsub.consumer.CamelMessageReceiver;
import
org.apache.camel.component.google.pubsub.consumer.GooglePubsubAcknowledge;
import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.UnitOfWorkHelper;
@@ -57,12 +60,13 @@ import org.apache.camel.support.task.budget.Budgets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GooglePubsubConsumer extends DefaultConsumer {
+public class GooglePubsubConsumer extends DefaultConsumer implements
ShutdownAware {
private final Logger localLog;
private final GooglePubsubEndpoint endpoint;
private final Processor processor;
+ private final AtomicInteger pendingExchanges = new AtomicInteger();
private ExecutorService executor;
private final List<Subscriber> subscribers;
private final Set<ApiFuture<PullResponse>> pendingSynchronousPullResponses;
@@ -116,6 +120,11 @@ public class GooglePubsubConsumer extends DefaultConsumer {
safeCancelSynchronousPullResponses();
+ super.doStop();
+ }
+
+ @Override
+ protected void doShutdown() throws Exception {
if (executor != null) {
if (getEndpoint() != null && getEndpoint().getCamelContext() !=
null) {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executor);
@@ -125,7 +134,26 @@ public class GooglePubsubConsumer extends DefaultConsumer {
}
executor = null;
- super.doStop();
+ super.doShutdown();
+ }
+
+ @Override
+ public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
+ if (!subscribers.isEmpty()) {
+ subscribers.forEach(AbstractApiService::stopAsync);
+ }
+ safeCancelSynchronousPullResponses();
+ return true;
+ }
+
+ @Override
+ public int getPendingExchangesSize() {
+ return pendingExchanges.get();
+ }
+
+ @Override
+ public void prepareShutdown(boolean suspendOnly, boolean forced) {
+ // noop
}
private void safeCancelSynchronousPullResponses() {
@@ -173,6 +201,14 @@ public class GooglePubsubConsumer extends DefaultConsumer {
return resolvedMaxDeliveryAttempts;
}
+ public void incrementPendingExchanges() {
+ pendingExchanges.incrementAndGet();
+ }
+
+ public void decrementPendingExchanges() {
+ pendingExchanges.decrementAndGet();
+ }
+
private class SubscriberWrapper implements Runnable {
private final String subscriptionName;
@@ -336,24 +372,30 @@ public class GooglePubsubConsumer extends DefaultConsumer
{
exchange.getIn().setHeader(pubSubHeader, value);
}
+ pendingExchanges.incrementAndGet();
try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
- // Handle exception if one occurred
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error
processing exchange", exchange,
- exchange.getException());
- }
+ // Handle exception if one occurred
+ if (exchange.getException() != null) {
+ getExceptionHandler().handleException("Error
processing exchange", exchange,
+ exchange.getException());
+ }
- // Execute synchronization callbacks (ACK/NACK) based
on exchange status
- // This is required because we are directly calling
processor.process() outside of the normal
- // Camel routing engine, so we must manually trigger
the OnCompletion callbacks
- if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
- List<Synchronization> synchronizations =
exchange.getExchangeExtension().handoverCompletions();
- UnitOfWorkHelper.doneSynchronizations(exchange,
synchronizations);
+ // Execute synchronization callbacks (ACK/NACK)
based on exchange status
+ // This is required because we are directly
calling processor.process() outside of the normal
+ // Camel routing engine, so we must manually
trigger the OnCompletion callbacks
+ if (endpoint.getAckMode() !=
GooglePubsubConstants.AckMode.NONE) {
+ List<Synchronization> synchronizations
+ =
exchange.getExchangeExtension().handoverCompletions();
+
UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
+ }
+ } finally {
+ pendingExchanges.decrementAndGet();
}
}
} catch (CancellationException e) {
diff --git
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index c2240b31c24b..eab17b95cc86 100644
---
a/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++
b/components/camel-google/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -101,23 +101,29 @@ public class CamelMessageReceiver implements
MessageReceiver {
exchange.getIn().setHeader(pubSubHeader, value);
}
+ consumer.incrementPendingExchanges();
try {
- processor.process(exchange);
- } catch (Exception e) {
- exchange.setException(e);
- }
+ try {
+ processor.process(exchange);
+ } catch (Exception e) {
+ exchange.setException(e);
+ }
- // Handle exception if one occurred
- if (exchange.getException() != null) {
- consumer.getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
- }
+ // Handle exception if one occurred
+ if (exchange.getException() != null) {
+ consumer.getExceptionHandler().handleException("Error
processing exchange", exchange,
+ exchange.getException());
+ }
- // Execute synchronization callbacks (ACK/NACK) based on exchange
status
- // This is required because we are directly calling
processor.process() outside of the normal
- // Camel routing engine, so we must manually trigger the OnCompletion
callbacks
- if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
- List<Synchronization> synchronizations =
exchange.getExchangeExtension().handoverCompletions();
- UnitOfWorkHelper.doneSynchronizations(exchange, synchronizations);
+ // Execute synchronization callbacks (ACK/NACK) based on exchange
status
+ // This is required because we are directly calling
processor.process() outside of the normal
+ // Camel routing engine, so we must manually trigger the
OnCompletion callbacks
+ if (endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
+ List<Synchronization> synchronizations =
exchange.getExchangeExtension().handoverCompletions();
+ UnitOfWorkHelper.doneSynchronizations(exchange,
synchronizations);
+ }
+ } finally {
+ consumer.decrementPendingExchanges();
}
}
diff --git
a/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
new file mode 100644
index 000000000000..ad2e16abc99d
--- /dev/null
+++
b/components/camel-google/camel-google-pubsub/src/test/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumerShutdownTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.google.pubsub;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
+import org.apache.camel.Processor;
+import org.apache.camel.ShutdownRunningTask;
+import org.apache.camel.spi.ExchangeFactory;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class GooglePubsubConsumerShutdownTest {
+
+ private final GooglePubsubEndpoint endpoint = mock();
+ private final Processor processor = mock();
+ private final CamelContext context = mock();
+ private final ExtendedCamelContext ecc = mock();
+ private final ExchangeFactory ef = mock();
+
+ @BeforeEach
+ void setUp() {
+ when(endpoint.getCamelContext()).thenReturn(context);
+ when(context.getCamelContextExtension()).thenReturn(ecc);
+ when(ecc.getExchangeFactory()).thenReturn(ef);
+ when(ef.newExchangeFactory(any())).thenReturn(ef);
+ }
+
+ @Test
+ void deferShutdownReturnsTrue() {
+ GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint,
processor);
+
+ boolean deferred =
consumer.deferShutdown(ShutdownRunningTask.CompleteAllTasks);
+
+ assertTrue(deferred);
+ }
+
+ @Test
+ void pendingExchangesSizeIsZeroInitially() {
+ GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint,
processor);
+
+ assertEquals(0, consumer.getPendingExchangesSize());
+ }
+
+ @Test
+ void pendingExchangesSizeTracksInflightMessages() {
+ GooglePubsubConsumer consumer = new GooglePubsubConsumer(endpoint,
processor);
+
+ assertEquals(0, consumer.getPendingExchangesSize());
+
+ consumer.incrementPendingExchanges();
+ assertEquals(1, consumer.getPendingExchangesSize());
+
+ consumer.incrementPendingExchanges();
+ assertEquals(2, consumer.getPendingExchangesSize());
+
+ consumer.decrementPendingExchanges();
+ assertEquals(1, consumer.getPendingExchangesSize());
+
+ consumer.decrementPendingExchanges();
+ assertEquals(0, consumer.getPendingExchangesSize());
+ }
+}