Incremental setup

main
Simon Aubury 2023-02-02 09:17:10 +11:00
rodzic dcf3a10437
commit 91f54e564f
5 zmienionych plików z 134 dodań i 25 usunięć

Wyświetl plik

@ -12,7 +12,12 @@ source env/bin/activate
```
# Federated timeline
These are the most recent public posts from people on this and other servers of the decentralized network that this server knows about.
https://data-folks.masto.host/public
# Proudcer
python mastodonlisten.py --baseURL https://data-folks.masto.host/ --enableKafka
# Kafka Connect

Wyświetl plik

@ -3,6 +3,26 @@
"namespace": "com.simonaubury",
"name": "MastodonDetails",
"fields": [
{
"name": "m_id",
"type": "long",
"default": 0
},
{
"name": "app",
"type": "string",
"default": "unknown"
},
{
"name": "url",
"type": "string",
"default": "null"
},
{
"name": "base_url",
"type": "string",
"default": "null"
},
{
"name": "language",
"type": "string",
@ -36,6 +56,11 @@
"name": "words",
"type": "int",
"default": 0
},
{
"name": "mastodon_text",
"type": "string",
"default": "null"
}
]
}

Wyświetl plik

@ -6,27 +6,27 @@ from confluent_kafka import avro
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % msg.value().decode('utf-8'), str(err))
print('Failed to deliver message: %s: %s' % msg.value().decode('utf-8'), str(err))
else:
print("Message produced: %s" % msg.value().decode('utf-8'))
print('Message produced: %s' % msg.value().decode('utf-8'))
def kafka_producer():
producer_config = {
"bootstrap.servers": "localhost:9092",
"schema.registry.url": "http://localhost:8081"
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
}
value_schema = avro.load("avro/mastodon-topic-value.avsc")
value_schema = avro.load('avro/mastodon-topic-value.avsc')
producer = AvroProducer(producer_config, default_value_schema=value_schema)
return "mastodon-topic", producer
return 'mastodon-topic', producer
# try:
# value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
# value_dict = { 'language': 'en', 'favourites': 0, 'username': 'bob', 'bot': False, 'tags': 0, 'characters': 50, 'words': 12}
# producer.produce(topic = "mastodon-topic", value = value_dict)
# producer.produce(topic = 'mastodon-topic', value = value_dict)
# producer.flush()
# except KafkaException as e:
@ -37,13 +37,13 @@ def kafka_producer():
def main():
topic_name, producer = kafka_producer()
value_dict = { "language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
value_dict = { 'language': 'en', 'favourites': 0, 'username': 'bob', 'bot': False, 'tags': 0, 'characters': 50, 'words': 12}
producer.produce(topic = topic_name, value = value_dict)
value_dict = { "language": "fr", "favourites": 0, "username": "jane", "bot": False, "tags": 0, "characters": 500, "words": 120}
value_dict = { 'language': 'fr', 'favourites': 0, 'username': 'jane', 'bot': False, 'tags': 0, 'characters': 500, 'words': 120}
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
if __name__ == "__main__":
if __name__ == '__main__':
main()

Wyświetl plik

@ -1,11 +1,15 @@
# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
import argparse
from kafkaproducer import kafka_producer
# globals
base_url = ''
enable_kafka = False
# if enable_kafka:
topic_name, producer = kafka_producer()
#### Listener for Mastodon events
@ -13,21 +17,87 @@ topic_name, producer = kafka_producer()
class Listener(mastodon.StreamListener):
def on_update(self, status):
if status.language in [ 'en', 'fr' ]:
m_text = BeautifulSoup(status.content, "html.parser").text
num_tags = len(status.tags)
num_chars = len(m_text)
num_words = len(m_text.split())
print(m_text)
m_text = BeautifulSoup(status.content, 'html.parser').text
num_tags = len(status.tags)
num_chars = len(m_text)
num_words = len(m_text.split())
m_lang = status.language
if m_lang is None:
m_lang = 'unknown'
value_dict = { "language": status.language, "favourites": status.favourites_count, "username": status.account.username, "bot": status.account.bot, "tags": num_tags, "characters": num_chars, "words": num_words}
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
app=''
# attribute only available on local
if hasattr(status, 'application'):
app = status.application.get('name')
# print(f'APP {app}')
# print(status.url)
# print(m_text)
# print('')
value_dict = {
'm_id': status.id,
'app': app,
'url': status.url,
'base_url': base_url,
'language': m_lang,
'favourites': status.favourites_count,
'username': status.account.username,
'bot': status.account.bot,
'tags': num_tags,
'characters': num_chars,
'words': num_words,
'mastodon_text': m_text
}
if enable_kafka:
try:
producer.produce(topic = topic_name, value = value_dict)
producer.flush()
except Exception as exp:
print('******* ERROR')
print(value_dict)
# raise(exp)
def main():
mastodon = Mastodon(api_base_url='https://mastodon.social')
mastodon.stream_public(Listener())
global base_url
global enable_kafka
if __name__ == "__main__":
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'--enableKafka',
help='Whether to enable Kafka producer.',
action='store_true',
required=False,
default=False)
parser.add_argument(
'--public',
help='listen to public stream (instead of local).',
action='store_true',
required=False,
default=False)
parser.add_argument(
'--baseURL',
help='Server URL',
required=False,
default='https://mastodon.social')
args = parser.parse_args()
base_url=args.baseURL
enable_kafka=args.enableKafka
mastodon = Mastodon(api_base_url = base_url)
if args.public:
mastodon.stream_public(Listener())
else:
mastodon.stream_local(Listener())
if __name__ == '__main__':
main()

9
shell/go.sh 100755
Wyświetl plik

@ -0,0 +1,9 @@
#!/bin/bash
BASE=/Users/simonaubury/git/saubury/mastodon-stream/
PY=./env/bin/python
cd ${BASE}
while true; do echo Start; ${PY} mastodonlisten.py --enableKafka --public; sleep 30; done &