Copilot commented on code in PR #17169:
URL: https://github.com/apache/pinot/pull/17169#discussion_r2509621263
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -576,6 +648,44 @@ private StreamingOutput sendRequestToBroker(String query, String instanceId, Str
return sendRequestRaw(url, "POST", query, requestJson, headers);
}
+  private StreamingOutput aggregateQueryLogResponses(List<InstanceConfig> brokerInstanceConfigs,
+      ObjectNode requestJson, Map<String, String> headers) {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    String payload = requestJson.toString();
+    return outputStream -> {
+      long startTime = System.currentTimeMillis();
+      List<BrokerResponseNative> responses = new ArrayList<>(brokerInstanceConfigs.size());
+      List<QueryProcessingException> fetchExceptions = new ArrayList<>();
+      for (InstanceConfig brokerConfig : brokerInstanceConfigs) {
+        String hostName = getHost(brokerConfig);
+        int port = getPort(brokerConfig);
+        String url = getQueryURL(protocol, hostName, port);
+        try {
+          responses.add(fetchBrokerQueryLog(url, payload, headers));
+        } catch (Exception e) {
+          fetchExceptions.add(new QueryProcessingException(QueryErrorCode.BROKER_REQUEST_SEND,
+              String.format("Failed to fetch query log from broker %s: %s", brokerConfig.getInstanceName(),
+                  e.getMessage())));
+        }
+      }
+      BrokerResponseNative aggregated = QueryLogResponseAggregator.aggregate(responses);
+      List<QueryProcessingException> allExceptions = new ArrayList<>(aggregated.getExceptions());
+      allExceptions.addAll(fetchExceptions);
+      aggregated.setExceptions(allExceptions);
+      aggregated.setNumServersQueried(brokerInstanceConfigs.size());
+      aggregated.setTimeUsedMs(System.currentTimeMillis() - startTime);
+      aggregated.setBrokerId("controller");
+      aggregated.toOutputStream(outputStream);
+    };
+  }
+
+  private BrokerResponseNative fetchBrokerQueryLog(String url, String payload, Map<String, String> headers)
+      throws Exception {
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    sendRequestRaw(url, "POST", payload, headers, outputStream);
+    return JsonUtils.stringToObject(outputStream.toString(StandardCharsets.UTF_8), BrokerResponseNative.class);
+  }
Review Comment:
The method declares `throws Exception`, which is overly broad. Consider
declaring specific exceptions such as `IOException`, or creating a custom
exception type, for better error handling and clarity about what failures can
occur.
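For reference, a minimal sketch of the narrowed signature, assuming `sendRequestRaw` can itself be narrowed to `IOException` (the Jackson-backed `JsonUtils.stringToObject` already surfaces `IOException`):

```java
// Hypothetical narrowing of fetchBrokerQueryLog; assumes sendRequestRaw only
// throws IOException (otherwise wrap its checked exceptions accordingly).
private BrokerResponseNative fetchBrokerQueryLog(String url, String payload, Map<String, String> headers)
    throws IOException {
  ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
  sendRequestRaw(url, "POST", payload, headers, outputStream);
  return JsonUtils.stringToObject(outputStream.toString(StandardCharsets.UTF_8), BrokerResponseNative.class);
}
```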
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+import java.util.stream.Stream;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.QueryLogSystemTableUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.sql.parsers.SqlNodeAndOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Keeps an in-memory copy of recent query log records and evaluates {@code SELECT} queries against it.
+ */
+public class QueryLogSystemTable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(QueryLogSystemTable.class);
+
+  private static final QueryLogSystemTable INSTANCE = new QueryLogSystemTable();
+
+  public static final String FULL_TABLE_NAME = QueryLogSystemTableUtils.FULL_TABLE_NAME;
+
+ private final ReadWriteLock _lock = new ReentrantReadWriteLock();
+ private final AtomicBoolean _initialized = new AtomicBoolean(false);
+
+ private volatile boolean _enabled;
+ private volatile int _maxEntries;
+ private volatile long _retentionMs;
+ private volatile int _defaultLimit;
+ private volatile QueryLogStore _store;
+ private volatile String _storageType;
+
+ private QueryLogSystemTable() {
+ }
+
+ public static QueryLogSystemTable getInstance() {
+ return INSTANCE;
+ }
+
+  public void initIfNeeded(PinotConfiguration config) {
+    if (_initialized.compareAndSet(false, true)) {
+      _enabled = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_ENABLED,
+          CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_ENABLED);
+      _maxEntries = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_MAX_ENTRIES,
+          CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_MAX_ENTRIES);
+      _retentionMs = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_RETENTION_MS,
+          CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_RETENTION_MS);
+      _defaultLimit = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_DEFAULT_LIMIT,
+          CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_DEFAULT_LIMIT);
+      _storageType = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_STORAGE,
+          CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_STORAGE);
+      if (_enabled) {
+        try {
+          if ("disk".equalsIgnoreCase(_storageType)) {
+            String directory = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_DISK_DIR,
+                CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_DISK_DIR);
+            long maxBytes = config.getProperty(CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_DISK_MAX_BYTES,
+                CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_DISK_MAX_BYTES);
+            long segmentBytes = config.getProperty(
+                CommonConstants.Broker.CONFIG_OF_QUERY_LOG_SYSTEM_TABLE_DISK_SEGMENT_BYTES,
+                CommonConstants.Broker.DEFAULT_QUERY_LOG_SYSTEM_TABLE_DISK_SEGMENT_BYTES);
+            segmentBytes = Math.min(segmentBytes, maxBytes);
+            _store =
+                new DiskBackedQueryLogStore(Paths.get(directory), maxBytes, segmentBytes, _maxEntries, _retentionMs);
+          } else {
+            _store = new InMemoryQueryLogStore(_maxEntries, _retentionMs);
+          }
+        } catch (IOException e) {
+          LOGGER.error("Failed to initialize query log system table storage", e);
+          _enabled = false;
+          _store = null;
+        }
+      }
+      LOGGER.info(
+          "Initialized query log system table: enabled={}, storage={}, maxEntries={}, retentionMs={}, defaultLimit={}",
+          _enabled, _storageType, _maxEntries, _retentionMs, _defaultLimit);
+    }
+  }
+
+ public boolean isEnabled() {
+ return _enabled;
+ }
+
+ public void append(QueryLogRecord record) {
+ if (!_enabled || record == null || _store == null) {
+ return;
+ }
+ _lock.writeLock().lock();
+ try {
+ _store.append(record);
+ } catch (IOException e) {
+ LOGGER.warn("Failed to append query log record", e);
+ } finally {
+ _lock.writeLock().unlock();
+ }
+ }
+
+ @Nullable
+  public BrokerResponse handleIfSystemTable(SqlNodeAndOptions sqlNodeAndOptions)
+ throws BadQueryRequestException {
+ if (!_enabled) {
+ return null;
+ }
+ SqlNode sqlNode = sqlNodeAndOptions.getSqlNode();
+ if (!QueryLogSystemTableUtils.isQueryLogSystemTableQuery(sqlNode)) {
+ return null;
+ }
+ ParsedQuery parsedQuery = parse(sqlNode);
+ if (parsedQuery == null) {
+ return null;
+ }
+ return execute(parsedQuery);
+ }
+
+ private ParsedQuery parse(SqlNode sqlNode)
+ throws BadQueryRequestException {
+ SqlNode workingNode = sqlNode;
+ SqlNodeList outerOrderBy = null;
+ SqlNode outerOffset = null;
+ SqlNode outerFetch = null;
+ if (workingNode instanceof SqlOrderBy) {
+ SqlOrderBy orderBy = (SqlOrderBy) workingNode;
+ outerOrderBy = orderBy.orderList;
+ outerOffset = orderBy.offset;
+ outerFetch = orderBy.fetch;
+ workingNode = orderBy.query;
+ }
+
+ if (!(workingNode instanceof SqlSelect)) {
+ return null;
+ }
+ SqlSelect select = (SqlSelect) workingNode;
+ if (select.getGroup() != null && select.getGroup().size() > 0) {
+ throw new BadQueryRequestException("GROUP BY is not supported for
system.query_log");
+ }
+ if (select.getHaving() != null) {
+ throw new BadQueryRequestException("HAVING is not supported for
system.query_log");
+ }
+ if (select.isDistinct()) {
+ throw new BadQueryRequestException("DISTINCT is not supported for
system.query_log");
+ }
+
+ SqlNodeList selectList = select.getSelectList();
+ SqlNode where = select.getWhere();
+ SqlNodeList orderList = outerOrderBy != null ? outerOrderBy :
select.getOrderList();
+ SqlNode offset = outerOffset != null ? outerOffset : select.getOffset();
+ SqlNode fetch = outerFetch != null ? outerFetch : select.getFetch();
+
+ return new ParsedQuery(selectList, where, orderList, offset, fetch);
+ }
+
+  private BrokerResponse execute(ParsedQuery query)
+      throws BadQueryRequestException {
+    long startTimeMs = System.currentTimeMillis();
+    List<SelectedColumn> selections = resolveSelectList(query._selectList);
+    Map<String, QueryLogColumn> aliasLookup = new HashMap<>();
+    for (SelectedColumn selectedColumn : selections) {
+      aliasLookup.put(selectedColumn._outputName.toLowerCase(Locale.ROOT), selectedColumn._column);
+    }
+
+    Predicate<QueryLogRecord> predicate = buildPredicate(query._whereClause, aliasLookup);
+    List<Ordering> orderings = parseOrderings(query._orderBy, aliasLookup, selections);
+    int offset = parseNonNegativeInt(query._offsetNode, "OFFSET", 0);
+    int limit = parseNonNegativeInt(query._fetchNode, "LIMIT", _defaultLimit);
+
+    List<QueryLogRecord> rows;
+    try {
+      rows = getFilteredRecords(predicate);
+    } catch (IOException e) {
+      throw new BadQueryRequestException("Failed to read query log storage", e);
+    }
+    Comparator<QueryLogRecord> comparator = orderings.isEmpty()
+        ? Comparator.comparingLong(QueryLogRecord::getLogTimestampMs).reversed()
+        : buildComparator(orderings);
+    rows.sort(comparator);
+
+    int fromIndex = Math.min(offset, rows.size());
+    int toIndex = limit < 0 ? rows.size() : Math.min(rows.size(), fromIndex + limit);
+    List<QueryLogRecord> window = rows.subList(fromIndex, toIndex);
+
+    String[] columnNames = selections.stream().map(selection -> selection._outputName).toArray(String[]::new);
+    DataSchema.ColumnDataType[] columnDataTypes = selections.stream()
+        .map(selection -> selection._column._dataType).toArray(DataSchema.ColumnDataType[]::new);
+
+    List<Object[]> tableRows = new ArrayList<>(window.size());
+    for (QueryLogRecord record : window) {
+      Object[] row = new Object[selections.size()];
+      for (int i = 0; i < selections.size(); i++) {
+        row[i] = selections.get(i)._column.extract(record);
+      }
+      tableRows.add(row);
+    }
+
+    ResultTable resultTable = new ResultTable(new DataSchema(columnNames, columnDataTypes), tableRows);
+    BrokerResponseNative response = new BrokerResponseNative();
+    response.setResultTable(resultTable);
+    response.setNumRowsResultSet(resultTable.getRows().size());
+    response.setTablesQueried(Set.of(QueryLogSystemTableUtils.FULL_TABLE_NAME));
+    response.setTimeUsedMs(System.currentTimeMillis() - startTimeMs);
+    return response;
+  }
+
+  private List<QueryLogRecord> getFilteredRecords(@Nullable Predicate<QueryLogRecord> predicate)
+      throws IOException {
+    if (_store == null) {
+      return Collections.emptyList();
+    }
+    _lock.readLock().lock();
+    try {
+      List<QueryLogRecord> records = _store.getRecords();
+      if (records.isEmpty()) {
+        return Collections.emptyList();
+      }
+      long cutoff = _retentionMs > 0 ? System.currentTimeMillis() - _retentionMs : Long.MIN_VALUE;
+      List<QueryLogRecord> filtered = new ArrayList<>(records.size());
+      for (QueryLogRecord record : records) {
+        if (record.getLogTimestampMs() < cutoff) {
+          continue;
+        }
+        filtered.add(record);
+      }
+      if (filtered.isEmpty()) {
+        return Collections.emptyList();
+      }
+      if (_maxEntries > 0 && filtered.size() > _maxEntries) {
+        filtered = new ArrayList<>(filtered.subList(filtered.size() - _maxEntries, filtered.size()));
+      } else {
+        filtered = new ArrayList<>(filtered);
+      }
+      if (predicate != null) {
+        filtered.removeIf(record -> !predicate.test(record));
+      }
+      return filtered;
+    } finally {
+      _lock.readLock().unlock();
+    }
+  }
+
+ private interface QueryLogStore extends AutoCloseable {
+ void append(QueryLogRecord record)
+ throws IOException;
+
+ List<QueryLogRecord> getRecords()
+ throws IOException;
+
+ @Override
+ void close()
+ throws IOException;
+ }
+
+ private static final class InMemoryQueryLogStore implements QueryLogStore {
+ private final Deque<QueryLogRecord> _records = new ArrayDeque<>();
+ private final int _maxEntries;
+ private final long _retentionMs;
+
+ InMemoryQueryLogStore(int maxEntries, long retentionMs) {
+ _maxEntries = Math.max(1, maxEntries);
+ _retentionMs = retentionMs;
+ }
+
+ @Override
+ public void append(QueryLogRecord record) {
+ if (_retentionMs > 0) {
+ long cutoff = record.getLogTimestampMs() - _retentionMs;
+        while (!_records.isEmpty() && _records.peekFirst().getLogTimestampMs() < cutoff) {
+ _records.removeFirst();
+ }
+ }
+ _records.addLast(record);
+ while (_records.size() > Math.max(1, _maxEntries)) {
+ _records.removeFirst();
+ }
+ }
+
+ @Override
+ public List<QueryLogRecord> getRecords() {
+ return new ArrayList<>(_records);
+ }
+
+ @Override
+ public void close() {
+ _records.clear();
+ }
+ }
+
+ private static final class DiskBackedQueryLogStore implements QueryLogStore {
+ private static final String SEGMENT_PREFIX = "segment_";
+
+ private final Path _directory;
+ private final long _maxBytes;
+ private final long _segmentBytes;
+ private final int _maxEntries;
+ private final long _retentionMs;
+ private final List<Path> _segments = new ArrayList<>();
+ private final List<Integer> _segmentRecordCounts = new ArrayList<>();
+ private DataOutputStream _currentOutput;
+ private Path _currentSegment;
+ private long _currentSegmentBytes;
+ private long _totalBytes;
+ private long _totalRecords;
+ private int _segmentCounter;
+
+    DiskBackedQueryLogStore(Path directory, long maxBytes, long segmentBytes, int maxEntries, long retentionMs)
+        throws IOException {
+      _directory = directory;
+      _maxBytes = Math.max(1L, maxBytes);
+      _segmentBytes = Math.max(1L, segmentBytes);
+      _maxEntries = Math.max(1, maxEntries);
+      _retentionMs = retentionMs;
+      Files.createDirectories(_directory);
+      try (Stream<Path> stream = Files.list(_directory)) {
+        stream.filter(path -> Files.isRegularFile(path) && path.getFileName().toString().startsWith(SEGMENT_PREFIX))
+            .sorted()
+            .forEach(path -> {
+              _segments.add(path);
+              _segmentRecordCounts.add(estimateRecordCount(path));
+              try {
+                _totalBytes += Files.size(path);
+              } catch (IOException e) {
+                LOGGER.warn("Failed to compute size for query log segment {}", path, e);
+              }
+            });
+      }
+      _totalRecords = _segmentRecordCounts.stream().mapToLong(i -> i).sum();
+      _segmentCounter = _segments.size();
+      pruneByTimeIfNeeded(); // Ensure old segments are cleaned up on startup
+      pruneByRowCountIfNeeded();
+      pruneBySizeIfNeeded();
+      openNewSegment();
+    }
+
+ @Override
+ public void append(QueryLogRecord record)
+ throws IOException {
+ byte[] payload = QueryLogRecordSerDe.serialize(record);
+ int entrySize = Integer.BYTES + payload.length;
+      // Rotate segment if either full by size, or if retention requires time-roll
+      if (_currentSegmentBytes > 0 && shouldRotateForTime(record.getLogTimestampMs())) {
+        openNewSegment();
+      }
+      if (_currentSegmentBytes + entrySize > _segmentBytes) {
+        openNewSegment();
+      }
+      _currentOutput.writeInt(payload.length);
+      _currentOutput.write(payload);
+      _currentOutput.flush();
+      _currentSegmentBytes += entrySize;
+      _totalBytes += entrySize;
+      // Update counts
+      if (!_segmentRecordCounts.isEmpty()) {
+        int lastIdx = _segmentRecordCounts.size() - 1;
+        _segmentRecordCounts.set(lastIdx, _segmentRecordCounts.get(lastIdx) + 1);
+ }
+ _totalRecords += 1;
+ // Apply pruning policies
+ pruneByTimeIfNeeded();
+ pruneByRowCountIfNeeded();
+ pruneBySizeIfNeeded();
+ }
+
+ @Override
+ public List<QueryLogRecord> getRecords()
+ throws IOException {
+ if (_segments.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<QueryLogRecord> records = new ArrayList<>();
+ for (Path segment : _segments) {
+ records.addAll(readSegment(segment));
+ }
+ return records;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ closeCurrentOutput();
+ _segments.clear();
+ _segmentRecordCounts.clear();
+ _totalBytes = 0;
+ _currentSegmentBytes = 0;
+ _totalRecords = 0;
+ }
+
+ private void openNewSegment()
+ throws IOException {
+ closeCurrentOutput();
+      _currentSegment = _directory.resolve(String.format("%s%d_%d.log", SEGMENT_PREFIX, System.currentTimeMillis(),
+          _segmentCounter++));
+ Files.createFile(_currentSegment);
+ _segments.add(_currentSegment);
+ _segmentRecordCounts.add(0);
+ _currentSegmentBytes = 0;
+ _currentOutput = new DataOutputStream(new BufferedOutputStream(
+ Files.newOutputStream(_currentSegment, StandardOpenOption.APPEND)));
+ }
+
+ private void closeCurrentOutput()
+ throws IOException {
+ if (_currentOutput != null) {
+ _currentOutput.flush();
+ _currentOutput.close();
+ _currentOutput = null;
+ }
+ }
+
+ private void pruneBySizeIfNeeded()
+ throws IOException {
+ while (_totalBytes > _maxBytes && _segments.size() > 1) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+ if (_totalBytes > _maxBytes && _segments.size() == 1) {
+ LOGGER.warn("Query log disk storage exceeds maxBytes {} but cannot
delete active segment", _maxBytes);
+ }
+ }
+
+ private void pruneByRowCountIfNeeded()
+ throws IOException {
+ if (_maxEntries <= 0) {
+ return;
+ }
+ while (_totalRecords > _maxEntries && _segments.size() > 1) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+ if (_totalRecords > _maxEntries && _segments.size() == 1) {
+ LOGGER.warn("Query log disk storage exceeds maxEntries {} but cannot
delete within active segment",
+ _maxEntries);
+ }
+ }
+
+ private void pruneByTimeIfNeeded()
+ throws IOException {
+ if (_retentionMs <= 0) {
+ return;
+ }
+ long cutoff = System.currentTimeMillis() - _retentionMs;
+      while (_segments.size() > 1 && segmentCreationTime(_segments.get(0)) < cutoff) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+      // do not delete the only active segment even if older than cutoff; it will rotate on next append if needed
+ }
+
+ private boolean shouldRotateForTime(long recordTimestampMs) {
+ if (_retentionMs <= 0) {
+ return false;
+ }
+ long cutoff = recordTimestampMs - _retentionMs;
+ return segmentCreationTime(_currentSegment) < cutoff;
+ }
+
+ private long segmentCreationTime(Path segment) {
+ // segment filename: segment_<timestamp>_<counter>.log
+ String name = segment.getFileName().toString();
+ try {
+ int underscore = name.indexOf('_');
+ int secondUnderscore = name.indexOf('_', underscore + 1);
+ String ts = name.substring(underscore + 1, secondUnderscore);
+ return Long.parseLong(ts);
+ } catch (Exception e) {
+ // fallback to file creation time if parse fails
+ return 0L;
+ }
+ }
Review Comment:
The filename parsing logic is fragile and silently returns 0L on any parsing
failure. Consider using a regex pattern to parse the filename format
`segment_<timestamp>_<counter>.log` for more robust validation, or document why
0L is an appropriate fallback value for segment creation time.
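A sketch of the regex-based variant, reusing the already-imported `java.util.regex.Pattern`; the `BasicFileAttributes` fallback is an assumption about the desired behavior, not code from this PR:

```java
private static final Pattern SEGMENT_NAME_PATTERN = Pattern.compile("^segment_(\\d+)_\\d+\\.log$");

private long segmentCreationTime(Path segment) {
  java.util.regex.Matcher matcher = SEGMENT_NAME_PATTERN.matcher(segment.getFileName().toString());
  if (matcher.matches()) {
    return Long.parseLong(matcher.group(1));
  }
  // Fall back to the filesystem creation time rather than silently returning 0L.
  try {
    return Files.readAttributes(segment, java.nio.file.attribute.BasicFileAttributes.class).creationTime().toMillis();
  } catch (IOException e) {
    return 0L; // last resort: sorts the segment as oldest
  }
}
```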
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+    private void openNewSegment()
+        throws IOException {
+      closeCurrentOutput();
+      _currentSegment = _directory.resolve(String.format("%s%d_%d.log", SEGMENT_PREFIX, System.currentTimeMillis(),
+          _segmentCounter++));
Review Comment:
`_segmentCounter` is incremented without synchronization in a method that
may be called concurrently. While the write lock protects `append()`, ensure
that all paths modifying `_segmentCounter` are properly synchronized to avoid
race conditions.
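A minimal sketch of one option, assuming the counter only needs uniqueness (the field and call site mirror the diff; this is not the PR's code):

```java
// import java.util.concurrent.atomic.AtomicInteger;
private final AtomicInteger _segmentCounter = new AtomicInteger();

// In openNewSegment(): getAndIncrement() stays correct even if a future
// caller reaches this path without holding the write lock.
_currentSegment = _directory.resolve(String.format("%s%d_%d.log", SEGMENT_PREFIX,
    System.currentTimeMillis(), _segmentCounter.getAndIncrement()));
```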
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogRecordSerDe.java:
##########
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.querylog;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+
+/**
+ * Binary serializer/deserializer for {@link QueryLogRecord}. Records are
compressed individually so they can be
+ * appended to arbitrary storage backends.
+ */
+final class QueryLogRecordSerDe {
+ private QueryLogRecordSerDe() {
+ }
+
+ static byte[] serialize(QueryLogRecord record)
+ throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (GZIPOutputStream gzip = new GZIPOutputStream(baos);
+ DataOutputStream out = new DataOutputStream(gzip)) {
+ out.writeLong(record.getLogTimestampMs());
+ out.writeLong(record.getRequestId());
+ writeString(out, record.getTableName());
+ writeString(out, record.getBrokerId());
+ writeString(out, record.getClientIp());
+ writeString(out, record.getQuery());
+ writeString(out, record.getQueryEngine());
+ out.writeLong(record.getRequestArrivalTimeMs());
+ out.writeLong(record.getTimeMs());
+ out.writeLong(record.getBrokerReduceTimeMs());
+ out.writeLong(record.getNumDocsScanned());
+ out.writeLong(record.getTotalDocs());
+ out.writeLong(record.getNumEntriesScannedInFilter());
+ out.writeLong(record.getNumEntriesScannedPostFilter());
+ out.writeLong(record.getNumSegmentsQueried());
+ out.writeLong(record.getNumSegmentsProcessed());
+ out.writeLong(record.getNumSegmentsMatched());
+ out.writeLong(record.getNumConsumingSegmentsQueried());
+ out.writeLong(record.getNumConsumingSegmentsProcessed());
+ out.writeLong(record.getNumConsumingSegmentsMatched());
+ out.writeLong(record.getNumUnavailableSegments());
+ out.writeLong(record.getMinConsumingFreshnessTimeMs());
+ out.writeInt(record.getNumServersResponded());
+ out.writeInt(record.getNumServersQueried());
+ out.writeBoolean(record.isGroupsTrimmed());
+ out.writeBoolean(record.isGroupLimitReached());
+ out.writeBoolean(record.isGroupWarningLimitReached());
+ out.writeLong(record.getNumExceptions());
+ writeString(out, record.getExceptions());
+ writeString(out, record.getServerStats());
+ out.writeLong(record.getOfflineTotalCpuTimeNs());
+ out.writeLong(record.getOfflineThreadCpuTimeNs());
+ out.writeLong(record.getOfflineSystemActivitiesCpuTimeNs());
+ out.writeLong(record.getOfflineResponseSerializationCpuTimeNs());
+ out.writeLong(record.getRealtimeTotalCpuTimeNs());
+ out.writeLong(record.getRealtimeThreadCpuTimeNs());
+ out.writeLong(record.getRealtimeSystemActivitiesCpuTimeNs());
+ out.writeLong(record.getRealtimeResponseSerializationCpuTimeNs());
+ out.writeLong(record.getOfflineTotalMemAllocatedBytes());
+ out.writeLong(record.getOfflineThreadMemAllocatedBytes());
+ out.writeLong(record.getOfflineResponseSerMemAllocatedBytes());
+ out.writeLong(record.getRealtimeTotalMemAllocatedBytes());
+ out.writeLong(record.getRealtimeThreadMemAllocatedBytes());
+ out.writeLong(record.getRealtimeResponseSerMemAllocatedBytes());
+ writeIntArray(out, record.getPools());
+ out.writeBoolean(record.isPartialResult());
+ out.writeBoolean(record.isRlsFiltersApplied());
+ out.writeInt(record.getNumRowsResultSet());
+ writeStringArray(out, record.getTablesQueried());
+ writeString(out, record.getTraceInfoJson());
+ writeString(out, record.getFanoutType());
+ writeString(out, record.getOfflineServerTenant());
+ writeString(out, record.getRealtimeServerTenant());
+ }
+ return baos.toByteArray();
+ }
+
+ static QueryLogRecord deserialize(byte[] payload)
+ throws IOException {
+    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(payload));
+ DataInputStream in = new DataInputStream(gzip)) {
+ long logTimestampMs = in.readLong();
+ long requestId = in.readLong();
+ String tableName = readString(in);
+ String brokerId = readString(in);
+ String clientIp = readString(in);
+ String query = readString(in);
+ String queryEngine = readString(in);
+ long requestArrivalTimeMs = in.readLong();
+ long timeMs = in.readLong();
+ long brokerReduceTimeMs = in.readLong();
+ long numDocsScanned = in.readLong();
+ long totalDocs = in.readLong();
+ long numEntriesScannedInFilter = in.readLong();
+ long numEntriesScannedPostFilter = in.readLong();
+ long numSegmentsQueried = in.readLong();
+ long numSegmentsProcessed = in.readLong();
+ long numSegmentsMatched = in.readLong();
+ long numConsumingSegmentsQueried = in.readLong();
+ long numConsumingSegmentsProcessed = in.readLong();
+ long numConsumingSegmentsMatched = in.readLong();
+ long numUnavailableSegments = in.readLong();
+ long minConsumingFreshnessTimeMs = in.readLong();
+ int numServersResponded = in.readInt();
+ int numServersQueried = in.readInt();
+ boolean groupsTrimmed = in.readBoolean();
+ boolean groupLimitReached = in.readBoolean();
+ boolean groupWarningLimitReached = in.readBoolean();
+ long numExceptions = in.readLong();
+ String exceptions = readString(in);
+ String serverStats = readString(in);
+ long offlineTotalCpuTimeNs = in.readLong();
+ long offlineThreadCpuTimeNs = in.readLong();
+ long offlineSystemActivitiesCpuTimeNs = in.readLong();
+ long offlineResponseSerializationCpuTimeNs = in.readLong();
+ long realtimeTotalCpuTimeNs = in.readLong();
+ long realtimeThreadCpuTimeNs = in.readLong();
+ long realtimeSystemActivitiesCpuTimeNs = in.readLong();
+ long realtimeResponseSerializationCpuTimeNs = in.readLong();
+ long offlineTotalMemAllocatedBytes = in.readLong();
+ long offlineThreadMemAllocatedBytes = in.readLong();
+ long offlineResponseSerMemAllocatedBytes = in.readLong();
+ long realtimeTotalMemAllocatedBytes = in.readLong();
+ long realtimeThreadMemAllocatedBytes = in.readLong();
+ long realtimeResponseSerMemAllocatedBytes = in.readLong();
+ int[] pools = readIntArray(in);
+ boolean partialResult = in.readBoolean();
+ boolean rlsFiltersApplied = in.readBoolean();
+ int numRowsResultSet = in.readInt();
+ String[] tablesQueried = readStringArray(in);
+ String traceInfoJson = Objects.requireNonNullElse(readString(in), "{}");
Review Comment:
Only `traceInfoJson` uses `Objects.requireNonNullElse` for a null-safe default
here, while the constructor at lines 155-163 also applies defaults to `_pools`,
`_tablesQueried`, and `_traceInfoJson`. Consider applying a consistent
null-handling strategy either in deserialization or in the constructor, not in
both places.
```suggestion
String traceInfoJson = readString(in);
```
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+ @Override
+ public void append(QueryLogRecord record) {
+ if (_retentionMs > 0) {
+ long cutoff = record.getLogTimestampMs() - _retentionMs;
+ while (!_records.isEmpty() && _records.peekFirst().getLogTimestampMs()
< cutoff) {
+ _records.removeFirst();
+ }
+ }
+ _records.addLast(record);
+ while (_records.size() > Math.max(1, _maxEntries)) {
+ _records.removeFirst();
+ }
Review Comment:
The expression `Math.max(1, _maxEntries)` appears in both the constructor and
the append pruning loop (lines 317, 330). Consider normalizing the value once
in the constructor, or extracting a small helper, so the lower bound is
enforced in a single place and the two call sites cannot drift apart.
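For illustration, a minimal sketch of that refactor against the store shown in
this diff, normalizing the bound once at construction so the pruning loop reads
the field directly (a sketch only, not the PR's code):

```java
private static final class InMemoryQueryLogStore implements QueryLogStore {
  private final Deque<QueryLogRecord> _records = new ArrayDeque<>();
  // The lower bound of 1 is enforced exactly once, at construction time.
  private final int _maxEntries;
  private final long _retentionMs;

  InMemoryQueryLogStore(int maxEntries, long retentionMs) {
    _maxEntries = Math.max(1, maxEntries);
    _retentionMs = retentionMs;
  }

  @Override
  public void append(QueryLogRecord record) {
    if (_retentionMs > 0) {
      long cutoff = record.getLogTimestampMs() - _retentionMs;
      while (!_records.isEmpty() && _records.peekFirst().getLogTimestampMs() < cutoff) {
        _records.removeFirst();
      }
    }
    _records.addLast(record);
    // _maxEntries is already >= 1 here, so no re-clamping is needed.
    while (_records.size() > _maxEntries) {
      _records.removeFirst();
    }
  }
}
```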
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+
+ @Override
+ public List<QueryLogRecord> getRecords() {
+ return new ArrayList<>(_records);
+ }
+
+ @Override
+ public void close() {
+ _records.clear();
+ }
+ }
+
+ private static final class DiskBackedQueryLogStore implements QueryLogStore {
+ private static final String SEGMENT_PREFIX = "segment_";
+
+ private final Path _directory;
+ private final long _maxBytes;
+ private final long _segmentBytes;
+ private final int _maxEntries;
+ private final long _retentionMs;
+ private final List<Path> _segments = new ArrayList<>();
+ private final List<Integer> _segmentRecordCounts = new ArrayList<>();
+ private DataOutputStream _currentOutput;
+ private Path _currentSegment;
+ private long _currentSegmentBytes;
+ private long _totalBytes;
+ private long _totalRecords;
+ private int _segmentCounter;
+
+ DiskBackedQueryLogStore(Path directory, long maxBytes, long segmentBytes,
int maxEntries, long retentionMs)
+ throws IOException {
+ _directory = directory;
+ _maxBytes = Math.max(1L, maxBytes);
+ _segmentBytes = Math.max(1L, segmentBytes);
+ _maxEntries = Math.max(1, maxEntries);
+ _retentionMs = retentionMs;
+ Files.createDirectories(_directory);
+ try (Stream<Path> stream = Files.list(_directory)) {
+ stream.filter(path -> Files.isRegularFile(path) &&
path.getFileName().toString().startsWith(SEGMENT_PREFIX))
+ .sorted()
+ .forEach(path -> {
+ _segments.add(path);
+ _segmentRecordCounts.add(estimateRecordCount(path));
+ try {
+ _totalBytes += Files.size(path);
+ } catch (IOException e) {
+ LOGGER.warn("Failed to compute size for query log segment {}",
path, e);
+ }
+ });
+ }
+ _totalRecords = _segmentRecordCounts.stream().mapToLong(i -> i).sum();
+ _segmentCounter = _segments.size();
+ pruneByTimeIfNeeded(); // Ensure old segments are cleaned up on startup
+ pruneByRowCountIfNeeded();
+ pruneBySizeIfNeeded();
+ openNewSegment();
+ }
+
+ @Override
+ public void append(QueryLogRecord record)
+ throws IOException {
+ byte[] payload = QueryLogRecordSerDe.serialize(record);
+ int entrySize = Integer.BYTES + payload.length;
+ // Rotate segment if either full by size, or if retention requires
time-roll
+ if (_currentSegmentBytes > 0 &&
shouldRotateForTime(record.getLogTimestampMs())) {
+ openNewSegment();
+ }
+ if (_currentSegmentBytes + entrySize > _segmentBytes) {
+ openNewSegment();
+ }
+ _currentOutput.writeInt(payload.length);
+ _currentOutput.write(payload);
+ _currentOutput.flush();
+ _currentSegmentBytes += entrySize;
+ _totalBytes += entrySize;
+ // Update counts
+ if (!_segmentRecordCounts.isEmpty()) {
+ int lastIdx = _segmentRecordCounts.size() - 1;
+ _segmentRecordCounts.set(lastIdx, _segmentRecordCounts.get(lastIdx) +
1);
+ }
+ _totalRecords += 1;
+ // Apply pruning policies
+ pruneByTimeIfNeeded();
+ pruneByRowCountIfNeeded();
+ pruneBySizeIfNeeded();
+ }
+
+ @Override
+ public List<QueryLogRecord> getRecords()
+ throws IOException {
+ if (_segments.isEmpty()) {
+ return Collections.emptyList();
+ }
+ List<QueryLogRecord> records = new ArrayList<>();
+ for (Path segment : _segments) {
+ records.addAll(readSegment(segment));
+ }
+ return records;
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+ closeCurrentOutput();
+ _segments.clear();
+ _segmentRecordCounts.clear();
+ _totalBytes = 0;
+ _currentSegmentBytes = 0;
+ _totalRecords = 0;
+ }
+
+ private void openNewSegment()
+ throws IOException {
+ closeCurrentOutput();
+ _currentSegment = _directory.resolve(String.format("%s%d_%d.log",
SEGMENT_PREFIX, System.currentTimeMillis(),
+ _segmentCounter++));
+ Files.createFile(_currentSegment);
+ _segments.add(_currentSegment);
+ _segmentRecordCounts.add(0);
+ _currentSegmentBytes = 0;
+ _currentOutput = new DataOutputStream(new BufferedOutputStream(
+ Files.newOutputStream(_currentSegment, StandardOpenOption.APPEND)));
+ }
+
+ private void closeCurrentOutput()
+ throws IOException {
+ if (_currentOutput != null) {
+ _currentOutput.flush();
+ _currentOutput.close();
+ _currentOutput = null;
+ }
+ }
+
+ private void pruneBySizeIfNeeded()
+ throws IOException {
+ while (_totalBytes > _maxBytes && _segments.size() > 1) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+ if (_totalBytes > _maxBytes && _segments.size() == 1) {
+ LOGGER.warn("Query log disk storage exceeds maxBytes {} but cannot
delete active segment", _maxBytes);
+ }
+ }
+
+ private void pruneByRowCountIfNeeded()
+ throws IOException {
+ if (_maxEntries <= 0) {
+ return;
+ }
+ while (_totalRecords > _maxEntries && _segments.size() > 1) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+ if (_totalRecords > _maxEntries && _segments.size() == 1) {
+ LOGGER.warn("Query log disk storage exceeds maxEntries {} but cannot
delete within active segment",
+ _maxEntries);
+ }
+ }
+
+ private void pruneByTimeIfNeeded()
+ throws IOException {
+ if (_retentionMs <= 0) {
+ return;
+ }
+ long cutoff = System.currentTimeMillis() - _retentionMs;
+ while (_segments.size() > 1 && segmentCreationTime(_segments.get(0)) <
cutoff) {
+ Path oldest = _segments.remove(0);
+ int removedCount = _segmentRecordCounts.remove(0);
+ try {
+ long size = Files.size(oldest);
+ Files.deleteIfExists(oldest);
+ _totalBytes -= size;
+ _totalRecords -= removedCount;
+ } catch (IOException e) {
+ LOGGER.warn("Failed to delete old query log segment {}", oldest, e);
+ break;
+ }
+ }
+ // do not delete the only active segment even if older than cutoff; it
will rotate on next append if needed
+ }
+
+ private boolean shouldRotateForTime(long recordTimestampMs) {
+ if (_retentionMs <= 0) {
+ return false;
+ }
+ long cutoff = recordTimestampMs - _retentionMs;
+ return segmentCreationTime(_currentSegment) < cutoff;
+ }
+
+ private long segmentCreationTime(Path segment) {
+ // segment filename: segment_<timestamp>_<counter>.log
+ String name = segment.getFileName().toString();
+ try {
+ int underscore = name.indexOf('_');
+ int secondUnderscore = name.indexOf('_', underscore + 1);
+ String ts = name.substring(underscore + 1, secondUnderscore);
+ return Long.parseLong(ts);
+ } catch (Exception e) {
+ // fallback to file creation time if parse fails
+ return 0L;
+ }
+ }
+
+ private int estimateRecordCount(Path segment) {
+ // Fast count by scanning int-length prefixes
+ int count = 0;
+ if (!Files.exists(segment)) {
+ return 0;
+ }
+ try (DataInputStream input = new DataInputStream(
+ new BufferedInputStream(Files.newInputStream(segment,
StandardOpenOption.READ)))) {
+ while (true) {
+ int length;
+ try {
+ length = input.readInt();
+ } catch (EOFException e) {
+ break;
+ }
+ if (length <= 0) {
+ break;
+ }
+ long skipped = input.skipBytes(length);
+ if (skipped < length) {
Review Comment:
`DataInputStream.skipBytes()` makes no guarantee of skipping the full
requested count in a single call, so a single short skip here is
indistinguishable from a truncated record. Loop until `length` bytes have
been skipped, or until no further progress can be made, before concluding
the segment is truncated.
```suggestion
long toSkip = length;
while (toSkip > 0) {
long skipped = input.skip(toSkip);
if (skipped <= 0) {
// EOF or unable to skip further
break;
}
toSkip -= skipped;
}
if (toSkip > 0) {
// Could not skip full record, likely truncated
```
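If the same pattern is needed by other segment readers, the loop could be
factored into a helper. A self-contained sketch (the `StreamSkipUtil` class and
`skipFully` name are illustrative, not part of the PR):

```java
import java.io.DataInputStream;
import java.io.IOException;

final class StreamSkipUtil {
  private StreamSkipUtil() {
  }

  /**
   * Skips exactly {@code length} bytes, looping over partial skips.
   * Returns true if the full length was skipped, false if the stream
   * ended first (i.e. the record is truncated).
   */
  static boolean skipFully(DataInputStream input, long length)
      throws IOException {
    long remaining = length;
    while (remaining > 0) {
      long skipped = input.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
        continue;
      }
      // skip() may legitimately return 0 without reaching EOF; fall back to
      // a single-byte read to distinguish slow progress from end-of-stream.
      if (input.read() < 0) {
        return false;
      }
      remaining--;
    }
    return true;
  }
}
```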
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotQueryResource.java:
##########
@@ -736,7 +846,10 @@ private String getTimeSeriesQueryURL(String protocol,
String hostName, int port,
}
}
- private Map<String, String> extractHeaders(HttpHeaders httpHeaders) {
+ private Map<String, String> extractHeaders(@Nullable HttpHeaders
httpHeaders) {
+ if (httpHeaders == null) {
+ return Collections.emptyMap();
+ }
Review Comment:
The signature now accepts `@Nullable HttpHeaders`, and the added null check
makes the method itself safe. Still, verify that all callers (especially line
454) behave correctly when httpHeaders is null, and that an empty header map
is acceptable to the downstream request-building code.
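A minimal sketch of the resulting contract (null in, empty map out). The
standalone class and the loop body are illustrative assumptions; the real
method lives in PinotQueryResource and may extract headers differently:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;

final class HeaderExtraction {
  private HeaderExtraction() {
  }

  /** Null in, empty map out, so callers never need their own null check. */
  static Map<String, String> extractHeaders(@Nullable HttpHeaders httpHeaders) {
    if (httpHeaders == null) {
      return Collections.emptyMap();
    }
    Map<String, String> headers = new HashMap<>();
    // Keep the first value of each header; an assumption for this sketch only.
    httpHeaders.getRequestHeaders().forEach((name, values) -> {
      if (!values.isEmpty()) {
        headers.put(name, values.get(0));
      }
    });
    return headers;
  }
}
```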
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+ @Override
+ public void append(QueryLogRecord record)
+ throws IOException {
+ byte[] payload = QueryLogRecordSerDe.serialize(record);
+ int entrySize = Integer.BYTES + payload.length;
+ // Rotate segment if either full by size, or if retention requires
time-roll
Review Comment:
The `_currentSegmentBytes > 0` guard avoids rotating an empty segment, but
the intent is easy to miss on a first read. A short comment explaining why
time-based rotation is skipped for empty segments would help.
```suggestion
      // Rotate the segment if it is full by size, or if retention requires a
      // time-roll. Time-based rotation is skipped for empty segments: there is
      // nothing to age out, so rotating would only create an empty segment file.
```
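Another option is to fold both conditions into a named predicate so the guard
documents itself. A sketch of a private method inside DiskBackedQueryLogStore,
using only the fields and helpers shown in this diff (`needsRotation` is an
illustrative name):

```java
// Sketch: would replace the two back-to-back rotation checks in append().
private boolean needsRotation(long recordTimestampMs, int entrySize) {
  // Time-based rotation only applies to non-empty segments; rotating an
  // empty segment would just produce another empty file with a newer name.
  boolean timeRoll = _currentSegmentBytes > 0 && shouldRotateForTime(recordTimestampMs);
  // Size-based rotation applies when the incoming entry would push the
  // current segment past its byte budget.
  boolean sizeRoll = _currentSegmentBytes + entrySize > _segmentBytes;
  return timeRoll || sizeRoll;
}
```

append() would then rotate at most once per entry
(`if (needsRotation(record.getLogTimestampMs(), entrySize)) { openNewSegment(); }`),
which also avoids a redundant second rotation in the edge case where a
time-roll is immediately followed by an oversized entry.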
##########
pinot-broker/src/main/java/org/apache/pinot/broker/querylog/QueryLogSystemTable.java:
##########
@@ -0,0 +1,1508 @@
+ List<QueryLogRecord> rows;
+ try {
+ rows = getFilteredRecords(predicate);
+ } catch (IOException e) {
+ throw new BadQueryRequestException("Failed to read query log storage",
e);
+ }
Review Comment:
The default ordering (most recent first, by `logTimestampMs`) when no ORDER
BY is specified is undocumented. Add a comment explaining this default, as it
is an important query semantic that users should be aware of.
```suggestion
      }
      // When no ORDER BY clause is specified, default to ordering by
      // logTimestampMs DESC, i.e. the most recent queries first.
```
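Since overrides and the fallback both hinge on this default, a small sketch
isolating the semantic the suggested comment documents (the class and method
names are illustrative; `QueryLogRecord` is the type from this diff):

```java
import java.util.Comparator;
import java.util.List;
import javax.annotation.Nullable;

final class QueryLogOrdering {
  private QueryLogOrdering() {
  }

  /**
   * Applies the system table's ordering semantics: an explicit ORDER BY
   * comparator wins; otherwise rows default to logTimestampMs DESC, so
   * the most recent queries come first.
   */
  static void sortRows(List<QueryLogRecord> rows,
      @Nullable Comparator<QueryLogRecord> explicitOrder) {
    Comparator<QueryLogRecord> comparator = explicitOrder != null
        ? explicitOrder
        : Comparator.comparingLong(QueryLogRecord::getLogTimestampMs).reversed();
    rows.sort(comparator);
  }
}
```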
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]