imay commented on a change in pull request #3716:
URL: https://github.com/apache/incubator-doris/pull/3716#discussion_r432214780



##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +142,326 @@ public static String printBroker(String brokerName, 
TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    public static String readBrokerFile(String path, BrokerDesc brokerDesc) 
throws UserException {

Review comment:
       prefer return byte[] 

##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +142,326 @@ public static String printBroker(String brokerName, 
TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    public static String readBrokerFile(String path, BrokerDesc brokerDesc) 
throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, 
brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + 
path + ", broker=" + address
+                                                + ",msg=" + 
tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = 
tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path 
+ ", broker=" + address
+                                                + ", files num: " + 
fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + 
Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new 
TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, 
brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + 
path + ", broker=" + address
+                                                + ", msg=" + 
tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + 
", broker=" + address
+                                                + ", msg=" + 
tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            byte[] data = tReadResponse.getData();
+            return new String(data, "UTF-8");
+        } catch (TException | UnsupportedEncodingException e) {
+            String failMsg = "Broker read file exception. path=" + path + ", 
broker=" + address;
+            LOG.warn(failMsg, e);
+            throw new UserException(failMsg);
+        } finally {
+            // close reader
+            if (fd != null) {
+                failed = true;
+                TBrokerCloseReaderRequest tCloseReaderRequest = new 
TBrokerCloseReaderRequest(
+                        TBrokerVersion.VERSION_ONE, fd);
+                TBrokerOperationStatus tOperationStatus = null;
+                try {
+                    tOperationStatus = client.closeReader(tCloseReaderRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    try {
+                        tOperationStatus = 
client.closeReader(tCloseReaderRequest);
+                    } catch (TException ex) {
+                        LOG.warn("Broker close reader failed. path={}, 
address={}", path, address, ex);
+                    }
+                }
+                if (tOperationStatus == null || 
tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    LOG.warn("Broker close reader failed. path={}, address={}, 
error={}", path, address,
+                             tOperationStatus.getMessage());
+                } else {
+                    failed = false;
+                }
+            }
+
+            // return client
+            returnClient(client, address, failed);
+        }
+    }
+
+    public static void writeBrokerFile(byte[] data, String destFilePath, 
BrokerDesc brokerDesc) throws UserException {
+        BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc);
+        try {
+            writer.open();
+            ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+            writer.write(byteBuffer, data.length);
+        } finally {
+            writer.close();
+        }
+    }
+
+    public static void writeBrokerFile(String srcFilePath, String destFilePath,

Review comment:
       ```suggestion
       public static void writeFile(String srcFilePath, String destFilePath,
   ```

##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +142,326 @@ public static String printBroker(String brokerName, 
TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    public static String readBrokerFile(String path, BrokerDesc brokerDesc) 
throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, 
brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + 
path + ", broker=" + address
+                                                + ",msg=" + 
tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = 
tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path 
+ ", broker=" + address
+                                                + ", files num: " + 
fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + 
Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new 
TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, 
brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + 
path + ", broker=" + address
+                                                + ", msg=" + 
tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + 
", broker=" + address
+                                                + ", msg=" + 
tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            byte[] data = tReadResponse.getData();
+            return new String(data, "UTF-8");
+        } catch (TException | UnsupportedEncodingException e) {
+            String failMsg = "Broker read file exception. path=" + path + ", 
broker=" + address;
+            LOG.warn(failMsg, e);
+            throw new UserException(failMsg);
+        } finally {
+            // close reader
+            if (fd != null) {
+                failed = true;
+                TBrokerCloseReaderRequest tCloseReaderRequest = new 
TBrokerCloseReaderRequest(
+                        TBrokerVersion.VERSION_ONE, fd);
+                TBrokerOperationStatus tOperationStatus = null;
+                try {
+                    tOperationStatus = client.closeReader(tCloseReaderRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    try {
+                        tOperationStatus = 
client.closeReader(tCloseReaderRequest);
+                    } catch (TException ex) {
+                        LOG.warn("Broker close reader failed. path={}, 
address={}", path, address, ex);
+                    }
+                }
+                if (tOperationStatus == null || 
tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    LOG.warn("Broker close reader failed. path={}, address={}, 
error={}", path, address,
+                             tOperationStatus.getMessage());
+                } else {
+                    failed = false;
+                }
+            }
+
+            // return client
+            returnClient(client, address, failed);
+        }
+    }
+
+    public static void writeBrokerFile(byte[] data, String destFilePath, 
BrokerDesc brokerDesc) throws UserException {

Review comment:
       should add comment for public function to help other to use it.

##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +142,326 @@ public static String printBroker(String brokerName, 
TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    public static String readBrokerFile(String path, BrokerDesc brokerDesc) 
throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, 
brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + 
path + ", broker=" + address
+                                                + ",msg=" + 
tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = 
tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path 
+ ", broker=" + address
+                                                + ", files num: " + 
fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + 
Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new 
TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, 
brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + 
path + ", broker=" + address
+                                                + ", msg=" + 
tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != 
TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + 
", broker=" + address
+                                                + ", msg=" + 
tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            byte[] data = tReadResponse.getData();
+            return new String(data, "UTF-8");
+        } catch (TException | UnsupportedEncodingException e) {
+            String failMsg = "Broker read file exception. path=" + path + ", 
broker=" + address;
+            LOG.warn(failMsg, e);
+            throw new UserException(failMsg);
+        } finally {
+            // close reader
+            if (fd != null) {
+                failed = true;
+                TBrokerCloseReaderRequest tCloseReaderRequest = new 
TBrokerCloseReaderRequest(
+                        TBrokerVersion.VERSION_ONE, fd);
+                TBrokerOperationStatus tOperationStatus = null;
+                try {
+                    tOperationStatus = client.closeReader(tCloseReaderRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    try {
+                        tOperationStatus = 
client.closeReader(tCloseReaderRequest);
+                    } catch (TException ex) {
+                        LOG.warn("Broker close reader failed. path={}, 
address={}", path, address, ex);
+                    }
+                }
+                if (tOperationStatus == null || 
tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    LOG.warn("Broker close reader failed. path={}, address={}, 
error={}", path, address,
+                             tOperationStatus.getMessage());
+                } else {
+                    failed = false;
+                }
+            }
+
+            // return client
+            returnClient(client, address, failed);
+        }
+    }
+
+    public static void writeBrokerFile(byte[] data, String destFilePath, 
BrokerDesc brokerDesc) throws UserException {

Review comment:
       ```suggestion
       public static void writeFile(byte[] data, String destFilePath, 
BrokerDesc brokerDesc) throws UserException {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to