Connect listner to producer

main
Simon Aubury 2023-01-28 22:06:42 +11:00
rodzic e5bb660f7e
commit dcf3a10437
5 zmienionych plików z 83 dodań i 84 usunięć

Wyświetl plik

@ -6,7 +6,7 @@
{
"name": "language",
"type": "string",
"default": ""
"default": "null"
},
{
"name": "favourites",

Wyświetl plik

@ -1,40 +0,0 @@
import json
from confluent_kafka import Producer, KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err))
else:
print("Message produced: %s" % msg.value().decode('utf-8'))
def send_data():
producer_config = {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081"
}
value_schema = avro.load("avro/mastodon-topic-value.avsc")
try:
producer = AvroProducer(producer_config, default_value_schema=value_schema)
value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
producer.produce(topic = "mastodon-topic", value = value_dict)
producer.flush()
except KafkaException as e:
print('Kafka failure ' + e)
def main():
send_data()
if __name__ == "__main__":
main()

49
kafkaproducer.py 100644
Wyświetl plik

@ -0,0 +1,49 @@
import json
from confluent_kafka import Producer, KafkaException
from confluent_kafka.avro import AvroProducer
from confluent_kafka import avro
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err))
else:
print("Message produced: %s" % msg.value().decode('utf-8'))
def kafka_producer():
producer_config = {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081"
}
value_schema = avro.load("avro/mastodon-topic-value.avsc")
producer = AvroProducer(producer_config, default_value_schema=value_schema)
return "mastodon-topic", producer
# try:
# value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
# producer.produce(topic = "mastodon-topic", value = value_dict)
# producer.flush()
# except KafkaException as e:
# print('Kafka failure ' + e)
def main():
topic_name, producer = kafka_producer()
value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
producer.produce(topic = topic_name, value = value_dict)
value_dict = { "language": "fr", "favourites": 0, "username": "jane", "bot": False, "tags": 0, "characters": 500, "words": 120}
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
if __name__ == "__main__":
main()

Wyświetl plik

@ -1,43 +0,0 @@
# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
#### Listener for Mastodon events
class Listener(mastodon.StreamListener):
def on_update(self, status):
if status.language in [ 'en', 'fr' ]:
# print('FOO ')
m_text = BeautifulSoup(status.content, "html.parser").text
print(f'status.language {status.language}', type(status.language))
print(f'status.favourites_count {status.favourites_count}', type(status.favourites_count))
print(f'status.account.username {status.account.username}', type(status.account.username))
print(f'status.account.display_name {status.account.display_name}', type(status.account.display_name))
print(f'status.account.bot {status.account.bot}', type(status.account.bot))
print(f'status.tags {status.tags}', type(status.tags))
num_tags = len(status.tags)
num_chars = len(m_text)
num_words = len(m_text.split())
print(f'num_tags {num_tags}')
print(f'num_chars {num_chars}')
print(f'num_words {num_words}')
print(m_text)
print()
print('*******************')
exit
Mastodon.create_app(
'streamreader',
api_base_url = 'https://mastodon.social'
)
mastodon = Mastodon(api_base_url='https://mastodon.social')
mastodon.stream_public(Listener())

33
mastodonlisten.py 100644
Wyświetl plik

@ -0,0 +1,33 @@
# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
from kafkaproducer import kafka_producer
topic_name, producer = kafka_producer()
#### Listener for Mastodon events
class Listener(mastodon.StreamListener):
def on_update(self, status):
if status.language in [ 'en', 'fr' ]:
m_text = BeautifulSoup(status.content, "html.parser").text
num_tags = len(status.tags)
num_chars = len(m_text)
num_words = len(m_text.split())
print(m_text)
value_dict = { "language": status.language, "favourites": status.favourites_count, "username": status.account.username, "bot": status.account.bot, "tags": num_tags, "characters": num_chars, "words": num_words}
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
def main():
mastodon = Mastodon(api_base_url='https://mastodon.social')
mastodon.stream_public(Listener())
if __name__ == "__main__":
main()