mastodon-stream/kafkaproducer.py

35 wiersze
1.0 KiB
Python
Czysty Zwykły widok Historia

2023-01-28 11:06:42 +00:00
import json
from confluent_kafka import Producer, KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
def acked(err, msg):
    """Print the outcome of a single message delivery attempt.

    The ``(err, msg)`` signature has the shape of a confluent_kafka delivery
    callback; nothing in the visible code registers it, so it is presumably
    passed by callers -- confirm against usage.

    Args:
        err: The delivery error, or ``None`` when delivery succeeded.
        msg: The message the report refers to. Must expose ``value()``
            returning the payload as ``bytes`` (or ``None``).
    """
    payload = msg.value()
    # Decode defensively: a reporting helper should not raise on a payload
    # that is missing or is not valid UTF-8 (e.g. binary-encoded values).
    if payload is not None:
        text = payload.decode('utf-8', errors='replace')
    else:
        text = None

    if err is not None:
        # Both values must be passed as one tuple; the original applied `%`
        # to the first operand only and raised TypeError on this branch.
        print('Failed to deliver message: %s: %s' % (text, str(err)))
    else:
        print('Message produced: %s' % (text,))
2023-01-28 11:06:42 +00:00
def kafka_producer():
    """Build the Avro producer used for the Mastodon topic.

    All settings are hard-coded: the broker and schema-registry addresses
    point at localhost, and the value schema is loaded from the path
    ``avro/mastodon-topic-value.avsc`` (a relative path, so presumably
    resolved against the current working directory -- confirm).

    Returns:
        tuple: ``(topic_name, producer)`` where ``topic_name`` is the string
        ``'mastodon-topic'`` and ``producer`` is the ``AvroProducer`` created
        with the loaded schema as its default value schema.
    """
    # Client settings, handed to AvroProducer unchanged.
    settings = {}
    settings['bootstrap.servers'] = 'localhost:9092'
    settings['schema.registry.url'] = 'http://localhost:8081'
    settings['broker.address.family'] = 'v4'

    schema = avro.load('avro/mastodon-topic-value.avsc')
    topic = 'mastodon-topic'
    return topic, AvroProducer(settings, default_value_schema=schema)
2023-01-28 11:06:42 +00:00
def main():
    """Produce one hard-coded sample record and flush the producer."""
    # example test producer
    topic, client = kafka_producer()

    # Sample record; the values are fixed test data.
    sample = dict(
        language='en',
        favourites=0,
        username='bob',
        bot=False,
        tags=0,
        characters=50,
        words=12,
    )

    client.produce(topic=topic, value=sample)
    # Flush before returning so the queued record is sent out.
    client.flush()
2023-02-01 22:17:10 +00:00
# Run the example producer only when this file is executed as a script.
if __name__ == '__main__':
    main()