xiangfu0 commented on code in PR #13891: URL: https://github.com/apache/pinot/pull/13891#discussion_r1759750780
########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java: ########## @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch; +import org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory; +import org.apache.pinot.plugin.stream.kafka30.KafkaPartitionLevelConsumer; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static 
org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Integration test for low-level Kafka3 consumer. + */ +public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { + private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; + private static final long RANDOM_SEED = System.currentTimeMillis(); + private static final Random RANDOM = new Random(RANDOM_SEED); + + private final boolean _isDirectAlloc = RANDOM.nextBoolean(); + private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); + private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); + private final long _startTime = System.currentTimeMillis(); + + @Override + protected boolean injectTombstones() { + return true; + } + + @Override + protected String getLoadMode() { + return ReadMode.mmap.name(); + } + + @Override + public void startController() + throws Exception { + super.startController(); + enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); + } + + @Override + protected void overrideServerConf(PinotConfiguration configuration) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); + if (_isConsumerDirConfigured) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); + } + } + + @Override + protected void overrideControllerConf(Map<String, Object> properties) { + // Make sure the realtime segment validation manager does not run by itself, only when we invoke it. + properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD, "2h"); + properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, + 3600); + } + + @Override + protected void runValidationJob(long timeoutMs) + throws Exception { + final int partition = ExceptingKafka3ConsumerFactory.PARTITION_FOR_EXCEPTIONS; + if (partition < 0) { + return; + } + int[] seqNumbers = { + ExceptingKafka3ConsumerFactory.SEQ_NUM_FOR_CREATE_EXCEPTION, + ExceptingKafka3ConsumerFactory.SEQ_NUM_FOR_CONSUME_EXCEPTION}; + Arrays.sort(seqNumbers); + for (int seqNum : seqNumbers) { + if (seqNum < 0) { + continue; + } + TestUtils.waitForCondition(() -> isOffline(partition, seqNum), 5000L, timeoutMs, + "Failed to find offline segment in partition " + partition + " seqNum ", true, + Duration.ofMillis(timeoutMs / 10)); + getControllerRequestClient().runPeriodicTask("RealtimeSegmentValidationManager"); + } + } + + private boolean isOffline(int partition, int seqNum) { + ExternalView ev = _helixAdmin.getResourceExternalView(getHelixClusterName(), + TableNameBuilder.REALTIME.tableNameWithType(getTableName())); + + boolean isOffline = false; + for (String segmentNameStr : ev.getPartitionSet()) { + if (LLCSegmentName.isLLCSegment(segmentNameStr)) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + if (segmentName.getSequenceNumber() == seqNum && segmentName.getPartitionGroupId() == partition + && ev.getStateMap(segmentNameStr).values().contains( + CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) { + isOffline = true; + } + } + } + return isOffline; + } + + @Override + protected Map<String, String> getStreamConfigMap() { + Map<String, String> streamConfigMap = super.getStreamConfigMap(); + 
streamConfigMap.put(StreamConfigProperties.constructStreamProperty( + streamConfigMap.get(StreamConfigProperties.STREAM_TYPE), + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), ExceptingKafka3ConsumerFactory.class.getName()); + ExceptingKafka3ConsumerFactory.init(getHelixClusterName(), _helixAdmin, getTableName()); + return streamConfigMap; + } + @Override

Review Comment:
   Nit: add a blank line before @Override.

########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeKafka3ClusterIntegrationTest.java: ########## @@ -0,0 +1,528 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.io.FileUtils; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.helix.HelixAdmin; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.utils.FileUploadDownloadClient; +import org.apache.pinot.common.utils.HashUtil; +import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.plugin.stream.kafka.KafkaMessageBatch; +import org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory; +import org.apache.pinot.plugin.stream.kafka30.KafkaPartitionLevelConsumer; +import org.apache.pinot.spi.config.table.IndexingConfig; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import 
org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.junit.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + + +/** + * Integration test for low-level Kafka3 consumer. + */ +public class LLCRealtimeKafka3ClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { + private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; + private static final long RANDOM_SEED = System.currentTimeMillis(); + private static final Random RANDOM = new Random(RANDOM_SEED); + + private final boolean _isDirectAlloc = RANDOM.nextBoolean(); + private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); + private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); + private final long _startTime = System.currentTimeMillis(); + + @Override + protected boolean injectTombstones() { + return true; + } + + @Override + protected String getLoadMode() { + return ReadMode.mmap.name(); + } + + @Override + public void startController() + throws Exception { + super.startController(); + enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); + } + + @Override + protected void overrideServerConf(PinotConfiguration configuration) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); + if (_isConsumerDirConfigured) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); + } + } + + @Override + protected void overrideControllerConf(Map<String, Object> properties) { + // Make sure the realtime segment validation manager does not run by itself, only when we invoke it. 
+ properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_FREQUENCY_PERIOD, "2h"); + properties.put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, + 3600); + } + + @Override + protected void runValidationJob(long timeoutMs) + throws Exception { + final int partition = ExceptingKafka3ConsumerFactory.PARTITION_FOR_EXCEPTIONS; + if (partition < 0) { + return; + } + int[] seqNumbers = { + ExceptingKafka3ConsumerFactory.SEQ_NUM_FOR_CREATE_EXCEPTION, + ExceptingKafka3ConsumerFactory.SEQ_NUM_FOR_CONSUME_EXCEPTION}; + Arrays.sort(seqNumbers); + for (int seqNum : seqNumbers) { + if (seqNum < 0) { + continue; + } + TestUtils.waitForCondition(() -> isOffline(partition, seqNum), 5000L, timeoutMs, + "Failed to find offline segment in partition " + partition + " seqNum ", true, + Duration.ofMillis(timeoutMs / 10)); + getControllerRequestClient().runPeriodicTask("RealtimeSegmentValidationManager"); + } + } + + private boolean isOffline(int partition, int seqNum) { + ExternalView ev = _helixAdmin.getResourceExternalView(getHelixClusterName(), + TableNameBuilder.REALTIME.tableNameWithType(getTableName())); + + boolean isOffline = false; + for (String segmentNameStr : ev.getPartitionSet()) { + if (LLCSegmentName.isLLCSegment(segmentNameStr)) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + if (segmentName.getSequenceNumber() == seqNum && segmentName.getPartitionGroupId() == partition + && ev.getStateMap(segmentNameStr).values().contains( + CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) { + isOffline = true; + } + } + } + return isOffline; + } + + @Override + protected Map<String, String> getStreamConfigMap() { + Map<String, String> streamConfigMap = super.getStreamConfigMap(); + streamConfigMap.put(StreamConfigProperties.constructStreamProperty( + streamConfigMap.get(StreamConfigProperties.STREAM_TYPE), + StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), ExceptingKafka3ConsumerFactory.class.getName()); + ExceptingKafka3ConsumerFactory.init(getHelixClusterName(), _helixAdmin, getTableName()); + return streamConfigMap; + } + @Override + protected IngestionConfig getIngestionConfig() { + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(Collections.singletonList(getStreamConfigMap()))); + return ingestionConfig; + } + + @Override + protected Map<String, String> getStreamConfigs() { + return null; + } + + @Override + protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig) + throws Exception { + if (!_tarDir.exists()) { + _tarDir.mkdir(); + } + if (!_segmentDir.exists()) { + _segmentDir.mkdir(); + } + + // create segments out of the avro files (segments will be placed in _tarDir) + List<File> copyOfAvroFiles = new ArrayList<>(avroFiles); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + + // upload segments to controller + uploadSegmentsToController(getTableName(), _tarDir, false, false); + + // upload the first segment again to verify refresh + uploadSegmentsToController(getTableName(), _tarDir, true, false); + + // upload the first segment again to verify refresh with different segment crc + uploadSegmentsToController(getTableName(), _tarDir, true, true); + + // add avro files to the original list so H2 will have the uploaded data as well + avroFiles.addAll(copyOfAvroFiles); + } + + private void 
uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc) + throws Exception { + File[] segmentTarFiles = tarDir.listFiles(); + assertNotNull(segmentTarFiles); + int numSegments = segmentTarFiles.length; + assertTrue(numSegments > 0); + if (onlyFirstSegment) { + numSegments = 1; + } + URI uploadSegmentHttpURI = URI.create(getControllerRequestURLBuilder().forSegmentUpload()); + try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) { + if (numSegments == 1) { + File segmentTarFile = segmentTarFiles[0]; + if (changeCrc) { + changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString()); + } + assertEquals( + fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, + tableName, TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); + } else { + // Upload segments in parallel + ExecutorService executorService = Executors.newFixedThreadPool(numSegments); + List<Future<Integer>> futures = new ArrayList<>(numSegments); + for (File segmentTarFile : segmentTarFiles) { + futures.add(executorService.submit( + () -> fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), + segmentTarFile, tableName, TableType.REALTIME).getStatusCode())); + } + executorService.shutdown(); + for (Future<Integer> future : futures) { + assertEquals((int) future.get(), HttpStatus.SC_OK); + } + } + } + } + + private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) { + int startIdx = segmentFilePath.indexOf("mytable_"); + int endIdx = segmentFilePath.indexOf(".tar.gz"); + String segmentName = segmentFilePath.substring(startIdx, endIdx); + String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName); + SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName); + segmentZKMetadata.setCrc(111L); + _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata); + } + + @Override + protected long getCountStarResult() { + // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint + return super.getCountStarResult() * 2; + } + + @BeforeClass + @Override + public void setUp() + throws Exception { + System.out.println(String.format( + "Using random seed: %s, isDirectAlloc: %s, isConsumerDirConfigured: %s, enableLeadControllerResource: %s", + RANDOM_SEED, _isDirectAlloc, _isConsumerDirConfigured, _enableLeadControllerResource));

Review Comment:
   Are those configs applied on a per-server basis or a per-table basis? If possible, we could create 8 tables to cover all combinations of these three flags (2^3 = 8) instead of relying on a random pick per run.
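For context on the suggestion above: the test currently randomizes _isDirectAlloc, _isConsumerDirConfigured and _enableLeadControllerResource once per run, so a single run exercises only one of the 2^3 = 8 combinations. Below is a minimal sketch of one way to enumerate the combinations deterministically with a TestNG @Factory. It is illustrative only, not code from this PR; the Combo class and its three-argument constructor are hypothetical stand-ins for a parameterized variant of LLCRealtimeKafka3ClusterIntegrationTest, and whether the flags can actually vary per table rather than per server is exactly the question the reviewer raises.

import java.util.ArrayList;
import java.util.List;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;

public class Kafka3FlagCombinationsFactorySketch {

  // Creates one test instance per combination of the three boolean flags (2^3 = 8).
  @Factory
  public Object[] createInstances() {
    List<Object> instances = new ArrayList<>();
    boolean[] values = {false, true};
    for (boolean directAlloc : values) {
      for (boolean consumerDirConfigured : values) {
        for (boolean enableLeadControllerResource : values) {
          instances.add(new Combo(directAlloc, consumerDirConfigured, enableLeadControllerResource));
        }
      }
    }
    return instances.toArray();
  }

  // Hypothetical stand-in for a parameterized LLCRealtimeKafka3ClusterIntegrationTest;
  // a real test would apply these flags in its server/controller overrides instead of printing them.
  public static class Combo {
    private final boolean _directAlloc;
    private final boolean _consumerDirConfigured;
    private final boolean _enableLeadControllerResource;

    public Combo(boolean directAlloc, boolean consumerDirConfigured, boolean enableLeadControllerResource) {
      _directAlloc = directAlloc;
      _consumerDirConfigured = consumerDirConfigured;
      _enableLeadControllerResource = enableLeadControllerResource;
    }

    @Test
    public void logCombination() {
      System.out.println("directAlloc=" + _directAlloc + ", consumerDirConfigured=" + _consumerDirConfigured
          + ", enableLeadControllerResource=" + _enableLeadControllerResource);
    }
  }
}

An alternative closer to the reviewer's wording would be eight table configs (or eight test subclasses), one per combination, at the cost of more boilerplate.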