This is an automated email from the ASF dual-hosted git repository. sarath pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push: new 4100684 ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers 4100684 is described below commit 4100684fa3f63cb2a6267ab24051002ce38de017 Author: Radhika Kundam <rkun...@cloudera.com> AuthorDate: Tue May 11 18:00:49 2021 -0700 ATLAS-4164: [Atlas: Spooling] Tables created after spooling are created before the spooled tables when there is multiple frequent restart in kafka brokers Signed-off-by: Sarath Subramanian <sar...@apache.org> --- .../org/apache/atlas/notification/spool/IndexManagement.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java index b3a586b..f018983 100644 --- a/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java +++ b/notification/src/main/java/org/apache/atlas/notification/spool/IndexManagement.java @@ -98,8 +98,9 @@ public class IndexManagement { } public boolean isPending() { - return !indexReader.isEmpty() || - (indexWriter.getCurrent() != null && indexWriter.getCurrent().getLine() > 0); + return !indexReader.isEmpty() + || (indexWriter.getCurrent() != null && indexWriter.getCurrent().isStatusWriteInProgress()) + || (indexReader.currentIndexRecord != null && indexReader.currentIndexRecord.getStatus() == IndexRecord.STATUS_READ_IN_PROGRESS); } public synchronized DataOutput getSpoolWriter() throws IOException { @@ -146,6 +147,8 @@ public class IndexManagement { public void update(IndexRecord record) { this.indexFileManager.updateIndex(record); + + LOG.info("this.indexFileManager.updateIndex: {}", record.getLine()); } public void flushSpoolWriter() throws IOException { @@ -349,6 +352,9 @@ public class IndexManagement { public IndexRecord next() throws InterruptedException { this.currentIndexRecord = blockingQueue.poll(retryDestinationMS, TimeUnit.MILLISECONDS); + if (this.currentIndexRecord != null) { + this.currentIndexRecord.setStatus(IndexRecord.STATUS_READ_IN_PROGRESS); + } return this.currentIndexRecord; }