vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1986549031
This PR is superseded by #12608
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
vrajat closed pull request #12157: Detect expired messages in Kafka. Log and
set a gauge.
URL: https://github.com/apache/pinot/pull/12157
Jackie-Jiang commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981974328
Ideally we want a unit test that reads the value directly from the gauge,
instead of relying on a REST API, which is effectively an integration test.
Currently I saw CI test failures, but no
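The unit-test style suggested here could look roughly like the following minimal sketch. It assumes a hypothetical in-memory `GaugeRegistry` standing in for Pinot's metrics classes (the real metrics API is not shown in this thread) and a hypothetical `DataLossDetector` that sets a 0/1 gauge per table and partition. The point is only that the test drives the component and then reads the gauge value directly, with no REST endpoint involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Runnable entry point: a plain unit-style check that reads the gauge directly.
class DataLossGaugeTest {
  public static void main(String[] args) {
    GaugeRegistry registry = new GaugeRegistry();
    DataLossDetector detector = new DataLossDetector(registry);

    // Requested offset 100 is older than the earliest retained offset 250, so data was lost.
    detector.onConsumeStart("myTable_REALTIME", 3, 100L, 250L);
    long value = registry.getValue(DataLossDetector.gaugeName("myTable_REALTIME", 3));
    if (value != 1L) {
      throw new AssertionError("expected gauge value 1, got " + value);
    }
    System.out.println("gauge check passed");
  }
}

// Hypothetical in-memory stand-in for a metrics registry (not Pinot's metrics API).
class GaugeRegistry {
  private final Map<String, Long> gauges = new ConcurrentHashMap<>();

  void setValue(String name, long value) {
    gauges.put(name, value);
  }

  // Returns 0 for a gauge that has never been set.
  long getValue(String name) {
    return gauges.getOrDefault(name, 0L);
  }
}

// Hypothetical component under test: sets a 0/1 data-loss gauge per table and partition.
class DataLossDetector {
  private final GaugeRegistry registry;

  DataLossDetector(GaugeRegistry registry) {
    this.registry = registry;
  }

  static String gaugeName(String tableName, int partition) {
    return tableName + "." + partition + ".dataLoss";
  }

  void onConsumeStart(String tableName, int partition, long requestedOffset, long earliestOffset) {
    registry.setValue(gaugeName(tableName, partition), requestedOffset < earliestOffset ? 1L : 0L);
  }
}
```

Because the registry is injected, the same test can run without a controller or server, which keeps it fast and deterministic.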
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981792537
Debug APIs and tests are there to test that the data loss is detected. Is it
OK if one of the PRs has no test to check the changes?
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1475535154
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -140,6 +142,8 @@ public class PinotLLCRealti
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1474666166
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -140,6 +142,8 @@ public class PinotLLCRealti
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473771585
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -140,6 +142,8 @@ public class PinotLLCRealti
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780835
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780427
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473773112
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -1711,4 +1725,14 @@ String moveSegmentFile(S
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473772323
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -1711,4 +1725,14 @@ String moveSegmentFile(S
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473761868
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -889,6 +892,39 @@ public Map getPartitionToLagState(
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473760338
##
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##
@@ -1339,7 +1346,14 @@ private StreamPartitionM
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473759897
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -889,6 +892,39 @@ public Map getPartitionToLagState(
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473755477
##
pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java:
##
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+
Jackie-Jiang commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1471662750
##
pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java:
##
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1914824597
@Jackie-Jiang @ege-st Can you please re-review this PR? The major changes
since the previous revision are:
1. Cleaned up the check in `RealtimeSegmentDataManager` to reuse member
objects to
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1891440840
> So to make sure I understand: we'll essentially have a gauge per partition
per table. And if a new consuming segment is started where the start offset <
earliest offset still in Kafka th
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1889843108
So to make sure I understand: we'll essentially have a gauge per partition
per table. And if a new consuming segment is started where the start offset <
earliest offset still in Kafka the
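The check described above (one gauge per table and partition, compared against the earliest offset Kafka still retains) can be sketched as follows. This is a minimal illustration, not the PR's code: `PartitionExpiryGauges`, `checkStartOffset`, and the "table#partition" key format are hypothetical, and the gauge here records the size of the offset gap, which is an assumption since the comment above is cut off before stating the gauge's value. The gap is counted in offsets rather than records.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

class PartitionExpiryGauges {
  private static final Logger LOGGER = Logger.getLogger(PartitionExpiryGauges.class.getName());

  // One gauge value per (table, partition) pair, keyed as "table#partition".
  private final Map<String, Long> gauges = new ConcurrentHashMap<>();

  private static String key(String table, int partition) {
    return table + "#" + partition;
  }

  /**
   * Called when a new consuming segment starts. If the segment's start offset is older than
   * the earliest offset the stream still retains, the offsets in between have expired.
   * Returns the number of expired offsets (0 when nothing was lost) and records it as the gauge.
   */
  long checkStartOffset(String table, int partition, long startOffset, long earliestOffset) {
    long expired = startOffset < earliestOffset ? earliestOffset - startOffset : 0L;
    gauges.put(key(table, partition), expired);
    if (expired > 0) {
      LOGGER.warning(String.format(
          "Table %s partition %d: start offset %d is older than earliest offset %d; %d offsets expired",
          table, partition, startOffset, earliestOffset, expired));
    }
    return expired;
  }

  // Current gauge value; 0 for a partition that has never been checked.
  long gauge(String table, int partition) {
    return gauges.getOrDefault(key(table, partition), 0L);
  }

  public static void main(String[] args) {
    PartitionExpiryGauges g = new PartitionExpiryGauges();
    g.checkStartOffset("events_REALTIME", 0, 500L, 500L); // nothing expired
    g.checkStartOffset("events_REALTIME", 1, 500L, 820L); // 320 offsets expired, logs a warning
    System.out.println(g.gauge("events_REALTIME", 0) + " " + g.gauge("events_REALTIME", 1)); // prints "0 320"
  }
}
```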
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1450846307
##
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##
@@ -46,6 +50,20 @@ publ
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1882746729
I've added `RealtimeSlowConsumer.java` as a temporary reproducer while I
work on an integration test.
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819785
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819332
##
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##
@@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1867294975
A simple script to show message expiration in Kafka.
It assumes the following components are installed:
- Kafka
- Kafka CLIs
- Faker
```
# Create a topic with retention of 1m
```
Jackie-Jiang commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1863422625
The overhead should be minimal if we don't need to make extra remote calls.
I'd assume `PartitionGroupConsumer.fetchMessages()` already has the
information we need to perform the
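The "no extra remote call" idea can be illustrated with a minimal sketch. It assumes a hypothetical `FetchedBatch` that exposes the offsets of the returned messages in order (Pinot's actual message-batch interface is not reproduced here). If the first offset returned is greater than the offset that was requested, the offsets in between were not delivered. This happens, for example, when the requested offset has been deleted and the consumer's `auto.offset.reset` is `earliest`, so Kafka resumes from the log start offset. Offset gaps can also come from log compaction or transaction markers, so a gap is a signal of possible expiry rather than proof.

```java
import java.util.List;

class FetchGapCheck {
  // Hypothetical stand-in for a fetched batch: the offsets of the returned messages, in order.
  static final class FetchedBatch {
    private final List<Long> offsets;

    FetchedBatch(List<Long> offsets) {
      this.offsets = offsets;
    }

    boolean isEmpty() {
      return offsets.isEmpty();
    }

    long firstOffset() {
      return offsets.get(0);
    }
  }

  /**
   * Returns how many offsets were skipped between the requested start offset and the first
   * message actually returned. Uses only data already in the batch, so no extra broker call
   * is needed. Returns 0 for a contiguous or empty batch.
   */
  static long skippedOffsets(long requestedStartOffset, FetchedBatch batch) {
    if (batch.isEmpty()) {
      return 0L; // nothing returned; this batch alone cannot reveal a gap
    }
    return Math.max(0L, batch.firstOffset() - requestedStartOffset);
  }

  public static void main(String[] args) {
    FetchedBatch contiguous = new FetchedBatch(List.of(100L, 101L, 102L));
    FetchedBatch gapped = new FetchedBatch(List.of(180L, 181L));
    System.out.println(skippedOffsets(100L, contiguous)); // prints 0
    System.out.println(skippedOffsets(100L, gapped));     // prints 80
  }
}
```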
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861814766
> > We should be able to emit metric within
`PartitionGroupConsumer.fetchMessages()` when the start offset is not available
(e.g. the asked offset range is not fully returned).
>
>
ege-st commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861811576
It is possible for messages to get expired in the middle of the lifetime of
a consuming segment. One way this could happen is if the ingestion on the table
is paused for a period longer th
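The pause scenario can be approximated with a simple time-based check. This is a minimal sketch, not the approach taken in the PR. It assumes the topic's retention is known (Kafka's topic-level `retention.ms` setting) and that the consumer records when it last polled; `PauseRetentionCheck` and `mayHaveExpired` are hypothetical names. Kafka deletes whole log segments, so messages can outlive `retention.ms`; a `true` result means "possibly expired", not "definitely lost".

```java
import java.time.Duration;
import java.time.Instant;

class PauseRetentionCheck {
  /**
   * Returns true when the time since the last poll exceeds the topic's retention, meaning
   * unread messages may have been deleted in the meantime. Because Kafka deletes whole log
   * segments, this is a warning signal rather than proof of loss.
   */
  static boolean mayHaveExpired(Instant lastPoll, Instant now, Duration retention) {
    return Duration.between(lastPoll, now).compareTo(retention) > 0;
  }

  public static void main(String[] args) {
    Duration retention = Duration.ofHours(6); // e.g. retention.ms = 21600000
    Instant lastPoll = Instant.parse("2024-01-01T00:00:00Z");
    System.out.println(mayHaveExpired(lastPoll, Instant.parse("2024-01-01T05:00:00Z"), retention)); // prints false
    System.out.println(mayHaveExpired(lastPoll, Instant.parse("2024-01-01T07:30:00Z"), retention)); // prints true
  }
}
```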
swaminathanmanish commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861764515
> > We should be able to emit metric within
`PartitionGroupConsumer.fetchMessages()` when the start offset is not available
(e.g. the asked offset range is not fully returned).
vrajat commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1859525432
> We should be able to emit metric within
`PartitionGroupConsumer.fetchMessages()` when the start offset is not available
(e.g. the asked offset range is not fully returned).
@Jacki
vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1429452785
##
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##
@@ -46,6 +50,20 @@ publ
ege-st commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886
##
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##
@@ -46,6 +50,20 @@ publ
codecov-commenter commented on PR #12157:
URL: https://github.com/apache/pinot/pull/12157#issuecomment-1857759620
##
[Codecov](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
R
vrajat opened a new pull request, #12157:
URL: https://github.com/apache/pinot/pull/12157
Pinot may take multiple hours between polling a partition in a Kafka topic.
One specific example is that Pinot took a long time to flush a segment to disk.
In the meantime, messages in Kafka can expire