diff --git a/.gitignore b/.gitignore index 435a611..bc12551 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ env/* __pycache__ BAK data/* +config/mastodon-sink-s3-aws.json diff --git a/README.md b/README.md index fc15f01..ad66b1a 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,8 @@ confluent-hub install confluentinc/kafka-connect-s3:10.3.0 curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/mastodon-sink-s3/config -d '@./config/mastodon-sink-s3.json' +curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/mastodon-sink-s3-aws/config -d '@./config/mastodon-sink-s3-aws.json' + # DuckDB @@ -34,6 +36,8 @@ duckdb --init duckdb/init.sql select * FROM read_parquet('s3://mastodon/topics/mastodon-topic*'); +select 'epoch'::TIMESTAMP + INTERVAL 1675325510 seconds; + # OLD Notes diff --git a/avro/mastodon-topic-value.avsc b/avro/mastodon-topic-value.avsc index 308a834..60ad009 100644 --- a/avro/mastodon-topic-value.avsc +++ b/avro/mastodon-topic-value.avsc @@ -14,6 +14,11 @@ "logicalType": "date", "default" : "null" }, + { + "name": "created_at_str", + "type": "string", + "default": "unknown" + }, { "name": "app", "type": "string", diff --git a/config/mastodon-sink-s3.json b/config/mastodon-sink-s3.json index 62c57bf..41ea297 100644 --- a/config/mastodon-sink-s3.json +++ b/config/mastodon-sink-s3.json @@ -3,7 +3,7 @@ "connector.class": "io.confluent.connect.s3.S3SinkConnector", "topics": "mastodon-topic", "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat", - "flush.size": "1", + "flush.size": "1000", "s3.bucket.name": "mastodon", "aws.access.key.id": "minio", "aws.secret.access.key": "minio123", diff --git a/duckdb/go.sql b/duckdb/go.sql new file mode 100644 index 0000000..99f130d --- /dev/null +++ b/duckdb/go.sql @@ -0,0 +1,33 @@ +-- .read duckdb/go.sql + +/* +drop table if exists xx; + +create table xx as +select m_id +, created_at_str +, created_at, ('EPOCH'::TIMESTAMP + INTERVAL (created_at::INT) seconds)::TIMESTAMPTZ as created_tz +, app +, url +, regexp_replace(regexp_replace(url, '^http[s]://', ''), '/.*$', '') as new_url +, base_url +, language +, favourites +, username +, bot +, tags +, characters +, mastodon_text +FROM read_parquet('s3://mastodon/topics/mastodon-topic/partition=0/*'); +*/ + +select date_part('day', created_tz) as created_day +, date_part('hour', created_tz) as created_hour +, count(*) +from xx +group by 1,2 +order by 1,2 +; + +-- select username, bot, count(*) from xx group by 1,2 order by 3 desc; + diff --git a/mastodonlisten.py b/mastodonlisten.py index 3a437ef..9081361 100644 --- a/mastodonlisten.py +++ b/mastodonlisten.py @@ -13,11 +13,7 @@ base_url = '' enable_kafka = False quiet = False watchdog = False - -if enable_kafka: - topic_name, producer = kafka_producer() -else: - topic_name, producer = '' , '' +topic_name, producer = '' , '' # Listener for Mastodon events @@ -41,10 +37,13 @@ class Listener(mastodon.StreamListener): # attribute only available on local if hasattr(status, 'application'): app = status.application.get('name') + + now_dt=datetime.datetime.now() value_dict = { 'm_id': status.id, - 'created_at': int(datetime.datetime.now().strftime('%s')), + 'created_at': int(now_dt.strftime('%s')), + 'created_at_str': now_dt.strftime('%Y %m %d %H:%M:%S'), 'app': app, 'url': status.url, 'base_url': base_url, @@ -98,6 +97,7 @@ def main(): global enable_kafka global quiet global watchdog + global topic_name, producer parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) @@ -137,8 +137,15 @@ def main(): args = parser.parse_args() + base_url=args.baseURL enable_kafka=args.enableKafka + + if enable_kafka: + topic_name, producer = kafka_producer() + + + mastodon = Mastodon(api_base_url = base_url) if args.watchdog: diff --git a/shell/go.sh b/shell/go.sh index cc6ed8c..913d5ce 100755 --- a/shell/go.sh +++ b/shell/go.sh @@ -8,8 +8,8 @@ cd ${BASE} # while true; do echo Start; ${PY} mastodonlisten.py --enableKafka --public; sleep 30; done & -while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.social --enableKafka --public; sleep 30; done -while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://hachyderm.io --enableKafka ; sleep 30; done -while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.au/ --enableKafka ; sleep 30; done -while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://data-folks.masto.host --enableKafka ; sleep 30; done +while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.social --enableKafka --watchdog 30 --public; sleep 30; done +while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://hachyderm.io --enableKafka --watchdog 30 ; sleep 30; done +while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://mastodon.au/ --enableKafka --watchdog 30 ; sleep 30; done +while true; do echo Start; ${PY} mastodonlisten.py --baseURL https://data-folks.masto.host --enableKafka --watchdog 30 ; sleep 30; done diff --git a/xx.parquet b/xx.parquet new file mode 100644 index 0000000..efebce6 Binary files /dev/null and b/xx.parquet differ