This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 13c9c41c1f [opt](hudi) reduce the memory usage of avro reader (#23745)
13c9c41c1f is described below

commit 13c9c41c1f74711292bbcd44877efd166b4379b0
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Tue Sep 5 23:59:23 2023 +0800

    [opt](hudi) reduce the memory usage of avro reader (#23745)

    1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
    2. Regularly clean the cached resolvers in the thread-local map by reflection.
---
 .../java/org/apache/doris/hudi/HudiJniScanner.java | 141 ++++++++++++++++-----
 .../org/apache/doris/hudi/BaseSplitReader.scala    |  34 +++--
 2 files changed, 125 insertions(+), 50 deletions(-)
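For context on solution 2: Avro's GenericDatumReader keeps its per-thread cache of
resolving decoders in a private static ThreadLocal field, so the only way to reach it
from outside the library is reflection. Below is a minimal sketch of that technique,
not the patch itself; the class name ResolverCacheCleaner is hypothetical, and whether
the RESOLVER_CACHE field exists depends on the Avro version on the classpath, hence
the fallback.

    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.util.WeakIdentityHashMap;

    import java.lang.reflect.Field;

    // Sketch of technique 2: locate Avro's private static thread-local resolver
    // cache once via reflection, then let callers clear their thread's entry.
    final class ResolverCacheCleaner {
        private static final ThreadLocal<WeakIdentityHashMap<?, ?>> CACHE = lookup();

        @SuppressWarnings("unchecked")
        private static ThreadLocal<WeakIdentityHashMap<?, ?>> lookup() {
            try {
                Field f = GenericDatumReader.class.getDeclaredField("RESOLVER_CACHE");
                f.setAccessible(true);
                return (ThreadLocal<WeakIdentityHashMap<?, ?>>) f.get(null);
            } catch (Exception e) {
                return null; // field renamed or removed: degrade to a no-op
            }
        }

        // Drop all resolving decoders cached by the calling thread.
        static void clearCurrentThread() {
            if (CACHE != null && CACHE.get() != null) {
                CACHE.get().clear();
            }
        }
    }

The patch below performs the same lookup once in a static initializer and additionally
records each reader thread's map, so a scheduled task can clear all of them after a
quiet period.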
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index 539ab8f7a8..417b338115 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -22,6 +22,9 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.util.WeakIdentityHashMap;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -30,14 +33,21 @@ import scala.collection.Iterator;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -56,6 +66,56 @@ public class HudiJniScanner extends JniScanner {
     private long getRecordReaderTimeNs = 0;
     private Iterator<InternalRow> recordIterator;
 
+    /**
+     * Avro's `GenericDatumReader` keeps its resolver cache in a thread-local map whose values are
+     * `WeakIdentityHashMap`s. The `WeakIdentityHashMap` caches the avro resolving decoders, and a cached
+     * resolver can only be cleaned after its avro schema is recycled and becomes a weak reference.
+     * However, the behavior of the weak-reference queue of `WeakIdentityHashMap` is unpredictable.
+     * In addition, the decoders are very memory intensive, so the number of threads touching the
+     * thread-local map must be kept small.
+     * Two solutions:
+     * 1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
+     * 2. Regularly clean the cached resolvers in the thread-local map by reflection.
+     */
+    private static final AtomicLong lastUpdateTime = new AtomicLong(System.currentTimeMillis());
+    private static final long RESOLVER_TIME_OUT = 60000;
+    private static final ExecutorService avroReadPool;
+    private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
+    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers = new ConcurrentHashMap<>();
+    private static final ReadWriteLock cleanResolverLock = new ReentrantReadWriteLock();
+    private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);
+
+    static {
+        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
+        avroReadPool = Executors.newFixedThreadPool(numThreads,
+                new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
+        LOG.info("Create " + numThreads + " daemon threads to load avro logs");
+
+        Class<?> avroReader = GenericDatumReader.class;
+        try {
+            Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
+            field.setAccessible(true);
+            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) field.get(null);
+            LOG.info("Get the resolved cache for avro reader");
+        } catch (Exception e) {
+            AVRO_RESOLVER_CACHE = null;
+            LOG.warn("Failed to get the resolved cache for avro reader");
+        }
+
+        cleanResolverService.scheduleAtFixedRate(() -> {
+            cleanResolverLock.writeLock().lock();
+            try {
+                if (System.currentTimeMillis() - lastUpdateTime.get() > RESOLVER_TIME_OUT) {
+                    for (WeakIdentityHashMap<?, ?> solver : cachedResolvers.values()) {
+                        solver.clear();
+                    }
+                    lastUpdateTime.set(System.currentTimeMillis());
+                }
+            } finally {
+                cleanResolverLock.writeLock().unlock();
+            }
+        }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
+    }
+
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
         debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
                 .collect(Collectors.joining("\n"));
@@ -84,46 +144,62 @@ public class HudiJniScanner extends JniScanner {
 
     @Override
     public void open() throws IOException {
-        Thread.currentThread().setContextClassLoader(classLoader);
-        initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
-        long startTime = System.nanoTime();
-        // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
-        // so use another process to kill this stuck process.
-        // TODO(gaoxin): better way to solve the stuck process?
+        Future<?> avroFuture = avroReadPool.submit(() -> {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
+            long startTime = System.nanoTime();
+            // RecordReader will use ProcessBuilder to start a hotspot process, which may be stuck,
+            // so use another process to kill this stuck process.
+            // TODO(gaoxin): better way to solve the stuck process?
+            AtomicBoolean isKilled = new AtomicBoolean(false);
+            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+            executorService.scheduleAtFixedRate(() -> {
+                if (!isKilled.get()) {
+                    synchronized (HudiJniScanner.class) {
+                        List<Long> pids = Utils.getChildProcessIds(
+                                Utils.getCurrentProcId());
+                        for (long pid : pids) {
+                            String cmd = Utils.getCommandLine(pid);
+                            if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
+                                Utils.killProcess(pid);
+                                isKilled.set(true);
+                                LOG.info("Kill hotspot debugger process " + pid);
+                            }
                         }
                     }
                 }
+            }, 100, 1000, TimeUnit.MILLISECONDS);
+
+            cleanResolverLock.readLock().lock();
+            try {
+                lastUpdateTime.set(System.currentTimeMillis());
+                if (ugi != null) {
+                    recordIterator = ugi.doAs(
+                            (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
+                                    split).buildScanIterator(split.requiredFields(), new Filter[0]));
+                } else {
+                    recordIterator = new MORSnapshotSplitReader(split)
+                            .buildScanIterator(split.requiredFields(), new Filter[0]);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
+                throw new RuntimeException(e.getMessage(), e);
+            } finally {
+                cleanResolverLock.readLock().unlock();
+            }
-        }, 100, 1000, TimeUnit.MILLISECONDS);
-        try {
-            if (ugi != null) {
-                recordIterator = ugi.doAs(
-                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
-                                split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } else {
-                recordIterator = new MORSnapshotSplitReader(split)
-                        .buildScanIterator(split.requiredFields(), new Filter[0]);
+            isKilled.set(true);
+            executorService.shutdownNow();
+            if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
+                cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
+                        threadId -> AVRO_RESOLVER_CACHE.get());
             }
+            getRecordReaderTimeNs += System.nanoTime() - startTime;
+        });
+        try {
+            avroFuture.get();
         } catch (Exception e) {
-            LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
             throw new IOException(e.getMessage(), e);
         }
-        isKilled.set(true);
-        executorService.shutdownNow();
-        getRecordReaderTimeNs += System.nanoTime() - startTime;
     }
 
     @Override
@@ -131,6 +207,7 @@
         if (recordIterator instanceof Closeable) {
             ((Closeable) recordIterator).close();
         }
+        recordIterator = null;
     }
 
     @Override
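The open() rewrite above is solution 1: every read is submitted to one shared
fixed-size pool and the caller blocks on the Future, so the JNI interface stays
synchronous while the number of threads that ever own an Avro resolver cache is
capped. A minimal sketch of that pattern under the same pool sizing; SharedAvroPool
and readBlocking are hypothetical names:

    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Sketch of technique 1: a bounded pool is the only place avro logs are read,
    // so the number of thread-local resolver caches is capped at the pool size.
    final class SharedAvroPool {
        private static final ExecutorService POOL = Executors.newFixedThreadPool(
                Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4),
                new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());

        // Submit a read and wait for it, so callers keep a synchronous API.
        static <T> T readBlocking(Callable<T> task) throws Exception {
            Future<T> future = POOL.submit(task);
            return future.get(); // rethrows read failures as ExecutionException
        }
    }

A failure inside the pool thread surfaces to the caller as an ExecutionException from
Future.get(), which is why the patch wraps avroFuture.get() and rethrows the cause as
an IOException.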
diff --git a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
index cdae395534..5ba16a5e16 100644
--- a/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
+++ b/fe/be-java-extensions/hudi-scanner/src/main/scala/org/apache/doris/hudi/BaseSplitReader.scala
@@ -153,7 +153,6 @@ case class HoodieTableInformation(sparkSession: SparkSession,
                                   metaClient: HoodieTableMetaClient,
                                   timeline: HoodieTimeline,
                                   tableConfig: HoodieTableConfig,
-                                  tableAvroSchema: Schema,
                                   internalSchemaOpt: Option[InternalSchema])
 
 /**
@@ -215,7 +214,22 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
    * required to fetch table's Avro and Internal schemas
    */
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
-    (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
+    val schemaResolver = new TableSchemaResolver(tableInformation.metaClient)
+    val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+    } orElse {
+      split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the table", e)
+      }
+    }
+    (avroSchema, tableInformation.internalSchemaOpt)
   }
 
   protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema)
 
@@ -649,27 +663,11 @@ object BaseSplitReader {
           None
         }
       }
-      val tableName = metaClient.getTableConfig.getTableName
-      val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
-      val avroSchema: Schema = internalSchemaOpt.map { is =>
-        AvroInternalSchemaConverter.convert(is, namespace + "." + name)
-      } orElse {
-        specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
-      } orElse {
-        split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
-      } getOrElse {
-        Try(schemaResolver.getTableAvroSchema) match {
-          case Success(schema) => schema
-          case Failure(e) =>
-            throw new HoodieSchemaException("Failed to fetch schema from the table", e)
-        }
-      }
 
       HoodieTableInformation(sparkSession,
         metaClient,
         timeline,
         metaClient.getTableConfig,
-        avroSchema,
         internalSchemaOpt)
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org