This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch sandbox/camel-3.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 55bc3a5547992043084460eef361bbcf5c8c11c1 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Nov 28 12:02:15 2018 +0100 CAMEL-12930 - Ability to execute DML statements in Google Bigquery component, thanks to Roman Lusnikov for the patch --- .../main/docs/google-bigquery-sql-component.adoc | 151 +++++++++++++++++++ .../google/bigquery/GoogleBigQueryConstants.java | 1 + .../bigquery/sql/GoogleBigQuerySQLComponent.java | 81 +++++++++++ .../sql/GoogleBigQuerySQLConfiguration.java | 76 ++++++++++ .../bigquery/sql/GoogleBigQuerySQLEndpoint.java | 80 +++++++++++ .../bigquery/sql/GoogleBigQuerySQLProducer.java | 159 +++++++++++++++++++++ .../component/google/bigquery/sql/SqlHelper.java | 94 ++++++++++++ .../org/apache/camel/component/google-bigquery-sql | 18 +++ .../integration/sql/SqlQueryFromResourceTest.java | 85 +++++++++++ .../bigquery/integration/sql/SqlQueryTest.java | 85 +++++++++++ .../unit/sql/GoogleBigQuerySQLComponentTest.java | 52 +++++++ .../sql/GoogleBigQuerySQLProducerBaseTest.java | 44 ++++++ .../GoogleBigQuerySQLProducerWithParamersTest.java | 91 ++++++++++++ .../GoogleBigQuerySQLProducerWithPatternTest.java | 69 +++++++++ .../google/bigquery/unit/sql/SqlHelperTest.java | 113 +++++++++++++++ .../src/test/resources/sql/delete.sql | 1 + .../src/test/resources/sql/insert.sql | 2 + 17 files changed, 1202 insertions(+) diff --git a/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc b/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc new file mode 100644 index 0000000..23f5df6 --- /dev/null +++ b/components/camel-google-bigquery/src/main/docs/google-bigquery-sql-component.adoc @@ -0,0 +1,151 @@ +[[google-bigquery-sql-component]] +== Google BigQuery Standard SQL Component +== Google BigQuery Component +*Available as of Camel version 2.23* + +### Component Description + +The Google Bigquery SQL component provides access +to https://cloud.google.com/bigquery/[Cloud BigQuery 
Infrastructure] via +the https://developers.google.com/apis-explorer/#p/bigquery/v2/bigquery.jobs.query[Google Client Services API]. + +The current implementation supports only standard SQL +https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax[DML queries]. + +Maven users will need to add the following dependency to their pom.xml +for this component: + +[source,xml] +------------------------------------------------------ +<dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-google-bigquery</artifactId> + <version>x.x.x</version> + <!-- use the same version as your Camel core version --> +</dependency> + +------------------------------------------------------ + +[[GoogleBigQuery-AuthenticationConfiguration]] + +### Authentication Configuration + +Google BigQuery component authentication is targeted for use with the GCP Service Accounts. +For more information please refer to https://cloud.google.com/docs/authentication[Google Cloud Platform Auth Guide] + +Google security credentials can be set explicitly via one of the two options: + +* Service Account Email and Service Account Key (PEM format) +* GCP credentials file location + +If both are set, the Service Account Email/Key will take precedence. + +Or implicitly, where the connection factory falls back on +https://developers.google.com/identity/protocols/application-default-credentials#howtheywork[Application Default Credentials]. + +*OBS!* The location of the default credentials file is configurable - via GOOGLE_APPLICATION_CREDENTIALS environment variable. + +Service Account Email and Service Account Key can be found in the GCP JSON credentials file as client_email and private_key respectively. 
+ +### URI Format + +[source,java] +-------------------------------------------------------- + google-bigquery-sql://project-id:query?[options] +-------------------------------------------------------- + +Examples: +[source,java] +-------------------------------------------------------- + google-bigquery-sql://project-17248459:delete from test.table where id=@myId + google-bigquery-sql://project-17248459:delete from ${datasetId}.${tableId} where id=@myId +-------------------------------------------------------- +where + + * parameters of the form ${name} are resolved from message headers (falling back to exchange properties) and substituted into the query text to form the translated query + * parameters of the form @name are taken from the message body (when it is a map) or from message headers and sent to Google BigQuery as query parameters + +You can externalize your SQL queries to files in the classpath or file system as shown: +[source,java] +-------------------------------------------------------- + google-bigquery-sql://project-17248459:classpath:delete.sql +-------------------------------------------------------- + + + +### Options + +// component options: START +The Google BigQuery Standard SQL component supports 3 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *projectId* (producer) | Google Cloud Project Id | | String +| *connectionFactory* (producer) | ConnectionFactory to obtain connection to Bigquery Service. If none is provided the default one will be used | | GoogleBigQuery ConnectionFactory +| *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. 
| true | boolean +|=== +// component options: END + +// endpoint options: START +The Google BigQuery Standard SQL endpoint is configured using URI syntax: + +---- +google-bigquery-sql:query +---- + +with the following path and query parameters: + +==== Path Parameters (2 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *projectId* | *Required* Google Cloud Project Id | | String +| *query* | *Required* BigQuery standard SQL query | | String +|=== + + +==== Query Parameters (2 parameters): + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *connectionFactory* (producer) | ConnectionFactory to obtain connection to Bigquery Service. If none is provided the default will be used. | | GoogleBigQuery ConnectionFactory +| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean +|=== +// endpoint options: END +// spring-boot-auto-configure options: START +=== Spring Boot Auto-Configuration + + +The component supports 4 options, which are listed below. + + + +[width="100%",cols="2,5,^1,2",options="header"] +|=== +| Name | Description | Default | Type +| *camel.component.google-bigquery-sql.enabled* | Whether to enable auto configuration of the google-bigquery-sql component. This is enabled by default. | | Boolean +| *camel.component.google-bigquery-sql.project-id* | Google Cloud Project Id | | String +//| *camel.component.google-bigquery.resolve-property-placeholders* | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. 
| true | Boolean +|=== +// spring-boot-auto-configure options: END + + +### Output Message Headers + +[width="100%",cols="10%,10%,80%",options="header",] +|======================================================================= +|Name |Type |Description +|`CamelGoogleBigQueryTranslatedQuery` |`String` | Preprocessed query text +|======================================================================= + + +### Producer Endpoints + +Google BigQuery SQL endpoint expects the payload to be either empty or a map of query parameters. \ No newline at end of file diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java index cfd3c57..945cde7 100644 --- a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/GoogleBigQueryConstants.java @@ -21,6 +21,7 @@ public final class GoogleBigQueryConstants { public static final String TABLE_ID = "CamelGoogleBigQueryTableId"; public static final String INSERT_ID = "CamelGoogleBigQueryInsertId"; public static final String PARTITION_DECORATOR = "CamelGoogleBigQueryPartitionDecorator"; + public static final String TRANSLATED_QUERY = "CamelGoogleBigQueryTranslatedQuery"; /** * Prevent instantiation. 
diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLComponent.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLComponent.java new file mode 100644 index 0000000..fcbcf66 --- /dev/null +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLComponent.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.sql; + +import java.util.Map; +import org.apache.camel.CamelContext; +import org.apache.camel.Endpoint; +import org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory; +import org.apache.camel.impl.DefaultComponent; + +public class GoogleBigQuerySQLComponent extends DefaultComponent { + private String projectId; + private GoogleBigQueryConnectionFactory connectionFactory; + + public GoogleBigQuerySQLComponent() { + super(); + } + + public GoogleBigQuerySQLComponent(CamelContext camelContext) { + super(camelContext); + } + + // Endpoint represents a single table + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + String[] parts = remaining.split(":"); + + if (parts.length < 2) { + throw new IllegalArgumentException("Google BigQuery Endpoint format \"projectId:<query>\""); + } + + GoogleBigQuerySQLConfiguration configuration = new GoogleBigQuerySQLConfiguration(); + setProperties(configuration, parameters); + configuration.parseRemaining(remaining); + + if (configuration.getConnectionFactory() == null) { + configuration.setConnectionFactory(getConnectionFactory()); + } + + return new GoogleBigQuerySQLEndpoint(uri, this, configuration); + } + + public String getProjectId() { + return projectId; + } + + /** + * Google Cloud Project Id + */ + public void setProjectId(String projectId) { + this.projectId = projectId; + } + + public GoogleBigQueryConnectionFactory getConnectionFactory() { + if (connectionFactory == null) { + connectionFactory = new GoogleBigQueryConnectionFactory(); + } + return connectionFactory; + } + + /** + * ConnectionFactory to obtain connection to Bigquery Service. 
If non provided the default one will be used + */ + public void setConnectionFactory(GoogleBigQueryConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } +} diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLConfiguration.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLConfiguration.java new file mode 100644 index 0000000..25ed0dff --- /dev/null +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLConfiguration.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.bigquery.sql; + +import org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriParams; +import org.apache.camel.spi.UriPath; + +@UriParams +public class GoogleBigQuerySQLConfiguration { + + @UriParam(description = "ConnectionFactory to obtain connection to Bigquery Service. 
If non provided the default one will be used") + private GoogleBigQueryConnectionFactory connectionFactory; + @UriPath(label = "common", description = "Google Cloud Project Id") + @Metadata(required = "true") + private String projectId; + @UriPath(label = "common", description = "BigQuery standard SQL query") + @Metadata(required = "true") + private String query; + + public void parseRemaining(String remaining) { + int indexOfColon = remaining.indexOf(":"); + + if (indexOfColon < 0) { + throw new IllegalArgumentException("Google BigQuery Endpoint format \"projectId:query\""); + } + + projectId = remaining.substring(0, indexOfColon); + query = remaining.substring(indexOfColon + 1); + } + + /** + * ConnectionFactory to obtain connection to Bigquery Service. If non provided the default will be used. + */ + public GoogleBigQueryConnectionFactory getConnectionFactory() { + return connectionFactory; + } + + public void setConnectionFactory(GoogleBigQueryConnectionFactory connectionFactory) { + this.connectionFactory = connectionFactory; + } + + public String getQuery() { + return query; + } + + public GoogleBigQuerySQLConfiguration setQuery(String query) { + this.query = query; + return this; + } + + public String getProjectId() { + return projectId; + } + + public GoogleBigQuerySQLConfiguration setProjectId(String projectId) { + this.projectId = projectId; + return this; + } +} diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java new file mode 100644 index 0000000..fee9e96 --- /dev/null +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLEndpoint.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.bigquery.sql; + +import com.google.api.services.bigquery.Bigquery; +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; + +/** + * Google BigQuery data warehouse for analytics. + * + * BigQuery Endpoint Definition + * Represents a table within a BigQuery dataset + * Contains configuration details for a single table and the utility methods (such as check, create) to ease operations + * URI Parameters: + * * Logger ID - To ensure that logging is unified under Route Logger, the logger ID can be passed on + * via an endpoint URI parameter + * * Partitioned - to indicate that the table needs to be partitioned - every UTC day to be written into a + * timestamped separate table + * side effect: Australian operational day is always split between two UTC days, and, therefore, tables + * + * Another consideration is that exceptions are not handled within the class. They are expected to bubble up and be handled + * by Camel. 
+ */ +@UriEndpoint(firstVersion = "2.23.0", scheme = "google-bigquery-sql", title = "Google BigQuery Standard SQL", syntax = "google-bigquery-sql:query", + label = "cloud,messaging", producerOnly = true) +public class GoogleBigQuerySQLEndpoint extends DefaultEndpoint { + + @UriParam + protected final GoogleBigQuerySQLConfiguration configuration; + + protected GoogleBigQuerySQLEndpoint(String endpointUri, GoogleBigQuerySQLComponent component, GoogleBigQuerySQLConfiguration configuration) { + super(endpointUri, component); + this.configuration = configuration; + } + + @Override + public Producer createProducer() throws Exception { + Bigquery bigquery = getConfiguration().getConnectionFactory().getDefaultClient(); + GoogleBigQuerySQLProducer producer = new GoogleBigQuerySQLProducer(bigquery, this, configuration); + return producer; + } + + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Cannot consume from the BigQuery endpoint: " + getEndpointUri()); + } + + public boolean isSingleton() { + return true; + } + + public GoogleBigQuerySQLConfiguration getConfiguration() { + return configuration; + } + + @Override + public GoogleBigQuerySQLComponent getComponent() { + return (GoogleBigQuerySQLComponent)super.getComponent(); + } + + +} diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java new file mode 100644 index 0000000..e9ce93d --- /dev/null +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/GoogleBigQuerySQLProducer.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.camel.component.google.bigquery.sql; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.*; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.component.google.bigquery.GoogleBigQueryConstants; +import org.apache.camel.impl.DefaultProducer; + +import java.util.*; + +/** + * Generic BigQuery Producer + */ +public class GoogleBigQuerySQLProducer extends DefaultProducer { + + private final GoogleBigQuerySQLConfiguration configuration; + private Bigquery bigquery; + private String query; + private Set<String> queryParameterNames; + + public GoogleBigQuerySQLProducer(Bigquery bigquery, GoogleBigQuerySQLEndpoint endpoint, + GoogleBigQuerySQLConfiguration configuration) { + super(endpoint); + this.bigquery = bigquery; + this.configuration = configuration; + } + + /** + * Process the exchange + * + * The incoming exchange can be a grouped exchange in which case all the exchanges will be combined. + * + * The incoming can be + * <ul> + * <li>A map where all map keys will map to field records. One map object maps to one bigquery row</li> + * <li>A list of maps. 
Each entry in the list will map to one bigquery row</li> + * </ul> + * The incoming message is expected to be a List of Maps + * The assumptions: + * - All incoming records go into the same table + * - Incoming records sorted by the timestamp + */ + @Override + public void process(Exchange exchange) throws Exception { + String translatedQuery = SqlHelper.translateQuery(query, exchange); + Map<String, Object> queryParameters = extractParameters(exchange); + exchange.getMessage().setHeader(GoogleBigQueryConstants.TRANSLATED_QUERY, translatedQuery); + Long affectedRows = executeSQL(translatedQuery, queryParameters); + log.debug("The query {} affected {} rows", query, affectedRows); + exchange.getMessage().setBody(affectedRows); + } + + private Long executeSQL(String translatedQuery, Map<String, Object> queryParameters) throws Exception { + QueryRequest apiQueryRequest = new QueryRequest() + .setQuery(translatedQuery) + .setUseLegacySql(false); + + Bigquery.Jobs.Query apiQuery = bigquery + .jobs() + .query(configuration.getProjectId(), apiQueryRequest); + + setQueryParameters(queryParameters, apiQueryRequest); + + if (log.isTraceEnabled()) { + log.trace("Sending query to bigquery standard sql: {}", + translatedQuery); + } + + QueryResponse apiResponse = apiQuery.execute(); + + if (apiResponse.getErrors() != null && !apiResponse.getErrors().isEmpty()) { + throw new Exception("Query " + translatedQuery + " failed: " + apiResponse.getErrors()); + } + + if (log.isTraceEnabled()) { + log.trace("Result of query {} is {}", + translatedQuery, apiResponse.toPrettyString()); + } + return apiResponse.getNumDmlAffectedRows(); + } + + private Map<String, Object> extractParameters(Exchange exchange) { + if (queryParameterNames == null || queryParameterNames.size() == 0) + return null; + + Message message = exchange.getMessage(); + + HashMap<String, Object> headers = new HashMap<>(message.getHeaders()); + if (message.getBody() instanceof Map) { + try { + 
headers.putAll(message.getBody(Map.class)); + } catch (ClassCastException e) { + e.printStackTrace(); + } + } + + HashMap<String, Object> result = new HashMap<>(queryParameterNames.size()); + queryParameterNames.forEach(s -> { + Object value = headers.get(s); + if (value == null) { + throw new RuntimeExchangeException("SQL parameter with name '" + s + "' not found in the message headers", exchange); + } + + result.put(s, headers.get(s)); + }); + + return result; + } + + private void setQueryParameters(Map<String, Object> params, QueryRequest apiQueryRequest) { + if (params == null) + return; + + List<QueryParameter> list = new ArrayList<>(); + params.forEach((key, value) -> { + QueryParameter param = new QueryParameter(); + param.setName(key) + .setParameterType(new QueryParameterType().setType("STRING")) + .setParameterValue(new QueryParameterValue().setValue(value.toString())); + list.add(param); + }); + apiQueryRequest.setQueryParameters(list); + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public GoogleBigQuerySQLEndpoint getEndpoint() { + return (GoogleBigQuerySQLEndpoint) super.getEndpoint(); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + String placeholder = ":#"; // TODO + query = SqlHelper + .resolveQuery(getEndpoint().getCamelContext(), configuration.getQuery(), placeholder); + queryParameterNames = SqlHelper.extractParameterNames(query); + } +} diff --git a/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/SqlHelper.java b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/SqlHelper.java new file mode 100644 index 0000000..eb41856 --- /dev/null +++ b/components/camel-google-bigquery/src/main/java/org/apache/camel/component/google/bigquery/sql/SqlHelper.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.bigquery.sql; + +import org.apache.camel.*; +import org.apache.camel.util.ResourceHelper; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashSet; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public final class SqlHelper { + + private static Pattern pattern = Pattern.compile("\\$\\{(\\w+)}"); + private static Pattern parameterPattern = Pattern.compile("@(\\w+)"); + + private SqlHelper() { + } + + /** + * Resolve the query by loading the query from the classpath or file resource if needed. 
+ */ + public static String resolveQuery(CamelContext camelContext, String query, String placeholder) throws NoTypeConversionAvailableException, IOException { + String answer = query; + if (ResourceHelper.hasScheme(query)) { + InputStream is = ResourceHelper + .resolveMandatoryResourceAsInputStream(camelContext, query); + answer = camelContext.getTypeConverter().mandatoryConvertTo(String.class, is); + if (placeholder != null) { + answer = answer.replaceAll(placeholder, "@"); + } + } + return answer; + } + + /** + * Replaces pattern in query in form of "${param}" with values from message header + * Raises an error if param value not found in headers + * @param exchange + * @return Translated query text + */ + public static String translateQuery(String query, Exchange exchange) { + Message message = exchange.getMessage(); + Matcher matcher = pattern.matcher(query); + StringBuffer stringBuffer = new StringBuffer(); + while (matcher.find()) { + String paramKey = matcher.group(1); + + String value = message.getHeader(paramKey, String.class); + if (value == null) { + value = exchange.getProperty(paramKey, String.class); + if (value == null) + throw new RuntimeExchangeException("SQL pattern with name '" + paramKey + "' not found in the message headers", exchange); + } + + String replacement = Matcher.quoteReplacement(value); + matcher.appendReplacement(stringBuffer, replacement); + } + matcher.appendTail(stringBuffer); + return stringBuffer.toString(); + } + + /** + * Extracts list of parameters in form "@name" from query text + * @param query + * @return list of parameter names + */ + public static Set<String> extractParameterNames(String query) { + Matcher matcher = parameterPattern.matcher(query); + Set<String> result = new HashSet<>(); + while (matcher.find()) { + String paramName = matcher.group(1); + result.add(paramName); + } + return result; + } +} diff --git 
a/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery-sql b/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery-sql new file mode 100644 index 0000000..1bf97f0 --- /dev/null +++ b/components/camel-google-bigquery/src/main/resources/META-INF/services/org/apache/camel/component/google-bigquery-sql @@ -0,0 +1,18 @@ +## ------------------------------------------------------------------------ +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. 
+## ------------------------------------------------------------------------ + +class=org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLComponent diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryFromResourceTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryFromResourceTest.java new file mode 100644 index 0000000..1ea8f46 --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryFromResourceTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.integration.sql; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.bigquery.integration.BigQueryTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Before; +import org.junit.Test; + +public class SqlQueryFromResourceTest extends BigQueryTestSupport { + private static final String TABLE_ID = "test_sql_table"; + + @EndpointInject(uri = "direct:in") + private Endpoint directIn; + + @EndpointInject(uri = "google-bigquery-sql:{{project.id}}:classpath:sql/insert.sql") + private Endpoint bigqueryEndpoint; + + @EndpointInject(uri = "mock:sendResult") + private MockEndpoint sendResult; + + @Produce(uri = "direct:in") + private ProducerTemplate producer; + + @Before + public void init() throws Exception { + createBqTable(TABLE_ID); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from(directIn) + .routeId("InsertRow") + .to(bigqueryEndpoint) + .to(sendResult); + } + }; + } + + @Test + public void insertRecordBySql() throws Exception { + Exchange exchange = new DefaultExchange(context); + String uuidCol1 = UUID.randomUUID().toString(); + String uuidCol2 = UUID.randomUUID().toString(); + + Map<String, String> object = new HashMap<>(); + object.put("col1", uuidCol1); + object.put("col2", uuidCol2); + exchange.getIn().setBody(object); + + sendResult.expectedMessageCount(1); + sendResult.expectedBodiesReceived(1); + producer.send(exchange); + sendResult.assertIsSatisfied(4000); + + assertRowExist(TABLE_ID, object); + } + +} diff --git 
a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryTest.java new file mode 100644 index 0000000..fc490e5 --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/integration/sql/SqlQueryTest.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.integration.sql; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.google.bigquery.integration.BigQueryTestSupport; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultExchange; +import org.junit.Before; +import org.junit.Test; + /** Integration test: routes a Map body through a google-bigquery-sql endpoint whose insert statement is written inline in the endpoint URI, then calls assertRowExist for the values that were sent. */ +public class SqlQueryTest extends BigQueryTestSupport { + private static final String TABLE_ID = "test_sql_table"; + + @EndpointInject(uri = "direct:in") + private Endpoint directIn; + + @EndpointInject(uri = "google-bigquery-sql:{{project.id}}: insert into {{bigquery.datasetId}}." + TABLE_ID + "(col1, col2) values (@col1, @col2)") + private Endpoint bigqueryEndpoint; + + @EndpointInject(uri = "mock:sendResult") + private MockEndpoint sendResult; + + @Produce(uri = "direct:in") + private ProducerTemplate producer; + /** Calls createBqTable for TABLE_ID before each test. */ + @Before + public void init() throws Exception { + createBqTable(TABLE_ID); + } + /** Route under test: direct:in, then the BigQuery SQL endpoint, then mock:sendResult. */ + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() { + from(directIn) + .routeId("InsertRow") + .to(bigqueryEndpoint) + .to(sendResult); + } + }; + } + /** Sends random UUIDs as col1 and col2 and expects one message with body 1 on the mock within 4000 ms; the body presumably carries the affected-row count (confirm against the producer). */ + @Test + public void insertRecordBySql() throws Exception { + Exchange exchange = new DefaultExchange(context); + String uuidCol1 = UUID.randomUUID().toString(); + String uuidCol2 = UUID.randomUUID().toString(); + + Map<String, String> object = new HashMap<>(); + object.put("col1", uuidCol1); + object.put("col2", uuidCol2); + exchange.getIn().setBody(object); + + sendResult.expectedMessageCount(1); + sendResult.expectedBodiesReceived(1); + producer.send(exchange); + sendResult.assertIsSatisfied(4000); + + assertRowExist(TABLE_ID, 
object); + } + +} diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java new file mode 100644 index 0000000..ba24d0d --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLComponentTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.unit.sql; + +import static org.junit.Assert.assertEquals; + +import org.apache.camel.CamelContext; +import org.apache.camel.component.google.bigquery.GoogleBigQueryComponent; +import org.apache.camel.component.google.bigquery.GoogleBigQueryEndpoint; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLComponent; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLEndpoint; +import org.apache.camel.component.google.bigquery.sql.SqlHelper; +import org.junit.Test; +import org.mockito.Mockito; + /** Unit test: checks that GoogleBigQuerySQLComponent.createEndpoint splits a google-bigquery-sql URI into a project id and a query, using a mocked CamelContext. */ +public class GoogleBigQuerySQLComponentTest { + private CamelContext context = Mockito.mock(CamelContext.class); + /** An inline SQL statement after the project id is expected verbatim as the configured query. */ + @Test + public void testQuerySet() throws Exception { + String uri = "google-bigquery-sql:myproject:insert into testDatasetId.testTableId(id) values(1)"; + + GoogleBigQuerySQLEndpoint endpoint = (GoogleBigQuerySQLEndpoint)new GoogleBigQuerySQLComponent(context).createEndpoint(uri); + + assertEquals("myproject", endpoint.getConfiguration().getProjectId()); + assertEquals("insert into testDatasetId.testTableId(id) values(1)", endpoint.getConfiguration().getQuery()); + } + /** A classpath: reference is expected to be stored as-is in the configuration; this test does not load the resource. */ + @Test + public void testQueryFromResourceSet() throws Exception { + String uri = "google-bigquery-sql:myproject:classpath:sql/delete.sql"; + + GoogleBigQuerySQLEndpoint endpoint = (GoogleBigQuerySQLEndpoint)new GoogleBigQuerySQLComponent(context).createEndpoint(uri); + + assertEquals("myproject", endpoint.getConfiguration().getProjectId()); + assertEquals("classpath:sql/delete.sql", endpoint.getConfiguration().getQuery()); + } +} diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java new file mode 100644 index 0000000..244f27c --- /dev/null +++ 
b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerBaseTest.java @@ -0,0 +1,44 @@ +package org.apache.camel.component.google.bigquery.unit.sql; + +import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.QueryResponse; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLConfiguration; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLEndpoint; +import org.apache.camel.component.google.bigquery.sql.GoogleBigQuerySQLProducer; +import org.apache.camel.test.junit4.CamelTestSupport; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + /** Shared base for the GoogleBigQuerySQLProducer unit tests: holds Mockito mocks of the endpoint and of the Bigquery jobs and query objects plus a real GoogleBigQuerySQLConfiguration; subclasses assign sql before calling the helpers. NOTE(review): unlike the other test classes added in this patch, this file starts at the package statement with no ASF license header. */ +public class GoogleBigQuerySQLProducerBaseTest extends CamelTestSupport { + protected GoogleBigQuerySQLEndpoint endpoint = mock(GoogleBigQuerySQLEndpoint.class); + protected Bigquery.Jobs mockJobs = mock(Bigquery.Jobs.class); + protected Bigquery.Jobs.Query mockQuery = mock(Bigquery.Jobs.Query.class); + protected GoogleBigQuerySQLProducer producer; + protected String sql; + protected String projectId = "testProjectId"; + protected GoogleBigQuerySQLConfiguration configuration = new GoogleBigQuerySQLConfiguration(); + protected Bigquery bigquery; + /** Copies projectId and sql into the configuration, builds a producer around the current bigquery field and starts it; bigquery is only assigned by setupBigqueryMock, so call that first. */ + protected GoogleBigQuerySQLProducer createAndStartProducer() throws Exception { + configuration.setProjectId(projectId); + configuration.setQuery(sql); + + GoogleBigQuerySQLProducer sqlProducer = new GoogleBigQuerySQLProducer(bigquery, endpoint, configuration); + sqlProducer.start(); + return sqlProducer; + } + /** Creates the Bigquery mock and stubs jobs().query(...).execute() to return a QueryResponse reporting one DML-affected row. */ + protected void setupBigqueryMock() throws Exception { + bigquery = mock(Bigquery.class); + + when(bigquery.jobs()).thenReturn(mockJobs); + when(bigquery.jobs().query(anyString(), any())).thenReturn(mockQuery); + + QueryResponse mockResponse = new QueryResponse() + .setNumDmlAffectedRows(1L); + 
when(mockQuery.execute()).thenReturn(mockResponse); + } +} diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java new file mode 100644 index 0000000..6227dfb --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithParamersTest.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.unit.sql; + +import com.google.api.services.bigquery.model.QueryRequest; +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeExchangeException; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; + /** Unit tests for named @parameters: inspects the QueryRequest handed to jobs().query(...) when the values come from a Map body and from message headers. */ +public class GoogleBigQuerySQLProducerWithParamersTest extends GoogleBigQuerySQLProducerBaseTest { + /** Sets a query with two named parameters (@id, @data), installs the Bigquery mock and starts the producer. */ + @Before + public void init() throws Exception { + sql = "insert into testDatasetId.testTableId(id, data) values(@id, @data)"; + setupBigqueryMock(); + producer = createAndStartProducer(); + } + /** Both values come from the Map body. NOTE(review): the index-based assertions pin the parameter order (data at 0, id at 1); confirm the producer guarantees that order. */ + @Test + public void sendMessageWithParametersInBody() throws Exception { + Map<String, String> body = new HashMap<>(); + body.put("id", "100"); + body.put("data", "some data"); + producer.process(createExchangeWithBody(body)); + + ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); + verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + + QueryRequest request = dataCaptor.getValue(); + assertEquals(sql, request.getQuery()); + assertEquals(2, request.getQueryParameters().size()); + + assertEquals("id", request.getQueryParameters().get(1).getName()); + assertEquals("100", request.getQueryParameters().get(1).getParameterValue().getValue()); + + assertEquals("data", request.getQueryParameters().get(0).getName()); + assertEquals("some data", request.getQueryParameters().get(0).getParameterValue().getValue()); + } + /** id is present in both the body ("100") and the headers ("200"), data only in the headers; the body value is expected to win for id. */ + @Test + public void sendMessageWithParametersInBodyAndHeaders() throws Exception { + Map<String, String> body = new HashMap<>(); + body.put("id", "100"); + + Exchange exchange = createExchangeWithBody(body); + exchange.getMessage().getHeaders().put("id", "200"); + exchange.getMessage().getHeaders().put("data", "some data"); + + producer.process(exchange); + + 
ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); + verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + + QueryRequest request = dataCaptor.getValue(); + assertEquals(sql, request.getQuery()); + assertEquals(2, request.getQueryParameters().size()); + + assertEquals("id", request.getQueryParameters().get(1).getName()); + assertEquals("Body data must have higher priority", "100", request.getQueryParameters().get(1).getParameterValue().getValue()); + + assertEquals("data", request.getQueryParameters().get(0).getName()); + assertEquals("some data", request.getQueryParameters().get(0).getParameterValue().getValue()); + } + /** An empty Map body supplies no value for @id or @data; a RuntimeExchangeException is expected. */ + @Test(expected = RuntimeExchangeException.class) + public void sendMessageWithoutParameters() throws Exception { + producer.process(createExchangeWithBody(new HashMap<>())); + } +} diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java new file mode 100644 index 0000000..11cb050 --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/GoogleBigQuerySQLProducerWithPatternTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.google.bigquery.unit.sql; + +import com.google.api.services.bigquery.model.QueryRequest; +import org.apache.camel.Exchange; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.List; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; + /** Unit tests for ${name} substitution in the query text: the placeholder is filled from an exchange property or from a message header before the query reaches jobs().query(...). */ +public class GoogleBigQuerySQLProducerWithPatternTest extends GoogleBigQuerySQLProducerBaseTest { + /** Sets a query containing the ${testDatasetId} placeholder, installs the Bigquery mock and starts the producer. */ + @Before + public void init() throws Exception { + sql = "insert into ${testDatasetId}.testTableId(id, data) values(1, 'test')"; + setupBigqueryMock(); + producer = createAndStartProducer(); + } + /** Placeholder value supplied as an exchange property; exactly one captured request with the substituted text is expected. */ + @Test + public void sendExchangeWithProperties() throws Exception { + Exchange exchange = createExchangeWithBody(null); + exchange.getProperties().put("testDatasetId", "dataset"); + String expected = "insert into dataset.testTableId(id, data) values(1, 'test')"; + producer.process(exchange); + + ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); + verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + + List<QueryRequest> requests = dataCaptor.getAllValues(); + assertEquals(1, requests.size()); + assertEquals(expected, requests.get(0).getQuery()); + } + /** Placeholder value supplied as a message header; exactly one captured request with the substituted text is expected. */ + @Test + public void sendMessageWithHeaders() throws Exception { + Exchange exchange = createExchangeWithBody(null); + exchange.getMessage().getHeaders().put("testDatasetId", "dataset"); + String expected = "insert into dataset.testTableId(id, data) values(1, 'test')"; + 
producer.process(exchange); + + ArgumentCaptor<QueryRequest> dataCaptor = ArgumentCaptor.forClass(QueryRequest.class); + verify(bigquery.jobs()).query(eq(projectId), dataCaptor.capture()); + + List<QueryRequest> requests = dataCaptor.getAllValues(); + assertEquals(1, requests.size()); + assertEquals(expected, requests.get(0).getQuery()); + } + +} diff --git a/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/SqlHelperTest.java b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/SqlHelperTest.java new file mode 100644 index 0000000..de46355 --- /dev/null +++ b/components/camel-google-bigquery/src/test/java/org/apache/camel/component/google/bigquery/unit/sql/SqlHelperTest.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.google.bigquery.unit.sql; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.RuntimeExchangeException; +import org.apache.camel.component.google.bigquery.sql.SqlHelper; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; + +import java.util.Set; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.when; + /** Unit tests for the static SqlHelper methods resolveQuery, translateQuery and extractParameterNames, using Mockito mocks for the context, exchange and message. */ +public class SqlHelperTest { + private CamelContext context = Mockito.mock(CamelContext.class); + /** A plain SQL string is expected back unchanged. */ + @Test + public void testResolveQuery() throws Exception { + String answer = SqlHelper.resolveQuery(context, "delete from test.test_sql_table where id = 1", null); + assertEquals("delete from test.test_sql_table where id = 1", answer); + } + /** Disabled via @Ignore. NOTE(review): expects "@id" while sql/delete.sql in this patch contains ":id" and ":" is passed as the placeholder; presumably resolveQuery rewrites the placeholder to "@" - confirm against SqlHelper. */ + @Test + @Ignore + public void testResolveClasspathQuery() throws Exception { + String answer = SqlHelper.resolveQuery(context, "classpath:sql/delete.sql", ":"); + assertEquals("delete from test.test_sql_table where id = @id", answer); + } + /** Input text with two ${...} placeholders (report, import) and two @parameters (date, id). */ + String query = "INSERT INTO ${report}.test( -- TODO \n" + + " id,\n" + + " region\n" + + ")\n" + + "SELECT\n" + + " id,\n" + + " region\n" + + "FROM\n" + + " ${import}.test\n" + + "WHERE\n" + + " rec_date = @date AND id = @id\n"; + /** The same text with the placeholders replaced by report_data and import_data; the @parameters are left as they are. */ + String expected = "INSERT INTO report_data.test( -- TODO \n" + + " id,\n" + + " region\n" + + ")\n" + + "SELECT\n" + + " id,\n" + + " region\n" + + "FROM\n" + + " import_data.test\n" + + "WHERE\n" + + " rec_date = @date AND id = @id\n"; + /** Mocks shared by the translateQuery tests. */ + Exchange exchange = Mockito.mock(Exchange.class); + Message message = Mockito.mock(Message.class); + /** Placeholder values are served as String headers of the mocked message. */ + @Test + public void testTranslateQuery() { + when(exchange.getMessage()).thenReturn(message); + when(message.getHeader(eq("report"), eq(String.class))).thenReturn("report_data"); + when(message.getHeader(eq("import"), eq(String.class))).thenReturn("import_data"); + + String answer = 
SqlHelper.translateQuery(query, exchange); + assertEquals(expected, answer); + } + /** Placeholder values are served as String properties of the mocked exchange. */ + @Test + public void testTranslateQueryProperties() { + when(exchange.getMessage()).thenReturn(message); + when(exchange.getProperty(eq("report"), eq(String.class))).thenReturn("report_data"); + when(exchange.getProperty(eq("import"), eq(String.class))).thenReturn("import_data"); + + String answer = SqlHelper.translateQuery(query, exchange); + assertEquals(expected, answer); + } + /** Only the report header is stubbed, so no value is available for import; a RuntimeExchangeException is expected. */ + @Test(expected = RuntimeExchangeException.class) + public void testTranslateQueryWithoutParam() { + when(exchange.getMessage()).thenReturn(message); + when(message.getHeader(eq("report"), eq(String.class))).thenReturn("report_data"); + + SqlHelper.translateQuery(query, exchange); + fail("Should have thrown exception"); + } + /** The @parameters of the query are expected back as a set containing exactly date and id. */ + @Test + public void testExtractParameterNames() { + Set<String> answer = SqlHelper.extractParameterNames(query); + assertEquals(2, answer.size()); + assertTrue("Parameter 'date' not found", answer.contains("date")); + assertTrue("Parameter 'id' not found", answer.contains("id")); + } +} diff --git a/components/camel-google-bigquery/src/test/resources/sql/delete.sql b/components/camel-google-bigquery/src/test/resources/sql/delete.sql new file mode 100644 index 0000000..d074f55 --- /dev/null +++ b/components/camel-google-bigquery/src/test/resources/sql/delete.sql @@ -0,0 +1 @@ +delete from test.test_sql_table where id = :id \ No newline at end of file diff --git a/components/camel-google-bigquery/src/test/resources/sql/insert.sql b/components/camel-google-bigquery/src/test/resources/sql/insert.sql new file mode 100644 index 0000000..74048aa --- /dev/null +++ b/components/camel-google-bigquery/src/test/resources/sql/insert.sql @@ -0,0 +1,2 @@ +insert into test.test_sql_table (col1, col2) +values (@col1, @col2) \ No newline at end of file