This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 016d8f4d8d [ZEPPELIN-5926] Remove map entries if Collection is empty (#4612) 016d8f4d8d is described below commit 016d8f4d8d1ff8d7cf29efa04bf47e2f2c1ac862 Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Mon Jun 12 11:24:59 2023 +0200 [ZEPPELIN-5926] Remove map entries if Collection is empty (#4612) * Remove map entries if Collection is empty * Add license header to new class --- .../apache/zeppelin/socket/ConnectionManager.java | 73 +++++++++++++--------- .../zeppelin/socket/ConnectionManagerTest.java | 58 +++++++++++++++++ .../apache/zeppelin/socket/NotebookServerTest.java | 12 +++- 3 files changed, 111 insertions(+), 32 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index a3a40895bc..c323a87b3c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -47,7 +47,7 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -69,7 +69,7 @@ public class ConnectionManager { final Queue<NotebookSocket> connectedSockets = Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new ConcurrentLinkedQueue<>()); // noteId -> connection - final Map<String, List<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>()); + final Map<String, Set<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>()); // user -> connection final Map<String, Queue<NotebookSocket>> userSocketMap = Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>()); @@ -107,11 +107,9 @@ public class ConnectionManager { synchronized (noteSocketMap) { // make sure a socket relates only an single note. removeConnectionFromAllNote(socket); - List<NotebookSocket> socketList = noteSocketMap.computeIfAbsent(noteId, k -> new LinkedList<>()); - if (!socketList.contains(socket)) { - socketList.add(socket); - } - checkCollaborativeStatus(noteId, socketList); + Set<NotebookSocket> sockets = noteSocketMap.computeIfAbsent(noteId, k -> new HashSet<>()); + sockets.add(socket); + checkCollaborativeStatus(noteId, sockets); } } @@ -124,11 +122,33 @@ public class ConnectionManager { public void removeNoteConnection(String noteId, NotebookSocket socket) { LOGGER.debug("Remove connection {} from note: {}", socket, noteId); synchronized (noteSocketMap) { - List<NotebookSocket> socketList = noteSocketMap.getOrDefault(noteId, Collections.emptyList()); - if (!socketList.isEmpty()) { - socketList.remove(socket); + Set<NotebookSocket> sockets = noteSocketMap.getOrDefault(noteId, Collections.emptySet()); + removeNoteConnection(noteId, sockets, socket); + // Remove empty socket collection from map + if (sockets.isEmpty()) { + noteSocketMap.remove(noteId); + } + } + } + + private void removeNoteConnection(String noteId, Set<NotebookSocket> sockets, + NotebookSocket socket) { + sockets.remove(socket); + checkCollaborativeStatus(noteId, sockets); + } + + public void removeConnectionFromAllNote(NotebookSocket socket) { + LOGGER.debug("Remove connection {} from all notes", socket); + synchronized (noteSocketMap) { + Iterator<Entry<String, Set<NotebookSocket>>> iterator = noteSocketMap.entrySet().iterator(); + while (iterator.hasNext()) { + Entry<String, Set<NotebookSocket>> noteSocketMapEntry = iterator.next(); + removeNoteConnection(noteSocketMapEntry.getKey(), noteSocketMapEntry.getValue(), socket); + // Remove empty socket collection from map + if (noteSocketMapEntry.getValue().isEmpty()) { + iterator.remove(); + } } - checkCollaborativeStatus(noteId, socketList); } } @@ -147,7 +167,11 @@ public class ConnectionManager { public void removeUserConnection(String user, NotebookSocket conn) { LOGGER.debug("Remove user connection {} for user: {}", conn, user); if (userSocketMap.containsKey(user)) { - userSocketMap.get(user).remove(conn); + Queue<NotebookSocket> connections = userSocketMap.get(user); + connections.remove(conn); + if (connections.isEmpty()) { + userSocketMap.remove(user); + } } else { LOGGER.warn("Closing connection that is absent in user connections"); } @@ -156,7 +180,7 @@ public class ConnectionManager { public String getAssociatedNoteId(NotebookSocket socket) { String associatedNoteId = null; synchronized (noteSocketMap) { - for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) { + for (Entry<String, Set<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) { if (noteSocketMapEntry.getValue().contains(socket)) { associatedNoteId = noteSocketMapEntry.getKey(); } @@ -166,16 +190,7 @@ public class ConnectionManager { return associatedNoteId; } - public void removeConnectionFromAllNote(NotebookSocket socket) { - synchronized (noteSocketMap) { - Set<String> noteIds = noteSocketMap.keySet(); - for (String noteId : noteIds) { - removeNoteConnection(noteId, socket); - } - } - } - - private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) { + private void checkCollaborativeStatus(String noteId, Set<NotebookSocket> socketList) { if (!collaborativeModeEnable.booleanValue()) { return; } @@ -219,11 +234,11 @@ public class ConnectionManager { List<NotebookSocket> socketsToBroadcast; synchronized (noteSocketMap) { broadcastToWatchers(noteId, StringUtils.EMPTY, m); - List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.isEmpty()) { + Set<NotebookSocket> sockets = noteSocketMap.get(noteId); + if (sockets == null || sockets.isEmpty()) { return; } - socketsToBroadcast = new ArrayList<>(socketLists); + socketsToBroadcast = new ArrayList<>(sockets); } LOGGER.debug("SEND >> {}", m); for (NotebookSocket conn : socketsToBroadcast) { @@ -256,11 +271,11 @@ public class ConnectionManager { List<NotebookSocket> socketsToBroadcast; synchronized (noteSocketMap) { broadcastToWatchers(noteId, StringUtils.EMPTY, m); - List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.isEmpty()) { + Set<NotebookSocket> socketSet = noteSocketMap.get(noteId); + if (socketSet == null || socketSet.isEmpty()) { return; } - socketsToBroadcast = new ArrayList<>(socketLists); + socketsToBroadcast = new ArrayList<>(socketSet); } LOGGER.debug("SEND >> {}", m); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java new file mode 100644 index 0000000000..d3779c9bc6 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/ConnectionManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import org.apache.zeppelin.notebook.AuthorizationService; +import org.junit.jupiter.api.Test; + +class ConnectionManagerTest { + + @Test + void checkMapGrow() { + AuthorizationService authService = mock(AuthorizationService.class); + + ConnectionManager manager = new ConnectionManager(authService); + NotebookSocket socket = mock(NotebookSocket.class); + manager.addNoteConnection("test", socket); + assertEquals(1, manager.noteSocketMap.size()); + // Remove Connection from wrong note + manager.removeNoteConnection("test1", socket); + assertEquals(1, manager.noteSocketMap.size()); + // Remove Connection from right note + manager.removeNoteConnection("test", socket); + assertEquals(0, manager.noteSocketMap.size()); + + manager.addUserConnection("TestUser", socket); + assertEquals(1, manager.userSocketMap.size()); + manager.removeUserConnection("TestUser", socket); + assertEquals(0, manager.userSocketMap.size()); + } + + @Test + void checkMapGrowRemoveAll() { + AuthorizationService authService = mock(AuthorizationService.class); + + ConnectionManager manager = new ConnectionManager(authService); + NotebookSocket socket = mock(NotebookSocket.class); + manager.addNoteConnection("test", socket); + assertEquals(1, manager.noteSocketMap.size()); + manager.removeConnectionFromAllNote(socket); + assertEquals(0, manager.noteSocketMap.size()); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index 47a5bfa9e3..f8a90ebb12 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.socket; -import static java.util.Arrays.asList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -42,6 +41,7 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; @@ -566,7 +566,10 @@ class NotebookServerTest extends AbstractTestRestApi { .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); + Set<NotebookSocket> sockets = new HashSet<>(); + sockets.add(otherConn); + sockets.add(conn); + notebookServer.getConnectionManager().noteSocketMap.put("noteId", sockets); // When notebookServer.angularObjectClientBind(conn, messageReceived); @@ -619,7 +622,10 @@ class NotebookServerTest extends AbstractTestRestApi { .put("noteId", "noteId") .put("paragraphId", "paragraphId")); - notebookServer.getConnectionManager().noteSocketMap.put("noteId", asList(conn, otherConn)); + Set<NotebookSocket> sockets = new HashSet<>(); + sockets.add(otherConn); + sockets.add(conn); + notebookServer.getConnectionManager().noteSocketMap.put("noteId", sockets); // When notebookServer.angularObjectClientUnbind(conn, messageReceived);