pavibhai commented on code in PR #6293:
URL: https://github.com/apache/iceberg/pull/6293#discussion_r1041995292


##########
orc/src/main/java/org/apache/iceberg/orc/ORC.java:
##########
@@ -789,7 +808,210 @@ static Reader newFileReader(InputFile file, Configuration config) {
     ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
     if (file instanceof HadoopInputFile) {
       readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+    } else {
+      readerOptions.filesystem(new InputFileSystem(file)).maxLength(file.getLength());
     }
     return newFileReader(file.location(), readerOptions);
   }
+
+  static Writer newFileWriter(
+      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
+    if (file instanceof HadoopOutputFile) {
+      options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+    } else {
+      options.fileSystem(new OutputFileSystem(file));
+    }
+    final Path locPath = new Path(file.location());
+    final Writer writer;
+
+    try {
+      writer = OrcFile.createWriter(locPath, options);
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
+    }
+
+    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+
+    return writer;
+  }
+
+  private static class WrappedSeekableInputStream extends FSInputStream {
+    private final SeekableInputStream inputStream;
+    private boolean closed;
+    private final StackTraceElement[] createStack;
+
+    private WrappedSeekableInputStream(SeekableInputStream inputStream) {
+      this.inputStream = inputStream;
+      this.createStack = Thread.currentThread().getStackTrace();
+      this.closed = false;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+      inputStream.seek(pos);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return inputStream.getPos();
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+      throw new UnsupportedOperationException("seekToNewSource not supported");
+    }
+
+    @Override
+    public int read() throws IOException {
+      return inputStream.read();
+    }
+
+    @Override
+    public int read(@NotNull byte[] b, int off, int len) throws IOException {
+      return inputStream.read(b, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+      inputStream.close();
+      closed = true;
+    }
+
+    @SuppressWarnings("checkstyle:NoFinalizer")
+    @Override
+    protected void finalize() throws Throwable {
+      super.finalize();
+      if (!closed) {
+        close(); // releasing resources is more important than printing the warning
+        String trace =
+            Joiner.on("\n\t").join(Arrays.copyOfRange(createStack, 1, createStack.length));
+        LOG.warn("Unclosed input stream created by:\n\t{}", trace);
+      }
+    }
+  }
+
+  private static class NullFileSystem extends FileSystem {
+
+    @Override
+    public URI getUri() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream create(
+        Path f,
+        FsPermission permission,
+        boolean overwrite,
+        int bufferSize,
+        short replication,
+        long blockSize,
+        Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
+        throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void setWorkingDirectory(Path new_dir) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  static class InputFileSystem extends NullFileSystem {
+    private final InputFile inputFile;
+    private final Path inputPath;
+
+    InputFileSystem(InputFile inputFile) {
+      this.inputFile = inputFile;
+      this.inputPath = new Path(inputFile.location());
+    }
+
+    @Override
+    public FSDataInputStream open(Path f) throws IOException {
+      return open(f, 0);
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+      Preconditions.checkArgument(
+          f.equals(inputPath), String.format("Input %s does not equal expected %s", f, inputPath));
+      return new FSDataInputStream(new WrappedSeekableInputStream(inputFile.newStream()));
+    }
+  }
+
+  static class OutputFileSystem extends NullFileSystem {
+    private final OutputFile outputFile;
+    private final Path outPath;
+
+    OutputFileSystem(OutputFile outputFile) {
+      this.outputFile = outputFile;
+      this.outPath = new Path(outputFile.location());
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f) throws IOException {
+      return create(f, null, true, 0, (short) 0, 0, null);

Review Comment:
   Agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to