APACHE-KYLIN-2732: better way to fix bug of missing records

Signed-off-by: Zhong <nju_y...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5a6e6d7e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5a6e6d7e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5a6e6d7e

Branch: refs/heads/master
Commit: 5a6e6d7e38342691a057345a9d51c4375df8dc8f
Parents: d103e63
Author: Wang Ken <mingmw...@ebay.com>
Authored: Mon Dec 4 13:13:18 2017 +0800
Committer: Zhong <nju_y...@apache.org>
Committed: Tue Dec 5 00:38:22 2017 +0800

----------------------------------------------------------------------
 .../ConsumeBlockingQueueController.java         |   2 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |  19 ++-
 .../RecordConsumeBlockingQueueController.java   |  43 +++----
 .../ConsumeBlockingQueueControllerTest.java     | 124 +++++++++++++++++++
 .../inmemcubing/ITDoggedCubeBuilderTest.java    |   5 +-
 .../inmemcubing/ITInMemCubeBuilderTest.java     |  13 ++
 6 files changed, 164 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
index a9e55f7..81f44a0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueController.java
@@ -78,7 +78,7 @@ public class ConsumeBlockingQueueController<T> implements Iterator<T> {
         hasException = true;
     }
 
-    public long getOutputRowCount() {
+    public int getOutputRowCount() {
         return outputRowCount.get();
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index ccd7137..d761505 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -50,7 +50,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = 
LoggerFactory.getLogger(DoggedCubeBuilder.class);
 
-    private int splitRowThreshold = Integer.MAX_VALUE;
     private int unitRows = ConsumeBlockingQueueController.DEFAULT_BATCH_SIZE;
 
     public DoggedCubeBuilder(CuboidScheduler cuboidScheduler, 
IJoinedFlatTableDesc flatDesc,
@@ -62,11 +61,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             unitRows /= 10;
     }
 
-    public void setSplitRowThreshold(int rowThreshold) {
-        this.splitRowThreshold = rowThreshold;
-        this.unitRows = Math.min(unitRows, rowThreshold);
-    }
-
     @Override
     public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> 
inputConverterUnit, ICuboidWriter output)
             throws IOException {
@@ -80,6 +74,9 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
         public <T> void build(BlockingQueue<T> input, InputConverterUnit<T> 
inputConverterUnit, ICuboidWriter output)
                 throws IOException {
+            final RecordConsumeBlockingQueueController<T> inputController = 
RecordConsumeBlockingQueueController
+                    .getQueueController(inputConverterUnit, input, unitRows);
+
             final List<SplitThread> splits = new ArrayList<SplitThread>();
             final Merger merger = new Merger();
 
@@ -88,8 +85,11 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
 
             try {
                 while (true) {
-                    SplitThread last = new SplitThread(splits.size() + 1, 
RecordConsumeBlockingQueueController
-                            .getQueueController(inputConverterUnit, input, 
unitRows));
+                    if (inputController.ifEnd()) {
+                        break;
+                    }
+
+                    SplitThread last = new SplitThread(splits.size() + 1, 
inputController);
                     splits.add(last);
 
                     last.start();
@@ -99,9 +99,6 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     last.join();
 
                     checkException(splits);
-                    if (last.inputController.ifEnd()) {
-                        break;
-                    }
                 }
 
                 logger.info("Dogged Cube Build splits complete, took " + 
(System.currentTimeMillis() - start) + " ms");

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
index 49cbe1f..5fc3e32 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/RecordConsumeBlockingQueueController.java
@@ -31,24 +31,34 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
    
     private T currentObject = null;
     private volatile boolean ifEnd = false;
-    private volatile boolean cut = false;
-    private long outputRowCountCut = 0L;
 
     @Override
-    public boolean hasNext() {
+    public boolean hasNext() { // should be idempotent
+        if (ifEnd) {
+            return false;
+        }
         if (currentObject != null) {
-            return hasNext(currentObject);
+            return true;
         }
         if (!super.hasNext()) {
             return false;
         }
         currentObject = super.next();
-        return hasNext(currentObject);
+
+        if (inputConverterUnit.ifEnd(currentObject)) {
+            ifEnd = true;
+            return false;
+        } else if (inputConverterUnit.ifCut(currentObject)) {
+            currentObject = null;
+            hasNext();
+            return false;
+        }
+        return true;
     }
 
     @Override
     public T next() {
-        if (ifEnd())
+        if (ifEnd() || currentObject == null)
             throw new IllegalStateException();
 
         T result = currentObject;
@@ -59,18 +69,6 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public boolean ifEnd() {
         return ifEnd;
     }
-
-    private boolean hasNext(T object) {
-        if (inputConverterUnit.ifEnd(object)) {
-            ifEnd = true;
-            return false;
-        }else if(cut){
-            return false;
-        }else if(inputConverterUnit.ifCut(object)){
-            return false;
-        }
-        return true;
-    }
     
     public static <T> RecordConsumeBlockingQueueController<T> 
getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> 
input){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, 
input, DEFAULT_BATCH_SIZE);
@@ -79,13 +77,4 @@ public class RecordConsumeBlockingQueueController<T> extends ConsumeBlockingQueu
     public static <T> RecordConsumeBlockingQueueController<T> 
getQueueController(InputConverterUnit<T> inputConverterUnit, BlockingQueue<T> 
input, int batchSize){
         return new RecordConsumeBlockingQueueController<>(inputConverterUnit, 
input, batchSize);
     }
-
-    public void forceCutPipe() {
-        cut = true;
-        outputRowCountCut = getOutputRowCount();
-    }
-
-    public long getOutputRowCountAfterCut() {
-        return getOutputRowCount() - outputRowCountCut;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
new file mode 100644
index 0000000..a443596
--- /dev/null
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConsumeBlockingQueueControllerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube.inmemcubing;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumeBlockingQueueControllerTest {
+    private static final Logger logger = 
LoggerFactory.getLogger(ConsumeBlockingQueueControllerTest.class);
+
+    @Test
+    public void testIterator() {
+        final int nRecord = 4345;
+        final int nCut = 2000;
+        final int nBatch = 60;
+
+        final BlockingQueue<String> input = new LinkedBlockingQueue<>();
+        final RecordConsumeBlockingQueueController inputController = 
RecordConsumeBlockingQueueController
+                .getQueueController(new InputConverterUnitTest(), input, 
nBatch);
+
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    for (int i = 1; i <= nRecord; i++) {
+                        input.put("test");
+                        if (i % nCut == 0) {
+                            input.put(InputConverterUnitTest.CUT_ROW);
+                        }
+                    }
+                    input.put(InputConverterUnitTest.END_ROW);
+                } catch (InterruptedException e) {
+                    logger.warn("Fail to produce records into BlockingQueue 
due to: " + e);
+                }
+            }
+        }).start();
+
+        final AtomicInteger nRecordConsumed = new AtomicInteger(0);
+        final AtomicInteger nSplit = new AtomicInteger(0);
+        while (true) {
+            // producer done & consume the end row flag
+            if (inputController.ifEnd()) {
+                break;
+            }
+            nSplit.incrementAndGet();
+            Thread consumer = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (inputController.hasNext()) {
+                        inputController.next();
+                        nRecordConsumed.incrementAndGet();
+                    }
+                    System.out.println(nRecordConsumed.get() + " records 
consumed when finished split " + nSplit.get());
+                }
+            });
+            consumer.start();
+            try {
+                consumer.join();
+            } catch (InterruptedException e) {
+                logger.warn("Fail to join consumer thread: " + e);
+            }
+        }
+
+        Assert.assertEquals(nRecord, nRecordConsumed.get());
+    }
+
+    private static class InputConverterUnitTest implements 
InputConverterUnit<String> {
+        public static final String END_ROW = new String();
+        public static final String CUT_ROW = "0";
+
+        @Override
+        public void convert(String currentObject, GTRecord record) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean ifEnd(String currentObject) {
+            return currentObject == END_ROW;
+        }
+
+        @Override
+        public boolean ifCut(String currentObject) {
+            return currentObject == CUT_ROW;
+        }
+
+        @Override
+        public String getEndRow() {
+            return END_ROW;
+        }
+
+        @Override
+        public String getCutRow() {
+            return CUT_ROW;
+        }
+
+        @Override
+        public boolean ifChange() {
+            throw new UnsupportedOperationException();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
index 0338da8..865cdbb 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITDoggedCubeBuilderTest.java
@@ -89,12 +89,11 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
         IJoinedFlatTableDesc flatDesc = 
EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
         DoggedCubeBuilder doggedBuilder = new 
DoggedCubeBuilder(cube.getCuboidScheduler(), flatDesc, dictionaryMap);
         doggedBuilder.setConcurrentThreads(THREADS);
-        doggedBuilder.setSplitRowThreshold(SPLIT_ROWS);
         FileRecordWriter doggedResult = new FileRecordWriter();
 
         {
             Future<?> future = 
executorService.submit(doggedBuilder.buildAsRunnable(queue, doggedResult));
-            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, 
INPUT_ROWS, randSeed);
+            ITInMemCubeBuilderTest.feedData(cube, flatTable, queue, 
INPUT_ROWS, randSeed, SPLIT_ROWS);
             future.get();
             doggedResult.close();
         }
@@ -110,7 +109,7 @@ public class ITDoggedCubeBuilderTest extends LocalFileMetadataTestCase {
             inmemResult.close();
         }
 
-        fileCompare(doggedResult.file, inmemResult.file);
+        fileCompare(inmemResult.file, doggedResult.file);
         doggedResult.file.delete();
         inmemResult.file.delete();
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5a6e6d7e/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
index 3f97f80..f1b65b0 100644
--- a/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/cube/inmemcubing/ITInMemCubeBuilderTest.java
@@ -151,6 +151,11 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     static void feedData(final CubeInstance cube, final String flatTable, 
ArrayBlockingQueue<String[]> queue, int count,
             long randSeed) throws IOException, InterruptedException {
+        feedData(cube, flatTable, queue, count, randSeed, Integer.MAX_VALUE);
+    }
+
+    static void feedData(final CubeInstance cube, final String flatTable, 
ArrayBlockingQueue<String[]> queue, int count,
+            long randSeed, int splitRowThreshold) throws IOException, 
InterruptedException {
         IJoinedFlatTableDesc flatDesc = 
EngineFactory.getJoinedFlatTableDesc(cube.getDescriptor());
         int nColumns = flatDesc.getAllColumns().size();
 
@@ -178,6 +183,7 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
             rand.setSeed(randSeed);
 
         // output with random data
+        int countOfLastSplit = 0;
         for (; count > 0; count--) {
             String[] row = new String[nColumns];
             for (int i = 0; i < nColumns; i++) {
@@ -185,6 +191,13 @@ public class ITInMemCubeBuilderTest extends LocalFileMetadataTestCase {
                 row[i] = candidates[rand.nextInt(candidates.length)];
             }
             queue.put(row);
+
+            // put cut row if possible
+            countOfLastSplit++;
+            if (countOfLastSplit >= splitRowThreshold) {
+                queue.put(InputConverterUnitForRawData.CUT_ROW);
+                countOfLastSplit = 0;
+            }
         }
         queue.put(InputConverterUnitForRawData.END_ROW);
     }

Reply via email to