This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13c9c41c1f [opt](hudi) reduce the memory usage of avro reader (#23745)
13c9c41c1f is described below

commit 13c9c41c1f74711292bbcd44877efd166b4379b0
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Tue Sep 5 23:59:23 2023 +0800

    [opt](hudi) reduce the memory usage of avro reader (#23745)
    
    1. Reduce the number of threads reading avro logs and keep the readers in a 
fixed thread pool.
    2. Regularly clean the cached resolvers in the thread local map by 
reflection.
---
 .../java/org/apache/doris/hudi/HudiJniScanner.java | 141 ++++++++++++++++-----
 .../org/apache/doris/hudi/BaseSplitReader.scala    |  34 +++--
 2 files changed, 125 insertions(+), 50 deletions(-)

diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 539ab8f7a8..417b338115 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -22,6 +22,9 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.util.WeakIdentityHashMap;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -30,14 +33,21 @@ import scala.collection.Iterator;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -56,6 +66,56 @@ public class HudiJniScanner extends JniScanner {
     private long getRecordReaderTimeNs = 0;
     private Iterator<InternalRow> recordIterator;
 
+    /**
+     * `GenericDatumReader` of avro holds a thread local map that stores 
`WeakIdentityHashMap`.
+     * `WeakIdentityHashMap` caches the avro resolving decoder, and the 
cached resolver can only be cleaned when
+     * its avro schema is recycled and becomes a weak reference. However, the 
behavior of the weak reference queue
+     * of `WeakIdentityHashMap` is unpredictable. Secondly, the decoder is 
very memory intensive, so the number of threads
+     * that use the thread local map cannot be too many.
+     * Two solutions:
+     * 1. Reduce the number of threads reading avro logs and keep the readers 
in a fixed thread pool.
+     * 2. Regularly clean the cached resolvers in the thread local map by 
reflection.
+     */
+    private static final AtomicLong lastUpdateTime = new 
AtomicLong(System.currentTimeMillis());
+    private static final long RESOLVER_TIME_OUT = 60000;
+    private static final ExecutorService avroReadPool;
+    private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
+    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers 
= new ConcurrentHashMap<>();
+    private static final ReadWriteLock cleanResolverLock = new 
ReentrantReadWriteLock();
+    private static final ScheduledExecutorService cleanResolverService = 
Executors.newScheduledThreadPool(1);
+
+    static {
+        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 
2 + 1, 4);
+        avroReadPool = Executors.newFixedThreadPool(numThreads,
+                new 
ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
+        LOG.info("Create " + numThreads + " daemon threads to load avro logs");
+
+        Class<?> avroReader = GenericDatumReader.class;
+        try {
+            Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
+            field.setAccessible(true);
+            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) 
field.get(null);
+            LOG.info("Get the resolved cache for avro reader");
+        } catch (Exception e) {
+            AVRO_RESOLVER_CACHE = null;
+            LOG.warn("Failed to get the resolved cache for avro reader");
+        }
+
+        cleanResolverService.scheduleAtFixedRate(() -> {
+            cleanResolverLock.writeLock().lock();
+            try {
+                if (System.currentTimeMillis() - lastUpdateTime.get() > 
RESOLVER_TIME_OUT) {
+                    for (WeakIdentityHashMap<?, ?> solver : 
cachedResolvers.values()) {
+                        solver.clear();
+                    }
+                    lastUpdateTime.set(System.currentTimeMillis());
+                }
+            } finally {
+                cleanResolverLock.writeLock().unlock();
+            }
+        }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
+    }
+
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
         debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + 
kv.getValue())
                 .collect(Collectors.joining("\n"));
@@ -84,46 +144,62 @@ public class HudiJniScanner extends JniScanner {
 
     @Override
     public void open() throws IOException {
-        Thread.currentThread().setContextClassLoader(classLoader);
-        initTableInfo(split.requiredTypes(), split.requiredFields(), 
predicates, fetchSize);
-        long startTime = System.nanoTime();
-        // RecordReader will use ProcessBuilder to start a hotspot process, 
which may be stuck,
-        // so use another process to kill this stuck process.
-        // TODO(gaoxin): better way to solve the stuck process?
-        AtomicBoolean isKilled = new AtomicBoolean(false);
-        ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
-        executorService.scheduleAtFixedRate(() -> {
-            if (!isKilled.get()) {
-                synchronized (HudiJniScanner.class) {
-                    List<Long> pids = Utils.getChildProcessIds(
-                            Utils.getCurrentProcId());
-                    for (long pid : pids) {
-                        String cmd = Utils.getCommandLine(pid);
-                        if (cmd != null && 
cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
-                            Utils.killProcess(pid);
-                            isKilled.set(true);
-                            LOG.info("Kill hotspot debugger process " + pid);
+        Future<?> avroFuture = avroReadPool.submit(() -> {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), 
predicates, fetchSize);
+            long startTime = System.nanoTime();
+            // RecordReader will use ProcessBuilder to start a hotspot 
process, which may be stuck,
+            // so use another thread to kill this stuck process.
+            // TODO(gaoxin): better way to solve the stuck process?
+            AtomicBoolean isKilled = new AtomicBoolean(false);
+            ScheduledExecutorService executorService = 
Executors.newScheduledThreadPool(1);
+            executorService.scheduleAtFixedRate(() -> {
+                if (!isKilled.get()) {
+                    synchronized (HudiJniScanner.class) {
+                        List<Long> pids = Utils.getChildProcessIds(
+                                Utils.getCurrentProcId());
+                        for (long pid : pids) {
+                            String cmd = Utils.getCommandLine(pid);
+                            if (cmd != null && 
cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
+                                Utils.killProcess(pid);
+                                isKilled.set(true);
+                                LOG.info("Kill hotspot debugger process " + 
pid);
+                            }
                         }
                     }
                 }
+            }, 100, 1000, TimeUnit.MILLISECONDS);
+
+            cleanResolverLock.readLock().lock();
+            try {
+                lastUpdateTime.set(System.currentTimeMillis());
+                if (ugi != null) {
+                    recordIterator = ugi.doAs(
+                            (PrivilegedExceptionAction<Iterator<InternalRow>>) 
() -> new MORSnapshotSplitReader(
+                                    
split).buildScanIterator(split.requiredFields(), new Filter[0]));
+                } else {
+                    recordIterator = new MORSnapshotSplitReader(split)
+                            .buildScanIterator(split.requiredFields(), new 
Filter[0]);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to open hudi scanner, split params:\n" + 
debugString, e);
+                throw new RuntimeException(e.getMessage(), e);
+            } finally {
+                cleanResolverLock.readLock().unlock();
             }
-        }, 100, 1000, TimeUnit.MILLISECONDS);
-        try {
-            if (ugi != null) {
-                recordIterator = ugi.doAs(
-                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () 
-> new MORSnapshotSplitReader(
-                                
split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } else {
-                recordIterator = new MORSnapshotSplitReader(split)
-                        .buildScanIterator(split.requiredFields(), new 
Filter[0]);
+            isKilled.set(true);
+            executorService.shutdownNow();
+            if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != 
null) {
+                cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
+                        threadId -> AVRO_RESOLVER_CACHE.get());
             }
+            getRecordReaderTimeNs += System.nanoTime() - startTime;
+        });
+        try {
+            avroFuture.get();
         } catch (Exception e) {
-            LOG.error("Failed to open hudi scanner, split params:\n" + 
debugString, e);
             throw new IOException(e.getMessage(), e);
         }
-        isKilled.set(true);
-        executorService.shutdownNow();
-        getRecordReaderTimeNs += System.nanoTime() - startTime;
     }
 
     @Override
@@ -131,6 +207,7 @@ public class HudiJniScanner extends JniScanner {
         if (recordIterator instanceof Closeable) {
             ((Closeable) recordIterator).close();
         }
+        recordIterator = null;
     }
 
     @Override
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index cdae395534..5ba16a5e16 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -153,7 +153,6 @@ case class HoodieTableInformation(sparkSession: 
SparkSession,
                                   metaClient: HoodieTableMetaClient,
                                   timeline: HoodieTimeline,
                                   tableConfig: HoodieTableConfig,
-                                  tableAvroSchema: Schema,
                                   internalSchemaOpt: Option[InternalSchema])
 
 /**
@@ -215,7 +214,22 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
    * required to fetch table's Avro and Internal schemas
    */
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: 
Option[InternalSchema]) = {
-    (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
+    val schemaResolver = new TableSchemaResolver(tableInformation.metaClient)
+    val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+    } orElse {
+      split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the 
table", e)
+      }
+    }
+    (avroSchema, tableInformation.internalSchemaOpt)
   }
 
   protected lazy val tableStructSchema: StructType = 
convertAvroSchemaToStructType(tableAvroSchema)
@@ -649,27 +663,11 @@ object BaseSplitReader {
               None
           }
         }
-        val tableName = metaClient.getTableConfig.getTableName
-        val (name, namespace) = 
AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
-        val avroSchema: Schema = internalSchemaOpt.map { is =>
-          AvroInternalSchemaConverter.convert(is, namespace + "." + name)
-        } orElse {
-          specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
-        } orElse {
-          split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
-        } getOrElse {
-          Try(schemaResolver.getTableAvroSchema) match {
-            case Success(schema) => schema
-            case Failure(e) =>
-              throw new HoodieSchemaException("Failed to fetch schema from the 
table", e)
-          }
-        }
 
         HoodieTableInformation(sparkSession,
           metaClient,
           timeline,
           metaClient.getTableConfig,
-          avroSchema,
           internalSchemaOpt)
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to