This is an automated email from the ASF dual-hosted git repository. zjffdu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 518e98f [ZEPPELIN-5147]. ConcurrentModificationException in Note#toJson 518e98f is described below commit 518e98fd3c11e036f54813e8df26aafd69e32928 Author: Jeff Zhang <zjf...@apache.org> AuthorDate: Mon Nov 30 23:54:22 2020 +0800 [ZEPPELIN-5147]. ConcurrentModificationException in Note#toJson ### What is this PR for? The root cause is that when calling Note#toJson, note may be in change (adding/removing paragraph). In this PR, I change Note#paragraphs to CopyOnWriteArrayList, because adding/removing operation for paragraph is not a frequent operation, so it is ok for use CopyOnWriteArrayList here. Besides that, this PR also do some improvement on other parts, such as adding more logging. ### What type of PR is it? [Bug Fix | Improvement ] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5147 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <zjf...@apache.org> Closes #3985 from zjffdu/ZEPPELIN-5147 and squashes the following commits: 1bc9bbccd [Jeff Zhang] [ZEPPELIN-5147]. ConcurrentModificationException in Note#toJson (cherry picked from commit 2fe4c58e1bc287adc908fd24ba24e503f8e1a9f7) Signed-off-by: Jeff Zhang <zjf...@apache.org> --- .../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 4 +- .../java/org/apache/zeppelin/client/ZSession.java | 5 + .../zeppelin/scheduler/SchedulerFactory.java | 5 +- .../org/apache/zeppelin/rest/NotebookRestApi.java | 3 - .../exception/WebApplicationExceptionMapper.java | 7 +- .../apache/zeppelin/service/NotebookService.java | 28 ++-- .../java/org/apache/zeppelin/notebook/Note.java | 172 +++++++++------------ 7 files changed, 107 insertions(+), 117 deletions(-) diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java index 916fe5e..837de10 100644 --- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java +++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java @@ -779,7 +779,7 @@ public class JDBCInterpreter extends KerberosInterpreter { int updateCount = statement.getUpdateCount(); context.out.write("\n%text " + "Query executed successfully. Affected rows : " + - updateCount); + updateCount + "\n"); } } finally { if (resultSet != null) { @@ -1028,6 +1028,8 @@ public class JDBCInterpreter extends KerberosInterpreter { try { return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT)); } catch (Exception e) { + LOGGER.error("Fail to parse {} with value: {}", CONCURRENT_EXECUTION_COUNT, + getProperty(CONCURRENT_EXECUTION_COUNT)); return 10; } } diff --git a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java index 45bef26..f411ed6 100644 --- a/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java +++ b/zeppelin-client/src/main/java/org/apache/zeppelin/client/ZSession.java @@ -478,6 +478,11 @@ public class ZSession { return this; } + public Builder setMaxStatement(int maxStatement) { + this.maxStatement = maxStatement; + return this; + } + public ZSession build() throws Exception { return new ZSession(clientConfig, interpreter, intpProperties, maxStatement); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index 8e76c0f..2405780 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -52,7 +52,7 @@ public class SchedulerFactory { private SchedulerFactory() { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); int threadPoolSize = - zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE); + zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE); LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize); executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize); } @@ -73,6 +73,7 @@ public class SchedulerFactory { public Scheduler createOrGetFIFOScheduler(String name) { synchronized (schedulers) { if (!schedulers.containsKey(name)) { + LOGGER.info("Create FIFOScheduler: {}", name); FIFOScheduler s = new FIFOScheduler(name); schedulers.put(name, s); executor.execute(s); @@ -84,6 +85,7 @@ public class SchedulerFactory { public Scheduler createOrGetParallelScheduler(String name, int maxConcurrency) { synchronized (schedulers) { if (!schedulers.containsKey(name)) { + LOGGER.info("Create ParallelScheduler: {} with maxConcurrency: {}", name, maxConcurrency); ParallelScheduler s = new ParallelScheduler(name, maxConcurrency); schedulers.put(name, s); executor.execute(s); @@ -105,6 +107,7 @@ public class SchedulerFactory { public void removeScheduler(String name) { synchronized (schedulers) { + LOGGER.info("Remove scheduler: {}", name); Scheduler s = schedulers.remove(name); if (s != null) { s.stop(); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index b241138..82c77ed 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -541,9 +541,6 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getParagraph(@PathParam("noteId") String noteId, @PathParam("paragraphId") String paragraphId) throws IOException { - - LOGGER.info("Get paragraph {} {}", noteId, paragraphId); - Note note = notebook.getNote(noteId); checkIfNoteIsNotNull(note, noteId); checkIfUserCanRead(noteId, "Insufficient privileges you cannot get this paragraph"); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java index 5615f87..3a1cb1c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/exception/WebApplicationExceptionMapper.java @@ -25,15 +25,19 @@ import javax.ws.rs.ext.ExceptionMapper; import javax.ws.rs.ext.Provider; import org.apache.zeppelin.rest.message.gson.ExceptionSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Provider public class WebApplicationExceptionMapper implements ExceptionMapper<Throwable> { + private static final Logger LOGGER = LoggerFactory.getLogger(WebApplicationException.class); + private final Gson gson; public WebApplicationExceptionMapper() { GsonBuilder gsonBuilder = new GsonBuilder().enableComplexMapKeySerialization(); gsonBuilder.registerTypeHierarchyAdapter( - Exception.class, new ExceptionSerializer()); + Exception.class, new ExceptionSerializer()); this.gson = gsonBuilder.create(); } @@ -42,6 +46,7 @@ public class WebApplicationExceptionMapper implements ExceptionMapper<Throwable> if (exception instanceof WebApplicationException) { return ((WebApplicationException) exception).getResponse(); } else { + LOGGER.error("Error response", exception); return Response.status(500).entity(gson.toJson(exception)).build(); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java index 4a7db3f..c6f4124 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/NotebookService.java @@ -650,21 +650,23 @@ public class NotebookService { callback.onFailure(new NoteNotFoundException(noteId), context); throw new IOException("No such note"); } - if (note.getParagraphCount() < maxParagraph) { - return note.addNewParagraph(context.getAutheInfo()); - } else { - boolean removed = false; - for (int i = 1; i< note.getParagraphCount(); ++i) { - if (note.getParagraph(i).getStatus().isCompleted()) { - note.removeParagraph(context.getAutheInfo().getUser(), note.getParagraph(i).getId()); - removed = true; - break; + synchronized (this) { + if (note.getParagraphCount() < maxParagraph) { + return note.addNewParagraph(context.getAutheInfo()); + } else { + boolean removed = false; + for (int i = 1; i < note.getParagraphCount(); ++i) { + if (note.getParagraph(i).getStatus().isCompleted()) { + note.removeParagraph(context.getAutheInfo().getUser(), note.getParagraph(i).getId()); + removed = true; + break; + } } + if (!removed) { + throw new IOException("All the paragraphs are not completed, unable to find available paragraph"); + } + return note.addNewParagraph(context.getAutheInfo()); } - if (!removed) { - throw new IOException("All the paragraphs are not completed, unable to find available paragraph"); - } - return note.addNewParagraph(context.getAutheInfo()); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 08bad00..cae5c3f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -65,6 +65,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; /** * Represent the note of Zeppelin. All the note and its paragraph operations are done @@ -86,16 +87,16 @@ public class Note implements JsonSerializable { } }; private static final Gson GSON = new GsonBuilder() - .setPrettyPrinting() - .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS") - .registerTypeAdapter(Date.class, new NotebookImportDeserializer()) - .registerTypeAdapterFactory(Input.TypeAdapterFactory) - .setExclusionStrategies(strategy) - .create(); + .setPrettyPrinting() + .setDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + .registerTypeAdapter(Date.class, new NotebookImportDeserializer()) + .registerTypeAdapterFactory(Input.TypeAdapterFactory) + .setExclusionStrategies(strategy) + .create(); private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss"); - private List<Paragraph> paragraphs = new LinkedList<>(); + private CopyOnWriteArrayList<Paragraph> paragraphs = new CopyOnWriteArrayList<>(); private String name = ""; private String id; private String defaultInterpreterGroup; @@ -137,8 +138,8 @@ public class Note implements JsonSerializable { } public Note(String path, String defaultInterpreterGroup, InterpreterFactory factory, - InterpreterSettingManager interpreterSettingManager, ParagraphJobListener paragraphJobListener, - Credentials credentials, List<NoteEventListener> noteEventListener) { + InterpreterSettingManager interpreterSettingManager, ParagraphJobListener paragraphJobListener, + Credentials credentials, List<NoteEventListener> noteEventListener) { setPath(path); this.defaultInterpreterGroup = defaultInterpreterGroup; this.interpreterFactory = factory; @@ -268,7 +269,7 @@ public class Note implements JsonSerializable { public String getDefaultInterpreterGroup() { if (StringUtils.isBlank(defaultInterpreterGroup)) { defaultInterpreterGroup = ZeppelinConfiguration.create() - .getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT); + .getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_DEFAULT); } return defaultInterpreterGroup; } @@ -399,8 +400,8 @@ public class Note implements JsonSerializable { continue; } if (StringUtils.equals(noteId, angularObject.getNoteId()) - && StringUtils.equals(paragraphId, angularObject.getParagraphId()) - && StringUtils.equals(name, angularObject.getName())) { + && StringUtils.equals(paragraphId, angularObject.getParagraphId()) + && StringUtils.equals(name, angularObject.getName())) { iter.remove(); } } @@ -438,8 +439,8 @@ public class Note implements JsonSerializable { continue; } if (StringUtils.equals(noteId, noteIdCandidate) - && StringUtils.equals(paragraphId, paragraphIdCandidate) - && StringUtils.equals(name, nameCandidate)) { + && StringUtils.equals(paragraphId, paragraphIdCandidate) + && StringUtils.equals(name, nameCandidate)) { iter.remove(); } } @@ -487,9 +488,7 @@ public class Note implements JsonSerializable { LOGGER.warn("Paragraph {} has a result with exception. {}", srcParagraph.getId(), e.getMessage()); } - synchronized (paragraphs) { - paragraphs.add(newParagraph); - } + paragraphs.add(newParagraph); try { fireParagraphCreateEvent(newParagraph); @@ -529,7 +528,7 @@ public class Note implements JsonSerializable { // Set the default parameter configuration for the paragraph // based on `interpreter-setting.json` config Map<String, Object> config = - interpreterSettingManager.getConfigSetting(defaultInterpreterGroup); + interpreterSettingManager.getConfigSetting(defaultInterpreterGroup); paragraph.setConfig(config); } paragraph.setAuthenticationInfo(authenticationInfo); @@ -543,9 +542,7 @@ public class Note implements JsonSerializable { } private void insertParagraph(Paragraph paragraph, int index) { - synchronized (paragraphs) { - paragraphs.add(index, paragraph); - } + paragraphs.add(index, paragraph); try { fireParagraphCreateEvent(paragraph); } catch (IOException e) { @@ -562,19 +559,15 @@ public class Note implements JsonSerializable { public Paragraph removeParagraph(String user, String paragraphId) { removeAllAngularObjectInParagraph(user, paragraphId); interpreterSettingManager.removeResourcesBelongsToParagraph(getId(), paragraphId); - synchronized (paragraphs) { - Iterator<Paragraph> i = paragraphs.iterator(); - while (i.hasNext()) { - Paragraph p = i.next(); - if (p.getId().equals(paragraphId)) { - i.remove(); - try { - fireParagraphRemoveEvent(p); - } catch (IOException e) { - e.printStackTrace(); - } - return p; + for (Paragraph p : paragraphs) { + if (p.getId().equals(paragraphId)) { + paragraphs.remove(p); + try { + fireParagraphRemoveEvent(p); + } catch (IOException e) { + LOGGER.error("Fail to fire ParagraphRemoveEvent", e); } + return p; } } return null; @@ -587,16 +580,14 @@ public class Note implements JsonSerializable { } public Paragraph clearPersonalizedParagraphOutput(String paragraphId, String user) { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - if (!p.getId().equals(paragraphId)) { - continue; - } - - p = p.getUserParagraphMap().get(user); - clearParagraphOutputFields(p); - return p; + for (Paragraph p : paragraphs) { + if (!p.getId().equals(paragraphId)) { + continue; } + + p = p.getUserParagraphMap().get(user); + clearParagraphOutputFields(p); + return p; } return null; } @@ -608,15 +599,13 @@ public class Note implements JsonSerializable { * @return Paragraph */ public Paragraph clearParagraphOutput(String paragraphId) { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - if (!p.getId().equals(paragraphId)) { - continue; - } - - clearParagraphOutputFields(p); - return p; + for (Paragraph p : paragraphs) { + if (!p.getId().equals(paragraphId)) { + continue; } + + clearParagraphOutputFields(p); + return p; } return null; } @@ -625,10 +614,8 @@ public class Note implements JsonSerializable { * Clear all paragraph output of note */ public void clearAllParagraphOutput() { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - p.setReturn(null, null); - } + for (Paragraph p : paragraphs) { + p.setReturn(null, null); } } @@ -651,41 +638,37 @@ public class Note implements JsonSerializable { * when index is out of bound */ public void moveParagraph(String paragraphId, int index, boolean throwWhenIndexIsOutOfBound) { - synchronized (paragraphs) { - int oldIndex; - Paragraph p = null; - - if (index < 0 || index >= paragraphs.size()) { - if (throwWhenIndexIsOutOfBound) { - throw new IndexOutOfBoundsException( - "paragraph size is " + paragraphs.size() + " , index is " + index); - } else { - return; - } + int oldIndex; + Paragraph p = null; + + if (index < 0 || index >= paragraphs.size()) { + if (throwWhenIndexIsOutOfBound) { + throw new IndexOutOfBoundsException( + "paragraph size is " + paragraphs.size() + " , index is " + index); + } else { + return; } + } - for (int i = 0; i < paragraphs.size(); i++) { - if (paragraphs.get(i).getId().equals(paragraphId)) { - oldIndex = i; - if (oldIndex == index) { - return; - } - p = paragraphs.remove(i); + for (int i = 0; i < paragraphs.size(); i++) { + if (paragraphs.get(i).getId().equals(paragraphId)) { + oldIndex = i; + if (oldIndex == index) { + return; } + p = paragraphs.remove(i); } + } - if (p != null) { - paragraphs.add(index, p); - } + if (p != null) { + paragraphs.add(index, p); } } public boolean isLastParagraph(String paragraphId) { if (!paragraphs.isEmpty()) { - synchronized (paragraphs) { - if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) { - return true; - } + if (paragraphId.equals(paragraphs.get(paragraphs.size() - 1).getId())) { + return true; } return false; } @@ -698,11 +681,9 @@ public class Note implements JsonSerializable { } public Paragraph getParagraph(String paragraphId) { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - if (p.getId().equals(paragraphId)) { - return p; - } + for (Paragraph p : paragraphs) { + if (p.getId().equals(paragraphId)) { + return p; } } return null; @@ -713,9 +694,7 @@ public class Note implements JsonSerializable { } public Paragraph getLastParagraph() { - synchronized (paragraphs) { - return paragraphs.get(paragraphs.size() - 1); - } + return paragraphs.get(paragraphs.size() - 1); } private void setParagraphMagic(Paragraph p, int index) { @@ -876,15 +855,12 @@ public class Note implements JsonSerializable { * Return true if there is a running or pending paragraph */ public boolean haveRunningOrPendingParagraphs() { - synchronized (paragraphs) { - for (Paragraph p : paragraphs) { - Status status = p.getStatus(); - if (status.isRunning() || status.isPending()) { - return true; - } + for (Paragraph p : paragraphs) { + Status status = p.getStatus(); + if (status.isRunning() || status.isPending()) { + return true; } } - return false; } @@ -902,8 +878,8 @@ public class Note implements JsonSerializable { return p.completion(buffer, cursor); } - public List<Paragraph> getParagraphs() { - return new ArrayList<>(this.paragraphs); + public CopyOnWriteArrayList<Paragraph> getParagraphs() { + return this.paragraphs; } // TODO(zjffdu) how does this used ? @@ -948,7 +924,7 @@ public class Note implements JsonSerializable { if (appStates != null) { for (ApplicationState app : appStates) { ((RemoteAngularObjectRegistry) registry) - .removeAllAndNotifyRemoteProcess(id, app.getId()); + .removeAllAndNotifyRemoteProcess(id, app.getId()); } } } else { @@ -1190,7 +1166,7 @@ public class Note implements JsonSerializable { return false; } if (angularObjects != null ? - !angularObjects.equals(note.angularObjects) : note.angularObjects != null) { + !angularObjects.equals(note.angularObjects) : note.angularObjects != null) { return false; } if (config != null ? !config.equals(note.config) : note.config != null) {