initial checkin

main
Simon Aubury 2023-01-28 21:28:14 +11:00
commit e5bb660f7e
8 changed files with 330 additions and 0 deletions

5
.gitignore vendored 100644

@@ -0,0 +1,5 @@
env/*
.DS_Store
__pycache__
BAK

143
README.md 100644

@@ -0,0 +1,143 @@
# Set up a virtual Python environment
Optionally, you can use a [virtual Python](https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/) environment to keep dependencies separate. The _venv_ module is the preferred way to create and manage virtual environments.
```console
python3 -m venv env
```
Before you can start installing or using packages in your virtual environment, you'll need to activate it.
```console
source env/bin/activate
```
# Kafka Connect
Register the S3 sink connector with Kafka Connect:
```console
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/mastodon-sink-s3/config -d '@./config/mastodon-sink-s3.json'
```
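To confirm the connector was registered and is running, you can query the Kafka Connect REST API. A minimal check from Python, assuming Connect is reachable on localhost:8083 and the `requests` package is installed:
```python
# Sketch: check the sink connector state via the Kafka Connect REST API
import requests

resp = requests.get("http://localhost:8083/connectors/mastodon-sink-s3/status")
resp.raise_for_status()
status = resp.json()
print(status["connector"]["state"])                 # e.g. RUNNING
print([task["state"] for task in status["tasks"]])  # per-task states
```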
# DuckDB
Start DuckDB with the S3 settings from `duckdb/init.sql`, then query the Parquet files written by the sink connector:
```console
duckdb --init duckdb/init.sql
```
```
SELECT * FROM read_parquet('s3://mastodon/topics/mastodon-topic*');
```
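The same data can be queried from Python with the `duckdb` package. This is a minimal sketch, assuming MinIO is reachable on localhost:9000 with the `minio / minio123` credentials used elsewhere in this repo, and that the Parquet files carry the fields from the Avro schema:
```python
# Sketch: query the sink's Parquet output from Python using DuckDB + httpfs
import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("SET s3_endpoint='localhost:9000';")
con.execute("SET s3_access_key_id='minio';")
con.execute("SET s3_secret_access_key='minio123';")
con.execute("SET s3_use_ssl=false;")
con.execute("SET s3_region='us-east-1';")
con.execute("SET s3_url_style='path';")

# Toots per language ('language' is a field in the MastodonDetails Avro schema)
rows = con.execute("""
    SELECT language, count(*) AS toots
    FROM read_parquet('s3://mastodon/topics/mastodon-topic*')
    GROUP BY language
    ORDER BY toots DESC
""").fetchall()
print(rows)
```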
# OLD Notes
- https://martinheinz.dev/blog/86
- https://github.com/morsapaes/hex-data-council/tree/main/data-generator
- https://redpanda.com/blog/kafka-streaming-data-pipeline-from-postgres-to-duckdb
# Docker Notes
```
docker-compose up -d postgres datagen
```
Connect with `psql` (the password is `postgres`):
```
psql -h localhost -U postgres -d postgres
select * from public.user limit 3;
```
```
docker-compose up -d redpanda redpanda-console connect
```
Redpanda Console at http://localhost:8080
```
docker exec -it connect /bin/bash
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/pg-src/config -d '@/connectors/pg-src.json'
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink/config -d '@/connectors/s3-sink.json'
curl -X PUT -H "Content-Type:application/json" localhost:8083/connectors/s3-sink-m/config -d '@/connectors/s3-sink-m.json'
```
```
docker-compose up -d minio mc
```
The MinIO console is available at http://localhost:9001 (the S3 API is on port 9000); log in with `minio / minio123`.
```
docker-compose up -d duckdb
```
```
docker-compose exec duckdb bash
duckdb --init duckdb/init.sql
SELECT count(value.after.id) as user_count FROM read_parquet('s3://user-payments/debezium.public.user-*');
```
## Kafka notes
```console
python avro-producer.py -b "localhost:9092" -s "http://localhost:8081" -t aubury.mytopic
```
## LakeFS
Run LakeFS with an AWS S3 blockstore:
```
docker run --pull always -p 8000:8000 \
  -e LAKEFS_BLOCKSTORE_TYPE='s3' \
  -e AWS_ACCESS_KEY_ID='YourAccessKeyValue' \
  -e AWS_SECRET_ACCESS_KEY='YourSecretKeyValue' \
  treeverse/lakefs run --local-settings
```
Or run LakeFS against the local MinIO container:
```
docker run --pull always -p 8000:8000 \
  -e LAKEFS_BLOCKSTORE_TYPE='s3' \
  -e LAKEFS_BLOCKSTORE_S3_FORCE_PATH_STYLE='true' \
  -e LAKEFS_BLOCKSTORE_S3_ENDPOINT='http://minio:9000' \
  -e LAKEFS_BLOCKSTORE_S3_DISCOVER_BUCKET_REGION='false' \
  -e LAKEFS_BLOCKSTORE_S3_CREDENTIALS_ACCESS_KEY_ID='minio' \
  -e LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY='minio123' \
  treeverse/lakefs run --local-settings
```
DuckDB S3 settings for querying MinIO from inside the Docker network (note the `minio:9000` endpoint):
```
set s3_endpoint='minio:9000';
set s3_access_key_id='minio';
set s3_secret_access_key='minio123';
set s3_use_ssl=false;
set s3_region='us-east-1';
set s3_url_style='path';
```
### Installing packages
Now that you're in your virtual environment, you can install packages.
```console
python -m pip install --requirement requirements.txt
```
### JupyterLab
Once installed, launch JupyterLab with:
```console
jupyter-lab
```
### Cleanup of virtual environment
If you want to switch projects or otherwise leave your virtual environment, simply run:
```console
deactivate
```
If you want to re-enter the virtual environment, just follow the same instructions above about activating a virtual environment. There's no need to re-create the virtual environment.

41
avro/mastodon-topic-value.avsc 100644

@@ -0,0 +1,41 @@
{
"type": "record",
"namespace": "com.simonaubury",
"name": "MastodonDetails",
"fields": [
{
"name": "language",
"type": "string",
"default": ""
},
{
"name": "favourites",
"type": "int",
"default": 0
},
{
"name": "username",
"type": "string",
"default": ""
},
{
"name": "bot",
"type": "boolean"
},
{
"name": "tags",
"type": "int",
"default": 0
},
{
"name": "characters",
"type": "int",
"default": 0
},
{
"name": "words",
"type": "int",
"default": 0
}
]
}
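For reference, a record conforming to this schema looks like the following. The field meanings are inferred from `mastodon.py` in this commit, and the values mirror the test record in `kafka-producer.py`:
```python
# Illustrative MastodonDetails record (values are only an example)
sample_record = {
    "language": "en",    # toot language code
    "favourites": 0,     # favourites_count when the toot was seen
    "username": "bob",   # posting account's username
    "bot": False,        # no default in the schema, so this field must always be supplied
    "tags": 0,           # number of hashtags on the toot
    "characters": 50,    # length of the toot text
    "words": 12,         # word count of the toot text
}
```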

16
config/mastodon-sink-s3.json 100644

@@ -0,0 +1,16 @@
{
"name": "mastodon-sink-s3",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "mastodon-topic",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"flush.size": "1",
"s3.bucket.name": "mastodon",
"aws.access.key.id": "minio",
"aws.secret.access.key": "minio123",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"store.url": "http://localhost:9000"
}
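With `flush.size` set to 1 the connector commits a Parquet object for every record, so output should appear in the bucket almost immediately. A quick way to inspect what has landed, sketched here assuming `boto3` is installed and MinIO is on localhost:9000:
```python
# Sketch: list the Parquet objects the sink connector has written to MinIO
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)
resp = s3.list_objects_v2(Bucket="mastodon", Prefix="topics/mastodon-topic")
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["Size"])
```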

33
docker-compose.yml 100644

@@ -0,0 +1,33 @@
version: '3.7'
services:
  minio:
    hostname: minio
    image: 'minio/minio:latest'
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - minio:/data
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
      MINIO_ACCESS_KEY: minio
      MINIO_SECRET_KEY: minio123

  mc:
    depends_on:
      - minio
    image: minio/mc
    container_name: mc
    entrypoint: >
      /bin/sh -c "
      until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done;
      /usr/bin/mc rm -r --force minio/mastodon;
      /usr/bin/mc mb minio/mastodon;
      /usr/bin/mc policy set public minio/mastodon;
      exit 0;
      "

volumes:
  minio:

9
duckdb/init.sql 100644

@@ -0,0 +1,9 @@
install 'httpfs';
load 'httpfs';
set s3_endpoint='localhost:9000';
set s3_access_key_id='minio';
set s3_secret_access_key='minio123';
set s3_use_ssl=false;
set s3_region='us-east-1';
set s3_url_style='path';

40
kafka-producer.py 100644

@@ -0,0 +1,40 @@
from confluent_kafka import KafkaException
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer


def acked(err, msg):
    # Delivery callback: report success or failure for each produced message
    if err is not None:
        print("Failed to deliver message %s: %s" % (msg.value(), err))
    else:
        print("Message produced: %s" % (msg.value(),))


def send_data():
    producer_config = {
        "bootstrap.servers": "localhost:9092",
        "schema.registry.url": "http://localhost:8081"
    }
    # Load the Avro value schema shipped in this repo
    value_schema = avro.load("avro/mastodon-topic-value.avsc")
    try:
        producer = AvroProducer(producer_config, default_value_schema=value_schema)
        value_dict = {"language": "en", "favourites": 0, "username": "bob", "bot": False, "tags": 0, "characters": 50, "words": 12}
        producer.produce(topic="mastodon-topic", value=value_dict, callback=acked)
        producer.flush()
    except KafkaException as e:
        print("Kafka failure " + str(e))


def main():
    send_data()


if __name__ == "__main__":
    main()

43
mastodon.py 100644

@@ -0,0 +1,43 @@
# https://medium.com/@tspann/mastodon-streaming-to-pulsar-via-python-be7538112023
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup


#### Listener for Mastodon events
class Listener(mastodon.StreamListener):
    def on_update(self, status):
        # Only look at English and French toots
        if status.language in ['en', 'fr']:
            # Strip the HTML markup from the toot body
            m_text = BeautifulSoup(status.content, "html.parser").text
            print(f'status.language {status.language}', type(status.language))
            print(f'status.favourites_count {status.favourites_count}', type(status.favourites_count))
            print(f'status.account.username {status.account.username}', type(status.account.username))
            print(f'status.account.display_name {status.account.display_name}', type(status.account.display_name))
            print(f'status.account.bot {status.account.bot}', type(status.account.bot))
            print(f'status.tags {status.tags}', type(status.tags))
            num_tags = len(status.tags)
            num_chars = len(m_text)
            num_words = len(m_text.split())
            print(f'num_tags {num_tags}')
            print(f'num_chars {num_chars}')
            print(f'num_words {num_words}')
            print(m_text)
            print()
            print('*******************')


# Register an app with the instance (the returned credentials are not stored here)
Mastodon.create_app(
    'streamreader',
    api_base_url='https://mastodon.social'
)

# Stream the public timeline and print details for each matching status
mastodon_client = Mastodon(api_base_url='https://mastodon.social')
mastodon_client.stream_public(Listener())
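A natural next step, not part of this commit, is to publish each status to Kafka instead of printing it. A minimal sketch combining `mastodon.py` with the Avro producer above, assuming the same topic name, schema file, and local broker / schema-registry addresses:
```python
# Sketch only: stream Mastodon statuses into Kafka using the MastodonDetails Avro schema
import mastodon
from mastodon import Mastodon
from bs4 import BeautifulSoup
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.load("avro/mastodon-topic-value.avsc")
producer = AvroProducer(
    {"bootstrap.servers": "localhost:9092", "schema.registry.url": "http://localhost:8081"},
    default_value_schema=value_schema,
)


class KafkaListener(mastodon.StreamListener):
    def on_update(self, status):
        if status.language in ['en', 'fr']:
            text = BeautifulSoup(status.content, "html.parser").text
            # Map the status onto the MastodonDetails schema fields
            producer.produce(topic="mastodon-topic", value={
                "language": status.language,
                "favourites": status.favourites_count,
                "username": status.account.username,
                "bot": status.account.bot,
                "tags": len(status.tags),
                "characters": len(text),
                "words": len(text.split()),
            })
            producer.poll(0)  # serve delivery callbacks


mastodon_client = Mastodon(api_base_url='https://mastodon.social')
mastodon_client.stream_public(KafkaListener())
```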