This is an automated email from the ASF dual-hosted git repository. marat pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-karavan.git
commit 940b481a121b65f6d058f0ff456f519781ace5c9 Author: Marat Gubaidullin <ma...@talismancloud.io> AuthorDate: Sun Aug 25 12:38:04 2024 -0400 Fix #1377 --- .../camel/karavan/api/NotificationResource.java | 19 ++++++--- .../scheduler/NotificationPingScheduler.java | 41 +++++++++++++++++++ .../camel/karavan/service/NotificationService.java | 46 ++++++++++++++++++++++ 3 files changed, 101 insertions(+), 5 deletions(-) diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java b/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java index 4659fff4..3a243380 100644 --- a/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java +++ b/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java @@ -18,6 +18,8 @@ package org.apache.camel.karavan.api; import io.smallrye.mutiny.Multi; import io.vertx.core.json.JsonObject; +import io.vertx.mutiny.core.eventbus.EventBus; +import jakarta.inject.Inject; import jakarta.ws.rs.GET; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; @@ -27,16 +29,23 @@ import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.sse.OutboundSseEvent; import jakarta.ws.rs.sse.Sse; import jakarta.ws.rs.sse.SseEventSink; +import org.apache.camel.karavan.service.NotificationService; import org.jboss.resteasy.reactive.RestStreamElementType; import static org.apache.camel.karavan.listener.NotificationListener.*; @Path("/ui/notification") -public class NotificationResource extends AbstractSseResource { +public class NotificationResource { private static final String SERVICE_NAME_SYSTEM = "NOTIFICATION_SYSTEM"; private static final String SERVICE_NAME_USER = "NOTIFICATION_USER"; + @Inject + EventBus eventBus; + + @Inject + NotificationService notificationService; + @GET @Path("/system/{username}") @Produces(MediaType.SERVER_SENT_EVENTS) @@ -46,8 +55,8 @@ public class NotificationResource extends AbstractSseResource { @Context SseEventSink eventSink, @Context Sse sse ) { - sinkCleanup(SERVICE_NAME_SYSTEM, username, eventSink); - return bus.<JsonObject>consumer(NOTIFICATION_ADDRESS_SYSTEM).toMulti() + notificationService.sinkCleanup(SERVICE_NAME_SYSTEM, username, eventSink); + return eventBus.<JsonObject>consumer(NOTIFICATION_ADDRESS_SYSTEM).toMulti() .map(m -> sse.newEventBuilder() .id(m.headers().get(NOTIFICATION_HEADER_EVENT_ID)) .name(m.headers().get(NOTIFICATION_HEADER_EVENT_NAME) + ":" + m.headers().get(NOTIFICATION_HEADER_CLASS_NAME)) @@ -64,8 +73,8 @@ public class NotificationResource extends AbstractSseResource { @Context SseEventSink eventSink, @Context Sse sse ) { - sinkCleanup(SERVICE_NAME_USER, username, eventSink); - return bus.<JsonObject>consumer(username).toMulti() + notificationService.sinkCleanup(SERVICE_NAME_USER, username, eventSink); + return eventBus.<JsonObject>consumer(username).toMulti() .map(m -> sse.newEventBuilder() .id(m.headers().get(NOTIFICATION_HEADER_EVENT_ID)) .name(m.headers().get(NOTIFICATION_HEADER_EVENT_NAME) + ":" + m.headers().get(NOTIFICATION_HEADER_CLASS_NAME)) diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java new file mode 100644 index 00000000..394ed927 --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.karavan.scheduler; + +import io.quarkus.scheduler.Scheduled; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; +import org.apache.camel.karavan.service.NotificationService; +import org.jboss.resteasy.reactive.server.jaxrs.OutboundSseEventImpl; + + +@ApplicationScoped +public class NotificationPingScheduler { + + @Inject + NotificationService notificationService; + + @Scheduled(every = "30s", concurrentExecution = Scheduled.ConcurrentExecution.SKIP) + public void ping() { + notificationService.getSinks().forEach(sink -> { + if (!sink.isClosed()) { + sink.send(new OutboundSseEventImpl.BuilderImpl().name("ping").data(String.class, "ping").build()); + } + }); + } +} \ No newline at end of file diff --git a/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java b/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java new file mode 100644 index 00000000..861e3cdb --- /dev/null +++ b/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.karavan.service; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.sse.SseEventSink; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@ApplicationScoped +public class NotificationService { + + private static final Map<String, SseEventSink> sinkMap = new ConcurrentHashMap<>(); + + public void sinkCleanup(String service, String username, SseEventSink eventSink) { + String key = service + ":" + username; + if (sinkMap.containsKey(key)) { + var sink = sinkMap.get(key); + if (!sink.isClosed()) { + sink.close(); + } + } + sinkMap.put(key, eventSink); + } + + public List<SseEventSink> getSinks() { + return new ArrayList<>(sinkMap.values()); + } +} \ No newline at end of file