-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.py
29 lines (23 loc) · 894 Bytes
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from aiokafka import AIOKafkaConsumer
import asyncio
import os
# Connection settings, each overridable through an environment variable of the
# same name; the second argument is the fallback used when it is unset.
_env = os.environ
KAFKA_TOPIC = _env.get('KAFKA_TOPIC', 'mytopic')
KAFKA_CONSUMER_GROUP = _env.get('KAFKA_CONSUMER_GROUP', 'group')
KAFKA_BOOTSTRAP_SERVERS = _env.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092')
# global variables
# Module-level event loop used to drive the consumer coroutine.
# asyncio.get_event_loop() is deprecated when called with no running/current
# loop (DeprecationWarning on Python 3.12+, RuntimeError on 3.14+), so the loop
# is created explicitly and registered as this thread's current loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def consume() -> None:
    """Consume messages from KAFKA_TOPIC and print each one.

    Builds an AIOKafkaConsumer subscribed to KAFKA_TOPIC, using
    KAFKA_BOOTSTRAP_SERVERS as the bootstrap servers and
    KAFKA_CONSUMER_GROUP as the group id, starts it, and prints every
    message yielded by the consumer. The consumer is stopped on the way
    out, whether iteration finishes normally or raises.
    """
    # No explicit ``loop=`` argument: aiokafka deprecated it in 0.7.1 and the
    # consumer binds to the event loop that is running this coroutine.
    consumer = AIOKafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
    )
    # get cluster layout and join group KAFKA_CONSUMER_GROUP
    await consumer.start()
    try:
        # consume messages
        async for msg in consumer:
            print(f"Consumed msg: {msg}")
    finally:
        # will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
# Script entry: create the consumer coroutine and block on the module-level
# event loop until it returns or raises.
_consume_coro = consume()
loop.run_until_complete(_consume_coro)