morningman commented on code in PR #17233:
URL: https://github.com/apache/doris/pull/17233#discussion_r1127254996


##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load
                     loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                 }
             }
+        } catch (Throwable t) {
+            LOG.error("Execute mysql load failed", t);
+            // drain the data from client conn util empty packet received, otherwise the connection will be reset
+            if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
+                LOG.warn("not drained yet, try reading left data from client connection.");

Review Comment:
   Print the load id in this log; otherwise the log is meaningless.
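
   A minimal sketch of the idea, not the PR's code: `PacketSource` is a hypothetical stand-in for `context.getMysqlChannel()`, and the message is built as a plain string instead of going through `LOG`, so the example runs on its own. The drain loop mirrors the one in the diff; the only point is that the load id is part of the message.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   import java.util.Deque;

   public class DrainSketch {
       /** Hypothetical stand-in for the MySQL channel; not the Doris API. */
       interface PacketSource {
           ByteBuffer fetchOnePacket();
       }

       /** Builds the warning text so that it always carries the load id. */
       static String drainMessage(String loadId) {
           return "Load " + loadId + " not drained yet, try reading left data from client connection.";
       }

       /** Reads packets until an empty packet (or null) arrives; returns the number of data packets discarded. */
       static int drain(PacketSource source) {
           int discarded = 0;
           ByteBuffer buffer = source.fetchOnePacket();
           while (buffer != null && buffer.limit() != 0) {
               discarded++;
               buffer = source.fetchOnePacket();
           }
           return discarded;
       }

       public static void main(String[] args) {
           Deque<ByteBuffer> packets = new ArrayDeque<>();
           packets.add(ByteBuffer.wrap(new byte[] {1, 2}));
           packets.add(ByteBuffer.wrap(new byte[] {3}));
           packets.add(ByteBuffer.allocate(0)); // the empty packet marks EOF
           System.out.println(drainMessage("load-42"));
           System.out.println("discarded=" + drain(packets::poll));
       }
   }
   ```

   With the id in the message, a warning from one failed load can be matched to the other log lines for the same load.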



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load
                     loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                 }
             }
+        } catch (Throwable t) {
+            LOG.error("Execute mysql load failed", t);
+            // drain the data from client conn util empty packet received, otherwise the connection will be reset
+            if (loadContextMap.containsKey(loadId) && !loadContextMap.get(loadId).isFinished()) {
+                LOG.warn("not drained yet, try reading left data from client connection.");
+                ByteBuffer buffer = context.getMysqlChannel().fetchOnePacket();
+                // MySql client will send an empty packet when eof
+                while (buffer != null && buffer.limit() != 0) {
+                    buffer = context.getMysqlChannel().fetchOnePacket();
+                }
+                LOG.warn("Finished reading the left bytes.");

Review Comment:
   Should this be logged at debug level?



##########
fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java:
##########
@@ -1008,6 +1009,9 @@ public void cancel() {
         if (coordRef != null) {
             coordRef.cancel();
         }
+        if (mysqlLoadId != null) {

Review Comment:
   Should we reset `mysqlLoadId` after the job is cancelled or finished?
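
   If the answer is yes, one possible shape is sketched below. This is an assumption-laden illustration, not the `StmtExecutor` code: the class, `startLoad`, `finishLoad`, and the `cancelled` list are hypothetical names, and the list stands in for whatever call actually cancels the load.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class CancelResetSketch {
       /** Hypothetical record of cancelled load ids; stands in for the real cancel call. */
       final List<String> cancelled = new ArrayList<>();

       /** Id of the in-flight MySQL load, or null when none is running. */
       private String mysqlLoadId;

       void startLoad(String loadId) {
           mysqlLoadId = loadId;
       }

       /** Marks the load as finished and clears the id so a later cancel() does nothing. */
       void finishLoad() {
           mysqlLoadId = null;
       }

       /** Cancels the in-flight load, if any, then clears the id so a repeated cancel() is a no-op. */
       void cancel() {
           if (mysqlLoadId != null) {
               cancelled.add(mysqlLoadId);
               mysqlLoadId = null;
           }
       }

       String currentLoadId() {
           return mysqlLoadId;
       }

       public static void main(String[] args) {
           CancelResetSketch executor = new CancelResetSketch();
           executor.startLoad("load-1");
           executor.cancel();
           executor.cancel(); // second call finds no id and cancels nothing
           System.out.println("cancelled=" + executor.cancelled);
       }
   }
   ```

   Clearing the field in both paths keeps a stale id from being cancelled again when the same executor is reused.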



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load
                     loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                 }
             }
+        } catch (Throwable t) {
+            LOG.error("Execute mysql load failed", t);
+            // drain the data from client conn util empty packet received, otherwise the connection will be reset

Review Comment:
   Do we have the same problem with a normal MySQL connection?



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -147,7 +244,11 @@ private void fillByteBufferAsync(ConnectContext context, ByteBufferNetworkInputS
                     inputStream.fillByteBuffer(buffer);
                     buffer = context.getMysqlChannel().fetchOnePacket();
                 }
+                if (loadContextMap.containsKey(loadId)) {
+                    loadContextMap.get(loadId).setFinished(true);
+                }
             } catch (IOException | InterruptedException e) {
+                LOG.warn("Failed fetch packet from mysql client", e);

Review Comment:
   Print the load id in this log message.



##########
fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java:
##########
@@ -91,10 +143,51 @@ public LoadJobRowResult executeMySqlLoadJobFromStmt(ConnectContext context, Load
                     loadResult.incSkipped(result.get("NumberFilteredRows").getAsInt());
                 }
             }
+        } catch (Throwable t) {
+            LOG.error("Execute mysql load failed", t);

Review Comment:
   ```suggestion
               LOG.warn("Execute mysql load {} failed", loadId, t);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

