This is an automated email from the ASF dual-hosted git repository.

lresende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-toree.git


The following commit(s) were added to refs/heads/master by this push:
     new 47970f52 [TOREE-540] Fix deadlock on closing ZMQ by upgrading jeromq to 0.5.3 (#205)
47970f52 is described below

commit 47970f52081ca5bdd82e0fb987a7f99e5cfdcbfc
Author: Cheng Pan <[email protected]>
AuthorDate: Sun Aug 6 14:05:29 2023 +0800

    [TOREE-540] Fix deadlock on closing ZMQ by upgrading jeromq to 0.5.3 (#205)
---
 .../apache/toree/communication/SocketManager.scala | 10 ++---
 .../communication/socket/PubSocketRunnable.scala   |  5 ++-
 .../communication/socket/ReqSocketRunnable.scala   |  5 ++-
 .../toree/communication/socket/SocketType.scala    | 44 ----------------------
 .../socket/ZeroMQSocketRunnable.scala              | 10 ++---
 .../socket/ZeroMQSocketRunnableSpec.scala          | 23 +++++------
 project/Dependencies.scala                         |  2 +-
 7 files changed, 29 insertions(+), 70 deletions(-)

diff --git a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
index 0422334a..7041a58e 100644
--- a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
@@ -19,7 +19,7 @@ package org.apache.toree.communication
 import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import org.apache.toree.communication.socket._
-import org.zeromq.ZMQ
+import org.zeromq.{SocketType, ZMQ}
 
 import scala.collection.JavaConverters._
 
@@ -99,7 +99,7 @@ class SocketManager {
   ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
-      RepSocket,
+      SocketType.REP,
       Some(inboundMessageCallback),
       Bind(address),
       Linger(0)
@@ -137,7 +137,7 @@ class SocketManager {
   ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
-      SubSocket,
+      SocketType.SUB,
       Some(inboundMessageCallback),
       Connect(address),
       Linger(0),
@@ -159,7 +159,7 @@ class SocketManager {
   ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
-      RouterSocket,
+      SocketType.ROUTER,
       Some(inboundMessageCallback),
       Bind(address),
       Linger(0)
@@ -181,7 +181,7 @@ class SocketManager {
   ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
-      DealerSocket,
+      SocketType.DEALER,
       Some(inboundMessageCallback),
       Connect(address),
       Linger(0),
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
index 40d680d7..c3a587ec 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/PubSocketRunnable.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.toree.communication.socket
 
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.SocketType
+import org.zeromq.ZMQ.{Context, Socket}
 
 /**
  * Represents the runnable component of a socket specifically targeted towards
@@ -30,7 +31,7 @@ class PubSocketRunnable(
   private val socketOptions: SocketOption*
 ) extends ZeroMQSocketRunnable(
   context,
-  PubSocket,
+  SocketType.PUB,
   None,
   socketOptions: _*
 ) {
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
index cd3d85e7..dea651cf 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.toree.communication.socket
 
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.SocketType
+import org.zeromq.ZMQ.{Context, Socket}
 
 /**
  * Represents the runnable component of a socket that processes messages and
@@ -35,7 +36,7 @@ class ReqSocketRunnable(
   private val socketOptions: SocketOption*
 ) extends ZeroMQSocketRunnable(
   context,
-  ReqSocket,
+  SocketType.REQ,
   inboundMessageCallback,
   socketOptions: _*
 ) {
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala b/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala
deleted file mode 100644
index a93a6bc2..00000000
--- a/communication/src/main/scala/org/apache/toree/communication/socket/SocketType.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License
- */
-package org.apache.toree.communication.socket
-
-import org.zeromq.ZMQ
-
-/**
- * Represents the type option used to indicate the type of socket to create.
- *
- * @param `type` The type as an integer
- */
-sealed class SocketType(val `type`: Int)
-
-/** Represents a publish socket. */
-case object PubSocket extends SocketType(ZMQ.PUB)
-
-/** Represents a subscribe socket. */
-case object SubSocket extends SocketType(ZMQ.SUB)
-
-/** Represents a reply socket. */
-case object RepSocket extends SocketType(ZMQ.REP)
-
-/** Represents a request socket. */
-case object ReqSocket extends SocketType(ZMQ.REQ)
-
-/** Represents a router socket. */
-case object RouterSocket extends SocketType(ZMQ.ROUTER)
-
-/** Represents a dealer socket. */
-case object DealerSocket extends SocketType(ZMQ.DEALER)
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
index 0464d574..1ac7f08e 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
@@ -17,7 +17,7 @@
 package org.apache.toree.communication.socket
 
 import org.apache.toree.utils.LogLike
-import org.zeromq.{ZMsg, ZMQ}
+import org.zeromq.{SocketType, ZMsg, ZMQ}
 import org.zeromq.ZMQ.Context
 
 import scala.collection.JavaConverters._
@@ -64,7 +64,7 @@ class ZeroMQSocketRunnable(
    */
   protected def processOptions(socket: ZMQ.Socket): Unit = {
     val socketOptionsString = socketOptions.map("\n- " + _.toString).mkString("")
-    logger.trace(
+    logger.info(
       s"Processing options for socket $socketType: $socketOptionsString"
     )
 
@@ -103,7 +103,7 @@ class ZeroMQSocketRunnable(
    */
   protected def processNextOutboundMessage(socket: ZMQ.Socket): Boolean = {
     val message = Option(outboundMessages.poll())
-
+    message.foreach(msg => println(s"send: \n$msg"))
     message.foreach(_.send(socket))
 
     message.nonEmpty
@@ -134,11 +134,11 @@ class ZeroMQSocketRunnable(
    *
    * @return The new ZMQ.Socket instance
    */
-  protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: Int) =
+  protected def newZmqSocket(zmqContext: ZMQ.Context, socketType: SocketType) =
     zmqContext.socket(socketType)
 
   override def run(): Unit = {
-    val socket = newZmqSocket(context, socketType.`type`)//context.socket(socketType.`type`)
+    val socket = newZmqSocket(context, socketType)//context.socket(socketType.`type`)
 
     try {
       processOptions(socket)
diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
index 605fa296..b77a6efd 100644
--- a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
+++ b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
@@ -20,8 +20,8 @@ import org.scalatest.concurrent.Eventually
 import org.scalatestplus.mockito.MockitoSugar
 import org.scalatest.time.{Milliseconds, Seconds, Span}
 import org.scalatest.{BeforeAndAfter, FunSpec, Matchers}
-import org.zeromq.ZMQ
-import org.zeromq.ZMQ.{Socket, Context}
+import org.zeromq.{SocketType, ZMQ}
+import org.zeromq.ZMQ.{Context, Socket}
 
 import scala.util.Try
 
@@ -50,13 +50,14 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
     inboundMessageCallback,
     socketOptions: _*
   ) {
-    override protected def newZmqSocket(zmqContext: Context, socketType: Int): Socket = socket
+    override protected def newZmqSocket(zmqContext: Context, socketType: SocketType): Socket = socket
   }
 
   before {
-    mockSocketType = mock[SocketType]
+    // TODO mockito 1.x does not support mock/spy enum, upgrade 2.x to achieve it
+    mockSocketType = SocketType.RAW // mock[SocketType]
     zmqContext = ZMQ.context(1)
-    pubSocket = zmqContext.socket(PubSocket.`type`)
+    pubSocket = zmqContext.socket(SocketType.PUB)
   }
 
   after {
@@ -119,7 +120,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
         val runnable: TestRunnable = new TestRunnable(
           pubSocket,
           zmqContext,
-          PubSocket,
+          SocketType.PUB,
           None,
           Connect(TestAddress),
           Linger(expected)
@@ -142,7 +143,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
         val runnable: TestRunnable = new TestRunnable(
           pubSocket,
           zmqContext,
-          PubSocket,
+          SocketType.PUB,
           None,
           Connect(TestAddress),
           Identity(expected)
@@ -163,7 +164,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
         val runnable = new TestRunnable(
           pubSocket,
           zmqContext,
-          PubSocket,
+          SocketType.PUB,
           None,
           Connect(TestAddress)
         )
@@ -189,7 +190,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
         val runnable = new TestRunnable(
           pubSocket,
           zmqContext,
-          PubSocket,
+          SocketType.PUB,
           None,
           Connect(TestAddress)
         )
@@ -213,7 +214,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
         val runnable = new TestRunnable(
           pubSocket,
           zmqContext,
-          PubSocket,
+          SocketType.PUB,
           None,
           Connect(TestAddress)
         )
@@ -235,7 +236,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
           val runnable = new TestRunnable(
             pubSocket,
             zmqContext,
-            PubSocket,
+            SocketType.PUB,
             None,
             Connect(TestAddress)
           )
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index f0a8520c..9c90ee6e 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -41,7 +41,7 @@ object Dependencies {
   // use the same jackson version in test than the one provided at runtime by Spark 3.0.0
   val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.10.0" // Apache v2
 
-  val jeroMq = "org.zeromq" % "jeromq" % "0.4.3" // MPL v2
+  val jeroMq = "org.zeromq" % "jeromq" % "0.5.3" // MPL v2
 
   val joptSimple = "net.sf.jopt-simple" % "jopt-simple" % "4.9" // MIT
 

Reply via email to