This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4413f2a53613e8c6679de9c4828b83a813ddbc70
Author: jmerljak <jmerl...@users.noreply.github.com>
AuthorDate: Wed Jan 13 10:32:13 2021 +0100

    CAMEL-15961: Implement basic Google Calendar synchronization flow
---
 .../stream/GoogleCalendarStreamConsumer.java       | 61 ++++++++++++++++++++--
 1 file changed, 57 insertions(+), 4 deletions(-)

diff --git 
a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
 
b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
index b9bce9ef..8111c9f 100644
--- 
a/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
+++ 
b/components/camel-google-calendar/src/main/java/org/apache/camel/component/google/calendar/stream/GoogleCalendarStreamConsumer.java
@@ -22,7 +22,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import com.google.api.client.googleapis.json.GoogleJsonResponseException;
 import com.google.api.client.util.DateTime;
+import com.google.api.client.util.Preconditions;
 import com.google.api.services.calendar.Calendar;
 import com.google.api.services.calendar.model.Event;
 import com.google.api.services.calendar.model.Events;
@@ -32,14 +34,23 @@ import org.apache.camel.Processor;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The GoogleCalendar consumer.
  */
 public class GoogleCalendarStreamConsumer extends 
ScheduledBatchPollingConsumer {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(GoogleCalendarStreamConsumer.class);
+
     private DateTime lastUpdate;
 
+    // sync and page tokens for synchronization flow
+    // see https://developers.google.com/calendar/v3/sync
+    private String syncToken;
+    private String pageToken;
+
     public GoogleCalendarStreamConsumer(Endpoint endpoint, Processor 
processor) {
         super(endpoint, processor);
     }
@@ -59,19 +70,21 @@ public class GoogleCalendarStreamConsumer extends 
ScheduledBatchPollingConsumer
 
     @Override
     protected int poll() throws Exception {
-        com.google.api.services.calendar.Calendar.Events.List request
-                = 
getClient().events().list(getConfiguration().getCalendarId()).setOrderBy("updated");
+        Calendar.Events.List request = 
getClient().events().list(getConfiguration().getCalendarId());
         if (ObjectHelper.isNotEmpty(getConfiguration().getQuery())) {
+            Preconditions.checkArgument(!getConfiguration().isSyncFlow(), 
"query is incompatible with sync flow.");
             request.setQ(getConfiguration().getQuery());
         }
         if (ObjectHelper.isNotEmpty(getConfiguration().getMaxResults())) {
             request.setMaxResults(getConfiguration().getMaxResults());
         }
-        if (getConfiguration().isConsumeFromNow()) {
+        // in synchronization flow only set timeMin on first request
+        if (getConfiguration().isConsumeFromNow() && syncToken == null) {
             Date date = new Date();
             request.setTimeMin(new DateTime(date));
         }
         if (getConfiguration().isConsiderLastUpdate()) {
+            Preconditions.checkArgument(!getConfiguration().isSyncFlow(), 
"considerLastUpdate is incompatible with sync flow.");
             if (ObjectHelper.isNotEmpty(lastUpdate)) {
                 request.setUpdatedMin(lastUpdate);
             }
@@ -80,7 +93,47 @@ public class GoogleCalendarStreamConsumer extends 
ScheduledBatchPollingConsumer
         Queue<Exchange> answer = new LinkedList<>();
         List<Date> dateList = new ArrayList<>();
 
-        Events c = request.execute();
+        Events c;
+
+        if (getConfiguration().isSyncFlow()) {
+            if (syncToken == null && pageToken == null) {
+                LOG.info("Performing full sync.");
+            } else if (pageToken != null) {
+                LOG.info("Requesting next page.");
+            } else {
+                LOG.info("Performing incremental sync.");
+            }
+
+            request.setSyncToken(syncToken);
+            request.setPageToken(pageToken);
+
+            try {
+                c = request.execute();
+            } catch (GoogleJsonResponseException e) {
+                if (e.getStatusCode() == 410) {
+                    // A 410 status code, "Gone", indicates that the sync 
token is invalid.
+                    LOG.info("Invalid sync token, clearing sync and page 
tokens and re-syncing.");
+                    syncToken = null;
+                    pageToken = null;
+                    return poll();
+                } else {
+                    throw e;
+                }
+            }
+
+            if (c.getItems().isEmpty()) {
+                LOG.info("No new events to sync.");
+            }
+
+            pageToken = c.getNextPageToken();
+            if (c.getNextSyncToken() != null) {
+                // Store the sync token from the last request to be used 
during the next execution.
+                syncToken = c.getNextSyncToken();
+                LOG.info("Sync complete.");
+            }
+        } else {
+            c = request.setOrderBy("updated").execute();
+        }
 
         if (c != null) {
             List<Event> list = c.getItems();

Reply via email to