From dcf3a10437056ac8d34d05069b199129416d7702 Mon Sep 17 00:00:00 2001 From: Simon Aubury Date: Sat, 28 Jan 2023 22:06:42 +1100 Subject: [PATCH] Connect listner to producer --- avro/mastodon-topic-value.avsc | 2 +- kafka-producer.py | 40 --------------------------- kafkaproducer.py | 49 ++++++++++++++++++++++++++++++++++ mastodon.py | 43 ----------------------------- mastodonlisten.py | 33 +++++++++++++++++++++++ 5 files changed, 83 insertions(+), 84 deletions(-) delete mode 100644 kafka-producer.py create mode 100644 kafkaproducer.py delete mode 100644 mastodon.py create mode 100644 mastodonlisten.py diff --git a/avro/mastodon-topic-value.avsc b/avro/mastodon-topic-value.avsc index 9a4ce83..b8a7049 100644 --- a/avro/mastodon-topic-value.avsc +++ b/avro/mastodon-topic-value.avsc @@ -6,7 +6,7 @@ { "name": "language", "type": "string", - "default": "" + "default": "null" }, { "name": "favourites", diff --git a/kafka-producer.py b/kafka-producer.py deleted file mode 100644 index 020520d..0000000 --- a/kafka-producer.py +++ /dev/null @@ -1,40 +0,0 @@ -import json -from confluent_kafka import Producer, KafkaException -from confluent_kafka.avro import AvroProducer -from confluent_kafka import avro - - -def acked(err, msg): - if err is not None: - print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err)) - else: - print("Message produced: %s" % msg.value().decode('utf-8')) - - -def send_data(): - producer_config = { - "bootstrap.servers": "localhost:9092", - "schema.registry.url": "http://localhost:8081" - } - - value_schema = avro.load("avro/mastodon-topic-value.avsc") - - try: - producer = AvroProducer(producer_config, default_value_schema=value_schema) - - value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12} - - producer.produce(topic = "mastodon-topic", value = value_dict) - producer.flush() - - except KafkaException as e: - print('Kafka failure ' + e) - - - -def main(): - send_data() - - -if __name__ == "__main__": - main() diff --git a/kafkaproducer.py b/kafkaproducer.py new file mode 100644 index 0000000..94d64c2 --- /dev/null +++ b/kafkaproducer.py @@ -0,0 +1,49 @@ +import json +from confluent_kafka import Producer, KafkaException +from confluent_kafka.avro import AvroProducer +from confluent_kafka import avro + + +def acked(err, msg): + if err is not None: + print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err)) + else: + print("Message produced: %s" % msg.value().decode('utf-8')) + + +def kafka_producer(): + producer_config = { + "bootstrap.servers": "localhost:9092", + "schema.registry.url": "http://localhost:8081" + } + + value_schema = avro.load("avro/mastodon-topic-value.avsc") + producer = AvroProducer(producer_config, default_value_schema=value_schema) + return "mastodon-topic", producer + + # try: + + + # value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12} + + # producer.produce(topic = "mastodon-topic", value = value_dict) + # producer.flush() + + # except KafkaException as e: + # print('Kafka failure ' + e) + + + +def main(): + topic_name, producer = kafka_producer() + + value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12} + producer.produce(topic = topic_name, value = value_dict) + + value_dict = { "language": "fr", "favourites": 0, "username": "jane", "bot": False, "tags": 0, "characters": 500, "words": 120} + producer.produce(topic = topic_name, value = value_dict) + + producer.flush() + +if __name__ == "__main__": + main() diff --git a/mastodon.py b/mastodon.py deleted file mode 100644 index 2c30688..0000000 --- a/mastodon.py +++ /dev/null @@ -1,43 +0,0 @@ -# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023 - -import mastodon -from mastodon import Mastodon -from bs4 import BeautifulSoup - -#### Listener for Mastodon events - -class Listener(mastodon.StreamListener): - - def on_update(self, status): - - if status.language in [ 'en', 'fr' ]: - # print('FOO ') - - m_text = BeautifulSoup(status.content, "html.parser").text - - print(f'status.language {status.language}', type(status.language)) - print(f'status.favourites_count {status.favourites_count}', type(status.favourites_count)) - print(f'status.account.username {status.account.username}', type(status.account.username)) - print(f'status.account.display_name {status.account.display_name}', type(status.account.display_name)) - print(f'status.account.bot {status.account.bot}', type(status.account.bot)) - print(f'status.tags {status.tags}', type(status.tags)) - num_tags = len(status.tags) - num_chars = len(m_text) - num_words = len(m_text.split()) - - print(f'num_tags {num_tags}') - print(f'num_chars {num_chars}') - print(f'num_words {num_words}') - print(m_text) - print() - print('*******************') - exit - - -Mastodon.create_app( - 'streamreader', - api_base_url = 'https://mastodon.social' -) - -mastodon = Mastodon(api_base_url='https://mastodon.social') -mastodon.stream_public(Listener()) \ No newline at end of file diff --git a/mastodonlisten.py b/mastodonlisten.py new file mode 100644 index 0000000..c38ee89 --- /dev/null +++ b/mastodonlisten.py @@ -0,0 +1,33 @@ +# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023 + +import mastodon +from mastodon import Mastodon +from bs4 import BeautifulSoup + +from kafkaproducer import kafka_producer + +topic_name, producer = kafka_producer() + +#### Listener for Mastodon events + +class Listener(mastodon.StreamListener): + + def on_update(self, status): + if status.language in [ 'en', 'fr' ]: + m_text = BeautifulSoup(status.content, "html.parser").text + num_tags = len(status.tags) + num_chars = len(m_text) + num_words = len(m_text.split()) + print(m_text) + + value_dict = { "language": status.language, "favourites": status.favourites_count, "username": status.account.username, "bot": status.account.bot, "tags": num_tags, "characters": num_chars, "words": num_words} + producer.produce(topic = topic_name, value = value_dict) + producer.flush() + + +def main(): + mastodon = Mastodon(api_base_url='https://mastodon.social') + mastodon.stream_public(Listener()) + +if __name__ == "__main__": + main()