morningman commented on code in PR #17233: URL: https://github.com/apache/doris/pull/17233#discussion_r1127254996
########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } } + } catch (Throwable t) { + LOG.error("Execute mysql load failed", t); + // drain the data from client conn util empty packet received, otherwise the connection will be reset + if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) { + LOG.warn("not drained yet, try reading left data from client connection."); Review Comment: print load id in log, or this log is meaningless ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } } + } catch (Throwable t) { + LOG.error("Execute mysql load failed", t); + // drain the data from client conn util empty packet received, otherwise the connection will be reset + if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) { + LOG.warn("not drained yet, try reading left data from client connection."); + ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket(); + // MySql client will send an empty packet when eof + while (buffer != null && buffer.limit() != 0) { + buffer = context.getMysqlChannel().fetchOnePacket(); + } + LOG.warn("Finished reading the left bytes."); Review Comment: should be debug? ########## fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java: ########## @@ -1008,6 +1009,9 @@ public void cancel() { if (coordRef != null) { coordRef.cancel(); } + if (mysqlLoadId != null) { Review Comment: Should we reset `mysqlLoadId` after job is cancelled or finished? ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } } + } catch (Throwable t) { + LOG.error("Execute mysql load failed", t); + // drain the data from client conn util empty packet received, otherwise the connection will be reset Review Comment: Do we have the same problem in normal mysql connection? ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -147,7 +244,11 @@ private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputS inputStream.fillByteBuffer(buffer); buffer = context.getMysqlChannel().fetchOnePacket(); } + if (loadContextMap.containsKey(loadId)) { + loadContextMap.get(loadId).setFinished(true); + } } catch (IOException | InterruptedException e) { + LOG.warn("Failed fetch packet from mysql client", e); Review Comment: print load id ########## fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java: ########## @@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt()); } } + } catch (Throwable t) { + LOG.error("Execute mysql load failed", t); Review Comment: ```suggestion LOG.warn("Execute mysql load {} failed", loadId, t); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org