This is an automated email from the ASF dual-hosted git repository.

marat pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-karavan.git

commit 940b481a121b65f6d058f0ff456f519781ace5c9
Author: Marat Gubaidullin <ma...@talismancloud.io>
AuthorDate: Sun Aug 25 12:38:04 2024 -0400

    Fix #1377
---
 .../camel/karavan/api/NotificationResource.java    | 19 ++++++---
 .../scheduler/NotificationPingScheduler.java       | 41 +++++++++++++++++++
 .../camel/karavan/service/NotificationService.java | 46 ++++++++++++++++++++++
 3 files changed, 101 insertions(+), 5 deletions(-)

diff --git 
a/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java
 
b/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java
index 4659fff4..3a243380 100644
--- 
a/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java
+++ 
b/karavan-app/src/main/java/org/apache/camel/karavan/api/NotificationResource.java
@@ -18,6 +18,8 @@ package org.apache.camel.karavan.api;
 
 import io.smallrye.mutiny.Multi;
 import io.vertx.core.json.JsonObject;
+import io.vertx.mutiny.core.eventbus.EventBus;
+import jakarta.inject.Inject;
 import jakarta.ws.rs.GET;
 import jakarta.ws.rs.Path;
 import jakarta.ws.rs.PathParam;
@@ -27,16 +29,23 @@ import jakarta.ws.rs.core.MediaType;
 import jakarta.ws.rs.sse.OutboundSseEvent;
 import jakarta.ws.rs.sse.Sse;
 import jakarta.ws.rs.sse.SseEventSink;
+import org.apache.camel.karavan.service.NotificationService;
 import org.jboss.resteasy.reactive.RestStreamElementType;
 
 import static org.apache.camel.karavan.listener.NotificationListener.*;
 
 @Path("/ui/notification")
-public class NotificationResource extends AbstractSseResource {
+public class NotificationResource {
 
     private static final String SERVICE_NAME_SYSTEM = "NOTIFICATION_SYSTEM";
     private static final String SERVICE_NAME_USER = "NOTIFICATION_USER";
 
+    @Inject
+    EventBus eventBus;
+
+    @Inject
+    NotificationService notificationService;
+
     @GET
     @Path("/system/{username}")
     @Produces(MediaType.SERVER_SENT_EVENTS)
@@ -46,8 +55,8 @@ public class NotificationResource extends AbstractSseResource 
{
             @Context SseEventSink eventSink,
             @Context Sse sse
     ) {
-        sinkCleanup(SERVICE_NAME_SYSTEM, username, eventSink);
-        return bus.<JsonObject>consumer(NOTIFICATION_ADDRESS_SYSTEM).toMulti()
+        notificationService.sinkCleanup(SERVICE_NAME_SYSTEM, username, 
eventSink);
+        return 
eventBus.<JsonObject>consumer(NOTIFICATION_ADDRESS_SYSTEM).toMulti()
                 .map(m -> sse.newEventBuilder()
                         .id(m.headers().get(NOTIFICATION_HEADER_EVENT_ID))
                         .name(m.headers().get(NOTIFICATION_HEADER_EVENT_NAME) 
+ ":" + m.headers().get(NOTIFICATION_HEADER_CLASS_NAME))
@@ -64,8 +73,8 @@ public class NotificationResource extends AbstractSseResource 
{
             @Context SseEventSink eventSink,
             @Context Sse sse
     ) {
-        sinkCleanup(SERVICE_NAME_USER, username, eventSink);
-        return bus.<JsonObject>consumer(username).toMulti()
+        notificationService.sinkCleanup(SERVICE_NAME_USER, username, 
eventSink);
+        return eventBus.<JsonObject>consumer(username).toMulti()
                 .map(m -> sse.newEventBuilder()
                         .id(m.headers().get(NOTIFICATION_HEADER_EVENT_ID))
                         .name(m.headers().get(NOTIFICATION_HEADER_EVENT_NAME) 
+ ":" + m.headers().get(NOTIFICATION_HEADER_CLASS_NAME))
diff --git 
a/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java
 
b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java
new file mode 100644
index 00000000..394ed927
--- /dev/null
+++ 
b/karavan-app/src/main/java/org/apache/camel/karavan/scheduler/NotificationPingScheduler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.karavan.scheduler;
+
+import io.quarkus.scheduler.Scheduled;
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.inject.Inject;
+import org.apache.camel.karavan.service.NotificationService;
+import org.jboss.resteasy.reactive.server.jaxrs.OutboundSseEventImpl;
+
+
+@ApplicationScoped
+public class NotificationPingScheduler {
+
+    @Inject
+    NotificationService notificationService;
+
+    @Scheduled(every = "30s", concurrentExecution = 
Scheduled.ConcurrentExecution.SKIP)
+    public void ping() {
+        notificationService.getSinks().forEach(sink -> {
+            if (!sink.isClosed()) {
+                sink.send(new 
OutboundSseEventImpl.BuilderImpl().name("ping").data(String.class, 
"ping").build());
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git 
a/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java
 
b/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java
new file mode 100644
index 00000000..861e3cdb
--- /dev/null
+++ 
b/karavan-app/src/main/java/org/apache/camel/karavan/service/NotificationService.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.karavan.service;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.ws.rs.sse.SseEventSink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@ApplicationScoped
+public class NotificationService {
+
+    private static final Map<String, SseEventSink> sinkMap = new 
ConcurrentHashMap<>();
+
+    public void sinkCleanup(String service, String username, SseEventSink 
eventSink) {
+        String key = service + ":" + username;
+        if (sinkMap.containsKey(key)) {
+            var sink = sinkMap.get(key);
+            if (!sink.isClosed()) {
+                sink.close();
+            }
+        }
+        sinkMap.put(key, eventSink);
+    }
+
+    public List<SseEventSink> getSinks() {
+        return new ArrayList<>(sinkMap.values());
+    }
+}
\ No newline at end of file

Reply via email to