Skip to content

Commit

Permalink
Adding more producer tests and ptest action
Browse files Browse the repository at this point in the history
  • Loading branch information
Shwetabhk committed Nov 18, 2023
1 parent 3363188 commit 462e4ab
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 15 deletions.
10 changes: 10 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# .coveragerc
[run]
omit =
tests/*
*/__init__.py
setup.py
*/main.py

[report]
show_missing=true
47 changes: 47 additions & 0 deletions .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
name: Run Pytest and Coverage

on:
push:
branches:
- main
pull_request:
types: [assigned, opened, synchronize, reopened]

jobs:
pytest:
runs-on: ubuntu-latest

steps:
# Checkout the code from the repository
- name: Checkout code
uses: actions/checkout@v2

# Setup Python environment
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.8

# Install Python dependencies
- name: Install dependencies
run: pip install -r requirements.txt

# Run Pytest with coverage
- name: Run Tests
run: |
python -m pytest tests/ --cov=. --junit-xml=test-reports/report.xml --cov-report=term-missing --cov-fail-under=70 | tee pytest-coverage.txt
echo "STATUS=$(grep 'Required test' pytest-coverage.txt | awk '{ print $1 }')" >> $GITHUB_ENV
echo "FAILED=$(awk -F'=' '/failures=/ {gsub(/"/, "", $2); print $2}' test-reports/report.xml)" >> $GITHUB_ENV
# Post Pytest coverage as a comment on PR
- name: Pytest coverage comment
uses: MishaKav/pytest-coverage-comment@main
with:
create-new-comment: true
pytest-coverage-path: ./pytest-coverage.txt
junitxml-path: ./test-reports/report.xml

# Evaluate test and coverage results
- name: Evaluate Coverage
if: env.STATUS == 'FAIL' || env.FAILED > 0
run: exit 1
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ services:
PGDATABASE: dummy
PGUSER: postgres
PGPASSWORD: postgres
PGREPLICATIONSLOT: pgevents_slot
PGOUTPUTPLUGIN: pgoutput
PGREPLICATIONSLOT: pgevents
PGOUTPUTPLUGIN: wal2json
PGTABLES: public.users
PGPUBLICATION: events
RABBITMQ_URL: amqp://admin:password@rabbitmq:5672/?heartbeat=0
Expand Down
13 changes: 11 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def producer_init_params():
'pg_password': 'test',
'pg_replication_slot': 'test',
'pg_output_plugin': 'pgoutput',
'pg_tables': 'test',
'pg_tables': 'public.users',
'pg_publication_name': 'test',
'rabbitmq_url': 'amqp://admin:password@rabbitmq:5672/?heartbeat=0',
'rabbitmq_exchange': 'test'
Expand All @@ -48,7 +48,13 @@ def mock_pg_conn():


@pytest.fixture
def mocked_schema():
def mock_basic_publish():
with mock.patch('pika.adapters.blocking_connection.BlockingChannel.basic_publish') as mock_publish:
yield mock_publish


@pytest.fixture
def mock_schema():
return {
'relation_id': 16385,
'table_name': 'public.users',
Expand All @@ -61,6 +67,9 @@ def mocked_schema():
]
}

@pytest.fixture
def wal2json_payload():
return {"action":"D","lsn":"0/1671850","schema":"public","table":"users","identity":[{"name":"id","type":"integer","value":2},{"name":"full_name","type":"text","value":"Geezer Butler"},{"name":"company","type":"jsonb","value":"{\"name\": \"Fyle\"}"},{"name":"created_at","type":"timestamp with time zone","value":"2023-11-17 13:35:09.471909+00"},{"name":"updated_at","type":"timestamp with time zone","value":"2023-11-17 13:35:09.471909+00"}]}

# Class to hold payload data
class OutputData:
Expand Down
68 changes: 63 additions & 5 deletions tests/test_event_producer.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import json
from unittest import mock
from common.event import base_event
from common.qconnector.rabbitmq_connector import RabbitMQConnector
from producer.event_producer import EventProducer


# Path: tests/test_event_producer.py


def test_producer_init(producer_init_params):
# Test init
def test_init(producer_init_params):
p = EventProducer(**producer_init_params)

assert p.qconnector_cls == RabbitMQConnector
Expand All @@ -23,8 +23,66 @@ def test_producer_init(producer_init_params):
assert p.qconnector._RabbitMQConnector__rabbitmq_exchange == 'test'


def test_producer_connect(mock_producer, mock_pika_connect, mock_pg_conn):
# Test connect
def test_connect(mock_producer, mock_pika_connect, mock_pg_conn):
mock_producer.connect()

assert mock_producer._EventProducer__db_conn is not None
assert mock_producer._EventProducer__db_cur is not None

mock_pika_connect.assert_called_once()
mock_pg_conn.assert_called_once()


# Test PGOutputMessageProcessor
def test_pgoutput_msg_processor(mock_producer, relation_payload, insert_payload, update_payload, delete_payload, mock_schema):
# mock msg.cursor.send_feedback
mock_msg = mock.Mock()
mock_msg.cursor.send_feedback = mock.Mock()
mock_msg.cursor.send_feedback.return_value = None

mock_producer.publish = mock.Mock()

# mock msg.payload
mock_msg.payload = relation_payload.payload
mock_producer.pgoutput_msg_processor(mock_msg)

assert mock_producer._EventProducer__table_schemas[16385] == mock_schema

# mock msg.payload
mock_msg.payload = insert_payload.payload
mock_producer.pgoutput_msg_processor(mock_msg)

assert mock_producer._EventProducer__table_schemas[16385] == mock_schema

# mock msg.payload
mock_msg.payload = update_payload.payload
mock_producer.pgoutput_msg_processor(mock_msg)

assert mock_producer._EventProducer__table_schemas[16385] == mock_schema

# mock msg.payload
mock_msg.payload = delete_payload.payload
mock_producer.pgoutput_msg_processor(mock_msg)

assert mock_producer._EventProducer__table_schemas[16385] == mock_schema

assert mock_msg.cursor.send_feedback.call_count == 4

assert mock_producer.publish.call_count == 3


# Test Wal2JsonMessageProcessor
def test_wal2json_msg_processor(mock_producer, wal2json_payload):
mock_msg = mock.Mock()
mock_msg.payload = json.dumps(wal2json_payload)
mock_msg.cursor.send_feedback = mock.Mock()
mock_msg.cursor.send_feedback.return_value = None

mock_producer.publish = mock.Mock()

mock_producer.wal2json_msg_processor(mock_msg)

assert mock_msg.cursor.send_feedback.call_count == 1

assert mock_producer.publish.call_count == 1
12 changes: 6 additions & 6 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,25 @@


# Test InsertMessage decoding
def test_insert(insert_payload, insert_response, mocked_schema):
def test_insert(insert_payload, insert_response, mock_schema):

parser = InsertMessage(table_name=mocked_schema['table_name'], message=insert_payload.payload, schema=mocked_schema)
parser = InsertMessage(table_name=mock_schema['table_name'], message=insert_payload.payload, schema=mock_schema)
parsed_message = parser.decode_insert_message()

assert parsed_message == insert_response


# Test UpdateMessage decoding
def test_update(update_payload, update_response, mocked_schema):
parser = UpdateMessage(table_name=mocked_schema['table_name'], message=update_payload.payload, schema=mocked_schema)
def test_update(update_payload, update_response, mock_schema):
parser = UpdateMessage(table_name=mock_schema['table_name'], message=update_payload.payload, schema=mock_schema)
parsed_message = parser.decode_update_message()

assert parsed_message == update_response


# Test DeleteMessage decoding
def test_delete(delete_payload, delete_response, mocked_schema):
parser = DeleteMessage(table_name=mocked_schema['table_name'], message=delete_payload.payload, schema=mocked_schema)
def test_delete(delete_payload, delete_response, mock_schema):
parser = DeleteMessage(table_name=mock_schema['table_name'], message=delete_payload.payload, schema=mock_schema)
parsed_message = parser.decode_delete_message()

assert parsed_message == delete_response
Expand Down

0 comments on commit 462e4ab

Please sign in to comment.