This is an automated email from the ASF dual-hosted git repository. pdallig pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new 4cee906 [ZEPPELIN-5569] Implement onError 4cee906 is described below commit 4cee9068c72805693bcb630ed13fea50da24506e Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Mon Mar 21 10:27:57 2022 +0100 [ZEPPELIN-5569] Implement onError ### What is this PR for? This PR implements `OnError`, which is necessary to clean up the connectionManager, if possible. ### What type of PR is it? - BugFix ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5569 ### How should this be tested? * I have done some manual testing, but it is difficult to test this case. ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4260 from Reamer/onError and squashes the following commits: 46d1c88f2 [Philipp Dallig] catch RuntimeException 15f22d55f [Philipp Dallig] Add Metrics f9e537912 [Philipp Dallig] Implement onError 8d591d7ed [Philipp Dallig] remove NotebookSocket from sessionIdNotebookSocketMap --- .../apache/zeppelin/socket/ConnectionManager.java | 41 +++++++++--------- .../org/apache/zeppelin/socket/NotebookServer.java | 49 +++++++++++++++------- .../org/apache/zeppelin/socket/NotebookSocket.java | 4 +- .../org/apache/zeppelin/utils/ServerUtils.java | 37 ++++++++++++++++ 4 files changed, 97 insertions(+), 34 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index 51c8fdf..a3a4089 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -20,6 +20,10 @@ package org.apache.zeppelin.socket; import com.google.gson.Gson; import com.google.gson.GsonBuilder; + +import 
io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; + import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.GUI; @@ -46,6 +50,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -62,21 +67,21 @@ public class ConnectionManager { .setPrettyPrinting() .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); - final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); + final Queue<NotebookSocket> connectedSockets = Metrics.gaugeCollectionSize("zeppelin_connected_sockets", Tags.empty(), new ConcurrentLinkedQueue<>()); // noteId -> connection - final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>(); + final Map<String, List<NotebookSocket>> noteSocketMap = Metrics.gaugeMapSize("zeppelin_note_sockets", Tags.empty(), new HashMap<>()); // user -> connection - final Map<String, Queue<NotebookSocket>> userSocketMap = new HashMap<>(); + final Map<String, Queue<NotebookSocket>> userSocketMap = Metrics.gaugeMapSize("zeppelin_user_sockets", Tags.empty(), new HashMap<>()); /** - * This is a special endpoint in the notebook websoket, Every connection in this Queue + * This is a special endpoint in the notebook websocket, Every connection in this Queue * will be able to watch every websocket event, it doesnt need to be listed into the map of * noteSocketMap. This can be used to get information about websocket traffic and watch what * is going on. 
*/ final Queue<NotebookSocket> watcherSockets = new ConcurrentLinkedQueue<>(); - private final HashSet<String> collaborativeModeList = new HashSet<>(); + private final HashSet<String> collaborativeModeList = Metrics.gaugeCollectionSize("zeppelin_collaborative_modes", Tags.empty(),new HashSet<>()); private final Boolean collaborativeModeEnable = ZeppelinConfiguration .create() .isZeppelinNotebookCollaborativeModeEnable(); @@ -151,11 +156,9 @@ public class ConnectionManager { public String getAssociatedNoteId(NotebookSocket socket) { String associatedNoteId = null; synchronized (noteSocketMap) { - Set<String> noteIds = noteSocketMap.keySet(); - for (String noteId : noteIds) { - List<NotebookSocket> sockets = noteSocketMap.get(noteId); - if (sockets.contains(socket)) { - associatedNoteId = noteId; + for (Entry<String, List<NotebookSocket>> noteSocketMapEntry : noteSocketMap.entrySet()) { + if (noteSocketMapEntry.getValue().contains(socket)) { + associatedNoteId = noteSocketMapEntry.getKey(); } } } @@ -205,7 +208,7 @@ public class ConnectionManager { for (NotebookSocket ns : connectedSockets) { try { ns.send(serializeMessage(m)); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("Send error: {}", m, e); } } @@ -226,7 +229,7 @@ public class ConnectionManager { for (NotebookSocket conn : socketsToBroadcast) { try { conn.send(serializeMessage(m)); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("socket error", e); } } @@ -242,7 +245,7 @@ public class ConnectionManager { .message(serializeMessage(message)) .build() .toJson()); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("Cannot broadcast message to watcher", e); } } @@ -267,7 +270,7 @@ public class ConnectionManager { } try { conn.send(serializeMessage(m)); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("socket error", e); } } @@ -289,7 +292,7 @@ public class 
ConnectionManager { try { conn.send(serializedMsg); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("Cannot broadcast message to conn", e); } } @@ -319,7 +322,7 @@ public class ConnectionManager { public void unicast(Message m, NotebookSocket conn) { try { conn.send(serializeMessage(m)); - } catch (IOException e) { + } catch (IOException | RuntimeException e) { LOGGER.error("socket error", e); } broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m); @@ -385,9 +388,9 @@ public class ConnectionManager { public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap) { if (null != userParagraphMap) { - for (String user : userParagraphMap.keySet()) { - multicastToUser(user, - new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); + for (Entry<String, Paragraph> userParagraphEntry : userParagraphMap.entrySet()) { + multicastToUser(userParagraphEntry.getKey(), + new Message(Message.OP.PARAGRAPH).put("paragraph", userParagraphEntry.getValue())); } } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 238545c..6ef9cb2 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -19,8 +19,13 @@ package org.apache.zeppelin.socket; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; + +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Tags; + import java.io.IOException; import java.lang.reflect.Type; +import java.net.SocketTimeoutException; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; @@ -40,6 +45,7 @@ import javax.inject.Provider; import javax.websocket.CloseReason; import javax.websocket.EndpointConfig; import 
javax.websocket.OnClose; +import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; @@ -91,6 +97,7 @@ import org.apache.zeppelin.types.InterpreterSettingsList; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.util.IdHashes; import org.apache.zeppelin.utils.CorsUtils; +import org.apache.zeppelin.utils.ServerUtils; import org.apache.zeppelin.utils.TestUtils; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -147,7 +154,7 @@ public class NotebookServer implements AngularObjectRegistryListener, ZeppelinConfiguration.ConfVars.ZEPPELIN_WEBSOCKET_PARAGRAPH_STATUS_PROGRESS); // TODO(jl): This will be removed by handling session directly - private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = new ConcurrentHashMap<>(); + private final Map<String, NotebookSocket> sessionIdNotebookSocketMap = Metrics.gaugeMapSize("zeppelin_session_id_notebook_sockets", Tags.empty(), new ConcurrentHashMap<>()); private ConnectionManager connectionManager; private ZeppelinConfiguration zeppelinConfiguration; private Provider<Notebook> notebookProvider; @@ -245,7 +252,7 @@ public class NotebookServer implements AngularObjectRegistryListener, @OnOpen public void onOpen(Session session, EndpointConfig endpointConfig) throws IOException { - LOG.info("Session: {}, config: {}", session, endpointConfig.getUserProperties().keySet()); + LOG.info("Open connection to {} with Session: {}, config: {}", ServerUtils.getRemoteAddress(session), session, endpointConfig.getUserProperties().keySet()); Map<String, Object> headers = endpointConfig.getUserProperties(); String origin = String.valueOf(headers.get(CorsUtils.HEADER_ORIGIN)); @@ -318,7 +325,6 @@ public class NotebookServer implements AngularObjectRegistryListener, if (StringUtils.isEmpty(conn.getUser())) { connectionManager.addUserConnection(receivedMessage.principal, 
conn); } - ServiceContext context = getServiceContext(ticketEntry); // Lets be elegant here switch (receivedMessage.op) { @@ -484,7 +490,7 @@ public class NotebookServer implements AngularObjectRegistryListener, break; } } catch (Exception e) { - LOG.error("Can't handle message: " + msg, e); + LOG.error("Can't handle message: {}", msg, e); try { conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", e.getMessage()))); } catch (IOException iox) { @@ -494,24 +500,39 @@ public class NotebookServer implements AngularObjectRegistryListener, } @OnClose - public void onClose(Session session, CloseReason closeReason) throws IOException { - NotebookSocket notebookSocket = sessionIdNotebookSocketMap.get(session.getId()); - if (null == notebookSocket) { - session.close(); - } else { - int code = closeReason.getCloseCode().getCode(); - String reason = closeReason.getReasonPhrase(); - onClose(notebookSocket, code, reason); + public void onClose(Session session, CloseReason closeReason) { + NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId()); + if (notebookSocket != null) { + LOG.info("Closed connection to {} ({}) {}", ServerUtils.getRemoteAddress(session), closeReason.getCloseCode().getCode(), closeReason.getReasonPhrase()); + removeConnection(notebookSocket); } } - public void onClose(NotebookSocket notebookSocket, int code, String reason) { - LOG.info("Closed connection to {} ({}) {}", notebookSocket, code, reason); + private void removeConnection(NotebookSocket notebookSocket) { connectionManager.removeConnection(notebookSocket); connectionManager.removeConnectionFromAllNote(notebookSocket); connectionManager.removeUserConnection(notebookSocket.getUser(), notebookSocket); } + @OnError + public void onError(Session session, Throwable error) { + if (session != null) { + NotebookSocket notebookSocket = sessionIdNotebookSocketMap.remove(session.getId()); + if (notebookSocket != null) { + removeConnection(notebookSocket); + } + } + if 
(error instanceof SocketTimeoutException) { + LOG.warn("Socket Session to {} timed out", ServerUtils.getRemoteAddress(session)); + LOG.debug("SocketTimeoutException", error); + } else if (error instanceof IOException) { + LOG.warn("Client {} is gone", ServerUtils.getRemoteAddress(session)); + LOG.debug("IOException", error); + } else { + LOG.error("Error in WebSocket Session to {}", ServerUtils.getRemoteAddress(session), error); + } + } + protected Message deserializeMessage(String msg) { return gson.fromJson(msg, Message.class); } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index 609b4bb..58c41cd 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.socket; import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.utils.ServerUtils; + import java.io.IOException; import java.util.Map; @@ -54,6 +56,6 @@ public class NotebookSocket { @Override public String toString() { - return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress")); + return ServerUtils.getRemoteAddress(session); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java new file mode 100644 index 0000000..179a2a6 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/ServerUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.utils; + +import javax.websocket.Session; + +public class ServerUtils { + private ServerUtils() { + // do nothing + } + + /** + * + * @param session + * @return the remote address of the websocket or "unknown" if session is null + */ + public static String getRemoteAddress(Session session) { + if (session != null && session.getUserProperties() != null) { + return String.valueOf(session.getUserProperties().get("javax.websocket.endpoint.remoteAddress")); + } + return "unknown"; + } +}