ignite-734: implemented ip finder
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/546ec8e5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/546ec8e5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/546ec8e5 Branch: refs/heads/ignite-757 Commit: 546ec8e53017db73dbcbb66d7b4f7807ae1f69b1 Parents: 4268e47 Author: Denis Magda <dma...@gridgain.com> Authored: Wed Apr 22 15:23:00 2015 +0300 Committer: Denis Magda <dma...@gridgain.com> Committed: Wed Apr 22 15:23:00 2015 +0300 ---------------------------------------------------------------------- modules/gce/pom.xml | 7 + .../google/TcpDiscoveryGoogleCloudIpFinder.java | 256 ++++++++++++++----- 2 files changed, 206 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546ec8e5/modules/gce/pom.xml ---------------------------------------------------------------------- diff --git a/modules/gce/pom.xml b/modules/gce/pom.xml index 1a11614..d4452fc 100644 --- a/modules/gce/pom.xml +++ b/modules/gce/pom.xml @@ -47,6 +47,13 @@ <artifactId>google-api-client</artifactId> <version>1.19.1</version> </dependency> + + <dependency> + <groupId>com.google.apis</groupId> + <artifactId>google-api-services-storage</artifactId> + <version>v1-rev32-1.20.0</version> + </dependency> + </dependencies> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/546ec8e5/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java ---------------------------------------------------------------------- diff --git a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java index 2d6f9d7..3bfd8f3 100644 --- a/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java +++ b/modules/gce/src/main/java/org/apache/ignite/spi/discovery/tcp/ipfinder/google/TcpDiscoveryGoogleCloudIpFinder.java @@ -18,13 +18,18 @@ package org.apache.ignite.spi.discovery.tcp.ipfinder.google; import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport; +import com.google.api.client.http.InputStreamContent; import com.google.api.client.http.javanet.NetHttpTransport; -import org.apache.ignite.*; -import org.apache.ignite.resources.*; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.StorageScopes; +import com.google.api.services.storage.model.*; +import com.google.common.collect.ImmutableMap; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.*; import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import com.google.api.client.auth.oauth2.*; import com.google.api.client.googleapis.auth.oauth2.*; import com.google.api.client.json.jackson2.*; @@ -32,6 +37,8 @@ import java.io.*; import java.net.*; import java.security.GeneralSecurityException; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; /** * Google Cloud Storage based IP finder. @@ -39,9 +46,8 @@ import java.util.*; * TODO: complete */ public class TcpDiscoveryGoogleCloudIpFinder extends TcpDiscoveryIpFinderAdapter { - /** Grid logger. */ - @LoggerResource - private IgniteLogger log; + /* Default object's content. */ + private final static ByteArrayInputStream OBJECT_CONTENT = new ByteArrayInputStream(new byte[1]); /* Google Cloud Platform's project name.*/ private String projectName; @@ -49,11 +55,20 @@ public class TcpDiscoveryGoogleCloudIpFinder extends TcpDiscoveryIpFinderAdapter /* Google Cloud Platform's bucket name. */ private String bucketName; - /* Google Cloud Platform's secret file name. */ - private String secretsFileName; + /* Service account p12 private key file name. */ + private String serviceAccountP12FilePath; - /* Google HTTP transport. */ - private NetHttpTransport httpTransport; + /* Service account id. */ + private String serviceAccountId; + + /* Google storage. */ + private Storage storage; + + /* Init routine guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /* Init routine latch. */ + private final CountDownLatch initLatch = new CountDownLatch(1); public TcpDiscoveryGoogleCloudIpFinder() { setShared(true); @@ -61,17 +76,81 @@ public class TcpDiscoveryGoogleCloudIpFinder extends TcpDiscoveryIpFinderAdapter /** {@inheritDoc} */ @Override public Collection<InetSocketAddress> getRegisteredAddresses() throws IgniteSpiException { - return null; + init(); + + Collection<InetSocketAddress> addrs = new LinkedList<>(); + + try { + Storage.Objects.List listObjects = storage.objects().list(bucketName); + + com.google.api.services.storage.model.Objects objects; + + do { + objects = listObjects.execute(); + + if (objects == null || objects.getItems() == null) + break; + + for (StorageObject object : objects.getItems()) + addrs.add(addrFromString(object.getName())); + + listObjects.setPageToken(objects.getNextPageToken()); + } while (null != objects.getNextPageToken()); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to get content from the bucket: " + bucketName, e); + } + + return addrs; } /** {@inheritDoc} */ @Override public void registerAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + for (InetSocketAddress addr : addrs) { + String key = keyFromAddr(addr); + StorageObject object = new StorageObject(); + + object.setBucket(bucketName); + object.setName(key); + + InputStreamContent content = new InputStreamContent("application/octet-stream", OBJECT_CONTENT); + content.setLength(OBJECT_CONTENT.available()); + + try { + Storage.Objects.Insert insertObject = storage.objects().insert(bucketName, object, content); + + insertObject.execute(); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to put entry [bucketName=" + bucketName + + ", entry=" + key + ']', e); + } + } } /** {@inheritDoc} */ @Override public void unregisterAddresses(Collection<InetSocketAddress> addrs) throws IgniteSpiException { + assert !F.isEmpty(addrs); + + init(); + + for (InetSocketAddress addr : addrs) { + String key = keyFromAddr(addr); + + try { + Storage.Objects.Delete deleteObject = storage.objects().delete(bucketName, key); + deleteObject.execute(); + } catch (Exception e) { + throw new IgniteSpiException("Failed to delete entry [bucketName=" + bucketName + + ", entry=" + key + ']', e); + } + } } /** @@ -97,65 +176,128 @@ public class TcpDiscoveryGoogleCloudIpFinder extends TcpDiscoveryIpFinderAdapter this.bucketName = bucketName; } - /** - * File name of a client secrets file that is used to authenticate with Google Cloud Platform. - * Client secrets (JSON) file can be created and downloaded from the Google Developers Console. - * Note that the file must correspond to a Service Account. - * <p> - * For details refer to Google Cloud Platform Documentation. - * - * @param secretsFileName Client secrets file name. - */ @IgniteSpiConfiguration(optional = false) - public void setSecretsFileName(String secretsFileName) { - this.secretsFileName = secretsFileName; + public void setServiceAccountP12FilePath(String p12FileName) { + this.serviceAccountP12FilePath = p12FileName; + } + + @IgniteSpiConfiguration(optional = false) + public void setServiceAccountId(String id) { + this.serviceAccountId = id; } /** * */ - private AuthorizationCodeInstalledApp Credential authorize() throws IgniteSpiException { - GoogleClientSecrets clientSecrets = null; + private void init() throws IgniteSpiException { + if (initGuard.compareAndSet(false, true)) { + try { + NetHttpTransport httpTransport; - try { - clientSecrets = GoogleClientSecrets.load(JacksonFactory.getDefaultInstance(), - new InputStreamReader(new FileInputStream(new File(secretsFileName)))); + try { + httpTransport = GoogleNetHttpTransport.newTrustedTransport(); + } catch (GeneralSecurityException | IOException e) { + throw new IgniteSpiException(e); + } + + GoogleCredential credential; + + try { + credential = new GoogleCredential.Builder().setTransport(httpTransport) + .setJsonFactory(JacksonFactory.getDefaultInstance()).setServiceAccountId(serviceAccountId) + .setServiceAccountPrivateKeyFromP12File(new File(serviceAccountP12FilePath)) + .setServiceAccountScopes(Collections.singleton(StorageScopes.DEVSTORAGE_READ_WRITE)).build(); + } catch (Exception e) { + throw new IgniteSpiException("Failed to authenticate on Google Cloud Platform", e); + } - if (clientSecrets.getDetails().getClientId() == null || - clientSecrets.getDetails().getClientSecret() == null) - throw new IgniteSpiException("Client secrets file is not well formed."); + try { + storage = new Storage.Builder(httpTransport, JacksonFactory.getDefaultInstance(), credential) + .setApplicationName(projectName).build(); + } catch (Exception e) { + throw new IgniteSpiException("Failed to open a storage for given project name: " + projectName); + } + } + finally { + initLatch.countDown(); + } + } + else { + try { + U.await(initLatch); + } catch (IgniteInterruptedCheckedException e) { + throw new IgniteSpiException("Thread has been interrupted.", e); + } - } catch (Exception e) { - throw new IgniteSpiException("Failed to load client secrets JSON file.", e); + if (storage == null) + throw new IgniteSpiException("IpFinder has not been initialized properly"); } + } + + private String keyFromAddr(InetSocketAddress addr) { + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + private InetSocketAddress addrFromString(String address) throws IgniteSpiException { + String[] res = address.split(":"); + + if (res.length != 2) + throw new IgniteSpiException("Invalid address string: " + address); + + + int port; try { - httpTransport = GoogleNetHttpTransport.newTrustedTransport(); - } catch (GeneralSecurityException | IOException e ) { - throw new IgniteSpiException(e); + port = Integer.parseInt(res[1]); + } + catch (NumberFormatException e) { + throw new IgniteSpiException("Invalid port number: " + res[1]); } - // Set up authorization code flow. - // Ask for only the permissions you need. Asking for more permissions will - // reduce the number of users who finish the process for giving you access - // to their accounts. It will also increase the amount of effort you will - // have to spend explaining to users what you are doing with their data. - // Here we are listing all of the available scopes. You should remove scopes - // that you are not actually using. - Set<String> scopes = new HashSet<String>(); - scopes.add(StorageScopes.DEVSTORAGE_FULL_CONTROL); - scopes.add(StorageScopes.DEVSTORAGE_READ_ONLY); - scopes.add(StorageScopes.DEVSTORAGE_READ_WRITE); - - GoogleAuthorizationCodeFlow flow = new GoogleAuthorizationCodeFlow.Builder( - httpTransport, JSON_FACTORY, clientSecrets, scopes) - .setDataStoreFactory(dataStoreFactory) - .build(); - - GooglePromtReceiver - // Authorize. - VerificationCodeReceiver receiver = - AUTH_LOCAL_WEBSERVER ? new LocalServerReceiver() : new GooglePromptReceiver(); - return new AuthorizationCodeInstalledApp(flow, receiver).authorize("user"); + return new InetSocketAddress(res[0], port); } + + + + public static void main(String args[]) { + TcpDiscoveryGoogleCloudIpFinder ipFinder = new TcpDiscoveryGoogleCloudIpFinder(); + + String bucketName = "grid-gain-test-bucket1"; + + ipFinder.setBucketName(bucketName); + ipFinder.setProjectName("gridgain"); + ipFinder.setServiceAccountId("208709979073-v0mn6ttpd3mqu2b5lbhh1mvdet7os...@developer.gserviceaccount.com"); + ipFinder.setServiceAccountP12FilePath("C:\\ignite\\GCE\\gridgain-0889e44b58b7.p12"); + + List<InetSocketAddress> addresses = new LinkedList<>(); + addresses.add(new InetSocketAddress("192.168.0.1", 23)); + addresses.add(new InetSocketAddress("192.168.0.1", 89)); + addresses.add(new InetSocketAddress("92.68.0.1", 1223)); + + System.out.println("PUT ADDR"); + ipFinder.registerAddresses(addresses); + + Collection<InetSocketAddress> result = ipFinder.getRegisteredAddresses(); + System.out.println("GET ADDR"); + + for (InetSocketAddress add: result) { + System.out.println(add.getAddress().getHostAddress() + ":" + add.getPort()); + } + + System.out.println("REMOVE"); + ipFinder.unregisterAddresses(addresses); + + + result = ipFinder.getRegisteredAddresses(); + System.out.println("GET ADDR 2"); + + for (InetSocketAddress add: result) { + System.out.println(add.getAddress().getHostAddress() + ":" + add.getPort()); + } + + List<InetSocketAddress> addresses2 = new LinkedList<>(); + addresses.add(new InetSocketAddress("192.1638.02.1", 23)); + ipFinder.unregisterAddresses(addresses2); + } + + }