imay commented on a change in pull request #2847: Doris support in memory olap table URL: https://github.com/apache/incubator-doris/pull/2847#discussion_r380131087
########## File path: fe/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java ########## @@ -18,50 +18,134 @@ package org.apache.doris.task; import java.util.List; -import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.tuple.Triple; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.TabletMeta; +import org.apache.doris.common.MarkedCountDownLatch; +import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; +import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TTabletMetaInfo; +import org.apache.doris.thrift.TTabletMetaType; import org.apache.doris.thrift.TTaskType; import org.apache.doris.thrift.TUpdateTabletMetaInfoReq; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.google.common.collect.Lists; -import com.google.common.collect.SetMultimap; public class UpdateTabletMetaInfoTask extends AgentTask { - private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class); + private static final Logger LOG = LogManager.getLogger(UpdateTabletMetaInfoTask.class); - private SetMultimap<Long, Integer> tabletWithoutPartitionId; + // used for synchronous process + private MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch; - public UpdateTabletMetaInfoTask(long backendId, SetMultimap<Long, Integer> tabletWithoutPartitionId) { - super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, -1L, -1L, -1L, -1L, -1L, backendId); - this.tabletWithoutPartitionId = tabletWithoutPartitionId; + private Set<Pair<Long, Integer>> tableIdWithSchemaHash; + private boolean isInMemory; + private TTabletMetaType metaType; + + // <tablet id, tablet schema hash, tablet in memory> + private List<Triple<Long, Integer, Boolean>> tabletToInMemory; + + public UpdateTabletMetaInfoTask(long backendId, Set<Pair<Long, Integer>> tableIdWithSchemaHash, + TTabletMetaType metaType) { + super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, + -1L, -1L, -1L, -1L, -1L, tableIdWithSchemaHash.hashCode()); + this.tableIdWithSchemaHash = tableIdWithSchemaHash; + this.metaType = metaType; + } + + public UpdateTabletMetaInfoTask(long backendId, + Set<Pair<Long, Integer>> tableIdWithSchemaHash, + boolean isInMemory, + MarkedCountDownLatch<Long, Set<Pair<Long, Integer>>> latch) { + this(backendId, tableIdWithSchemaHash, TTabletMetaType.INMEMORY); + this.isInMemory = isInMemory; + this.latch = latch; + } + + public UpdateTabletMetaInfoTask(long backendId, + List<Triple<Long, Integer, Boolean>> tabletToInMemory) { + super(null, backendId, TTaskType.UPDATE_TABLET_META_INFO, + -1L, -1L, -1L, -1L, -1L, tabletToInMemory.hashCode()); + this.metaType = TTabletMetaType.INMEMORY; + this.tabletToInMemory = tabletToInMemory; + } + + public void countDownLatch(long backendId, Set<Pair<Long, Integer>> tablets) { + if (this.latch != null) { + if (latch.markedCountDown(backendId, tablets)) { + LOG.debug("UpdateTabletMetaInfoTask current latch count: {}, backend: {}, tablets:{}", + latch.getCount(), backendId, tablets); + } + } + } + + // call this always means one of tasks is failed. count down to zero to finish entire task + public void countDownToZero(String errMsg) { + if (this.latch != null) { + latch.countDownToZero(new Status(TStatusCode.CANCELLED, errMsg)); + LOG.debug("UpdateTabletMetaInfoTask count down to zero. error msg: {}", errMsg); + } + } + + public Set<Pair<Long, Integer>> getTablets() { + return tableIdWithSchemaHash; } public TUpdateTabletMetaInfoReq toThrift() { TUpdateTabletMetaInfoReq updateTabletMetaInfoReq = new TUpdateTabletMetaInfoReq(); List<TTabletMetaInfo> metaInfos = Lists.newArrayList(); - int tabletEntryNum = 0; - for (Map.Entry<Long, Integer> entry : tabletWithoutPartitionId.entries()) { - // add at most 10000 tablet meta during one sync to avoid too large task - if (tabletEntryNum > 10000) { + switch (metaType) { + case PARTITIONID: { + int tabletEntryNum = 0; + for (Pair<Long, Integer> pair : tableIdWithSchemaHash) { + // add at most 10000 tablet meta during one sync to avoid too large task + if (tabletEntryNum > 10000) { + break; + } + TTabletMetaInfo metaInfo = new TTabletMetaInfo(); + metaInfo.setTablet_id(pair.first); + metaInfo.setSchema_hash(pair.second); + TabletMeta tabletMeta = Catalog.getInstance().getTabletInvertedIndex().getTabletMeta(pair.first); + if (tabletMeta == null) { + LOG.warn("could not find tablet [{}] in meta ignore it", pair.second); + continue; + } + metaInfo.setPartition_id(tabletMeta.getPartitionId()); + metaInfo.setMeta_type(metaType); + metaInfos.add(metaInfo); + ++tabletEntryNum; + } break; } - TTabletMetaInfo metaInfo = new TTabletMetaInfo(); - metaInfo.setTablet_id(entry.getKey()); - metaInfo.setSchema_hash(entry.getValue()); - TabletMeta tabletMeta = Catalog.getInstance().getTabletInvertedIndex().getTabletMeta(entry.getKey()); - if (tabletMeta == null) { - LOG.warn("could not find tablet [{}] in meta ignore it", entry.getKey()); - continue; + case INMEMORY: { + if (latch != null) { + // for schema change + for (Pair<Long, Integer> pair: tableIdWithSchemaHash) { + TTabletMetaInfo metaInfo = new TTabletMetaInfo(); + metaInfo.setTablet_id(pair.first); + metaInfo.setSchema_hash(pair.second); + metaInfo.setIs_in_memory(isInMemory); + metaInfo.setMeta_type(metaType); + metaInfos.add(metaInfo); + } + } else { + // for ReportHandler + for(Triple<Long, Integer, Boolean> triple: tabletToInMemory) { Review comment: ```suggestion for (Triple<Long, Integer, Boolean> triple: tabletToInMemory) { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org