pvillard31 commented on code in PR #11052: URL: https://github.com/apache/nifi/pull/11052#discussion_r3005922418
########## nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageFileIO.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.iceberg.gcs; + +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serial; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Collections; +import java.util.Map; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import static java.net.HttpURLConnection.HTTP_NO_CONTENT; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.OAUTH2_TOKEN; + +/** + * Google Cloud Storage implementation of Iceberg FileIO using Java HttpClient for REST API operations + */ +class GoogleCloudStorageFileIO implements FileIO { + + @Serial + private static final long serialVersionUID = 1L; + + private static final Logger logger = LoggerFactory.getLogger(GoogleCloudStorageFileIO.class); + + private Map<String, String> properties = 
Collections.emptyMap(); + + private transient GoogleCloudStorageProperties storageProperties; + private transient HttpClientProvider httpClientProvider; + + /** + * Initialize FileIO with standard properties defined in Apache Iceberg GCPProperties class + * + * @param properties Properties defined according to Iceberg GCPProperties + */ + @Override + public void initialize(final Map<String, String> properties) { + this.properties = Map.copyOf(properties); + this.storageProperties = new GoogleCloudStorageProperties(properties); + + final String bearerToken = properties.get(OAUTH2_TOKEN.getProperty()); + this.httpClientProvider = new HttpClientProvider(bearerToken); + } + + /** + * Create Iceberg Input File with unspecified length + * + * @param path Input File Path + * @return Input File with unspecified length + */ + @Override + public InputFile newInputFile(final String path) { + return new GoogleCloudStorageInputFile(httpClientProvider, storageProperties, GoogleCloudStorageLocation.parse(path), path, null); + } + + /** + * Create Iceberg Input File with length specified + * + * @param path Input File Path + * @param length Input File Length in bytes + * @return Input File with length specified + */ + @Override + public InputFile newInputFile(final String path, final long length) { + return new GoogleCloudStorageInputFile(httpClientProvider, storageProperties, GoogleCloudStorageLocation.parse(path), path, length); + } + + /** + * Create Iceberg Output File + * + * @param path Output File Path + * @return Output File + */ + @Override + public OutputFile newOutputFile(final String path) { + return new GoogleCloudStorageOutputFile(httpClientProvider, storageProperties, GoogleCloudStorageLocation.parse(path), path); + } + + /** + * Delete File at specified location + * + * @param path Location of file to be deleted + */ + @Override + public void deleteFile(final String path) { + final GoogleCloudStorageLocation location = GoogleCloudStorageLocation.parse(path); + 
final String uri = storageProperties.metadataUri(location); + final HttpRequest request = httpClientProvider.newRequestBuilder(uri).DELETE().build(); + try { + final HttpResponse<String> response = httpClientProvider.send(request, HttpResponse.BodyHandlers.ofString()); + final int statusCode = response.statusCode(); + + if (HTTP_NO_CONTENT == statusCode || HTTP_NOT_FOUND == statusCode) { + logger.debug("Delete File [{}] completed: HTTP {}", path, statusCode); + } else { + final String responseBody = response.body(); + throw new HttpResponseException("Delete File [%s] failed: HTTP %d [%s]".formatted(path, statusCode, responseBody)); + } + } catch (final IOException e) { + throw new HttpRequestException("Delete File [%s] failed".formatted(path), e); + } + } + + /** + * Get current configuration properties + * + * @return Configuration properties + */ + @Override + public Map<String, String> properties() { + return properties; + } + + /** + * Close client resources + */ + @Override + public void close() { + httpClientProvider.close(); Review Comment: Should we have a null check here? (if close() is called before initialize() is re-invoked) ########## nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageProperties.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.iceberg.gcs; + +import java.util.Map; +import java.util.Objects; + +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.DECRYPTION_KEY; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.ENCRYPTION_KEY; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.READ_CHUNK_SIZE_BYTES; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.SERVICE_HOST; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.USER_PROJECT; +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageProperty.WRITE_CHUNK_SIZE_BYTES; + +/** + * Google Cloud Storage Properties encapsulating value parsing and URI resolution + */ +class GoogleCloudStorageProperties { + + static final String DEFAULT_SERVICE_HOST = "https://storage.googleapis.com"; + static final int DEFAULT_WRITE_CHUNK_SIZE = 8 * 1024 * 1024; + static final int DEFAULT_READ_CHUNK_SIZE = 2 * 1024 * 1024; + static final int MINIMUM_CHUNK_SIZE = 256 * 1024; Review Comment: We never seem to enforce this minimum against the value we get from WRITE_CHUNK_SIZE_BYTES. I understand that this should not be an issue if the catalog is well configured, but I thought I'd flag it. ########## nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/HttpClientProvider.java: ########## @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.services.iceberg.gcs; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; + +import static org.apache.nifi.services.iceberg.gcs.GoogleCloudStorageHeader.AUTHORIZATION; + +/** + * HTTP Client Provider encapsulates request and response methods with a configured HTTP Client and standard settings + */ +class HttpClientProvider { + + private static final String BEARER_FORMAT = "Bearer %s"; + + /** Connect Timeout based on default setting from Google Cloud Storage library */ + private static final Duration CONNECT_TIMEOUT = Duration.ofSeconds(20); + + /** Request Timeout provides failsafe behavior */ + static final Duration REQUEST_TIMEOUT = Duration.ofMinutes(5); + + private final String bearerToken; + private final HttpClient httpClient; + + HttpClientProvider(final String bearerToken) { + this.bearerToken = bearerToken; + this.httpClient = HttpClient.newBuilder() + .connectTimeout(CONNECT_TIMEOUT) + .build(); + } + + <T> HttpResponse<T> send(final HttpRequest request, final HttpResponse.BodyHandler<T> bodyHandler) throws IOException { + try { + return httpClient.send(request, bodyHandler); + } catch (final 
InterruptedException e) { Review Comment: Should we implement retry with backoff for transient failures (429, 500, 503, ...)? ########## nifi-extension-bundles/nifi-iceberg-bundle/nifi-iceberg-gcs/src/main/java/org/apache/nifi/services/iceberg/gcs/GoogleCloudStorageInputFile.java: ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.services.iceberg.gcs; + +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.SeekableInputStream; + +import java.io.IOException; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static java.net.HttpURLConnection.HTTP_OK; + +/** + * Google Cloud Storage implementation of Apache Iceberg InputFile using HttpClient for REST operations + */ +class GoogleCloudStorageInputFile implements InputFile { + + private static final String QUESTION_MARK = "?"; + private static final char AMPERSAND = '&'; + private static final String FIELDS_SIZE_QUERY = "fields=size"; + + private static final Pattern SIZE_PATTERN = Pattern.compile("\"size\"\\s*:\\s*\"(\\d+)\""); + private static final int SIZE_GROUP = 1; + + private final HttpClientProvider httpClientProvider; + + private final GoogleCloudStorageProperties storageProperties; + + private final GoogleCloudStorageLocation location; + + private final String path; + + private volatile Long cachedLength; + + GoogleCloudStorageInputFile( + final HttpClientProvider httpClientProvider, + final GoogleCloudStorageProperties storageProperties, + final GoogleCloudStorageLocation location, + final String path, + final Long cachedLength + ) { + this.httpClientProvider = httpClientProvider; + this.storageProperties = storageProperties; + this.location = location; + this.path = path; + this.cachedLength = cachedLength; + } + + /** + * Get Input File length in bytes and fetch remote object size when length is not cached + * + * @return Input File length in bytes + */ + @Override + public long getLength() { + if (cachedLength == null) { + cachedLength = getObjectSize(); + } + return cachedLength; + } + + /** + * Create Seekable InputStream for InputFile reading + * + * @return Seekable InputStream + */ + @Override + public SeekableInputStream newStream() { + return new 
GoogleCloudStorageSeekableInputStream(httpClientProvider, storageProperties, location, cachedLength); + } + + /** + * Get InputFile Location + * + * @return InputFile Location + */ + @Override + public String location() { + return path; + } + + /** + * Get status of InputFile existence based on Metadata URI + * + * @return InputFile existence status + */ + @Override + public boolean exists() { + final String uri = getMetadataUri(); + final HttpRequest request = httpClientProvider.newRequestBuilder(uri).GET().build(); + + try { + final HttpResponse<Void> response = httpClientProvider.send(request, HttpResponse.BodyHandlers.discarding()); + return response.statusCode() == HTTP_OK; Review Comment: Isn't it risky here to only check for HTTP_OK? What if you get something like an HTTP 500 or 429 error, assume that the file does not exist, and then overwrite it? (I mean the OutputFile.create() path, which calls toInputFile().exists() to guard against overwriting an existing file.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
