Repository: zeppelin Updated Branches: refs/heads/branch-0.7 69e70d515 -> 4dfb81500
[ZEPPELIN-2318][branch-0.7] Fix proxy configuration for http client of zeppelinhub storage layer ### What is this PR for? this is to resolve this issue for `branch-0.7` since original PR #2198 had conflicts with the branch. for more details check the original pr ### What type of PR is it? Bug Fix | Improvement ### Todos * [x] - Task ### What is the Jira issue? [ZEPPELIN-2318](https://issues.apache.org/jira/browse/ZEPPELIN-2318) ### How should this be tested? check in #2198 ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: Khalid Huseynov <khalid...@gmail.com> Author: LeiWang <wanglei6...@163.com> Closes #2247 from khalidhuseynov/fix/branch-0.7-ZEPPELIN-2318 and squashes the following commits: ec23c958a [Khalid Huseynov] edge case logs from error -> warn e4db79ca0 [Khalid Huseynov] fix log d8efb462f [Khalid Huseynov] fix websocket timing e6622398d [Khalid Huseynov] add ssl setup 8f8109eaa [Khalid Huseynov] add close routine 7f3cd5040 [Khalid Huseynov] jetty client relay to asyncclient when proxy on a65f73556 [Khalid Huseynov] add proxy client with asynclient library 6add668ff [Khalid Huseynov] add dependency in pom, resolve coflict d4cacadaf [LeiWang] fix bugs for timer saver Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/4dfb8150 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/4dfb8150 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/4dfb8150 Branch: refs/heads/branch-0.7 Commit: 4dfb815004c3d5a611c3ce16c6f6050813e24f38 Parents: 69e70d5 Author: Khalid Huseynov <khalid...@gmail.com> Authored: Wed Mar 29 16:54:38 2017 +0900 Committer: Khalid Huseynov <khalid...@gmail.com> Committed: Thu Apr 20 15:04:15 2017 +0900 ---------------------------------------------------------------------- pom.xml | 7 + zeppelin-zengine/pom.xml | 5 + .../repo/zeppelinhub/ZeppelinHubRepo.java | 3 +- .../repo/zeppelinhub/rest/HttpProxyClient.java | 212 +++++++++++++++++++ .../rest/ZeppelinhubRestApiHandler.java | 114 +++++++--- .../zeppelinhub/websocket/ZeppelinClient.java | 17 +- 6 files changed, 320 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 5010675..4df0094 100644 --- a/pom.xml +++ b/pom.xml @@ -97,6 +97,7 @@ <jetty.version>9.2.15.v20160210</jetty.version> <httpcomponents.core.version>4.3.3</httpcomponents.core.version> <httpcomponents.client.version>4.3.6</httpcomponents.client.version> + <httpcomponents.asyncclient.version>4.0.2</httpcomponents.asyncclient.version> <commons.lang.version>2.5</commons.lang.version> <commons.configuration.version>1.9</commons.configuration.version> <commons.codec.version>1.5</commons.codec.version> @@ -174,6 +175,12 @@ </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + <version>${httpcomponents.asyncclient.version}</version> + </dependency> + + <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>${commons.lang.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index efe1c8c..730ada8 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -114,6 +114,11 @@ </dependency> <dependency> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpasyncclient</artifactId> + </dependency> + + <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-java-sdk-s3</artifactId> <version>${aws.sdk.s3.version}</version> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java index 2f33f6f..cd94180 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/ZeppelinHubRepo.java @@ -217,7 +217,8 @@ public class ZeppelinHubRepo implements NotebookRepo { @Override public void close() { - //websocketClient.stop(); + websocketClient.stop(); + restApiClient.close(); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java new file mode 100644 index 0000000..690a8b6 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/HttpProxyClient.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub.rest; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import javax.net.ssl.SSLContext; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpHead; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.config.Registry; +import org.apache.http.config.RegistryBuilder; +import org.apache.http.conn.ssl.BrowserCompatHostnameVerifier; +import org.apache.http.conn.ssl.SSLContexts; +import org.apache.http.conn.ssl.X509HostnameVerifier; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.http.impl.nio.client.HttpAsyncClients; +import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; +import org.apache.http.nio.conn.NoopIOSessionStrategy; +import org.apache.http.nio.conn.SchemeIOSessionStrategy; +import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; +import org.apache.http.nio.reactor.ConnectingIOReactor; +import org.apache.http.nio.reactor.IOReactorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is http client class for the case of proxy usage + * jetty-client has issue with https over proxy for 9.2.x + * https://github.com/eclipse/jetty.project/issues/408 + * https://github.com/eclipse/jetty.project/issues/827 + * + */ + +public class HttpProxyClient { + private static final Logger LOG = LoggerFactory.getLogger(HttpProxyClient.class); + public static final String ZEPPELIN_TOKEN_HEADER = "X-Zeppelin-Token"; + + private CloseableHttpAsyncClient client; + private URI proxyUri; + + public static HttpProxyClient newInstance(URI proxyUri) { + return new HttpProxyClient(proxyUri); + } + + private HttpProxyClient(URI uri) { + this.proxyUri = uri; + + client = getAsyncProxyHttpClient(proxyUri); + client.start(); + } + + public URI getProxyUri() { + return proxyUri; + } + + private CloseableHttpAsyncClient getAsyncProxyHttpClient(URI proxyUri) { + LOG.info("Creating async proxy http client"); + PoolingNHttpClientConnectionManager cm = getAsyncConnectionManager(); + HttpHost proxy = new HttpHost(proxyUri.getHost(), proxyUri.getPort()); + + HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom(); + if (cm != null) { + clientBuilder = clientBuilder.setConnectionManager(cm); + } + + if (proxy != null) { + clientBuilder = clientBuilder.setProxy(proxy); + } + clientBuilder = setRedirects(clientBuilder); + return clientBuilder.build(); + } + + private PoolingNHttpClientConnectionManager getAsyncConnectionManager() { + ConnectingIOReactor ioReactor = null; + PoolingNHttpClientConnectionManager cm = null; + try { + ioReactor = new DefaultConnectingIOReactor(); + // ssl setup + SSLContext sslcontext = SSLContexts.createSystemDefault(); + X509HostnameVerifier hostnameVerifier = new BrowserCompatHostnameVerifier(); + @SuppressWarnings("deprecation") + Registry<SchemeIOSessionStrategy> sessionStrategyRegistry = RegistryBuilder + .<SchemeIOSessionStrategy>create() + .register("http", NoopIOSessionStrategy.INSTANCE) + .register("https", new SSLIOSessionStrategy(sslcontext, hostnameVerifier)) + .build(); + + cm = new PoolingNHttpClientConnectionManager(ioReactor, sessionStrategyRegistry); + } catch (IOReactorException e) { + LOG.error("Couldn't initialize multi-threaded async client ", e); + return null; + } + return cm; + } + + private HttpAsyncClientBuilder setRedirects(HttpAsyncClientBuilder clientBuilder) { + clientBuilder.setRedirectStrategy(new DefaultRedirectStrategy() { + /** Redirectable methods. */ + private String[] REDIRECT_METHODS = new String[] { + HttpGet.METHOD_NAME, HttpPost.METHOD_NAME, + HttpPut.METHOD_NAME, HttpDelete.METHOD_NAME, HttpHead.METHOD_NAME + }; + + @Override + protected boolean isRedirectable(String method) { + for (String m : REDIRECT_METHODS) { + if (m.equalsIgnoreCase(method)) { + return true; + } + } + return false; + } + }); + return clientBuilder; + } + + public String sendToZeppelinHub(HttpRequestBase request, + boolean withResponse) throws IOException { + return withResponse ? + sendAndGetResponse(request) : sendWithoutResponseBody(request); + } + + + private String sendWithoutResponseBody(HttpRequestBase request) throws IOException { + FutureCallback<HttpResponse> callback = getCallback(request); + client.execute(request, callback); + return StringUtils.EMPTY; + } + + private String sendAndGetResponse(HttpRequestBase request) throws IOException { + String data = StringUtils.EMPTY; + try { + HttpResponse response = client.execute(request, null).get(30, TimeUnit.SECONDS); + int code = response.getStatusLine().getStatusCode(); + if (code == 200) { + try (InputStream responseContent = response.getEntity().getContent()) { + data = IOUtils.toString(responseContent, "UTF-8"); + } + } else { + LOG.error("ZeppelinHub {} {} returned with status {} ", request.getMethod(), + request.getURI(), code); + throw new IOException("Cannot perform " + request.getMethod() + " request to ZeppelinHub"); + } + } catch (InterruptedException | ExecutionException | TimeoutException + | NullPointerException e) { + throw new IOException(e); + } + return data; + } + + private FutureCallback<HttpResponse> getCallback(final HttpRequestBase request) { + return new FutureCallback<HttpResponse>() { + + public void completed(final HttpResponse response) { + request.releaseConnection(); + LOG.info("Note {} completed with {} status", request.getMethod(), + response.getStatusLine()); + } + + public void failed(final Exception ex) { + request.releaseConnection(); + LOG.error("Note {} failed with {} message", request.getMethod(), + ex.getMessage()); + } + + public void cancelled() { + request.releaseConnection(); + LOG.info("Note {} was canceled", request.getMethod()); + } + }; + } + + public void stop() { + try { + client.close(); + } catch (Exception e) { + LOG.error("Failed to close proxy client ", e); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java index f2ae7b9..437386c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/rest/ZeppelinhubRestApiHandler.java @@ -19,6 +19,8 @@ package org.apache.zeppelin.notebook.repo.zeppelinhub.rest; import java.io.IOException; import java.io.InputStream; import java.lang.reflect.Type; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -27,6 +29,12 @@ import java.util.concurrent.TimeoutException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance; import org.eclipse.jetty.client.HttpClient; import org.eclipse.jetty.client.api.Request; @@ -52,11 +60,10 @@ public class ZeppelinhubRestApiHandler { private static final String USER_SESSION_HEADER = "X-User-Session"; private static final String DEFAULT_API_PATH = "/api/v1/zeppelin"; private static boolean PROXY_ON = false; - private static String PROXY_HOST; - private static int PROXY_PORT; - + //TODO(xxx): possibly switch to jetty-client > 9.3.12 when adopt jvm 1.8 + private static HttpProxyClient proxyClient; private final HttpClient client; - private final String zepelinhubUrl; + private String zepelinhubUrl; public static ZeppelinhubRestApiHandler newInstance(String zeppelinhubUrl) { return new ZeppelinhubRestApiHandler(zeppelinhubUrl); @@ -65,8 +72,7 @@ public class ZeppelinhubRestApiHandler { private ZeppelinhubRestApiHandler(String zeppelinhubUrl) { this.zepelinhubUrl = zeppelinhubUrl + DEFAULT_API_PATH + "/"; - //TODO(khalid):to make proxy conf consistent with Zeppelin confs - //readProxyConf(); + readProxyConf(); client = getAsyncClient(); try { @@ -74,48 +80,41 @@ public class ZeppelinhubRestApiHandler { } catch (Exception e) { LOG.error("Cannot initialize ZeppelinHub REST async client", e); } - } - + private void readProxyConf() { - //try reading http_proxy - String proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ? - System.getenv("HTTP_PROXY") : System.getenv("http_proxy"); + //try reading https_proxy + String proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ? + System.getenv("HTTPS_PROXY") : System.getenv("https_proxy"); if (StringUtils.isBlank(proxyHostString)) { - //try https_proxy if no http_proxy - proxyHostString = StringUtils.isBlank(System.getenv("https_proxy")) ? - System.getenv("HTTPS_PROXY") : System.getenv("https_proxy"); + //try http_proxy if no https_proxy + proxyHostString = StringUtils.isBlank(System.getenv("http_proxy")) ? + System.getenv("HTTP_PROXY") : System.getenv("http_proxy"); } - if (StringUtils.isBlank(proxyHostString)) { - PROXY_ON = false; - } else { - // host format - http://domain:port/ - String[] parts = proxyHostString.replaceAll("/", "").split(":"); - if (parts.length != 3) { - LOG.warn("Proxy host format is incorrect {}, e.g. http://domain:port/", proxyHostString); - PROXY_ON = false; - return; + if (!StringUtils.isBlank(proxyHostString)) { + URI uri = null; + try { + uri = new URI(proxyHostString); + } catch (URISyntaxException e) { + LOG.warn("Proxy uri doesn't follow correct syntax", e); + } + if (uri != null) { + PROXY_ON = true; + proxyClient = HttpProxyClient.newInstance(uri); } - PROXY_HOST = parts[1]; - PROXY_PORT = Integer.parseInt(parts[2]); - LOG.info("Proxy protocol: {}, domain: {}, port: {}", parts[0], parts[1], parts[2]); - PROXY_ON = true; } } private HttpClient getAsyncClient() { SslContextFactory sslContextFactory = new SslContextFactory(); HttpClient httpClient = new HttpClient(sslContextFactory); - // Configure HttpClient httpClient.setFollowRedirects(false); httpClient.setMaxConnectionsPerDestination(100); + // Config considerations - //TODO(khalid): consider using proxy - //TODO(khalid): consider whether require to follow redirects //TODO(khalid): consider multi-threaded connection manager case - return httpClient; } @@ -159,7 +158,11 @@ public class ZeppelinhubRestApiHandler { return StringUtils.EMPTY; } String url = zepelinhubUrl + argument; - return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true); + if (PROXY_ON) { + return sendToZeppelinHubViaProxy(new HttpGet(url), StringUtils.EMPTY, token, true); + } else { + return sendToZeppelinHub(HttpMethod.GET, url, StringUtils.EMPTY, token, true); + } } public String putWithResponseBody(String token, String url, String json) throws IOException { @@ -167,7 +170,11 @@ public class ZeppelinhubRestApiHandler { LOG.error("Empty note, cannot send it to zeppelinHub"); throw new IOException("Cannot send emtpy note to zeppelinHub"); } - return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true); + if (PROXY_ON) { + return sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl + url), json, token, true); + } else { + return sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl + url, json, token, true); + } } public void put(String token, String jsonNote) throws IOException { @@ -175,7 +182,11 @@ public class ZeppelinhubRestApiHandler { LOG.error("Cannot save empty note/string to ZeppelinHub"); return; } - sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false); + if (PROXY_ON) { + sendToZeppelinHubViaProxy(new HttpPut(zepelinhubUrl), jsonNote, token, false); + } else { + sendToZeppelinHub(HttpMethod.PUT, zepelinhubUrl, jsonNote, token, false); + } } public void del(String token, String argument) throws IOException { @@ -183,7 +194,37 @@ public class ZeppelinhubRestApiHandler { LOG.error("Cannot delete empty note from ZeppelinHub"); return; } - sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, false); + if (PROXY_ON) { + sendToZeppelinHubViaProxy(new HttpDelete(zepelinhubUrl + argument), StringUtils.EMPTY, token, + false); + } else { + sendToZeppelinHub(HttpMethod.DELETE, zepelinhubUrl + argument, StringUtils.EMPTY, token, + false); + } + } + + private String sendToZeppelinHubViaProxy(HttpRequestBase request, + String json, + String token, + boolean withResponse) throws IOException { + request.setHeader(ZEPPELIN_TOKEN_HEADER, token); + if (request.getMethod().equals(HttpPost.METHOD_NAME)) { + HttpPost post = (HttpPost) request; + StringEntity content = new StringEntity(json, "application/json;charset=UTF-8"); + post.setEntity(content); + } + if (request.getMethod().equals(HttpPut.METHOD_NAME)) { + HttpPut put = (HttpPut) request; + StringEntity content = new StringEntity(json, "application/json;charset=UTF-8"); + put.setEntity(content); + } + String body = StringUtils.EMPTY; + if (proxyClient != null) { + body = proxyClient.sendToZeppelinHub(request, withResponse); + } else { + LOG.warn("Proxy client request was submitted while not correctly initialized"); + } + return body; } private String sendToZeppelinHub(HttpMethod method, @@ -243,6 +284,9 @@ public class ZeppelinhubRestApiHandler { public void close() { try { client.stop(); + if (proxyClient != null) { + proxyClient.stop(); + } } catch (Exception e) { LOG.info("Couldn't stop ZeppelinHub client properly", e); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/4dfb8150/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java index 9847e1c..b072251 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/websocket/ZeppelinClient.java @@ -137,9 +137,22 @@ public class ZeppelinClient { new Timer().schedule(new java.util.TimerTask() { @Override public void run() { - watcherSession = openWatcherSession(); + int time = 0; + while (time < 5 * MIN) { + watcherSession = openWatcherSession(); + if (watcherSession == null) { + try { + Thread.sleep(5000); + time += 5; + } catch (InterruptedException e) { + //continue + } + } else { + break; + } + } } - }, 10000); + }, 5000); } public void stop() {