gyang94 opened a new issue, #2967: URL: https://github.com/apache/fluss/issues/2967
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Fluss version

0.9.0 (latest release)

### Please describe the bug 🐞

In `ChangeTypeVectorWriter.writeChangeType`, an `IndexOutOfBoundsException` can occur when a record batch is large:

```
Suppressed: java.io.IOException: One or more Fluss Writer send requests have encountered exception
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.checkAsyncException(FlinkSinkWriter.java:239)
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.close(FlinkSinkWriter.java:198)
	at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
	at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:234)
	at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:223)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:275)
	at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1410)
	at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
	at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
	at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1314)
	at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:958)
	... 3 more
Caused by: java.util.concurrent.CompletionException: org.apache.fluss.exception.UnknownServerException: java.lang.IndexOutOfBoundsException: capacity: 131072, index: 131072, put length: 1
	at org.apache.fluss.memory.MemorySegment.put(MemorySegment.java:325)
	at org.apache.fluss.record.ChangeTypeVectorWriter.writeChangeType(ChangeTypeVectorWriter.java:44)
	at org.apache.fluss.record.MemoryLogRecordsArrowBuilder.append(MemoryLogRecordsArrowBuilder.java:185)
	at org.apache.fluss.server.kv.wal.ArrowWalBuilder.append(ArrowWalBuilder.java:48)
	at org.apache.fluss.server.kv.KvTablet.applyUpdate(KvTablet.java:624)
	at org.apache.fluss.server.kv.KvTablet.processUpsert(KvTablet.java:582)
	at org.apache.fluss.server.kv.KvTablet.processKvRecords(KvTablet.java:493)
	at org.apache.fluss.server.kv.KvTablet.lambda$putAsLeader$0(KvTablet.java:406)
	at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
	at org.apache.fluss.utils.concurrent.LockUtils.inWriteLock(LockUtils.java:65)
	at org.apache.fluss.server.kv.KvTablet.putAsLeader(KvTablet.java:367)
	at org.apache.fluss.server.replica.Replica.lambda$putRecordsToLeader$9(Replica.java:1012)
	at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
	at org.apache.fluss.utils.concurrent.LockUtils.inReadLock(LockUtils.java:55)
	at org.apache.fluss.server.replica.Replica.putRecordsToLeader(Replica.java:996)
	at org.apache.fluss.server.replica.ReplicaManager.putToLocalKv(ReplicaManager.java:1282)
	at org.apache.fluss.server.replica.ReplicaManager.putRecordsToKv(ReplicaManager.java:641)
	at org.apache.fluss.server.tablet.TabletService.putKv(TabletService.java:238)
	at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.fluss.rpc.netty.server.FlussRequestHandler.processRequest(FlussRequestHandler.java:63)
```

### Solution
`ChangeTypeVectorWriter` currently has only a single memory segment. To solve this problem, `ChangeTypeVectorWriter` should support multiple memory pages.

A second, minor problem is that the boundary check is off by one:

```java
if (recordsCount > capacity) {
    throw new IllegalStateException("The change type vector is full.");
}
segment.put(startPosition + recordsCount, changeType.toByteValue());
```

The check should be `recordsCount >= capacity`, so that a full vector fails fast with the intended `IllegalStateException` instead of writing past the segment and hitting a runtime `IndexOutOfBoundsException`.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
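The two proposed fixes can be sketched together as follows. This is a minimal, self-contained illustration only: the class name, the `byte[]`-backed pages, and the page size are assumptions for the sketch, not the real Fluss `ChangeTypeVectorWriter` / `MemorySegment` API.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (NOT the real Fluss implementation) showing:
//   1. growing across multiple fixed-size memory pages instead of a
//      single segment, and
//   2. bounds-checking with >= BEFORE writing, so a full vector fails
//      with IllegalStateException rather than an out-of-bounds write.
public class MultiPageChangeTypeWriterSketch {
    private static final int PAGE_SIZE = 1024; // assumed page size

    private final List<byte[]> pages = new ArrayList<>();
    private final int capacity; // overall record limit
    private int recordsCount;

    public MultiPageChangeTypeWriterSketch(int capacity) {
        this.capacity = capacity;
    }

    public void writeChangeType(byte changeTypeByte) {
        // Fix 2: use >= and check before touching memory.
        if (recordsCount >= capacity) {
            throw new IllegalStateException("The change type vector is full.");
        }
        int pageIndex = recordsCount / PAGE_SIZE;
        int offset = recordsCount % PAGE_SIZE;
        // Fix 1: allocate a new page on demand instead of overflowing
        // a single fixed-size segment.
        if (pageIndex == pages.size()) {
            pages.add(new byte[PAGE_SIZE]);
        }
        pages.get(pageIndex)[offset] = changeTypeByte;
        recordsCount++;
    }

    public int recordsCount() {
        return recordsCount;
    }

    public byte changeTypeAt(int i) {
        return pages.get(i / PAGE_SIZE)[i % PAGE_SIZE];
    }
}
```

With this shape, a batch larger than one page simply spills into the next page, and exceeding the overall capacity surfaces as the intended `IllegalStateException` instead of an `IndexOutOfBoundsException` deep inside the memory layer.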
