jackjlli commented on a change in pull request #6841:
URL: https://github.com/apache/incubator-pinot/pull/6841#discussion_r620713631



##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
##########
@@ -108,7 +108,16 @@ public boolean move(URI srcUri, URI dstUri, boolean 
overwrite)
       // ensures the parent path of dst exists.
       try {
         Path parentPath = Paths.get(dstUri.getPath()).getParent();
-        URI parentUri = new URI(dstUri.getScheme(), dstUri.getHost(), 
parentPath.toString(), null);
+        /**
+         * Use authority instead of host if the value contains "_": 
uri.getHost() will be null
+         * This is related to 
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6587184
+         * @see java.net.URI.Parser#parseHostname
+         */
+        String host = dstUri.getHost();
+        if (host == null) {
+          host = dstUri.getAuthority();
+        }
+        URI parentUri = new URI(dstUri.getScheme(), host, 
parentPath.toString(), null, null);

Review comment:
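       It seems you're now using the 5-parameter constructor `URI(scheme, authority, path, query, fragment)`, while the current code uses the 4-parameter constructor `URI(scheme, host, path, fragment)`. It'd be good to clarify that in the code comment.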

##########
File path: 
pinot-plugins/pinot-file-system/pinot-gcs/src/test/java/org/apache/pinot/plugin/filesystem/TestGcsUri.java
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import java.net.URI;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.plugin.filesystem.GcsUri.SCHEME;
+import static org.apache.pinot.plugin.filesystem.GcsUri.createGcsUri;
+import static org.testng.Assert.assertEquals;
+
+public class TestGcsUri {
+    @Test
+    public void testDifferentScheme() {
+        URI uri = URI.create("file://bucket/file");
+        GcsUri gcsUri = new GcsUri(uri);
+        assertEquals(gcsUri.getUri().getScheme(), SCHEME);
+    }
+
+    @Test
+    public void testNonAbsolutePath() {
+        // Relative path must be normalized to absolute path for gcs uri
+        // This is because the URI must have an absolute path component,
+        // ex. new URI("gs", "bucket",
+        GcsUri gcsUri = createGcsUri("bucket", "dir/file");
+        assertEquals(gcsUri, createGcsUri("bucket", "/dir/file"));
+    }
+
+    @Test
+    public void testUnderScoreBucketName() {
+        // This is why getAuthority is used instead of getHostName()
+        // see https://cloud.google.com/storage/docs/naming-buckets
+        // gcs allows _'s which would cause URI.getHost() to be null:
+        GcsUri gcsUri = new GcsUri(URI.create("gs://bucket_name/dir"));
+        assertEquals(gcsUri.getBucketName(), "bucket_name");
+    }
+
+    @Test
+    public void testRelativize() {

Review comment:
       It'd be good to add a test for the case when relativize returns false.

##########
File path: 
pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsPinotFS.java
##########
@@ -34,104 +35,211 @@
 import java.io.InputStream;
 import java.io.UncheckedIOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.SeekableByteChannel;
 import java.nio.file.Files;
-import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
-import static java.util.Objects.requireNonNull;
-import static joptsimple.internal.Strings.isNullOrEmpty;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.lang.String.format;
+import static org.apache.pinot.plugin.filesystem.GcsUri.createGcsUri;
 
-public class GcsPinotFS  extends PinotFS {
+public class GcsPinotFS extends PinotFS {
   public static final String PROJECT_ID = "projectId";
   public static final String GCP_KEY = "gcpKey";
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GcsPinotFS.class);
-  private static final String SCHEME = "gs";
-  private static final String DELIMITER = "/";
   private static final int BUFFER_SIZE = 128 * 1024;
-  private Storage storage;
+  // See https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch
+  private static final int BATCH_LIMIT = 100;
+  private Storage _storage;
 
   @Override
   public void init(PinotConfiguration config) {
-    Credentials credentials;
+    LOGGER.info("Configs are: {}, {}", PROJECT_ID, 
config.getProperty(PROJECT_ID));
+    _storage = initializeStorage(config);
+  }
 
+  private Storage initializeStorage(PinotConfiguration config) {
+    checkArgument(!isNullOrEmpty(config.getProperty(PROJECT_ID)));
+    checkArgument(!isNullOrEmpty(config.getProperty(GCP_KEY)));
+    String projectId = config.getProperty(PROJECT_ID);
+    String gcpKey = config.getProperty(GCP_KEY);
     try {
-      StorageOptions.Builder storageBuilder = StorageOptions.newBuilder();
-      if (!isNullOrEmpty(config.getProperty(PROJECT_ID)) && 
!isNullOrEmpty(config.getProperty(GCP_KEY))) {
-        LOGGER.info("Configs are: {}, {}", PROJECT_ID, 
config.getProperty(PROJECT_ID));
-        String projectId = config.getProperty(PROJECT_ID);
-        String gcpKey = config.getProperty(GCP_KEY);
-        storageBuilder.setProjectId(projectId);
-        credentials = 
GoogleCredentials.fromStream(Files.newInputStream(Paths.get(gcpKey)));
-      } else {
-        LOGGER.info("Configs using default credential");
-        credentials = GoogleCredentials.getApplicationDefault();
-      }
-      storage = 
storageBuilder.setCredentials(credentials).build().getService();
+      return StorageOptions.newBuilder()
+          .setProjectId(projectId)
+          
.setCredentials(GoogleCredentials.fromStream(Files.newInputStream(Paths.get(gcpKey))))
+          .build()
+          .getService();
     } catch (IOException e) {
       throw new UncheckedIOException(e);
     }
   }
 
-  private Bucket getBucket(URI uri) {
-    return storage.get(uri.getHost());
-  }
-
-  private Blob getBlob(URI uri) throws IOException {
+  @Override
+  public final boolean mkdir(URI uri)
+      throws IOException {
+    LOGGER.info("mkdir {}", uri);
     try {
-      URI base = getBase(uri);
-      String path = sanitizePath(base.relativize(uri).getPath());
-      return getBucket(uri).get(path);
-    } catch (StorageException e) {
+      GcsUri gcsUri = new GcsUri(uri);
+      // Prefix always returns a path with trailing /
+      String directoryPath = gcsUri.getPrefix();
+      // Do not create a bucket, different permissions are required
+      // The bucket should already exist
+      if (directoryPath.equals(GcsUri.DELIMITER)) {
+        return true;
+      }
+      if (existsDirectory(gcsUri)) {
+        return true;
+      }
+      Blob blob = getBucket(gcsUri).create(directoryPath, new byte[0]);
+      return blob.exists();
+    } catch (Exception e) {
       throw new IOException(e);
     }
   }
-  private boolean isPathTerminatedByDelimiter(URI uri) {
-    return uri.getPath().endsWith(DELIMITER);
+
+  @Override
+  public boolean delete(URI segmentUri, boolean forceDelete)
+      throws IOException {
+    LOGGER.info("Deleting uri {} force {}", segmentUri, forceDelete);
+    return delete(new GcsUri(segmentUri), forceDelete);
+  }
+
+  @Override
+  public boolean doMove(URI srcUri, URI dstUri)
+      throws IOException {
+    GcsUri srcGcsUri = new GcsUri(srcUri);
+    GcsUri dstGcsUri = new GcsUri(dstUri);
+    if (copy(srcGcsUri, dstGcsUri)) {
+      // Only delete if all files were successfully moved
+      return delete(srcGcsUri, true);
+    }
+    return false;
+  }
+
+  @Override
+  public boolean copy(URI srcUri, URI dstUri)
+      throws IOException {
+    LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
+    return copy(new GcsUri(srcUri), new GcsUri(dstUri));
+  }
+
+  @Override
+  public boolean exists(URI fileUri)
+      throws IOException {
+    if (fileUri == null) {
+      return false;
+    }
+    return exists(new GcsUri(fileUri));
   }
 
-  private String normalizeToDirectoryPrefix(URI uri) throws IOException {
-    requireNonNull(uri, "uri is null");
-    URI strippedUri = getBase(uri).relativize(uri);
-    if (isPathTerminatedByDelimiter(strippedUri)) {
-      return sanitizePath(strippedUri.getPath());
+  @Override
+  public long length(URI fileUri)
+      throws IOException {
+    try {
+      GcsUri gcsUri = new GcsUri(fileUri);
+      checkState(!isPathTerminatedByDelimiter(gcsUri), "URI is a directory");
+      Blob blob = getBlob(gcsUri);
+      checkState(existsBlob(blob), "File '%s' does not exist", fileUri);
+      return blob.getSize();
+    } catch (Exception t) {
+      throw new IOException(t);
     }
-    return sanitizePath(strippedUri.getPath() + DELIMITER);
   }
 
-  private URI normalizeToDirectoryUri(URI uri) throws IOException {
-    if (isPathTerminatedByDelimiter(uri)) {
-      return uri;
+  @Override
+  public String[] listFiles(URI fileUri, boolean recursive)
+      throws IOException {
+    return listFiles(new GcsUri(fileUri), recursive);
+  }
+
+  @Override
+  public void copyToLocalFile(URI srcUri, File dstFile)
+      throws Exception {
+    LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath());
+    checkState(!dstFile.isDirectory(), "File '%s' must not be a directory", 
dstFile);
+    FileUtils.forceMkdir(dstFile.getParentFile());
+    Blob blob = getBlob(new GcsUri(srcUri));
+    checkState(existsBlob(blob), "File '%s' does not exists", srcUri);
+    blob.downloadTo(dstFile.toPath());
+  }
+
+  @Override
+  public void copyFromLocalFile(File srcFile, URI dstUri)
+      throws Exception {
+    LOGGER.info("Copying file {} to uri {}", srcFile.getAbsolutePath(), 
dstUri);
+    GcsUri dstGcsUri = new GcsUri(dstUri);
+    checkState(!isPathTerminatedByDelimiter(dstGcsUri), "Path '%s' must be a 
filename", dstGcsUri);
+    Blob blob = getBucket(dstGcsUri).create(dstGcsUri.getPath(), new byte[0]);
+    WriteChannel writeChannel = blob.writer();
+    writeChannel.setChunkSize(BUFFER_SIZE);
+    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
+    SeekableByteChannel channel = Files.newByteChannel(srcFile.toPath());
+    for (int bytesRead = channel.read(buffer); bytesRead != -1; bytesRead = 
channel.read(buffer)) {
+      buffer.flip();
+      writeChannel.write(buffer);
+      buffer.clear();
     }
+    writeChannel.close();
+  }
+
+  @Override
+  public boolean isDirectory(URI uri)
+      throws IOException {
+    return existsDirectory(new GcsUri(uri));
+  }
+
+  @Override
+  public long lastModified(URI uri)
+      throws IOException {
+    return getBlob(new GcsUri(uri)).getUpdateTime();
+  }
+
+  @Override
+  public boolean touch(URI uri)
+      throws IOException {
     try {
-      return new URI(uri.getScheme(), uri.getHost(), 
sanitizePath(uri.getPath() + DELIMITER), null);
-    } catch (URISyntaxException e) {
+      GcsUri gcsUri = new GcsUri(uri);

Review comment:
       Please add a log statement here as well, since touch is a write operation.

##########
File path: 
pinot-plugins/pinot-file-system/pinot-gcs/src/main/java/org/apache/pinot/plugin/filesystem/GcsUri.java
##########
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.filesystem;
+
+import com.google.common.base.Supplier;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.base.Suppliers.memoize;
+import static java.util.Objects.requireNonNull;
+
+
+public class GcsUri {
+  public static final String SCHEME = "gs";
+  public static final String DELIMITER = "/";
+
+  private final URI uri;
+  private final Supplier<String> path;

Review comment:
       It'd be good to name this `_path` to keep the same underscore-prefixed field naming convention as other classes. Same for the following two variables.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to