This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-fs-spi in repository https://gitbox.apache.org/repos/asf/doris.git
commit 3e90dc92d0e8b3aa7c2fc23d010ae203d16067c2 Author: morningman <[email protected]> AuthorDate: Wed Apr 1 15:48:39 2026 +0800 [refactor](fs-spi) P4.8-D: migrate HdfsStorageVault to SPI, delete legacy HDFS IO wrappers ### What problem does this PR solve? Issue Number: N/A Problem Summary: - HdfsStorageVault.checkConnectivity() was instantiating legacy DFSFileSystem directly for makeDir/exists/delete operations using the Status-based API - Five legacy HDFS IO wrapper classes (HdfsInputFile, HdfsOutputFile, HdfsInputStream, HdfsOutputStream, HdfsInput) were only called from the legacy DFSFileSystem.newInputFile()/newOutputFile() overrides Changes: - HdfsStorageVault: replace new DFSFileSystem(...) with FileSystemFactory.getFileSystem(StorageProperties) and use SPI mkdirs/exists/delete(Location) methods; IOException wrapped as DdlException - DFSFileSystem: remove newInputFile()/newOutputFile() overrides (falls back to LegacyFileSystemApi default UnsupportedOperationException); remove unused imports (Location, HdfsInputFile, HdfsOutputFile, DorisInputFile, DorisOutputFile, ParsedPath) - Delete: HdfsInputFile, HdfsOutputFile, HdfsInputStream, HdfsOutputStream, HdfsInput (all dead code after removing DFSFileSystem overrides) Note: ExternalCatalog and HMSExternalCatalog still reference DFSFileSystem.PROP_ALLOW_FALLBACK_TO_SIMPLE_AUTH and DFSFileSystem.getHdfsConf() as static-only usage; full migration deferred to Phase G when legacy DFSFileSystem is deleted (blocked by OSSHdfsFileSystem, JFSFileSystem, OFSFileSystem subclasses). 
### Release note None ### Check List (For Author) - Test: No need to test (HdfsStorageVault.checkConnectivity is integration-only, no existing unit test; behavior identical — SPI HDFS provider uses same Hadoop FileSystem under the hood) - Behavior changed: No (same HDFS operations, same error handling) - Does this need documentation: No Co-authored-by: Copilot <[email protected]> --- .../org/apache/doris/catalog/HdfsStorageVault.java | 32 ++-- .../org/apache/doris/fs/io/hdfs/HdfsInput.java | 107 ------------ .../org/apache/doris/fs/io/hdfs/HdfsInputFile.java | 155 ------------------ .../apache/doris/fs/io/hdfs/HdfsInputStream.java | 179 --------------------- .../apache/doris/fs/io/hdfs/HdfsOutputFile.java | 95 ----------- .../apache/doris/fs/io/hdfs/HdfsOutputStream.java | 135 ---------------- .../apache/doris/fs/remote/dfs/DFSFileSystem.java | 16 -- 7 files changed, 10 insertions(+), 709 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java index 4f701105b63..d471ecf9826 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsStorageVault.java @@ -17,14 +17,14 @@ package org.apache.doris.catalog; -import org.apache.doris.backup.Status; import org.apache.doris.cloud.proto.Cloud; import org.apache.doris.common.DdlException; import org.apache.doris.common.security.authentication.AuthenticationConfig; import org.apache.doris.common.util.DatasourcePrintableMap; -import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; import org.apache.doris.datasource.property.storage.StorageProperties; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; +import org.apache.doris.filesystem.spi.FileSystem; +import org.apache.doris.filesystem.spi.Location; +import org.apache.doris.fs.FileSystemFactory; import com.google.common.base.Preconditions; import 
com.google.common.base.Strings; @@ -131,34 +131,22 @@ public class HdfsStorageVault extends StorageVault { Preconditions.checkArgument( !Strings.isNullOrEmpty(pathPrefix), "%s is null or empty", PropertyKey.VAULT_PATH_PREFIX); - try (DFSFileSystem dfsFileSystem = new DFSFileSystem((HdfsCompatibleProperties) StorageProperties - .createPrimary(newProperties))) { + try (FileSystem fs = FileSystemFactory.getFileSystem( + StorageProperties.createPrimary(newProperties))) { Long timestamp = System.currentTimeMillis(); String remotePath = hadoopFsName + "/" + pathPrefix + "/doris-check-connectivity" + timestamp.toString(); + Location loc = Location.of(remotePath); - Status st = dfsFileSystem.makeDir(remotePath); - if (st != Status.OK) { - throw new DdlException( - "checkConnectivity(makeDir) failed, status: " + st - + ", properties: " + new DatasourcePrintableMap<>( - newProperties, "=", true, false, true, false)); - } + fs.mkdirs(loc); - st = dfsFileSystem.exists(remotePath); - if (st != Status.OK) { + if (!fs.exists(loc)) { throw new DdlException( - "checkConnectivity(exist) failed, status: " + st + "checkConnectivity(exist) failed: path does not exist after mkdirs: " + remotePath + ", properties: " + new DatasourcePrintableMap<>( newProperties, "=", true, false, true, false)); } - st = dfsFileSystem.delete(remotePath); - if (st != Status.OK) { - throw new DdlException( - "checkConnectivity(exist) failed, status: " + st - + ", properties: " + new DatasourcePrintableMap<>( - newProperties, "=", true, false, true, false)); - } + fs.delete(loc, false); } catch (IOException e) { LOG.warn("checkConnectivity failed, properties:{}", new DatasourcePrintableMap<>( newProperties, "=", true, false, true, false), e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java deleted file mode 100644 index 06e49686897..00000000000 --- 
a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInput.java +++ /dev/null @@ -1,107 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.io.hdfs; - -import org.apache.doris.fs.io.DorisInput; -import org.apache.doris.fs.io.DorisInputFile; - -import org.apache.hadoop.fs.FSDataInputStream; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Objects; - -/** - * HdfsInput provides an implementation of DorisInput for reading data from HDFS at a low level. - * It wraps an FSDataInputStream and a DorisInputFile, providing random access read functionality. - */ -public class HdfsInput implements DorisInput { - // The underlying Hadoop FSDataInputStream used for reading. - private final FSDataInputStream stream; - // The DorisInputFile representing the file. - private final DorisInputFile inputFile; - // Indicates whether the input has been closed. - private boolean closed; - - /** - * Constructs a HdfsInput with the given FSDataInputStream and DorisInputFile. 
- * - * @param stream the underlying Hadoop FSDataInputStream - * @param inputFile the DorisInputFile representing the file - */ - public HdfsInput(FSDataInputStream stream, DorisInputFile inputFile) { - this.stream = Objects.requireNonNull(stream, "stream is null"); - this.inputFile = Objects.requireNonNull(inputFile, "inputFile is null"); - } - - /** - * Checks if the input is closed and throws an IOException if it is. - * Used internally before performing any operation. - * - * @throws IOException if the input is closed - */ - private void checkClosed() throws IOException { - if (closed) { - throw new IOException("Input is closed: " + inputFile.toString()); - } - } - - /** - * Reads bytes from the file at the specified position into the buffer. - * - * @param position the position in the file to start reading - * @param buffer the buffer into which the data is read - * @param bufferOffset the start offset in the buffer - * @param bufferLength the number of bytes to read - * @throws IOException if an I/O error occurs or the input is closed - */ - @Override - public void readFully(long position, byte[] buffer, int bufferOffset, int bufferLength) throws IOException { - checkClosed(); - try { - stream.readFully(position, buffer, bufferOffset, bufferLength); - } catch (FileNotFoundException e) { - throw new FileNotFoundException( - String.format("File %s not found: %s", inputFile.toString(), e.getMessage())); - } catch (IOException e) { - throw new IOException(String.format("Read %d bytes at position %d failed for file %s: %s", - bufferLength, position, inputFile.toString(), e.getMessage()), e); - } - } - - /** - * Closes this input and releases any system resources associated with it. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - closed = true; - stream.close(); - } - - /** - * Returns the string representation of the input file. 
- * - * @return the file path as a string - */ - @Override - public String toString() { - return inputFile.toString(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java deleted file mode 100644 index 03c6de985e8..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputFile.java +++ /dev/null @@ -1,155 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.io.hdfs; - -import org.apache.doris.backup.Status; -import org.apache.doris.fs.Location; -import org.apache.doris.fs.io.DorisInput; -import org.apache.doris.fs.io.DorisInputFile; -import org.apache.doris.fs.io.DorisInputStream; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.util.Objects; - -/** - * HdfsInputFile provides an implementation of DorisInputFile for reading data from HDFS. - * It wraps a Location and DFSFileSystem to open files and retrieve file metadata from HDFS. 
- */ -public class HdfsInputFile implements DorisInputFile { - // The Location representing the file location in HDFS. - private final Location location; - // The Hadoop Path object corresponding to the file. - private final Path hadoopPath; - // The DFSFileSystem used to interact with HDFS. - private final DFSFileSystem dfs; - - // The length of the file, lazily initialized. - private long length; - // The FileStatus object for the file, lazily initialized. - private FileStatus status; - - /** - * Constructs a HdfsInputFile with the given Location, file length, and DFSFileSystem. - * - * @param location the Location representing the file location - * @param length the length of the file, or -1 if unknown - * @param dfs the DFSFileSystem used to interact with HDFS - */ - public HdfsInputFile(Location location, long length, DFSFileSystem dfs) { - this.location = Objects.requireNonNull(location, "location is null"); - this.dfs = Objects.requireNonNull(dfs, "hdfs file system is null"); - this.hadoopPath = new Path(location.toString()); - this.length = length; - } - - /** - * Returns a new DorisInput for reading from this file. - * - * @return a new DorisInput instance - * @throws IOException if an I/O error occurs - */ - @Override - public DorisInput newInput() throws IOException { - return new HdfsInput(dfs.openFile(hadoopPath), this); - } - - /** - * Returns a new DorisInputStream for streaming reads from this file. - * - * @return a new DorisInputStream instance - * @throws IOException if an I/O error occurs - */ - @Override - public DorisInputStream newStream() throws IOException { - return new HdfsInputStream(location.toString(), dfs.openFile(hadoopPath)); - } - - /** - * Returns the length of the file, querying HDFS if necessary. 
- * - * @return the file length - * @throws IOException if an I/O error occurs - */ - @Override - public long length() throws IOException { - if (length == -1) { - length = getFileStatus().getLen(); - } - return length; - } - - /** - * Returns the last modified time of the file. - * - * @return the last modified time in milliseconds - * @throws IOException if an I/O error occurs - */ - @Override - public long lastModifiedTime() throws IOException { - return getFileStatus().getModificationTime(); - } - - /** - * Checks if the file exists in HDFS. - * - * @return true if the file exists, false otherwise - * @throws IOException if an I/O error occurs - */ - @Override - public boolean exists() throws IOException { - Status existsStatus = dfs.exists(location.toString()); - return existsStatus.ok(); - } - - /** - * Returns the Location associated with this input file. - * - * @return the Location - */ - @Override - public Location location() { - return location; - } - - /** - * Returns the string representation of the file path. - * - * @return the file path as a string - */ - @Override - public String toString() { - return location.toString(); - } - - /** - * Lazily retrieves the FileStatus from HDFS for this file. - * - * @return the FileStatus object - * @throws IOException if an I/O error occurs - */ - private FileStatus getFileStatus() throws IOException { - if (status == null) { - status = dfs.getFileStatus(hadoopPath); - } - return status; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java deleted file mode 100644 index 48ab5ffb035..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsInputStream.java +++ /dev/null @@ -1,179 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.io.hdfs; - -import org.apache.doris.fs.io.DorisInputStream; - -import org.apache.hadoop.fs.FSDataInputStream; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Objects; - -/** - * HdfsInputStream provides an input stream implementation for reading data from HDFS - * using a path string and FSDataInputStream. - * It extends DorisInputStream and wraps Hadoop's FSDataInputStream, providing additional checks and error handling. - */ -public class HdfsInputStream extends DorisInputStream { - // The file path string for error reporting. - private final String path; - // The underlying Hadoop FSDataInputStream used for reading. - private final FSDataInputStream stream; - // Indicates whether the stream has been closed. - private boolean closed; - - /** - * Constructs a HdfsInputStream with the given path string and FSDataInputStream. 
- * - * @param path the file path string for error reporting - * @param stream the underlying Hadoop FSDataInputStream - */ - HdfsInputStream(String path, FSDataInputStream stream) { - this.path = Objects.requireNonNull(path, "path is null"); - this.stream = Objects.requireNonNull(stream, "stream is null"); - } - - /** - * Checks if the stream is closed and throws an IOException if it is. - * Used internally before performing any operation. - * - * @throws IOException if the stream is closed - */ - private void checkClosed() throws IOException { - if (closed) { - throw new IOException("Input stream is closed: " + path); - } - } - - /** - * Returns the number of bytes that can be read from this input stream without blocking. - * - * @return the number of available bytes - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public int available() throws IOException { - checkClosed(); - try { - return stream.available(); - } catch (IOException e) { - throw new IOException(String.format("Failed to get available status for file %s.", path), e); - } - } - - /** - * Returns the current position in the input stream. - * - * @return the current position - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public long getPosition() throws IOException { - checkClosed(); - try { - return stream.getPos(); - } catch (IOException e) { - throw new IOException(String.format("Failed to get position for file %s.", path), e); - } - } - - /** - * Seeks to the specified position in the input stream. 
- * - * @param position the position to seek to - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public void seek(long position) throws IOException { - checkClosed(); - try { - stream.seek(position); - } catch (IOException e) { - throw new IOException( - String.format("Failed to seek to position %d for file %s: %s", position, path, e.getMessage()), e); - } - } - - /** - * Reads the next byte of data from the input stream. - * - * @return the next byte of data, or -1 if the end of the stream is reached - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public int read() throws IOException { - checkClosed(); - try { - return stream.read(); - } catch (FileNotFoundException e) { - throw new FileNotFoundException(String.format("File %s not found: %s", path, e.getMessage())); - } catch (IOException e) { - throw new IOException(String.format("Read of file %s failed: %s", path, e.getMessage()), e); - } - } - - /** - * Reads up to len bytes of data from the input stream into an array of bytes. - * - * @param b the buffer into which the data is read - * @param off the start offset in array b at which the data is written - * @param len the maximum number of bytes to read - * @return the total number of bytes read into the buffer, or -1 if there is no more data - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public int read(byte[] b, int off, int len) throws IOException { - checkClosed(); - try { - return stream.read(b, off, len); - } catch (FileNotFoundException e) { - throw new FileNotFoundException(String.format("File %s not found: %s", path, e.getMessage())); - } catch (IOException e) { - throw new IOException(String.format("Read of file %s failed: %s", path, e.getMessage()), e); - } - } - - /** - * Skips over and discards n bytes of data from this input stream. 
- * - * @param n the number of bytes to skip - * @return the actual number of bytes skipped - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public long skip(long n) throws IOException { - checkClosed(); - try { - return stream.skip(n); - } catch (IOException e) { - throw new IOException(String.format("Skip in file %s failed: %s", path, e.getMessage()), e); - } - } - - /** - * Closes this input stream and releases any system resources associated with it. - * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - closed = true; - stream.close(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java deleted file mode 100644 index 2d0d3233260..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputFile.java +++ /dev/null @@ -1,95 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.fs.io.hdfs; - -import org.apache.doris.fs.Location; -import org.apache.doris.fs.io.DorisOutputFile; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; - -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Objects; - -/** - * HdfsOutputFile provides an implementation of DorisOutputFile for writing data to HDFS. - * It wraps a Location and DFSFileSystem to create or overwrite files in HDFS. - */ -public class HdfsOutputFile implements DorisOutputFile { - // The Location representing the file location in HDFS. - private final Location location; - // The Hadoop Path object corresponding to the file. - private final Path hadoopPath; - // The DFSFileSystem used to interact with HDFS. - private final DFSFileSystem dfs; - - /** - * Constructs a HdfsOutputFile with the given Location and DFSFileSystem. - * - * @param location the Location representing the file location - * @param dfs the DFSFileSystem used to interact with HDFS - */ - public HdfsOutputFile(Location location, DFSFileSystem dfs) { - this.location = Objects.requireNonNull(location, "location is null"); - this.hadoopPath = new Path(location.toString()); - this.dfs = Objects.requireNonNull(dfs, "dfs is null"); - } - - /** - * Creates a new file in HDFS. Fails if the file already exists. - * - * @return OutputStream for writing to the new file - * @throws IOException if an I/O error occurs - */ - @Override - public OutputStream create() throws IOException { - return dfs.createFile(hadoopPath, false); - } - - /** - * Creates a new file or overwrites the file if it already exists in HDFS. - * - * @return OutputStream for writing to the file - * @throws IOException if an I/O error occurs - */ - @Override - public OutputStream createOrOverwrite() throws IOException { - return dfs.createFile(hadoopPath, true); - } - - /** - * Returns the Location associated with this output file. 
- * - * @return the Location - */ - @Override - public Location location() { - return location; - } - - /** - * Returns the string representation of the file path. - * - * @return the file path as a string - */ - @Override - public String toString() { - return location.toString(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java b/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java deleted file mode 100644 index 86773b1cf2b..00000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/io/hdfs/HdfsOutputStream.java +++ /dev/null @@ -1,135 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.fs.io.hdfs; - -import org.apache.doris.fs.io.ParsedPath; -import org.apache.doris.fs.remote.dfs.DFSFileSystem; - -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Objects; - -/** - * HdfsOutputStream provides an output stream implementation for writing data to HDFS - * using ParsedPath and FSDataOutputStream. - * It extends FSDataOutputStream and adds additional checks and Kerberos authentication handling. 
- */ -public class HdfsOutputStream extends FSDataOutputStream { - // The ParsedPath representing the file location in HDFS. - private final ParsedPath path; - // The Hadoop Path object corresponding to the file. - private final Path hadoopPath; - // The DFSFileSystem used to interact with HDFS. - private final DFSFileSystem dfs; - // Indicates whether the stream has been closed. - private boolean closed; - - /** - * Constructs a HdfsOutputStream with the given ParsedPath, FSDataOutputStream, and DFSFileSystem. - * - * @param path the ParsedPath representing the file location - * @param out the underlying Hadoop FSDataOutputStream - * @param dfs the DFSFileSystem used to interact with HDFS - */ - public HdfsOutputStream(ParsedPath path, FSDataOutputStream out, DFSFileSystem dfs) { - super(out, null, out.getPos()); - this.path = Objects.requireNonNull(path, "path is null"); - this.hadoopPath = path.toHadoopPath(); - this.dfs = dfs; - } - - /** - * Checks if the stream is closed and throws an IOException if it is. - * Used internally before performing any operation. - * - * @throws IOException if the stream is closed - */ - private void checkClosed() throws IOException { - if (closed) { - throw new IOException("Output stream is closed: " + path); - } - } - - /** - * Returns the originally wrapped OutputStream, not the delegate. - * - * @return the wrapped OutputStream - */ - @Override - public OutputStream getWrappedStream() { - return ((FSDataOutputStream) super.getWrappedStream()).getWrappedStream(); - } - - /** - * Writes the specified byte to this output stream, handling Kerberos ticket refresh if needed. 
- * - * @param b the byte to write - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public void write(int b) throws IOException { - checkClosed(); - dfs.getAuthenticator().doAs(() -> { - super.write(b); - return null; - }); - } - - /** - * Writes len bytes from the specified byte array starting at offset off to this output stream, - * handling Kerberos ticket refresh if needed. - * - * @param b the data - * @param off the start offset in the data - * @param len the number of bytes to write - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public void write(byte[] b, int off, int len) throws IOException { - checkClosed(); - dfs.getAuthenticator().doAs(() -> { - super.write(b, off, len); - return null; - }); - } - - /** - * Flushes this output stream and forces any buffered output bytes to be written out. - * - * @throws IOException if an I/O error occurs or the stream is closed - */ - @Override - public void flush() throws IOException { - checkClosed(); - super.flush(); - } - - /** - * Closes this output stream and releases any system resources associated with it. 
- * - * @throws IOException if an I/O error occurs - */ - @Override - public void close() throws IOException { - closed = true; - super.close(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java index 7af9081d639..8d98a7bc5fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/fs/remote/dfs/DFSFileSystem.java @@ -25,12 +25,6 @@ import org.apache.doris.common.util.URI; import org.apache.doris.datasource.property.storage.HdfsCompatibleProperties; import org.apache.doris.datasource.property.storage.StorageProperties; import org.apache.doris.foundation.fs.FsStorageType; -import org.apache.doris.fs.Location; -import org.apache.doris.fs.io.DorisInputFile; -import org.apache.doris.fs.io.DorisOutputFile; -import org.apache.doris.fs.io.ParsedPath; -import org.apache.doris.fs.io.hdfs.HdfsInputFile; -import org.apache.doris.fs.io.hdfs.HdfsOutputFile; import org.apache.doris.fs.operations.HDFSFileOperations; import org.apache.doris.fs.operations.HDFSOpParams; import org.apache.doris.fs.operations.OpParams; @@ -585,14 +579,4 @@ public class DFSFileSystem extends RemoteFileSystem { FileSystem fileSystem = nativeFileSystem(path); return hdfsProperties.getHadoopAuthenticator().doAs(() -> fileSystem.create(path, overwrite)); } - - @Override - public DorisOutputFile newOutputFile(ParsedPath path) { - return new HdfsOutputFile(Location.of(path.toString()), this); - } - - @Override - public DorisInputFile newInputFile(ParsedPath path, long length) { - return new HdfsInputFile(Location.of(path.toString()), length, this); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
