This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 016d8f4d8d [ZEPPELIN-5926] Remove map entries if Collection is empty (#4612)
016d8f4d8d is described below

commit 016d8f4d8d1ff8d7cf29efa04bf47e2f2c1ac862
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Mon Jun 12 11:24:59 2023 +0200

    [ZEPPELIN-5926] Remove map entries if Collection is empty (#4612)
    
    * Remove map entries if Collection is empty
    
    * Add license header to new class
---
 .../apache/zeppelin/socket/ConnectionManager.java  | 73 +++++++++++++---------
 .../zeppelin/socket/ConnectionManagerTest.java     | 58 +++++++++++++++++
 .../apache/zeppelin/socket/NotebookServerTest.java | 12 +++-
 3 files changed, 111 insertions(+), 32 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
index a3a40895bc..c323a87b3c 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java
@@ -47,7 +47,7 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -69,7 +69,7 @@ public class ConnectionManager {
 
   final Queue<NotebookSocket> connectedSockets = 
Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new 
ConcurrentLinkedQueue<>());
   // noteId -> connection
-  final Map<String, List<NotebookSocket>> noteSocketMap = 
Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
+  final Map<String, Set<NotebookSocket>> noteSocketMap = 
Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>());
   // user -> connection
   final Map<String, Queue<NotebookSocket>> userSocketMap = 
Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>());
 
@@ -107,11 +107,9 @@ public class ConnectionManager {
     synchronized (noteSocketMap) {
       // make sure a socket relates only an single note.
       removeConnectionFromAllNote(socket);
-      List<NotebookSocket> socketList = noteSocketMap.computeIfAbsent(noteId, 
k -> new LinkedList<>());
-      if (!socketList.contains(socket)) {
-        socketList.add(socket);
-      }
-      checkCollaborativeStatus(noteId, socketList);
+      Set<NotebookSocket> sockets = noteSocketMap.computeIfAbsent(noteId, k -> 
new HashSet<>());
+      sockets.add(socket);
+      checkCollaborativeStatus(noteId, sockets);
     }
   }
 
@@ -124,11 +122,33 @@ public class ConnectionManager {
   public void removeNoteConnection(String noteId, NotebookSocket socket) {
     LOGGER.debug("Remove connection {} from note: {}", socket, noteId);
     synchronized (noteSocketMap) {
-      List<NotebookSocket> socketList = noteSocketMap.getOrDefault(noteId, 
Collections.emptyList());
-      if (!socketList.isEmpty()) {
-        socketList.remove(socket);
+      Set<NotebookSocket> sockets = noteSocketMap.getOrDefault(noteId, 
Collections.emptySet());
+      removeNoteConnection(noteId, sockets, socket);
+      // Remove empty socket collection from map
+      if (sockets.isEmpty()) {
+        noteSocketMap.remove(noteId);
+      }
+    }
+  }
+
+  private void removeNoteConnection(String noteId, Set<NotebookSocket> sockets,
+    NotebookSocket socket) {
+    sockets.remove(socket);
+    checkCollaborativeStatus(noteId, sockets);
+  }
+
+  public void removeConnectionFromAllNote(NotebookSocket socket) {
+    LOGGER.debug("Remove connection {} from all notes", socket);
+    synchronized (noteSocketMap) {
+      Iterator<Entry<String, Set<NotebookSocket>>> iterator = 
noteSocketMap.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Entry<String, Set<NotebookSocket>> noteSocketMapEntry = 
iterator.next();
+        removeNoteConnection(noteSocketMapEntry.getKey(), 
noteSocketMapEntry.getValue(), socket);
+        // Remove empty socket collection from map
+        if (noteSocketMapEntry.getValue().isEmpty()) {
+          iterator.remove();
+        }
       }
-      checkCollaborativeStatus(noteId, socketList);
     }
   }
 
@@ -147,7 +167,11 @@ public class ConnectionManager {
   public void removeUserConnection(String user, NotebookSocket conn) {
     LOGGER.debug("Remove user connection {} for user: {}", conn, user);
     if (userSocketMap.containsKey(user)) {
-      userSocketMap.get(user).remove(conn);
+      Queue<NotebookSocket> connections = userSocketMap.get(user);
+      connections.remove(conn);
+      if (connections.isEmpty()) {
+        userSocketMap.remove(user);
+      }
     } else {
       LOGGER.warn("Closing connection that is absent in user connections");
     }
@@ -156,7 +180,7 @@ public class ConnectionManager {
   public String getAssociatedNoteId(NotebookSocket socket) {
     String associatedNoteId = null;
     synchronized (noteSocketMap) {
-      for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : 
noteSocketMap.entrySet()) {
+      for (Entry<String, Set<NotebookSocket>> noteSocketMapEntry : 
noteSocketMap.entrySet()) {
         if (noteSocketMapEntry.getValue().contains(socket)) {
           associatedNoteId = noteSocketMapEntry.getKey();
         }
@@ -166,16 +190,7 @@ public class ConnectionManager {
     return associatedNoteId;
   }
 
-  public void removeConnectionFromAllNote(NotebookSocket socket) {
-    synchronized (noteSocketMap) {
-      Set<String> noteIds = noteSocketMap.keySet();
-      for (String noteId : noteIds) {
-        removeNoteConnection(noteId, socket);
-      }
-    }
-  }
-
-  private void checkCollaborativeStatus(String noteId, List<NotebookSocket> 
socketList) {
+  private void checkCollaborativeStatus(String noteId, Set<NotebookSocket> 
socketList) {
     if (!collaborativeModeEnable.booleanValue()) {
       return;
     }
@@ -219,11 +234,11 @@ public class ConnectionManager {
     List<NotebookSocket> socketsToBroadcast;
     synchronized (noteSocketMap) {
       broadcastToWatchers(noteId, StringUtils.EMPTY, m);
-      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
-      if (socketLists == null || socketLists.isEmpty()) {
+      Set<NotebookSocket> sockets = noteSocketMap.get(noteId);
+      if (sockets == null || sockets.isEmpty()) {
         return;
       }
-      socketsToBroadcast = new ArrayList<>(socketLists);
+      socketsToBroadcast = new ArrayList<>(sockets);
     }
     LOGGER.debug("SEND >> {}", m);
     for (NotebookSocket conn : socketsToBroadcast) {
@@ -256,11 +271,11 @@ public class ConnectionManager {
     List<NotebookSocket> socketsToBroadcast;
     synchronized (noteSocketMap) {
       broadcastToWatchers(noteId, StringUtils.EMPTY, m);
-      List<NotebookSocket> socketLists = noteSocketMap.get(noteId);
-      if (socketLists == null || socketLists.isEmpty()) {
+      Set<NotebookSocket> socketSet = noteSocketMap.get(noteId);
+      if (socketSet == null || socketSet.isEmpty()) {
         return;
       }
-      socketsToBroadcast = new ArrayList<>(socketLists);
+      socketsToBroadcast = new ArrayList<>(socketSet);
     }
 
     LOGGER.debug("SEND >> {}", m);
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java
new file mode 100644
index 0000000000..d3779c9bc6
--- /dev/null
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.socket;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import org.apache.zeppelin.notebook.AuthorizationService;
+import org.junit.jupiter.api.Test;
+
+class ConnectionManagerTest {
+
+  @Test
+  void checkMapGrow() {
+    AuthorizationService authService = mock(AuthorizationService.class);
+
+    ConnectionManager manager = new ConnectionManager(authService);
+    NotebookSocket socket = mock(NotebookSocket.class);
+    manager.addNoteConnection("test", socket);
+    assertEquals(1, manager.noteSocketMap.size());
+    // Remove Connection from wrong note
+    manager.removeNoteConnection("test1", socket);
+    assertEquals(1, manager.noteSocketMap.size());
+    // Remove Connection from right note
+    manager.removeNoteConnection("test", socket);
+    assertEquals(0, manager.noteSocketMap.size());
+
+    manager.addUserConnection("TestUser", socket);
+    assertEquals(1, manager.userSocketMap.size());
+    manager.removeUserConnection("TestUser", socket);
+    assertEquals(0, manager.userSocketMap.size());
+  }
+
+  @Test
+  void checkMapGrowRemoveAll() {
+    AuthorizationService authService = mock(AuthorizationService.class);
+
+    ConnectionManager manager = new ConnectionManager(authService);
+    NotebookSocket socket = mock(NotebookSocket.class);
+    manager.addNoteConnection("test", socket);
+    assertEquals(1, manager.noteSocketMap.size());
+    manager.removeConnectionFromAllNote(socket);
+    assertEquals(0, manager.noteSocketMap.size());
+  }
+}
diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
index 47a5bfa9e3..f8a90ebb12 100644
--- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
+++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.zeppelin.socket;
 
-import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -42,6 +41,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.servlet.http.HttpServletRequest;
 
@@ -566,7 +566,10 @@ class NotebookServerTest extends AbstractTestRestApi {
               .put("noteId", "noteId")
               .put("paragraphId", "paragraphId"));
 
-      notebookServer.getConnectionManager().noteSocketMap.put("noteId", 
asList(conn, otherConn));
+      Set<NotebookSocket> sockets = new HashSet<>();
+      sockets.add(otherConn);
+      sockets.add(conn);
+      notebookServer.getConnectionManager().noteSocketMap.put("noteId", 
sockets);
 
       // When
       notebookServer.angularObjectClientBind(conn, messageReceived);
@@ -619,7 +622,10 @@ class NotebookServerTest extends AbstractTestRestApi {
               .put("noteId", "noteId")
               .put("paragraphId", "paragraphId"));
 
-      notebookServer.getConnectionManager().noteSocketMap.put("noteId", 
asList(conn, otherConn));
+      Set<NotebookSocket> sockets = new HashSet<>();
+      sockets.add(otherConn);
+      sockets.add(conn);
+      notebookServer.getConnectionManager().noteSocketMap.put("noteId", 
sockets);
 
       // When
       notebookServer.angularObjectClientUnbind(conn, messageReceived);

Reply via email to