This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit 4413f2a53613e8c6679de9c4828b83a813ddbc70 Author: jmerljak <jmerl...@users.noreply.github.com> AuthorDate: Wed Jan 13 10:32:13 2021 +0100 CAMEL-15961: Implement basic Google Calendar synchronization flow --- .../stream/GoogleCalendarStreamConsumer.java | 61 ++++++++++++++++++++-- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java index b9bce9ef..8111c9f 100644 --- a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java +++ b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java @@ -22,7 +22,9 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.util.DateTime; +import com.google.api.client.util.Preconditions; import com.google.api.services.calendar.Calendar; import com.google.api.services.calendar.model.Event; import com.google.api.services.calendar.model.Events; @@ -32,14 +34,23 @@ import org.apache.camel.Processor; import org.apache.camel.support.ScheduledBatchPollingConsumer; import org.apache.camel.util.CastUtils; import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The GoogleCalendar consumer. */ public class GoogleCalendarStreamConsumer extends ScheduledBatchPollingConsumer { + private static final Logger LOG = LoggerFactory.getLogger(GoogleCalendarStreamConsumer.class); + private DateTime lastUpdate; + // sync and page tokens for synchronization flow + // see https://developers.google.com/calendar/v3/sync + private String syncToken; + private String pageToken; + public GoogleCalendarStreamConsumer(Endpoint endpoint, Processor processor) { super(endpoint, processor); } @@ -59,19 +70,21 @@ public class GoogleCalendarStreamConsumer extends ScheduledBatchPollingConsumer @Override protected int poll() throws Exception { - com.google.api.services.calendar.Calendar.Events.List request - = getClient().events().list(getConfiguration().getCalendarId()).setOrderBy("updated"); + Calendar.Events.List request = getClient().events().list(getConfiguration().getCalendarId()); if (ObjectHelper.isNotEmpty(getConfiguration().getQuery())) { + Preconditions.checkArgument(!getConfiguration().isSyncFlow(), "query is incompatible with sync flow."); request.setQ(getConfiguration().getQuery()); } if (ObjectHelper.isNotEmpty(getConfiguration().getMaxResults())) { request.setMaxResults(getConfiguration().getMaxResults()); } - if (getConfiguration().isConsumeFromNow()) { + // in synchronization flow only set timeMin on first request + if (getConfiguration().isConsumeFromNow() && syncToken == null) { Date date = new Date(); request.setTimeMin(new DateTime(date)); } if (getConfiguration().isConsiderLastUpdate()) { + Preconditions.checkArgument(!getConfiguration().isSyncFlow(), "considerLastUpdate is incompatible with sync flow."); if (ObjectHelper.isNotEmpty(lastUpdate)) { request.setUpdatedMin(lastUpdate); } @@ -80,7 +93,47 @@ public class GoogleCalendarStreamConsumer extends ScheduledBatchPollingConsumer Queue<Exchange> answer = new LinkedList<>(); List<Date> dateList = new ArrayList<>(); - Events c = request.execute(); + Events c; + + if (getConfiguration().isSyncFlow()) { + if (syncToken == null && pageToken == null) { + LOG.info("Performing full sync."); + } else if (pageToken != null) { + LOG.info("Requesting next page."); + } else { + LOG.info("Performing incremental sync."); + } + + request.setSyncToken(syncToken); + request.setPageToken(pageToken); + + try { + c = request.execute(); + } catch (GoogleJsonResponseException e) { + if (e.getStatusCode() == 410) { + // A 410 status code, "Gone", indicates that the sync token is invalid. + LOG.info("Invalid sync token, clearing sync and page tokens and re-syncing."); + syncToken = null; + pageToken = null; + return poll(); + } else { + throw e; + } + } + + if (c.getItems().isEmpty()) { + LOG.info("No new events to sync."); + } + + pageToken = c.getNextPageToken(); + if (c.getNextSyncToken() != null) { + // Store the sync token from the last request to be used during the next execution. + syncToken = c.getNextSyncToken(); + LOG.info("Sync complete."); + } + } else { + c = request.setOrderBy("updated").execute(); + } if (c != null) { List<Event> list = c.getItems();