This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new 2da8a1f55 RANGER-4375: updated to log plugin activities asynchronously
- #2
2da8a1f55 is described below
commit 2da8a1f55928ff57d8b4b9cefb8e82109943aa83
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Fri Aug 25 00:50:41 2023 -0700
RANGER-4375: updated to log plugin activities asynchronously - #2
---
.../main/java/org/apache/ranger/biz/AssetMgr.java | 93 ++++++++++++----------
.../ranger/service/RangerTransactionService.java | 57 ++++++++++++-
2 files changed, 108 insertions(+), 42 deletions(-)
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
index 1dc3d372d..8bbeba783 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
@@ -44,7 +44,6 @@ import org.apache.ranger.common.AppConstants;
import org.apache.ranger.common.DateUtil;
import org.apache.ranger.common.JSONUtil;
import org.apache.ranger.common.MessageEnums;
-import org.apache.ranger.common.PropertiesUtil;
import org.apache.ranger.common.RangerCommonEnums;
import org.apache.ranger.common.RangerConstants;
import org.apache.ranger.common.SearchCriteria;
@@ -75,6 +74,9 @@ import org.springframework.stereotype.Component;
@Component
public class AssetMgr extends AssetMgrBase {
+ private static final String PROP_RANGER_LOG_SC_NOT_MODIFIED =
"ranger.log.SC_NOT_MODIFIED";
+ private static final String PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED =
"ranger.plugin.activity.audit.not.modified";
+ private static final String PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE =
"ranger.plugin.activity.audit.commit.inline";
@Autowired
XPermMapService xPermMapService;
@@ -136,7 +138,9 @@ public class AssetMgr extends AssetMgrBase {
@Autowired
ServiceMgr serviceMgr;
- boolean pluginActivityAuditCommitInline = false;
+ boolean rangerLogNotModified = false;
+ boolean pluginActivityAuditLogNotModified = false;
+ boolean pluginActivityAuditCommitInline = false;
private static final Logger logger =
LoggerFactory.getLogger(AssetMgr.class);
@@ -146,9 +150,13 @@ public class AssetMgr extends AssetMgrBase {
public void init() {
logger.info("==> AssetMgr.init()");
- pluginActivityAuditCommitInline =
RangerAdminConfig.getInstance().getBoolean("ranger.plugin.activity.audit.commit.inline",
false);
+ rangerLogNotModified =
RangerAdminConfig.getInstance().getBoolean(PROP_RANGER_LOG_SC_NOT_MODIFIED,
false);
+ pluginActivityAuditLogNotModified =
RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED,
false);
+ pluginActivityAuditCommitInline =
RangerAdminConfig.getInstance().getBoolean(PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE,
false);
- logger.info("ranger.plugin.activity.audit.commit.inline={}",
pluginActivityAuditCommitInline);
+ logger.info("{}={}", PROP_RANGER_LOG_SC_NOT_MODIFIED,
rangerLogNotModified);
+ logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED,
pluginActivityAuditLogNotModified);
+ logger.info("{}={}", PROP_PLUGIN_ACTIVITY_AUDIT_COMMIT_INLINE,
pluginActivityAuditCommitInline);
logger.info("<== AssetMgr.init()");
}
@@ -662,13 +670,10 @@ public class AssetMgr extends AssetMgrBase {
public void createPolicyAudit(final XXPolicyExportAudit
xXPolicyExportAudit) {
final Runnable commitWork;
if (xXPolicyExportAudit.getHttpRetCode() ==
HttpServletResponse.SC_NOT_MODIFIED) {
- boolean logNotModified =
PropertiesUtil.getBooleanProperty("ranger.log.SC_NOT_MODIFIED", false);
- if (!logNotModified) {
- commitWork = null;
+ if (!rangerLogNotModified) {
+ logger.debug("Not logging HttpServletResponse.
SC_NOT_MODIFIED. To enable, set configuration: {}=true",
PROP_RANGER_LOG_SC_NOT_MODIFIED);
- logger.debug("Not logging HttpServletResponse."
- + "SC_NOT_MODIFIED, to enable,
update "
- + ":
ranger.log.SC_NOT_MODIFIED");
+ commitWork = null;
} else {
// Create PolicyExportAudit record after
transaction is completed. If it is created in-line here
// then the TransactionManager will roll-back
the changes because the HTTP return code is
@@ -762,34 +767,40 @@ public class AssetMgr extends AssetMgrBase {
final Runnable commitWork;
if (httpCode == HttpServletResponse.SC_NOT_MODIFIED) {
- // Create or update PluginInfo record after transaction
is completed. If it is created in-line here
- // then the TransactionManager will roll-back the
changes because the HTTP return code is
- // HttpServletResponse.SC_NOT_MODIFIED
-
- switch (entityType) {
- case RangerPluginInfo.ENTITY_TYPE_POLICIES:
- isTagVersionResetNeeded =
rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName())
== null;
- break;
- case RangerPluginInfo.ENTITY_TYPE_TAGS:
- isTagVersionResetNeeded = false;
- break;
- case RangerPluginInfo.ENTITY_TYPE_ROLES:
- isTagVersionResetNeeded = false;
- break;
- case RangerPluginInfo.ENTITY_TYPE_USERSTORE:
- isTagVersionResetNeeded = false;
- break;
- default:
- isTagVersionResetNeeded = false;
- break;
- }
+ if (!pluginActivityAuditLogNotModified) {
+ logger.debug("Not logging HttpServletResponse.
SC_NOT_MODIFIED. To enable, set configuration: {}=true",
PROP_PLUGIN_ACTIVITY_AUDIT_NOT_MODIFIED);
- commitWork = new Runnable() {
- @Override
- public void run() {
-
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded,
clusterName);
+ commitWork = null;
+ } else {
+ // Create or update PluginInfo record after
transaction is completed. If it is created in-line here
+ // then the TransactionManager will roll-back
the changes because the HTTP return code is
+ // HttpServletResponse.SC_NOT_MODIFIED
+
+ switch (entityType) {
+ case
RangerPluginInfo.ENTITY_TYPE_POLICIES:
+ isTagVersionResetNeeded =
rangerDaoManager.getXXService().findAssociatedTagService(pluginInfo.getServiceName())
== null;
+ break;
+ case RangerPluginInfo.ENTITY_TYPE_TAGS:
+ isTagVersionResetNeeded = false;
+ break;
+ case RangerPluginInfo.ENTITY_TYPE_ROLES:
+ isTagVersionResetNeeded = false;
+ break;
+ case
RangerPluginInfo.ENTITY_TYPE_USERSTORE:
+ isTagVersionResetNeeded = false;
+ break;
+ default:
+ isTagVersionResetNeeded = false;
+ break;
}
- };
+
+ commitWork = new Runnable() {
+ @Override
+ public void run() {
+
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded,
clusterName);
+ }
+ };
+ }
} else if (httpCode == HttpServletResponse.SC_NOT_FOUND) {
if ((isPolicyDownloadRequest(entityType) &&
(pluginInfo.getPolicyActiveVersion() == null ||
pluginInfo.getPolicyActiveVersion() == -1))
|| (isTagDownloadRequest(entityType) &&
(pluginInfo.getTagActiveVersion() == null || pluginInfo.getTagActiveVersion()
== -1))
@@ -820,10 +831,12 @@ public class AssetMgr extends AssetMgrBase {
};
}
- if (pluginActivityAuditCommitInline) {
-
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
- } else {
-
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
+ if (commitWork != null) {
+ if (pluginActivityAuditCommitInline) {
+
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
+ } else {
+
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
+ }
}
if (logger.isDebugEnabled()) {
diff --git
a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
index 49d07fcd0..0e7ae7daa 100644
---
a/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
+++
b/security-admin/src/main/java/org/apache/ranger/service/RangerTransactionService.java
@@ -19,6 +19,7 @@
package org.apache.ranger.service;
+import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,22 +36,41 @@ import javax.annotation.PreDestroy;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@Service
public class RangerTransactionService {
+ private static final String PROP_THREADPOOL_SIZE =
"ranger.admin.transaction.service.threadpool.size";
+ private static final String PROP_SUMMARY_LOG_INTERVAL_SEC =
"ranger.admin.transaction.service.summary.log.interval.sec";
+
@Autowired
@Qualifier(value = "transactionManager")
PlatformTransactionManager txManager;
private static final Logger LOG =
LoggerFactory.getLogger(RangerTransactionService.class);
- private ScheduledExecutorService scheduler = null;
+ private ScheduledExecutorService scheduler = null;
+ private AtomicLong scheduledTaskCount = new AtomicLong(0);
+ private AtomicLong executedTaskCount = new AtomicLong(0);
+ private AtomicLong failedTaskCount = new AtomicLong(0);
+ private long summaryLogIntervalMs = 5 * 60 * 1000;
+ private long nextLogSummaryTime =
System.currentTimeMillis() + summaryLogIntervalMs;
@PostConstruct
public void init() {
- scheduler = Executors.newScheduledThreadPool(1);
+ RangerAdminConfig config = RangerAdminConfig.getInstance();
+
+ int numOfThreads = config.getInt(PROP_THREADPOOL_SIZE, 1);
+ long summaryLogIntervalSec =
config.getInt(PROP_SUMMARY_LOG_INTERVAL_SEC, 5 * 60);
+
+ scheduler = Executors.newScheduledThreadPool(numOfThreads);
+ summaryLogIntervalMs = summaryLogIntervalSec * 1000;
+ nextLogSummaryTime = System.currentTimeMillis() +
summaryLogIntervalMs; // offset must be in milliseconds (not seconds) to match currentTimeMillis()
+
+ LOG.info("{}={}", PROP_THREADPOOL_SIZE, numOfThreads);
+ LOG.info("{}={}", PROP_SUMMARY_LOG_INTERVAL_SEC,
summaryLogIntervalSec);
}
@PreDestroy
@@ -59,6 +79,8 @@ public class RangerTransactionService {
LOG.info("attempt to shutdown RangerTransactionService");
scheduler.shutdown();
scheduler.awaitTermination(5, TimeUnit.SECONDS);
+
+ logSummary();
}
catch (InterruptedException e) {
LOG.error("RangerTransactionService tasks interrupted");
@@ -90,16 +112,47 @@ public class RangerTransactionService {
}
});
} catch (Exception e) {
+ failedTaskCount.getAndIncrement();
+
LOG.error("Failed to commit TransactionService
transaction", e);
LOG.error("Ignoring...");
+ } finally {
+ executedTaskCount.getAndIncrement();
+ logSummaryIfNeeded();
}
}
}
}, delayInMillis, MILLISECONDS);
+
+ scheduledTaskCount.getAndIncrement();
+
+ logSummaryIfNeeded();
} catch (Exception e) {
LOG.error("Failed to schedule TransactionService transaction:", e);
LOG.error("Ignroing...");
}
}
+ private void logSummaryIfNeeded() {
+ long now = System.currentTimeMillis();
+
+ if (summaryLogIntervalMs > 0 && now > nextLogSummaryTime) {
+ synchronized (this) {
+ if (now > nextLogSummaryTime) {
+ nextLogSummaryTime = now + summaryLogIntervalMs;
+
+ logSummary();
+ }
+ }
+ }
+ }
+
+ private void logSummary() {
+ long scheduled = scheduledTaskCount.get();
+ long executed = executedTaskCount.get();
+ long failed = failedTaskCount.get();
+ long pending = scheduled - executed;
+
+ LOG.info("RangerTransactionService: tasks(scheduled={}, executed={},
failed={}, pending={})", scheduled, executed, failed, pending);
+ }
}
\ No newline at end of file