This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new fdfa773b1 RANGER-4375: updated to log plugin activities asynchronously
fdfa773b1 is described below
commit fdfa773b195ddb013e1d2cfd4a273e43d8ad16d9
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Wed Aug 23 18:31:51 2023 -0700
RANGER-4375: updated to log plugin activities asynchronously
---
.../main/java/org/apache/ranger/biz/AssetMgr.java | 55 ++++++++---
.../RangerTransactionSynchronizationAdapter.java | 107 ++++++++++++---------
.../java/org/apache/ranger/rest/ServiceREST.java | 4 -
.../ranger/service/RangerPluginActivityLogger.java | 56 -----------
.../org/apache/ranger/rest/TestServiceREST.java | 4 -
.../service/TestRangerPluginActivityLogger.java | 74 --------------
6 files changed, 102 insertions(+), 198 deletions(-)
diff --git a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
index 84f5ab168..1dc3d372d 100644
--- a/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
+++ b/security-admin/src/main/java/org/apache/ranger/biz/AssetMgr.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.PostConstruct;
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;
@@ -38,6 +39,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringUtils;
import org.apache.ranger.amazon.cloudwatch.CloudWatchAccessAuditsService;
+import org.apache.ranger.authorization.hadoop.config.RangerAdminConfig;
import org.apache.ranger.common.AppConstants;
import org.apache.ranger.common.DateUtil;
import org.apache.ranger.common.JSONUtil;
@@ -134,10 +136,23 @@ public class AssetMgr extends AssetMgrBase {
@Autowired
ServiceMgr serviceMgr;
+ boolean pluginActivityAuditCommitInline = false;
+
private static final Logger logger =
LoggerFactory.getLogger(AssetMgr.class);
private static final String adminCapabilities = Long.toHexString(new
RangerPluginCapability().getPluginCapabilities());
+ @PostConstruct
+ public void init() {
+ logger.info("==> AssetMgr.init()");
+
+ pluginActivityAuditCommitInline =
RangerAdminConfig.getInstance().getBoolean("ranger.plugin.activity.audit.commit.inline",
false);
+
+ logger.info("ranger.plugin.activity.audit.commit.inline={}",
pluginActivityAuditCommitInline);
+
+ logger.info("<== AssetMgr.init()");
+ }
+
public File getXResourceFile(Long id, String fileType) {
VXResource xResource = xResourceService.readResource(id);
if (xResource == null) {
@@ -644,13 +659,13 @@ public class AssetMgr extends AssetMgrBase {
}
- public XXPolicyExportAudit createPolicyAudit(
- final XXPolicyExportAudit xXPolicyExportAudit) {
-
- XXPolicyExportAudit ret = null;
+ public void createPolicyAudit(final XXPolicyExportAudit
xXPolicyExportAudit) {
+ final Runnable commitWork;
if (xXPolicyExportAudit.getHttpRetCode() ==
HttpServletResponse.SC_NOT_MODIFIED) {
boolean logNotModified =
PropertiesUtil.getBooleanProperty("ranger.log.SC_NOT_MODIFIED", false);
if (!logNotModified) {
+ commitWork = null;
+
logger.debug("Not logging HttpServletResponse."
+ "SC_NOT_MODIFIED, to enable,
update "
+ ":
ranger.log.SC_NOT_MODIFIED");
@@ -658,20 +673,29 @@ public class AssetMgr extends AssetMgrBase {
// Create PolicyExportAudit record after
transaction is completed. If it is created in-line here
// then the TransactionManager will roll-back
the changes because the HTTP return code is
// HttpServletResponse.SC_NOT_MODIFIED
- Runnable commitWork = new Runnable() {
+ commitWork = new Runnable() {
@Override
public void run() {
rangerDaoManager.getXXPolicyExportAudit().create(xXPolicyExportAudit);
-
}
};
-
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
}
} else {
- ret =
rangerDaoManager.getXXPolicyExportAudit().create(xXPolicyExportAudit);
+ commitWork = new Runnable() {
+ @Override
+ public void run() {
+
rangerDaoManager.getXXPolicyExportAudit().create(xXPolicyExportAudit);
+ }
+ };
}
- return ret;
+ if (commitWork != null) {
+ if (pluginActivityAuditCommitInline) {
+
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
+ } else {
+
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
+ }
+ }
}
public void createPluginInfo(String serviceName, String pluginId,
HttpServletRequest request, int entityType, Long downloadedVersion, Long
lastKnownVersion, long lastActivationTime, int httpCode, String clusterName,
String pluginCapabilities) {
@@ -787,12 +811,19 @@ public class AssetMgr extends AssetMgrBase {
}
} else {
isTagVersionResetNeeded = false;
- commitWork = null;
- doCreateOrUpdateXXPluginInfo(pluginInfo, entityType,
isTagVersionResetNeeded, clusterName);
+
+ commitWork = new Runnable() {
+ @Override
+ public void run() {
+
doCreateOrUpdateXXPluginInfo(pluginInfo, entityType, isTagVersionResetNeeded,
clusterName);
+ }
+ };
}
- if (commitWork != null) {
+ if (pluginActivityAuditCommitInline) {
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
+ } else {
+
transactionSynchronizationAdapter.executeAsyncOnTransactionComplete(commitWork);
}
if (logger.isDebugEnabled()) {
diff --git a/security-admin/src/main/java/org/apache/ranger/common/db/RangerTransactionSynchronizationAdapter.java b/security-admin/src/main/java/org/apache/ranger/common/db/RangerTransactionSynchronizationAdapter.java
index d84d772a9..a9ec94e1c 100644
--- a/security-admin/src/main/java/org/apache/ranger/common/db/RangerTransactionSynchronizationAdapter.java
+++ b/security-admin/src/main/java/org/apache/ranger/common/db/RangerTransactionSynchronizationAdapter.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.ranger.service.RangerTransactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,39 +49,29 @@ public class RangerTransactionSynchronizationAdapter extends TransactionSynchron
@Qualifier(value = "transactionManager")
PlatformTransactionManager txManager;
+ @Autowired
+ RangerTransactionService transactionService;
+
private static final Logger LOG =
LoggerFactory.getLogger(RangerTransactionSynchronizationAdapter.class);
- private static final ThreadLocal<List<Runnable>> RUNNABLES = new
ThreadLocal<List<Runnable>>();
- private static final ThreadLocal<List<Runnable>> RUNNABLES_AFTER_COMMIT =
new ThreadLocal<List<Runnable>>();
+ private static final ThreadLocal<List<Runnable>> RUNNABLES =
new ThreadLocal<>();
+ private static final ThreadLocal<List<Runnable>> RUNNABLES_ASYNC =
new ThreadLocal<>();
+ private static final ThreadLocal<List<Runnable>> RUNNABLES_AFTER_COMMIT =
new ThreadLocal<>();
public void executeOnTransactionCompletion(Runnable runnable) {
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting new runnable {" + runnable + "} to run after
completion");
}
- /*
- From TransactionSynchronizationManager documentation:
- TransactionSynchronizationManager is a central helper that manages
resources and transaction synchronizations per thread.
- Resource management code should only register synchronizations when
this manager is active,
- which can be checked via isSynchronizationActive(); it should perform
immediate resource cleanup else.
- If transaction synchronization isn't active, there is either no
current transaction,
- or the transaction manager doesn't support transaction synchronization.
-
- Note: Synchronization is an Interface for transaction synchronization
callbacks which is implemented by
- TransactionSynchronizationAdapter
- */
+ addRunnable(runnable, RUNNABLES);
+ }
- if (!registerSynchronization()) {
- LOG.info("Transaction synchronization is NOT ACTIVE. Executing
right now runnable {" + runnable + "}");
- runnable.run();
- return;
- }
- List<Runnable> threadRunnables = RUNNABLES.get();
- if (threadRunnables == null) {
- threadRunnables = new ArrayList<Runnable>();
- RUNNABLES.set(threadRunnables);
+ public void executeAsyncOnTransactionComplete(Runnable runnable) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Submitting new runnable {" + runnable + "} to run async
after completion");
}
- threadRunnables.add(runnable);
+
+ addRunnable(runnable, RUNNABLES_ASYNC);
}
public void executeOnTransactionCommit(Runnable runnable) {
@@ -88,29 +79,7 @@ public class RangerTransactionSynchronizationAdapter extends TransactionSynchron
LOG.debug("Submitting new runnable {" + runnable + "} to run after
transaction is committed");
}
- /*
- From TransactionSynchronizationManager documentation:
- TransactionSynchronizationManager is a central helper that manages
resources and transaction synchronizations per thread.
- Resource management code should only register synchronizations when
this manager is active,
- which can be checked via isSynchronizationActive(); it should perform
immediate resource cleanup else.
- If transaction synchronization isn't active, there is either no
current transaction,
- or the transaction manager doesn't support transaction synchronization.
-
- Note: Synchronization is an Interface for transaction synchronization
callbacks which is implemented by
- TransactionSynchronizationAdapter
- */
-
- if (!registerSynchronization()) {
- LOG.info("Transaction synchronization is NOT ACTIVE. Executing
right now runnable {" + runnable + "}");
- runnable.run();
- return;
- }
- List<Runnable> threadRunnables = RUNNABLES_AFTER_COMMIT.get();
- if (threadRunnables == null) {
- threadRunnables = new ArrayList<Runnable>();
- RUNNABLES_AFTER_COMMIT.set(threadRunnables);
- }
- threadRunnables.add(runnable);
+ addRunnable(runnable, RUNNABLES_AFTER_COMMIT);
}
@Override
@@ -127,6 +96,15 @@ public class RangerTransactionSynchronizationAdapter extends TransactionSynchron
List<Runnable> runnables = RUNNABLES.get();
RUNNABLES.remove();
+ List<Runnable> asyncRunnables = RUNNABLES_ASYNC.get();
+ RUNNABLES_ASYNC.remove();
+
+ if (asyncRunnables != null) {
+ for (Runnable asyncRunnable : asyncRunnables) {
+
transactionService.scheduleToExecuteInOwnTransaction(asyncRunnable, 0L);
+ }
+ }
+
if (isParentTransactionCommitted) {
// Run tasks scheduled to run after transaction is successfully
committed
runRunnables(runnablesAfterCommit, true);
@@ -140,12 +118,45 @@ public class RangerTransactionSynchronizationAdapter extends TransactionSynchron
}
}
+ private void addRunnable(Runnable runnable, ThreadLocal<List<Runnable>>
threadRunnables) {
+ /*
+ From TransactionSynchronizationManager documentation:
+ TransactionSynchronizationManager is a central helper that manages
resources and transaction synchronizations per thread.
+ Resource management code should only register synchronizations when
this manager is active,
+ which can be checked via isSynchronizationActive(); it should perform
immediate resource cleanup else.
+ If transaction synchronization isn't active, there is either no
current transaction,
+ or the transaction manager doesn't support transaction synchronization.
+
+ Note: Synchronization is an Interface for transaction synchronization
callbacks which is implemented by
+ TransactionSynchronizationAdapter
+ */
+ if (!registerSynchronization()) {
+ LOG.info("Transaction synchronization is NOT ACTIVE. Executing
right now runnable {" + runnable + "}");
+
+ runnable.run();
+
+ return;
+ }
+
+ List<Runnable> runnables = threadRunnables.get();
+
+ if (runnables == null) {
+ runnables = new ArrayList<>();
+ threadRunnables.set(runnables);
+ }
+
+ runnables.add(runnable);
+ }
+
private boolean registerSynchronization() {
final boolean ret =
TransactionSynchronizationManager.isSynchronizationActive();
+
if (ret) {
List<Runnable> threadRunnablesOnCompletion = RUNNABLES.get();
- List<Runnable> threadRunnablesOnCommit =
RUNNABLES_AFTER_COMMIT.get();
- if (threadRunnablesOnCompletion == null && threadRunnablesOnCommit
== null) {
+ List<Runnable> threadRunnablesOnCommit =
RUNNABLES_AFTER_COMMIT.get();
+ List<Runnable> threadRunnablesAsync = RUNNABLES_ASYNC.get();
+
+ if (threadRunnablesOnCompletion == null && threadRunnablesOnCommit
== null && threadRunnablesAsync == null) {
TransactionSynchronizationManager.registerSynchronization(this);
}
}
diff --git a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
index de8a15823..852c163df 100644
--- a/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
+++ b/security-admin/src/main/java/org/apache/ranger/rest/ServiceREST.java
@@ -134,7 +134,6 @@ import org.apache.ranger.service.RangerPolicyLabelsService;
import org.apache.ranger.service.RangerPolicyService;
import org.apache.ranger.service.RangerServiceDefService;
import org.apache.ranger.service.RangerServiceService;
-import org.apache.ranger.service.RangerTransactionService;
import org.apache.ranger.service.XUserService;
import org.apache.ranger.view.RangerExportPolicyList;
import org.apache.ranger.view.RangerPluginInfoList;
@@ -249,9 +248,6 @@ public class ServiceREST {
@Autowired
TagDBStore tagStore;
- @Autowired
- RangerTransactionService transactionService;
-
@Autowired
RangerTransactionSynchronizationAdapter
rangerTransactionSynchronizationAdapter;
diff --git a/security-admin/src/main/java/org/apache/ranger/service/RangerPluginActivityLogger.java b/security-admin/src/main/java/org/apache/ranger/service/RangerPluginActivityLogger.java
deleted file mode 100644
index ce02fdb8f..000000000
--- a/security-admin/src/main/java/org/apache/ranger/service/RangerPluginActivityLogger.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.service;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.ranger.common.PropertiesUtil;
-import org.apache.ranger.common.db.RangerTransactionSynchronizationAdapter;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-
-@Component
-public class RangerPluginActivityLogger {
-
- @Autowired
- RangerTransactionSynchronizationAdapter transactionSynchronizationAdapter;
-
- private static final Logger LOG =
LoggerFactory.getLogger(RangerPluginActivityLogger.class);
-
- boolean pluginActivityAuditCommitInline = false;
-
- @PostConstruct
- public void init() {
- pluginActivityAuditCommitInline =
PropertiesUtil.getBooleanProperty("ranger.plugin.activity.audit.commit.inline",
false);
- LOG.info("ranger.plugin.activity.audit.commit.inline = " +
pluginActivityAuditCommitInline);
- if (pluginActivityAuditCommitInline) {
- LOG.info("Will use TransactionManager for committing scheduled
work");
- } else {
- LOG.info("Will use separate thread for committing scheduled work");
- }
- }
-
- public void commitAfterTransactionComplete(Runnable commitWork) {
-
transactionSynchronizationAdapter.executeOnTransactionCompletion(commitWork);
- }
-
-}
diff --git a/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java b/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
index 03ceb6280..6c5addbc9 100644
--- a/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
+++ b/security-admin/src/test/java/org/apache/ranger/rest/TestServiceREST.java
@@ -94,7 +94,6 @@ import org.apache.ranger.service.RangerPolicyLabelsService;
import org.apache.ranger.service.RangerPolicyService;
import org.apache.ranger.service.RangerServiceDefService;
import org.apache.ranger.service.RangerServiceService;
-import org.apache.ranger.service.RangerTransactionService;
import org.apache.ranger.service.XUserService;
import org.apache.ranger.view.*;
import org.junit.Assert;
@@ -223,9 +222,6 @@ public class TestServiceREST {
@Mock
RangerPolicyAdmin policyAdmin;
- @Mock
- RangerTransactionService rangerTransactionService;
-
@Mock
RangerTransactionSynchronizationAdapter
rangerTransactionSynchronizationAdapter;
diff --git a/security-admin/src/test/java/org/apache/ranger/service/TestRangerPluginActivityLogger.java b/security-admin/src/test/java/org/apache/ranger/service/TestRangerPluginActivityLogger.java
deleted file mode 100644
index 26d63890f..000000000
--- a/security-admin/src/test/java/org/apache/ranger/service/TestRangerPluginActivityLogger.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/* Copyright 2004, 2005, 2006 Acegi Technology Pty Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ranger.service;
-
-import org.apache.ranger.db.RangerDaoManager;
-import org.apache.ranger.entity.XXPolicyExportAudit;
-import org.junit.FixMethodOrder;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.MethodSorters;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-@FixMethodOrder(MethodSorters.NAME_ASCENDING)
-public class TestRangerPluginActivityLogger {
-
- @Mock
- RangerPluginActivityLogger rangerPluginActivityLogger;
-
- @Mock
- RangerDaoManager rangerDaoManager;
-
- @Mock
- XXPolicyExportAudit xXPolicyExportAudit;
-
- @Mock
- RangerTransactionService transactionService;
-
- @Mock
- Runnable commitWork;
-
- boolean pluginActivityAuditCommitInline = false;
-
- @Test
- public void test1CommitAfterTransactionComplete() {
- pluginActivityAuditCommitInline = false;
-
rangerPluginActivityLogger.commitAfterTransactionComplete(commitWork);
-
- }
-
- @Test
- public void test2Init() {
- pluginActivityAuditCommitInline = false;
- rangerPluginActivityLogger.init();
-
- }
-
- @Test
- public void test3CommitAfterTransactionComplete() {
- pluginActivityAuditCommitInline = true;
-
rangerPluginActivityLogger.commitAfterTransactionComplete(commitWork);
- }
-
- @Test
- public void test4Init() {
- pluginActivityAuditCommitInline = true;
- rangerPluginActivityLogger.init();
- }
-
-}