This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-fs-spi
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 3e90dc92d0e8b3aa7c2fc23d010ae203d16067c2
Author: morningman <[email protected]>
AuthorDate: Wed Apr 1 15:48:39 2026 +0800

    [refactor](fs-spi) P4.8-D: migrate HdfsStorageVault to SPI, delete legacy 
HDFS IO wrappers
    
    ### What problem does this PR solve?
    
    Issue Number: N/A
    
    Problem Summary:
    - HdfsStorageVault.checkConnectivity() was instantiating the legacy
DFSFileSystem
      directly for makeDir/exists/delete operations using the Status-based API
    - Five legacy HDFS IO wrapper classes (HdfsInputFile, HdfsOutputFile,
      HdfsInputStream, HdfsOutputStream, HdfsInput) only had callers in the 
legacy
      DFSFileSystem.newInputFile()/newOutputFile() overrides
    
    Changes:
    - HdfsStorageVault: replace new DFSFileSystem(...) with
      FileSystemFactory.getFileSystem(StorageProperties) and use SPI
      mkdirs/exists/delete(Location) methods; IOException wrapped as 
DdlException
    - DFSFileSystem: remove the newInputFile()/newOutputFile() overrides
      (callers now hit the LegacyFileSystemApi defaults, which throw
      UnsupportedOperationException); remove the now-unused
      imports (Location, HdfsInputFile, HdfsOutputFile, DorisInputFile,
      DorisOutputFile, ParsedPath)
    - Delete: HdfsInputFile, HdfsOutputFile, HdfsInputStream, HdfsOutputStream,
      HdfsInput (all dead code after removing DFSFileSystem overrides)
    
    Note: ExternalCatalog and HMSExternalCatalog still reference
    DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH and 
DFSFileSystem.getHdfsConf()
    as static-only usage; full migration deferred to Phase G when legacy 
DFSFileSystem
    is deleted (blocked by OSSHdfsFileSystem, JFSFileSystem, OFSFileSystem 
subclasses).
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: No need to test (HdfsStorageVault.checkConnectivity is
      integration-only and has no existing unit test; behavior is identical —
      the SPI HDFS provider uses the same Hadoop FileSystem under the hood)
    - Behavior changed: No (same HDFS operations, same error handling)
    - Does this need documentation: No
    
    Co-authored-by: Copilot <[email protected]>
---
 .../org/apache/doris/catalog/HdfsStorageVault.java |  32 ++--
 .../org/apache/doris/fs/io/hdfs/HdfsInput.java     | 107 ------------
 .../org/apache/doris/fs/io/hdfs/HdfsInputFile.java | 155 ------------------
 .../apache/doris/fs/io/hdfs/HdfsInputStream.java   | 179 ---------------------
 .../apache/doris/fs/io/hdfs/HdfsOutputFile.java    |  95 -----------
 .../apache/doris/fs/io/hdfs/HdfsOutputStream.java  | 135 ----------------
 .../apache/doris/fs/remote/dfs/DFSFileSystem.java  |  16 --
 7 files changed, 10 insertions(+), 709 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
index 4f701105b63..d471ecf9826 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java
@@ -17,14 +17,14 @@
 
 package org.apache.doris.catalog;
 
-import org.apache.doris.backup.Status;
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.security.authentication.AuthenticationConfig;
 import org.apache.doris.common.util.DatasourcePrintableMap;
-import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
+import org.apache.doris.filesystem.spi.FileSystem;
+import org.apache.doris.filesystem.spi.Location;
+import org.apache.doris.fs.FileSystemFactory;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -131,34 +131,22 @@ public class HdfsStorageVault extends StorageVault {
         Preconditions.checkArgument(
                 !Strings.isNullOrEmpty(pathPrefix), "%s is null or empty", 
PropertyKey.VAULT_PATH_PREFIX);
 
-        try (DFSFileSystem dfsFileSystem = new 
DFSFileSystem((HdfsCompatibleProperties) StorageProperties
-                .createPrimary(newProperties))) {
+        try (FileSystem fs = FileSystemFactory.getFileSystem(
+                StorageProperties.createPrimary(newProperties))) {
             Long timestamp = System.currentTimeMillis();
             String remotePath = hadoopFsName + "/" + pathPrefix + 
"/doris-check-connectivity" + timestamp.toString();
+            Location loc = Location.of(remotePath);
 
-            Status st = dfsFileSystem.makeDir(remotePath);
-            if (st != Status.OK) {
-                throw new DdlException(
-                        "checkConnectivity(makeDir) failed, status: " + st
-                                + ", properties: " + new 
DatasourcePrintableMap<>(
-                                newProperties, "=", true, false, true, false));
-            }
+            fs.mkdirs(loc);
 
-            st = dfsFileSystem.exists(remotePath);
-            if (st != Status.OK) {
+            if (!fs.exists(loc)) {
                 throw new DdlException(
-                        "checkConnectivity(exist) failed, status: " + st
+                        "checkConnectivity(exist) failed: path does not exist 
after mkdirs: " + remotePath
                                 + ", properties: " + new 
DatasourcePrintableMap<>(
                                 newProperties, "=", true, false, true, false));
             }
 
-            st = dfsFileSystem.delete(remotePath);
-            if (st != Status.OK) {
-                throw new DdlException(
-                        "checkConnectivity(exist) failed, status: " + st
-                                + ", properties: " + new 
DatasourcePrintableMap<>(
-                                newProperties, "=", true, false, true, false));
-            }
+            fs.delete(loc, false);
         } catch (IOException e) {
             LOG.warn("checkConnectivity failed, properties:{}", new 
DatasourcePrintableMap<>(
                     newProperties, "=", true, false, true, false), e);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java
deleted file mode 100644
index 06e49686897..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java
+++ /dev/null
@@ -1,107 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.io.hdfs;
-
-import org.apache.doris.fs.io.DorisInput;
-import org.apache.doris.fs.io.DorisInputFile;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * HdfsInput provides an implementation of DorisInput for reading data from 
HDFS at a low level.
- * It wraps an FSDataInputStream and a DorisInputFile, providing random access 
read functionality.
- */
-public class HdfsInput implements DorisInput {
-    // The underlying Hadoop FSDataInputStream used for reading.
-    private final FSDataInputStream stream;
-    // The DorisInputFile representing the file.
-    private final DorisInputFile inputFile;
-    // Indicates whether the input has been closed.
-    private boolean closed;
-
-    /**
-     * Constructs a HdfsInput with the given FSDataInputStream and 
DorisInputFile.
-     *
-     * @param stream the underlying Hadoop FSDataInputStream
-     * @param inputFile the DorisInputFile representing the file
-     */
-    public HdfsInput(FSDataInputStream stream, DorisInputFile inputFile) {
-        this.stream = Objects.requireNonNull(stream, "stream is null");
-        this.inputFile = Objects.requireNonNull(inputFile, "inputFile is 
null");
-    }
-
-    /**
-     * Checks if the input is closed and throws an IOException if it is.
-     * Used internally before performing any operation.
-     *
-     * @throws IOException if the input is closed
-     */
-    private void checkClosed() throws IOException {
-        if (closed) {
-            throw new IOException("Input is closed: " + inputFile.toString());
-        }
-    }
-
-    /**
-     * Reads bytes from the file at the specified position into the buffer.
-     *
-     * @param position the position in the file to start reading
-     * @param buffer the buffer into which the data is read
-     * @param bufferOffset the start offset in the buffer
-     * @param bufferLength the number of bytes to read
-     * @throws IOException if an I/O error occurs or the input is closed
-     */
-    @Override
-    public void readFully(long position, byte[] buffer, int bufferOffset, int 
bufferLength) throws IOException {
-        checkClosed();
-        try {
-            stream.readFully(position, buffer, bufferOffset, bufferLength);
-        } catch (FileNotFoundException e) {
-            throw new FileNotFoundException(
-                    String.format("File %s not found: %s", 
inputFile.toString(), e.getMessage()));
-        } catch (IOException e) {
-            throw new IOException(String.format("Read %d bytes at position %d 
failed for file %s: %s",
-                    bufferLength, position, inputFile.toString(), 
e.getMessage()), e);
-        }
-    }
-
-    /**
-     * Closes this input and releases any system resources associated with it.
-     *
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public void close() throws IOException {
-        closed = true;
-        stream.close();
-    }
-
-    /**
-     * Returns the string representation of the input file.
-     *
-     * @return the file path as a string
-     */
-    @Override
-    public String toString() {
-        return inputFile.toString();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java
deleted file mode 100644
index 03c6de985e8..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java
+++ /dev/null
@@ -1,155 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.io.hdfs;
-
-import org.apache.doris.backup.Status;
-import org.apache.doris.fs.Location;
-import org.apache.doris.fs.io.DorisInput;
-import org.apache.doris.fs.io.DorisInputFile;
-import org.apache.doris.fs.io.DorisInputStream;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * HdfsInputFile provides an implementation of DorisInputFile for reading data 
from HDFS.
- * It wraps a Location and DFSFileSystem to open files and retrieve file 
metadata from HDFS.
- */
-public class HdfsInputFile implements DorisInputFile {
-    // The Location representing the file location in HDFS.
-    private final Location location;
-    // The Hadoop Path object corresponding to the file.
-    private final Path hadoopPath;
-    // The DFSFileSystem used to interact with HDFS.
-    private final DFSFileSystem dfs;
-
-    // The length of the file, lazily initialized.
-    private long length;
-    // The FileStatus object for the file, lazily initialized.
-    private FileStatus status;
-
-    /**
-     * Constructs a HdfsInputFile with the given Location, file length, and 
DFSFileSystem.
-     *
-     * @param location the Location representing the file location
-     * @param length the length of the file, or -1 if unknown
-     * @param dfs the DFSFileSystem used to interact with HDFS
-     */
-    public HdfsInputFile(Location location, long length, DFSFileSystem dfs) {
-        this.location = Objects.requireNonNull(location, "location is null");
-        this.dfs = Objects.requireNonNull(dfs, "hdfs file system is null");
-        this.hadoopPath = new Path(location.toString());
-        this.length = length;
-    }
-
-    /**
-     * Returns a new DorisInput for reading from this file.
-     *
-     * @return a new DorisInput instance
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public DorisInput newInput() throws IOException {
-        return new HdfsInput(dfs.openFile(hadoopPath), this);
-    }
-
-    /**
-     * Returns a new DorisInputStream for streaming reads from this file.
-     *
-     * @return a new DorisInputStream instance
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public DorisInputStream newStream() throws IOException {
-        return new HdfsInputStream(location.toString(), 
dfs.openFile(hadoopPath));
-    }
-
-    /**
-     * Returns the length of the file, querying HDFS if necessary.
-     *
-     * @return the file length
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public long length() throws IOException {
-        if (length == -1) {
-            length = getFileStatus().getLen();
-        }
-        return length;
-    }
-
-    /**
-     * Returns the last modified time of the file.
-     *
-     * @return the last modified time in milliseconds
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public long lastModifiedTime() throws IOException {
-        return getFileStatus().getModificationTime();
-    }
-
-    /**
-     * Checks if the file exists in HDFS.
-     *
-     * @return true if the file exists, false otherwise
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public boolean exists() throws IOException {
-        Status existsStatus = dfs.exists(location.toString());
-        return existsStatus.ok();
-    }
-
-    /**
-     * Returns the Location associated with this input file.
-     *
-     * @return the Location
-     */
-    @Override
-    public Location location() {
-        return location;
-    }
-
-    /**
-     * Returns the string representation of the file path.
-     *
-     * @return the file path as a string
-     */
-    @Override
-    public String toString() {
-        return location.toString();
-    }
-
-    /**
-     * Lazily retrieves the FileStatus from HDFS for this file.
-     *
-     * @return the FileStatus object
-     * @throws IOException if an I/O error occurs
-     */
-    private FileStatus getFileStatus() throws IOException {
-        if (status == null) {
-            status = dfs.getFileStatus(hadoopPath);
-        }
-        return status;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java
deleted file mode 100644
index 48ab5ffb035..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.io.hdfs;
-
-import org.apache.doris.fs.io.DorisInputStream;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Objects;
-
-/**
- * HdfsInputStream provides an input stream implementation for reading data 
from HDFS
- * using a path string and FSDataInputStream.
- * It extends DorisInputStream and wraps Hadoop's FSDataInputStream, providing 
additional checks and error handling.
- */
-public class HdfsInputStream extends DorisInputStream {
-    // The file path string for error reporting.
-    private final String path;
-    // The underlying Hadoop FSDataInputStream used for reading.
-    private final FSDataInputStream stream;
-    // Indicates whether the stream has been closed.
-    private boolean closed;
-
-    /**
-     * Constructs a HdfsInputStream with the given path string and 
FSDataInputStream.
-     *
-     * @param path the file path string for error reporting
-     * @param stream the underlying Hadoop FSDataInputStream
-     */
-    HdfsInputStream(String path, FSDataInputStream stream) {
-        this.path = Objects.requireNonNull(path, "path is null");
-        this.stream = Objects.requireNonNull(stream, "stream is null");
-    }
-
-    /**
-     * Checks if the stream is closed and throws an IOException if it is.
-     * Used internally before performing any operation.
-     *
-     * @throws IOException if the stream is closed
-     */
-    private void checkClosed() throws IOException {
-        if (closed) {
-            throw new IOException("Input stream is closed: " + path);
-        }
-    }
-
-    /**
-     * Returns the number of bytes that can be read from this input stream 
without blocking.
-     *
-     * @return the number of available bytes
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public int available() throws IOException {
-        checkClosed();
-        try {
-            return stream.available();
-        } catch (IOException e) {
-            throw new IOException(String.format("Failed to get available 
status for file %s.", path), e);
-        }
-    }
-
-    /**
-     * Returns the current position in the input stream.
-     *
-     * @return the current position
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public long getPosition() throws IOException {
-        checkClosed();
-        try {
-            return stream.getPos();
-        } catch (IOException e) {
-            throw new IOException(String.format("Failed to get position for 
file %s.", path), e);
-        }
-    }
-
-    /**
-     * Seeks to the specified position in the input stream.
-     *
-     * @param position the position to seek to
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public void seek(long position) throws IOException {
-        checkClosed();
-        try {
-            stream.seek(position);
-        } catch (IOException e) {
-            throw new IOException(
-                    String.format("Failed to seek to position %d for file %s: 
%s", position, path, e.getMessage()), e);
-        }
-    }
-
-    /**
-     * Reads the next byte of data from the input stream.
-     *
-     * @return the next byte of data, or -1 if the end of the stream is reached
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public int read() throws IOException {
-        checkClosed();
-        try {
-            return stream.read();
-        } catch (FileNotFoundException e) {
-            throw new FileNotFoundException(String.format("File %s not found: 
%s", path, e.getMessage()));
-        } catch (IOException e) {
-            throw new IOException(String.format("Read of file %s failed: %s", 
path, e.getMessage()), e);
-        }
-    }
-
-    /**
-     * Reads up to len bytes of data from the input stream into an array of 
bytes.
-     *
-     * @param b the buffer into which the data is read
-     * @param off the start offset in array b at which the data is written
-     * @param len the maximum number of bytes to read
-     * @return the total number of bytes read into the buffer, or -1 if there 
is no more data
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        checkClosed();
-        try {
-            return stream.read(b, off, len);
-        } catch (FileNotFoundException e) {
-            throw new FileNotFoundException(String.format("File %s not found: 
%s", path, e.getMessage()));
-        } catch (IOException e) {
-            throw new IOException(String.format("Read of file %s failed: %s", 
path, e.getMessage()), e);
-        }
-    }
-
-    /**
-     * Skips over and discards n bytes of data from this input stream.
-     *
-     * @param n the number of bytes to skip
-     * @return the actual number of bytes skipped
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public long skip(long n) throws IOException {
-        checkClosed();
-        try {
-            return stream.skip(n);
-        } catch (IOException e) {
-            throw new IOException(String.format("Skip in file %s failed: %s", 
path, e.getMessage()), e);
-        }
-    }
-
-    /**
-     * Closes this input stream and releases any system resources associated 
with it.
-     *
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public void close() throws IOException {
-        closed = true;
-        stream.close();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java
deleted file mode 100644
index 2d0d3233260..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java
+++ /dev/null
@@ -1,95 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.io.hdfs;
-
-import org.apache.doris.fs.Location;
-import org.apache.doris.fs.io.DorisOutputFile;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * HdfsOutputFile provides an implementation of DorisOutputFile for writing 
data to HDFS.
- * It wraps a Location and DFSFileSystem to create or overwrite files in HDFS.
- */
-public class HdfsOutputFile implements DorisOutputFile {
-    // The Location representing the file location in HDFS.
-    private final Location location;
-    // The Hadoop Path object corresponding to the file.
-    private final Path hadoopPath;
-    // The DFSFileSystem used to interact with HDFS.
-    private final DFSFileSystem dfs;
-
-    /**
-     * Constructs a HdfsOutputFile with the given Location and DFSFileSystem.
-     *
-     * @param location the Location representing the file location
-     * @param dfs the DFSFileSystem used to interact with HDFS
-     */
-    public HdfsOutputFile(Location location, DFSFileSystem dfs) {
-        this.location = Objects.requireNonNull(location, "location is null");
-        this.hadoopPath = new Path(location.toString());
-        this.dfs = Objects.requireNonNull(dfs, "dfs is null");
-    }
-
-    /**
-     * Creates a new file in HDFS. Fails if the file already exists.
-     *
-     * @return OutputStream for writing to the new file
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public OutputStream create() throws IOException {
-        return dfs.createFile(hadoopPath, false);
-    }
-
-    /**
-     * Creates a new file or overwrites the file if it already exists in HDFS.
-     *
-     * @return OutputStream for writing to the file
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public OutputStream createOrOverwrite() throws IOException {
-        return dfs.createFile(hadoopPath, true);
-    }
-
-    /**
-     * Returns the Location associated with this output file.
-     *
-     * @return the Location
-     */
-    @Override
-    public Location location() {
-        return location;
-    }
-
-    /**
-     * Returns the string representation of the file path.
-     *
-     * @return the file path as a string
-     */
-    @Override
-    public String toString() {
-        return location.toString();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java
deleted file mode 100644
index 86773b1cf2b..00000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java
+++ /dev/null
@@ -1,135 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.fs.io.hdfs;
-
-import org.apache.doris.fs.io.ParsedPath;
-import org.apache.doris.fs.remote.dfs.DFSFileSystem;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Objects;
-
-/**
- * HdfsOutputStream provides an output stream implementation for writing data 
to HDFS
- * using ParsedPath and FSDataOutputStream.
- * It extends FSDataOutputStream and adds additional checks and Kerberos 
authentication handling.
- */
-public class HdfsOutputStream extends FSDataOutputStream {
-    // The ParsedPath representing the file location in HDFS.
-    private final ParsedPath path;
-    // The Hadoop Path object corresponding to the file.
-    private final Path hadoopPath;
-    // The DFSFileSystem used to interact with HDFS.
-    private final DFSFileSystem dfs;
-    // Indicates whether the stream has been closed.
-    private boolean closed;
-
-    /**
-     * Constructs a HdfsOutputStream with the given ParsedPath, 
FSDataOutputStream, and DFSFileSystem.
-     *
-     * @param path the ParsedPath representing the file location
-     * @param out the underlying Hadoop FSDataOutputStream
-     * @param dfs the DFSFileSystem used to interact with HDFS
-     */
-    public HdfsOutputStream(ParsedPath path, FSDataOutputStream out, 
DFSFileSystem dfs) {
-        super(out, null, out.getPos());
-        this.path = Objects.requireNonNull(path, "path is null");
-        this.hadoopPath = path.toHadoopPath();
-        this.dfs = dfs;
-    }
-
-    /**
-     * Checks if the stream is closed and throws an IOException if it is.
-     * Used internally before performing any operation.
-     *
-     * @throws IOException if the stream is closed
-     */
-    private void checkClosed() throws IOException {
-        if (closed) {
-            throw new IOException("Output stream is closed: " + path);
-        }
-    }
-
-    /**
-     * Returns the originally wrapped OutputStream, not the delegate.
-     *
-     * @return the wrapped OutputStream
-     */
-    @Override
-    public OutputStream getWrappedStream() {
-        return ((FSDataOutputStream) 
super.getWrappedStream()).getWrappedStream();
-    }
-
-    /**
-     * Writes the specified byte to this output stream, handling Kerberos 
ticket refresh if needed.
-     *
-     * @param b the byte to write
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public void write(int b) throws IOException {
-        checkClosed();
-        dfs.getAuthenticator().doAs(() -> {
-            super.write(b);
-            return null;
-        });
-    }
-
-    /**
-     * Writes len bytes from the specified byte array starting at offset off 
to this output stream,
-     * handling Kerberos ticket refresh if needed.
-     *
-     * @param b the data
-     * @param off the start offset in the data
-     * @param len the number of bytes to write
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public void write(byte[] b, int off, int len) throws IOException {
-        checkClosed();
-        dfs.getAuthenticator().doAs(() -> {
-            super.write(b, off, len);
-            return null;
-        });
-    }
-
-    /**
-     * Flushes this output stream and forces any buffered output bytes to be 
written out.
-     *
-     * @throws IOException if an I/O error occurs or the stream is closed
-     */
-    @Override
-    public void flush() throws IOException {
-        checkClosed();
-        super.flush();
-    }
-
-    /**
-     * Closes this output stream and releases any system resources associated 
with it.
-     *
-     * @throws IOException if an I/O error occurs
-     */
-    @Override
-    public void close() throws IOException {
-        closed = true;
-        super.close();
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java 
b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
index 7af9081d639..8d98a7bc5fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java
@@ -25,12 +25,6 @@ import org.apache.doris.common.util.URI;
 import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.foundation.fs.FsStorageType;
-import org.apache.doris.fs.Location;
-import org.apache.doris.fs.io.DorisInputFile;
-import org.apache.doris.fs.io.DorisOutputFile;
-import org.apache.doris.fs.io.ParsedPath;
-import org.apache.doris.fs.io.hdfs.HdfsInputFile;
-import org.apache.doris.fs.io.hdfs.HdfsOutputFile;
 import org.apache.doris.fs.operations.HDFSFileOperations;
 import org.apache.doris.fs.operations.HDFSOpParams;
 import org.apache.doris.fs.operations.OpParams;
@@ -585,14 +579,4 @@ public class DFSFileSystem extends RemoteFileSystem {
         FileSystem fileSystem = nativeFileSystem(path);
         return hdfsProperties.getHadoopAuthenticator().doAs(() -> 
fileSystem.create(path, overwrite));
     }
-
-    @Override
-    public DorisOutputFile newOutputFile(ParsedPath path) {
-        return new HdfsOutputFile(Location.of(path.toString()), this);
-    }
-
-    @Override
-    public DorisInputFile newInputFile(ParsedPath path, long length) {
-        return new HdfsInputFile(Location.of(path.toString()), length, this);
-    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to