Skip to content

Commit

Permalink
Enable reading from $all streams
Browse files Browse the repository at this point in the history
  • Loading branch information
subhashb committed Jan 27, 2022
1 parent db3ca2b commit 7b9ee5f
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Release History
===============

0.1.2
-----

* Enable reading from all streams

0.1.1
-----

* Enable editable installation with Poetry
* Return data as json instead of string

0.1.0
-----

* First Release
23 changes: 21 additions & 2 deletions message_db/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ def _write(

result = cursor.fetchone()
except Exception as exc:
raise ValueError(exc.args[0].splitlines()[0]) from exc
raise ValueError(
f"{getattr(exc, 'pgcode')}-{getattr(exc, 'pgerror').splitlines()[0]}"
) from exc

return result["write_message"]

Expand Down Expand Up @@ -131,7 +133,24 @@ def read(
cursor = conn.cursor(cursor_factory=RealDictCursor)

if not sql:
if "-" in stream_name:
if stream_name == "$all":
sql = """
SELECT
id::varchar,
stream_name::varchar,
type::varchar,
position::bigint,
global_position::bigint,
data::varchar,
metadata::varchar,
time::timestamp
FROM
messages
WHERE
global_position > %(position)s
LIMIT %(batch_size)s
"""
elif "-" in stream_name:
sql = "SELECT * FROM get_stream_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
else:
sql = "SELECT * FROM get_category_messages(%(stream_name)s, %(position)s, %(batch_size)s);"
Expand Down
24 changes: 23 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def test_that_write_fails_on_expected_version_mismatch(self, client):

assert (
exc.value.args[0]
== "Wrong expected version: 1 (Stream: testStream-123, Stream Version: 2)"
== "P0001-ERROR: Wrong expected version: 1 (Stream: testStream-123, Stream Version: 2)"
)


Expand Down Expand Up @@ -139,6 +139,28 @@ def test_read_category_messages(self, client):
assert messages[4]["data"] == {"foo": "bar4"}


class TestReadAll:
def test_reading_all_streams(self, client):
client.write("stream1-123", "Event1", {"foo": "bar"})
client.write("stream2-123", "Event2", {"foo": "bar"})

messages = client.read("$all")
assert len(messages) == 2

def test_reading_all_streams_after_position(self, client):
for i in range(5):
client.write("stream1-123", f"Event1{i+1}", {"foo": "bar"})
for i in range(5):
client.write("stream2-123", f"Event2{i+1}", {"foo": "bar"})

messages = client.read("$all")
assert len(messages) == 10
messages = client.read("$all", position=5)
assert len(messages) == 5
messages = client.read("$all", position=5, no_of_messages=2)
assert len(messages) == 2


class TestReadStream:
def test_reading_a_category_throws_error(self, client):
with pytest.raises(ValueError) as exc:
Expand Down

0 comments on commit 7b9ee5f

Please sign in to comment.