kangkaisen commented on a change in pull request #3212: [Alter]Clean SchemaChangeJobV2 when schema change CANCELLED or FINISHED URL: https://github.com/apache/incubator-doris/pull/3212#discussion_r401340789
########## File path: fe/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java ########## @@ -953,4 +969,131 @@ public void readFields(DataInput in) throws IOException { } } } + + /** + * write data need to persist when job finished + */ + private void writeJobFinishedData(DataOutput out) throws IOException { + // only persist data will be used in getInfo + out.writeInt(indexIdMap.size()); + + // shadow index info + out.writeInt(indexIdMap.size()); + for (Entry<Long, Long> entry : indexIdMap.entrySet()) { + long shadowIndexId = entry.getKey(); + out.writeLong(shadowIndexId); + // index id map + out.writeLong(entry.getValue()); + // index name + Text.writeString(out, indexIdToName.get(shadowIndexId)); + // index schema version and hash + out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).first); + out.writeInt(indexSchemaVersionAndHashMap.get(shadowIndexId).second); + } + + // bloom filter + out.writeBoolean(hasBfChange); + if (hasBfChange) { + out.writeInt(bfColumns.size()); + for (String bfCol : bfColumns) { + Text.writeString(out, bfCol); + } + out.writeDouble(bfFpp); + } + + out.writeLong(watershedTxnId); + + // index + out.writeBoolean(indexChange); + if (indexChange) { + if (CollectionUtils.isNotEmpty(indexes)) { + out.writeBoolean(true); + out.writeInt(indexes.size()); + for (Index index : indexes) { + index.write(out); + } + } else { + out.writeBoolean(false); + } + } + } + + /** + * read data need to persist when job finished + */ + private void readJobFinishedData(DataInput in) throws IOException { + // shadow index info + int indexNum = in.readInt(); + for (int i = 0; i < indexNum; i++) { + long shadowIndexId = in.readLong(); + long originIndexId = in.readLong(); + String indexName = Text.readString(in); + int schemaVersion = in.readInt(); + int schemaVersionHash = in.readInt(); + Pair<Integer, Integer> schemaVersionAndHash = Pair.create(schemaVersion, schemaVersionHash); + short shortKeyCount = in.readShort(); + + indexIdMap.put(shadowIndexId, originIndexId); + indexIdToName.put(shadowIndexId, indexName); + indexSchemaVersionAndHashMap.put(shadowIndexId, schemaVersionAndHash); + } + + // bloom filter + hasBfChange = in.readBoolean(); + if (hasBfChange) { + int bfNum = in.readInt(); + bfColumns = Sets.newHashSetWithExpectedSize(bfNum); + for (int i = 0; i < bfNum; i++) { + bfColumns.add(Text.readString(in)); + } + bfFpp = in.readDouble(); + } + + watershedTxnId = in.readLong(); + + // index + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_70) { Review comment: We only call `readJobFinishedData` If `Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_80` is true. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org