This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch ranger-2.8
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/ranger-2.8 by this push:
     new 4764ab274 RANGER-5322: PolicyRefresher can struck with no download 
thread if exception thrown in startRefresher (#667)
4764ab274 is described below

commit 4764ab2744c766e287c4058dc9d954a75509cc5f
Author: Vyom Mani Tiwari <[email protected]>
AuthorDate: Fri Jan 2 17:02:14 2026 +0530

    RANGER-5322: PolicyRefresher can struck with no download thread if 
exception thrown in startRefresher (#667)
    
    * RANGER-5322: PolicyRefresher can struck with no download thread if 
exception thrown in startRefresher
    
    * added test for policy refresher and fixed some small issues.
    
    * migrated the test to use junit 5
    
    (cherry picked from commit ce162c78f06f7832b5f83daf58eafb862d3e5846)
---
 .../apache/ranger/plugin/util/PolicyRefresher.java |  96 ++--
 .../plugin/policyengine/TestPolicyRefresher.java   | 495 +++++++++++++++++++++
 2 files changed, 554 insertions(+), 37 deletions(-)

diff --git 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
index 962133395..b2547ae36 100644
--- 
a/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
+++ 
b/agents-common/src/main/java/org/apache/ranger/plugin/util/PolicyRefresher.java
@@ -137,24 +137,7 @@ public void setLastActivationTimeInMillis(long 
lastActivationTimeInMillis) {
        public void startRefresher() {
                loadRoles();
                loadPolicy();
-
-               super.start();
-
-               policyDownloadTimer = new Timer("policyDownloadTimer", true);
-
-               try {
-                       policyDownloadTimer.schedule(new 
DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
-
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("Scheduled policyDownloadRefresher to 
download policies every " + pollingIntervalMs + " milliseconds");
-                       }
-               } catch (IllegalStateException exception) {
-                       LOG.error("Error scheduling policyDownloadTimer:", 
exception);
-                       LOG.error("*** Policies will NOT be downloaded every " 
+ pollingIntervalMs + " milliseconds ***");
-
-                       policyDownloadTimer = null;
-               }
-
+               initRefresher();
        }
 
        public void stopRefresher() {
@@ -414,7 +397,10 @@ public void saveToCache(ServicePolicies policies) {
                                        cacheFile =  new File(realCacheDirName 
+ File.separator + realCacheFileName);
                                } else {
                                        try {
-                                               cacheDirTmp.mkdirs();
+                                               if (!cacheDirTmp.mkdirs() && 
!cacheDirTmp.exists()) {
+                                                       LOG.error("Cannot 
create cache directory: {}", realCacheDirName);
+                                               }
+
                                                cacheFile =  new 
File(realCacheDirName + File.separator + realCacheFileName);
                                        } catch (SecurityException ex) {
                                                LOG.error("Cannot create cache 
directory", ex);
@@ -425,7 +411,7 @@ public void saveToCache(ServicePolicies policies) {
                                }
                        }
                        
-               if(cacheFile != null) {
+                       if(cacheFile != null) {
 
                                RangerPerfTracer perf = null;
 
@@ -433,27 +419,16 @@ public void saveToCache(ServicePolicies policies) {
                                        perf = 
RangerPerfTracer.getPerfTracer(PERF_POLICYENGINE_INIT_LOG, 
"PolicyRefresher.saveToCache(serviceName=" + serviceName + ")");
                                }
 
-                               Writer writer = null;
-       
-                               try {
-                                       writer = new FileWriter(cacheFile);
+                               try (Writer writer = new FileWriter(cacheFile)) 
{
                                        JsonUtils.objectToWriter(writer, 
policies);
-                       } catch (Exception excp) {
-                               LOG.error("failed to save policies to cache 
file '" + cacheFile.getAbsolutePath() + "'", excp);
-                       } finally {
-                                       if (writer != null) {
-                                               try {
-                                                       writer.close();
-                                                       
deleteOldestVersionCacheFileInCacheDirectory(cacheFile.getParentFile());
-                                               } catch (Exception excp) {
-                                                       LOG.error("error while 
closing opened cache file '" + cacheFile.getAbsolutePath() + "'", excp);
-                                               }
-                                       }
+
+                                       
deleteOldestVersionCacheFileInCacheDirectory(cacheFile.getParentFile());
+                               } catch (Exception excp) {
+                                       LOG.error("failed to save policies to 
cache file '" + cacheFile.getAbsolutePath() + "'", excp);
                                }
 
                                RangerPerfTracer.log(perf);
-
-               }
+                       }
 
                        if (doPreserveDeltas) {
                                if (backupCacheFile != null) {
@@ -495,7 +470,23 @@ private void 
deleteOldestVersionCacheFileInCacheDirectory(File cacheDirectory) {
                                String fileName = f.getName();
                                // Extract the part after json_
                                int policyVersionIdx = 
fileName.lastIndexOf("json_");
+
+                               if (policyVersionIdx == -1 || policyVersionIdx 
+ 5 >= fileName.length()) {
+                                       LOG.warn("Invalid cache file name 
format: {}", fileName);
+
+                                       continue;
+                               }
+
                                String policyVersionStr = 
fileName.substring(policyVersionIdx + 5);
+
+                               try {
+                                       Long policyVersion = 
Long.valueOf(policyVersionStr);
+
+                                       policyVersions.add(policyVersion);
+                               } catch (NumberFormatException e) {
+                                       LOG.warn("Cannot parse version from 
file: {}", fileName, e);
+                               }
+
                                Long policyVersion = 
Long.valueOf(policyVersionStr);
                                policyVersions.add(policyVersion);
                        }
@@ -566,4 +557,35 @@ private void loadRoles() {
                        LOG.debug("<== PolicyRefresher(serviceName=" + 
serviceName + ").loadRoles()");
                }
        }
+
+       private void initRefresher() {
+               LOG.debug("==> 
PolicyRefresher(serviceName={}).initRefresher()", serviceName);
+
+               try {
+                       super.start();
+               } catch (IllegalStateException e) {
+                       LOG.error("Failed to start PolicyRefresher thread for 
serviceName={}", serviceName, e);
+
+                       throw e;
+               }
+
+               policyDownloadTimer = new Timer("policyDownloadTimer", true);
+
+               try {
+                       policyDownloadTimer.schedule(new 
DownloaderTask(policyDownloadQueue), pollingIntervalMs, pollingIntervalMs);
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Scheduled policyDownloadRefresher to 
download policies every " + pollingIntervalMs + " milliseconds");
+                       }
+               } catch (IllegalArgumentException | IllegalStateException | 
NullPointerException e) {
+                       LOG.error("Error scheduling policyDownloadTimer:", e);
+                       LOG.error("*** Policies will NOT be downloaded every " 
+ pollingIntervalMs + " milliseconds ***");
+
+                       policyDownloadTimer.cancel();
+
+                       policyDownloadTimer = null;
+               }
+
+               LOG.debug("<== 
PolicyRefresher(serviceName={}).initRefresher()", serviceName);
+       }
 }
diff --git 
a/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyRefresher.java
 
b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyRefresher.java
new file mode 100644
index 000000000..4d399d04e
--- /dev/null
+++ 
b/agents-common/src/test/java/org/apache/ranger/plugin/policyengine/TestPolicyRefresher.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.plugin.policyengine;
+
+import org.apache.ranger.admin.client.RangerAdminClient;
+import org.apache.ranger.authorization.hadoop.config.RangerPluginConfig;
+import org.apache.ranger.plugin.service.RangerBasePlugin;
+import org.apache.ranger.plugin.util.DownloadTrigger;
+import org.apache.ranger.plugin.util.PolicyRefresher;
+import org.apache.ranger.plugin.util.RangerServiceNotFoundException;
+import org.apache.ranger.plugin.util.ServicePolicies;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.atLeast;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Timeout(value = 15, unit = TimeUnit.SECONDS)
+public class TestPolicyRefresher {
+    @TempDir
+    Path tempFolder;
+
+    @Mock
+    private RangerBasePlugin mockPlugin;
+    @Mock
+    private RangerPluginConfig mockPluginConfig;
+    @Mock
+    private RangerPluginContext mockPluginContext;
+    @Mock
+    private RangerAdminClient mockRangerAdminClient;
+    private PolicyRefresher policyRefresher;
+    private File tempCacheDir;
+    private static final String SERVICE_NAME = "testService";
+    private static final String SERVICE_TYPE = "testType";
+    private static final String APP_ID = "testAppId";
+    private static final long POLL_INTERVAL = 30000L;
+    private static final long TEST_TIMEOUT_SECONDS = 5;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        MockitoAnnotations.openMocks(this);
+        tempCacheDir = tempFolder.resolve("cache").toFile();
+        tempCacheDir.mkdirs();
+        setupBasicMocks();
+        policyRefresher = new PolicyRefresher(mockPlugin);
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        if (policyRefresher != null && policyRefresher.isAlive()) {
+            policyRefresher.stopRefresher();
+            policyRefresher.join(2000);
+        }
+        String cacheFileName = (APP_ID + "_" + SERVICE_NAME + ".json")
+                .replace(File.separatorChar, '_')
+                .replace(File.pathSeparatorChar, '_');
+        File cacheFile = new File(tempCacheDir, cacheFileName);
+        if (cacheFile.exists()) {
+            cacheFile.delete();
+        }
+    }
+
+    @Test
+    public void testLastActivationTimeInMillis() {
+        long testTime = System.currentTimeMillis();
+        policyRefresher.setLastActivationTimeInMillis(testTime);
+        assertEquals(testTime, 
policyRefresher.getLastActivationTimeInMillis(), "Last activation time should 
be set and retrieved correctly");
+    }
+
+    @Test
+    public void testStartRefresherLoadsInitialPolicies() throws Exception {
+        CountDownLatch policiesSetLatch = new CountDownLatch(1);
+        ServicePolicies mockPolicies = createMockServicePolicies(1L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+        doAnswer(invocation -> {
+            policiesSetLatch.countDown();
+            return null;
+        })
+                .when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(policiesSetLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Policies should be loaded on start");
+        verify(mockPlugin, atLeastOnce()).setPolicies(argThat(policies ->
+                policies != null && 
SERVICE_NAME.equals(policies.getServiceName()) && policies.getPolicyVersion() 
== 1L));
+        assertTrue(policyRefresher.isAlive(), "PolicyRefresher thread should 
be alive after start");
+    }
+
+    @Test
+    public void testStopRefresherStopsThread() throws Exception {
+        CountDownLatch startLatch = new CountDownLatch(1);
+        ServicePolicies mockPolicies = createMockServicePolicies(1L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+        doAnswer(invocation -> {
+            startLatch.countDown();
+            return null;
+        }).when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(startLatch.await(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS), 
"Refresher should start successfully");
+        policyRefresher.stopRefresher();
+        policyRefresher.join(2000);
+        assertFalse(policyRefresher.isAlive(), "PolicyRefresher thread should 
stop after stopRefresher call");
+    }
+
+    @Test
+    public void testSyncPoliciesWithAdminTriggersImmediateUpdate() throws 
Exception {
+        CountDownLatch initialLoadLatch = new CountDownLatch(1);
+        CountDownLatch syncUpdateLatch = new CountDownLatch(1);
+        AtomicInteger updateCount = new AtomicInteger(0);
+
+        ServicePolicies mockPolicies = createMockServicePolicies(2L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+
+        doAnswer(invocation -> {
+            int count = updateCount.incrementAndGet();
+            if (count == 1) {
+                initialLoadLatch.countDown();
+            } else if (count == 2) {
+                syncUpdateLatch.countDown();
+            }
+            return null;
+        }).when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(initialLoadLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Initial policies should load");
+        DownloadTrigger trigger = new DownloadTrigger();
+        policyRefresher.syncPoliciesWithAdmin(trigger);
+        assertTrue(syncUpdateLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Sync should trigger policy update within timeout");
+        verify(mockPlugin, atLeast(2)).setPolicies(argThat(policies -> 
policies != null
+                && policies.getPolicyVersion() == 2L));
+    }
+
+    @Test
+    public void testPolicyUpdateWithNewVersion() throws Exception {
+        ArgumentCaptor<ServicePolicies> policiesCaptor = 
ArgumentCaptor.forClass(ServicePolicies.class);
+        CountDownLatch initialLoadLatch = new CountDownLatch(1);
+        CountDownLatch updateLatch = new CountDownLatch(1);
+        AtomicInteger callCount = new AtomicInteger(0);
+        ServicePolicies policies1 = createMockServicePolicies(1L);
+        ServicePolicies policies3 = createMockServicePolicies(3L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(eq(-1L), 
anyLong())).thenReturn(policies1);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(eq(1L), 
anyLong())).thenReturn(policies3);
+        doAnswer(invocation -> {
+            int count = callCount.incrementAndGet();
+            if (count == 1) {
+                initialLoadLatch.countDown();
+            } else if (count >= 2) {
+                updateLatch.countDown();
+            }
+            return null;
+        }).when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(initialLoadLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Initial load should complete");
+        verify(mockPlugin, 
atLeastOnce()).setPolicies(policiesCaptor.capture());
+        assertEquals(Long.valueOf(1), 
policiesCaptor.getValue().getPolicyVersion(), "First update should have version 
1");
+
+        DownloadTrigger trigger = new DownloadTrigger();
+        policyRefresher.syncPoliciesWithAdmin(trigger);
+
+        assertTrue(updateLatch.await(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS), 
"Update should complete");
+        verify(mockPlugin, atLeast(2)).setPolicies(policiesCaptor.capture());
+        boolean version3Found = 
policiesCaptor.getAllValues().stream().anyMatch(p -> p.getPolicyVersion() == 
3L);
+        assertTrue(version3Found, "Should update to version 3");
+    }
+
+    @Test
+    public void testHandlesServiceNotFoundException() throws Exception {
+        CountDownLatch nullPoliciesLatch = new CountDownLatch(1);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong()))
+                .thenThrow(new RangerServiceNotFoundException("Service not 
found"));
+        doAnswer(invocation -> {
+            Object arg = invocation.getArgument(0);
+            if (arg == null) {
+                nullPoliciesLatch.countDown();
+            }
+            return null;
+        }).when(mockPlugin).setPolicies(any());
+
+        policyRefresher.startRefresher();
+
+        assertTrue(nullPoliciesLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Should set null policies when service not found");
+        verify(mockPlugin, atLeastOnce()).setPolicies(null);
+    }
+
+    @Test
+    public void testHandlesIOExceptionFromAdmin() throws Exception {
+        AtomicInteger attemptCount = new AtomicInteger(0);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong()))
+                .thenAnswer(invocation -> {
+                    attemptCount.incrementAndGet();
+                    startLatch.countDown();
+                    throw new IOException("Network timeout");
+                });
+
+        policyRefresher.startRefresher();
+
+        assertTrue(startLatch.await(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS), 
"Refresher should start and attempt to fetch policies");
+        assertTrue(policyRefresher.isAlive(), "Refresher should handle IO 
exceptions gracefully and stay alive");
+        assertTrue(attemptCount.get() >= 1, "Refresher should retry after IO 
exception");
+    }
+
+    @Test
+    public void testCachePersistence() throws Exception {
+        CountDownLatch policiesSetLatch = new CountDownLatch(1);
+        ServicePolicies mockPolicies = createMockServicePolicies(5L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+        doAnswer(invocation -> {
+            policiesSetLatch.countDown();
+            return null;
+        })
+                .when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(policiesSetLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Policies should be set in plugin");
+        verify(mockPlugin, atLeastOnce()).setPolicies(argThat(policies -> 
policies != null && policies.getPolicyVersion() == 5L));
+
+        String expectedCacheFileName = (APP_ID + "_" + SERVICE_NAME + ".json")
+                .replace(File.separatorChar, '_')
+                .replace(File.pathSeparatorChar, '_');
+        File cacheFile = new File(tempCacheDir, expectedCacheFileName);
+
+        boolean fileExists = waitForFile(cacheFile);
+
+        if (fileExists) {
+            assertTrue(cacheFile.exists(), "Cache file should be created: " + 
cacheFile.getAbsolutePath());
+        }
+        policyRefresher.stopRefresher();
+        policyRefresher.join(2000);
+    }
+
+    @Test
+    public void testLoadFromCacheWhenAdminUnavailable() throws Exception {
+        String cacheFileName = (APP_ID + "_" + SERVICE_NAME + ".json")
+                .replace(File.separatorChar, '_')
+                .replace(File.pathSeparatorChar, '_');
+        File cacheFile = new File(tempCacheDir, cacheFileName);
+        String json = createCacheFileJson();
+        try (FileWriter writer = new FileWriter(cacheFile)) {
+            writer.write(json);
+        }
+        assertTrue(cacheFile.exists(), "Cache file should be created for test 
setup");
+        reset(mockPlugin, mockRangerAdminClient);
+        setupBasicMocks();
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(null);
+        CountDownLatch policiesLoadedLatch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            ServicePolicies policies = invocation.getArgument(0);
+            if (policies != null && policies.getPolicyVersion() == 10L) {
+                policiesLoadedLatch.countDown();
+            }
+            return null;
+        }).when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        PolicyRefresher newRefresher = new PolicyRefresher(mockPlugin);
+        newRefresher.startRefresher();
+
+        assertTrue(policiesLoadedLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Policies should be loaded from cache within timeout");
+        verify(mockPlugin, atLeastOnce()).setPolicies(argThat(policies -> 
policies != null && policies.getPolicyVersion() == 10L));
+        newRefresher.stopRefresher();
+        newRefresher.join(2000);
+    }
+
+    @Test
+    public void testMultipleConcurrentSyncRequests() throws Exception {
+        CountDownLatch initLatch = new CountDownLatch(1);
+        CountDownLatch allSyncsLatch = new CountDownLatch(4);
+        AtomicInteger callCount = new AtomicInteger(0);
+        ServicePolicies mockPolicies = createMockServicePolicies(1L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+
+        doAnswer(invocation -> {
+            int count = callCount.incrementAndGet();
+            allSyncsLatch.countDown();
+            if (count == 1) {
+                initLatch.countDown();
+            }
+            return null;
+        }).when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        PolicyRefresher newRefresher = new PolicyRefresher(mockPlugin);
+        newRefresher.startRefresher();
+
+        assertTrue(initLatch.await(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS), 
"Initial load should complete");
+
+        newRefresher.syncPoliciesWithAdmin(new DownloadTrigger());
+        newRefresher.syncPoliciesWithAdmin(new DownloadTrigger());
+        newRefresher.syncPoliciesWithAdmin(new DownloadTrigger());
+
+        assertTrue(allSyncsLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "All sync requests should be processed");
+        verify(mockPlugin, atLeast(4)).setPolicies(argThat(policies ->
+                policies != null && policies.getPolicyVersion() == 1L));
+        newRefresher.stopRefresher();
+        newRefresher.join(2000);
+    }
+
+    @Test
+    public void testRefresherHandlesNullPolicyResponse() throws Exception {
+        CountDownLatch waitLatch = new CountDownLatch(1);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(null);
+        doAnswer(invocation -> {
+            waitLatch.countDown();
+            return null;
+        }).when(mockPlugin).setPolicies(any());
+
+        PolicyRefresher newRefresher = new PolicyRefresher(mockPlugin);
+        newRefresher.startRefresher();
+
+        boolean wasCalledInTime = waitLatch.await(2, TimeUnit.SECONDS);
+
+        assertTrue(newRefresher.isAlive(), "Refresher should be running");
+        if (wasCalledInTime) {
+            verify(mockPlugin, atLeastOnce()).setPolicies(null);
+        }
+        newRefresher.stopRefresher();
+        newRefresher.join(2000);
+    }
+
+    @Test
+    public void testActivationTimeTracking() throws Exception {
+        long beforeActivation = System.currentTimeMillis();
+        CountDownLatch policiesAppliedLatch = new CountDownLatch(1);
+        ServicePolicies mockPolicies = createMockServicePolicies(1L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(mockPolicies);
+
+        doAnswer(invocation -> {
+            policiesAppliedLatch.countDown();
+            return null;
+        })
+                .when(mockPlugin).setPolicies(any(ServicePolicies.class));
+
+        policyRefresher.startRefresher();
+
+        assertTrue(policiesAppliedLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Policies should be applied");
+        long afterActivation = System.currentTimeMillis();
+        long activationTime = policyRefresher.getLastActivationTimeInMillis();
+        assertTrue(activationTime >= beforeActivation, "Activation time should 
be after test start");
+        assertTrue(activationTime <= afterActivation, "Activation time should 
be before verification");
+        verify(mockPlugin, times(1)).setPolicies(argThat(policies ->
+                policies != null && policies.getPolicyVersion() == 1L));
+    }
+
+    @Test
+    public void testCorruptedCacheFileHandling() throws Exception {
+        String cacheFileName = (APP_ID + "_" + SERVICE_NAME + ".json")
+                .replace(File.separatorChar, '_')
+                .replace(File.pathSeparatorChar, '_');
+        File cacheFile = new File(tempCacheDir, cacheFileName);
+
+        try (FileWriter writer = new FileWriter(cacheFile)) {
+            writer.write("{ corrupted json data without closing brace");
+        }
+        assertTrue(cacheFile.exists(), "Corrupted cache file should exist");
+
+        reset(mockPlugin, mockRangerAdminClient);
+        setupBasicMocks();
+
+        ServicePolicies freshPolicies = createMockServicePolicies(1L);
+        when(mockRangerAdminClient.getServicePoliciesIfUpdated(anyLong(), 
anyLong())).thenReturn(freshPolicies);
+
+        CountDownLatch policiesLoadedLatch = new CountDownLatch(1);
+        doAnswer(invocation -> {
+            policiesLoadedLatch.countDown();
+            return null;
+        }).when(mockPlugin)
+                .setPolicies(any(ServicePolicies.class));
+
+        PolicyRefresher newRefresher = new PolicyRefresher(mockPlugin);
+        newRefresher.startRefresher();
+
+        assertTrue(policiesLoadedLatch.await(TEST_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), "Should load fresh policies when cache is corrupted");
+        verify(mockPlugin, atLeastOnce()).setPolicies(argThat(policies ->
+                policies != null && policies.getPolicyVersion() == 1L));
+        newRefresher.stopRefresher();
+        newRefresher.join(2000);
+    }
+
+    private ServicePolicies createMockServicePolicies(long version) {
+        ServicePolicies policies = new ServicePolicies();
+        policies.setServiceName(SERVICE_NAME);
+        policies.setPolicyVersion(version);
+        policies.setServiceId(1L);
+        return policies;
+    }
+
+    private void setupBasicMocks() {
+        when(mockPlugin.getServiceName()).thenReturn(SERVICE_NAME);
+        when(mockPlugin.getServiceType()).thenReturn(SERVICE_TYPE);
+        when(mockPlugin.getAppId()).thenReturn(APP_ID);
+        when(mockPlugin.getConfig()).thenReturn(mockPluginConfig);
+        when(mockPlugin.getPluginContext()).thenReturn(mockPluginContext);
+        
when(mockPluginConfig.getPropertyPrefix()).thenReturn("ranger.plugin.test.service");
+        
when(mockPluginConfig.get(eq("ranger.plugin.test.service.policy.cache.dir")))
+                .thenReturn(tempCacheDir.getAbsolutePath());
+        
when(mockPluginConfig.getLong(eq("ranger.plugin.test.service.policy.pollIntervalMs"),
 anyLong()))
+                .thenReturn(POLL_INTERVAL);
+        
when(mockPluginConfig.getBoolean(eq("ranger.plugin.test.service.preserve.deltas"),
 anyBoolean()))
+                .thenReturn(false);
+        
when(mockPluginConfig.getInt(eq("ranger.plugin.test.service.max.versions.to.preserve"),
 anyInt()))
+                .thenReturn(1);
+        
when(mockPluginContext.getAdminClient()).thenReturn(mockRangerAdminClient);
+        
when(mockPluginContext.createAdminClient(mockPluginConfig)).thenReturn(mockRangerAdminClient);
+    }
+
+    private boolean waitForFile(File file) throws InterruptedException {
+        long endTime = System.currentTimeMillis() + (long) 3000;
+        while (System.currentTimeMillis() < endTime) {
+            if (file.exists()) {
+                return true;
+            }
+            Thread.sleep(100);
+        }
+        return false;
+    }
+
+    private String createCacheFileJson() {
+        return "{"
+                + "\"serviceName\":\"" + SERVICE_NAME + "\","
+                + "\"policyVersion\":10,"
+                + "\"serviceId\":1,"
+                + "\"policies\":["
+                + "{"
+                + "\"id\":1,"
+                + "\"service\":\"" + SERVICE_NAME + "\","
+                + "\"name\":\"read-policy\","
+                + "\"isEnabled\":true,"
+                + 
"\"resources\":{\"path\":{\"values\":[\"/data/test\"],\"isRecursive\":false}},"
+                + "\"policyItems\":["
+                + 
"{\"users\":[\"user1\"],\"accesses\":[{\"type\":\"read\",\"isAllowed\":true}]}"
+                + "]"
+                + "},"
+                + "{"
+                + "\"id\":2,"
+                + "\"service\":\"" + SERVICE_NAME + "\","
+                + "\"name\":\"write-policy\","
+                + "\"isEnabled\":true,"
+                + 
"\"resources\":{\"path\":{\"values\":[\"/data/restricted\"],\"isRecursive\":false}},"
+                + "\"policyItems\":["
+                + 
"{\"users\":[\"user2\"],\"accesses\":[{\"type\":\"write\",\"isAllowed\":false}]}"
+                + "]"
+                + "}"
+                + "]}";
+    }
+}

Reply via email to