gyang94 opened a new issue, #2967:
URL: https://github.com/apache/fluss/issues/2967

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Fluss version
   
   0.9.0 (latest release)
   
   ### Please describe the bug 🐞
   
   In `ChangeTypeVectorWriter.writeChangeType`, an 
`IndexOutOfBoundsException` can be thrown when a record batch is large. 
   
   > Suppressed: java.io.IOException: One or more Fluss Writer send requests have encountered exception
                at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.checkAsyncException(FlinkSinkWriter.java:239)
                at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.close(FlinkSinkWriter.java:198)
                at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
                at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:234)
                at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:223)
                at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.close(SinkWriterOperator.java:275)
                at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
                at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1410)
                at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
                at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
                at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
                at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1314)
                at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:958)
                at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
                at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:958)
                ... 3 more
        Caused by: java.util.concurrent.CompletionException: org.apache.fluss.exception.UnknownServerException: java.lang.IndexOutOfBoundsException: capacity: 131072, index: 131072, put length: 1
        at org.apache.fluss.memory.MemorySegment.put(MemorySegment.java:325)
        at org.apache.fluss.record.ChangeTypeVectorWriter.writeChangeType(ChangeTypeVectorWriter.java:44)
        at org.apache.fluss.record.MemoryLogRecordsArrowBuilder.append(MemoryLogRecordsArrowBuilder.java:185)
        at org.apache.fluss.server.kv.wal.ArrowWalBuilder.append(ArrowWalBuilder.java:48)
        at org.apache.fluss.server.kv.KvTablet.applyUpdate(KvTablet.java:624)
        at org.apache.fluss.server.kv.KvTablet.processUpsert(KvTablet.java:582)
        at org.apache.fluss.server.kv.KvTablet.processKvRecords(KvTablet.java:493)
        at org.apache.fluss.server.kv.KvTablet.lambda$putAsLeader$0(KvTablet.java:406)
        at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
        at org.apache.fluss.utils.concurrent.LockUtils.inWriteLock(LockUtils.java:65)
        at org.apache.fluss.server.kv.KvTablet.putAsLeader(KvTablet.java:367)
        at org.apache.fluss.server.replica.Replica.lambda$putRecordsToLeader$9(Replica.java:1012)
        at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
        at org.apache.fluss.utils.concurrent.LockUtils.inReadLock(LockUtils.java:55)
        at org.apache.fluss.server.replica.Replica.putRecordsToLeader(Replica.java:996)
        at org.apache.fluss.server.replica.ReplicaManager.putToLocalKv(ReplicaManager.java:1282)
        at org.apache.fluss.server.replica.ReplicaManager.putRecordsToKv(ReplicaManager.java:641)
        at org.apache.fluss.server.tablet.TabletService.putKv(TabletService.java:238)
        at jdk.internal.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:568)
        at org.apache.fluss.rpc.netty.server.FlussRequestHandler.processRequest(FlussRequestHandler.java:63)
   
   ### Solution
   
   `ChangeTypeVectorWriter` currently has only one memory segment. To solve 
this problem, `ChangeTypeVectorWriter` needs to support multiple memory 
pages.
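
To make the multi-page idea concrete, here is a minimal sketch, not the actual Fluss implementation. The class `PagedChangeTypeWriter` and its methods are hypothetical, and plain `byte[]` pages stand in for Fluss memory segments that would really come from a memory pool. It only illustrates mapping a record index to a (page, offset) pair and allocating a new page on demand.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a change-type writer that grows page by page. */
final class PagedChangeTypeWriter {
    private final int pageSize;
    // Each byte[] stands in for one memory page (assumption, not the Fluss API).
    private final List<byte[]> pages = new ArrayList<>();
    private int recordsCount = 0;

    PagedChangeTypeWriter(int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        this.pageSize = pageSize;
    }

    /** Appends one change-type byte, allocating a new page when the last one is full. */
    void writeChangeType(byte changeType) {
        int pageIndex = recordsCount / pageSize;
        int offset = recordsCount % pageSize;
        if (pageIndex == pages.size()) {
            // A real implementation would request a page from a pool here.
            pages.add(new byte[pageSize]);
        }
        pages.get(pageIndex)[offset] = changeType;
        recordsCount++;
    }

    /** Reads back the change-type byte written for the given record index. */
    byte get(int index) {
        if (index < 0 || index >= recordsCount) {
            throw new IndexOutOfBoundsException("index: " + index);
        }
        return pages.get(index / pageSize)[index % pageSize];
    }

    int recordsCount() {
        return recordsCount;
    }

    int pageCount() {
        return pages.size();
    }
}
```

With a page size of 4, writing 10 change types would occupy three pages, and no single page is ever indexed past its end.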
   
   A second, minor problem is that the boundary check is off by one:
   >  if (recordsCount > capacity) {
               throw new IllegalStateException("The change type vector is full.");
       }
       segment.put(startPosition + recordsCount, changeType.toByteValue());
    
   We should check `recordsCount >= capacity` so that a full vector raises this 
`IllegalStateException` instead of writing past the end of the segment and 
failing with a runtime `IndexOutOfBoundsException`.
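
The stricter check can be illustrated with a small self-contained sketch. `BoundedChangeTypeWriter` is a hypothetical stand-in, a `byte[]` replaces the Fluss `MemorySegment`, and the assumption that `capacity` equals the bytes remaining after `startPosition` is mine, not taken from the Fluss source.

```java
/** Hypothetical single-segment writer showing the `>=` boundary check. */
final class BoundedChangeTypeWriter {
    // A byte[] stands in for the memory segment (assumption).
    private final byte[] segment;
    private final int startPosition;
    // Assumed definition: number of bytes available after startPosition.
    private final int capacity;
    private int recordsCount = 0;

    BoundedChangeTypeWriter(byte[] segment, int startPosition) {
        if (startPosition < 0 || startPosition > segment.length) {
            throw new IllegalArgumentException("startPosition out of range");
        }
        this.segment = segment;
        this.startPosition = startPosition;
        this.capacity = segment.length - startPosition;
    }

    void writeChangeType(byte changeType) {
        // Valid slots are 0..capacity-1, so recordsCount == capacity is already full.
        // A '>' check would let that write through and index past the array.
        if (recordsCount >= capacity) {
            throw new IllegalStateException("The change type vector is full.");
        }
        segment[startPosition + recordsCount] = changeType;
        recordsCount++;
    }

    int recordsCount() {
        return recordsCount;
    }
}
```

With a 4-byte array and `startPosition` of 1, three writes succeed and the fourth fails with `IllegalStateException` before any memory is touched.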
   
   
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


