Github user takezoe commented on a diff in the pull request:

    
https://github.com/apache/incubator-predictionio/pull/421#discussion_r134417513
  
    --- Diff: 
storage/elasticsearch/src/main/scala/org/apache/predictionio/data/storage/elasticsearch/StorageClient.scala
 ---
    @@ -18,27 +18,84 @@
     package org.apache.predictionio.data.storage.elasticsearch
     
     import org.apache.http.HttpHost
    +import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
    +import org.apache.http.impl.client.BasicCredentialsProvider
    +import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
     import org.apache.predictionio.data.storage.BaseStorageClient
     import org.apache.predictionio.data.storage.StorageClientConfig
     import org.apache.predictionio.data.storage.StorageClientException
    +import org.apache.predictionio.workflow.CleanupFunctions
     import org.elasticsearch.client.RestClient
    +import org.elasticsearch.client.RestClientBuilder.HttpClientConfigCallback
     
     import grizzled.slf4j.Logging
     
    -case class ESClient(hosts: Seq[HttpHost]) {
    -  def open(): RestClient = {
    +object ESClient extends Logging {
    +  var _sharedRestClient: Option[RestClient] = None
    +
    +  def open(
    +    hosts: Seq[HttpHost],
    +    basicAuth: Option[(String, String)] = None): RestClient = {
         try {
    -      RestClient.builder(hosts: _*).build()
    +      val newClient = _sharedRestClient match {
    +        case Some(c)  => c
    +        case None     => {
    +          var builder = RestClient.builder(hosts: _*)
    +          builder = basicAuth match {
    +            case Some((username, password)) => 
builder.setHttpClientConfigCallback(
    +              new BasicAuthProvider(username, password))
    +            case None                       => builder}
    +          builder.build()
    +        }
    +      }
    +      _sharedRestClient = Some(newClient)
    +      newClient
         } catch {
           case e: Throwable =>
             throw new StorageClientException(e.getMessage, e)
         }
       }
    +
    +  def close(): Unit = {
    +    if (!_sharedRestClient.isEmpty) {
    +      _sharedRestClient.get.close()
    +      _sharedRestClient = None
    +    }
    --- End diff --
    
    Typically, `Option.get()` is hated in the Scala world. You can write as 
follows instead:
    ```scala
    _sharedRestClient.foreach { client =>
       client.close()
      _sharedRestClient = None
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to