Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-03-08 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1986549031 This PR is superseded by #12608 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specifi

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-03-08 Thread via GitHub
vrajat closed pull request #12157: Detect expired messages in Kafka. Log and set a guage. URL: https://github.com/apache/pinot/pull/12157

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-03-06 Thread via GitHub
Jackie-Jiang commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981974328 Ideally we want a unit test to directly read the value from the gauge, instead of relying on a rest API which is kind of an integration test. Currently I saw CI test failures, but no

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-03-06 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1981792537 Debug APIs and tests are there to test that the data loss is detected. Is it OK if one of the PRs has no test to check the changes?

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-02-01 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1475535154 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -140,6 +142,8 @@ public class PinotLLCRealti

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-02-01 Thread via GitHub
ege-st commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1474666166 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -140,6 +142,8 @@ public class PinotLLCRealti

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473771585 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -140,6 +142,8 @@ public class PinotLLCRealti

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780835 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473780427 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473773112 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -1711,4 +1725,14 @@ String moveSegmentFile(S

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473772323 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -1711,4 +1725,14 @@ String moveSegmentFile(S

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473761868 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -889,6 +892,39 @@ public Map getPartitionToLagState(

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473760338 ## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java: ## @@ -1339,7 +1346,14 @@ private StreamPartitionM

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473759897 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -889,6 +892,39 @@ public Map getPartitionToLagState(

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-31 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1473755477 ## pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java: ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one +

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-30 Thread via GitHub
Jackie-Jiang commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1471662750 ## pinot-tools/src/main/java/org/apache/pinot/tools/RealTimeSlowConsumer.java: ## @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-29 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1914824597 @Jackie-Jiang @ege-st Can you please re-review this PR? The major changes since the last changes are: 1. Cleaned up the check in `RealtimeSegmentDataManager` to reuse member objects to

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-14 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1891440840 > So to make sure I understand: we'll essentially have a gauge per partition per table. And if a new consuming segment is started where the start offset < earliest offset still in Kafka th

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-12 Thread via GitHub
ege-st commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1889843108 So to make sure I understand: we'll essentially have a gauge per partition per table. And if a new consuming segment is started where the start offset < earliest offset still in Kafka the
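
The condition described in this comment (one gauge per partition per table, flagged when a consuming segment's start offset is below the earliest offset still in Kafka) can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `ExpiredOffsetGauges`, `record`, `get`, the key format, and the 1/0 gauge values are all hypothetical, and the earliest available offset is assumed to have been looked up elsewhere.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: holds one gauge value per (table, partition) pair.
final class ExpiredOffsetGauges {
  private final Map<String, Long> gauges = new ConcurrentHashMap<>();

  // Assumed key format; a real metrics registry would have its own naming scheme.
  private static String key(String table, int partition) {
    return table + "-" + partition;
  }

  /**
   * Sets the gauge to 1 when the segment's start offset is older than the
   * earliest offset still retained by the stream, and to 0 otherwise.
   * Returns true when messages appear to have expired.
   */
  boolean record(String table, int partition, long startOffset, long earliestOffset) {
    boolean expired = startOffset < earliestOffset;
    gauges.put(key(table, partition), expired ? 1L : 0L);
    return expired;
  }

  // Returns the last recorded gauge value, or 0 if nothing was recorded.
  long get(String table, int partition) {
    return gauges.getOrDefault(key(table, partition), 0L);
  }
}
```

Keeping the check to a single comparison means it can run each time a consuming segment starts without adding noticeable cost.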

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-12 Thread via GitHub
ege-st commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1450846307 ## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ## @@ -46,6 +50,20 @@ publ

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2024-01-09 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1882746729 I've added `RealtimeSlowConsumer.java` as a temporary reproducer while I work on an integration test.

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-22 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819785 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-22 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1434819332 ## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java: ## @@ -1648,6 +1648,15 @@ private void makeStreamConsumer(String

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-21 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1867294975 A simple script to show message expiration in kafka Assume all the components are installed - Kafka - Kafka CLIs - Faker ``` # Create a topic with retention of 1m

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-19 Thread via GitHub
Jackie-Jiang commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1863422625 The overhead should be minimal if we don't need to make extra remote calls. I'd assume `PartitionGroupConsumer.fetchMessages()` already has the information we need to perform the
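
One way to read the suggestion above is that a gap can be inferred from the fetched batch alone, with no extra remote call: if the first offset returned is larger than the offset that was requested, the offsets in between were not served. The sketch below illustrates that idea only. `FetchGapCheck` and `skippedOffsets` are hypothetical names, offsets are assumed to be plain longs, and this is not the actual `PartitionGroupConsumer` API.

```java
import java.util.List;

// Hypothetical sketch: infer skipped offsets from an already-fetched batch.
final class FetchGapCheck {
  /**
   * Returns how many offsets lie between the requested start offset and the
   * first offset actually returned. Returns 0 when the batch is empty or
   * starts at (or before) the requested offset.
   */
  static long skippedOffsets(long requestedStartOffset, List<Long> returnedOffsets) {
    if (returnedOffsets.isEmpty()) {
      return 0L; // nothing fetched, so nothing can be concluded from this batch
    }
    long firstReturned = returnedOffsets.get(0);
    return Math.max(0L, firstReturned - requestedStartOffset);
  }
}
```

A caveat under these assumptions: on log-compacted Kafka topics offsets can be non-contiguous for reasons other than retention, so a non-zero result is a hint of expiry rather than proof.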

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-18 Thread via GitHub
ege-st commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861814766 > > We should be able to emit metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the asked offset range is not fully returned). > >

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-18 Thread via GitHub
ege-st commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861811576 It is possible for messages to get expired in the middle of the lifetime of a consuming segment. One way this could happen is if the ingestion on the table is paused for a period longer th

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-18 Thread via GitHub
swaminathanmanish commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1861764515 > > We should be able to emit metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the asked offset range is not fully returned).

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-17 Thread via GitHub
vrajat commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1859525432 > We should be able to emit metric within `PartitionGroupConsumer.fetchMessages()` when the start offset is not available (e.g. the asked offset range is not fully returned). @Jacki

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-17 Thread via GitHub
vrajat commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1429452785 ## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ## @@ -46,6 +50,20 @@ publ

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-15 Thread via GitHub
ege-st commented on code in PR #12157: URL: https://github.com/apache/pinot/pull/12157#discussion_r1428220886 ## pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java: ## @@ -46,6 +50,20 @@ publ

Re: [PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-15 Thread via GitHub
codecov-commenter commented on PR #12157: URL: https://github.com/apache/pinot/pull/12157#issuecomment-1857759620 ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/12157?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) R

[PR] Detect expired messages in Kafka. Log and set a guage. [pinot]

2023-12-15 Thread via GitHub
vrajat opened a new pull request, #12157: URL: https://github.com/apache/pinot/pull/12157 Pinot may take multiple hours between polling a partition in a Kafka topic. One specific example is that Pinot took a long time to flush a segment to disk. In the meantime, messages in Kafka can expire