diff --git a/.coveragerc b/.coveragerc index f6d7e39..f59df72 100644 --- a/.coveragerc +++ b/.coveragerc @@ -5,6 +5,9 @@ omit = */__init__.py setup.py */main.py + common/event/base_event.py + common/exceptions.py + common/qconnector/q_connector.py [report] show_missing=true \ No newline at end of file diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index fa16390..4152f68 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -29,7 +29,7 @@ jobs: # Run Pytest with coverage - name: Run Tests run: | - python -m pytest tests/ --cov=. --junit-xml=test-reports/report.xml --cov-report=term-missing --cov-fail-under=70 | tee pytest-coverage.txt + python -m pytest tests/ --cov=. --junit-xml=test-reports/report.xml --cov-report=term-missing --cov-fail-under=92 | tee pytest-coverage.txt echo "STATUS=$(grep 'Required test' pytest-coverage.txt | awk '{ print $1 }')" >> $GITHUB_ENV echo "FAILED=$(awk -F'=' '/failures=/ {gsub(/"/, "", $2); print $2}' test-reports/report.xml)" >> $GITHUB_ENV diff --git a/consumer/event_consumer.py b/consumer/event_consumer.py index 7f4b810..16a5f4a 100644 --- a/consumer/event_consumer.py +++ b/consumer/event_consumer.py @@ -1,5 +1,6 @@ import json from abc import ABC +from typing import Type from common.event import BaseEvent from common.qconnector import QConnector diff --git a/tests/conftest.py b/tests/conftest.py index 92d8c82..1e9b40d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,12 @@ -import os from unittest import mock -import psycopg2 import pytest from common.event import base_event from common.qconnector import RabbitMQConnector from common import log + from producer.event_producer import EventProducer +from consumer.event_consumer import EventConsumer logger = log.get_logger(__name__) @@ -47,12 +47,6 @@ def mock_pg_conn(): yield mock_pg -@pytest.fixture -def mock_basic_publish(): - with mock.patch('pika.adapters.blocking_connection.BlockingChannel.basic_publish') as mock_publish: - yield mock_publish - - @pytest.fixture def mock_schema(): return { @@ -190,3 +184,27 @@ def delete_response(): 'new': {}, 'diff': {} } + + +@pytest.fixture +def event_consumer_init_params(): + return { + 'qconnector_cls': RabbitMQConnector, + 'event_cls': base_event.BaseEvent, + 'rabbitmq_url': 'amqp://admin:password@rabbitmq:5672/?heartbeat=0', + 'rabbitmq_exchange': 'test', + 'queue_name': 'test', + 'binding_keys': 'public.users' + } + + +@pytest.fixture +def mock_consumer(event_consumer_init_params): + return EventConsumer(**event_consumer_init_params) + + +@pytest.fixture +def mock_event(update_response): + event = base_event.BaseEvent() + event.from_dict(update_response) + return event diff --git a/tests/test_event_consumer.py b/tests/test_event_consumer.py new file mode 100644 index 0000000..6814c97 --- /dev/null +++ b/tests/test_event_consumer.py @@ -0,0 +1,58 @@ +from unittest import mock +from common.event import base_event +from common.qconnector.rabbitmq_connector import RabbitMQConnector +from consumer.event_consumer import EventConsumer + + +# Test init +def test_init(event_consumer_init_params): + c = EventConsumer(**event_consumer_init_params) + + assert c.qconnector_cls == RabbitMQConnector + assert c.event_cls == base_event.BaseEvent + assert c.qconnector._RabbitMQConnector__rabbitmq_url == 'amqp://admin:password@rabbitmq:5672/?heartbeat=0' + assert c.qconnector._RabbitMQConnector__rabbitmq_exchange == 'test' + + +# Test connect +def test_connect(mock_consumer, mock_pika_connect): + mock_consumer.connect() + mock_pika_connect.assert_called_once() + + +# Test start_consuming +def test_start_consuming(mock_consumer): + # mock basic_consume + mock_consumer.qconnector.consume_stream = mock.Mock() + mock_consumer.start_consuming() + + mock_consumer.qconnector.consume_stream.assert_called_once() + + # test stream_consumer function + mock_consumer.process_message = mock.Mock() + mock_consumer.check_shutdown = mock.Mock() + mock_consumer.qconnector.consume_stream.call_args[1]['callback_fn']('test', '{"test": "test"}') + + mock_consumer.process_message.assert_called_once() + mock_consumer.check_shutdown.assert_called_once() + + +# Test process_message +def test_process_message(mock_consumer, mock_event): + with mock.patch('logging.Logger.info') as mock_logger: + mock_consumer.process_message('test', mock_event) + mock_logger.call_count == 3 + + +# Test shutdown +def test_shutdown(mock_consumer): + mock_consumer.shutdown() + assert mock_consumer._EventConsumer__shutdown is True + + +# Test check_shutdown +def test_check_shutdown(mock_consumer): + mock_consumer._EventConsumer__shutdown = True + mock_consumer.qconnector.check_shutdown = mock.Mock() + mock_consumer.check_shutdown() + mock_consumer.qconnector.check_shutdown.assert_called_once() diff --git a/tests/test_event_producer.py b/tests/test_event_producer.py index a604b40..2e829ea 100644 --- a/tests/test_event_producer.py +++ b/tests/test_event_producer.py @@ -1,5 +1,6 @@ import json from unittest import mock + from common.event import base_event from common.qconnector.rabbitmq_connector import RabbitMQConnector from producer.event_producer import EventProducer @@ -30,8 +31,14 @@ def test_connect(mock_producer, mock_pika_connect, mock_pg_conn): assert mock_producer._EventProducer__db_conn is not None assert mock_producer._EventProducer__db_cur is not None - mock_pika_connect.assert_called_once() - mock_pg_conn.assert_called_once() + mock_producer._EventProducer__pg_output_plugin = 'wal2json' + mock_producer.connect() + + assert mock_producer._EventProducer__db_conn is not None + assert mock_producer._EventProducer__db_cur is not None + + mock_pika_connect.call_count == 2 + mock_pg_conn.call_count == 2 # Test PGOutputMessageProcessor @@ -86,3 +93,35 @@ def test_wal2json_msg_processor(mock_producer, wal2json_payload): assert mock_msg.cursor.send_feedback.call_count == 1 assert mock_producer.publish.call_count == 1 + + +# Test publish +def test_publish(mock_producer): + # mock basic_publish + mock_producer.qconnector._RabbitMQConnector__rmq_channel = mock.Mock() + + mock_producer.publish(routing_key='test', payload='test') + + mock_producer.qconnector._RabbitMQConnector__rmq_channel.basic_publish.assert_called_once() + + +# Test start_consuming +def test_start_consuming(mock_producer): + # mock consume_stream pyscopg2 + mock_producer._EventProducer__db_cur = mock.Mock() + mock_producer._EventProducer__db_cur.consume_stream = mock.Mock() + + mock_producer.start_consuming() + + mock_producer._EventProducer__db_cur.consume_stream.assert_called_once() + + # Test stream_consumer function + mock_producer.wal2json_msg_processor = mock.Mock() + mock_producer.pgoutput_msg_processor = mock.Mock() + mock_producer.check_shutdown = mock.Mock() + + mock_producer._EventProducer__db_cur.consume_stream.call_args[1]['consume']('test') + + mock_producer._EventProducer__pg_output_plugin = 'wal2json' + mock_producer._EventProducer__db_cur.consume_stream.call_args[1]['consume']('test') + mock_producer.wal2json_msg_processor.assert_called_once()