Repository: kylin Updated Branches: refs/heads/master 43d02f473 -> 5a6e6d7e3
Revert "APACHE-KYLIN-2732: fix bug of missing records" This reverts commit 84779827ad56673848c0e2f6b589a406dac1bce2. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d103e639 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d103e639 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d103e639 Branch: refs/heads/master Commit: d103e639db0530cb21d3f98584fe2552ddfe8b59 Parents: 43d02f4 Author: Zhong <nju_y...@apache.org> Authored: Tue Dec 5 00:37:27 2017 +0800 Committer: Zhong <nju_y...@apache.org> Committed: Tue Dec 5 00:37:27 2017 +0800 ---------------------------------------------------------------------- .../ConsumeBlockingQueueController.java | 6 +- .../RecordConsumeBlockingQueueController.java | 50 ++++---- .../ConsumeBlockingQueueControllerTest.java | 127 ------------------- 3 files changed, 27 insertions(+), 156 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java index 8875618..a9e55f7 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java @@ -48,7 +48,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> { if (hasException) { return false; } - if (hasNextInBuffer()) { + if (internalIT.hasNext()) { return true; } else { batchBuffer.clear(); @@ -74,10 +74,6 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> { throw new UnsupportedOperationException(); } - protected boolean hasNextInBuffer() { - return internalIT.hasNext(); - } - public void findException() { hasException = true; } http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java index 8c51fc6..49cbe1f 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java @@ -18,7 +18,6 @@ package org.apache.kylin.cube.inmemcubing; -import java.util.NoSuchElementException; import java.util.concurrent.BlockingQueue; public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueueController<T> { @@ -32,39 +31,25 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu private T currentObject = null; private volatile boolean ifEnd = false; - private volatile boolean ifCut = false; + private volatile boolean cut = false; + private long outputRowCountCut = 0L; @Override public boolean hasNext() { if (currentObject != null) { - return true; + return hasNext(currentObject); } - if (ifCut) { - if (!super.hasNextInBuffer()) { - return false; - } - } else { - if (!super.hasNext()) { - return false; - } - } - - currentObject = super.next(); - if (inputConverterUnit.ifEnd(currentObject)) { - ifEnd = true; + if (!super.hasNext()) { return false; - } else if (inputConverterUnit.ifCut(currentObject)) { - ifCut = true; - currentObject = null; - return hasNext(); } - return true; + currentObject = super.next(); + return hasNext(currentObject); } @Override public T next() { - if (ifEnd() || currentObject == null) - throw new NoSuchElementException(); + if (ifEnd()) + throw new IllegalStateException(); T result = currentObject; currentObject = null; @@ -74,6 +59,18 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu public boolean ifEnd() { return ifEnd; } + + private boolean hasNext(T object) { + if (inputConverterUnit.ifEnd(object)) { + ifEnd = true; + return false; + }else if(cut){ + return false; + }else if(inputConverterUnit.ifCut(object)){ + return false; + } + return true; + } public static <T> RecordConsumeBlockingQueueController<T> getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> input){ return new RecordConsumeBlockingQueueController<>(inputConverterUnit, input, DEFAULT_BATCH_SIZE); @@ -84,6 +81,11 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu } public void forceCutPipe() { - ifCut = true; + cut = true; + outputRowCountCut = getOutputRowCount(); + } + + public long getOutputRowCountAfterCut() { + return getOutputRowCount() - outputRowCountCut; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/d103e639/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java deleted file mode 100644 index 681ae62..0000000 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.cube.inmemcubing; - -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.kylin.gridtable.GTRecord; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ConsumeBlockingQueueControllerTest { - private static final Logger logger = LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class); - - @Test - public void testIterator() { - final int nRecord = 4345; - final int nCut = 2000; - final int nBatch = 60; - - final BlockingQueue<String> input = new LinkedBlockingQueue<>(); - Thread producer = new Thread(new Runnable() { - @Override - public void run() { - try { - for (int i = 1; i <= nRecord; i++) { - input.put("test"); - if (i % nCut == 0) { - input.put(InputConverterUnitTest.CUT_ROW); - } - } - input.put(InputConverterUnitTest.END_ROW); - } catch (InterruptedException e) { - logger.warn("Fail to produce records into BlockingQueue due to: " + e); - } - } - }); - - final AtomicInteger nRecordConsumed = new AtomicInteger(0); - Thread consumer = new Thread(new Runnable() { - @Override - public void run() { - int nSplit = 0; - while (true) { - RecordConsumeBlockingQueueController blockingQueueController = RecordConsumeBlockingQueueController - .getQueueController(new InputConverterUnitTest(), input, nBatch); - while (blockingQueueController.hasNext()) { - blockingQueueController.next(); - nRecordConsumed.incrementAndGet(); - } - System.out.println(nRecordConsumed.get() + " records consumed when finished split " + nSplit); - nSplit++; - - if (blockingQueueController.ifEnd()) { - break; - } - } - } - }); - - producer.start(); - consumer.start(); - - try { - producer.join(); - consumer.join(); - } catch (InterruptedException e) { - logger.warn("Fail to join threads: " + e); - } - - Assert.assertEquals(nRecord, nRecordConsumed.get()); - } - - private static class InputConverterUnitTest implements InputConverterUnit<String> { - public static final String END_ROW = new String(); - public static final String CUT_ROW = "0"; - - @Override - public void convert(String currentObject, GTRecord record) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean ifEnd(String currentObject) { - return currentObject == END_ROW; - } - - @Override - public boolean ifCut(String currentObject) { - return currentObject == CUT_ROW; - } - - @Override - public String getEndRow() { - return END_ROW; - } - - @Override - public String getCutRow() { - return CUT_ROW; - } - - @Override - public boolean ifChange() { - throw new UnsupportedOperationException(); - } - } -}