This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 602c461 KYLIN-4797 Correct inputRecordSizes of segment when there is no data in this segment
602c461 is described below
commit 602c46166384d018966312b8a57184458f9ffc0c
Author: Zhichao Zhang <[email protected]>
AuthorDate: Fri Oct 23 11:59:03 2020 +0800
KYLIN-4797 Correct inputRecordSizes of segment when there is no data in this segment
---
.../org/apache/kylin/engine/spark/job/CubeBuildJob.java | 14 +++++++++-----
.../org/apache/kylin/engine/spark2/NBuildAndQueryTest.java | 4 ++--
2 files changed, 11 insertions(+), 7 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index 828425f..124cdab 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -185,12 +185,16 @@ public class CubeBuildJob extends SparkApplication {
List<CubeSegment> cubeSegments = Lists.newArrayList();
for (Map.Entry<String, Object> entry :
toUpdateSegmentSourceSize.entrySet()) {
CubeSegment segment = cubeCopy.getSegmentById(entry.getKey());
- segment.setInputRecordsSize((Long) entry.getValue());
- segment.setLastBuildTime(System.currentTimeMillis());
- cubeSegments.add(segment);
+ if (segment.getInputRecords() > 0l) {
+ segment.setInputRecordsSize((Long) entry.getValue());
+ segment.setLastBuildTime(System.currentTimeMillis());
+ cubeSegments.add(segment);
+ }
+ }
+ if (!cubeSegments.isEmpty()) {
+ update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
+ cubeManager.updateCube(update);
}
- update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
- cubeManager.updateCube(update);
}
private void build(Collection<NBuildSourceInfo> buildSourceInfos,
SegmentInfo seg, SpanningTree st) {
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
index ddc0e2f..c1ca7b2 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java
@@ -225,7 +225,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
CubeSegment segment1 =
cubeMgr.reloadCube(cubeName).getSegments().get(0);
Assert.assertEquals(0, segment1.getInputRecords());
- Assert.assertEquals(2103495, segment1.getInputRecordsSize());
+ Assert.assertEquals(0, segment1.getInputRecordsSize());
Assert.assertEquals(0, segment1.getSizeKB());
Assert.assertEquals(17, segment1.getCuboidShardNums().size());
}
@@ -250,7 +250,7 @@ public class NBuildAndQueryTest extends LocalWithSparkSessionTest {
CubeSegment firstSegment =
cubeMgr.reloadCube(cubeName).getSegments().get(0);
if (cubeName.equals("ci_left_join_cube")) {
Assert.assertEquals(10000, firstSegment.getInputRecords());
- Assert.assertEquals(4206990, firstSegment.getInputRecordsSize());
+ Assert.assertEquals(2103495, firstSegment.getInputRecordsSize());
Assert.assertTrue(firstSegment.getSizeKB() > 0);
Assert.assertEquals(17, firstSegment.getCuboidShardNums().size());
Assert.assertEquals(leftJoinCubeCuboidShardNums(),
firstSegment.getCuboidShardNums());