This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 518e98f  [ZEPPELIN-5147]. ConcurrentModificationException in 
Note#toJson
518e98f is described below

commit 518e98fd3c11e036f54813e8df26aafd69e32928
Author: Jeff Zhang <zjf...@apache.org>
AuthorDate: Mon Nov 30 23:54:22 2020 +0800

    [ZEPPELIN-5147]. ConcurrentModificationException in Note#toJson
    
    ### What is this PR for?
    
    The root cause is that when calling Note#toJson, note may be in change 
(adding/removing paragraph). In this PR, I change Note#paragraphs to 
CopyOnWriteArrayList, because adding/removing operation for paragraph is not a 
frequent operation, so it is ok for use CopyOnWriteArrayList here.
    Besides that, this PR also do some improvement on other parts, such as 
adding more logging.
    
    ### What type of PR is it?
    [Bug Fix | Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5147
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zjf...@apache.org>
    
    Closes #3985 from zjffdu/ZEPPELIN-5147 and squashes the following commits:
    
    1bc9bbccd [Jeff Zhang] [ZEPPELIN-5147]. ConcurrentModificationException in 
Note#toJson
    
    (cherry picked from commit 2fe4c58e1bc287adc908fd24ba24e503f8e1a9f7)
    Signed-off-by: Jeff Zhang <zjf...@apache.org>
---
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  |   4 +-
 .../java/org/apache/zeppelin/client/ZSession.java  |   5 +
 .../zeppelin/scheduler/SchedulerFactory.java       |   5 +-
 .../org/apache/zeppelin/rest/NotebookRestApi.java  |   3 -
 .../exception/WebApplicationExceptionMapper.java   |   7 +-
 .../apache/zeppelin/service/NotebookService.java   |  28 ++--
 .../java/org/apache/zeppelin/notebook/Note.java    | 172 +++++++++------------
 7 files changed, 107 insertions(+), 117 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java 
b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 916fe5e..837de10 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -779,7 +779,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
             int updateCount = statement.getUpdateCount();
             context.out.write("\n%text " +
                 "Query executed successfully. Affected rows : " +
-                    updateCount);
+                    updateCount + "\n");
           }
         } finally {
           if (resultSet != null) {
@@ -1028,6 +1028,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
     try {
       return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT));
     } catch (Exception e) {
+      LOGGER.error("Fail to parse {} with value: {}", 
CONCURRENT_EXECUTION_COUNT,
+              getProperty(CONCURRENT_EXECUTION_COUNT));
       return 10;
     }
   }
diff --git 
a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java 
b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
index 45bef26..f411ed6 100644
--- a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
+++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java
@@ -478,6 +478,11 @@ public class ZSession {
       return this;
     }
 
+    public Builder setMaxStatement(int maxStatement) {
+      this.maxStatement = maxStatement;
+      return this;
+    }
+
     public ZSession build() throws Exception {
       return new ZSession(clientConfig, interpreter, intpProperties, 
maxStatement);
     }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
index 8e76c0f..2405780 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java
@@ -52,7 +52,7 @@ public class SchedulerFactory {
   private SchedulerFactory() {
     ZeppelinConfiguration zConf = ZeppelinConfiguration.create();
     int threadPoolSize =
-        
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
+            
zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE);
     LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize);
     executor = 
ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, 
threadPoolSize);
   }
@@ -73,6 +73,7 @@ public class SchedulerFactory {
   public Scheduler createOrGetFIFOScheduler(String name) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(name)) {
+        LOGGER.info("Create FIFOScheduler: {}", name);
         FIFOScheduler s = new FIFOScheduler(name);
         schedulers.put(name, s);
         executor.execute(s);
@@ -84,6 +85,7 @@ public class SchedulerFactory {
   public Scheduler createOrGetParallelScheduler(String name, int 
maxConcurrency) {
     synchronized (schedulers) {
       if (!schedulers.containsKey(name)) {
+        LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", 
name, maxConcurrency);
         ParallelScheduler s = new ParallelScheduler(name, maxConcurrency);
         schedulers.put(name, s);
         executor.execute(s);
@@ -105,6 +107,7 @@ public class SchedulerFactory {
 
   public void removeScheduler(String name) {
     synchronized (schedulers) {
+      LOGGER.info("Remove scheduler: {}", name);
       Scheduler s = schedulers.remove(name);
       if (s != null) {
         s.stop();
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index b241138..82c77ed 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -541,9 +541,6 @@ public class NotebookRestApi extends AbstractRestApi {
   @ZeppelinApi
   public Response getParagraph(@PathParam("noteId") String noteId,
                                @PathParam("paragraphId") String paragraphId) 
throws IOException {
-
-    LOGGER.info("Get paragraph {} {}", noteId, paragraphId);
-
     Note note = notebook.getNote(noteId);
     checkIfNoteIsNotNull(note, noteId);
     checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this 
paragraph");
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
index 5615f87..3a1cb1c 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java
@@ -25,15 +25,19 @@ import javax.ws.rs.ext.ExceptionMapper;
 import javax.ws.rs.ext.Provider;
 
 import org.apache.zeppelin.rest.message.gson.ExceptionSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Provider
 public class WebApplicationExceptionMapper implements 
ExceptionMapper<Throwable> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(WebApplicationException.class);
+
   private final Gson gson;
 
   public WebApplicationExceptionMapper() {
     GsonBuilder gsonBuilder = new 
GsonBuilder().enableComplexMapKeySerialization();
     gsonBuilder.registerTypeHierarchyAdapter(
-        Exception.class, new ExceptionSerializer());
+            Exception.class, new ExceptionSerializer());
     this.gson = gsonBuilder.create();
   }
 
@@ -42,6 +46,7 @@ public class WebApplicationExceptionMapper implements 
ExceptionMapper<Throwable>
     if (exception instanceof WebApplicationException) {
       return ((WebApplicationException) exception).getResponse();
     } else {
+      LOGGER.error("Error response", exception);
       return Response.status(500).entity(gson.toJson(exception)).build();
     }
   }
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
 
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
index 4a7db3f..c6f4124 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java
@@ -650,21 +650,23 @@ public class NotebookService {
       callback.onFailure(new NoteNotFoundException(noteId), context);
       throw new IOException("No such note");
     }
-    if (note.getParagraphCount() < maxParagraph) {
-      return note.addNewParagraph(context.getAutheInfo());
-    } else {
-      boolean removed = false;
-      for (int i = 1; i< note.getParagraphCount(); ++i) {
-        if (note.getParagraph(i).getStatus().isCompleted()) {
-          note.removeParagraph(context.getAutheInfo().getUser(), 
note.getParagraph(i).getId());
-          removed = true;
-          break;
+    synchronized (this) {
+      if (note.getParagraphCount() < maxParagraph) {
+        return note.addNewParagraph(context.getAutheInfo());
+      } else {
+        boolean removed = false;
+        for (int i = 1; i < note.getParagraphCount(); ++i) {
+          if (note.getParagraph(i).getStatus().isCompleted()) {
+            note.removeParagraph(context.getAutheInfo().getUser(), 
note.getParagraph(i).getId());
+            removed = true;
+            break;
+          }
         }
+        if (!removed) {
+          throw new IOException("All the paragraphs are not completed, unable 
to find available paragraph");
+        }
+        return note.addNewParagraph(context.getAutheInfo());
       }
-      if (!removed) {
-        throw new IOException("All the paragraphs are not completed, unable to 
find available paragraph");
-      }
-      return note.addNewParagraph(context.getAutheInfo());
     }
   }
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index 08bad00..cae5c3f 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -65,6 +65,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
  * Represent the note of Zeppelin. All the note and its paragraph operations 
are done
@@ -86,16 +87,16 @@ public class Note implements JsonSerializable {
     }
   };
   private static final Gson GSON = new GsonBuilder()
-      .setPrettyPrinting()
-      .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
-      .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
-      .registerTypeAdapterFactory(Input.TypeAdapterFactory)
-      .setExclusionStrategies(strategy)
-      .create();
+          .setPrettyPrinting()
+          .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
+          .registerTypeAdapter(Date.class, new NotebookImportDeserializer())
+          .registerTypeAdapterFactory(Input.TypeAdapterFactory)
+          .setExclusionStrategies(strategy)
+          .create();
   private static final DateTimeFormatter DATE_TIME_FORMATTER =
           DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss");
 
-  private List<Paragraph> paragraphs = new LinkedList<>();
+  private CopyOnWriteArrayList<Paragraph> paragraphs = new 
CopyOnWriteArrayList<>();
   private String name = "";
   private String id;
   private String defaultInterpreterGroup;
@@ -137,8 +138,8 @@ public class Note implements JsonSerializable {
   }
 
   public Note(String path, String defaultInterpreterGroup, InterpreterFactory 
factory,
-      InterpreterSettingManager interpreterSettingManager, 
ParagraphJobListener paragraphJobListener,
-      Credentials credentials, List<NoteEventListener> noteEventListener) {
+              InterpreterSettingManager interpreterSettingManager, 
ParagraphJobListener paragraphJobListener,
+              Credentials credentials, List<NoteEventListener> 
noteEventListener) {
     setPath(path);
     this.defaultInterpreterGroup = defaultInterpreterGroup;
     this.interpreterFactory = factory;
@@ -268,7 +269,7 @@ public class Note implements JsonSerializable {
   public String getDefaultInterpreterGroup() {
     if (StringUtils.isBlank(defaultInterpreterGroup)) {
       defaultInterpreterGroup = ZeppelinConfiguration.create()
-          
.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
+              
.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT);
     }
     return defaultInterpreterGroup;
   }
@@ -399,8 +400,8 @@ public class Note implements JsonSerializable {
           continue;
         }
         if (StringUtils.equals(noteId, angularObject.getNoteId())
-            && StringUtils.equals(paragraphId, angularObject.getParagraphId())
-            && StringUtils.equals(name, angularObject.getName())) {
+                && StringUtils.equals(paragraphId, 
angularObject.getParagraphId())
+                && StringUtils.equals(name, angularObject.getName())) {
           iter.remove();
         }
       }
@@ -438,8 +439,8 @@ public class Note implements JsonSerializable {
           continue;
         }
         if (StringUtils.equals(noteId, noteIdCandidate)
-            && StringUtils.equals(paragraphId, paragraphIdCandidate)
-            && StringUtils.equals(name, nameCandidate)) {
+                && StringUtils.equals(paragraphId, paragraphIdCandidate)
+                && StringUtils.equals(name, nameCandidate)) {
           iter.remove();
         }
       }
@@ -487,9 +488,7 @@ public class Note implements JsonSerializable {
       LOGGER.warn("Paragraph {} has a result with exception. {}", 
srcParagraph.getId(), e.getMessage());
     }
 
-    synchronized (paragraphs) {
-      paragraphs.add(newParagraph);
-    }
+    paragraphs.add(newParagraph);
 
     try {
       fireParagraphCreateEvent(newParagraph);
@@ -529,7 +528,7 @@ public class Note implements JsonSerializable {
       // Set the default parameter configuration for the paragraph
       // based on `interpreter-setting.json` config
       Map<String, Object> config =
-          interpreterSettingManager.getConfigSetting(defaultInterpreterGroup);
+              
interpreterSettingManager.getConfigSetting(defaultInterpreterGroup);
       paragraph.setConfig(config);
     }
     paragraph.setAuthenticationInfo(authenticationInfo);
@@ -543,9 +542,7 @@ public class Note implements JsonSerializable {
   }
 
   private void insertParagraph(Paragraph paragraph, int index) {
-    synchronized (paragraphs) {
-      paragraphs.add(index, paragraph);
-    }
+    paragraphs.add(index, paragraph);
     try {
       fireParagraphCreateEvent(paragraph);
     } catch (IOException e) {
@@ -562,19 +559,15 @@ public class Note implements JsonSerializable {
   public Paragraph removeParagraph(String user, String paragraphId) {
     removeAllAngularObjectInParagraph(user, paragraphId);
     interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), 
paragraphId);
-    synchronized (paragraphs) {
-      Iterator<Paragraph> i = paragraphs.iterator();
-      while (i.hasNext()) {
-        Paragraph p = i.next();
-        if (p.getId().equals(paragraphId)) {
-          i.remove();
-          try {
-            fireParagraphRemoveEvent(p);
-          } catch (IOException e) {
-            e.printStackTrace();
-          }
-          return p;
+    for (Paragraph p : paragraphs) {
+      if (p.getId().equals(paragraphId)) {
+        paragraphs.remove(p);
+        try {
+          fireParagraphRemoveEvent(p);
+        } catch (IOException e) {
+          LOGGER.error("Fail to fire ParagraphRemoveEvent", e);
         }
+        return p;
       }
     }
     return null;
@@ -587,16 +580,14 @@ public class Note implements JsonSerializable {
   }
 
   public Paragraph clearPersonalizedParagraphOutput(String paragraphId, String 
user) {
-    synchronized (paragraphs) {
-      for (Paragraph p : paragraphs) {
-        if (!p.getId().equals(paragraphId)) {
-          continue;
-        }
-
-        p = p.getUserParagraphMap().get(user);
-        clearParagraphOutputFields(p);
-        return p;
+    for (Paragraph p : paragraphs) {
+      if (!p.getId().equals(paragraphId)) {
+        continue;
       }
+
+      p = p.getUserParagraphMap().get(user);
+      clearParagraphOutputFields(p);
+      return p;
     }
     return null;
   }
@@ -608,15 +599,13 @@ public class Note implements JsonSerializable {
    * @return Paragraph
    */
   public Paragraph clearParagraphOutput(String paragraphId) {
-    synchronized (paragraphs) {
-      for (Paragraph p : paragraphs) {
-        if (!p.getId().equals(paragraphId)) {
-          continue;
-        }
-
-        clearParagraphOutputFields(p);
-        return p;
+    for (Paragraph p : paragraphs) {
+      if (!p.getId().equals(paragraphId)) {
+        continue;
       }
+
+      clearParagraphOutputFields(p);
+      return p;
     }
     return null;
   }
@@ -625,10 +614,8 @@ public class Note implements JsonSerializable {
    * Clear all paragraph output of note
    */
   public void clearAllParagraphOutput() {
-    synchronized (paragraphs) {
-      for (Paragraph p : paragraphs) {
-        p.setReturn(null, null);
-      }
+    for (Paragraph p : paragraphs) {
+      p.setReturn(null, null);
     }
   }
 
@@ -651,41 +638,37 @@ public class Note implements JsonSerializable {
    *                                   when index is out of bound
    */
   public void moveParagraph(String paragraphId, int index, boolean 
throwWhenIndexIsOutOfBound) {
-    synchronized (paragraphs) {
-      int oldIndex;
-      Paragraph p = null;
-
-      if (index < 0 || index >= paragraphs.size()) {
-        if (throwWhenIndexIsOutOfBound) {
-          throw new IndexOutOfBoundsException(
-              "paragraph size is " + paragraphs.size() + " , index is " + 
index);
-        } else {
-          return;
-        }
+    int oldIndex;
+    Paragraph p = null;
+
+    if (index < 0 || index >= paragraphs.size()) {
+      if (throwWhenIndexIsOutOfBound) {
+        throw new IndexOutOfBoundsException(
+                "paragraph size is " + paragraphs.size() + " , index is " + 
index);
+      } else {
+        return;
       }
+    }
 
-      for (int i = 0; i < paragraphs.size(); i++) {
-        if (paragraphs.get(i).getId().equals(paragraphId)) {
-          oldIndex = i;
-          if (oldIndex == index) {
-            return;
-          }
-          p = paragraphs.remove(i);
+    for (int i = 0; i < paragraphs.size(); i++) {
+      if (paragraphs.get(i).getId().equals(paragraphId)) {
+        oldIndex = i;
+        if (oldIndex == index) {
+          return;
         }
+        p = paragraphs.remove(i);
       }
+    }
 
-      if (p != null) {
-        paragraphs.add(index, p);
-      }
+    if (p != null) {
+      paragraphs.add(index, p);
     }
   }
 
   public boolean isLastParagraph(String paragraphId) {
     if (!paragraphs.isEmpty()) {
-      synchronized (paragraphs) {
-        if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) 
{
-          return true;
-        }
+      if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) {
+        return true;
       }
       return false;
     }
@@ -698,11 +681,9 @@ public class Note implements JsonSerializable {
   }
 
   public Paragraph getParagraph(String paragraphId) {
-    synchronized (paragraphs) {
-      for (Paragraph p : paragraphs) {
-        if (p.getId().equals(paragraphId)) {
-          return p;
-        }
+    for (Paragraph p : paragraphs) {
+      if (p.getId().equals(paragraphId)) {
+        return p;
       }
     }
     return null;
@@ -713,9 +694,7 @@ public class Note implements JsonSerializable {
   }
 
   public Paragraph getLastParagraph() {
-    synchronized (paragraphs) {
-      return paragraphs.get(paragraphs.size() - 1);
-    }
+    return paragraphs.get(paragraphs.size() - 1);
   }
 
   private void setParagraphMagic(Paragraph p, int index) {
@@ -876,15 +855,12 @@ public class Note implements JsonSerializable {
    * Return true if there is a running or pending paragraph
    */
   public boolean haveRunningOrPendingParagraphs() {
-    synchronized (paragraphs) {
-      for (Paragraph p : paragraphs) {
-        Status status = p.getStatus();
-        if (status.isRunning() || status.isPending()) {
-          return true;
-        }
+    for (Paragraph p : paragraphs) {
+      Status status = p.getStatus();
+      if (status.isRunning() || status.isPending()) {
+        return true;
       }
     }
-
     return false;
   }
 
@@ -902,8 +878,8 @@ public class Note implements JsonSerializable {
     return p.completion(buffer, cursor);
   }
 
-  public List<Paragraph> getParagraphs() {
-    return new ArrayList<>(this.paragraphs);
+  public CopyOnWriteArrayList<Paragraph> getParagraphs() {
+    return this.paragraphs;
   }
 
   // TODO(zjffdu) how does this used ?
@@ -948,7 +924,7 @@ public class Note implements JsonSerializable {
         if (appStates != null) {
           for (ApplicationState app : appStates) {
             ((RemoteAngularObjectRegistry) registry)
-                .removeAllAndNotifyRemoteProcess(id, app.getId());
+                    .removeAllAndNotifyRemoteProcess(id, app.getId());
           }
         }
       } else {
@@ -1190,7 +1166,7 @@ public class Note implements JsonSerializable {
       return false;
     }
     if (angularObjects != null ?
-        !angularObjects.equals(note.angularObjects) : note.angularObjects != 
null) {
+            !angularObjects.equals(note.angularObjects) : note.angularObjects 
!= null) {
       return false;
     }
     if (config != null ? !config.equals(note.config) : note.config != null) {

Reply via email to