This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new ced253b CAMEL-15873: Upgrade google-cloud-bigquery dependencies ced253b is described below commit ced253b90a19b1c83558d2c398943a0eec991d05 Author: James Netherton <jamesnether...@gmail.com> AuthorDate: Wed Nov 25 09:16:10 2020 +0000 CAMEL-15873: Upgrade google-cloud-bigquery dependencies --- camel-dependencies/pom.xml | 3 +- .../catalog/docs/google-bigquery-component.adoc | 11 +- .../docs/google-bigquery-sql-component.adoc | 21 +-- components/camel-google-bigquery/pom.xml | 33 +++-- .../src/main/docs/google-bigquery-component.adoc | 11 +- .../main/docs/google-bigquery-sql-component.adoc | 21 +-- .../bigquery/GoogleBigQueryConnectionFactory.java | 159 ++++----------------- .../google/bigquery/GoogleBigQueryConstants.java | 1 + .../google/bigquery/GoogleBigQueryEndpoint.java | 4 +- .../google/bigquery/GoogleBigQueryProducer.java | 45 +++--- .../bigquery/sql/GoogleBigQuerySQLEndpoint.java | 4 +- .../bigquery/sql/GoogleBigQuerySQLProducer.java | 86 ++++++----- .../bigquery/integration/BigQueryTestSupport.java | 84 +++++------ .../google/bigquery/unit/BaseBigQueryTest.java | 17 +-- .../bigquery/unit/GoogleBigQueryProducerTest.java | 33 +++-- .../unit/sql/GoogleBigQuerySQLComponentTest.java | 6 +- .../sql/GoogleBigQuerySQLProducerBaseTest.java | 22 ++- .../GoogleBigQuerySQLProducerWithParamersTest.java | 82 ++++++++--- .../GoogleBigQuerySQLProducerWithPatternTest.java | 17 +-- .../src/test/resources/schema/simple-table.json | 6 - .../ROOT/pages/google-bigquery-component.adoc | 11 +- .../ROOT/pages/google-bigquery-sql-component.adoc | 21 +-- .../ROOT/pages/camel-3x-upgrade-guide-3_7.adoc | 8 ++ parent/pom.xml | 3 +- 24 files changed, 323 insertions(+), 386 deletions(-) diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index 6bb50ab..ae78b48 100644 --- a/camel-dependencies/pom.xml +++ b/camel-dependencies/pom.xml @@ -225,12 +225,13 @@ <gmetric4j-version>1.0.10</gmetric4j-version> <google-api-client-version>1.22.0</google-api-client-version> <google-api-common-version>1.8.1</google-api-common-version> - <google-api-services-bigquery-version>v2-rev352-1.22.0</google-api-services-bigquery-version> <google-api-services-calendar-version>v3-rev291-1.22.0</google-api-services-calendar-version> <google-api-services-drive-version>v2-rev297-1.22.0</google-api-services-drive-version> <google-api-services-mail-version>v1-rev81-1.22.0</google-api-services-mail-version> <google-api-services-sheets-version>v4-rev551-1.22.0</google-api-services-sheets-version> <google-auto-value-version>1.7</google-auto-value-version> + <google-cloud-bom-version>16.1.0</google-cloud-bom-version> + <google-cloud-guava-version>30.0-jre</google-cloud-guava-version> <google-cloud-pubsub-version>1.105.0</google-cloud-pubsub-version> <google-errorprone-version>2.3.3</google-errorprone-version> <google-findbugs-annotations2-version>2.0.3</google-findbugs-annotations2-version> diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-component.adoc index cb7cc1b..4036d54 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-component.adoc @@ -43,20 +43,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-sql-component.adoc b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-sql-component.adoc index afb2732..f0177a5 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-sql-component.adoc +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/google-bigquery-sql-component.adoc @@ -42,20 +42,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] @@ -132,7 +125,15 @@ with the following path and query parameters: // endpoint options: END -== Ouput Message Headers +== Message Headers + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Name |Type |Description +|`CamelGoogleBigQueryJobId` |`JobId` |A custom `JobId` to use +|======================================================================= + +== Output Message Headers [width="100%",cols="10%,10%,80%",options="header",] |======================================================================= diff --git a/components/camel-google-bigquery/pom.xml b/components/camel-google-bigquery/pom.xml index 3250741..c6a62b1 100644 --- a/components/camel-google-bigquery/pom.xml +++ b/components/camel-google-bigquery/pom.xml @@ -35,23 +35,36 @@ <properties> </properties> + <dependencyManagement> + <dependencies> + <!-- Override the android JDK 7 guava in libraries-bom --> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${google-cloud-guava-version}</version> + </dependency> + <dependency> + <groupId>com.google.cloud</groupId> + <artifactId>libraries-bom</artifactId> + <version>${google-cloud-bom-version}</version> + <type>pom</type> + <scope>import</scope> + </dependency> + </dependencies> + </dependencyManagement> + <dependencies> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-support</artifactId> </dependency> <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </dependency> - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-bigquery</artifactId> - <version>${google-api-services-bigquery-version}</version> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> + <groupId>com.google.cloud</groupId> + <artifactId>google-cloud-bigquery</artifactId> </dependency> <dependency> @@ -83,7 +96,7 @@ <forkedProcessTimeoutInSeconds>300</forkedProcessTimeoutInSeconds> <includes> <!-- Here we only run test of --> - <include>**/unit/*.java</include> + <include>**/unit/**/*.java</include> </includes> </configuration> </plugin> diff --git a/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc b/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc index cb7cc1b..4036d54 100644 --- a/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc +++ b/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc @@ -43,20 +43,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] diff --git a/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc b/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc index afb2732..f0177a5 100644 --- a/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc +++ b/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc @@ -42,20 +42,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] @@ -132,7 +125,15 @@ with the following path and query parameters: // endpoint options: END -== Ouput Message Headers +== Message Headers + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Name |Type |Description +|`CamelGoogleBigQueryJobId` |`JobId` |A custom `JobId` to use +|======================================================================= + +== Output Message Headers [width="100%",cols="10%,10%,80%",options="header",] |======================================================================= diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java index 30f710d..ee9f7df 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConnectionFactory.java @@ -18,178 +18,85 @@ package org.apache.camel.component.google.bigquery; import java.io.FileInputStream; import java.io.InputStream; -import java.security.KeyFactory; -import java.security.PrivateKey; -import java.security.spec.PKCS8EncodedKeySpec; import java.util.Collection; import java.util.Collections; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.apache.ApacheHttpTransport; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.util.Base64; import com.google.api.client.util.Strings; -import com.google.api.services.bigquery.Bigquery; import com.google.api.services.bigquery.BigqueryScopes; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class GoogleBigQueryConnectionFactory { - private static final JsonFactory JSON_FACTORY = new JacksonFactory(); - private final Logger logger = LoggerFactory.getLogger(GoogleBigQueryConnectionFactory.class); - private String serviceAccount; - private String serviceAccountKey; private String credentialsFileLocation; - private String serviceURL; - private Bigquery client; + private BigQuery client; public GoogleBigQueryConnectionFactory() { } - public synchronized Bigquery getDefaultClient() throws Exception { + public GoogleBigQueryConnectionFactory(BigQuery client) { + this.client = client; + } + + public synchronized BigQuery getDefaultClient() throws Exception { if (this.client == null) { this.client = buildClient(); } return this.client; } - public Bigquery getMultiThreadClient(int parallelThreads) throws Exception { - - PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); - cm.setDefaultMaxPerRoute(parallelThreads); - cm.setMaxTotal(parallelThreads); - CloseableHttpClient httpClient = HttpClients.createMinimal(cm); - - return buildClient(new ApacheHttpTransport(httpClient)); - } - - private Bigquery buildClient() throws Exception { - return buildClient(GoogleNetHttpTransport.newTrustedTransport()); - } - - private Bigquery buildClient(HttpTransport httpTransport) throws Exception { - - GoogleCredential credential = null; + private BigQuery buildClient() throws Exception { - if (!Strings.isNullOrEmpty(serviceAccount) && !Strings.isNullOrEmpty(serviceAccountKey)) { - logger.debug("Service Account and Key have been set explicitly. Initialising BigQuery using Service Account {}", - serviceAccount); + GoogleCredentials credentials = null; - credential = createFromAccountKeyPair(httpTransport); - } - - if (credential == null && !Strings.isNullOrEmpty(credentialsFileLocation)) { + if (credentials == null && !Strings.isNullOrEmpty(credentialsFileLocation)) { logger.debug("Key File Name has been set explicitly. Initialising BigQuery using Key File {}", credentialsFileLocation); - credential = createFromFile(); + credentials = createFromFile(); } - if (credential == null) { + if (credentials == null) { logger.debug( "No explicit Service Account or Key File Name have been provided. Initialising BigQuery using defaults"); - credential = createDefault(); + credentials = createDefault(); } - Bigquery.Builder builder = new Bigquery.Builder(httpTransport, JSON_FACTORY, credential) - .setApplicationName("camel-google-bigquery"); + BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() + .setCredentials(credentials); - // Local emulator, SOCKS proxy, etc. - if (serviceURL != null) { - builder.setRootUrl(serviceURL); - } - - return builder.build(); + return builder.build().getService(); } - private GoogleCredential createFromFile() throws Exception { + private GoogleCredentials createFromFile() throws Exception { try (InputStream is = new FileInputStream(credentialsFileLocation)) { - GoogleCredential credential = GoogleCredential.fromStream(is); + GoogleCredentials credentials = GoogleCredentials.fromStream(is); - if (credential.createScopedRequired()) { - credential = credential.createScoped(BigqueryScopes.all()); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(BigqueryScopes.all()); } - return credential; + return credentials; } } - private GoogleCredential createDefault() throws Exception { - GoogleCredential credential = GoogleCredential.getApplicationDefault(); + private GoogleCredentials createDefault() throws Exception { + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); Collection<String> scopes = Collections.singletonList(BigqueryScopes.BIGQUERY); - if (credential.createScopedRequired()) { - credential = credential.createScoped(scopes); - } - - return credential; - } - - private GoogleCredential createFromAccountKeyPair(HttpTransport httpTransport) { - try { - return new GoogleCredential.Builder() - .setTransport(httpTransport) - .setJsonFactory(JSON_FACTORY) - .setServiceAccountId(serviceAccount) - .setServiceAccountScopes(BigqueryScopes.all()) - .setServiceAccountPrivateKey(getPrivateKeyFromString(serviceAccountKey)) - .build(); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - private PrivateKey getPrivateKeyFromString(String serviceKeyPem) { - PrivateKey privateKey; - try { - String privKeyPEM = serviceKeyPem.replace("-----BEGIN PRIVATE KEY-----", "") - .replace("-----END PRIVATE KEY-----", "") - .replace("\r", "") - .replace("\n", ""); - - byte[] encoded = Base64.decodeBase64(privKeyPEM); - - PKCS8EncodedKeySpec keySpec = new PKCS8EncodedKeySpec(encoded); - privateKey = KeyFactory.getInstance("RSA") - .generatePrivate(keySpec); - } catch (Exception e) { - String error = "Constructing Private Key from PEM string failed: " + e.getMessage(); - logger.error(error, e); - throw new RuntimeException(e); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(scopes); } - return privateKey; - } - - public String getServiceAccount() { - return serviceAccount; - } - public GoogleBigQueryConnectionFactory setServiceAccount(String serviceAccount) { - this.serviceAccount = serviceAccount; - resetClient(); - return this; - } - - public String getServiceAccountKey() { - return serviceAccountKey; - } - - public GoogleBigQueryConnectionFactory setServiceAccountKey(String serviceAccountKey) { - this.serviceAccountKey = serviceAccountKey; - resetClient(); - return this; + return credentials; } public String getCredentialsFileLocation() { @@ -202,16 +109,6 @@ public class GoogleBigQueryConnectionFactory { return this; } - public String getServiceURL() { - return serviceURL; - } - - public GoogleBigQueryConnectionFactory setServiceURL(String serviceURL) { - this.serviceURL = serviceURL; - resetClient(); - return this; - } - private synchronized void resetClient() { this.client = null; } diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java index 2deece2..dc33212 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java @@ -22,6 +22,7 @@ public final class GoogleBigQueryConstants { public static final String INSERT_ID = "CamelGoogleBigQueryInsertId"; public static final String PARTITION_DECORATOR = "CamelGoogleBigQueryPartitionDecorator"; public static final String TRANSLATED_QUERY = "CamelGoogleBigQueryTranslatedQuery"; + public static final String JOB_ID = "CamelGoogleBigQueryJobId"; /** * Prevent instantiation. diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java index d4acafb..6f527d9 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryEndpoint.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.google.bigquery; -import com.google.api.services.bigquery.Bigquery; +import com.google.cloud.bigquery.BigQuery; import org.apache.camel.Category; import org.apache.camel.Consumer; import org.apache.camel.Processor; @@ -53,7 +53,7 @@ public class GoogleBigQueryEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - Bigquery bigquery = getConfiguration().getConnectionFactory().getDefaultClient(); + BigQuery bigquery = getConfiguration().getConnectionFactory().getDefaultClient(); GoogleBigQueryProducer producer = new GoogleBigQueryProducer(bigquery, this, configuration); return producer; } diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java index d377889..8438008 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryProducer.java @@ -21,12 +21,12 @@ import java.util.List; import java.util.Map; import com.google.api.client.util.Strings; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.TableDataInsertAllRequest; -import com.google.api.services.bigquery.model.TableDataInsertAllResponse; -import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.InsertAllRequest; +import com.google.cloud.bigquery.InsertAllResponse; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultProducer; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +38,9 @@ public class GoogleBigQueryProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(GoogleBigQueryProducer.class); private final GoogleBigQueryConfiguration configuration; - private Bigquery bigquery; + private BigQuery bigquery; - public GoogleBigQueryProducer(Bigquery bigquery, GoogleBigQueryEndpoint endpoint, + public GoogleBigQueryProducer(BigQuery bigquery, GoogleBigQueryEndpoint endpoint, GoogleBigQueryConfiguration configuration) { super(endpoint); this.bigquery = bigquery; @@ -124,7 +124,7 @@ public class GoogleBigQueryProducer extends DefaultProducer { ? tableId : (tableId + "$" + partitionDecorator); - List<TableDataInsertAllRequest.Rows> apiRequestRows = new ArrayList<>(); + List<InsertAllRequest.RowToInsert> apiRequestRows = new ArrayList<>(); for (Exchange ex : exchanges) { Object entryObject = ex.getIn().getBody(); if (entryObject instanceof List) { @@ -142,24 +142,21 @@ public class GoogleBigQueryProducer extends DefaultProducer { return 0; } - TableDataInsertAllRequest apiRequestData = new TableDataInsertAllRequest().setRows(apiRequestRows); + InsertAllRequest.Builder builder = InsertAllRequest.newBuilder(configuration.getDatasetId(), tableIdWithPartition) + .setRows(apiRequestRows); - Bigquery.Tabledata.InsertAll apiRequest = bigquery - .tabledata() - .insertAll(configuration.getProjectId(), - configuration.getDatasetId(), - tableIdWithPartition, - apiRequestData); - if (suffix != null) { - apiRequest.set("template_suffix", suffix); + if (ObjectHelper.isNotEmpty(suffix)) { + builder.setTemplateSuffix(suffix); } + InsertAllRequest insertAllRequest = builder.build(); + if (LOG.isTraceEnabled()) { LOG.trace("Sending {} messages to bigquery table {}, suffix {}, partition {}", apiRequestRows.size(), tableId, suffix, partitionDecorator); } - TableDataInsertAllResponse apiResponse = apiRequest.execute(); + InsertAllResponse apiResponse = bigquery.insertAll(insertAllRequest); if (apiResponse.getInsertErrors() != null && !apiResponse.getInsertErrors().isEmpty()) { throw new Exception("InsertAll into " + tableId + " failed: " + apiResponse.getInsertErrors()); @@ -172,12 +169,10 @@ public class GoogleBigQueryProducer extends DefaultProducer { if (LOG.isDebugEnabled()) { LOG.debug("uploader thread/id: {} / {} . api call completed.", Thread.currentThread().getId(), exchangeId); } - return apiRequestData.size(); + return insertAllRequest.getRows().size(); } - private TableDataInsertAllRequest.Rows createRowRequest(Exchange exchange, Map<String, Object> object) { - TableRow tableRow = new TableRow(); - tableRow.putAll(object); + private InsertAllRequest.RowToInsert createRowRequest(Exchange exchange, Map<String, Object> object) { String insertId = null; if (configuration.getUseAsInsertId() != null) { insertId = (String) (object.get(configuration.getUseAsInsertId())); @@ -186,10 +181,10 @@ public class GoogleBigQueryProducer extends DefaultProducer { insertId = exchange.getIn().getHeader(GoogleBigQueryConstants.INSERT_ID, String.class); } } - TableDataInsertAllRequest.Rows rows = new TableDataInsertAllRequest.Rows(); - rows.setInsertId(insertId); - rows.setJson(tableRow); - return rows; + if (insertId != null) { + return InsertAllRequest.RowToInsert.of(insertId, object); + } + return InsertAllRequest.RowToInsert.of(object); } @Override diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java index ab40d5e..bde60e3 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java @@ -16,7 +16,7 @@ */ package org.apache.camel.component.google.bigquery.sql; -import com.google.api.services.bigquery.Bigquery; +import com.google.cloud.bigquery.BigQuery; import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; @@ -51,7 +51,7 @@ public class GoogleBigQuerySQLEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - Bigquery bigquery = getConfiguration().getConnectionFactory().getDefaultClient(); + BigQuery bigquery = getConfiguration().getConnectionFactory().getDefaultClient(); GoogleBigQuerySQLProducer producer = new GoogleBigQuerySQLProducer(bigquery, this, configuration); return producer; } diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java index 26b4ce0..1690956 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java @@ -16,23 +16,25 @@ */ package org.apache.camel.component.google.bigquery.sql; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Set; - -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.QueryParameter; -import com.google.api.services.bigquery.model.QueryParameterType; -import com.google.api.services.bigquery.model.QueryParameterValue; -import com.google.api.services.bigquery.model.QueryRequest; -import com.google.api.services.bigquery.model.QueryResponse; +import java.util.UUID; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.JobException; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.TableResult; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.RuntimeExchangeException; import org.apache.camel.component.google.bigquery.GoogleBigQueryConstants; import org.apache.camel.support.DefaultProducer; +import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,11 +46,11 @@ public class GoogleBigQuerySQLProducer extends DefaultProducer { private static final Logger LOG = LoggerFactory.getLogger(GoogleBigQuerySQLProducer.class); private final GoogleBigQuerySQLConfiguration configuration; - private Bigquery bigquery; + private BigQuery bigquery; private String query; private Set<String> queryParameterNames; - public GoogleBigQuerySQLProducer(Bigquery bigquery, GoogleBigQuerySQLEndpoint endpoint, + public GoogleBigQuerySQLProducer(BigQuery bigquery, GoogleBigQuerySQLEndpoint endpoint, GoogleBigQuerySQLConfiguration configuration) { super(endpoint); this.bigquery = bigquery; @@ -69,33 +71,49 @@ public class GoogleBigQuerySQLProducer extends DefaultProducer { public void process(Exchange exchange) throws Exception { String translatedQuery = SqlHelper.translateQuery(query, exchange); Map<String, Object> queryParameters = extractParameters(exchange); - exchange.getMessage().setHeader(GoogleBigQueryConstants.TRANSLATED_QUERY, translatedQuery); - Long affectedRows = executeSQL(translatedQuery, queryParameters); + + Message message = exchange.getMessage(); + message.setHeader(GoogleBigQueryConstants.TRANSLATED_QUERY, translatedQuery); + JobId jobId = message.getHeader(GoogleBigQueryConstants.JOB_ID, JobId.class); + + Long affectedRows = executeSQL(jobId, translatedQuery, queryParameters); + LOG.debug("The query {} affected {} rows", query, affectedRows); - exchange.getMessage().setBody(affectedRows); + message.setBody(affectedRows); } - private Long executeSQL(String translatedQuery, Map<String, Object> queryParameters) throws Exception { - QueryRequest apiQueryRequest = new QueryRequest().setQuery(translatedQuery).setUseLegacySql(false); + private Long executeSQL(JobId jobId, String translatedQuery, Map<String, Object> queryParameters) throws Exception { + QueryJobConfiguration.Builder builder = QueryJobConfiguration.newBuilder(translatedQuery) + .setUseLegacySql(false); - Bigquery.Jobs.Query apiQuery = bigquery.jobs().query(configuration.getProjectId(), apiQueryRequest); + setQueryParameters(queryParameters, builder); - setQueryParameters(queryParameters, apiQueryRequest); + QueryJobConfiguration queryJobConfiguration = builder.build(); - if (LOG.isTraceEnabled()) { - LOG.trace("Sending query to bigquery standard sql: {}", translatedQuery); - } + try { + if (LOG.isTraceEnabled()) { + LOG.trace("Sending query to bigquery standard sql: {}", translatedQuery); + } - QueryResponse apiResponse = apiQuery.execute(); + JobId queryJobId; + if (ObjectHelper.isNotEmpty(jobId)) { + queryJobId = jobId; + } else { + queryJobId = JobId.of(configuration.getProjectId(), UUID.randomUUID().toString()); + } - if (apiResponse.getErrors() != null && !apiResponse.getErrors().isEmpty()) { - throw new Exception("Query " + translatedQuery + " failed: " + apiResponse.getErrors()); - } + TableResult result = bigquery.query(queryJobConfiguration, queryJobId); + + if (LOG.isTraceEnabled()) { + LOG.trace("Result of query {} is {}", translatedQuery, result.toString()); + } - if (LOG.isTraceEnabled()) { - LOG.trace("Result of query {} is {}", translatedQuery, apiResponse.toPrettyString()); + return result.getTotalRows(); + } catch (JobException e) { + throw new Exception("Query " + translatedQuery + " failed: " + e.getErrors(), e); + } catch (BigQueryException e) { + throw new Exception("Query " + translatedQuery + " failed: " + e.getError(), e); } - return apiResponse.getNumDmlAffectedRows(); } private Map<String, Object> extractParameters(Exchange exchange) { @@ -128,19 +146,15 @@ public class GoogleBigQuerySQLProducer extends DefaultProducer { return result; } - private void setQueryParameters(Map<String, Object> params, QueryRequest apiQueryRequest) { + private void setQueryParameters(Map<String, Object> params, QueryJobConfiguration.Builder builder) { if (params == null) { return; } - List<QueryParameter> list = new ArrayList<>(); params.forEach((key, value) -> { - QueryParameter param = new QueryParameter(); - param.setName(key).setParameterType(new QueryParameterType().setType("STRING")) - .setParameterValue(new QueryParameterValue().setValue(value.toString())); - list.add(param); + QueryParameterValue parameterValue = QueryParameterValue.of(value.toString(), StandardSQLTypeName.STRING); + builder.addNamedParameter(key, parameterValue); }); - apiQueryRequest.setQueryParameters(list); } @Override @@ -151,7 +165,7 @@ public class GoogleBigQuerySQLProducer extends DefaultProducer { @Override protected void doStart() throws Exception { super.doStart(); - String placeholder = ":#"; // TODO + String placeholder = ":#"; query = SqlHelper.resolveQuery(getEndpoint().getCamelContext(), configuration.getQuery(), placeholder); queryParameterNames = SqlHelper.extractParameterNames(query); } diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java index a52f2e0..0436148 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/BigQueryTestSupport.java @@ -17,24 +17,28 @@ package org.apache.camel.component.google.bigquery.integration; import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.UUID; import java.util.stream.Collectors; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.services.bigquery.model.QueryRequest; -import com.google.api.services.bigquery.model.QueryResponse; -import com.google.api.services.bigquery.model.Table; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableReference; -import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; +import com.google.cloud.bigquery.TableDefinition; +import com.google.cloud.bigquery.TableId; +import com.google.cloud.bigquery.TableInfo; +import com.google.cloud.bigquery.TableResult; import org.apache.camel.BindToRegistry; import org.apache.camel.CamelContext; import org.apache.camel.component.google.bigquery.GoogleBigQueryComponent; import org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLComponent; import org.apache.camel.test.junit5.CamelTestSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +50,6 @@ public class BigQueryTestSupport extends CamelTestSupport { public static final String SERVICE_ACCOUNT; public static final String PROJECT_ID; public static final String DATASET_ID; - public static final String SERVICE_URL; public static final String CREDENTIALS_FILE_LOCATION; private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryTestSupport.class); @@ -59,7 +62,6 @@ public class BigQueryTestSupport extends CamelTestSupport { SERVICE_ACCOUNT = testProperties.getProperty("service.account"); PROJECT_ID = testProperties.getProperty("project.id"); DATASET_ID = testProperties.getProperty("bigquery.datasetId"); - SERVICE_URL = testProperties.getProperty("test.serviceURL"); CREDENTIALS_FILE_LOCATION = testProperties.getProperty("service.credentialsFileLocation"); } @@ -79,9 +81,7 @@ public class BigQueryTestSupport extends CamelTestSupport { protected void addBigqueryComponent(CamelContext context) { connectionFactory = new GoogleBigQueryConnectionFactory() - .setServiceAccount(SERVICE_ACCOUNT) - .setServiceAccountKey(SERVICE_KEY) - .setServiceURL(SERVICE_URL); + .setCredentialsFileLocation(CREDENTIALS_FILE_LOCATION); GoogleBigQueryComponent component = new GoogleBigQueryComponent(); component.setConnectionFactory(connectionFactory); @@ -90,10 +90,23 @@ public class BigQueryTestSupport extends CamelTestSupport { context.getPropertiesComponent().setLocation("ref:prop"); } + protected void addBigquerySqlComponent(CamelContext context) { + + connectionFactory = new GoogleBigQueryConnectionFactory() + .setCredentialsFileLocation(CREDENTIALS_FILE_LOCATION); + + GoogleBigQuerySQLComponent component = new GoogleBigQuerySQLComponent(); + component.setConnectionFactory(connectionFactory); + + context.addComponent("google-bigquery-sql", component); + context.getPropertiesComponent().setLocation("ref:prop"); + } + @Override protected CamelContext createCamelContext() throws Exception { CamelContext context = super.createCamelContext(); addBigqueryComponent(context); + addBigquerySqlComponent(context); return context; } @@ -107,35 +120,25 @@ public class BigQueryTestSupport extends CamelTestSupport { } protected void assertRowExist(String tableName, Map<String, String> row) throws Exception { - QueryRequest queryRequest = new QueryRequest(); String query = "SELECT * FROM " + DATASET_ID + "." + tableName + " WHERE " + row.entrySet().stream() .map(e -> e.getKey() + " = '" + e.getValue() + "'") .collect(Collectors.joining(" AND ")); LOGGER.debug("Query: {}", query); - queryRequest.setQuery(query); - QueryResponse queryResponse = getConnectionFactory() + QueryJobConfiguration queryJobConfiguration = QueryJobConfiguration.of(query); + TableResult tableResult = getConnectionFactory() .getDefaultClient() - .jobs() - .query(PROJECT_ID, queryRequest) - .execute(); - assertEquals(1, queryResponse.getRows().size()); + .query(queryJobConfiguration, JobId.of(PROJECT_ID, UUID.randomUUID().toString())); + assertEquals(1, tableResult.getTotalRows()); } protected void createBqTable(String tableId) throws Exception { - TableReference reference = new TableReference() - .setTableId(tableId) - .setDatasetId(DATASET_ID) - .setProjectId(PROJECT_ID); - InputStream in = this.getClass().getResourceAsStream("/schema/simple-table.json"); - TableSchema schema = readDefinition(in); - Table table = new Table() - .setTableReference(reference) - .setSchema(schema); + Schema schema = createSchema(); + TableId id = TableId.of(PROJECT_ID, DATASET_ID, tableId); + TableDefinition.Builder builder = StandardTableDefinition.newBuilder().setSchema(schema); + TableInfo tableInfo = TableInfo.of(id, builder.build()); try { - getConnectionFactory().getDefaultClient().tables() - .insert(PROJECT_ID, DATASET_ID, table) - .execute(); + getConnectionFactory().getDefaultClient().create(tableInfo); } catch (GoogleJsonResponseException e) { if (e.getDetails().getCode() == 409) { LOGGER.info("Table {} already exist"); @@ -145,14 +148,11 @@ public class BigQueryTestSupport extends CamelTestSupport { } } - private TableSchema readDefinition(InputStream schemaInputStream) throws Exception { - TableSchema schema = new TableSchema(); - - ObjectMapper mapper = new ObjectMapper(); - List<TableFieldSchema> fields = mapper.readValue(schemaInputStream, ArrayList.class); - - schema.setFields(fields); - - return schema; + private Schema createSchema() throws Exception { + FieldList fields = FieldList.of( + Field.of("id", StandardSQLTypeName.NUMERIC), + Field.of("col1", StandardSQLTypeName.STRING), + Field.of("col2", StandardSQLTypeName.STRING)); + return Schema.of(fields); } } diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/BaseBigQueryTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/BaseBigQueryTest.java index 6bfd7b6..29df5e5 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/BaseBigQueryTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/BaseBigQueryTest.java @@ -17,7 +17,8 @@ package org.apache.camel.component.google.bigquery.unit; import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.TableDataInsertAllResponse; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.InsertAllResponse; import org.apache.camel.component.google.bigquery.GoogleBigQueryConfiguration; import org.apache.camel.component.google.bigquery.GoogleBigQueryEndpoint; import org.apache.camel.component.google.bigquery.GoogleBigQueryProducer; @@ -25,20 +26,18 @@ import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.BeforeEach; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class BaseBigQueryTest extends CamelTestSupport { protected GoogleBigQueryEndpoint endpoint = mock(GoogleBigQueryEndpoint.class); - protected Bigquery.Tabledata.InsertAll mockInsertall = mock(Bigquery.Tabledata.InsertAll.class); protected GoogleBigQueryProducer producer; protected Bigquery.Tabledata tabledataMock; protected String tableId = "testTableId"; protected String datasetId = "testDatasetId"; protected String projectId = "testProjectId"; protected GoogleBigQueryConfiguration configuration = new GoogleBigQueryConfiguration(); - protected Bigquery bigquery; + protected BigQuery bigquery; @BeforeEach public void init() throws Exception { @@ -57,13 +56,9 @@ public class BaseBigQueryTest extends CamelTestSupport { } protected void setupBigqueryMock() throws Exception { - bigquery = mock(Bigquery.class); - + bigquery = mock(BigQuery.class); tabledataMock = mock(Bigquery.Tabledata.class); - when(bigquery.tabledata()).thenReturn(tabledataMock); - when(tabledataMock.insertAll(anyString(), anyString(), anyString(), any())).thenReturn(mockInsertall); - - TableDataInsertAllResponse mockResponse = new TableDataInsertAllResponse(); - when(mockInsertall.execute()).thenReturn(mockResponse); + InsertAllResponse mockResponse = mock(InsertAllResponse.class); + when(bigquery.insertAll(any())).thenReturn(mockResponse); } } diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/GoogleBigQueryProducerTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/GoogleBigQueryProducerTest.java index e9cc465..2658257 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/GoogleBigQueryProducerTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/GoogleBigQueryProducerTest.java @@ -21,7 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import com.google.api.services.bigquery.model.TableDataInsertAllRequest; +import com.google.cloud.bigquery.InsertAllRequest; import org.apache.camel.Exchange; import org.apache.camel.component.google.bigquery.GoogleBigQueryConstants; import org.junit.jupiter.api.Test; @@ -29,7 +29,6 @@ import org.mockito.ArgumentCaptor; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; public class GoogleBigQueryProducerTest extends BaseBigQueryTest { @@ -37,12 +36,12 @@ public class GoogleBigQueryProducerTest extends BaseBigQueryTest { @Test public void sendMessage() throws Exception { producer.process(createExchangeWithBody(new HashMap<>())); - ArgumentCaptor<TableDataInsertAllRequest> dataCaptor = ArgumentCaptor.forClass(TableDataInsertAllRequest.class); - verify(tabledataMock).insertAll(eq(projectId), eq(datasetId), eq(tableId), dataCaptor.capture()); - List<TableDataInsertAllRequest> requests = dataCaptor.getAllValues(); + ArgumentCaptor<InsertAllRequest> dataCaptor = ArgumentCaptor.forClass(InsertAllRequest.class); + verify(bigquery).insertAll(dataCaptor.capture()); + List<InsertAllRequest> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(1, requests.get(0).getRows().size()); - assertNull(requests.get(0).getRows().get(0).getInsertId()); + assertNull(requests.get(0).getRows().get(0).getId()); } @Test @@ -50,12 +49,12 @@ public class GoogleBigQueryProducerTest extends BaseBigQueryTest { Exchange exchange = createExchangeWithBody(new HashMap<>()); exchange.getIn().setHeader(GoogleBigQueryConstants.TABLE_ID, "exchange_table_id"); producer.process(exchange); - ArgumentCaptor<TableDataInsertAllRequest> dataCaptor = ArgumentCaptor.forClass(TableDataInsertAllRequest.class); - verify(tabledataMock).insertAll(eq(projectId), eq(datasetId), eq("exchange_table_id"), dataCaptor.capture()); - List<TableDataInsertAllRequest> requests = dataCaptor.getAllValues(); + ArgumentCaptor<InsertAllRequest> dataCaptor = ArgumentCaptor.forClass(InsertAllRequest.class); + verify(bigquery).insertAll(dataCaptor.capture()); + List<InsertAllRequest> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(1, requests.get(0).getRows().size()); - assertNull(requests.get(0).getRows().get(0).getInsertId()); + assertNull(requests.get(0).getRows().get(0).getId()); } @Test @@ -64,12 +63,12 @@ public class GoogleBigQueryProducerTest extends BaseBigQueryTest { Map<String, String> object = new HashMap<>(); object.put("row1", "value1"); producer.process(createExchangeWithBody(object)); - ArgumentCaptor<TableDataInsertAllRequest> dataCaptor = ArgumentCaptor.forClass(TableDataInsertAllRequest.class); - verify(tabledataMock).insertAll(eq(projectId), eq(datasetId), eq(tableId), dataCaptor.capture()); - List<TableDataInsertAllRequest> requests = dataCaptor.getAllValues(); + ArgumentCaptor<InsertAllRequest> dataCaptor = ArgumentCaptor.forClass(InsertAllRequest.class); + verify(bigquery).insertAll(dataCaptor.capture()); + List<InsertAllRequest> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(1, requests.get(0).getRows().size()); - assertEquals("value1", requests.get(0).getRows().get(0).getInsertId()); + assertEquals("value1", requests.get(0).getRows().get(0).getId()); } @Test @@ -78,9 +77,9 @@ public class GoogleBigQueryProducerTest extends BaseBigQueryTest { messages.add(new HashMap<>()); messages.add(new HashMap<>()); producer.process(createExchangeWithBody(messages)); - ArgumentCaptor<TableDataInsertAllRequest> dataCaptor = ArgumentCaptor.forClass(TableDataInsertAllRequest.class); - verify(tabledataMock).insertAll(eq(projectId), eq(datasetId), eq(tableId), dataCaptor.capture()); - List<TableDataInsertAllRequest> requests = dataCaptor.getAllValues(); + ArgumentCaptor<InsertAllRequest> dataCaptor = ArgumentCaptor.forClass(InsertAllRequest.class); + verify(bigquery).insertAll(dataCaptor.capture()); + List<InsertAllRequest> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(2, requests.get(0).getRows().size()); } diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java index 2962c84..aba57e6 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java @@ -16,16 +16,14 @@ */ package org.apache.camel.component.google.bigquery.unit.sql; -import org.apache.camel.CamelContext; import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLComponent; import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import static org.junit.jupiter.api.Assertions.assertEquals; -public class GoogleBigQuerySQLComponentTest { - private CamelContext context = Mockito.mock(CamelContext.class); +public class GoogleBigQuerySQLComponentTest extends CamelTestSupport { @Test public void testQuerySet() throws Exception { diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java index c9139bb..0614d86 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java @@ -16,27 +16,27 @@ */ package org.apache.camel.component.google.bigquery.unit.sql; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.model.QueryResponse; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLConfiguration; import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLEndpoint; import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLProducer; import org.apache.camel.test.junit5.CamelTestSupport; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class GoogleBigQuerySQLProducerBaseTest extends CamelTestSupport { protected GoogleBigQuerySQLEndpoint endpoint = mock(GoogleBigQuerySQLEndpoint.class); - protected Bigquery.Jobs mockJobs = mock(Bigquery.Jobs.class); - protected Bigquery.Jobs.Query mockQuery = mock(Bigquery.Jobs.Query.class); protected GoogleBigQuerySQLProducer producer; protected String sql; protected String projectId = "testProjectId"; protected GoogleBigQuerySQLConfiguration configuration = new GoogleBigQuerySQLConfiguration(); - protected Bigquery bigquery; + protected BigQuery bigquery; + protected TableResult tableResult; protected GoogleBigQuerySQLProducer createAndStartProducer() throws Exception { configuration.setProjectId(projectId); @@ -48,12 +48,8 @@ public class GoogleBigQuerySQLProducerBaseTest extends CamelTestSupport { } protected void setupBigqueryMock() throws Exception { - bigquery = mock(Bigquery.class); - - when(bigquery.jobs()).thenReturn(mockJobs); - when(bigquery.jobs().query(anyString(), any())).thenReturn(mockQuery); - - QueryResponse mockResponse = new QueryResponse().setNumDmlAffectedRows(1L); - when(mockQuery.execute()).thenReturn(mockResponse); + bigquery = mock(BigQuery.class); + tableResult = mock(TableResult.class); + when(bigquery.query(any(QueryJobConfiguration.class), any(JobId.class))).thenReturn(tableResult); } } diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java index de2f45c..1812b22 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java @@ -19,15 +19,22 @@ package org.apache.camel.component.google.bigquery.unit.sql; import java.util.HashMap; import java.util.Map; -import com.google.api.services.bigquery.model.QueryRequest; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.QueryParameterValue; import org.apache.camel.Exchange; +import org.apache.camel.Message; import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.component.google.bigquery.GoogleBigQueryConstants; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import static org.apache.camel.component.google.bigquery.integration.BigQueryTestSupport.PROJECT_ID; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.verify; @@ -47,18 +54,20 @@ public class GoogleBigQuerySQLProducerWithParamersTest extends GoogleBigQuerySQL body.put("data", "some data"); producer.process(createExchangeWithBody(body)); - ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); - verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + ArgumentCaptor<QueryJobConfiguration> dataCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(bigquery).query(dataCaptor.capture(), any(JobId.class)); - QueryRequest request = dataCaptor.getValue(); + QueryJobConfiguration request = dataCaptor.getValue(); assertEquals(sql, request.getQuery()); - assertEquals(2, request.getQueryParameters().size()); - assertEquals("id", request.getQueryParameters().get(1).getName()); - assertEquals("100", request.getQueryParameters().get(1).getParameterValue().getValue()); + Map<String, QueryParameterValue> namedParameters = request.getNamedParameters(); + assertEquals(2, namedParameters.size()); - assertEquals("data", request.getQueryParameters().get(0).getName()); - assertEquals("some data", request.getQueryParameters().get(0).getParameterValue().getValue()); + assertTrue(namedParameters.containsKey("id")); + assertEquals("100", namedParameters.get("id").getValue()); + + assertTrue(namedParameters.containsKey("data")); + assertEquals("some data", namedParameters.get("data").getValue()); } @Test @@ -67,24 +76,57 @@ public class GoogleBigQuerySQLProducerWithParamersTest extends GoogleBigQuerySQL body.put("id", "100"); Exchange exchange = createExchangeWithBody(body); - exchange.getMessage().getHeaders().put("id", "200"); - exchange.getMessage().getHeaders().put("data", "some data"); + Message message = exchange.getMessage(); + message.setHeader("id", "200"); + message.setHeader("data", "some data"); + + producer.process(exchange); + + ArgumentCaptor<QueryJobConfiguration> dataCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(bigquery).query(dataCaptor.capture(), any(JobId.class)); + + QueryJobConfiguration request = dataCaptor.getValue(); + assertEquals(sql, request.getQuery()); + + Map<String, QueryParameterValue> namedParameters = request.getNamedParameters(); + assertEquals(2, namedParameters.size()); + + assertTrue(namedParameters.containsKey("id")); + assertEquals("100", namedParameters.get("id").getValue(), "Body data must have higher priority"); + + assertTrue(namedParameters.containsKey("data")); + assertEquals("some data", namedParameters.get("data").getValue()); + } + + @Test + public void sendMessageWithJobIdHeader() throws Exception { + Map<String, String> body = new HashMap<>(); + body.put("id", "100"); + + Exchange exchange = createExchangeWithBody(body); + Message message = exchange.getMessage(); + message.setHeader("id", "200"); + message.setHeader("data", "some data"); + + JobId jobId = JobId.of(PROJECT_ID, "a-test-job"); + message.setHeader(GoogleBigQueryConstants.JOB_ID, jobId); producer.process(exchange); - ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); - verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + ArgumentCaptor<QueryJobConfiguration> dataCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(bigquery).query(dataCaptor.capture(), eq(jobId)); - QueryRequest request = dataCaptor.getValue(); + QueryJobConfiguration request = dataCaptor.getValue(); assertEquals(sql, request.getQuery()); - assertEquals(2, request.getQueryParameters().size()); - assertEquals("id", request.getQueryParameters().get(1).getName()); - assertEquals("Body data must have higher priority", "100", - request.getQueryParameters().get(1).getParameterValue().getValue()); + Map<String, QueryParameterValue> namedParameters = request.getNamedParameters(); + assertEquals(2, namedParameters.size()); + + assertTrue(namedParameters.containsKey("id")); + assertEquals("100", namedParameters.get("id").getValue(), "Body data must have higher priority"); - assertEquals("data", request.getQueryParameters().get(0).getName()); - assertEquals("some data", request.getQueryParameters().get(0).getParameterValue().getValue()); + assertTrue(namedParameters.containsKey("data")); + assertEquals("some data", namedParameters.get("data").getValue()); } @Test diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java index a721e15..4b51046 100644 --- a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java @@ -18,14 +18,15 @@ package org.apache.camel.component.google.bigquery.unit.sql; import java.util.List; -import com.google.api.services.bigquery.model.QueryRequest; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.QueryJobConfiguration; import org.apache.camel.Exchange; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.verify; public class GoogleBigQuerySQLProducerWithPatternTest extends GoogleBigQuerySQLProducerBaseTest { @@ -44,10 +45,10 @@ public class GoogleBigQuerySQLProducerWithPatternTest extends GoogleBigQuerySQLP String expected = "insert into dataset.testTableId(id, data) values(1, 'test')"; producer.process(exchange); - ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); - verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + ArgumentCaptor<QueryJobConfiguration> dataCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(bigquery).query(dataCaptor.capture(), any(JobId.class)); - List<QueryRequest> requests = dataCaptor.getAllValues(); + List<QueryJobConfiguration> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(expected, requests.get(0).getQuery()); } @@ -59,10 +60,10 @@ public class GoogleBigQuerySQLProducerWithPatternTest extends GoogleBigQuerySQLP String expected = "insert into dataset.testTableId(id, data) values(1, 'test')"; producer.process(exchange); - ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); - verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + ArgumentCaptor<QueryJobConfiguration> dataCaptor = ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(bigquery).query(dataCaptor.capture(), any(JobId.class)); - List<QueryRequest> requests = dataCaptor.getAllValues(); + List<QueryJobConfiguration> requests = dataCaptor.getAllValues(); assertEquals(1, requests.size()); assertEquals(expected, requests.get(0).getQuery()); } diff --git a/components/camel-google-bigquery/src/test/resources/schema/simple-table.json b/components/camel-google-bigquery/src/test/resources/schema/simple-table.json deleted file mode 100644 index 19210ff..0000000 --- a/components/camel-google-bigquery/src/test/resources/schema/simple-table.json +++ /dev/null @@ -1,6 +0,0 @@ -[ - {"name":"id", "type":"INTEGER"}, - {"name":"col1", "type":"STRING"}, - {"name":"col2", "type":"STRING"} -] - diff --git a/docs/components/modules/ROOT/pages/google-bigquery-component.adoc b/docs/components/modules/ROOT/pages/google-bigquery-component.adoc index beed093..82f4191 100644 --- a/docs/components/modules/ROOT/pages/google-bigquery-component.adoc +++ b/docs/components/modules/ROOT/pages/google-bigquery-component.adoc @@ -45,20 +45,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] diff --git a/docs/components/modules/ROOT/pages/google-bigquery-sql-component.adoc b/docs/components/modules/ROOT/pages/google-bigquery-sql-component.adoc index df44c1e..5b3cf69 100644 --- a/docs/components/modules/ROOT/pages/google-bigquery-sql-component.adoc +++ b/docs/components/modules/ROOT/pages/google-bigquery-sql-component.adoc @@ -44,20 +44,13 @@ for this component: Google BigQuery component authentication is targeted for use with the GCP Service Accounts. For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] -Google security credentials can be set explicitly via one of the two options: +Google security credentials can be set explicitly by providing the path to the GCP credentials file location. -* Service Account Email and Service Account Key (PEM format) -* GCP credentials file location - -If both are set, the Service Account Email/Key will take precedence. - -Or implicitly, where the connection factory falls back on +Or they are set implicitly, where the connection factory falls back on https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. *OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. -Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. - == URI Format [source,text] @@ -134,7 +127,15 @@ with the following path and query parameters: // endpoint options: END -== Ouput Message Headers +== Message Headers + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Name |Type |Description +|`CamelGoogleBigQueryJobId` |`JobId` |A custom `JobId` to use +|======================================================================= + +== Output Message Headers [width="100%",cols="10%,10%,80%",options="header",] |======================================================================= diff --git a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_7.adoc b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_7.adoc index 484f6f1..c7f94ae 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_7.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-3x-upgrade-guide-3_7.adoc @@ -282,3 +282,11 @@ adopting XStream's Security Framework. CAMEL-15890 fixed a bug in which values for External Ids that contained spaces would have spaces converted to "+". This has been fixed. However, any such values that now have the plus sign in salesforce will no longer match as Camel will now preserve the space. Therefore you may need to have a transformation that explicility covnerts spaces to "+" if you need to preserve the old behavior. + +=== camel-google-bigquery + +The camel-google-bigquery component was updated to use the latest version of `google-cloud-bigquery`. Some features of `GoogleBigQueryConnectionFactory` are no longer supported. + +It is no longer possible to provide a the service account private key as a String parameter to `GoogleBigQueryConnectionFactory`. Instead, you should use `setCredentialsFileLocation` to +discover the private key from your credentials file. Or use the fallback mechanism for discovering credentials by setting the `GOOGLE_APPLICATION_CREDENTIALS` environment variable. Refer to the +component documentation for more information. diff --git a/parent/pom.xml b/parent/pom.xml index c728c81..5c5d7a9 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -214,7 +214,8 @@ <google-api-services-calendar-version>v3-rev291-1.22.0</google-api-services-calendar-version> <google-api-services-sheets-version>v4-rev551-1.22.0</google-api-services-sheets-version> <google-api-services-mail-version>v1-rev81-1.22.0</google-api-services-mail-version> - <google-api-services-bigquery-version>v2-rev352-1.22.0</google-api-services-bigquery-version> + <google-cloud-bom-version>16.1.0</google-cloud-bom-version> + <google-cloud-guava-version>30.0-jre</google-cloud-guava-version> <google-cloud-pubsub-version>1.105.0</google-cloud-pubsub-version> <google-errorprone-version>2.3.3</google-errorprone-version> <google-gax-version>1.50.1</google-gax-version>