Skip to content

Commit

Permalink
Merge pull request #5 from decodingml/module-2
Browse files Browse the repository at this point in the history
Module 2
  • Loading branch information
alexandruvesa authored Apr 5, 2024
2 parents bfca121 + 7d30fd1 commit dd9deaf
Show file tree
Hide file tree
Showing 15 changed files with 813 additions and 0 deletions.
Binary file added course/.DS_Store
Binary file not shown.
24 changes: 24 additions & 0 deletions course/module-2/.docker/Dockerfile.cdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Use an official Python runtime as a parent image
FROM python:3.11-slim

# Set the working directory in the container
WORKDIR /usr/src/app

# Upgrade pip and install Poetry
RUN python3 -m pip install --upgrade pip && pip3 install poetry

# Copy only the pyproject.toml and poetry.lock (if exists) to use Docker cache
COPY pyproject.toml poetry.lock* /usr/src/app/

# Install project dependencies
RUN poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi

# Copy the rest of the application
COPY . /usr/src/app

# Make port 8000 available to the world outside this container
EXPOSE 8000

# Run cdc.py when the container launches
CMD ["python", "cdc.py"]
Empty file added course/module-2/.env.example
Empty file.
161 changes: 161 additions & 0 deletions course/module-2/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

data/
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
11 changes: 11 additions & 0 deletions course/module-2/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
help:
@grep -E '^[a-zA-Z0-9 -]+:.*#' Makefile | sort | while read -r l; do printf "\033[1;32m$$(echo $$l | cut -f 1 -d':')\033[00m:$$(echo $$l | cut -f 2- -d'#')\n"; done

local-start: # Buil and start mongodb and mq.
docker-compose -f docker-compose.yml up --build -d

local-start-cdc: # Start CDC system
python cdc.py

local-test-cdc: #Test CDC system by inserting data to mongodb
python test_cdc.py
42 changes: 42 additions & 0 deletions course/module-2/cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import json
import logging

from bson import json_util

from data_flow.mq import publish_to_rabbitmq
from db.mongo import MongoDatabaseConnector

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")


def stream_process():
try:
# Setup MongoDB connection
client = MongoDatabaseConnector()
db = client["scrabble"]
logging.info("Connected to MongoDB.")

# Watch changes in a specific collection
changes = db.watch([{"$match": {"operationType": {"$in": ["insert"]}}}])
for change in changes:
data_type = change["ns"]["coll"]
entry_id = str(change["fullDocument"]["_id"]) # Convert ObjectId to string
change["fullDocument"].pop("_id")
change["fullDocument"]["type"] = data_type
change["fullDocument"]["entry_id"] = entry_id

# Use json_util to serialize the document
data = json.dumps(change["fullDocument"], default=json_util.default)
logging.info(f"Change detected and serialized: {data}")

# Send data to rabbitmq
publish_to_rabbitmq(queue_name="test_queue", data=data)
logging.info("Data published to RabbitMQ.")

except Exception as e:
logging.error(f"An error occurred: {e}")


if __name__ == "__main__":
stream_process()
103 changes: 103 additions & 0 deletions course/module-2/data_flow/mq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import pika

from settings import settings


class RabbitMQConnection:
"""Singleton class to manage RabbitMQ connection."""

_instance = None

def __new__(
cls, host: str = None, port: int = None, username: str = None, password: str = None, virtual_host: str = "/"
):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(
self,
host: str = None,
port: int = None,
username: str = None,
password: str = None,
virtual_host: str = "/",
fail_silently: bool = False,
**kwargs,
):
self.host = host or settings.RABBITMQ_HOST
self.port = port or settings.RABBITMQ_PORT
self.username = username or settings.RABBITMQ_DEFAULT_USERNAME
self.password = password or settings.RABBITMQ_DEFAULT_PASSWORD
self.virtual_host = virtual_host
self.fail_silently = fail_silently
self._connection = None

def __enter__(self):
self.connect()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def connect(self):
try:
credentials = pika.PlainCredentials(self.username, self.password)
self._connection = pika.BlockingConnection(
pika.ConnectionParameters(
host=self.host, port=self.port, virtual_host=self.virtual_host, credentials=credentials
)
)
except pika.exceptions.AMQPConnectionError as e:
print("Failed to connect to RabbitMQ:", e)
if not self.fail_silently:
raise e

def is_connected(self) -> bool:
return self._connection is not None and self._connection.is_open

def get_channel(self):
if self.is_connected():
return self._connection.channel()

def close(self):
if self.is_connected():
self._connection.close()
self._connection = None
print("Closed RabbitMQ connection")


def publish_to_rabbitmq(queue_name: str, data: str):
"""Publish data to a RabbitMQ queue."""
try:
# Create an instance of RabbitMQConnection
rabbitmq_conn = RabbitMQConnection()

# Establish connection
with rabbitmq_conn:
channel = rabbitmq_conn.get_channel()

# Ensure the queue exists
channel.queue_declare(queue=queue_name, durable=True)

# Delivery confirmation
channel.confirm_delivery()

# Send data to the queue
channel.basic_publish(
exchange="",
routing_key=queue_name,
body=data,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
)
print("Sent data to RabbitMQ:", data)
except pika.exceptions.UnroutableError:
print("Message could not be routed")
except Exception as e:
print(f"Error publishing to RabbitMQ: {e}")


if __name__ == "__main__":
publish_to_rabbitmq("test_queue", "Hello, World!")
33 changes: 33 additions & 0 deletions course/module-2/db/mongo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure

from settings import settings


class MongoDatabaseConnector:
"""Singleton class to connect to MongoDB database."""

_instance: MongoClient = None

def __new__(cls, *args, **kwargs):
if cls._instance is None:
try:
cls._instance = MongoClient(settings.MONGO_DATABASE_HOST)
# cls._instance = MongoClient("mongodb://localhost:30001/?replicaSet=my-replica-set")
except ConnectionFailure as e:
print(f"Couldn't connect to the database: {str(e)}")
raise

print(f"Connection to database with uri: {settings.MONGO_DATABASE_HOST} successful")
return cls._instance

def get_database(self):
return self._instance[settings.MONGO_DATABASE_NAME]

def close(self):
if self._instance:
self._instance.close()
print("Connected to database has been closed.")


connection = MongoDatabaseConnector()
Loading

0 comments on commit dd9deaf

Please sign in to comment.