ignite-734: implemented ip finder

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/546ec8e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/546ec8e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/546ec8e5

Branch: refs/heads/ignite-757
Commit: 546ec8e53017db73dbcbb66d7b4f7807ae1f69b1
Parents: 4268e47
Author: Denis Magda <dma...@gridgain.com>
Authored: Wed Apr 22 15:23:00 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Wed Apr 22 15:23:00 2015 +0300

----------------------------------------------------------------------
 modules/gce/pom.xml                             |   7 +
 .../google/TcpDiscoveryGoogleCloudIpFinder.java | 256 ++++++++++++++-----
 2 files changed, 206 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546ec8e5/modules/gce/pom.xml
----------------------------------------------------------------------
diff --git a/modules/gce/pom.xml b/modules/gce/pom.xml
index 1a11614..d4452fc 100644
--- a/modules/gce/pom.xml
+++ b/modules/gce/pom.xml
@@ -47,6 +47,13 @@
             <artifactId>google-api-client</artifactId>
             <version>1.19.1</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.google.apis</groupId>
+            <artifactId>google-api-services-storage</artifactId>
+            <version>v1-rev32-1.20.0</version>
+        </dependency>
+
     </dependencies>
 
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546ec8e5/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java
----------------------------------------------------------------------
diff --git 
a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java
 
b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java
index 2d6f9d7..3bfd8f3 100644
--- 
a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java
+++ 
b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java
@@ -18,13 +18,18 @@
 package org.apache.ignite.spi.discovery.tcp.ipfinder.google;
 
 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.InputStreamContent;
 import com.google.api.client.http.javanet.NetHttpTransport;
-import org.apache.ignite.*;
-import org.apache.ignite.resources.*;
+import com.google.api.services.storage.Storage;
+import com.google.api.services.storage.StorageScopes;
+import com.google.api.services.storage.model.*;
+import com.google.common.collect.ImmutableMap;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.*;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
 
-import com.google.api.client.auth.oauth2.*;
 import com.google.api.client.googleapis.auth.oauth2.*;
 import com.google.api.client.json.jackson2.*;
 
@@ -32,6 +37,8 @@ import java.io.*;
 import java.net.*;
 import java.security.GeneralSecurityException;
 import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Google Cloud Storage based IP finder.
@@ -39,9 +46,8 @@ import java.util.*;
  * TODO: complete
  */
 public class TcpDiscoveryGoogleCloudIpFinder extends 
TcpDiscoveryIpFinderAdapter {
-    /** Grid logger. */
-    @LoggerResource
-    private IgniteLogger log;
+    /* Default object's content. */
+    private final static ByteArrayInputStream OBJECT_CONTENT =  new 
ByteArrayInputStream(new byte[1]);
 
     /* Google Cloud Platform's project name.*/
     private String projectName;
@@ -49,11 +55,20 @@ public class TcpDiscoveryGoogleCloudIpFinder extends 
TcpDiscoveryIpFinderAdapter
     /* Google Cloud Platform's bucket name. */
     private String bucketName;
 
-    /* Google Cloud Platform's secret file name. */
-    private String secretsFileName;
+    /* Service account p12 private key file name. */
+    private String serviceAccountP12FilePath;
 
-    /* Google HTTP transport. */
-    private NetHttpTransport httpTransport;
+    /* Service account id. */
+    private String serviceAccountId;
+
+    /* Google storage. */
+    private Storage storage;
+
+    /* Init routine guard. */
+    private final AtomicBoolean initGuard = new AtomicBoolean();
+
+    /* Init routine latch. */
+    private final CountDownLatch initLatch = new CountDownLatch(1);
 
     public TcpDiscoveryGoogleCloudIpFinder() {
         setShared(true);
@@ -61,17 +76,81 @@ public class TcpDiscoveryGoogleCloudIpFinder extends 
TcpDiscoveryIpFinderAdapter
 
     /** {@inheritDoc} */
     @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException {
-        return null;
+        init();
+
+        Collection<InetSocketAddress> found = new LinkedList<>();
+
+        try {
+            Storage.Objects.List listReq = storage.objects().list(bucketName);
+
+            // Every object name in the bucket encodes one address. Keep requesting pages until the
+            // service stops returning a next-page token.
+            for (;;) {
+                com.google.api.services.storage.model.Objects page = listReq.execute();
+
+                if (page == null || page.getItems() == null)
+                    break;
+
+                for (StorageObject item : page.getItems())
+                    found.add(addrFromString(item.getName()));
+
+                String nextToken = page.getNextPageToken();
+
+                listReq.setPageToken(nextToken);
+
+                if (nextToken == null)
+                    break;
+            }
+        }
+        catch (Exception e) {
+            throw new IgniteSpiException("Failed to get content from the bucket: " + bucketName, e);
+        }
+
+        return found;
     }
 
     /** {@inheritDoc} */
     @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+        assert !F.isEmpty(addrs);
+
+        init();
+
+        for (InetSocketAddress addr : addrs) {
+            String key = keyFromAddr(addr);
 
+            StorageObject object = new StorageObject();
+
+            object.setBucket(bucketName);
+            object.setName(key);
+
+            // The object's name carries the address; its body is only a one-byte placeholder.
+            // A fresh stream is created for every upload: a single shared stream is drained by the
+            // first insert, which would leave all later objects with an exhausted, zero-length body
+            // and would make concurrent registrations race on the same stream position.
+            byte[] placeholder = new byte[1];
+
+            InputStreamContent content = new InputStreamContent("application/octet-stream",
+                new ByteArrayInputStream(placeholder));
+
+            content.setLength(placeholder.length);
+
+            try {
+                storage.objects().insert(bucketName, object, content).execute();
+            }
+            catch (Exception e) {
+                throw new IgniteSpiException("Failed to put entry [bucketName=" + bucketName +
+                    ", entry=" + key + ']', e);
+            }
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException {
+        assert !F.isEmpty(addrs);
+
+        init();
+
+        for (InetSocketAddress sockAddr : addrs) {
+            String objName = keyFromAddr(sockAddr);
+
+            try {
+                // Delete the bucket object whose name encodes this address.
+                Storage.Objects.Delete delReq = storage.objects().delete(bucketName, objName);
 
+                delReq.execute();
+            }
+            catch (Exception e) {
+                String msg = "Failed to delete entry [bucketName=" + bucketName + ", entry=" + objName + ']';
+
+                throw new IgniteSpiException(msg, e);
+            }
+        }
     }
 
     /**
@@ -97,65 +176,128 @@ public class TcpDiscoveryGoogleCloudIpFinder extends 
TcpDiscoveryIpFinderAdapter
         this.bucketName = bucketName;
     }
 
-    /**
-     * File name of a client secrets file that is used to authenticate with 
Google Cloud Platform.
-     * Client secrets (JSON) file can be created and downloaded from the 
Google Developers Console.
-     * Note that the file must correspond to a Service Account.
-     * <p>
-     * For details refer to Google Cloud Platform Documentation.
-     *
-     * @param secretsFileName Client secrets file name.
-     */
+    /**
+     * Sets the path to the service account's P12 private key file. The file at this path is
+     * handed to the Google credential builder when the IP finder initializes itself.
+     *
+     * @param p12FileName Path to the service account's P12 private key file.
+     */
     @IgniteSpiConfiguration(optional = false)
-    public void setSecretsFileName(String secretsFileName) {
-        this.secretsFileName = secretsFileName;
+    public void setServiceAccountP12FilePath(String p12FileName) {
+        this.serviceAccountP12FilePath = p12FileName;
+    }
+
+    /**
+     * Sets the service account id that is passed to the Google credential builder when the
+     * IP finder initializes itself.
+     *
+     * @param id Service account id.
+     */
+    @IgniteSpiConfiguration(optional = false)
+    public void setServiceAccountId(String id) {
+        this.serviceAccountId = id;
     }
 
     /**
-     *
+     * Builds the Google Cloud Storage client on first use.
+     * <p>
+     * Only the caller that wins the {@code initGuard} compare-and-set performs the initialization;
+     * every other caller waits on {@code initLatch} and then fails if {@code storage} was never assigned.
+     *
+     * @throws IgniteSpiException If the transport, credential or storage client cannot be created, if the
+     *      waiting thread is interrupted, or if the initializing caller did not produce a client.
      */
-    private AuthorizationCodeInstalledApp Credential authorize() throws IgniteSpiException {
-        GoogleClientSecrets clientSecrets = null;
+    private void init() throws IgniteSpiException {
+        if (initGuard.compareAndSet(false, true)) {
+            try {
+                NetHttpTransport httpTransport;
 
-        try {
-            clientSecrets = GoogleClientSecrets.load(JacksonFactory.getDefaultInstance(),
-                new InputStreamReader(new FileInputStream(new File(secretsFileName))));
+                try {
+                    httpTransport = GoogleNetHttpTransport.newTrustedTransport();
+                } catch (GeneralSecurityException | IOException e) {
+                    throw new IgniteSpiException(e);
+                }
+
+                GoogleCredential credential;
+
+                try {
+                    credential = new GoogleCredential.Builder().setTransport(httpTransport)
+                        .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId)
+                        .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath))
+                        .setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_READ_WRITE)).build();
+                } catch (Exception e) {
+                    throw new IgniteSpiException("Failed to authenticate on Google Cloud Platform", e);
+                }
 
-            if (clientSecrets.getDetails().getClientId() == null ||
-                    clientSecrets.getDetails().getClientSecret() == null)
-                throw new IgniteSpiException("Client secrets file is not well formed.");
+                try {
+                    storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential)
+                                      .setApplicationName(projectName).build();
+                } catch (Exception e) {
+                    // Chain the original failure so the root cause is not lost.
+                    throw new IgniteSpiException("Failed to open a storage for given project name: " + projectName, e);
+                }
+            }
+            finally {
+                // Release waiters even when initialization failed; they detect the failure via a null storage.
+                initLatch.countDown();
+            }
+        }
+        else {
+            try {
+                U.await(initLatch);
+            } catch (IgniteInterruptedCheckedException e) {
+                throw new IgniteSpiException("Thread has been interrupted.", e);
+            }
 
-        } catch (Exception e) {
-            throw new IgniteSpiException("Failed to load client secrets JSON file.", e);
+            if (storage == null)
+                throw new IgniteSpiException("IpFinder has not been initialized properly");
         }
+    }
+
+    /**
+     * Builds the bucket object name that represents an address.
+     *
+     * @param addr Socket address.
+     * @return Host address and port joined by a colon.
+     */
+    private String keyFromAddr(InetSocketAddress addr) {
+        String host = addr.getAddress().getHostAddress();
+        int port = addr.getPort();
+
+        return host + ':' + port;
+    }
 
+    /**
+     * Parses a bucket object name of the form {@code host:port} back into a socket address.
+     * <p>
+     * The port is taken after the last colon, so a host part that itself contains colons
+     * (an IPv6 literal as returned by {@code getHostAddress()}) is still parsed.
+     *
+     * @param address Address string.
+     * @return Socket address.
+     * @throws IgniteSpiException If the host or port part is missing, or the port is not a number.
+     */
+    private InetSocketAddress addrFromString(String address) throws IgniteSpiException {
+        int sep = address.lastIndexOf(':');
+
+        if (sep <= 0 || sep == address.length() - 1)
+            throw new IgniteSpiException("Invalid address string: " + address);
+
+        String host = address.substring(0, sep);
+        String portStr = address.substring(sep + 1);
+
+        int port;
         try {
-            httpTransport = GoogleNetHttpTransport.newTrustedTransport();
-        } catch (GeneralSecurityException | IOException e ) {
-            throw new IgniteSpiException(e);
+            port = Integer.parseInt(portStr);
+        }
+        catch (NumberFormatException e) {
+            // Chain the parse failure so the cause is visible to the caller.
+            throw new IgniteSpiException("Invalid port number: " + portStr, e);
         }
 
-        // Set up authorization code flow.
-        // Ask for only the permissions you need. Asking for more permissions will
-        // reduce the number of users who finish the process for giving you access
-        // to their accounts. It will also increase the amount of effort you will
-        // have to spend explaining to users what you are doing with their data.
-        // Here we are listing all of the available scopes. You should remove scopes
-        // that you are not actually using.
-        Set<String> scopes = new HashSet<String>();
-        scopes.add(StorageScopes.DEVSTORAGE_FULL_CONTROL);
-        scopes.add(StorageScopes.DEVSTORAGE_READ_ONLY);
-        scopes.add(StorageScopes.DEVSTORAGE_READ_WRITE);
-
-        GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder(
-            httpTransport, JSON_FACTORY, clientSecrets, scopes)
-                                                   .setDataStoreFactory(dataStoreFactory)
-                                                   .build();
-
-        GooglePromtReceiver
-        // Authorize.
-        VerificationCodeReceiver receiver =
-                AUTH_LOCAL_WEBSERVER ? new LocalServerReceiver() : new GooglePromptReceiver();
-        return new AuthorizationCodeInstalledApp(flow, receiver).authorize("user");
+        return new InetSocketAddress(host, port);
     }
+
+
+
+    /**
+     * Manual smoke test: registers three addresses, lists the bucket, unregisters them and lists again,
+     * printing the results to standard output.
+     * <p>
+     * NOTE(review): the bucket name, project name, service account id and local P12 key path are
+     * hard-coded below, so this only runs against that specific account and machine layout.
+     *
+     * @param args Command line arguments (not used).
+     */
+    public static void main(String args[]) {
+        TcpDiscoveryGoogleCloudIpFinder ipFinder = new 
TcpDiscoveryGoogleCloudIpFinder();
+
+        String bucketName = "grid-gain-test-bucket1";
+
+        ipFinder.setBucketName(bucketName);
+        ipFinder.setProjectName("gridgain");
+        
ipFinder.setServiceAccountId("208709979073-v0mn6ttpd3mqu2b5lbhh1mvdet7os...@developer.gserviceaccount.com");
+        
ipFinder.setServiceAccountP12FilePath("C:\\ignite\\GCE\\gridgain-0889e44b58b7.p12");
+
+        List<InetSocketAddress> addresses = new LinkedList<>();
+        addresses.add(new InetSocketAddress("192.168.0.1", 23));
+        addresses.add(new InetSocketAddress("192.168.0.1", 89));
+        addresses.add(new InetSocketAddress("92.68.0.1", 1223));
+
+        System.out.println("PUT ADDR");
+        ipFinder.registerAddresses(addresses);
+
+        Collection<InetSocketAddress> result = 
ipFinder.getRegisteredAddresses();
+        System.out.println("GET ADDR");
+
+        for (InetSocketAddress add: result) {
+            System.out.println(add.getAddress().getHostAddress() + ":" + 
add.getPort());
+        }
+
+        System.out.println("REMOVE");
+        ipFinder.unregisterAddresses(addresses);
+
+
+        result = ipFinder.getRegisteredAddresses();
+        System.out.println("GET ADDR 2");
+
+        for (InetSocketAddress add: result) {
+            System.out.println(add.getAddress().getHostAddress() + ":" + 
add.getPort());
+        }
+
+        // NOTE(review): the next line adds to 'addresses', not 'addresses2', so the list passed to
+        // unregisterAddresses() below is empty and trips its non-empty assertion when assertions are
+        // enabled. The host string is also not a valid IPv4 literal. Confirm what this step is
+        // meant to exercise.
+        List<InetSocketAddress> addresses2 = new LinkedList<>();
+        addresses.add(new InetSocketAddress("192.1638.02.1", 23));
+        ipFinder.unregisterAddresses(addresses2);
+    }
+
+
 }

Reply via email to