This is an automated email from the ASF dual-hosted git repository.
abhi pushed a commit to branch ranger-2.6
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/ranger-2.6 by this push:
new 7a6c0f62aa RANGER-4943: Error in ElasticSearchAuditDestination shutting down RestHighLevelClient client
7a6c0f62aa is described below
commit 7a6c0f62aa07f03597fa26ad0350515e185f7c5d
Author: Fernando Arribas Jara <[email protected]>
AuthorDate: Tue Jan 21 15:20:58 2025 -0800
RANGER-4943: Error in ElasticSearchAuditDestination shutting down RestHighLevelClient client
---
.../destination/ElasticSearchAuditDestination.java | 67 +++++++++++-----------
1 file changed, 35 insertions(+), 32 deletions(-)
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
index 8d02d9d6f2..da8792c702 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/ElasticSearchAuditDestination.java
@@ -20,6 +20,7 @@
package org.apache.ranger.audit.destination;
import java.io.File;
+import java.io.IOException;
import java.security.PrivilegedActionException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -98,7 +99,8 @@ public void init(Properties props, String propPrefix) {
this.port = MiscUtil.getIntProperty(props, propPrefix + "." +
CONFIG_PORT, 9200);
this.index = getStringProperty(props, propPrefix + "." + CONFIG_INDEX,
DEFAULT_INDEX);
this.hosts = getHosts();
- LOG.info("Connecting to ElasticSearch: " + connectionString());
+
+ LOG.info("Connecting to ElasticSearch: {}", connectionString());
getClient(); // Initialize client
}
@@ -152,9 +154,7 @@ public boolean log(Collection<AuditEventBase> events) {
addFailedCount(1);
logFailedEvent(Arrays.asList(itemRequest),
itemResponse.getFailureMessage());
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Indexed %s",
itemRequest.getEventKey()));
- }
+ LOG.debug("Indexed {}", itemRequest.getEventKey());
addSuccessCount(1);
ret = true;
}
@@ -219,10 +219,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
.map(x -> new HttpHost(x, port, protocol))
.toArray(HttpHost[]::new)
);
- ThreadFactory clientThreadFactory = new ThreadFactoryBuilder()
- .setNameFormat("ElasticSearch rest client %s")
- .setDaemon(true)
- .build();
+ ThreadFactory clientThreadFactory = new ThreadFactoryBuilder().setNameFormat("ElasticSearch rest client %s").setDaemon(true).build();
if (StringUtils.isNotBlank(user) && StringUtils.isNotBlank(password)
&& !user.equalsIgnoreCase("NONE") && !password.equalsIgnoreCase("NONE")) {
if (password.contains("keytab") && new File(password).exists()) {
final KerberosCredentialsProvider credentialsProvider =
@@ -236,8 +233,7 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
return clientBuilder;
});
} else {
- final CredentialsProvider credentialsProvider =
- CredentialsProviderUtil.getBasicCredentials(user,
password);
+ final CredentialsProvider credentialsProvider =
CredentialsProviderUtil.getBasicCredentials(user, password);
restClientBuilder.setHttpClientConfigCallback(clientBuilder ->
{
clientBuilder.setThreadFactory(clientThreadFactory);
clientBuilder.setDefaultCredentialsProvider(credentialsProvider);
@@ -257,42 +253,50 @@ public static RestClientBuilder getRestClientBuilder(String urls, String protoco
}
private RestHighLevelClient newClient() {
+ RestHighLevelClient restHighLevelClient = null;
+
try {
if (StringUtils.isNotBlank(user) &&
StringUtils.isNotBlank(password) && password.contains("keytab") && new
File(password).exists()) {
subject = CredentialsProviderUtil.login(user, password);
}
- RestClientBuilder restClientBuilder =
- getRestClientBuilder(hosts, protocol, user, password,
port);
- try (RestHighLevelClient restHighLevelClient = new
RestHighLevelClient(restClientBuilder)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialized client");
- }
- boolean exists = false;
- try {
- exists = restHighLevelClient.indices().open(new
OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
- } catch (Exception e) {
- LOG.warn("Error validating index " + this.index);
- }
- if (exists) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Index exists");
- }
- } else {
- LOG.info("Index does not exist");
- }
- return restHighLevelClient;
+ RestClientBuilder restClientBuilder = getRestClientBuilder(hosts,
protocol, user, password, port);
+ restHighLevelClient = new RestHighLevelClient(restClientBuilder);
+ boolean exists = false;
+
+ try {
+ exists = restHighLevelClient.indices().open(new
OpenIndexRequest(this.index), RequestOptions.DEFAULT).isShardsAcknowledged();
+ } catch (Exception e) {
+ LOG.warn("Error validating index {}", this.index);
}
+
+ if (exists) {
+ LOG.debug("Index exists");
+ } else {
+ LOG.info("Index does not exist");
+ }
+
+ return restHighLevelClient;
} catch (Throwable t) {
lastLoggedAt.updateAndGet(lastLoggedAt -> {
long now = System.currentTimeMillis();
long elapsed = now - lastLoggedAt;
if (elapsed > TimeUnit.MINUTES.toMillis(1)) {
- LOG.error("Can't connect to ElasticSearch server: " +
connectionString(), t);
+ LOG.error("Can't connect to ElasticSearch server: {}",
connectionString(), t);
return now;
} else {
return lastLoggedAt;
}
});
+
+ if (restHighLevelClient != null) {
+ try {
+ restHighLevelClient.close();
+ LOG.debug("Closed RestHighLevelClient after failure");
+ } catch (IOException e) {
+ LOG.warn("Error closing RestHighLevelClient: {}",
e.getMessage(), e);
+ }
+ }
+
return null;
}
}
@@ -346,5 +350,4 @@ Map<String, Object> toDoc(AuthzAuditEvent auditEvent) {
doc.put("policyVersion", auditEvent.getPolicyVersion());
return doc;
}
-
}