[
https://issues.apache.org/jira/browse/KAFKA-8486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stanislav updated KAFKA-8486:
------------------------------
Description:
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from json import loads
import vertica_python
from datetime import datetime as dt
from time import sleep
# Vertica connection settings; load() unpacks these as keyword arguments to
# vertica_python.connect(). Host, user, password and database are blank
# here -- presumably redacted placeholders to fill in before running.
conn_vertica = dict(
    host='',
    port=5433,
    user='',
    password='',
    database='',
    use_prepared_statements=True,
)

# Alias for the connection settings of the load target.
conn_to = conn_vertica
def load():
    """Consume a batch from Kafka, store it in Vertica, then commit offsets.

    Polls the ``orderSummary`` topic until either ``timeout`` seconds have
    elapsed or at least ``max_len`` records have been collected, whichever
    comes first. The collected records are bulk-inserted into
    ``stage.FS_Orders_from_kafka``. Only after the database COMMIT has
    succeeded are the consumed offsets committed for the consumer group,
    so a failed insert leaves the offsets untouched and the same records
    are delivered again on the next run (at-least-once delivery).

    The consumer is always closed, even if polling or the insert raises.
    """
    parsed_topic_name = 'orderSummary'
    timeout = 20            # overall polling budget, in seconds
    max_len = 10            # stop early once this many records are buffered
    poll_timeout_ms = 1000  # block per poll instead of busy-spinning

    consumer = KafkaConsumer(
        parsed_topic_name,
        auto_offset_reset='earliest',
        bootstrap_servers=['us-kafka-broker:9092'],
        # Offsets are committed by hand below, after the rows are stored.
        enable_auto_commit=False,
        group_id="my_group",
        value_deserializer=lambda x: loads(x.decode('utf-8')),
    )
    try:
        res = []
        t1 = dt.now()
        # `and`, not `or`: either limit alone must be able to end the loop,
        # otherwise a topic with fewer than max_len records never returns.
        # total_seconds() is the full elapsed time; `.seconds` is only the
        # seconds component of the timedelta.
        while ((dt.now() - t1).total_seconds() < timeout
               and len(res) < max_len):
            msgs = consumer.poll(timeout_ms=poll_timeout_ms)
            print(msgs)
            # poll() returns {TopicPartition: [records]}; flatten into res.
            for v in msgs.values():
                res.extend(v)

        with vertica_python.connect(**conn_to) as conn_2:
            curs2 = conn_2.cursor()
            if res:
                curs2.executemany(
                    '''
                    INSERT INTO stage.FS_Orders_from_kafka
                        (load_dtm, topic_name, partition_id, "offset", value)
                    VALUES (?, ?, ?, ?, ?)''',
                    [(r.timestamp, r.topic, r.partition, r.offset, r.value)
                     for r in res],
                )
                curs2.execute('COMMIT')
                # The rows are durable now, so it is safe to advance the
                # group's offsets. commit() with no arguments synchronously
                # commits the positions reached by the poll() calls above.
                consumer.commit()
            else:
                print('Nothing!')
    finally:
        consumer.close()
# Run a single consume-and-load pass when this module is executed.
load()
> How to commit offset via Kafka
> -------------------------------
>
> Key: KAFKA-8486
> URL: https://issues.apache.org/jira/browse/KAFKA-8486
> Project: Kafka
> Issue Type: Wish
> Components: consumer
> Affects Versions: 2.2.1
> Reporter: Stanislav
> Priority: Trivial
>
> from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
> from json import loads
> import vertica_python
> from datetime import datetime as dt
> from time import sleep
> conn_vertica = {'host': '',
> 'port': 5433,
> 'user': '',
> 'password': '',
> 'database': '',
> 'use_prepared_statements': True}
> conn_to = conn_vertica
> def load():
> parsed_topic_name = 'orderSummary'
> consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
> bootstrap_servers=['us-kafka-broker:9092'],
> enable_auto_commit=False,
> group_id="my_group",
> #api_version=(0, 10),
> #consumer_timeout_ms=20000,
> value_deserializer=lambda x: loads(x.decode('utf-8'))
> )
> timeout = 20
> max_len = 10
> res = []
> t1 = dt.now()
> while (dt.now()-t1).seconds < timeout or len(res) < max_len:
> msgs = consumer.poll()
> print(msgs)
> for v in msgs.values():
> res += v
> with vertica_python.connect(**conn_to) as conn_2:
> curs2 = conn_2.cursor()
> if res:
> curs2.executemany('''
> INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id,
> "offset", value)
> VALUES (?, ?, ?, ?, ?)''', [(r.timestamp, r.topic, r.partition, r.offset,
> r.value) for r in res])
> curs2.execute('COMMIT')
> else:
> print('Nothing!')
> consumer.close()
> #sleep(5)
> load()
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)