morningman commented on code in PR #48036:
URL: https://github.com/apache/doris/pull/48036#discussion_r1971180401


##########
be/src/vec/exec/format/parquet/parquet_thrift_util.h:
##########
@@ -36,26 +36,62 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 
'R', '1'};
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k
 
-static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** 
file_metadata,
-                                  size_t* meta_size, io::IOContext* io_ctx) {
+static Status read_thrift_footer(io::FileReaderSPtr file, 
std::vector<uint8_t>& footer,
+                                 size_t bytes_read, io::IOContext* io_ctx) {
     size_t file_size = file->size();
-    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
-    std::vector<uint8_t> footer(bytes_read);
-    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), 
bytes_read),
-                                  &bytes_read, io_ctx));
+    bytes_read = std::min(file_size, INIT_META_SIZE);
+    footer.reserve(bytes_read);
+    return file->read_at(file_size - bytes_read, Slice(footer.data(), 
bytes_read), &bytes_read,
+                         io_ctx);
+}
 
-    // validate magic
-    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
-    if (bytes_read < PARQUET_FOOTER_SIZE ||
-        memcmp(magic_ptr, PARQUET_VERSION_NUMBER, 
sizeof(PARQUET_VERSION_NUMBER)) != 0) {
-        return Status::Corruption(
-                "Invalid magic number in parquet file, bytes read: {}, file 
size: {}, path: {}, "
-                "read magic: {}",
-                bytes_read, file_size, file->path().native(),
-                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
+static bool validate_magic_number(const std::vector<uint8_t>& footer, size_t 
bytes_read) {
+    if (footer.size() < PARQUET_FOOTER_SIZE) {
+        return false;
+    }
+    const uint8_t* magic_ptr = footer.data() + bytes_read - 4;
+    return memcmp(magic_ptr, PARQUET_VERSION_NUMBER, 
sizeof(PARQUET_VERSION_NUMBER)) == 0;
+}
+
+static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** 
file_metadata,
+                                  size_t* meta_size, io::IOContext* io_ctx) {
+    std::vector<uint8_t> footer;
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(read_thrift_footer(file, footer, bytes_read, io_ctx));
+    size_t before_file_size = file->size();
+    if (!validate_magic_number(footer, bytes_read)) {
+        // try to get file length from filesystem and validate magic again
+        Status st = file->update_size();
+        if (st.ok()) {
+            size_t file_size = file->size();
+            LOG(WARNING) << "File length changed before reading footer, path: "
+                         << file->path().native() << ", before: " << 
before_file_size
+                         << ", after: " << file_size;
+            bytes_read = std::min(file_size, INIT_META_SIZE);

Review Comment:
   Looks like here you can call `read_thrift_footer()` again?



##########
be/src/io/fs/file_reader.h:
##########
@@ -88,6 +88,8 @@ class FileReader : public doris::ProfileCollector {
 
     virtual const std::string& get_data_dir_path() { return 
VIRTUAL_REMOTE_DATA_DIR; }
 
+    virtual Status update_size() { return Status::NotSupported("update_size is 
not supported"); }

Review Comment:
   I think we can leave the default implements as empty? just return 
`Status::OK()`.
   Because i am afraid that some new file reader may forget to implement this 
method and will return error.
   Leave it as empty can at least remain the behavior unchanged.



##########
be/src/vec/exec/format/parquet/parquet_thrift_util.h:
##########
@@ -36,26 +36,62 @@ constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 
'R', '1'};
 constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
 constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k
 
-static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** 
file_metadata,
-                                  size_t* meta_size, io::IOContext* io_ctx) {
+static Status read_thrift_footer(io::FileReaderSPtr file, 
std::vector<uint8_t>& footer,
+                                 size_t bytes_read, io::IOContext* io_ctx) {
     size_t file_size = file->size();
-    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
-    std::vector<uint8_t> footer(bytes_read);
-    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), 
bytes_read),
-                                  &bytes_read, io_ctx));
+    bytes_read = std::min(file_size, INIT_META_SIZE);
+    footer.reserve(bytes_read);
+    return file->read_at(file_size - bytes_read, Slice(footer.data(), 
bytes_read), &bytes_read,
+                         io_ctx);
+}
 
-    // validate magic
-    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
-    if (bytes_read < PARQUET_FOOTER_SIZE ||
-        memcmp(magic_ptr, PARQUET_VERSION_NUMBER, 
sizeof(PARQUET_VERSION_NUMBER)) != 0) {
-        return Status::Corruption(
-                "Invalid magic number in parquet file, bytes read: {}, file 
size: {}, path: {}, "
-                "read magic: {}",
-                bytes_read, file_size, file->path().native(),
-                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
+static bool validate_magic_number(const std::vector<uint8_t>& footer, size_t 
bytes_read) {
+    if (footer.size() < PARQUET_FOOTER_SIZE) {
+        return false;
+    }
+    const uint8_t* magic_ptr = footer.data() + bytes_read - 4;
+    return memcmp(magic_ptr, PARQUET_VERSION_NUMBER, 
sizeof(PARQUET_VERSION_NUMBER)) == 0;
+}
+
+static Status parse_thrift_footer(io::FileReaderSPtr file, FileMetaData** 
file_metadata,
+                                  size_t* meta_size, io::IOContext* io_ctx) {
+    std::vector<uint8_t> footer;
+    size_t bytes_read = 0;
+    RETURN_IF_ERROR(read_thrift_footer(file, footer, bytes_read, io_ctx));
+    size_t before_file_size = file->size();
+    if (!validate_magic_number(footer, bytes_read)) {
+        // try to get file length from filesystem and validate magic again
+        Status st = file->update_size();
+        if (st.ok()) {
+            size_t file_size = file->size();
+            LOG(WARNING) << "File length changed before reading footer, path: "
+                         << file->path().native() << ", before: " << 
before_file_size
+                         << ", after: " << file_size;
+            bytes_read = std::min(file_size, INIT_META_SIZE);
+            RETURN_IF_ERROR(file->read_at(file_size - bytes_read, 
Slice(footer.data(), bytes_read),
+                                          &bytes_read, io_ctx));
+            if (!validate_magic_number(footer, bytes_read)) {
+                return Status::Corruption(
+                        "Invalid magic number in parquet file, bytes read: {}, 
file size: {}, "

Review Comment:
   You can optimize here to not write this `Status::Corruption` twice.
   For example, u can use a `Status` and return it at the end.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to