leekeiabstraction commented on code in PR #2976:
URL: https://github.com/apache/fluss/pull/2976#discussion_r3030527191
##########
fluss-rpc/src/main/proto/FlussApi.proto:
##########
@@ -300,6 +300,43 @@ message LimitScanResponse{
}
+// Full KV scan request and response.
+// A new scan is initiated with bucket_scan_req; subsequent batches use scanner_id.
+message PbScanReqForBucket {
+ required int64 table_id = 1;
+ optional int64 partition_id = 2;
+ required int32 bucket_id = 3;
+ // If set, stops returning rows after this many records.
+ optional int64 limit = 4;
+}
+
+message ScanKvRequest {
+ // Mutually exclusive: either scanner_id (continuation) or bucket_scan_req (new scan).
+ optional bytes scanner_id = 1;
+ optional PbScanReqForBucket bucket_scan_req = 2;
+ // Monotonically increasing sequence number for in-order delivery validation.
+ optional int32 call_seq_id = 3;
+ // Maximum number of bytes of record data to return in this batch.
+ optional int32 batch_size_bytes = 4;
+ // If true, the server closes the scanner session immediately.
+ optional bool close_scanner = 5;
Review Comment:
We could probably split the close operation out into its own request, e.g. a CloseScanRequest, so that this API keeps a single responsibility.
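For illustration, the suggested split could look roughly like the sketch below. The message and field names are hypothetical (not part of this PR); the point is that ScanKvRequest would then only fetch batches, while closing a scanner session becomes a separate RPC:

```proto
// Hypothetical sketch of the suggested split (names are illustrative only).
// ScanKvRequest would drop close_scanner, and clients would instead send:
message CloseScanRequest {
  // The scanner session to close, as returned by the initial scan response.
  required bytes scanner_id = 1;
}

message CloseScanResponse {
}
```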
##########
fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java:
##########
@@ -702,35 +702,46 @@ void testSchemaEvolution() throws Exception {
FLUSS_CLUSTER_EXTENSION.stopCoordinatorServer();
FLUSS_CLUSTER_EXTENSION.startCoordinatorServer();
- // check metadata response
- MetadataResponse metadataResponse =
-                gateway.metadata(newMetadataRequest(Collections.singletonList(tablePath))).get();
- assertThat(metadataResponse.getTableMetadatasCount()).isEqualTo(1);
-        PbTableMetadata pbTableMetadata = metadataResponse.getTableMetadataAt(0);
- assertThat(pbTableMetadata.getSchemaId()).isEqualTo(2);
- TableInfo tableInfo =
- TableInfo.of(
- tablePath,
- pbTableMetadata.getTableId(),
- pbTableMetadata.getSchemaId(),
-                        TableDescriptor.fromJsonBytes(pbTableMetadata.getTableJson()),
- pbTableMetadata.getRemoteDataDir(),
- pbTableMetadata.getCreatedTime(),
- pbTableMetadata.getModifiedTime());
- List<Schema.Column> columns = tableInfo.getSchema().getColumns();
- assertThat(columns.size()).isEqualTo(4);
-        assertThat(tableInfo.getSchema().getColumnIds()).containsExactly(0, 1, 2, 5);
- // check nested row's field_id.
- assertThat(columns.get(2).getName()).isEqualTo("new_nested_column");
- assertThat(
- DataTypeChecks.equalsWithFieldId(
- columns.get(2).getDataType(),
- new RowType(
- true,
- Arrays.asList(
-                                        DataTypes.FIELD("f0", DataTypes.STRING(), 3),
-                                        DataTypes.FIELD("f1", DataTypes.INT(), 4)))))
- .isTrue();
+        // check metadata response - retry since the coordinator needs time to fully load
+        // its state after restart (waitUntilCoordinatorServerElected only waits for ZK
+        // election, not for the full metadata/schema load to complete)
+        retry(
Review Comment:
Why are we updating the schema evolution test case in this PR?
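For context, the retry pattern the diff relies on can be sketched as a minimal, self-contained helper. This is a hypothetical illustration, not Fluss's actual `retry` test utility: it re-runs an assertion-style check with a fixed backoff until it passes or the attempts are exhausted, which is how the test tolerates the coordinator's post-restart load delay:

```java
import java.util.function.Supplier;

/** Minimal retry sketch (hypothetical; not Fluss's actual test utility). */
public class RetryExample {

    /** Re-runs the check until it passes, sleeping between failed attempts. */
    static <T> T retry(int maxAttempts, long backoffMillis, Supplier<T> check)
            throws Exception {
        AssertionError last = null;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                return check.get();
            } catch (AssertionError e) {
                last = e; // remember the failure and try again after a pause
                Thread.sleep(backoffMillis);
            }
        }
        throw last; // all attempts failed: surface the last assertion error
    }

    public static void main(String[] args) throws Exception {
        // Simulate state that only becomes visible after a few polls,
        // like coordinator metadata after a restart.
        int[] calls = {0};
        int schemaId = retry(10, 10, () -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new AssertionError("metadata not loaded yet");
            }
            return 2; // the schema id expected after evolution
        });
        System.out.println(schemaId); // prints 2 on the third attempt
    }
}
```

The key design choice is that only `AssertionError` is retried; unexpected exceptions still fail fast, so a genuinely broken test does not spin until the retry budget runs out.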
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]