[ 
https://issues.apache.org/jira/browse/KAFKA-8486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav  updated KAFKA-8486:
------------------------------
    Description: 
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from json import loads
import vertica_python
from datetime import datetime as dt
from time import sleep

# Connection settings that load() unpacks as keyword arguments into
# vertica_python.connect(). Host, user, password and database are empty
# strings in this snippet.
# NOTE(review): 'use_prepared_statements' is presumably what allows the '?'
# placeholders in load()'s INSERT - confirm against the vertica_python docs.
conn_vertica = {'host': '',
 'port': 5433,
 'user': '',
 'password': '',
 'database': '',
 'use_prepared_statements': True}

# Alias read by load() to select the target connection settings.
conn_to = conn_vertica

def load():
    """Poll one batch from Kafka, store it in Vertica, then commit offsets.

    Records are read from the ``orderSummary`` topic until either ``max_len``
    records have been buffered or ``timeout`` seconds have elapsed, whichever
    comes first. A non-empty batch is inserted into
    ``stage.FS_Orders_from_kafka``; only after the database COMMIT succeeds
    are the consumer offsets committed back to Kafka, so a failed insert
    leaves the records eligible for re-delivery on the next run.

    Prints ``Nothing!`` and returns without touching the database when no
    records were received. The consumer is always closed on exit.
    """
    parsed_topic_name = 'orderSummary'
    timeout = 20  # seconds to keep polling before giving up on a full batch
    max_len = 10  # stop polling once at least this many records are buffered

    consumer = KafkaConsumer(
        parsed_topic_name,
        auto_offset_reset='earliest',
        bootstrap_servers=['us-kafka-broker:9092'],
        # Auto-commit is off on purpose: offsets are committed manually below,
        # after the batch has been durably stored.
        enable_auto_commit=False,
        group_id="my_group",
        value_deserializer=lambda x: loads(x.decode('utf-8')),
    )
    try:
        res = []
        t1 = dt.now()
        # Both conditions must hold to keep polling. (With `or`, the loop
        # could never finish on a quiet topic and the empty-batch branch
        # below would be unreachable.) total_seconds() is used because
        # timedelta.seconds wraps around at day boundaries.
        while (dt.now() - t1).total_seconds() < timeout and len(res) < max_len:
            # Block for up to a second per poll instead of busy-spinning on
            # the default zero timeout.
            msgs = consumer.poll(timeout_ms=1000)
            print(msgs)
            # poll() returns a mapping of partition -> list of records.
            for v in msgs.values():
                res += v

        if not res:
            print('Nothing!')
            return

        with vertica_python.connect(**conn_to) as conn_2:
            curs2 = conn_2.cursor()
            # NOTE(review): r.value is the JSON-decoded payload produced by
            # value_deserializer - confirm the driver can bind it to the
            # "value" column as-is.
            curs2.executemany(
                '''
                INSERT INTO stage.FS_Orders_from_kafka
                    (load_dtm, topic_name, partition_id, "offset", value)
                VALUES (?, ?, ?, ?, ?)''',
                [(r.timestamp, r.topic, r.partition, r.offset, r.value)
                 for r in res],
            )
            curs2.execute('COMMIT')

        # Commit the consumed positions only now that the rows are stored.
        consumer.commit()
    finally:
        # Release the consumer even if polling or the insert raised.
        consumer.close()

# Run a single load cycle as soon as this module is executed.
load()

> How to commit offset via Kafka 
> -------------------------------
>
>                 Key: KAFKA-8486
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8486
>             Project: Kafka
>          Issue Type: Wish
>          Components: consumer
>    Affects Versions: 2.2.1
>            Reporter: Stanislav 
>            Priority: Trivial
>
> from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
> from json import loads
> import vertica_python
> from datetime import datetime as dt
> from time import sleep
> conn_vertica = {'host': '',
>  'port': 5433,
>  'user': '',
>  'password': '',
>  'database': '',
>  'use_prepared_statements': True}
> conn_to = conn_vertica
> def load():(
>  parsed_topic_name = 'orderSummary'
> consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
>  bootstrap_servers=['us-kafka-broker:9092'],
>  enable_auto_commit=False,
>  group_id="my_group",
>  #api_version=(0, 10),
>  #consumer_timeout_ms=20000,
>  value_deserializer=lambda x: loads(x.decode('utf-8'))
>  )
>  timeout = 20
>  max_len = 10
>  res = []
>  t1 = dt.now()
>  while (dt.now()-t1).seconds < timeout or len(res) < max_len:
>  msgs = consumer.poll()
>  print(msgs)
>  for v in msgs.values():(
>  res += v
> with vertica_python.connect(**conn_to) as conn_2:
>  curs2 = conn_2.cursor()
>  if res:
>  curs2.executemany('''
>  INSERT INTO stage.FS_Orders_from_kafka (load_dtm,topic_name, partition_id, 
> "offset", value) 
>  VALUES (?, ?, ?, ?, ?)''', [(r.timestamp, r.topic, r.partition, r.offset, 
> r.value) for r in res])
>  curs2.execute('COMMIT')
>  else:
>  print('Nothing!')
> consumer.close()
>  #sleep(5)
> load()



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to