[https://issues.apache.org/jira/browse/PIO-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16126318#comment-16126318]
ASF GitHub Bot commented on PIO-106:
------------------------------------
GitHub user mars opened a pull request:
https://github.com/apache/incubator-predictionio/pull/421
Elasticsearch singleton client with authentication
Fixes both [PIO-106](https://issues.apache.org/jira/browse/PIO-106) &
[PIO-114](https://issues.apache.org/jira/browse/PIO-114), replacing
https://github.com/apache/incubator-predictionio/pull/372. These are combined
because they each heavily revise the same class.
## Authentication
Adds optional username/password configuration for the new Elasticsearch 5
client, set in `pio-env.sh`:
```bash
# Optional basic HTTP auth
PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME=my-name
PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD=my-secret
```
These credentials are sent in each Elasticsearch request as an HTTP Basic
Authorization header.
This enables use of hosted Elasticsearch clusters in the public cloud, such as
[Bonsai on Heroku](https://elements.heroku.com/addons/bonsai).
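To illustrate the header described above, here is a rough Scala sketch. It reads the two optional settings and builds an HTTP Basic `Authorization` value, which is `Basic ` followed by the Base64 encoding of `username:password`. The object name `BasicAuthSketch` and the choice to pass the environment in as a `Map` are assumptions made for illustration. This is not PredictionIO's implementation, and the real client may attach credentials differently, for example through the underlying HTTP client's credentials provider.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64

// Hypothetical helper for illustration; not PredictionIO's actual code.
object BasicAuthSketch {
  val UserKey = "PIO_STORAGE_SOURCES_ELASTICSEARCH_USERNAME"
  val PassKey = "PIO_STORAGE_SOURCES_ELASTICSEARCH_PASSWORD"

  // Auth is optional: only produce a header when both settings are present.
  def headerValue(env: Map[String, String]): Option[String] =
    for {
      user <- env.get(UserKey)
      pass <- env.get(PassKey)
    } yield encode(user, pass)

  // HTTP Basic scheme: "Basic " + base64("username:password")
  def encode(user: String, pass: String): String = {
    val token = s"$user:$pass".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(token)
  }
}
```

For example, `BasicAuthSketch.headerValue(sys.env)` yields `None` when auth is not configured. Otherwise it yields the value to send as the `Authorization` header on every request.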
## Singleton client
This PR moves to a singleton Elasticsearch RestClient, which has built-in
HTTP keep-alive and TCP connection pooling. Running on this branch, we've seen
a 2x speed-up in predictions from the Universal Recommender with ES5, and the
feared "cannot assign requested address" 😱 Elasticsearch connection errors have
completely disappeared. Running `pio batchpredict` for 160K queries now opens
only 7 TCP connections to Elasticsearch in total. Previously, the same run would
climb to ~25,000 connections before further connections were refused.
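The core idea is to create one shared client lazily and have every caller reuse it, rather than building a client per request. That idea can be sketched in plain Scala. `PooledClient` below is a hypothetical stand-in for `RestClient` that opens no sockets. The sketch counts how many instances get built, to show that many concurrent requests still share a single one. Whether PredictionIO uses a `lazy val` or some other mechanism for its singleton is an assumption here.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for RestClient; a real client would own a connection pool.
final class PooledClient {
  PooledClient.created.incrementAndGet()
  def performRequest(path: String): String = s"GET $path"
}

object PooledClient {
  // Counts constructions so the sharing can be observed.
  val created = new AtomicInteger(0)
}

// Singleton holder: a lazy val in an object is initialized once, thread-safely.
object SharedClient {
  lazy val client: PooledClient = new PooledClient
}

object SingletonDemo {
  // Issue n requests from concurrent futures, all through SharedClient.client,
  // and return how many clients were constructed.
  def run(n: Int): Int = {
    val requests = (1 to n).map(i => Future(SharedClient.client.performRequest(s"/q/$i")))
    Await.result(Future.sequence(requests), 30.seconds)
    PooledClient.created.get
  }
}
```

With a real `RestClient`, this shape keeps one connection pool alive for the whole process, so keep-alive connections can be reused across requests. Closing the client then becomes a process-level concern rather than a per-request one.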
**This fundamentally changes the interface for the new [Elasticsearch 5.x
REST
client](https://github.com/apache/incubator-predictionio/tree/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch)**
introduced with PredictionIO 0.11.0-incubating. With this changeset, the
`client` is a single instance of
[`org.elasticsearch.client.RestClient`](https://github.com/elastic/elasticsearch/blob/master/client/rest/src/main/java/org/elasticsearch/client/RestClient.java).
🚨 **As a result of this change, any engine templates that directly use the
Elasticsearch 5 StorageClient will require an update for compatibility.** The
required change is:
### Original
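```scala
import org.apache.predictionio.data.storage.elasticsearch.StorageClient
import org.elasticsearch.client.RestClient

val client: StorageClient = … // code to instantiate client
// A new RestClient (with new TCP connections) is opened for every use
val restClient: RestClient = client.open()
try {
  restClient.performRequest(…)
} finally {
  restClient.close()
}
```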
### With this PR
```scala
import org.elasticsearch.client.RestClient

val client: RestClient = … // code to instantiate client
client.performRequest(…) // shared client: no per-request open/close
```
*No more balancing `open` & `close`: closing the shared client is handled by a
new `CleanupFunctions` hook added to the framework in this PR.*
[Universal Recommender](https://github.com/actionml/universal-recommender)
is the only template that I know of which directly uses the ES StorageClient
outside of PredictionIO core. See example [UR changes for compatibility with
this
PR](https://github.com/heroku/predictionio-engine-ur/compare/esclient-singleton).
### Elasticsearch StorageClient changes
* reimplemented as singleton
* installs a cleanup function
See
[StorageClient](https://github.com/apache/incubator-predictionio/compare/develop...mars:esclient-singleton?expand=1#diff-2926f4cfd93ccb02320e2a9503ccd223)
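A singleton storage client can "install a cleanup function" by registering its `close()` at the moment the shared instance is first created. Registration then happens exactly once, however often the client is used. The sketch below is hypothetical. `Cleanups` is a minimal stand-in for a framework cleanup hook, and `FakeRestClient` stands in for `RestClient`. The actual StorageClient in the linked diff may be structured differently.

```scala
import scala.collection.mutable.ListBuffer

// Minimal stand-in for a framework cleanup hook (hypothetical).
object Cleanups {
  val registered = ListBuffer.empty[() => Unit]
  def add(f: () => Unit): Unit = synchronized { registered += f }
}

// Hypothetical stand-in for RestClient.
final class FakeRestClient {
  @volatile var closed = false
  def close(): Unit = closed = true
}

object EsStorageClientSketch {
  // Created once on first access; its close() is registered at the same time,
  // so later accesses neither rebuild the client nor register again.
  lazy val client: FakeRestClient = {
    val c = new FakeRestClient
    Cleanups.add(() => c.close())
    c
  }
}
```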
### Core changes
A new
[`CleanupFunctions`](https://github.com/apache/incubator-predictionio/compare/develop...mars:esclient-singleton?expand=1#diff-2a958821ac58f019fbce38540c775f19)
hook lets developers of storage modules register anonymous functions with
`CleanupFunctions.add { … }`; these run after Spark-related commands and
workflows. The hook is invoked in a `finally { CleanupFunctions.run() }` block
within:
* `pio import`
* `pio export`
* `pio train`
* `pio batchpredict`
Apologies for the large indentation shifts caused by the requisite try-finally
blocks:
```scala
try {
  // Freshly indented code.
} finally {
  CleanupFunctions.run()
}
```
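The hook pattern can be sketched as a small registry plus a command wrapper. Functions added during a run execute in the `finally` block, whether the workflow succeeds or throws. `CleanupRegistry` and `runCommand` are hypothetical names, and this is not the code of PredictionIO's `CleanupFunctions`. Details such as clearing the list after each run are assumptions.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical registry mirroring the add/run shape described above.
object CleanupRegistry {
  private val functions = ListBuffer.empty[() => Unit]

  // Register a block to run later, e.g. CleanupRegistry.add { client.close() }
  def add(f: => Unit): Unit = synchronized { functions += (() => f) }

  // Run every registered function in order, then forget them (an assumption).
  def run(): Unit = {
    val toRun = synchronized { val fs = functions.toList; functions.clear(); fs }
    toRun.foreach(f => f())
  }
}

// Hypothetical wrapper standing in for a command such as `pio train`.
object CommandSketch {
  def runCommand[A](body: => A): A =
    try body
    finally CleanupRegistry.run()
}
```

Running the hook in `finally` releases a client's resources even when a command fails. Per the commit log below, collecting and running cleanup functions is what allows `spark-submit` processes to end gracefully.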
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mars/incubator-predictionio esclient-singleton-with-auth
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-predictionio/pull/421.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #421
----
commit f30f27bcc09a397efb42a7923938beceaeac37bf
Author: Mars Hall <[email protected]>
Date: 2017-08-08T23:29:15Z
Migrate to singleton Elasticsearch client to use underlying connection
pooling (PoolingNHttpClientConnectionManager)
commit d99927089a41cb85f525cb74bdf394eed4686bf2
Author: Mars Hall <[email protected]>
Date: 2017-08-10T03:00:58Z
Log stacktrace for Storage initialization errors.
commit dc4c31cbcddbb3b281d52b8099e210adc546d1ed
Author: Mars Hall <[email protected]>
Date: 2017-08-10T22:55:38Z
Remove shade rule that breaks Elasticsearch 5 client
commit 7634a7ab720239d5f8efda85f67b26bdaff797f8
Author: Mars Hall <[email protected]>
Date: 2017-08-10T22:59:01Z
Collect & run cleanup functions to allow spark-submit processes to end
gracefully.
commit 5953451f40e554eafa887328122c794edbbd8f1d
Author: Mars Hall <[email protected]>
Date: 2017-08-11T00:06:24Z
Rename CleanupFunctions to match object name
commit 6c79e2f30e166185e8a2afbda27689b387a6b6d1
Author: Mars Hall <[email protected]>
Date: 2017-08-11T18:26:12Z
Remove a missed `client.close()`
commit bab0666f24af19228626aae4c0be39ca890a1f6a
Author: Mars Hall <[email protected]>
Date: 2017-04-19T18:37:18Z
Optional Elasticsearch 5 support for basic HTTP auth (username & password)
commit 073d734a9efe321314964b48764a27b392610761
Author: Mars Hall <[email protected]>
Date: 2017-04-22T01:28:46Z
Beginning of unit tests for Elasticsearch REST client
commit f7f74bd87fa158168740715e1f42eaa57c3cf0a9
Author: Mars Hall <[email protected]>
Date: 2017-08-11T19:04:24Z
Merge branch 'esclient-singleton' into esclient-auth
commit dc32c51e76cc7c358f59ede7c9f42a8f2fb7fe45
Author: Mars Hall <[email protected]>
Date: 2017-05-23T20:20:03Z
Stabilize the sort order of CLASSPATH; fixes intermittent `INSTANCE` errors
due to wrong version of class loaded
----
> Elasticsearch 5.x StorageClient should reuse RestClient
> -------------------------------------------------------
>
> Key: PIO-106
> URL: https://issues.apache.org/jira/browse/PIO-106
> Project: PredictionIO
> Issue Type: Improvement
> Components: Core
> Affects Versions: 0.11.0-incubating
> Reporter: Mars Hall
> Assignee: Mars Hall
>
> When using the proposed [PIO-105 Batch
> Predictions|https://issues.apache.org/jira/browse/PIO-105] feature with an
> engine that queries Elasticsearch in {{Algorithm#predict}}, Elasticsearch's
> REST interface appears to become overloaded, ending with the Spark job being
> killed by errors like:
> {noformat}
> [ERROR] [ESChannels] Failed to access to /pio_meta/channels/_search
> [ERROR] [Utils] Aborting task
> [ERROR] [ESApps] Failed to access to /pio_meta/apps/_search
> [ERROR] [Executor] Exception in task 747.0 in stage 1.0 (TID 749)
> [ERROR] [Executor] Exception in task 735.0 in stage 1.0 (TID 737)
> [ERROR] [Common$] Invalid app name ur
> [ERROR] [Utils] Aborting task
> [ERROR] [URAlgorithm] Error when read recent events:
> java.lang.IllegalArgumentException: Invalid app name ur
> [ERROR] [Executor] Exception in task 749.0 in stage 1.0 (TID 751)
> [ERROR] [Utils] Aborting task
> [ERROR] [Executor] Exception in task 748.0 in stage 1.0 (TID 750)
> [WARN] [TaskSetManager] Lost task 749.0 in stage 1.0 (TID 751, localhost, executor driver): java.net.BindException: Can't assign requested address
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:454)
> at sun.nio.ch.Net.connect(Net.java:446)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:648)
> at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processSessionRequests(DefaultConnectingIOReactor.java:273)
> at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:139)
> at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
> at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
> at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}
> After these errors happen & the job is killed, Elasticsearch immediately
> recovers. It responds to queries normally. I researched what could cause this
> and found an [old issue in the main Elasticsearch
> repo|https://github.com/elastic/elasticsearch/issues/3647]. With the hints
> given therein about *using keep-alive in the ES client* to avoid these
> performance issues, I investigated how PredictionIO's [Elasticsearch
> StorageClient|https://github.com/apache/incubator-predictionio/tree/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch]
> manages its connections.
> I found that unlike the other StorageClients (Elasticsearch1, HBase, JDBC),
> the Elasticsearch StorageClient creates a new underlying connection, an
> Elasticsearch RestClient, for
> [every|https://github.com/apache/incubator-predictionio/blob/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala#L80]
> [single|https://github.com/apache/incubator-predictionio/blob/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESApps.scala#L157]
> [query|https://github.com/apache/incubator-predictionio/blob/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESChannels.scala#L78]
> &
> [interaction|https://github.com/apache/incubator-predictionio/blob/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/ESEngineInstances.scala#L205]
> with its API. As a result, *there is no way Elasticsearch TCP connections
> can be reused via HTTP keep-alive*.
> High-performance workloads with Elasticsearch 5.x will suffer from these
> issues unless we refactor the Elasticsearch StorageClient to share the underlying
> RestClient instead of [building a new one every time the client is
> used|https://github.com/apache/incubator-predictionio/blob/develop/storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala#L31].
> There are certainly different approaches we could take to sharing a
> RestClient so that its keep-alive behavior may work as designed:
> * maintain a singleton RestClient that is reused throughout the ES storage
> classes
> * create a RestClient on-demand and pass it as an argument to ES storage
> methods
> * other ideas?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)