diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..f6d7e39 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,10 @@ +# .coveragerc +[run] +omit = + tests/* + */__init__.py + setup.py + */main.py + +[report] +show_missing=true \ No newline at end of file diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml new file mode 100644 index 0000000..fa16390 --- /dev/null +++ b/.github/workflows/pytest.yml @@ -0,0 +1,47 @@ +name: Run Pytest and Coverage + +on: + push: + branches: + - main + pull_request: + types: [assigned, opened, synchronize, reopened] + +jobs: + pytest: + runs-on: ubuntu-latest + + steps: + # Checkout the code from the repository + - name: Checkout code + uses: actions/checkout@v2 + + # Setup Python environment + - name: Set up Python 3.8 + uses: actions/setup-python@v2 + with: + python-version: 3.8 + + # Install Python dependencies + - name: Install dependencies + run: pip install -r requirements.txt + + # Run Pytest with coverage + - name: Run Tests + run: | + python -m pytest tests/ --cov=. --junit-xml=test-reports/report.xml --cov-report=term-missing --cov-fail-under=70 | tee pytest-coverage.txt + echo "STATUS=$(grep 'Required test' pytest-coverage.txt | awk '{ print $1 }')" >> $GITHUB_ENV + echo "FAILED=$(awk -F'=' '/failures=/ {gsub(/"/, "", $2); print $2}' test-reports/report.xml)" >> $GITHUB_ENV + + # Post Pytest coverage as a comment on PR + - name: Pytest coverage comment + uses: MishaKav/pytest-coverage-comment@main + with: + create-new-comment: true + pytest-coverage-path: ./pytest-coverage.txt + junitxml-path: ./test-reports/report.xml + + # Evaluate test and coverage results + - name: Evaluate Coverage + if: env.STATUS == 'FAIL' || env.FAILED > 0 + run: exit 1 diff --git a/.gitignore b/.gitignore index 8c67229..364f8a8 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,5 @@ dev.sql build/ dist/ *.egg-info/ + +.coverage diff --git a/docker-compose.yaml b/docker-compose.yaml index 974ba2c..8a8ff0d 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -31,8 +31,8 @@ services: PGDATABASE: dummy PGUSER: postgres PGPASSWORD: postgres - PGREPLICATIONSLOT: pgevents_slot - PGOUTPUTPLUGIN: pgoutput + PGREPLICATIONSLOT: pgevents + PGOUTPUTPLUGIN: wal2json PGTABLES: public.users PGPUBLICATION: events RABBITMQ_URL: amqp://admin:password@rabbitmq:5672/?heartbeat=0 @@ -41,6 +41,7 @@ services: command: ["bash", "-c", "producer"] volumes: - ./producer:/pgevents/producer + - ./test:/pgevents/test depends_on: - database - rabbitmq diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..030018d --- /dev/null +++ b/pytest.ini @@ -0,0 +1,4 @@ +# pytest.ini +[pytest] +addopts = --cov=. --cov-config=.coveragerc +testpaths = tests/ \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d1e15d9..c450072 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,6 +14,7 @@ py==1.11.0 pylint==2.5.3 pyparsing==3.0.7 pytest==7.0.1 +pytest-cov==4.1.0 toml==0.10.2 tomli==2.0.1 wrapt==1.12.1 diff --git a/tests/conftest.py b/tests/conftest.py index 16cc65e..92d8c82 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,35 +1,192 @@ import os +from unittest import mock import psycopg2 import pytest +from common.event import base_event from common.qconnector import RabbitMQConnector from common import log +from producer.event_producer import EventProducer logger = log.get_logger(__name__) -@pytest.fixture(scope='function') -def db_conn(): - db_connection = psycopg2.connect( - user=os.environ['PGUSER'], - password=os.environ['PGPASSWORD'], - host=os.environ['PGHOST'], - port=os.environ['PGPORT'], - dbname=os.environ['PGDATABASE'] - ) +@pytest.fixture +def producer_init_params(): + return { + 'qconnector_cls': RabbitMQConnector, + 'event_cls': base_event.BaseEvent, + 'pg_host': 'localhost', + 'pg_port': 5432, + 'pg_database': 'test', + 'pg_user': 'test', + 'pg_password': 'test', + 'pg_replication_slot': 'test', + 'pg_output_plugin': 'pgoutput', + 'pg_tables': 'public.users', + 'pg_publication_name': 'test', + 'rabbitmq_url': 'amqp://admin:password@rabbitmq:5672/?heartbeat=0', + 'rabbitmq_exchange': 'test' + } - yield db_connection - db_connection.close() +@pytest.fixture +def mock_producer(producer_init_params): + return EventProducer(**producer_init_params) -@pytest.fixture(scope='session') -def rmq_conn(): - rmq_connector = RabbitMQConnector( - rabbitmq_url=os.environ['RABBITMQ_URL'], - rabbitmq_exchange=os.environ['RABBITMQ_EXCHANGE'], - queue_name='PRODUCER_TEST_QUEUE', - binding_keys='#' - ) - rmq_connector.connect() - return rmq_connector +@pytest.fixture +def mock_pika_connect(): + with mock.patch('pika.BlockingConnection') as mock_pika: + yield mock_pika + + +@pytest.fixture +def mock_pg_conn(): + with mock.patch('psycopg2.connect') as mock_pg: + yield mock_pg + + +@pytest.fixture +def mock_basic_publish(): + with mock.patch('pika.adapters.blocking_connection.BlockingChannel.basic_publish') as mock_publish: + yield mock_publish + + +@pytest.fixture +def mock_schema(): + return { + 'relation_id': 16385, + 'table_name': 'public.users', + 'columns': [ + {'name': 'id'}, + {'name': 'full_name'}, + {'name': 'company'}, + {'name': 'created_at'}, + {'name': 'updated_at'} + ] + } + +@pytest.fixture +def wal2json_payload(): + return {"action":"D","lsn":"0/1671850","schema":"public","table":"users","identity":[{"name":"id","type":"integer","value":2},{"name":"full_name","type":"text","value":"Geezer Butler"},{"name":"company","type":"jsonb","value":"{\"name\": \"Fyle\"}"},{"name":"created_at","type":"timestamp with time zone","value":"2023-11-17 13:35:09.471909+00"},{"name":"updated_at","type":"timestamp with time zone","value":"2023-11-17 13:35:09.471909+00"}]} + +# Class to hold payload data +class OutputData: + payload = None + data_start = 124122 + + +# Fixture for relation payload +@pytest.fixture +def relation_payload(): + data = OutputData() + data.payload = b'R\x00\x00@\x01public\x00users\x00f\x00\x05\x01id\x00\x00\x00\x00\x17\xff\xff\xff\xff\x01full_name\x00\x00\x00\x00\x19\xff\xff\xff\xff\x01company\x00\x00\x00\x0e\xda\xff\xff\xff\xff\x01created_at\x00\x00\x00\x04\xa0\xff\xff\xff\xff\x01updated_at\x00\x00\x00\x04\xa0\xff\xff\xff\xff' + return data + + +# Fixture for expected relation response +@pytest.fixture +def relation_response(): + return { + 'relation_id': 16385, + 'table_name': 'public.users', + 'columns': [ + {'name': 'id'}, + {'name': 'full_name'}, + {'name': 'company'}, + {'name': 'created_at'}, + {'name': 'updated_at'} + ] + } + + +# Fixture for insert payload +@pytest.fixture +def insert_payload(): + data = OutputData() + data.payload = b'I\x00\x00@\x01N\x00\x05t\x00\x00\x00\x011t\x00\x00\x00\x04Miket\x00\x00\x00\x10{"name": "Fyle"}t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00' + return data + + +# Fixture for expected insert response +@pytest.fixture +def insert_response(): + return { + 'table_name': 'public.users', + 'new': { + 'id': '1', + 'full_name': 'Mike', + 'company': '{"name": "Fyle"}', + 'created_at': '2023-11-17 13:44:14.700844+00', + 'updated_at': '2023-11-17 13:44:14.700844+00' + }, + 'id': '1', + 'old': {}, + 'diff': { + 'id': '1', + 'full_name': 'Mike', + 'company': '{"name": "Fyle"}', + 'created_at': '2023-11-17 13:44:14.700844+00', + 'updated_at': '2023-11-17 13:44:14.700844+00' + }, + 'action': 'I' + } + +# Fixture for update payload +@pytest.fixture +def update_payload(): + data = OutputData() + data.payload = b'U\x00\x00@\x01O\x00\x05t\x00\x00\x00\x011t\x00\x00\x00\x04Miket\x00\x00\x00\x10{"name": "Fyle"}t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00N\x00\x05t\x00\x00\x00\x011t\x00\x00\x00\x05Mylest\x00\x00\x00\x10{"name": "Fyle"}t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00t\x00\x00\x00\x1d2023-11-17 13:44:14.700844+00' + return data + +# Fixture for expected update response +@pytest.fixture +def update_response(): + return { + 'table_name': 'public.users', + 'id': '1', + 'old': { + 'id': '1', + 'full_name': 'Mike', + 'company': '{"name": "Fyle"}', + 'created_at': '2023-11-17 13:44:14.700844+00', + 'updated_at': '2023-11-17 13:44:14.700844+00' + }, + 'new': { + 'id': '1', + 'full_name': 'Myles', + 'company': '{"name": "Fyle"}', + 'created_at': '2023-11-17 13:44:14.700844+00', + 'updated_at': '2023-11-17 13:44:14.700844+00' + }, + 'diff': { + 'full_name': 'Myles' + }, + 'action': 'U' + } + +# Fixture for delete payload +@pytest.fixture +def delete_payload(): + data = OutputData() + data.payload = b'D\x00\x00@\x01O\x00\x05t\x00\x00\x00\x012t\x00\x00\x00\rGeezer Butlert\x00\x00\x00\x10{"name": "Fyle"}t\x00\x00\x00\x1d2023-11-17 13:35:09.471909+00t\x00\x00\x00\x1d2023-11-17 13:35:09.471909+00' + return data + +# Fixture for expected delete response +@pytest.fixture +def delete_response(): + return { + 'table_name': 'public.users', + 'action': 'D', + 'old': { + 'id': '2', + 'full_name': 'Geezer Butler', + 'company': '{"name": "Fyle"}', + 'created_at': '2023-11-17 13:35:09.471909+00', + 'updated_at': '2023-11-17 13:35:09.471909+00' + }, + 'id': '2', + 'new': {}, + 'diff': {} + } diff --git a/tests/test_event_producer.py b/tests/test_event_producer.py index 8a9a72d..a604b40 100644 --- a/tests/test_event_producer.py +++ b/tests/test_event_producer.py @@ -1,64 +1,88 @@ import json -from time import sleep -from common import log - - -logger = log.get_logger(__name__) - - -class TestEventProducer: - - def test_insert(self, db_conn, rmq_conn): - with db_conn.cursor() as db_cursor: - db_cursor.execute( - ''' - insert into users(id, full_name) - values (%s,%s) - ''', ( - '2', 'Tony Iommi' - ) - ) - db_conn.commit() - sleep(1) - - events = rmq_conn.consume_all() - - assert len(events) == 1 - - routing_key, event_body = events[0] - event = json.loads(event_body) - - assert routing_key == 'public.users' - assert event['table_name'] == 'public.users' - assert event['action'] == 'I' - assert event['old'] == {} - assert event['id'] == 2 - assert event['new']['id'] == 2 - assert event['new']['full_name'] == 'Tony Iommi' - - def test_update(self, db_conn, rmq_conn): - with db_conn.cursor() as db_cursor: - db_cursor.execute( - ''' - update users set full_name=%s - where id=%s - ''', ( - 'Geezer Butler', '2' - ) - ) - db_conn.commit() - sleep(1) - - events = rmq_conn.consume_all() - - assert len(events) == 1 - - routing_key, event_body = events[0] - event = json.loads(event_body) - - assert routing_key == 'public.users' - assert event['table_name'] == 'public.users' - assert event['action'] == 'U' - assert event['old']['full_name'] == 'Tony Iommi' - assert event['diff']['full_name'] == 'Geezer Butler' - assert event['new']['full_name'] == 'Geezer Butler' +from unittest import mock +from common.event import base_event +from common.qconnector.rabbitmq_connector import RabbitMQConnector +from producer.event_producer import EventProducer + + +# Test init +def test_init(producer_init_params): + p = EventProducer(**producer_init_params) + + assert p.qconnector_cls == RabbitMQConnector + assert p.event_cls == base_event.BaseEvent + assert p._EventProducer__pg_host == 'localhost' + assert p._EventProducer__pg_port == 5432 + assert p._EventProducer__pg_database == 'test' + assert p._EventProducer__pg_user == 'test' + assert p._EventProducer__pg_password == 'test' + assert p._EventProducer__pg_replication_slot == 'test' + assert p._EventProducer__pg_output_plugin == 'pgoutput' + assert p._EventProducer__pg_publication_name == 'test' + assert p.qconnector._RabbitMQConnector__rabbitmq_url == 'amqp://admin:password@rabbitmq:5672/?heartbeat=0' + assert p.qconnector._RabbitMQConnector__rabbitmq_exchange == 'test' + + +# Test connect +def test_connect(mock_producer, mock_pika_connect, mock_pg_conn): + mock_producer.connect() + + assert mock_producer._EventProducer__db_conn is not None + assert mock_producer._EventProducer__db_cur is not None + + mock_pika_connect.assert_called_once() + mock_pg_conn.assert_called_once() + + +# Test PGOutputMessageProcessor +def test_pgoutput_msg_processor(mock_producer, relation_payload, insert_payload, update_payload, delete_payload, mock_schema): + # mock msg.cursor.send_feedback + mock_msg = mock.Mock() + mock_msg.cursor.send_feedback = mock.Mock() + mock_msg.cursor.send_feedback.return_value = None + + mock_producer.publish = mock.Mock() + + # mock msg.payload + mock_msg.payload = relation_payload.payload + mock_producer.pgoutput_msg_processor(mock_msg) + + assert mock_producer._EventProducer__table_schemas[16385] == mock_schema + + # mock msg.payload + mock_msg.payload = insert_payload.payload + mock_producer.pgoutput_msg_processor(mock_msg) + + assert mock_producer._EventProducer__table_schemas[16385] == mock_schema + + # mock msg.payload + mock_msg.payload = update_payload.payload + mock_producer.pgoutput_msg_processor(mock_msg) + + assert mock_producer._EventProducer__table_schemas[16385] == mock_schema + + # mock msg.payload + mock_msg.payload = delete_payload.payload + mock_producer.pgoutput_msg_processor(mock_msg) + + assert mock_producer._EventProducer__table_schemas[16385] == mock_schema + + assert mock_msg.cursor.send_feedback.call_count == 4 + + assert mock_producer.publish.call_count == 3 + + +# Test Wal2JsonMessageProcessor +def test_wal2json_msg_processor(mock_producer, wal2json_payload): + mock_msg = mock.Mock() + mock_msg.payload = json.dumps(wal2json_payload) + mock_msg.cursor.send_feedback = mock.Mock() + mock_msg.cursor.send_feedback.return_value = None + + mock_producer.publish = mock.Mock() + + mock_producer.wal2json_msg_processor(mock_msg) + + assert mock_msg.cursor.send_feedback.call_count == 1 + + assert mock_producer.publish.call_count == 1 diff --git a/tests/test_parser.py b/tests/test_parser.py new file mode 100644 index 0000000..da4b03d --- /dev/null +++ b/tests/test_parser.py @@ -0,0 +1,82 @@ +import pytest +from unittest import mock + +from pgoutput_parser import ( + InsertMessage, + UpdateMessage, + DeleteMessage, + RelationMessage +) +from pgoutput_parser.base import BaseMessage + + +# Test InsertMessage decoding +def test_insert(insert_payload, insert_response, mock_schema): + + parser = InsertMessage(table_name=mock_schema['table_name'], message=insert_payload.payload, schema=mock_schema) + parsed_message = parser.decode_insert_message() + + assert parsed_message == insert_response + + +# Test UpdateMessage decoding +def test_update(update_payload, update_response, mock_schema): + parser = UpdateMessage(table_name=mock_schema['table_name'], message=update_payload.payload, schema=mock_schema) + parsed_message = parser.decode_update_message() + + assert parsed_message == update_response + + +# Test DeleteMessage decoding +def test_delete(delete_payload, delete_response, mock_schema): + parser = DeleteMessage(table_name=mock_schema['table_name'], message=delete_payload.payload, schema=mock_schema) + parsed_message = parser.decode_delete_message() + + assert parsed_message == delete_response + + +# Test RelationMessage decoding +def test_relation(relation_payload, relation_response): + parser = RelationMessage(table_name=None, message=relation_payload.payload, schema=None) + parsed_message = parser.decode_relation_message() + + assert parsed_message == relation_response + + +# Test BaseMessage for NotImplementedError +def test_base_not_implemented_methods(): + parser = BaseMessage(table_name=None, message=b'123', schema=None) + + with pytest.raises(NotImplementedError) as excinfo: + parser.decode_insert_message() + assert 'This method should be overridden by subclass' in str(excinfo.value) + + with pytest.raises(NotImplementedError) as excinfo: + parser.decode_update_message() + assert 'This method should be overridden by subclass' in str(excinfo.value) + + with pytest.raises(NotImplementedError) as excinfo: + parser.decode_delete_message() + assert 'This method should be overridden by subclass' in str(excinfo.value) + + with pytest.raises(NotImplementedError) as excinfo: + parser.decode_relation_message() + assert 'This method should be overridden by subclass' in str(excinfo.value) + + +# Test BaseMessage for decode_tuple for 'u' and 'n' types +def test_decode_tuple_null_and_unchanged(): + base_message_instance = BaseMessage(table_name=None, message=b'123', schema=None) + # Replace with actual class instantiation if needed + base_message_instance.schema = {'columns': [{'name': 'col1'}, {'name': 'col2'}]} + + # Mock the methods + base_message_instance.read_int16 = mock.MagicMock(return_value=2) + base_message_instance.read_utf_8 = mock.MagicMock(side_effect=['n', 'u']) + base_message_instance.read_int32 = mock.MagicMock() # This won't be called + + # Call the method + result = base_message_instance.decode_tuple() + + # Assertions + assert result == {'col1': None, 'col2': None}