junrao commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1589688863
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -181,7 +189,8 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
Time time,
Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
BiConsumer<TopicPartition, Long> updateRemoteLogStartOffset,
- BrokerTopicStats brokerTopicStats) throws IOException {
+ BrokerTopicStats brokerTopicStats,
+ Metrics metrics) throws IOException {
Review Comment:
Could we add the javadoc for the new param?
##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManagerConfig.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+public class RLMQuotaManagerConfig {
+ public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+
+ private final long quotaBytesPerSecond;
+ private final int numQuotaSamples;
+ private final int quotaWindowSizeSeconds;
+
+ public long getQuotaBytesPerSecond() {
Review Comment:
For consistency with the rest of the codebase, we don't typically use the
`get` prefix on accessors, so this can just be `quotaBytesPerSecond()`.
Ditto for the accessors below.
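A minimal sketch of the accessor naming this comment asks for, using the three fields shown in the diff. The constructor signature and the other two accessor names are assumptions, since the excerpt does not show them; the class is package-private here only to keep the sketch self-contained.

```java
// Sketch only: constructor and accessor names other than quotaBytesPerSecond()
// are assumed for illustration, not taken from the PR.
class RLMQuotaManagerConfig {
    public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;

    private final long quotaBytesPerSecond;
    private final int numQuotaSamples;
    private final int quotaWindowSizeSeconds;

    RLMQuotaManagerConfig(long quotaBytesPerSecond, int numQuotaSamples, int quotaWindowSizeSeconds) {
        this.quotaBytesPerSecond = quotaBytesPerSecond;
        this.numQuotaSamples = numQuotaSamples;
        this.quotaWindowSizeSeconds = quotaWindowSizeSeconds;
    }

    // Accessors named after the field, without a "get" prefix.
    public long quotaBytesPerSecond() {
        return quotaBytesPerSecond;
    }

    public int numQuotaSamples() {
        return numQuotaSamples;
    }

    public int quotaWindowSizeSeconds() {
        return quotaWindowSizeSeconds;
    }
}
```

With this naming, the call in the `RLMQuotaManager` constructor would read `config.quotaBytesPerSecond()` instead of `config.getQuotaBytesPerSecond()`.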
##########
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
+
+ private final RLMQuotaManagerConfig config;
+ private final Metrics metrics;
+ private final QuotaType quotaType;
+ private final String description;
+ private final Time time;
+
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final SensorAccess sensorAccess;
+ private Quota quota;
+
+ public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
+ this.config = config;
+ this.metrics = metrics;
+ this.quotaType = quotaType;
+ this.description = description;
+ this.time = time;
+
+ this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+ this.sensorAccess = new SensorAccess(lock, metrics);
+ }
+
+ public void updateQuota(Quota newQuota) {
+ lock.writeLock().lock();
+ try {
+ this.quota = newQuota;
+
+ Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+ MetricName quotaMetricName = metricName();
+ KafkaMetric metric = allMetrics.get(quotaMetricName);
+ if (metric != null) {
+ LOGGER.warn("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", quotaMetricName, newQuota);
+ metric.config(getQuotaMetricConfig(newQuota));
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public boolean isQuotaExceeded() {
+ Sensor sensorInstance = sensor();
+ try {
+ sensorInstance.checkQuotas();
+ } catch (QuotaViolationException qve) {
+ LOGGER.debug("Quota violated for sensor ({}), metric: ({}), metric-value: ({}), bound: ({})",
+ sensorInstance.name(), qve.metric().metricName(), qve.value(), qve.bound());
+ return true;
+ }
+ return false;
+ }
+
+ public void record(double value) {
+ sensor().record(value, time.milliseconds(), false);
Review Comment:
Why do we turn quota checking off in `record()`? Other implementations,
such as `ClientQuotaManager.recordAndGetThrottleTimeMs()`, call `record()`
with quota checking turned on and get back the amount of time to throttle.
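The contrast in this comment (record and check in one step, returning a pause time, versus recording without a check) can be sketched without Kafka's classes. The block below is a simplified, hypothetical stand-in: `QuotaSketch`, its fields, and its throttle formula are assumptions for illustration, not the `Sensor` or `ClientQuotaManager` implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified quota tracker; not Kafka's Sensor or ClientQuotaManager.
class QuotaSketch {
    private final double boundBytesPerSec;
    private final long windowMs;
    // Each entry is {timestampMs, bytes}.
    private final Deque<long[]> samples = new ArrayDeque<>();
    private long totalBytes = 0;

    QuotaSketch(double boundBytesPerSec, long windowMs) {
        this.boundBytesPerSec = boundBytesPerSec;
        this.windowMs = windowMs;
    }

    // Records the bytes, checks the quota, and returns how long the caller
    // should pause in milliseconds (0 when the observed rate is within the bound).
    long recordAndGetThrottleTimeMs(long bytes, long nowMs) {
        // Drop samples that have fallen out of the window.
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            totalBytes -= samples.pollFirst()[1];
        }
        samples.addLast(new long[] {nowMs, bytes});
        totalBytes += bytes;

        double observedRate = totalBytes / (windowMs / 1000.0);
        if (observedRate <= boundBytesPerSec) {
            return 0L;
        }
        // Assumed formula: pause in proportion to the overshoot, scaled by the window.
        return Math.round((observedRate - boundBytesPerSec) / boundBytesPerSec * windowMs);
    }
}
```

In this sketch, with a bound of 100 bytes per second and a 1000 ms window, recording 50 bytes at t=0 returns 0, and recording a further 150 bytes at t=10 returns 1000. A caller that records without checking, as the PR does, would need a separate call such as `isQuotaExceeded()` and would not learn how long to wait.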
##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java:
##########
@@ -143,6 +143,38 @@ public final class RemoteLogManagerConfig {
"less than or equal to `log.retention.bytes` value.";
public static final Long DEFAULT_LOG_LOCAL_RETENTION_BYTES = -2L;
+ public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.copy.max.bytes.per.second";
+ public static final String REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be copied from local storage to remote storage per second. " +
+ "This is a global limit for all the partitions that are being copied from local storage to remote storage. " +
+ "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be copied per second.";
+ public static final Long DEFAULT_REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+ public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.copy.quota.window.num";
+ public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote copy quota management. " +
+ "The default value is 61, which means there are 60 whole windows + 1 current window.";
+ public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_NUM = 61;
+
+ public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_PROP = "remote.log.manager.copy.quota.window.size.seconds";
+ public static final String REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for remote copy quota management. " +
+ "The default value is 1 second.";
+ public static final int DEFAULT_REMOTE_LOG_MANAGER_COPY_QUOTA_WINDOW_SIZE_SECONDS = 1;
+
+ public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP = "remote.log.manager.fetch.max.bytes.per.second";
+ public static final String REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_DOC = "The maximum number of bytes that can be fetched from remote storage to local storage per second. " +
+ "This is a global limit for all the partitions that are being fetched from remote storage to local storage. " +
+ "The default value is Long.MAX_VALUE, which means there is no limit on the number of bytes that can be fetched per second.";
+ public static final Long DEFAULT_REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND = Long.MAX_VALUE;
+
+ public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_PROP = "remote.log.manager.fetch.quota.window.num";
+ public static final String REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM_DOC = "The number of samples to retain in memory for remote fetch quota management. " +
+ "The default value is 11, which means there are 10 whole windows + 1 current window.";
+ public static final int DEFAULT_REMOTE_LOG_MANAGER_FETCH_QUOTA_WINDOW_NUM = 11;
Review Comment:
Why is the default number of fetch quota windows (11) different from the
copy default (61)?
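To make the difference concrete, here is a small sketch of how much completed history each default covers, following the "N whole windows + 1 current window" wording in the doc strings. The helper name is hypothetical, and the 1-second fetch window size is an assumption, since the excerpt only shows the copy window size.

```java
// Hypothetical helper for illustration; not part of the PR.
class QuotaWindowSpan {
    // Seconds of completed history: every sample except the in-progress one.
    static int wholeWindowSeconds(int numSamples, int windowSizeSeconds) {
        return (numSamples - 1) * windowSizeSeconds;
    }

    public static void main(String[] args) {
        // Copy defaults from the diff: 61 samples of 1 second each.
        System.out.println("copy:  " + wholeWindowSeconds(61, 1) + " s");
        // Fetch default: 11 samples; the 1-second size is assumed here.
        System.out.println("fetch: " + wholeWindowSeconds(11, 1) + " s");
    }
}
```

Under these assumptions the copy quota averages over 60 seconds of completed windows and the fetch quota over 10 seconds, so the fetch quota would react to bursts over a much shorter horizon.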