KYLIN-980: spill to disk when available system memory is low
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0f8fc239 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0f8fc239 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0f8fc239 Branch: refs/heads/master Commit: 0f8fc239162bbca913b1eceb380d89f674928400 Parents: 4c44080 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Nov 25 14:38:14 2015 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Nov 25 14:40:07 2015 +0800 ---------------------------------------------------------------------- .../common/util/MemoryBudgetController.java | 249 +++++++++++++++++++ .../hadoop/cube/FactDistinctColumnsReducer.java | 33 ++- 2 files changed, 274 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/0f8fc239/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java new file mode 100644 index 0000000..4715ef6 --- /dev/null +++ b/common/src/main/java/org/apache/kylin/common/util/MemoryBudgetController.java @@ -0,0 +1,249 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.common.util; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; + +public class MemoryBudgetController { + + private static final boolean debug = true; + + public interface MemoryConsumer { + // return number MB released + int freeUp(int mb); + } + + @SuppressWarnings("serial") + public static class NotEnoughBudgetException extends IllegalStateException { + + public NotEnoughBudgetException() { + super(); + } + + public NotEnoughBudgetException(Throwable cause) { + super(cause); + } + } + + private static class ConsumerEntry { + final MemoryConsumer consumer; + int reservedMB; + + ConsumerEntry(MemoryConsumer consumer) { + this.consumer = consumer; + } + } + + public static final MemoryBudgetController ZERO_BUDGET = new MemoryBudgetController(0); + public static final int ONE_MB = 1024 * 1024; + public static final long ONE_GB = 1024 * 1024 * 1024; + + private static final Logger logger = LoggerFactory.getLogger(MemoryBudgetController.class); + + // all budget numbers are in MB + private final int totalBudgetMB; + private final ConcurrentHashMap<MemoryConsumer, ConsumerEntry> booking = new ConcurrentHashMap<MemoryConsumer, ConsumerEntry>(); + private int totalReservedMB; + private final ReentrantLock lock = new ReentrantLock(); + + public MemoryBudgetController(int totalBudgetMB) { + Preconditions.checkArgument(totalBudgetMB >= 0); + 
Preconditions.checkState(totalBudgetMB <= getSystemAvailMB()); + this.totalBudgetMB = totalBudgetMB; + this.totalReservedMB = 0; + } + + public int getTotalBudgetMB() { + return totalBudgetMB; + } + + public int getTotalReservedMB() { + lock.lock(); + try { + return totalReservedMB; + } finally { + lock.unlock(); + } + } + + public int getRemainingBudgetMB() { + lock.lock(); + try { + return totalBudgetMB - totalReservedMB; + } finally { + lock.unlock(); + } + } + + public void reserveInsist(MemoryConsumer consumer, int requestMB) { + long waitStart = 0; + while (true) { + try { + reserve(consumer, requestMB); + if (debug && waitStart > 0) + logger.debug(consumer + " waited " + (System.currentTimeMillis() - waitStart) + " ms on the " + requestMB + " MB request"); + return; + } catch (NotEnoughBudgetException ex) { + // retry + } + + if (waitStart == 0) + waitStart = System.currentTimeMillis(); + + synchronized (lock) { + try { + lock.wait(); + } catch (InterruptedException e) { + throw new NotEnoughBudgetException(e); + } + } + } + } + + /** reserve without wait, fail with NotEnoughBudgetException immediately if no mem */ + public void reserve(MemoryConsumer consumer, int requestMB) { + if (totalBudgetMB == 0 && requestMB > 0) + throw new NotEnoughBudgetException(); + + boolean ok = false; + while (!ok) { + int gap = calculateGap(consumer, requestMB); + if (gap > 0) { + // to void deadlock, don't hold lock when invoking consumer.freeUp() + tryFreeUp(gap); + } + ok = updateBooking(consumer, requestMB); + } + } + + private int calculateGap(MemoryConsumer consumer, int requestMB) { + lock.lock(); + try { + ConsumerEntry entry = booking.get(consumer); + int curMB = entry == null ? 
0 : entry.reservedMB; + int delta = requestMB - curMB; + return delta - (totalBudgetMB - totalReservedMB); + } finally { + lock.unlock(); + } + } + + private void tryFreeUp(int gap) { + // note don't hold lock when calling consumer.freeUp(), that method holding lock for itself and may cause deadlock + for (ConsumerEntry entry : booking.values()) { + int mb = entry.consumer.freeUp(gap); + if (mb > 0) { + lock.lock(); + try { + updateBookingWithDelta(entry.consumer, -mb); + } finally { + lock.unlock(); + } + gap -= mb; + if (gap <= 0) + break; + } + } + if (gap > 0) + throw new NotEnoughBudgetException(); + + if (debug) { + if (getSystemAvailMB() < getRemainingBudgetMB()) { + logger.debug("Remaining budget is " + getRemainingBudgetMB() + " MB free, but system only has " + getSystemAvailMB() + " MB free. If this persists, some memory calculation must be wrong."); + } + } + } + + private boolean updateBooking(MemoryConsumer consumer, int requestMB) { + lock.lock(); + try { + ConsumerEntry entry = booking.get(consumer); + if (entry == null) { + if (requestMB == 0) + return true; + + entry = new ConsumerEntry(consumer); + booking.put(consumer, entry); + } + + int delta = requestMB - entry.reservedMB; + return updateBookingWithDelta(consumer, delta); + } finally { + lock.unlock(); + } + } + + // lock MUST be obtained before entering + private boolean updateBookingWithDelta(MemoryConsumer consumer, int delta) { + if (delta == 0) + return true; + + ConsumerEntry entry = booking.get(consumer); + if (entry == null) { + if (delta <= 0) + return true; + + entry = new ConsumerEntry(consumer); + booking.put(consumer, entry); + } + + // double check gap again, it may be changed by other concurrent requests + if (delta > 0) { + int gap = delta - (totalBudgetMB - totalReservedMB); + if (gap > 0) + return false; + } + + totalReservedMB += delta; + entry.reservedMB += delta; + if (entry.reservedMB == 0) { + booking.remove(entry.consumer); + } + if (debug) { + 
logger.debug(entry.consumer + " reserved " + entry.reservedMB + " MB, total reserved " + totalReservedMB + " MB, remaining budget " + getRemainingBudgetMB() + " MB"); + } + + if (delta < 0) { + synchronized (lock) { + lock.notifyAll(); + } + } + + return true; + } + + public static long getSystemAvailBytes() { + Runtime runtime = Runtime.getRuntime(); + long totalMemory = runtime.totalMemory(); // current heap allocated to the VM process + long freeMemory = runtime.freeMemory(); // out of the current heap, how much is free + long maxMemory = runtime.maxMemory(); // Max heap VM can use e.g. Xmx setting + long usedMemory = totalMemory - freeMemory; // how much of the current heap the VM is using + long availableMemory = maxMemory - usedMemory; // available memory i.e. Maximum heap size minus the current amount used + return availableMemory; + } + + public static int getSystemAvailMB() { + return (int) (getSystemAvailBytes() / ONE_MB); + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/0f8fc239/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java ---------------------------------------------------------------------- diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java index 89f90ba..f18e840 100644 --- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java +++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java @@ -29,6 +29,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.mr.KylinReducer; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.common.util.MemoryBudgetController; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.cuboid.Cuboid; @@ -38,9 +39,7 @@ import 
org.apache.kylin.job.hadoop.AbstractHadoopJob; import org.apache.kylin.metadata.model.TblColRef; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; +import java.util.*; /** * @author yangli9 @@ -69,15 +68,34 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text TblColRef col = columnList.get(key.get()); HashSet<ByteArray> set = new HashSet<ByteArray>(); + int count = 0; for (Text textValue : values) { ByteArray value = new ByteArray(Bytes.copy(textValue.getBytes(), 0, textValue.getLength())); set.add(value); + count++; + if (count % 10000 == 0 && MemoryBudgetController.getSystemAvailMB() < 100) { + outputDistinctValues(col, set, context); + set.clear(); + } } - Configuration conf = context.getConfiguration(); - FileSystem fs = FileSystem.get(conf); - String outputPath = conf.get(BatchConstants.OUTPUT_PATH); - FSDataOutputStream out = fs.create(new Path(outputPath, col.getName())); + if (set.isEmpty() == false) { + outputDistinctValues(col, set, context); + } + + } + + private void outputDistinctValues(TblColRef col, Set<ByteArray> set, Context context) throws IOException { + final Configuration conf = context.getConfiguration(); + final FileSystem fs = FileSystem.get(conf); + final String outputPath = conf.get(BatchConstants.OUTPUT_PATH); + final Path outputFile = new Path(outputPath, col.getName()); + FSDataOutputStream out; + if (fs.exists(outputFile)) { + out = fs.append(outputFile); + } else { + out = fs.create(outputFile); + } try { for (ByteArray value : set) { @@ -87,7 +105,6 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text } finally { out.close(); } - } }