This is an automated email from the ASF dual-hosted git repository.

abhi pushed a commit to branch ranger-2.6
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/ranger-2.6 by this push:
     new 7a6c0f62aa RANGER-4943: Error in ElasticSearchAuditDestination shutting down RestHighLevelClient client
7a6c0f62aa is described below

commit 7a6c0f62aa07f03597fa26ad0350515e185f7c5d
Author: Fernando Arribas Jara <[email protected]>
AuthorDate: Tue Jan 21 15:20:58 2025 -0800

    RANGER-4943: Error in ElasticSearchAuditDestination shutting down RestHighLevelClient client
---
 .../destination/ElasticSearchAuditDestination.java | 67 +++++++++++-----------
 1 file changed, 35 insertions(+), 32 deletions(-)

diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
index 8d02d9d6f2..da8792c702 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
@@ -20,6 +20,7 @@
 package org.apache.ranger.audit.destination;
 
 import java.io.File;
+import java.io.IOException;
 import java.security.PrivilegedActionException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -98,7 +99,8 @@ public void init(Properties props, String propPrefix) {
         this.port = MiscUtil.getIntProperty(props, propPrefix + "." + 
CONFIG_PORT, 9200);
         this.index = getStringProperty(props, propPrefix + "." + CONFIG_INDEX, 
DEFAULT_INDEX);
         this.hosts = getHosts();
-        LOG.info("Connecting to ElasticSearch: " + connectionString());
+
+        LOG.info("Connecting to ElasticSearch: {}", connectionString());
         getClient(); // Initialize client
     }
 
@@ -152,9 +154,7 @@ public boolean log(Collection<AuditEventBase> events) {
                         addFailedCount(1);
                         logFailedEvent(Arrays.asList(itemRequest), 
itemResponse.getFailureMessage());
                     } else {
-                        if (LOG.isDebugEnabled()) {
-                            LOG.debug(String.format("Indexed %s", 
itemRequest.getEventKey()));
-                        }
+                        LOG.debug("Indexed {}", itemRequest.getEventKey());
                         addSuccessCount(1);
                         ret = true;
                     }
@@ -219,10 +219,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
                         .map(x -> new HttpHost(x, port, protocol))
                         .toArray(HttpHost[]::new)
         );
-        ThreadFactory clientThreadFactory = new ThreadFactoryBuilder()
-                .setNameFormat("ElasticSearch rest client %s")
-                .setDaemon(true)
-                .build();
+        ThreadFactory clientThreadFactory = new 
ThreadFactoryBuilder().setNameFormat("ElasticSearch rest client 
%s").setDaemon(true).build();
         if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password) 
&& !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE")) {
             if (password.contains("keytab") && new File(password).exists()) {
                 final KerberosCredentialsProvider credentialsProvider =
@@ -236,8 +233,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
                     return clientBuilder;
                 });
             } else {
-                final CredentialsProvider credentialsProvider =
-                        CredentialsProviderUtil.getBasicCredentials(user, 
password);
+                final CredentialsProvider credentialsProvider = 
CredentialsProviderUtil.getBasicCredentials(user, password);
                 restClientBuilder.setHttpClientConfigCallback(clientBuilder -> 
{
                     clientBuilder.setThreadFactory(clientThreadFactory);
                     
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
@@ -257,42 +253,50 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
     }
 
     private RestHighLevelClient newClient() {
+        RestHighLevelClient restHighLevelClient = null;
+
         try {
             if (StringUtils.isNotBlank(user) && 
StringUtils.isNotBlank(password) && password.contains("keytab") && new 
File(password).exists()) {
                 subject = CredentialsProviderUtil.login(user, password);
             }
-            RestClientBuilder restClientBuilder =
-                    getRestClientBuilder(hosts, protocol, user, password, 
port);
-            try (RestHighLevelClient restHighLevelClient = new 
RestHighLevelClient(restClientBuilder)) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Initialized client");
-                }
-                boolean exists = false;
-                try {
-                    exists = restHighLevelClient.indices().open(new 
OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
-                } catch (Exception e) {
-                    LOG.warn("Error validating index " + this.index);
-                }
-                if (exists) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Index exists");
-                    }
-                } else {
-                    LOG.info("Index does not exist");
-                }
-                return restHighLevelClient;
+            RestClientBuilder restClientBuilder = getRestClientBuilder(hosts, 
protocol, user, password, port);
+            restHighLevelClient = new RestHighLevelClient(restClientBuilder);
+            boolean exists = false;
+
+            try {
+                exists = restHighLevelClient.indices().open(new 
OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
+            } catch (Exception e) {
+                LOG.warn("Error validating index {}", this.index);
             }
+
+            if (exists) {
+                LOG.debug("Index exists");
+            } else {
+                LOG.info("Index does not exist");
+            }
+
+            return restHighLevelClient;
         } catch (Throwable t) {
             lastLoggedAt.updateAndGet(lastLoggedAt -> {
                 long now = System.currentTimeMillis();
                 long elapsed = now - lastLoggedAt;
                 if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
-                    LOG.error("Can't connect to ElasticSearch server: " + 
connectionString(), t);
+                    LOG.error("Can't connect to ElasticSearch server: {}", 
connectionString(), t);
                     return now;
                 } else {
                     return lastLoggedAt;
                 }
             });
+
+            if (restHighLevelClient != null) {
+                try {
+                    restHighLevelClient.close();
+                    LOG.debug("Closed RestHighLevelClient after failure");
+                } catch (IOException e) {
+                    LOG.warn("Error closing RestHighLevelClient: {}", 
e.getMessage(), e);
+                }
+            }
+
             return null;
         }
     }
@@ -346,5 +350,4 @@ Map<String, Object> toDoc(AuthzAuditEvent auditEvent) {
         doc.put("policyVersion", auditEvent.getPolicyVersion());
         return doc;
     }
-
 }

Reply via email to