From e5bb660f7e7a49ea380e11f3596ab3670300259e Mon Sep 17 00:00:00 2001
From: Simon Aubury
Date: Sat, 28 Jan 2023 21:28:14 +1100
Subject: [PATCH] initial checkin

---
 .gitignore                     |   5 ++
 README.md                      | 143 +++++++++++++++++++++++++++++++++
 avro/mastodon-topic-value.avsc |  41 ++++++++++
 config/mastodon-sink-s3.json   |  16 ++++
 docker-compose.yml             |  33 ++++++++
 duckdb/init.sql                |   9 +++
 kafka-producer.py              |  40 +++++++++
 mastodon.py                    |  43 ++++++++++
 8 files changed, 330 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 avro/mastodon-topic-value.avsc
 create mode 100644 config/mastodon-sink-s3.json
 create mode 100644 docker-compose.yml
 create mode 100644 duckdb/init.sql
 create mode 100644 kafka-producer.py
 create mode 100644 mastodon.py

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..bc15c37
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,5 @@
+env/*
+.DS_Store
+__pycache__
+BAK
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..3ea8102
--- /dev/null
+++ b/README.md
@@ -0,0 +1,143 @@
+# Set up a virtual Python environment
+Optionally, you can use a [virtual Python](https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/) environment to keep dependencies separate. The _venv_ module is the preferred way to create and manage virtual environments.
+
+```console
+python3 -m venv env
+```
+
+Before you can start installing or using packages in your virtual environment, you'll need to activate it.
+
+```console
+source env/bin/activate
+```
+
+# Kafka Connect
+
+Register the S3 sink connector with Kafka Connect:
+
+```console
+curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/mastodon-sink-s3/config -d '@./config/mastodon-sink-s3.json'
+```
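+
+The same registration can be done from Python; a minimal sketch (not one of the repo scripts), assuming the `requests` package is installed and Kafka Connect is listening on `localhost:8083`:
+
+```python
+# Sketch: register the S3 sink connector via the Kafka Connect REST API.
+import json
+
+import requests
+
+with open("config/mastodon-sink-s3.json") as f:
+    connector_config = json.load(f)
+
+resp = requests.put(
+    "http://localhost:8083/connectors/mastodon-sink-s3/config",
+    json=connector_config,
+)
+resp.raise_for_status()  # raises on a non-2xx response
+print(resp.json())
+```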
+
+# DuckDB
+
+Start DuckDB with the MinIO connection settings pre-loaded:
+
+```console
+duckdb --init duckdb/init.sql
+```
+
+Query the Parquet files written by the sink connector:
+
+```sql
+SELECT * FROM read_parquet('s3://mastodon/topics/mastodon-topic*');
+```
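+
+The same query can be run from Python; a sketch assuming the `duckdb` package is installed, mirroring the settings in `duckdb/init.sql`:
+
+```python
+# Sketch: query the MinIO-hosted Parquet files via DuckDB's Python API.
+import duckdb
+
+con = duckdb.connect()
+con.execute("INSTALL httpfs; LOAD httpfs;")
+# Same values as duckdb/init.sql: local MinIO, path-style URLs, no SSL.
+for setting in [
+    "SET s3_endpoint='localhost:9000'",
+    "SET s3_access_key_id='minio'",
+    "SET s3_secret_access_key='minio123'",
+    "SET s3_use_ssl=false",
+    "SET s3_region='us-east-1'",
+    "SET s3_url_style='path'",
+]:
+    con.execute(setting)
+
+rows = con.execute(
+    "SELECT * FROM read_parquet('s3://mastodon/topics/mastodon-topic*')"
+).fetchall()
+print(rows[:5])
+```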
+
+# Old notes
+
+- https://martinheinz.dev/blog/86
+- https://github.com/morsapaes/hex-data-council/tree/main/data-generator
+- https://redpanda.com/blog/kafka-streaming-data-pipeline-from-postgres-to-duckdb
+
+# Docker notes
+
+(These notes appear to be from an earlier Postgres/Redpanda experiment; this repo's `docker-compose.yml` only defines the `minio` and `mc` services.)
+
+```console
+docker-compose up -d postgres datagen
+```
+
+Password: `postgres`
+
+```console
+psql -h localhost -U postgres -d postgres
+select * from public.user limit 3;
+```
+
+```console
+docker-compose up -d redpanda redpanda-console connect
+```
+
+Redpanda Console at http://localhost:8080
+
+```console
+docker exec -it connect /bin/bash
+
+curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/pg-src/config -d '@/connectors/pg-src.json'
+
+curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink/config -d '@/connectors/s3-sink.json'
+
+curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink-m/config -d '@/connectors/s3-sink-m.json'
+```
+
+```console
+docker-compose up -d minio mc
+```
+
+MinIO console at http://localhost:9001. Log in with `minio / minio123`.
+
+```console
+docker-compose up -d duckdb
+```
+
+```console
+docker-compose exec duckdb bash
+duckdb --init duckdb/init.sql
+```
+
+```sql
+SELECT count(value.after.id) AS user_count FROM read_parquet('s3://user-payments/debezium.public.user-*');
+```
+
+## Kafka notes
+
+```console
+python avro-producer.py -b "localhost:9092" -s "http://localhost:8081" -t aubury.mytopic
+```
+
+## LakeFS
+
+```console
+docker run --pull always -p 8000:8000 \
+  -e LAKEFS_BLOCKSTORE_TYPE='s3' \
+  -e AWS_ACCESS_KEY_ID='YourAccessKeyValue' \
+  -e AWS_SECRET_ACCESS_KEY='YourSecretKeyValue' \
+  treeverse/lakefs run --local-settings
+```
+
+Or against the local MinIO instead of AWS S3:
+
+```console
+docker run --pull always -p 8000:8000 \
+  -e LAKEFS_BLOCKSTORE_TYPE='s3' \
+  -e LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE='true' \
+  -e LAKEFS_BLOCKSTORE_S3_ENDPOINT='http://minio:9000' \
+  -e LAKEFS_BLOCKSTORE_S3_DISCOVER_BUCKET_REGION='false' \
+  -e LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID='minio' \
+  -e LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY='minio123' \
+  treeverse/lakefs run --local-settings
+```
+
+DuckDB S3 settings when querying from inside the Docker network (note the `minio:9000` endpoint):
+
+```sql
+set s3_endpoint='minio:9000';
+set s3_access_key_id='minio';
+set s3_secret_access_key='minio123';
+set s3_use_ssl=false;
+set s3_region='us-east-1';
+set s3_url_style='path';
+```
+
+### Installing packages
+
+Now that you're in your virtual environment, you can install packages:
+
+```console
+python -m pip install --requirement requirements.txt
+```
+
+### JupyterLab
+Once installed, launch JupyterLab with:
+
+```console
+jupyter-lab
+```
+
+### Cleanup of virtual environment
+If you want to switch projects or otherwise leave your virtual environment, simply run:
+
+```console
+deactivate
+```
+
+If you want to re-enter the virtual environment, just follow the activation instructions above; there's no need to re-create it.
diff --git a/avro/mastodon-topic-value.avsc b/avro/mastodon-topic-value.avsc
new file mode 100644
index 0000000..9a4ce83
--- /dev/null
+++ b/avro/mastodon-topic-value.avsc
@@ -0,0 +1,41 @@
+{
+    "type": "record",
+    "namespace": "com.simonaubury",
+    "name": "MastodonDetails",
+    "fields": [
+        {
+            "name": "language",
+            "type": "string",
+            "default": ""
+        },
+        {
+            "name": "favourites",
+            "type": "int",
+            "default": 0
+        },
+        {
+            "name": "username",
+            "type": "string",
+            "default": ""
+        },
+        {
+            "name": "bot",
+            "type": "boolean"
+        },
+        {
+            "name": "tags",
+            "type": "int",
+            "default": 0
+        },
+        {
+            "name": "characters",
+            "type": "int",
+            "default": 0
+        },
+        {
+            "name": "words",
+            "type": "int",
+            "default": 0
+        }
+    ]
+}
diff --git a/config/mastodon-sink-s3.json b/config/mastodon-sink-s3.json
new file mode 100644
index 0000000..62c57bf
--- /dev/null
+++ b/config/mastodon-sink-s3.json
@@ -0,0 +1,16 @@
+{
+    "name": "mastodon-sink-s3",
+    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
+    "topics": "mastodon-topic",
+    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
+    "flush.size": "1",
+    "s3.bucket.name": "mastodon",
+    "aws.access.key.id": "minio",
+    "aws.secret.access.key": "minio123",
+    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+    "store.url": "http://localhost:9000"
+}
+
+
+
+
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..b3814c4
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,33 @@
+version: '3.7'
+services:
+  minio:
+    hostname: minio
+    image: 'minio/minio:latest'
+    container_name: minio
+    ports:
+      - "9001:9001"
+      - "9000:9000"
+    command: [ "server", "/data", "--console-address", ":9001" ]
+    volumes:
+      - minio:/data
+    environment:
+      MINIO_ROOT_USER: minio
+      MINIO_ROOT_PASSWORD: minio123
+      MINIO_ACCESS_KEY: minio
+      MINIO_SECRET_KEY: minio123
+  mc:
+    depends_on:
+      - minio
+    image: minio/mc
+    container_name: mc
+    entrypoint: >
+      /bin/sh -c "
+      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
+      /usr/bin/mc rm -r --force minio/mastodon;
+      /usr/bin/mc mb minio/mastodon;
+      /usr/bin/mc policy set public minio/mastodon;
+      exit 0;
+      "
+
+volumes:
+  minio:
\ No newline at end of file
diff --git a/duckdb/init.sql b/duckdb/init.sql
new file mode 100644
index 0000000..1d4867a
--- /dev/null
+++ b/duckdb/init.sql
@@ -0,0 +1,9 @@
+install 'httpfs';
+load 'httpfs';
+
+set s3_endpoint='localhost:9000';
+set s3_access_key_id='minio';
+set s3_secret_access_key='minio123';
+set s3_use_ssl=false;
+set s3_region='us-east-1';
+set s3_url_style='path';
diff --git a/kafka-producer.py b/kafka-producer.py
new file mode 100644
index 0000000..020520d
--- /dev/null
+++ b/kafka-producer.py
@@ -0,0 +1,40 @@
+from confluent_kafka import KafkaException
+from confluent_kafka.avro import AvroProducer
+from confluent_kafka import avro
+
+
+def acked(err, msg):
+    if err is not None:
+        print("Failed to deliver message: %s" % str(err))
+    else:
+        print("Message produced to %s [%d]" % (msg.topic(), msg.partition()))
+
+
+def send_data():
+    producer_config = {
+        "bootstrap.servers": "localhost:9092",
+        "schema.registry.url": "http://localhost:8081"
+    }
+
+    value_schema = avro.load("avro/mastodon-topic-value.avsc")
+
+    try:
+        producer = AvroProducer(producer_config, default_value_schema=value_schema)
+
+        # a single hand-crafted record matching the Avro schema
+        value_dict = {"language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
+
+        producer.produce(topic="mastodon-topic", value=value_dict, callback=acked)
+        producer.flush()
+
+    except KafkaException as e:
+        print("Kafka failure: %s" % e)
+
+
+
+def main():
+    send_data()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/mastodon.py b/mastodon.py
new file mode 100644
index 0000000..2c30688
--- /dev/null
+++ b/mastodon.py
@@ -0,0 +1,43 @@
+# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023
+
+import mastodon
+from mastodon import Mastodon
+from bs4 import BeautifulSoup
+
+#### Listener for Mastodon events
+
+class Listener(mastodon.StreamListener):
+
+    def on_update(self, status):
+
+        if status.language in ['en', 'fr']:
+            # strip the HTML markup from the post content
+            m_text = BeautifulSoup(status.content, "html.parser").text
+
+            print(f'status.language {status.language}', type(status.language))
+            print(f'status.favourites_count {status.favourites_count}', type(status.favourites_count))
+            print(f'status.account.username {status.account.username}', type(status.account.username))
+            print(f'status.account.display_name {status.account.display_name}', type(status.account.display_name))
+            print(f'status.account.bot {status.account.bot}', type(status.account.bot))
+            print(f'status.tags {status.tags}', type(status.tags))
+            num_tags = len(status.tags)
+            num_chars = len(m_text)
+            num_words = len(m_text.split())
+
+            print(f'num_tags {num_tags}')
+            print(f'num_chars {num_chars}')
+            print(f'num_words {num_words}')
+            print(m_text)
+            print()
+            print('*******************')
+
+
+Mastodon.create_app(
+    'streamreader',
+    api_base_url='https://mastodon.social'
+)
+
+masto = Mastodon(api_base_url='https://mastodon.social')
+masto.stream_public(Listener())
\ No newline at end of file
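
Not in this patch, but a natural next step: `mastodon.py` only prints each status, while `kafka-producer.py` sends a single hard-coded record. A minimal sketch of how the listener could feed the Avro producer directly, reusing the schema file and the broker and schema-registry settings from the files above:

```python
# Sketch: stream Mastodon statuses straight into the Kafka topic.
# Reuses avro/mastodon-topic-value.avsc and the producer settings
# from kafka-producer.py; assumes the local stack is running.
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

producer = AvroProducer(
    {"bootstrap.servers": "localhost:9092",
     "schema.registry.url": "http://localhost:8081"},
    default_value_schema=avro.load("avro/mastodon-topic-value.avsc"),
)


class KafkaListener(mastodon.StreamListener):
    def on_update(self, status):
        if status.language not in ('en', 'fr'):
            return
        # strip the HTML markup, then build a record matching the schema
        text = BeautifulSoup(status.content, "html.parser").text
        producer.produce(topic="mastodon-topic", value={
            "language": status.language,
            "favourites": status.favourites_count,
            "username": status.account.username,
            "bot": status.account.bot,
            "tags": len(status.tags),
            "characters": len(text),
            "words": len(text.split()),
        })
        producer.poll(0)  # serve delivery callbacks


masto = Mastodon(api_base_url='https://mastodon.social')
masto.stream_public(KafkaListener())
```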