APACHE-KYLIN-2732: better way to fix bug of missing records Signed-off-by: Zhong <nju_y...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a6e6d7e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a6e6d7e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a6e6d7e Branch: refs/heads/master Commit: 5a6e6d7e38342691a057345a9d51c4375df8dc8f Parents: d103e63 Author: Wang Ken <mingmw...@ebay.com> Authored: Mon Dec 4 13:13:18 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Tue Dec 5 00:38:22 2017 +0800 ---------------------------------------------------------------------- .../ConsumeBlockingQueueController.java | 2 +- .../cube/inmemcubing/DoggedCubeBuilder.java | 19 ++- .../RecordConsumeBlockingQueueController.java | 43 +++---- .../ConsumeBlockingQueueControllerTest.java | 124 +++++++++++++++++++ .../inmemcubing/ITDoggedCubeBuilderTest.java | 5 +- .../inmemcubing/ITInMemCubeBuilderTest.java | 13 ++ 6 files changed, 164 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java index a9e55f7..81f44a0 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java @@ -78,7 +78,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> { hasException = true; } - public long getOutputRowCount() { + public int getOutputRowCount() { return outputRowCount.get(); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java index ccd7137..d761505 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java @@ -50,7 +50,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { private static Logger logger = LoggerFactory.getLogger(DoggedCubeBuilder.class); - private int splitRowThreshold = Integer.MAX_VALUE; private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE; public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, IJoinedFlatTableDesc flatDesc, @@ -62,11 +61,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { unitRows /= 10; } - public void setSplitRowThreshold(int rowThreshold) { - this.splitRowThreshold = rowThreshold; - this.unitRows = Math.min(unitRows, rowThreshold); - } - @Override public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) throws IOException { @@ -80,6 +74,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> inputConverterUnit, ICuboidWriter output) throws IOException { + final RecordConsumeBlockingQueueController<T> inputController = RecordConsumeBlockingQueueController + .getQueueController(inputConverterUnit, input, unitRows); + final List<SplitThread> splits = new ArrayList<SplitThread>(); final Merger merger = new Merger(); @@ -88,8 +85,11 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { try { while (true) { - SplitThread last = new SplitThread(splits.size() + 1, RecordConsumeBlockingQueueController - .getQueueController(inputConverterUnit, input, unitRows)); + if (inputController.ifEnd()) { + break; + } + + SplitThread last = new SplitThread(splits.size() + 1, inputController); splits.add(last); last.start(); @@ -99,9 +99,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder { last.join(); checkException(splits); - if (last.inputController.ifEnd()) { - break; - } } logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms"); http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java index 49cbe1f..5fc3e32 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java @@ -31,24 +31,34 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu private T currentObject = null; private volatile boolean ifEnd = false; - private volatile boolean cut = false; - private long outputRowCountCut = 0L; @Override - public boolean hasNext() { + public boolean hasNext() { // should be idempotent + if (ifEnd) { + return false; + } if (currentObject != null) { - return hasNext(currentObject); + return true; } if (!super.hasNext()) { return false; } currentObject = super.next(); - return hasNext(currentObject); + + if (inputConverterUnit.ifEnd(currentObject)) { + ifEnd = true; + return false; + } else if (inputConverterUnit.ifCut(currentObject)) { + currentObject = null; + hasNext(); + return false; + } + return true; } @Override public T next() { - if (ifEnd()) + if (ifEnd() || currentObject == null) throw new IllegalStateException(); T result = currentObject; @@ -59,18 +69,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu public boolean ifEnd() { return ifEnd; } - - private boolean hasNext(T object) { - if (inputConverterUnit.ifEnd(object)) { - ifEnd = true; - return false; - }else if(cut){ - return false; - }else if(inputConverterUnit.ifCut(object)){ - return false; - } - return true; - } public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){ return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE); @@ -79,13 +77,4 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input, int batchSize){ return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, batchSize); } - - public void forceCutPipe() { - cut = true; - outputRowCountCut = getOutputRowCount(); - } - - public long getOutputRowCountAfterCut() { - return getOutputRowCount() - outputRowCountCut; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java new file mode 100644 index 0000000..a443596 --- /dev/null +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.cube.inmemcubing; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kylin.gridtable.GTRecord; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConsumeBlockingQueueControllerTest { + private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class); + + @Test + public void testIterator() { + final int nRecord = 4345; + final int nCut = 2000; + final int nBatch = 60; + + final BlockingQueue<String> input = new LinkedBlockingQueue<>(); + final RecordConsumeBlockingQueueController inputController = RecordConsumeBlockingQueueController + .getQueueController(new InputConverterUnitTest(), input, nBatch); + + new Thread(new Runnable() { + @Override + public void run() { + try { + for (int i = 1; i <= nRecord; i++) { + input.put("test"); + if (i % nCut == 0) { + input.put(InputConverterUnitTest.CUT_ROW); + } + } + input.put(InputConverterUnitTest.END_ROW); + } catch (InterruptedException e) { + logger.warn("Fail to produce records into BlockingQueue due to: " + e); + } + } + }).start(); + + final AtomicInteger nRecordConsumed = new AtomicInteger(0); + final AtomicInteger nSplit = new AtomicInteger(0); + while (true) { + // producer done & consume the end row flag + if (inputController.ifEnd()) { + break; + } + nSplit.incrementAndGet(); + Thread consumer = new Thread(new Runnable() { + @Override + public void run() { + while (inputController.hasNext()) { + inputController.next(); + nRecordConsumed.incrementAndGet(); + } + System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit.get()); + } + }); + consumer.start(); + try { + consumer.join(); + } catch (InterruptedException e) { + logger.warn("Fail to join consumer thread: " + e); + } + } + + Assert.assertEquals(nRecord, nRecordConsumed.get()); + } + + private static class InputConverterUnitTest implements InputConverterUnit<String> { + public static final String END_ROW = new String(); + public static final String CUT_ROW = "0"; + + @Override + public void convert(String currentObject, GTRecord record) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean ifEnd(String currentObject) { + return currentObject == END_ROW; + } + + @Override + public boolean ifCut(String currentObject) { + return currentObject == CUT_ROW; + } + + @Override + public String getEndRow() { + return END_ROW; + } + + @Override + public String getCutRow() { + return CUT_ROW; + } + + @Override + public boolean ifChange() { + throw new UnsupportedOperationException(); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java index 0338da8..865cdbb 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java @@ -89,12 +89,11 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); DoggedCubeBuilder doggedBuilder = new DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap); doggedBuilder.setConcurrentThreads(THREADS); - doggedBuilder.setSplitRowThreshold(SPLIT_ROWS); FileRecordWriter doggedResult = new FileRecordWriter(); { Future<?> future = executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult)); - ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed); + ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, INPUT_ROWS, randSeed, SPLIT_ROWS); future.get(); doggedResult.close(); } @@ -110,7 +109,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase { inmemResult.close(); } - fileCompare(doggedResult.file, inmemResult.file); + fileCompare(inmemResult.file, doggedResult.file); doggedResult.file.delete(); inmemResult.file.delete(); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java index 3f97f80..f1b65b0 100644 --- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java @@ -151,6 +151,11 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count, long randSeed) throws IOException, InterruptedException { + feedData(cube, flatTable, queue, count, randSeed, Integer.MAX_VALUE); + } + + static void feedData(final CubeInstance cube, final String flatTable, ArrayBlockingQueue<String[]> queue, int count, + long randSeed, int splitRowThreshold) throws IOException, InterruptedException { IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor()); int nColumns = flatDesc.getAllColumns().size(); @@ -178,6 +183,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { rand.setSeed(randSeed); // output with random data + int countOfLastSplit = 0; for (; count > 0; count--) { String[] row = new String[nColumns]; for (int i = 0; i < nColumns; i++) { @@ -185,6 +191,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase { row[i] = candidates[rand.nextInt(candidates.length)]; } queue.put(row); + + // put cut row if possible + countOfLastSplit++; + if (countOfLastSplit >= splitRowThreshold) { + queue.put(InputConverterUnitForRawData.CUT_ROW); + countOfLastSplit = 0; + } } queue.put(InputConverterUnitForRawData.END_ROW); }