This is an automated email from the ASF dual-hosted git repository. dmvolod pushed a commit to branch camel-2.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.x by this push: new f8c5f58 CAMEL-13082: Filter results already seen by the Olingo consumers and producers Closes #2720 * Olingo[4,2]Configuration * Adds filterAlreadySeen property * Added here rather than to the endpoint as the latter would require ultimately, in order to satisfy various checks, changes to the endpoint read() method in the api's Olingo4AppImpl, which is unnecessary since this is used by the consumer. f8c5f58 is described below commit f8c5f58d9b6d4232284b45d9c7f4e6ff89ec13cc Author: phantomjinx <p.g.richard...@phantomjinx.co.uk> AuthorDate: Fri Jan 18 16:03:48 2019 +0000 CAMEL-13082: Filter results already seen by the Olingo consumers and producers Closes #2720 * Olingo[4,2]Configuration * Adds filterAlreadySeen property * Added here rather than to the endpoint as the latter would require ultimately, in order to satisfy various checks, changes to the endpoint read() method in the api's Olingo4AppImpl, which is unnecessary since this is used by the consumer. * Olingo[4,2]Endpoint * Adds filterAlreadySeen property names to the property names collection so that parseQueryParams() does not remove it. * Adds the filterAlreadySeen property during interceptProperties() in a just-in-time manner (avoiding possibility of it causing problems with endpoint and consumer initialisation). * Olingo[4,2][Consumer,Producer] * Adds a resultIndex for indexing hashcodes of existing results * On receiving a response, if the resultIndex has been initialised then the response results are checked by getting their hashcodes. If they're in the resultIndex then they are discarded. * Prior to any messages interceptProperties is called and if the properties contain the filterAlreadySeen property then the resultIndex is initialised. * Once the results have been placed in the exchange, interceptResult() is called and some housekeeping is done to index all the results in the resultIndex ready for checking on the next polling. --- .../src/main/docs/olingo2-component.adoc | 6 +- .../component/olingo2/Olingo2Configuration.java | 17 +++ .../camel/component/olingo2/Olingo2Consumer.java | 34 ++++- .../camel/component/olingo2/Olingo2Endpoint.java | 7 + .../camel/component/olingo2/Olingo2Index.java | 151 +++++++++++++++++++++ .../camel/component/olingo2/Olingo2Producer.java | 33 +++++ .../component/olingo2/Olingo2ComponentTest.java | 139 ++++++++++++++++++- .../src/main/docs/olingo4-component.adoc | 6 +- .../component/olingo4/Olingo4Configuration.java | 18 ++- .../camel/component/olingo4/Olingo4Consumer.java | 35 ++++- .../camel/component/olingo4/Olingo4Endpoint.java | 5 + .../camel/component/olingo4/Olingo4Index.java | 116 ++++++++++++++++ .../camel/component/olingo4/Olingo4Producer.java | 34 ++++- .../component/olingo4/Olingo4ComponentTest.java | 133 +++++++++++++++++- .../springboot/Olingo2ComponentConfiguration.java | 13 ++ .../springboot/Olingo4ComponentConfiguration.java | 13 ++ 16 files changed, 750 insertions(+), 10 deletions(-) diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/docs/olingo2-component.adoc b/components/camel-olingo2/camel-olingo2-component/src/main/docs/olingo2-component.adoc index 425e7f4..68afdbc 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/main/docs/olingo2-component.adoc +++ b/components/camel-olingo2/camel-olingo2-component/src/main/docs/olingo2-component.adoc @@ -82,7 +82,7 @@ with the following path and query parameters: |=== -==== Query Parameters (14 parameters): +==== Query Parameters (15 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -90,6 +90,7 @@ with the following path and query parameters: | Name | Description | Default | Type | *connectTimeout* (common) | HTTP connection creation timeout in milliseconds, defaults to 30,000 (30 seconds) | 30000 | int | *contentType* (common) | Content-Type header value can be used to specify JSON or XML message format, defaults to application/json;charset=utf-8 | application/json;charset=utf-8 | String +| *filterAlreadySeen* (common) | Set this to true to filter out results that have already been communicated by this component. | false | boolean | *httpAsyncClientBuilder* (common) | Custom HTTP async client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpAsyncClientBuilder | *httpClientBuilder* (common) | Custom HTTP client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpClientBuilder | *httpHeaders* (common) | Custom HTTP headers to inject into every request, this could include OAuth tokens, etc. | | Map @@ -108,7 +109,7 @@ with the following path and query parameters: === Spring Boot Auto-Configuration -The component supports 14 options, which are listed below. +The component supports 15 options, which are listed below. @@ -118,6 +119,7 @@ The component supports 14 options, which are listed below. | *camel.component.olingo2.configuration.api-name* | What kind of operation to perform | | Olingo2ApiName | *camel.component.olingo2.configuration.connect-timeout* | HTTP connection creation timeout in milliseconds, defaults to 30,000 (30 seconds) | 30000 | Integer | *camel.component.olingo2.configuration.content-type* | Content-Type header value can be used to specify JSON or XML message format, defaults to application/json;charset=utf-8 | application/json;charset=utf-8 | String +| *camel.component.olingo2.configuration.filter-already-seen* | Set this to true to filter out results that have already been communicated by this component. | false | Boolean | *camel.component.olingo2.configuration.http-async-client-builder* | Custom HTTP async client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpAsyncClientBuilder | *camel.component.olingo2.configuration.http-client-builder* | Custom HTTP client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpClientBuilder | *camel.component.olingo2.configuration.http-headers* | Custom HTTP headers to inject into every request, this could include OAuth tokens, etc. | | Map diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Configuration.java b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Configuration.java index d2fa870..f2c66d0 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Configuration.java +++ b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Configuration.java @@ -62,6 +62,8 @@ public class Olingo2Configuration { private HttpAsyncClientBuilder httpAsyncClientBuilder; @UriParam private HttpClientBuilder httpClientBuilder; + @UriParam + private boolean filterAlreadySeen; public Olingo2ApiName getApiName() { return apiName; @@ -186,6 +188,21 @@ public class Olingo2Configuration { this.httpClientBuilder = httpClientBuilder; } + /** + * Filter flag for filtering out already seen results + */ + public boolean getFilterAlreadySeen() { + return filterAlreadySeen; + } + + /** + * Set this to true to filter out results that have already been communicated by this component. + * @param filterAlreadySeen + */ + public void setFilterAlreadySeen(boolean filterAlreadySeen) { + this.filterAlreadySeen = filterAlreadySeen; + } + @Override public int hashCode() { return new HashCodeBuilder() diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Consumer.java b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Consumer.java index ff108c5..e784cfd 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Consumer.java +++ b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Consumer.java @@ -19,7 +19,7 @@ package org.apache.camel.component.olingo2; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; - +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.olingo2.api.Olingo2ResponseHandler; @@ -35,6 +35,8 @@ import org.apache.olingo.odata2.api.ep.feed.ODataFeed; */ public class Olingo2Consumer extends AbstractApiConsumer<Olingo2ApiName, Olingo2Configuration> { + private Olingo2Index resultIndex; + public Olingo2Consumer(Olingo2Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -58,6 +60,10 @@ public class Olingo2Consumer extends AbstractApiConsumer<Olingo2ApiName, Olingo2 args.put(Olingo2Endpoint.RESPONSE_HANDLER_PROPERTY, new Olingo2ResponseHandler<Object>() { @Override public void onResponse(Object response, Map<String, String> responseHeaders) { + if (resultIndex != null) { + response = resultIndex.filterResponse(response); + } + result[0] = response; latch.countDown(); } @@ -99,4 +105,30 @@ public class Olingo2Consumer extends AbstractApiConsumer<Olingo2ApiName, Olingo2 } } + @Override + public void interceptProperties(Map<String, Object> properties) { + // + // If we have a filterAlreadySeen property then initialise the filter index + // + Object value = properties.get(Olingo2Endpoint.FILTER_ALREADY_SEEN); + if (value == null) { + return; + } + + // + // Initialise the index if not already and if filterAlreadySeen has been set + // + if (Boolean.parseBoolean(value.toString()) && resultIndex == null) { + resultIndex = new Olingo2Index(); + } + } + + @Override + public void interceptResult(Object result, Exchange resultExchange) { + if (resultIndex == null) { + return; + } + + resultIndex.index(result); + } } diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java index 31709b4..d724503 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java +++ b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Endpoint.java @@ -43,6 +43,8 @@ public class Olingo2Endpoint extends AbstractApiEndpoint<Olingo2ApiName, Olingo2 protected static final String RESOURCE_PATH_PROPERTY = "resourcePath"; protected static final String RESPONSE_HANDLER_PROPERTY = "responseHandler"; + protected static final String SERVICE_URI_PROPERTY = "serviceUri"; + protected static final String FILTER_ALREADY_SEEN = "filterAlreadySeen"; private static final String KEY_PREDICATE_PROPERTY = "keyPredicate"; private static final String QUERY_PARAMS_PROPERTY = "queryParams"; @@ -74,6 +76,8 @@ public class Olingo2Endpoint extends AbstractApiEndpoint<Olingo2ApiName, Olingo2 // avoid adding edm as queryParam endpointPropertyNames.add(EDM_PROPERTY); endpointPropertyNames.add(ENDPOINT_HTTP_HEADERS_PROPERTY); + endpointPropertyNames.add(SERVICE_URI_PROPERTY); + endpointPropertyNames.add(FILTER_ALREADY_SEEN); } public Producer createProducer() throws Exception { @@ -164,6 +168,9 @@ public class Olingo2Endpoint extends AbstractApiEndpoint<Olingo2ApiName, Olingo2 // read Edm if not set yet properties.put(EDM_PROPERTY, apiProxy.getEdm()); + // handle filterAlreadySeen property + properties.put(FILTER_ALREADY_SEEN, configuration.getFilterAlreadySeen()); + // handle keyPredicate final String keyPredicate = (String) properties.get(KEY_PREDICATE_PROPERTY); if (keyPredicate != null) { diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Index.java b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Index.java new file mode 100644 index 0000000..81ac6a2 --- /dev/null +++ b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Index.java @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2016 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.olingo2; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.olingo.odata2.api.ep.entry.EntryMetadata; +import org.apache.olingo.odata2.api.ep.entry.ODataEntry; +import org.apache.olingo.odata2.api.ep.feed.ODataFeed; + +public class Olingo2Index { + + private Set<Integer> resultIndex = new HashSet<>(); + + /** + * Hash only certain data since other parts change between message + * exchanges. + * + * @param metadata + * @return hashcode of metadata + */ + private int hash(EntryMetadata metadata) { + final int prime = 31; + int result = 1; + result = prime * result + ((metadata.getId() == null) ? 0 : metadata.getId().hashCode()); + result = prime * result + ((metadata.getUri() == null) ? 0 : metadata.getUri().hashCode()); + return result; + } + + /** + * Hash entry leaving out certain fields that change + * between exchange messages + * + * @param entry + * @return hascode of entry + */ + private int hash(ODataEntry entry) { + final int prime = 31; + int result = 1; + // Hash metadata to ignore certain entries + result = prime * result + ((entry.getMetadata() == null) ? 0 : hash(entry.getMetadata())); + result = prime * result + ((entry.getProperties() == null) ? 0 : entry.getProperties().hashCode()); + + // Ignore mediaMetadata, expandSelectTree since its object changes each time + + return result; + } + + private Object filter(Object o) { + if (resultIndex.contains(o.hashCode())) { + return null; + } + return o; + } + + private void indexDefault(Object o) { + resultIndex.add(o.hashCode()); + } + + private Iterable<?> filter(Iterable<?> iterable) { + List<Object> filtered = new ArrayList<>(); + for (Object o : iterable) { + if (resultIndex.contains(o.hashCode())) { + continue; + } + filtered.add(o); + } + + return filtered; + } + + private void index(Iterable<?> iterable) { + for (Object o : iterable) { + resultIndex.add(o.hashCode()); + } + } + + private ODataFeed filter(ODataFeed odataFeed) { + List<ODataEntry> entries = odataFeed.getEntries(); + + if (entries.isEmpty()) { + return odataFeed; + } + + List<ODataEntry> copyEntries = new ArrayList<>(); + copyEntries.addAll(entries); + + for (ODataEntry entry : copyEntries) { + if (resultIndex.contains(hash(entry))) { + entries.remove(entry); + } + } + return odataFeed; + } + + private void index(ODataFeed odataFeed) { + for (ODataEntry entry : odataFeed.getEntries()) { + resultIndex.add(hash(entry)); + } + } + + /** + * Index the results + */ + public void index(Object result) { + if (result instanceof ODataFeed) { + index((ODataFeed) result); + } else if (result instanceof Iterable) { + index((Iterable<?>) result); + } else { + indexDefault(result); + } + } + + @SuppressWarnings( "unchecked" ) + public Object filterResponse(Object response) { + if (response instanceof ODataFeed) { + response = filter((ODataFeed) response); + } else + if (response instanceof Iterable) { + response = filter((Iterable<Object>) response); + } else if (response.getClass().isArray()) { + List<Object> result = new ArrayList<>(); + final int size = Array.getLength(response); + for (int i = 0; i < size; i++) { + result.add(Array.get(response, i)); + } + response = filter(result); + } else { + response = filter(response); + } + + return response; + } +} diff --git a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Producer.java b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Producer.java index e2fabc0..01db766 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Producer.java +++ b/components/camel-olingo2/camel-olingo2-component/src/main/java/org/apache/camel/component/olingo2/Olingo2Producer.java @@ -41,6 +41,8 @@ public class Olingo2Producer extends AbstractApiProducer<Olingo2ApiName, Olingo2 private static final String RESPONSE_HTTP_HEADERS = "responseHttpHeaders"; + private Olingo2Index resultIndex; + public Olingo2Producer(Olingo2Endpoint endpoint) { super(endpoint, Olingo2PropertiesHelper.getHelper()); } @@ -60,6 +62,10 @@ public class Olingo2Producer extends AbstractApiProducer<Olingo2ApiName, Olingo2 properties.put(Olingo2Endpoint.RESPONSE_HANDLER_PROPERTY, new Olingo2ResponseHandler<Object>() { @Override public void onResponse(Object response, Map<String, String> responseHeaders) { + if (resultIndex != null) { + response = resultIndex.filterResponse(response); + } + // producer returns a single response, even for methods with List return types exchange.getOut().setBody(response); // copy headers @@ -108,4 +114,31 @@ public class Olingo2Producer extends AbstractApiProducer<Olingo2ApiName, Olingo2 return false; } + + @Override + public void interceptProperties(Map<String, Object> properties) { + // + // If we have a filterAlreadySeen property then initialise the filter index + // + Object value = properties.get(Olingo2Endpoint.FILTER_ALREADY_SEEN); + if (value == null) { + return; + } + + // + // Initialise the index if not already and if filterAlreadySeen has been set + // + if (Boolean.parseBoolean(value.toString()) && resultIndex == null) { + resultIndex = new Olingo2Index(); + } + } + + @Override + public void interceptResult(Object result, Exchange resultExchange) { + if (resultIndex == null) { + return; + } + + resultIndex.index(result); + } } diff --git a/components/camel-olingo2/camel-olingo2-component/src/test/java/org/apache/camel/component/olingo2/Olingo2ComponentTest.java b/components/camel-olingo2/camel-olingo2-component/src/test/java/org/apache/camel/component/olingo2/Olingo2ComponentTest.java index beb12c1..65d26ee 100644 --- a/components/camel-olingo2/camel-olingo2-component/src/test/java/org/apache/camel/component/olingo2/Olingo2ComponentTest.java +++ b/components/camel-olingo2/camel-olingo2-component/src/test/java/org/apache/camel/component/olingo2/Olingo2ComponentTest.java @@ -20,8 +20,8 @@ import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.component.olingo2.api.batch.Olingo2BatchChangeRequest; import org.apache.camel.component.olingo2.api.batch.Olingo2BatchQueryRequest; import org.apache.camel.component.olingo2.api.batch.Olingo2BatchRequest; @@ -247,6 +247,130 @@ public class Olingo2ComponentTest extends AbstractOlingo2TestSupport { LOG.info("Read deleted entry exception: {}", exception); } + /** + * Read entity set of the People object + * and filter already seen items on subsequent exchanges + * Use a delay since the mock endpoint does not always get + * the correct number of exchanges before being satisfied. + */ + @Test + public void testConsumerReadFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "olingo2://read/Manufacturers?filterAlreadySeen=true&consumer.delay=2&consumer.sendEmptyMessageWhenIdle=true"; + final ODataFeed manufacturers = (ODataFeed)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(manufacturers); + int expectedManufacturers = manufacturers.getEntries().size(); + + int expectedMsgCount = 3; + MockEndpoint mockEndpoint = getMockEndpoint("mock:consumer-alreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.setResultWaitTime(60000); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + + if (i == 0) { + // + // First polled messages contained all the manufacturers + // + assertTrue(body instanceof ODataFeed); + ODataFeed set = (ODataFeed) body; + assertEquals(expectedManufacturers, set.getEntries().size()); + } + else { + // + // Subsequent polling messages should be empty + // since the filterAlreadySeen property is true + // + assertNull(body); + } + } + } + + /** + * + * Read entity set of the People object + * and with no filter already seen, all items + * should be present in each message + * + * @throws Exception + */ + @Test + public void testProducerReadNoFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "direct:read-people-nofilterseen"; + int expectedMsgCount = 3; + + int expectedEntities = -1; + for (int i = 0; i < expectedMsgCount; ++i) { + final ODataFeed manufacturers = (ODataFeed)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(manufacturers); + if (i == 0) { + expectedEntities = manufacturers.getEntries().size(); + } + } + + MockEndpoint mockEndpoint = getMockEndpoint("mock:producer-noalreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + assertTrue(body instanceof ODataFeed); + ODataFeed set = (ODataFeed) body; + + // + // All messages contained all the manufacturers + // + assertEquals(expectedEntities, set.getEntries().size()); + } + } + + /** + * Read entity set of the People object + * and filter already seen items on subsequent exchanges + */ + @Test + public void testProducerReadFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "direct:read-people-filterseen"; + int expectedMsgCount = 3; + + int expectedEntities = -1; + for (int i = 0; i < expectedMsgCount; ++i) { + final ODataFeed manufacturers = (ODataFeed)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(manufacturers); + if (i == 0) { + expectedEntities = manufacturers.getEntries().size(); + } + } + + MockEndpoint mockEndpoint = getMockEndpoint("mock:producer-alreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + assertTrue(body instanceof ODataFeed); + ODataFeed set = (ODataFeed) body; + + if (i == 0) { + // + // First polled messages contained all the manufacturers + // + assertEquals(expectedEntities, set.getEntries().size()); + } + else { + // + // Subsequent messages should be empty + // since the filterAlreadySeen property is true + // + assertEquals(0, set.getEntries().size()); + } + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -287,6 +411,19 @@ public class Olingo2ComponentTest extends AbstractOlingo2TestSupport { from("direct://BATCH") .to("olingo2://batch"); + from("direct:read-people-nofilterseen") + .to("olingo2://read/Manufacturers") + .to("mock:producer-noalreadyseen"); + + from("direct:read-people-filterseen") + .to("olingo2://read/Manufacturers?filterAlreadySeen=true") + .to("mock:producer-alreadyseen"); + + // + // Consumer endpoint + // + from("olingo2://read/Manufacturers?filterAlreadySeen=true&consumer.delay=2&consumer.sendEmptyMessageWhenIdle=true") + .to("mock:consumer-alreadyseen"); } }; } diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/docs/olingo4-component.adoc b/components/camel-olingo4/camel-olingo4-component/src/main/docs/olingo4-component.adoc index 8cd46d8..4270311 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/main/docs/olingo4-component.adoc +++ b/components/camel-olingo4/camel-olingo4-component/src/main/docs/olingo4-component.adoc @@ -83,7 +83,7 @@ with the following path and query parameters: |=== -==== Query Parameters (14 parameters): +==== Query Parameters (15 parameters): [width="100%",cols="2,5,^1,2",options="header"] @@ -91,6 +91,7 @@ with the following path and query parameters: | Name | Description | Default | Type | *connectTimeout* (common) | HTTP connection creation timeout in milliseconds, defaults to 30,000 (30 seconds) | 30000 | int | *contentType* (common) | Content-Type header value can be used to specify JSON or XML message format, defaults to application/json;charset=utf-8 | application/json;charset=utf-8 | String +| *filterAlreadySeen* (common) | Set this to true to filter out results that have already been communicated by this component. | false | boolean | *httpAsyncClientBuilder* (common) | Custom HTTP async client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpAsyncClientBuilder | *httpClientBuilder* (common) | Custom HTTP client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpClientBuilder | *httpHeaders* (common) | Custom HTTP headers to inject into every request, this could include OAuth tokens, etc. | | Map @@ -109,7 +110,7 @@ with the following path and query parameters: === Spring Boot Auto-Configuration -The component supports 14 options, which are listed below. +The component supports 15 options, which are listed below. @@ -119,6 +120,7 @@ The component supports 14 options, which are listed below. | *camel.component.olingo4.configuration.api-name* | What kind of operation to perform | | Olingo4ApiName | *camel.component.olingo4.configuration.connect-timeout* | HTTP connection creation timeout in milliseconds, defaults to 30,000 (30 seconds) | 30000 | Integer | *camel.component.olingo4.configuration.content-type* | Content-Type header value can be used to specify JSON or XML message format, defaults to application/json;charset=utf-8 | application/json;charset=utf-8 | String +| *camel.component.olingo4.configuration.filter-already-seen* | Set this to true to filter out results that have already been communicated by this component. | false | Boolean | *camel.component.olingo4.configuration.http-async-client-builder* | Custom HTTP async client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpAsyncClientBuilder | *camel.component.olingo4.configuration.http-client-builder* | Custom HTTP client builder for more complex HTTP client configuration, overrides connectionTimeout, socketTimeout, proxy and sslContext. Note that a socketTimeout MUST be specified in the builder, otherwise OData requests could block indefinitely | | HttpClientBuilder | *camel.component.olingo4.configuration.http-headers* | Custom HTTP headers to inject into every request, this could include OAuth tokens, etc. | | Map diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Configuration.java b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Configuration.java index 68672ba..8aac4d9 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Configuration.java +++ b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Configuration.java @@ -17,7 +17,6 @@ package org.apache.camel.component.olingo4; import java.util.Map; - import org.apache.camel.component.olingo4.internal.Olingo4ApiName; import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriParam; @@ -62,6 +61,8 @@ public class Olingo4Configuration { private HttpAsyncClientBuilder httpAsyncClientBuilder; @UriParam private HttpClientBuilder httpClientBuilder; + @UriParam + private boolean filterAlreadySeen; public Olingo4ApiName getApiName() { return apiName; @@ -186,6 +187,21 @@ public class Olingo4Configuration { this.httpClientBuilder = httpClientBuilder; } + /** + * Filter flag for filtering out already seen results + */ + public boolean getFilterAlreadySeen() { + return filterAlreadySeen; + } + + /** + * Set this to true to filter out results that have already been communicated by this component. + * @param filterAlreadySeen + */ + public void setFilterAlreadySeen(boolean filterAlreadySeen) { + this.filterAlreadySeen = filterAlreadySeen; + } + @Override public int hashCode() { return new HashCodeBuilder() diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Consumer.java b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Consumer.java index 16256bc..eedc0ed 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Consumer.java +++ b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Consumer.java @@ -19,7 +19,7 @@ package org.apache.camel.component.olingo4; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; - +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.olingo4.api.Olingo4ResponseHandler; @@ -34,6 +34,8 @@ import org.apache.olingo.client.api.domain.ClientEntitySet; */ public class Olingo4Consumer extends AbstractApiConsumer<Olingo4ApiName, Olingo4Configuration> { + private Olingo4Index resultIndex; + public Olingo4Consumer(Olingo4Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -57,6 +59,10 @@ public class Olingo4Consumer extends AbstractApiConsumer<Olingo4ApiName, Olingo4 args.put(Olingo4Endpoint.RESPONSE_HANDLER_PROPERTY, new Olingo4ResponseHandler<Object>() { @Override public void onResponse(Object response, Map<String, String> responseHeaders) { + if (resultIndex != null) { + response = resultIndex.filterResponse(response); + } + result[0] = response; latch.countDown(); } @@ -98,4 +104,31 @@ public class Olingo4Consumer extends AbstractApiConsumer<Olingo4ApiName, Olingo4 throw ObjectHelper.wrapRuntimeCamelException(t); } } + + @Override + public void interceptProperties(Map<String, Object> properties) { + // + // If we have a filterAlreadySeen property then initialise the filter index + // + Object value = properties.get(Olingo4Endpoint.FILTER_ALREADY_SEEN); + if (value == null) { + return; + } + + // + // Initialise the index if not already and if filterAlreadySeen has been set + // + if (Boolean.parseBoolean(value.toString()) && resultIndex == null) { + resultIndex = new Olingo4Index(); + } + } + + @Override + public void interceptResult(Object result, Exchange resultExchange) { + if (resultIndex == null) { + return; + } + + resultIndex.index(result); + } } diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java index 62e94a3..b211a0a 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java +++ b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Endpoint.java @@ -44,6 +44,7 @@ public class Olingo4Endpoint extends AbstractApiEndpoint<Olingo4ApiName, Olingo4 protected static final String RESOURCE_PATH_PROPERTY = "resourcePath"; protected static final String RESPONSE_HANDLER_PROPERTY = "responseHandler"; protected static final String SERVICE_URI_PROPERTY = "serviceUri"; + protected static final String FILTER_ALREADY_SEEN = "filterAlreadySeen"; private static final String KEY_PREDICATE_PROPERTY = "keyPredicate"; private static final String QUERY_PARAMS_PROPERTY = "queryParams"; @@ -75,6 +76,7 @@ public class Olingo4Endpoint extends AbstractApiEndpoint<Olingo4ApiName, Olingo4 endpointPropertyNames.add(EDM_PROPERTY); endpointPropertyNames.add(ENDPOINT_HTTP_HEADERS_PROPERTY); endpointPropertyNames.add(SERVICE_URI_PROPERTY); + endpointPropertyNames.add(FILTER_ALREADY_SEEN); } public Producer createProducer() throws Exception { @@ -165,6 +167,9 @@ public class Olingo4Endpoint extends AbstractApiEndpoint<Olingo4ApiName, Olingo4 // read Edm if not set yet properties.put(EDM_PROPERTY, apiProxy.getEdm()); + // handle filterAlreadySeen property + properties.put(FILTER_ALREADY_SEEN, configuration.getFilterAlreadySeen()); + // handle keyPredicate final String keyPredicate = (String)properties.get(KEY_PREDICATE_PROPERTY); if (keyPredicate != null) { diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Index.java b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Index.java new file mode 100644 index 0000000..a5f7694 --- /dev/null +++ b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Index.java @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2016 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.olingo4; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.olingo.client.api.domain.ClientEntity; +import org.apache.olingo.client.api.domain.ClientEntitySet; + +public class Olingo4Index { + + private Set<Integer> resultIndex = new HashSet<>(); + + private Object filter(Object o) { + if (resultIndex.contains(o.hashCode())) { + return null; + } + return o; + } + + private void indexDefault(Object o) { + resultIndex.add(o.hashCode()); + } + + private Iterable<?> filter(Iterable<?> iterable) { + List<Object> filtered = new ArrayList<>(); + for (Object o : iterable) { + if (resultIndex.contains(o.hashCode())) { + continue; + } + filtered.add(o); + } + + return filtered; + } + + private void index(Iterable<?> iterable) { + for (Object o : iterable) { + resultIndex.add(o.hashCode()); + } + } + + private ClientEntitySet filter(ClientEntitySet entitySet) { + List<ClientEntity> entities = entitySet.getEntities(); + + if (entities.isEmpty()) { + return entitySet; + } + + List<ClientEntity> copyEntities = new ArrayList<>(); + copyEntities.addAll(entities); + + for (ClientEntity entity : copyEntities) { + if (resultIndex.contains(entity.hashCode())) { + entities.remove(entity); + } + } + + return entitySet; + } + + private void index(ClientEntitySet entitySet) { + for (ClientEntity entity : entitySet.getEntities()) { + resultIndex.add(entity.hashCode()); + } + } + + /** + * Index the results + */ + public void index(Object result) { + if (result instanceof ClientEntitySet) { + index((ClientEntitySet) result); + } else if (result instanceof Iterable) { + index((Iterable<?>) result); + } else { + indexDefault(result); + } + } + + @SuppressWarnings( "unchecked" ) + public Object filterResponse(Object response) { + if (response instanceof ClientEntitySet) { + response = filter((ClientEntitySet) response); + } else if (response instanceof Iterable) { + response = filter((Iterable<Object>) response); + } else if (response.getClass().isArray()) { + List<Object> result = new ArrayList<>(); + final int size = Array.getLength(response); + for (int i = 0; i < size; i++) { + result.add(Array.get(response, i)); + } + response = filter(result); + } else { + response = filter(response); + } + + return response; + } +} diff --git a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Producer.java b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Producer.java index 5baa1e9..639881d 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Producer.java +++ b/components/camel-olingo4/camel-olingo4-component/src/main/java/org/apache/camel/component/olingo4/Olingo4Producer.java @@ -18,7 +18,6 @@ package org.apache.camel.component.olingo4; import java.util.HashMap; import java.util.Map; - import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; @@ -39,6 +38,8 @@ public class Olingo4Producer extends AbstractApiProducer<Olingo4ApiName, Olingo4 private static final Logger LOG = LoggerFactory.getLogger(Olingo4Producer.class); + private Olingo4Index resultIndex; + public Olingo4Producer(Olingo4Endpoint endpoint) { super(endpoint, Olingo4PropertiesHelper.getHelper()); } @@ -58,6 +59,10 @@ public class Olingo4Producer extends AbstractApiProducer<Olingo4ApiName, Olingo4 properties.put(Olingo4Endpoint.RESPONSE_HANDLER_PROPERTY, new Olingo4ResponseHandler<Object>() { @Override public void onResponse(Object response, Map<String, String> responseHeaders) { + if (resultIndex != null) { + response = resultIndex.filterResponse(response); + } + // producer returns a single response, even for methods with // List return types exchange.getOut().setBody(response); @@ -107,4 +112,31 @@ public class Olingo4Producer extends AbstractApiProducer<Olingo4ApiName, Olingo4 return false; } + + @Override + public void interceptProperties(Map<String, Object> properties) { + // + // If we have a filterAlreadySeen property then initialise the filter index + // + Object value = properties.get(Olingo4Endpoint.FILTER_ALREADY_SEEN); + if (value == null) { + return; + } + + // + // Initialise the index if not already and if filterAlreadySeen has been set + // + if (Boolean.parseBoolean(value.toString()) && resultIndex == null) { + resultIndex = new Olingo4Index(); + } + } + + @Override + public void interceptResult(Object result, Exchange resultExchange) { + if (resultIndex == null) { + return; + } + + resultIndex.index(result); + } } diff --git a/components/camel-olingo4/camel-olingo4-component/src/test/java/org/apache/camel/component/olingo4/Olingo4ComponentTest.java b/components/camel-olingo4/camel-olingo4-component/src/test/java/org/apache/camel/component/olingo4/Olingo4ComponentTest.java index 4300553..fd34bdb 100644 --- a/components/camel-olingo4/camel-olingo4-component/src/test/java/org/apache/camel/component/olingo4/Olingo4ComponentTest.java +++ b/components/camel-olingo4/camel-olingo4-component/src/test/java/org/apache/camel/component/olingo4/Olingo4ComponentTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - import org.apache.camel.CamelExecutionException; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -297,6 +296,124 @@ public class Olingo4ComponentTest extends AbstractOlingo4TestSupport { } } + /** + * Read entity set of the People object + * and filter already seen items on subsequent exchanges + * Use a delay since the mock endpoint does not always get + * the correct number of exchanges before being satisfied. + */ + @Test + public void testConsumerReadFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "olingo4://read/People?filterAlreadySeen=true&consumer.delay=2&consumer.sendEmptyMessageWhenIdle=true"; + int expectedEntities = 20; + final ClientEntitySet entities = (ClientEntitySet)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(entities); + assertEquals(expectedEntities, entities.getEntities().size()); + + int expectedMsgCount = 3; + MockEndpoint mockEndpoint = getMockEndpoint("mock:consumer-alreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + + if (i == 0) { + // + // First polled messages contained all the entities + // + assertTrue(body instanceof ClientEntitySet); + ClientEntitySet set = (ClientEntitySet) body; + assertEquals(expectedEntities, set.getEntities().size()); + } + else { + // + // Subsequent polling messages should be empty + // since the filterAlreadySeen property is true + // + assertNull(body); + } + } + } + + /** + * + * Read entity set of the People object + * and with no filter already seen, all items + * should be present in each message + * + * @throws Exception + */ + @Test + public void testProducerReadNoFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "direct:read-people-nofilterseen"; + int expectedEntities = 20; + int expectedMsgCount = 3; + + for (int i = 0; i < expectedMsgCount; ++i) { + final ClientEntitySet entities = (ClientEntitySet)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(entities); + } + + MockEndpoint mockEndpoint = getMockEndpoint("mock:producer-noalreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + assertTrue(body instanceof ClientEntitySet); + ClientEntitySet set = (ClientEntitySet) body; + + // + // All messages contained all the entities + // + assertEquals(expectedEntities, set.getEntities().size()); + } + } + + /** + * Read entity set of the People object + * and filter already seen items on subsequent exchanges + */ + @Test + public void testProducerReadFilterAlreadySeen() throws Exception { + final Map<String, Object> headers = new HashMap<>(); + String endpoint = "direct:read-people-filterseen"; + int expectedEntities = 20; + int expectedMsgCount = 3; + + for (int i = 0; i < expectedMsgCount; ++i) { + final ClientEntitySet entities = (ClientEntitySet)requestBodyAndHeaders(endpoint, null, headers); + assertNotNull(entities); + } + + MockEndpoint mockEndpoint = getMockEndpoint("mock:producer-alreadyseen"); + mockEndpoint.expectedMessageCount(expectedMsgCount); + mockEndpoint.assertIsSatisfied(); + + for (int i = 0; i < expectedMsgCount; ++i) { + Object body = mockEndpoint.getExchanges().get(i).getIn().getBody(); + assertTrue(body instanceof ClientEntitySet); + ClientEntitySet set = (ClientEntitySet) body; + + if (i == 0) { + // + // First polled messages contained all the entities + // + assertEquals(expectedEntities, set.getEntities().size()); + } + else { + // + // Subsequent messages should be empty + // since the filterAlreadySeen property is true + // + assertEquals(0, set.getEntities().size()); + } + } + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -340,6 +457,20 @@ public class Olingo4ComponentTest extends AbstractOlingo4TestSupport { from("direct:read-etag").to("olingo4://read/Airlines('AA')").to("mock:check-etag-header"); from("direct:delete-with-etag").to("olingo4://delete/Airlines('AA')"); + + from("direct:read-people-nofilterseen") + .to("olingo4://read/People") + .to("mock:producer-noalreadyseen"); + + from("direct:read-people-filterseen") + .to("olingo4://read/People?filterAlreadySeen=true") + .to("mock:producer-alreadyseen"); + + // + // Consumer endpoint + // + from("olingo4://read/People?filterAlreadySeen=true&consumer.delay=2&consumer.sendEmptyMessageWhenIdle=true") + .to("mock:consumer-alreadyseen"); } }; } diff --git a/platforms/spring-boot/components-starter/camel-olingo2-starter/src/main/java/org/apache/camel/component/olingo2/springboot/Olingo2ComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-olingo2-starter/src/main/java/org/apache/camel/component/olingo2/springboot/Olingo2ComponentConfiguration.java index 39063e3..cf25303 100644 --- a/platforms/spring-boot/components-starter/camel-olingo2-starter/src/main/java/org/apache/camel/component/olingo2/springboot/Olingo2ComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-olingo2-starter/src/main/java/org/apache/camel/component/olingo2/springboot/Olingo2ComponentConfiguration.java @@ -140,6 +140,11 @@ public class Olingo2ComponentConfiguration * builder, otherwise OData requests could block indefinitely */ private HttpClientBuilder httpClientBuilder; + /** + * Set this to true to filter out results that have already been + * communicated by this component. + */ + private Boolean filterAlreadySeen = false; public Olingo2ApiName getApiName() { return apiName; @@ -230,5 +235,13 @@ public class Olingo2ComponentConfiguration public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) { this.httpClientBuilder = httpClientBuilder; } + + public Boolean getFilterAlreadySeen() { + return filterAlreadySeen; + } + + public void setFilterAlreadySeen(Boolean filterAlreadySeen) { + this.filterAlreadySeen = filterAlreadySeen; + } } } \ No newline at end of file diff --git a/platforms/spring-boot/components-starter/camel-olingo4-starter/src/main/java/org/apache/camel/component/olingo4/springboot/Olingo4ComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-olingo4-starter/src/main/java/org/apache/camel/component/olingo4/springboot/Olingo4ComponentConfiguration.java index 61356c5..8584f3b 100644 --- a/platforms/spring-boot/components-starter/camel-olingo4-starter/src/main/java/org/apache/camel/component/olingo4/springboot/Olingo4ComponentConfiguration.java +++ b/platforms/spring-boot/components-starter/camel-olingo4-starter/src/main/java/org/apache/camel/component/olingo4/springboot/Olingo4ComponentConfiguration.java @@ -140,6 +140,11 @@ public class Olingo4ComponentConfiguration * builder, otherwise OData requests could block indefinitely */ private HttpClientBuilder httpClientBuilder; + /** + * Set this to true to filter out results that have already been + * communicated by this component. + */ + private Boolean filterAlreadySeen = false; public Olingo4ApiName getApiName() { return apiName; @@ -230,5 +235,13 @@ public class Olingo4ComponentConfiguration public void setHttpClientBuilder(HttpClientBuilder httpClientBuilder) { this.httpClientBuilder = httpClientBuilder; } + + public Boolean getFilterAlreadySeen() { + return filterAlreadySeen; + } + + public void setFilterAlreadySeen(Boolean filterAlreadySeen) { + this.filterAlreadySeen = filterAlreadySeen; + } } } \ No newline at end of file