This is an automated email from the ASF dual-hosted git repository.
abhay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/master by this push:
new fe33f69ae RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events
fe33f69ae is described below
commit fe33f69ae5d4ac4f2aa9788523d0bb7313c150f2
Author: Abhay Kulkarni <[email protected]>
AuthorDate: Tue Mar 14 07:59:00 2023 -0700
RANGER-4129: ArrayIndexOutOfBounds exception may be thrown while processing events
---
.../source/atlas/AtlasNotificationMapper.java | 52 -----------------
.../tagsync/source/atlas/AtlasTagSource.java | 67 +++++++++++-----------
2 files changed, 35 insertions(+), 84 deletions(-)
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
index a7c456b3d..5d5ab8a7d 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasNotificationMapper.java
@@ -75,29 +75,6 @@ public class AtlasNotificationMapper {
}
}
- public static ServiceTags
processEntityNotification(EntityNotificationWrapper entityNotification) {
-
- ServiceTags ret = null;
-
- if (isNotificationHandled(entityNotification)) {
- try {
- RangerAtlasEntityWithTags entityWithTags = new
RangerAtlasEntityWithTags(entityNotification);
-
- if (entityNotification.getIsEntityDeleteOp()) {
- ret =
buildServiceTagsForEntityDeleteNotification(entityWithTags);
- } else {
- ret = buildServiceTags(entityWithTags, null);
- }
-
- } catch (Exception exception) {
- LOG.error("createServiceTags() failed!! ", exception);
- }
- } else {
- logUnhandledEntityNotification(entityNotification);
- }
- return ret;
- }
-
public static Map<String, ServiceTags>
processAtlasEntities(List<RangerAtlasEntityWithTags> atlasEntities) {
Map<String, ServiceTags> ret = null;
@@ -159,35 +136,6 @@ public class AtlasNotificationMapper {
return ret;
}
- @SuppressWarnings("unchecked")
- static ServiceTags
buildServiceTagsForEntityDeleteNotification(RangerAtlasEntityWithTags
entityWithTags) {
- final ServiceTags ret;
-
- RangerAtlasEntity entity = entityWithTags.getEntity();
- String guid = entity.getGuid();
-
- if (StringUtils.isNotBlank(guid)) {
- ret = new ServiceTags();
- RangerServiceResource serviceResource = new
RangerServiceResource();
- serviceResource.setGuid(guid);
- ret.getServiceResources().add(serviceResource);
- } else {
- ret = buildServiceTags(entityWithTags, null);
- if (ret != null) {
- // tag-definitions should NOT be deleted as part of
service-resource delete
- ret.setTagDefinitions(MapUtils.EMPTY_MAP);
- // Ranger deletes tags associated with deleted service-resource
- ret.setTags(MapUtils.EMPTY_MAP);
- }
- }
-
- if (ret != null) {
- ret.setOp(ServiceTags.OP_DELETE);
- }
-
- return ret;
- }
-
static private Map<String, ServiceTags>
buildServiceTags(List<RangerAtlasEntityWithTags> entitiesWithTags) {
Map<String, ServiceTags> ret = new HashMap<>();
diff --git a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
index 1a3ddecb5..a618cc986 100644
--- a/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
+++ b/tagsync/src/main/java/org/apache/ranger/tagsync/source/atlas/AtlasTagSource.java
@@ -87,7 +87,7 @@ public class AtlasTagSource extends AbstractTagSource {
try {
inputStream.close();
} catch (IOException ioException) {
- LOG.error("Cannot close Atlas
application properties file, file-name:\" +
TAGSYNC_ATLAS_PROPERTIES_FILE_NAME", ioException);
+ LOG.error("Cannot close Atlas
application properties file, file-name:" + TAGSYNC_ATLAS_PROPERTIES_FILE_NAME,
ioException);
}
}
} else {
@@ -214,18 +214,17 @@ public class AtlasTagSource extends AbstractTagSource {
if
(AtlasNotificationMapper.isNotificationHandled(notificationWrapper)) {
-
RangerAtlasEntityWithTags entityWithTags = new
RangerAtlasEntityWithTags(notificationWrapper);
-
if ((notificationWrapper.getIsEntityDeleteOp() && !isHandlingDeleteOps) ||
(!notificationWrapper.getIsEntityDeleteOp() && isHandlingDeleteOps)) {
-
buildAndUploadServiceTags();
+
if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
+
buildAndUploadServiceTags();
+
}
isHandlingDeleteOps = !isHandlingDeleteOps;
}
-
atlasEntitiesWithTags.add(entityWithTags);
+
atlasEntitiesWithTags.add(new RangerAtlasEntityWithTags(notificationWrapper));
} else {
AtlasNotificationMapper.logUnhandledEntityNotification(notificationWrapper);
}
-
messages.add(message);
}
} else {
@@ -256,43 +255,47 @@ public class AtlasTagSource extends AbstractTagSource {
LOG.debug("==> buildAndUploadServiceTags()");
}
- commitToKafka();
+ if (CollectionUtils.isNotEmpty(atlasEntitiesWithTags)) {
- Map<String, ServiceTags> serviceTagsMap =
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+ commitToKafka();
- if (MapUtils.isNotEmpty(serviceTagsMap)) {
- if (serviceTagsMap.size() != 1) {
- LOG.warn("Unexpected!! Notifications
for more than one service received by AtlasTagSource.. Service-Names:[" +
serviceTagsMap.keySet() + "]");
- }
- for (Map.Entry<String, ServiceTags> entry :
serviceTagsMap.entrySet()) {
- if (isHandlingDeleteOps) {
-
entry.getValue().setOp(ServiceTags.OP_DELETE);
-
entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
-
entry.getValue().setTags(Collections.EMPTY_MAP);
- } else {
-
entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+ Map<String, ServiceTags> serviceTagsMap =
AtlasNotificationMapper.processAtlasEntities(atlasEntitiesWithTags);
+
+ if (MapUtils.isNotEmpty(serviceTagsMap)) {
+ if (serviceTagsMap.size() != 1) {
+ LOG.warn("Unexpected!!
Notifications for more than one service received by AtlasTagSource..
Service-Names:[" + serviceTagsMap.keySet() + "]");
}
+ for (Map.Entry<String, ServiceTags>
entry : serviceTagsMap.entrySet()) {
+ if (isHandlingDeleteOps) {
+
entry.getValue().setOp(ServiceTags.OP_DELETE);
+
entry.getValue().setTagDefinitions(Collections.EMPTY_MAP);
+
entry.getValue().setTags(Collections.EMPTY_MAP);
+ } else {
+
entry.getValue().setOp(ServiceTags.OP_ADD_OR_UPDATE);
+ }
- if (LOG.isDebugEnabled()) {
- Gson gsonBuilder = new
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
- String serviceTagsString =
gsonBuilder.toJson(entry.getValue());
+ if (LOG.isDebugEnabled()) {
+ Gson gsonBuilder = new
GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").setPrettyPrinting().create();
+ String
serviceTagsString = gsonBuilder.toJson(entry.getValue());
- LOG.debug("serviceTags=" +
serviceTagsString);
+
LOG.debug("serviceTags=" + serviceTagsString);
+ }
+ updateSink(entry.getValue());
}
- updateSink(entry.getValue());
}
- }
- offsetOfLastMessageDeliveredToRanger =
messages.get(messages.size()-1).getOffset();
+ offsetOfLastMessageDeliveredToRanger =
messages.get(messages.size() - 1).getOffset();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Completed processing batch of
messages of size:[" + messages.size() + "] received from NotificationConsumer");
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Completed processing batch
of messages of size:[" + messages.size() + "] received from
NotificationConsumer");
+ }
+
+ commitToKafka();
- commitToKafka();
+ atlasEntitiesWithTags.clear();
+ messages.clear();
- atlasEntitiesWithTags.clear();
- messages.clear();
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("<== buildAndUploadServiceTags()");