diff --git a/.gitignore b/.gitignore index af20fb4..96e1cc1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .idea venv -mzdata \ No newline at end of file +mzdata +/wikidata_analysis/.user.yml diff --git a/README.md b/README.md index f134e5b..aadc79e 100644 --- a/README.md +++ b/README.md @@ -1,84 +1,44 @@ -# Producer +# Kafka and data producer -Create Redpanda cluster and topic +Create Redpanda cluster and UI ```shell -rpk container start -n 1 -rpk topic create recentchanges --brokers 127.0.0.1:63248 +docker-compose up -d redpanda provectus-ui-local ``` -Start producer - +Create topic ```shell -python data-generator/wikidata_events.py +docker-compose up -d create_topic ``` -Validate +Start producer ```shell -rpk topic consume recentchanges --brokers 127.0.0.1:63248 +docker-compose up -d python_producer ``` +Validate -> http://localhost:9029/ui/clusters/local/topics/recentchange/messages + + # Materialize Start the database ```shell -materialized --workers 1 +docker-compose up -d materialized ``` -Create Redpanda source +# DBT -```sql -CREATE SOURCE recentchange - FROM KAFKA BROKER 'localhost:63248' TOPIC 'recentchange' - KEY FORMAT BYTES - VALUE FORMAT BYTES -ENVELOPE NONE; +```shell +cd wikidata_analysis/ +dbt run --profiles-dir . +cd .. ``` -```sql -CREATE OR REPLACE MATERIALIZED VIEW test3 AS -WITH jsonified_source AS ( - SELECT - (data ->> 'title') :: string as title, - (data ->> '$schema') :: string as schema, - (data ->> 'type') :: string as type, - (data ->> 'bot') :: boolean as bot, - (data ->> 'comment') :: string as comment, - (data ->> 'id') :: integer as id, - (data ->> 'length') :: jsonb as length, - (data ->> 'log_action') :: string as log_action, - (data ->> 'log_action_comment') :: string as log_action_comment, - (data ->> 'log_id') :: string as log_id, - (data ->> 'log_params') :: string as log_params, - (data ->> 'log_type') :: string as log_type, - (data ->> 'meta') :: jsonb as meta, - (data ->> 'minor') :: boolean as minor, - (data ->> 'namespace') :: integer as namespace, - (data ->> 'parsedcomment') :: string as parsedcomment, - (data ->> 'patrolled') :: boolean as patrolled, - (data ->> 'revision') :: jsonb as revision, - (data ->> 'server_name') :: string as server_name, - (data ->> 'server_script_path') :: string as server_script_path, - (data ->> 'server_url') :: string as server_url, - (data ->> 'user') :: string as server_version, - (data ->> 'timestamp') :: numeric as timestamp, - (data ->> 'wiki') :: string as wiki - FROM - (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM public.recentchange) -) - SELECT - * - FROM - jsonified_source; -``` -# Aggregate -```sql -CREATE OR REPLACE MATERIALIZED VIEW changes_by_server_5s AS -select server_name, count(id), to_timestamp(timestamp) ts from test3 -WHERE mz_logical_timestamp() >= timestamp * 1000 - AND mz_logical_timestamp() < timestamp * 1000 + 5000 -group by server_name, timestamp order by count desc; -``` \ No newline at end of file +# website + +```shell +docker-compose up -d backend +``` diff --git a/api/Dockerfile b/api/Dockerfile index 8c7b487..f5a6e27 100644 --- a/api/Dockerfile +++ b/api/Dockerfile @@ -1,8 +1,16 @@ -FROM python:3.10.2 +FROM python:3.10-slim -COPY requirements.txt requirements.txt -RUN pip install --no-cache-dir -r requirements.txt +ENV PYTHONUNBUFFERED=TRUE +ENV PYTHONDONTWRITEBYTECODE=TRUE +ENV PIP_NO_CACHE_DIR=TRUE +ENV PIP_DISABLE_PIP_VERSION_CHECK=TRUE -COPY app /app +RUN mkdir -p /opt/program +WORKDIR /opt/program + +COPY requirements.txt /opt/program/ +RUN python3 -m pip install -r /opt/program/requirements.txt + +COPY app /opt/program/app CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "9999"] \ No newline at end of file diff --git a/api/app/main.py b/api/app/main.py index 827a29a..70d9ec0 100644 --- a/api/app/main.py +++ b/api/app/main.py @@ -9,11 +9,14 @@ from fastapi import WebSocket, WebSocketDisconnect from sqlalchemy import create_engine +MATER = os.getenv("MATER", "localhost:6875") + + MESSAGE_STREAM_DELAY = 2 # seconds MESSAGE_STREAM_RETRY_TIMEOUT = 15000 # miliseconds # Materialize connection DB_URL = os.getenv( - "DATABASE_URL", "postgresql://materialize:materialize@localhost:6875/materialize" + "DATABASE_URL", f"postgresql://materialize:materialize@{MATER}/materialize" ) database = databases.Database(DB_URL) diff --git a/api/requirements.txt b/api/requirements.txt index d1d2843..b48c4b6 100644 --- a/api/requirements.txt +++ b/api/requirements.txt @@ -4,7 +4,6 @@ uvicorn[standard] SQLAlchemy databases asyncpg -psycopg2 psycopg2-binary sse-starlette asyncio diff --git a/data-generator/Dockerfile b/data-generator/Dockerfile index 5ce39d8..040bea8 100644 --- a/data-generator/Dockerfile +++ b/data-generator/Dockerfile @@ -1,10 +1,14 @@ -FROM python:3.9-slim +FROM python:3.10-slim -COPY requirements.txt . +ENV PYTHONUNBUFFERED=TRUE +ENV PYTHONDONTWRITEBYTECODE=TRUE +ENV PIP_NO_CACHE_DIR=TRUE +ENV PIP_DISABLE_PIP_VERSION_CHECK=TRUE -RUN set -ex; \ - pip install --no-cache-dir -r requirements.txt +RUN mkdir -p /opt/program +WORKDIR /opt/program -ADD wikidata_events.py . +COPY requirements.txt /opt/program/ +RUN python3 -m pip install -r /opt/program/requirements.txt -CMD ["python", "-u", "./wikidata_events.py"] \ No newline at end of file +COPY wikidata_events.py /opt/program/ diff --git a/data-generator/wikidata_events.py b/data-generator/wikidata_events.py index 5a4ebe9..c2f060d 100644 --- a/data-generator/wikidata_events.py +++ b/data-generator/wikidata_events.py @@ -1,8 +1,11 @@ import json +import os from kafka import KafkaProducer from sseclient import SSEClient as EventSource +kafka_broker = os.getenv("BROKER", "localhost:9092") + def produce_events_from_url(url: str, topic: str) -> None: for event in EventSource(url): @@ -13,13 +16,13 @@ def produce_events_from_url(url: str, topic: str) -> None: pass else: key = parsed_event["server_name"] - # Partiton by server_name + # Partition by server_name producer.send(topic, value=json.dumps(parsed_event).encode("utf-8"), key=key.encode("utf-8")) if __name__ == "__main__": producer = KafkaProducer( - bootstrap_servers="localhost:63248", client_id="wikidata-producer" + bootstrap_servers=kafka_broker, client_id="wikidata-producer" ) produce_events_from_url( url="https://stream.wikimedia.org/v2/stream/recentchange", topic="recentchange" diff --git a/docker-compose.yml b/docker-compose.yml index 254fadb..c752f19 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,33 +1,74 @@ services: redpanda: - image: docker.vectorized.io/vectorized/redpanda:v21.11.3 - container_name: redpanda + image: docker.vectorized.io/vectorized/redpanda:v22.1.4 command: - - redpanda start - - --overprovisioned - - --smp 1 - - --memory 1G - - --reserve-memory 0M - - --node-id 0 - - --check=false - - --kafka-addr 0.0.0.0:9092 - - --advertise-kafka-addr redpanda:9092 - - --pandaproxy-addr 0.0.0.0:8082 - - --advertise-pandaproxy-addr redpanda:8082 - - --set redpanda.enable_transactions=true - - --set redpanda.enable_idempotence=true - - --set redpanda.auto_create_topics_enabled=true + - redpanda + - start + - --smp + - '1' + - --reserve-memory + - 0M + - --overprovisioned + - --node-id + - '0' + - --kafka-addr + - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 + - --advertise-kafka-addr + - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092 + - --set + - redpanda.cluster_id=redpanda ports: - "9092:9092" - - "8081:8081" - - "8082:8082" - healthcheck: { test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s } + - "9081:8081" + - "9082:8082" + expose: + - 29092 # redpanda + - 8081 # schema-registry + - 8082 # restproxy + materialized: - image: materialize/materialized:v0.26.3 + image: materialize/materialized:v0.26.4 container_name: materialized command: -w1 ports: - "6875:6875" - mzcli: - image: materialize/cli - container_name: mzcli \ No newline at end of file + + python_producer: + image: kafkapythonwiki:latest + build: + context: ./data-generator/ + dockerfile: Dockerfile + init: true + environment: + BROKER: "redpanda:29092" + volumes: + # Source code + - ./data-generator/wikidata_events.py:/wikidata_events.py + command: [ "python3", "wikidata_events.py" ] + + create_topic: + image: docker.vectorized.io/vectorized/redpanda:v22.1.4 + command: + - topic create recentchange --brokers redpanda:29092 + + provectus-ui-local: + image: provectuslabs/kafka-ui:46bcbb3436caf7357ff11eebbd1b49fe4f2cd167 + ports: + - "9029:8080" + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "redpanda:29092" + + backend: + image: backendpython:latest + build: + context: ./api/ + dockerfile: Dockerfile + init: true + environment: + MATER: "materialized:6875" + ports: + - "9999:9999" + volumes: + # Source code + - ./api/app/:/opt/program/app/ diff --git a/wikidata_analysis/README.md b/wikidata_analysis/README.md deleted file mode 100644 index 7874ac8..0000000 --- a/wikidata_analysis/README.md +++ /dev/null @@ -1,15 +0,0 @@ -Welcome to your new dbt project! - -### Using the starter project - -Try running the following commands: -- dbt run -- dbt test - - -### Resources: -- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) -- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers -- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support -- Find [dbt events](https://events.getdbt.com) near you -- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices diff --git a/wikidata_analysis/models/sources/src_wikidata_events.sql b/wikidata_analysis/models/sources/src_wikidata_events.sql index 0136f08..651a6c2 100644 --- a/wikidata_analysis/models/sources/src_wikidata_events.sql +++ b/wikidata_analysis/models/sources/src_wikidata_events.sql @@ -3,7 +3,7 @@ {% endset %} CREATE SOURCE {{ source_name }} - FROM KAFKA BROKER 'localhost:63248' TOPIC 'recentchange' + FROM KAFKA BROKER 'redpanda:29092' TOPIC 'recentchange' KEY FORMAT BYTES VALUE FORMAT BYTES ENVELOPE NONE diff --git a/wikidata_analysis/profiles.yml b/wikidata_analysis/profiles.yml new file mode 100644 index 0000000..56820d0 --- /dev/null +++ b/wikidata_analysis/profiles.yml @@ -0,0 +1,12 @@ +wikidata_analysis: + target: dev + outputs: + dev: + type: materialize + threads: 1 + host: localhost + port: 6875 + user: materialize + pass: password + dbname: materialize + schema: public \ No newline at end of file