diff --git a/dev/Makefile b/dev/Makefile
index d3a88cb8..7670ec8f 100644
--- a/dev/Makefile
+++ b/dev/Makefile
@@ -1,4 +1,5 @@
# Commands inside the container
+paths = src/ tests/
all: pyupgrade black autoflake isort flake8 mypy
@@ -9,22 +10,22 @@ check-pyupgrade:
pyupgrade --py310-plus $$(find . -name '*.py')
black:
- black src/
+ black $(paths)
check-black:
- black --check --diff src/
+ black --check --diff $(paths)
autoflake:
- autoflake src/
+ autoflake $(paths)
isort:
- isort src/
+ isort $(paths)
check-isort:
- isort --check-only --diff src/
+ isort --check-only --diff $(paths)
flake8:
- flake8 src/
+ flake8 $(paths)
mypy:
mypy src/
diff --git a/dev/setup.cfg b/dev/setup.cfg
index 6575421d..e27cfd7a 100644
--- a/dev/setup.cfg
+++ b/dev/setup.cfg
@@ -16,4 +16,4 @@ line_length = 88
extend-ignore = E203,E501
[mypy]
-disallow_untyped_defs = true
+disallow_untyped_defs = true
\ No newline at end of file
diff --git a/dev/sql/schema_relational.sql b/dev/sql/schema_relational.sql
index 75958081..18f370e6 100644
--- a/dev/sql/schema_relational.sql
+++ b/dev/sql/schema_relational.sql
@@ -55,7 +55,7 @@ BEGIN
END;
$$ LANGUAGE plpgsql;
--- MODELS_YML_CHECKSUM = '82f9031bf779d97f165fdce1ceb5cf71'
+-- MODELS_YML_CHECKSUM = '0a7ce920e0e10eedd3b09c4ce81fc3f8'
-- Type definitions
-- Table definitions
@@ -288,6 +288,7 @@ CREATE TABLE IF NOT EXISTS meeting_t (
agenda_item_creation varchar(256) CONSTRAINT enum_meeting_agenda_item_creation CHECK (agenda_item_creation IN ('always', 'never', 'default_yes', 'default_no')) DEFAULT 'default_no',
agenda_new_items_default_visibility varchar(256) CONSTRAINT enum_meeting_agenda_new_items_default_visibility CHECK (agenda_new_items_default_visibility IN ('common', 'internal', 'hidden')) DEFAULT 'internal',
agenda_show_internal_items_on_projector boolean DEFAULT False,
+ agenda_show_topic_navigation_on_detail_view boolean DEFAULT False,
list_of_speakers_amount_last_on_projector integer CONSTRAINT minimum_list_of_speakers_amount_last_on_projector CHECK (list_of_speakers_amount_last_on_projector >= -1) DEFAULT 0,
list_of_speakers_amount_next_on_projector integer CONSTRAINT minimum_list_of_speakers_amount_next_on_projector CHECK (list_of_speakers_amount_next_on_projector >= -1) DEFAULT -1,
list_of_speakers_couple_countdown boolean DEFAULT True,
@@ -369,6 +370,9 @@ Password: {password}
This email was generated automatically.',
users_enable_vote_delegations boolean,
+ users_forbid_delegator_in_list_of_speakers boolean,
+ users_forbid_delegator_as_submitter boolean,
+ users_forbid_delegator_as_supporter boolean,
assignments_export_title varchar(256) DEFAULT 'Elections',
assignments_export_preamble text,
assignment_poll_ballot_paper_selection varchar(256) CONSTRAINT enum_meeting_assignment_poll_ballot_paper_selection CHECK (assignment_poll_ballot_paper_selection IN ('NUMBER_OF_DELEGATES', 'NUMBER_OF_ALL_PARTICIPANTS', 'CUSTOM_NUMBER')) DEFAULT 'CUSTOM_NUMBER',
diff --git a/dev/src/__init__.py b/dev/src/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/dev/src/db_utils.py b/dev/src/db_utils.py
index 46bc08bd..01de5ad7 100644
--- a/dev/src/db_utils.py
+++ b/dev/src/db_utils.py
@@ -1,83 +1,38 @@
-from typing import Any, cast
+from typing import Any
-from psycopg import Cursor, sql
+from .python_sql import Column, Table
class DbUtils:
@classmethod
- def get_pg_array_for_cu(cls, data: list) -> str:
- return f"""{{"{'","'.join(item for item in data)}"}}"""
-
- @classmethod
- def insert_wrapper(
- cls, curs: Cursor, table_name: str, data: dict[str, Any]
- ) -> None | int:
- query = f"INSERT INTO {table_name} ({', '.join(data.keys())}) VALUES({{}}) RETURNING id;"
- query = (
- sql.SQL(query)
- .format(
- sql.SQL(", ").join(sql.Placeholder() * len(data.keys())),
- )
- .as_string(curs)
- )
- result = curs.execute(query, tuple(data.values())).fetchone()
- if isinstance(result, dict):
- return result.get("id", 0)
- return None
-
- @classmethod
- def insert_many_wrapper(
+ def get_columns_and_values_for_insert(
cls,
- curs: Cursor,
- table_name: str,
+ table: Table,
data_list: list[dict[str, Any]],
- returning: str = "id",
- ) -> list[int]:
- ids: list[int] = []
+ ) -> tuple[list[Column], list[list[dict[str, Any]]]]:
+ """
+ takes a list of dicts, each one to be inserted
+ Takes care of columns and row positions and fills
+ not existent columns in row with "None"
+ """
+ columns: list[Column] = []
+ values: list[list[dict[str, Any]]] = []
if not data_list:
- return ids
+ return columns, values
# use all keys in same sequence
keys_set: set = set()
for data in data_list:
keys_set.update(data.keys())
keys: list = sorted(keys_set)
- temp_data = {k: None for k in keys}
+ columns = [Column(table, key) for key in keys]
+ values = [[row.get(k, None) for k in keys] for row in data_list]
+ return columns, values
- dates = [temp_data | data for data in data_list]
- query = f"INSERT INTO {table_name} ({', '.join(keys)}) VALUES({{}}){' RETURNING ' + returning if returning else ''};"
- query = (
- sql.SQL(query)
- .format(
- sql.SQL(", ").join(sql.Placeholder() * len(keys)),
- )
- .as_string(curs)
- )
- curs.executemany(
- query,
- tuple(tuple(v for _, v in sorted(data.items())) for data in dates),
- returning=bool(returning),
- )
- ids = []
- if returning:
- while True:
- ids.append(cast(dict, curs.fetchone())[returning])
- if not curs.nextset():
- break
- return ids
+ @classmethod
+ def get_columns_from_list(cls, table: Table, items: list[str]) -> list[Column]:
+ return [Column(table, item) for item in items]
@classmethod
- def select_id_wrapper(
- cls,
- curs: Cursor,
- table_name: str,
- id_: int | None = None,
- field_names: list[str] = [],
- ) -> dict[str, Any] | list[dict[str, Any]]:
- """select with single id or all for fields in list or all fields"""
- query = sql.SQL(
- f"SELECT {', '.join(field_names) if field_names else '*'} FROM {table_name}{' where id = %s' if id_ else ''}"
- )
- if id_:
- return result if (result := curs.execute(query, (id_,)).fetchone()) else {}
- else:
- return curs.execute(query).fetchall()
+ def get_pg_array_for_cu(cls, data: list) -> str:
+ """converts a value list into string used for complete array field"""
+ return f"""{{"{'","'.join(item for item in data)}"}}"""
diff --git a/dev/src/generate_sql_schema.py b/dev/src/generate_sql_schema.py
index c55ff5fc..393a1f1b 100644
--- a/dev/src/generate_sql_schema.py
+++ b/dev/src/generate_sql_schema.py
@@ -9,7 +9,7 @@
from textwrap import dedent
from typing import Any, TypedDict, cast
-from helper_get_names import (
+from .helper_get_names import (
KEYSEPARATOR,
HelperGetNames,
InternalHelper,
@@ -1074,7 +1074,6 @@ def generate_field_or_sql_decision(
("1t", "1rR"): (FieldSqlErrorType.SQL, False),
("1tR", "1Gr"): (FieldSqlErrorType.SQL, False),
("1tR", "1GrR"): (FieldSqlErrorType.SQL, False),
- ("1rR", "1t"): (FieldSqlErrorType.FIELD, False),
("nGt", "nt"): (FieldSqlErrorType.SQL, True),
("nr", ""): (FieldSqlErrorType.SQL, True),
("nt", "1Gr"): (FieldSqlErrorType.SQL, False),
diff --git a/dev/src/python_sql.py b/dev/src/python_sql.py
new file mode 100644
index 00000000..c94d3ace
--- /dev/null
+++ b/dev/src/python_sql.py
@@ -0,0 +1,2 @@
+# mypy does not recognize the imports from `python-sql` correctly. Therefore, we gather them in this file so they can be imported from here in other places without using `type: ignore` everytime.
+from sql import Column, Table # type: ignore # noqa:F401
diff --git a/dev/tests/base.py b/dev/tests/base.py
index 35db733d..482e7227 100644
--- a/dev/tests/base.py
+++ b/dev/tests/base.py
@@ -1,24 +1,25 @@
import os
-from datetime import datetime
-from typing import Any
+from collections.abc import Callable
from unittest import TestCase
import psycopg
-import pytest
from psycopg import sql
from psycopg.types.json import Jsonb
+
from src.db_utils import DbUtils
+from src.python_sql import Table
# ADMIN_USERNAME = "admin"
# ADMIN_PASSWORD = "admin"
+
class BaseTestCase(TestCase):
temporary_template_db = "openslides_template"
work_on_test_db = "openslides_test"
- db_connection: psycopg.Connection = None
+ db_connection: psycopg.Connection
# id's of pre loaded rows, see method populate_database
- meeting1_id= 0
+ meeting1_id = 0
theme1_id = 0
organization_id = 0
user1_id = 0
@@ -30,112 +31,180 @@ class BaseTestCase(TestCase):
complex_workflowM1_id = 0
@classmethod
- def set_db_connection(cls, db_name:str, autocommit:bool = False, row_factory:callable = psycopg.rows.dict_row) -> None:
+ def set_db_connection(
+ cls,
+ db_name: str,
+ autocommit: bool = False,
+ row_factory: Callable = psycopg.rows.dict_row,
+ ) -> None:
env = os.environ
try:
- cls.db_connection = psycopg.connect(f"dbname='{db_name}' user='{env['DATABASE_USER']}' host='{env['DATABASE_HOST']}' password='{env['PGPASSWORD']}'", autocommit=autocommit, row_factory=row_factory)
+ cls.db_connection = psycopg.connect(
+ f"dbname='{db_name}' user='{env['DATABASE_USER']}' host='{env['DATABASE_HOST']}' password='{env['PGPASSWORD']}'",
+ autocommit=autocommit,
+ row_factory=row_factory,
+ )
cls.db_connection.isolation_level = psycopg.IsolationLevel.SERIALIZABLE
except Exception as e:
raise Exception(f"Cannot connect to postgres: {e.message}")
@classmethod
- def setup_class(cls):
+ def setup_class(cls) -> None:
env = os.environ
cls.set_db_connection("postgres", True)
with cls.db_connection:
with cls.db_connection.cursor() as curs:
curs.execute(
- sql.SQL("DROP DATABASE IF EXISTS {temporary_template_db} (FORCE);").format(
+ sql.SQL(
+ "DROP DATABASE IF EXISTS {temporary_template_db} (FORCE);"
+ ).format(
temporary_template_db=sql.Identifier(cls.temporary_template_db)
)
)
curs.execute(
- sql.SQL("CREATE DATABASE {db_to_create} TEMPLATE {template_db};").format(
+ sql.SQL(
+ "CREATE DATABASE {db_to_create} TEMPLATE {template_db};"
+ ).format(
db_to_create=sql.Identifier(cls.temporary_template_db),
- template_db=sql.Identifier(env['DATABASE_NAME'])))
+ template_db=sql.Identifier(env["DATABASE_NAME"]),
+ )
+ )
cls.set_db_connection(cls.temporary_template_db)
with cls.db_connection:
with cls.db_connection.cursor() as curs:
- curs.execute("CREATE EXTENSION pldbgapi;") # Postgres debug extension
+ curs.execute("CREATE EXTENSION pldbgapi;") # Postgres debug extension
cls.populate_database()
@classmethod
- def teardown_class(cls):
- """ remove last test db and drop the temporary template db"""
+ def teardown_class(cls) -> None:
+ """remove last test db and drop the temporary template db"""
cls.set_db_connection("postgres", True)
with cls.db_connection:
with cls.db_connection.cursor() as curs:
- curs.execute(sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(sql.Identifier(cls.work_on_test_db)))
- curs.execute(sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(sql.Identifier(cls.temporary_template_db)))
+ curs.execute(
+ sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(
+ sql.Identifier(cls.work_on_test_db)
+ )
+ )
+ curs.execute(
+ sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(
+ sql.Identifier(cls.temporary_template_db)
+ )
+ )
def setUp(self) -> None:
self.set_db_connection("postgres", autocommit=True)
with self.db_connection:
with self.db_connection.cursor() as curs:
- curs.execute(sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(sql.Identifier(self.work_on_test_db)))
- curs.execute(sql.SQL("CREATE DATABASE {test_db} TEMPLATE {temporary_template_db};").format(
- test_db=sql.Identifier(self.work_on_test_db),
- temporary_template_db=sql.Identifier(self.temporary_template_db)))
+ curs.execute(
+ sql.SQL("DROP DATABASE IF EXISTS {} (FORCE);").format(
+ sql.Identifier(self.work_on_test_db)
+ )
+ )
+ curs.execute(
+ sql.SQL(
+ "CREATE DATABASE {test_db} TEMPLATE {temporary_template_db};"
+ ).format(
+ test_db=sql.Identifier(self.work_on_test_db),
+ temporary_template_db=sql.Identifier(
+ self.temporary_template_db
+ ),
+ )
+ )
self.set_db_connection(self.work_on_test_db)
@classmethod
def populate_database(cls) -> None:
- """ do something like setting initial_data.json"""
+ """do something like setting initial_data.json"""
+ theme_t = Table("theme_t")
+ organization_t = Table("organization_t")
+ user_t = Table("user_t")
+ committee_t = Table("committee_t")
+
with cls.db_connection.transaction():
with cls.db_connection.cursor() as curs:
- cls.theme1_id = DbUtils.insert_wrapper(curs, "theme_t", {
- "name": "OpenSlides Blue",
- "accent_500": "#2196f3",
- "primary_500": "#317796",
- "warn_500": "#f06400",
- })
- cls.organization_id = DbUtils.insert_wrapper(curs, "organization_t", {
- "name": "Test Organization",
- "legal_notice": "OpenSlides is a free web based presentation and assembly system for visualizing and controlling agenda, motions and elections of an assembly.",
- "login_text": "Good Morning!",
- "default_language": "en",
- "genders": ["male", "female", "diverse", "non-binary"],
- "enable_electronic_voting": True,
- "enable_chat": True,
- "reset_password_verbose_errors": True,
- "limit_of_meetings": 0,
- "limit_of_users": 0,
- "theme_id": cls.theme1_id,
- "users_email_sender": "OpenSlides",
- "users_email_subject": "OpenSlides access data",
- "users_email_body": "Dear {name},\n\nthis is your personal OpenSlides login:\n\n{url}\nUsername: {username}\nPassword: {password}\n\n\nThis email was generated automatically.",
- "url": "https://example.com",
- "saml_enabled": False,
- "saml_login_button_text": "SAML Login",
- "saml_attr_mapping": Jsonb({
- "saml_id": "username",
- "title": "title",
- "first_name": "firstName",
- "last_name": "lastName",
- "email": "email",
- "gender": "gender",
- "pronoun": "pronoun",
- "is_active": "is_active",
- "is_physical_person": "is_person"
- })
- })
- cls.user1_id = DbUtils.insert_wrapper(curs, "user_t", {
- "username": "admin",
- "last_name": "Administrator",
- "is_active": True,
- "is_physical_person": True,
- "password": "316af7b2ddc20ead599c38541fbe87e9a9e4e960d4017d6e59de188b41b2758flD5BVZAZ8jLy4nYW9iomHcnkXWkfk3PgBjeiTSxjGG7+fBjMBxsaS1vIiAMxYh+K38l0gDW4wcP+i8tgoc4UBg==",
- "default_password": "admin",
- "can_change_own_password": True,
- "gender": "male",
- "default_vote_weight": "1.000000",
- "organization_management_level": "superadmin",
- })
- cls.committee1_id = DbUtils.insert_wrapper(curs, "committee_t", {
- "name": "Default committee",
- "description": "Add description here",
- })
+ cls.theme1_id = curs.execute(
+ *theme_t.insert(
+ columns=[
+ theme_t.name,
+ theme_t.accent_500,
+ theme_t.primary_500,
+ theme_t.warn_500,
+ ],
+ values=[["OpenSlides Blue", "#2196f3", "#317796", "#f06400"]],
+ returning=[theme_t.id],
+ )
+ ).fetchone()["id"]
+ data = [
+ {
+ "name": "Test Organization",
+ "legal_notice": 'OpenSlides is a free web based presentation and assembly system for visualizing and controlling agenda, motions and elections of an assembly.',
+ "login_text": "Good Morning!",
+ "default_language": "en",
+ "genders": ["male", "female", "diverse", "non-binary"],
+ "enable_electronic_voting": True,
+ "enable_chat": True,
+ "reset_password_verbose_errors": True,
+ "limit_of_meetings": 0,
+ "limit_of_users": 0,
+ "theme_id": cls.theme1_id,
+ "users_email_sender": "OpenSlides",
+ "users_email_subject": "OpenSlides access data",
+ "users_email_body": "Dear {name},\n\nthis is your personal OpenSlides login:\n\n{url}\nUsername: {username}\nPassword: {password}\n\n\nThis email was generated automatically.",
+ "url": "https://example.com",
+ "saml_enabled": False,
+ "saml_login_button_text": "SAML Login",
+ "saml_attr_mapping": Jsonb(
+ {
+ "saml_id": "username",
+ "title": "title",
+ "first_name": "firstName",
+ "last_name": "lastName",
+ "email": "email",
+ "gender": "gender",
+ "pronoun": "pronoun",
+ "is_active": "is_active",
+ "is_physical_person": "is_person",
+ }
+ ),
+ }
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ organization_t, data
+ )
+ cls.organization_id = curs.execute(
+ *organization_t.insert(
+ columns, values, returning=[organization_t.id]
+ )
+ ).fetchone()["id"]
+ data = [
+ {
+ "username": "admin",
+ "last_name": "Administrator",
+ "is_active": True,
+ "is_physical_person": True,
+ "password": "316af7b2ddc20ead599c38541fbe87e9a9e4e960d4017d6e59de188b41b2758flD5BVZAZ8jLy4nYW9iomHcnkXWkfk3PgBjeiTSxjGG7+fBjMBxsaS1vIiAMxYh+K38l0gDW4wcP+i8tgoc4UBg==",
+ "default_password": "admin",
+ "can_change_own_password": True,
+ "gender": "male",
+ "default_vote_weight": "1.000000",
+ "organization_management_level": "superadmin",
+ }
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ user_t, data
+ )
+ cls.user1_id = curs.execute(
+ *user_t.insert(columns, values, returning=[user_t.id])
+ ).fetchone()["id"]
+ cls.committee1_id = curs.execute(
+ *committee_t.insert(
+ columns=[committee_t.name, committee_t.description],
+ values=[["Default committee", "Add description here"]],
+ returning=[committee_t.id],
+ )
+ ).fetchone()["id"]
result_ids = cls.create_meeting(curs, committee_id=cls.committee1_id)
cls.meeting1_id = result_ids["meeting_id"]
cls.groupM1_default_id = result_ids["default_group_id"]
@@ -143,10 +212,20 @@ def populate_database(cls) -> None:
cls.groupM1_staff_id = result_ids["staff_group_id"]
cls.simple_workflowM1_id = result_ids["simple_workflow_id"]
cls.complex_workflowM1_id = result_ids["complex_workflow_id"]
- curs.execute("UPDATE committee_t SET default_meeting_id = %s where id = %s;", (result_ids["meeting_id"], cls.committee1_id))
+ curs.execute(
+ *committee_t.update(
+ columns=[committee_t.default_meeting_id],
+ values=[cls.committee1_id],
+ )
+ )
@classmethod
- def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int = 0, ) -> None:
+ def create_meeting(
+ cls,
+ curs: psycopg.Cursor,
+ committee_id: int,
+ meeting_id: int = 0,
+ ) -> dict[str, int]:
"""
Creates meeting with next availale id if not set or id set (lower ids can't be choosed afterwards)
The committee_id must be given and the committee must exist. The meeting will not be set as default meeting of the committee
@@ -159,59 +238,90 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
- default_project_id, secondary_projector_id
- simple_workflow_id, complex_workflow_id
"""
+ group_t = Table("group_t")
+ projector_t = Table("projector_t")
+ motion_state_t = Table("motion_state_t")
+ nm_motion_state_next_state_ids_motion_state_t = Table(
+ "nm_motion_state_next_state_ids_motion_state_t"
+ )
+ motion_workflow_t = Table("motion_workflow_t")
+ meeting_t = Table("meeting_t")
+
result = {}
if meeting_id:
- sequence_name = curs.execute("select * from pg_get_serial_sequence('meeting_t', 'id');").fetchone()["pg_get_serial_sequence"]
- last_value = curs.execute(f"select last_value from {sequence_name};").fetchone()["last_value"]
+ sequence_name = curs.execute(
+ "select * from pg_get_serial_sequence('meeting_t', 'id');"
+ ).fetchone()["pg_get_serial_sequence"]
+ last_value = curs.execute(
+ f"select last_value from {sequence_name};"
+ ).fetchone()["last_value"]
if last_value >= meeting_id:
- raise ValueError(f"meeting_id {meeting_id} is not available, last_value in sequence {sequence_name} is {last_value}")
- result["meeting_id"] = curs.execute("select setval(pg_get_serial_sequence('meeting_t', 'id'), %s);", (meeting_id,)).fetchone()["setval"]
+ raise ValueError(
+ f"meeting_id {meeting_id} is not available, last_value in sequence {sequence_name} is {last_value}"
+ )
+ result["meeting_id"] = curs.execute(
+ "select setval(pg_get_serial_sequence('meeting_t', 'id'), %s);",
+ (meeting_id,),
+ ).fetchone()["setval"]
else:
- result["meeting_id"] = curs.execute("select nextval(pg_get_serial_sequence('meeting_t', 'id')) as id_;").fetchone()["id_"]
- (result["default_group_id"], result["admin_group_id"], result["staff_group_id"]) = DbUtils.insert_many_wrapper(curs, "group_t", [
- {
- "name": "Default",
- "permissions": [
- "agenda_item.can_see_internal",
- "assignment.can_see",
- "list_of_speakers.can_see",
- "mediafile.can_see",
- "meeting.can_see_frontpage",
- "motion.can_see",
- "projector.can_see",
- "user.can_see"
+ result["meeting_id"] = curs.execute(
+ "select nextval(pg_get_serial_sequence('meeting_t', 'id')) as id_;"
+ ).fetchone()["id_"]
+ curs.execute(
+ *group_t.insert(
+ columns=[
+ group_t.name,
+ group_t.permissions,
+ group_t.weight,
+ group_t.meeting_id,
],
- "weight": 1,
- "meeting_id": result["meeting_id"]
- },
- {
- "name": "Admin",
- "permissions": [],
- "weight": 2,
- "meeting_id": result["meeting_id"]
- },
- {
- "name": "Staff",
- "permissions": [
- "agenda_item.can_manage",
- "assignment.can_manage",
- "assignment.can_nominate_self",
- "list_of_speakers.can_be_speaker",
- "list_of_speakers.can_manage",
- "mediafile.can_manage",
- "meeting.can_see_frontpage",
- "meeting.can_see_history",
- "motion.can_manage",
- "poll.can_manage",
- "projector.can_manage",
- "tag.can_manage",
- "user.can_manage"
+ values=[
+ [
+ "Default",
+ [
+ "agenda_item.can_see_internal",
+ "assignment.can_see",
+ "list_of_speakers.can_see",
+ "mediafile.can_see",
+ "meeting.can_see_frontpage",
+ "motion.can_see",
+ "projector.can_see",
+ "user.can_see",
+ ],
+ 1,
+ result["meeting_id"],
+ ],
+ ["Admin", [], 2, result["meeting_id"]],
+ [
+ "Staff",
+ [
+ "agenda_item.can_manage",
+ "assignment.can_manage",
+ "assignment.can_nominate_self",
+ "list_of_speakers.can_be_speaker",
+ "list_of_speakers.can_manage",
+ "mediafile.can_manage",
+ "meeting.can_see_frontpage",
+ "meeting.can_see_history",
+ "motion.can_manage",
+ "poll.can_manage",
+ "projector.can_manage",
+ "tag.can_manage",
+ "user.can_manage",
+ ],
+ 3,
+ result["meeting_id"],
+ ],
],
- "weight": 3,
- "meeting_id": result["meeting_id"]
- }
- ])
- (result["default_projector_id"], result["secondary_projector_id"]) = DbUtils.insert_many_wrapper(curs, "projector_t", [
+ returning=[group_t.id],
+ )
+ )
+ (
+ result["default_group_id"],
+ result["admin_group_id"],
+ result["staff_group_id"],
+ ) = (x["id"] for x in curs)
+ data = [
{
"name": "Default projector",
"is_internal": False,
@@ -232,21 +342,49 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"show_logo": True,
"show_clock": True,
"sequential_number": 1,
- "used_as_default_projector_for_agenda_item_list_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_topic_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_list_of_speakers_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_current_los_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_motion_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_amendment_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_motion_block_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_assignment_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_mediafile_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_message_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_countdown_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_assignment_poll_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_motion_poll_in_meeting_id": result["meeting_id"],
- "used_as_default_projector_for_poll_in_meeting_id": result["meeting_id"],
- "meeting_id": result["meeting_id"]
+ "used_as_default_projector_for_agenda_item_list_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_topic_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_list_of_speakers_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_current_los_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_motion_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_amendment_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_motion_block_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_assignment_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_mediafile_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_message_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_countdown_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_assignment_poll_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_motion_poll_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "used_as_default_projector_for_poll_in_meeting_id": result[
+ "meeting_id"
+ ],
+ "meeting_id": result["meeting_id"],
},
{
"name": "Secondary projector",
@@ -268,12 +406,22 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"show_logo": True,
"show_clock": True,
"sequential_number": 2,
- "meeting_id": result["meeting_id"]
- }
- ])
- result["simple_workflow_id"] = curs.execute("select nextval(pg_get_serial_sequence('motion_workflow_t', 'id')) as new_id;").fetchone()["new_id"]
- result["complex_workflow_id"] = curs.execute("select nextval(pg_get_serial_sequence('motion_workflow_t', 'id')) as new_id;").fetchone()["new_id"]
- wf_m1_simple_motion_state_ids = DbUtils.insert_many_wrapper(curs, "motion_state_t", [
+ "meeting_id": result["meeting_id"],
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(projector_t, data)
+ curs.execute(*projector_t.insert(columns, values, returning=[projector_t.id]))
+ (result["default_projector_id"], result["secondary_projector_id"]) = (
+ x["id"] for x in curs
+ )
+
+ result["simple_workflow_id"] = curs.execute(
+ "select nextval(pg_get_serial_sequence('motion_workflow_t', 'id')) as new_id;"
+ ).fetchone()["new_id"]
+ result["complex_workflow_id"] = curs.execute(
+ "select nextval(pg_get_serial_sequence('motion_workflow_t', 'id')) as new_id;"
+ ).fetchone()["new_id"]
+ motion_state_data = [
{
"name": "submitted",
"weight": 1,
@@ -289,7 +437,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"show_recommendation_extension_field": False,
"set_workflow_timestamp": True,
"allow_motion_forwarding": True,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "accepted",
@@ -307,7 +455,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"set_workflow_timestamp": False,
"allow_motion_forwarding": True,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "rejected",
@@ -325,7 +473,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "not decided",
@@ -343,11 +491,19 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
- ])
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ motion_state_t, motion_state_data
+ )
+ curs.execute(
+ *motion_state_t.insert(columns, values, returning=[motion_state_t.id])
+ )
+ wf_m1_simple_motion_state_ids = [x["id"] for x in curs]
wf_m1_simple_first_state_id = wf_m1_simple_motion_state_ids[0]
- wf_m1_complex_motion_state_ids = DbUtils.insert_many_wrapper(curs, "motion_state_t", [
+
+ motion_state_data = [
{
"name": "in progress",
"weight": 5,
@@ -363,7 +519,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"set_workflow_timestamp": True,
"allow_motion_forwarding": True,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "submitted",
@@ -380,7 +536,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": True,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "permitted",
@@ -398,7 +554,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": 1
+ "meeting_id": 1,
},
{
"name": "accepted",
@@ -416,7 +572,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "rejected",
@@ -434,7 +590,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "withdrawn",
@@ -451,7 +607,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "adjourned",
@@ -469,7 +625,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "not concerned",
@@ -487,7 +643,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "referred to committee",
@@ -505,7 +661,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "needs review",
@@ -522,7 +678,7 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
{
"name": "rejected (not authorized)",
@@ -540,76 +696,154 @@ def create_meeting(cls, curs: psycopg.Cursor, committee_id: int, meeting_id: int
"allow_support": False,
"allow_motion_forwarding": True,
"set_workflow_timestamp": False,
- "meeting_id": result["meeting_id"]
+ "meeting_id": result["meeting_id"],
},
- ])
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ motion_state_t, motion_state_data
+ )
+ curs.execute(
+ *motion_state_t.insert(columns, values, returning=[motion_state_t.id])
+ )
+ wf_m1_complex_motion_state_ids = [x["id"] for x in curs]
wf_m1_complex_first_state_id = wf_m1_complex_motion_state_ids[0]
- DbUtils.insert_many_wrapper(curs, "nm_motion_state_next_state_ids_motion_state_t",
- [
- {"next_state_id": wf_m1_simple_motion_state_ids[1], "previous_state_id": wf_m1_simple_motion_state_ids[0]},
- {"next_state_id": wf_m1_simple_motion_state_ids[2], "previous_state_id": wf_m1_simple_motion_state_ids[0]},
- {"next_state_id": wf_m1_simple_motion_state_ids[3], "previous_state_id": wf_m1_simple_motion_state_ids[0]},
- {"next_state_id": wf_m1_complex_motion_state_ids[1], "previous_state_id": wf_m1_complex_motion_state_ids[0]},
- {"next_state_id": wf_m1_complex_motion_state_ids[5], "previous_state_id": wf_m1_complex_motion_state_ids[0]},
- {"next_state_id": wf_m1_complex_motion_state_ids[2], "previous_state_id": wf_m1_complex_motion_state_ids[1]},
- {"next_state_id": wf_m1_complex_motion_state_ids[5], "previous_state_id": wf_m1_complex_motion_state_ids[1]},
- {"next_state_id": wf_m1_complex_motion_state_ids[10], "previous_state_id": wf_m1_complex_motion_state_ids[1]},
- {"next_state_id": wf_m1_complex_motion_state_ids[3], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[4], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[5], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[6], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[7], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[8], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- {"next_state_id": wf_m1_complex_motion_state_ids[9], "previous_state_id": wf_m1_complex_motion_state_ids[2]},
- ], returning='')
- assert [result["simple_workflow_id"], result["complex_workflow_id"]] == DbUtils.insert_many_wrapper(curs, "motion_workflow_t",
- [
- {
- "id": result["simple_workflow_id"],
- "name": "Simple Workflow",
- "sequential_number": 1,
- "first_state_id": wf_m1_simple_first_state_id,
- "meeting_id": result["meeting_id"]
- },
- {
- "id": result["complex_workflow_id"],
- "name": "Complex Workflow",
- "sequential_number": 2,
- "first_state_id": wf_m1_complex_first_state_id,
- "meeting_id": result["meeting_id"]
- }
- ]
+ data = [
+ {
+ "next_state_id": wf_m1_simple_motion_state_ids[1],
+ "previous_state_id": wf_m1_simple_motion_state_ids[0],
+ },
+ {
+ "next_state_id": wf_m1_simple_motion_state_ids[2],
+ "previous_state_id": wf_m1_simple_motion_state_ids[0],
+ },
+ {
+ "next_state_id": wf_m1_simple_motion_state_ids[3],
+ "previous_state_id": wf_m1_simple_motion_state_ids[0],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[1],
+ "previous_state_id": wf_m1_complex_motion_state_ids[0],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[5],
+ "previous_state_id": wf_m1_complex_motion_state_ids[0],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[2],
+ "previous_state_id": wf_m1_complex_motion_state_ids[1],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[5],
+ "previous_state_id": wf_m1_complex_motion_state_ids[1],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[10],
+ "previous_state_id": wf_m1_complex_motion_state_ids[1],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[3],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[4],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[5],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[6],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[7],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[8],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ {
+ "next_state_id": wf_m1_complex_motion_state_ids[9],
+ "previous_state_id": wf_m1_complex_motion_state_ids[2],
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ nm_motion_state_next_state_ids_motion_state_t, data
+ )
+ curs.execute(
+ *nm_motion_state_next_state_ids_motion_state_t.insert(columns, values)
+ )
+
+ data = [
+ {
+ "id": result["simple_workflow_id"],
+ "name": "Simple Workflow",
+ "sequential_number": 1,
+ "first_state_id": wf_m1_simple_first_state_id,
+ "meeting_id": result["meeting_id"],
+ },
+ {
+ "id": result["complex_workflow_id"],
+ "name": "Complex Workflow",
+ "sequential_number": 2,
+ "first_state_id": wf_m1_complex_first_state_id,
+ "meeting_id": result["meeting_id"],
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ motion_workflow_t, data
+ )
+ curs.execute(
+ *motion_workflow_t.insert(columns, values, returning=[motion_workflow_t.id])
+ )
+ assert [
+ result["simple_workflow_id"],
+ result["complex_workflow_id"],
+ ] == [x["id"] for x in curs]
+
+ data = [
+ {
+ "id": result["meeting_id"],
+ "name": "OpenSlides Demo",
+ "is_active_in_organization_id": cls.organization_id,
+ "language": "en",
+ "conference_los_restriction": True,
+ "agenda_number_prefix": "TOP",
+ "motions_default_workflow_id": result["simple_workflow_id"],
+ "motions_default_amendment_workflow_id": result["complex_workflow_id"],
+ "motions_default_statute_amendment_workflow_id": result[
+ "complex_workflow_id"
+ ],
+ "motions_recommendations_by": "ABK",
+ "motions_statute_recommendations_by": "",
+ "motions_statutes_enabled": True,
+ "motions_amendments_of_amendments": True,
+ "motions_amendments_prefix": "-\u00c4",
+ "motions_supporters_min_amount": 1,
+ "motions_export_preamble": "",
+ "users_enable_presence_view": True,
+ "users_pdf_wlan_encryption": "",
+ "users_enable_vote_delegations": True,
+ "poll_ballot_paper_selection": "CUSTOM_NUMBER",
+ "poll_ballot_paper_number": 8,
+ "poll_sort_poll_result_by_votes": True,
+ "poll_default_type": "nominal",
+ "poll_default_method": "votes",
+ "poll_default_onehundred_percent_base": "valid",
+ "committee_id": committee_id,
+ "reference_projector_id": result["default_projector_id"],
+ "default_group_id": result["default_group_id"],
+ "admin_group_id": result["admin_group_id"],
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(meeting_t, data)
+ assert (
+ result["meeting_id"]
+ == curs.execute(
+ *meeting_t.insert(columns, values, returning=[meeting_t.id])
+ ).fetchone()["id"]
)
- assert result["meeting_id"] == DbUtils.insert_wrapper(curs, "meeting_t", {
- "id": result["meeting_id"],
- "name": "OpenSlides Demo",
- "is_active_in_organization_id": cls.organization_id,
- "language": "en",
- "conference_los_restriction": True,
- "agenda_number_prefix": "TOP",
- "motions_default_workflow_id": result["simple_workflow_id"],
- "motions_default_amendment_workflow_id": result["complex_workflow_id"],
- "motions_default_statute_amendment_workflow_id": result["complex_workflow_id"],
- "motions_recommendations_by": "ABK",
- "motions_statute_recommendations_by": "",
- "motions_statutes_enabled": True,
- "motions_amendments_of_amendments": True,
- "motions_amendments_prefix": "-\u00c4",
- "motions_supporters_min_amount": 1,
- "motions_export_preamble": "",
- "users_enable_presence_view": True,
- "users_pdf_wlan_encryption": "",
- "users_enable_vote_delegations": True,
- "poll_ballot_paper_selection": "CUSTOM_NUMBER",
- "poll_ballot_paper_number": 8,
- "poll_sort_poll_result_by_votes": True,
- "poll_default_type": "nominal",
- "poll_default_method": "votes",
- "poll_default_onehundred_percent_base": "valid",
- "committee_id": committee_id,
- "reference_projector_id": result["default_projector_id"],
- "default_group_id": result["default_group_id"],
- "admin_group_id": result["admin_group_id"]
- })
- return result
\ No newline at end of file
+ return result
diff --git a/dev/tests/test_generic_relations.py b/dev/tests/test_generic_relations.py
index def0d964..1a0e6b08 100644
--- a/dev/tests/test_generic_relations.py
+++ b/dev/tests/test_generic_relations.py
@@ -1,15 +1,35 @@
-from datetime import datetime
-from typing import cast
+from typing import Any
import psycopg
import pytest
from psycopg import sql
-from sql import Table
-from sql.aggregate import *
-from sql.conditionals import *
+
from src.db_utils import DbUtils
+from src.python_sql import Table
from tests.base import BaseTestCase
+agenda_item_t = Table("agenda_item_t")
+assignment_t = Table("assignment_t")
+assignment_v = Table("assignment")
+committee_v = Table("committee")
+gm_organization_tag_tagged_ids_t = Table("gm_organization_tag_tagged_ids_t")
+group_v = Table("group_")
+group_t = Table("group_t")
+list_of_speakers_t = Table("list_of_speakers_t")
+list_of_speakers_v = Table("list_of_speakers")
+mediafile_t = Table("mediafile_t")
+mediafile_v = Table("mediafile")
+meeting_t = Table("meeting_t")
+meeting_v = Table("meeting")
+option_t = Table("option_t")
+organization_tag_t = Table("organization_tag_t")
+organization_v = Table("organization")
+poll_candidate_list_t = Table("poll_candidate_list_t")
+poll_candidate_list_v = Table("poll_candidate_list")
+projector_t = Table("projector")
+theme_v = Table("theme")
+user_v = Table("user_")
+
class Relations(BaseTestCase):
"""
@@ -25,289 +45,810 @@ class Relations(BaseTestCase):
R: Required
"""
+ """ 1:n relation tests with n-side NOT NULL """
+ """ Test:motion_state.submitter_withdraw_back_ids: sql okay?"""
""" 1:1 relation tests """
- def test_one_to_one_pre_populated_1rR_1t(self) -> None:
+
+ # todo: 1Gr errors
+ def test_generic_1Gr_check_constraint_error(self) -> None:
+ """tries to use a not defined owner-model for generic field owner_id"""
+ with pytest.raises(psycopg.DatabaseError) as e:
+ with self.db_connection.cursor() as curs:
+ with self.db_connection.transaction():
+ curs.execute(
+ *mediafile_t.insert(
+ [mediafile_t.is_public, mediafile_t.owner_id],
+ [[True, f"motion_state/{self.meeting1_id}"]],
+ )
+ )
+ assert (
+ 'new row for relation "mediafile_t" violates check constraint "valid_owner_id_part1"'
+ in str(e)
+ )
+
+ # todo: 1GrR errors
+ # todo: 1r errors
+ # todo: 1rR errors
+
+ # todo: 1t:1GrR
+ def test_o2o_generic_1t_1GrR_okay(self) -> None:
+ """SQL 1t:1GrR => assignment/agenda_item_id:-> agenda_item/content_object_id"""
with self.db_connection.cursor() as curs:
- organization_row = DbUtils.select_id_wrapper(curs, "organization", self.organization_id, ["theme_id"])
- assert organization_row["theme_id"] == self.theme1_id
- theme_row = DbUtils.select_id_wrapper(curs, "theme", self.theme1_id, ["theme_for_organization_id"])
- assert theme_row["theme_for_organization_id"] == self.organization_id
+ with self.db_connection.transaction():
+ assignment_id = curs.execute(
+ *assignment_t.insert(
+ [
+ assignment_t.title,
+ assignment_t.sequential_number,
+ assignment_t.meeting_id,
+ ],
+ [["title assignment 1", 21, self.meeting1_id]],
+ returning=[assignment_t.id],
+ )
+ ).fetchone()["id"]
+ assignment = f"assignment/{assignment_id}"
+ agenda_item_id = curs.execute(
+ *agenda_item_t.insert(
+ [
+ agenda_item_t.item_number,
+ agenda_item_t.content_object_id,
+ agenda_item_t.meeting_id,
+ ],
+ [["100", assignment, self.meeting1_id]],
+ returning=[agenda_item_t.id],
+ )
+ ).fetchone()["id"]
+ assignment_row = curs.execute(
+ *assignment_v.select(
+ where=assignment_v.agenda_item_id == agenda_item_id
+ )
+ ).fetchone()
+ agenda_item_row = curs.execute(
+ *agenda_item_t.select(
+ where=agenda_item_t.content_object_id == assignment
+ )
+ ).fetchone()
+ assert assignment_row["agenda_item_id"] == agenda_item_row["id"]
+ assert agenda_item_row["content_object_id"] == assignment
+ assert (
+ agenda_item_row["content_object_id_assignment_id"] == 1
+ ) # internal storage
- def test_one_to_one_pre_populated_1r_1t(self) -> None:
+ def test_o2o_generic_1t_1GrR_okay_with_setval(self) -> None:
with self.db_connection.cursor() as curs:
- committee_row = DbUtils.select_id_wrapper(curs, "committee", self.committee1_id, ["default_meeting_id"])
- assert committee_row["default_meeting_id"] == self.meeting1_id
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["default_meeting_for_committee_id"])
- assert meeting_row["default_meeting_for_committee_id"] == self.committee1_id
+ with self.db_connection.transaction():
+ mediafile_id = 2
+ los_id = 3
+ curs.execute(
+ "select setval(pg_get_serial_sequence('mediafile_t', 'id'), %s);",
+ (mediafile_id,),
+ )
+ assert (
+ mediafile_id
+ == curs.execute(
+ *mediafile_t.insert(
+ [
+ mediafile_t.id,
+ mediafile_t.is_public,
+ mediafile_t.owner_id,
+ ],
+ [[mediafile_id, True, f"meeting/{self.meeting1_id}"]],
+ returning=[mediafile_t.id],
+ )
+ ).fetchone()["id"]
+ )
+ curs.execute(
+ "select setval(pg_get_serial_sequence('list_of_speakers_t', 'id'), %s);",
+ (los_id,),
+ )
+ data = [
+ {
+ "id": los_id,
+ "content_object_id": (
+ content_object_id := f"mediafile/{mediafile_id}"
+ ),
+ "meeting_id": self.meeting1_id,
+ "sequential_number": 28,
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ list_of_speakers_t, data
+ )
+ assert (
+ los_id
+ == curs.execute(
+ *list_of_speakers_t.insert(
+ columns, values, returning=[list_of_speakers_t.id]
+ )
+ ).fetchone()["id"]
+ )
+ los_row: dict[str, Any] = curs.execute(
+ *list_of_speakers_t.select(
+ list_of_speakers_t.id,
+ list_of_speakers_t.content_object_id,
+ list_of_speakers_t.content_object_id_mediafile_id,
+ list_of_speakers_t.content_object_id_topic_id,
+ where=list_of_speakers_t.id == los_id,
+ )
+ ).fetchone()
+ assert los_row["id"] == los_id
+ assert los_row["content_object_id"] == content_object_id
+ assert los_row["content_object_id_mediafile_id"] == mediafile_id
+ assert los_row["content_object_id_topic_id"] is None
+
+ mediafile_row = curs.execute(
+ *mediafile_v.select(
+ mediafile_v.id,
+ mediafile_v.list_of_speakers_id,
+ mediafile_v.owner_id,
+ where=mediafile_v.id == mediafile_id,
+ )
+ ).fetchone()
+ assert mediafile_row["id"] == mediafile_id
+ assert mediafile_row["list_of_speakers_id"] == los_id
- # TODO: remove test, fiktiv test with 1r:1tR, test s.o, in der die meeting-Seite ein SQL hat
- # jetzt setze ich mal ein erequired auf der Meeting Seite. Was ist übrigens mit 1r:1rR
- def test_one_to_one_pre_populated_1r_1tR(self) -> None:
+ # todo: 1t:1r
+ def test_o2o_pre_populated_1t_1r_okay(self) -> None:
with self.db_connection.cursor() as curs:
- committee_row = DbUtils.select_id_wrapper(curs, "committee", self.committee1_id, ["default_meeting_id"])
+ committee_row = curs.execute(
+ *committee_v.select(
+ committee_v.default_meeting_id,
+ where=committee_v.id == self.committee1_id,
+ )
+ ).fetchone()
assert committee_row["default_meeting_id"] == self.meeting1_id
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["default_meeting_for_committee_id"])
+ meeting_row = curs.execute(
+ *meeting_v.select(
+ meeting_v.default_meeting_for_committee_id,
+ where=meeting_v.id == self.meeting1_id,
+ )
+ ).fetchone()
assert meeting_row["default_meeting_for_committee_id"] == self.committee1_id
- def test_one_to_one_1tR_1t(self) -> None:
+ # todo: 1t:1rR
+ def test_o2o_pre_populated_1t_1rR_okay(self) -> None:
+ with self.db_connection.cursor() as curs:
+ organization_row = curs.execute(
+ *organization_v.select(
+ organization_v.theme_id,
+ where=organization_v.id == self.organization_id,
+ )
+ ).fetchone()
+ assert organization_row["theme_id"] == self.theme1_id
+ theme_row = curs.execute(
+ *theme_v.select(
+ theme_v.theme_for_organization_id,
+ where=theme_v.id == self.theme1_id,
+ )
+ ).fetchone()
+ assert theme_row["theme_for_organization_id"] == self.organization_id
+
+ def test_o2o_1t_1rR_okay_with_change_data(self) -> None:
with self.db_connection.cursor() as curs:
# Prepopulated
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["default_group_id"])
+ meeting_row = curs.execute(
+ *meeting_v.select(
+ meeting_v.default_group_id, where=meeting_v.id == self.meeting1_id
+ )
+ ).fetchone()
old_default_group_id = meeting_row["default_group_id"]
- old_default_group_row = DbUtils.select_id_wrapper(curs, "group_", old_default_group_id, ["default_group_for_meeting_id"])
- assert old_default_group_row["default_group_for_meeting_id"] == self.meeting1_id
+ old_default_group_row = curs.execute(
+ *group_v.select(
+ group_v.default_group_for_meeting_id,
+ where=group_v.id == old_default_group_id,
+ )
+ ).fetchone()
+ assert (
+ old_default_group_row["default_group_for_meeting_id"]
+ == self.meeting1_id
+ )
# change default group
with self.db_connection.transaction():
- group_staff_row = curs.execute(sql.SQL("SELECT id, name, meeting_id, default_group_for_meeting_id FROM group_ where name = %s and meeting_id = %s;"), ("Staff", self.meeting1_id)).fetchone()
+ group_staff_row = curs.execute(
+ *group_v.select(
+ group_v.id,
+ group_v.name,
+ group_v.meeting_id,
+ group_v.default_group_for_meeting_id,
+ where=(
+ (group_v.name == "Staff")
+ & (group_v.meeting_id == self.meeting1_id)
+ ),
+ )
+ ).fetchone()
assert group_staff_row["id"] == self.groupM1_staff_id
assert group_staff_row["name"] == "Staff"
assert group_staff_row["meeting_id"] == self.meeting1_id
- assert group_staff_row["default_group_for_meeting_id"] == None
- curs.execute(sql.SQL("UPDATE meeting_t SET default_group_id = %s where id = %s;"), (group_staff_row["id"], self.meeting1_id))
+ assert group_staff_row["default_group_for_meeting_id"] is None
+ curs.execute(
+ *meeting_t.update(
+ [meeting_t.default_group_id],
+ [group_staff_row["id"]],
+ where=meeting_t.id == self.meeting1_id,
+ )
+ )
+
# assert new and old data
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["default_group_id"])
+ meeting_row = curs.execute(
+ *meeting_v.select(
+ meeting_v.default_group_id, where=meeting_v.id == self.meeting1_id
+ )
+ ).fetchone()
assert meeting_row["default_group_id"] == group_staff_row["id"]
- new_default_group_row = DbUtils.select_id_wrapper(curs, "group_", group_staff_row["id"], ["default_group_for_meeting_id"])
- assert new_default_group_row["default_group_for_meeting_id"] == self.meeting1_id
- old_default_group_row = DbUtils.select_id_wrapper(curs, "group_", old_default_group_id, ["default_group_for_meeting_id"])
- assert old_default_group_row["default_group_for_meeting_id"] == None
-
- """ 1:n relation tests with n-side NOT NULL """
- """ Test:motion_state.submitter_withdraw_back_ids: sql okay?"""
- def test_one_to_many_1t_ntR_update_error(self) -> None:
- """ update removes default projector => Exception"""
- with self.db_connection.cursor() as curs:
- with pytest.raises(psycopg.errors.RaiseException) as e:
- projector_id = curs.execute("SELECT id from projector where used_as_default_projector_for_topic_in_meeting_id = %s", (self.meeting1_id,)).fetchone()['id']
- with self.db_connection.transaction():
- curs.execute(sql.SQL("UPDATE projector_t SET used_as_default_projector_for_topic_in_meeting_id = null where id = %s;"), (projector_id,))
- assert 'Exception: NOT NULL CONSTRAINT VIOLATED for meeting.default_projector_topic_ids' in str(e)
+ new_default_group_row = curs.execute(
+ *group_v.select(
+ group_v.default_group_for_meeting_id,
+ where=group_v.id == group_staff_row["id"],
+ )
+ ).fetchone()
+ assert (
+ new_default_group_row["default_group_for_meeting_id"]
+ == self.meeting1_id
+ )
+ old_default_group_row = curs.execute(
+ *group_v.select(
+ group_v.default_group_for_meeting_id,
+ where=group_v.id == old_default_group_id,
+ )
+ ).fetchone()
+ assert old_default_group_row["default_group_for_meeting_id"] is None
- def test_one_to_many_1t_ntR_update_okay(self) -> None:
- """ Update sets new default projector before 2nd removes old default projector"""
- with self.db_connection.cursor() as curs:
- projector_ids = curs.execute("SELECT id from projector where meeting_id = %s", (self.meeting1_id,)).fetchall()
- with self.db_connection.transaction():
- curs.execute(sql.SQL("UPDATE projector_t SET used_as_default_projector_for_topic_in_meeting_id = %s where id = %s;"), (self.meeting1_id, projector_ids[1]["id"]))
- curs.execute(sql.SQL("UPDATE projector_t SET used_as_default_projector_for_topic_in_meeting_id = null where id = %s;"), (projector_ids[0]["id"],))
- assert projector_ids[1]["id"] == DbUtils.select_id_wrapper(curs, 'meeting', self.meeting1_id, ['default_projector_topic_ids'])['default_projector_topic_ids'][0]
-
- def test_one_to_many_1t_ntR_update_wrong_update_sequence_error(self) -> None:
- """ first update removes the projector from meeting => Exception"""
- with self.db_connection.cursor() as curs:
- projector_ids = curs.execute("SELECT id from projector where used_as_default_projector_for_topic_in_meeting_id = %s", (self.meeting1_id,)).fetchall()
- with pytest.raises(psycopg.errors.RaiseException) as e:
- with self.db_connection.transaction():
- curs.execute(sql.SQL("UPDATE projector_t SET used_as_default_projector_for_topic_in_meeting_id = null where id = %s;"), (projector_ids[0]["id"],))
- curs.execute(sql.SQL("UPDATE projector_t SET used_as_default_projector_for_topic_in_meeting_id = %s where id = %s;"), (self.meeting1_id, projector_ids[1]["id"]))
- assert 'Exception: NOT NULL CONSTRAINT VIOLATED for meeting.default_projector_topic_ids' in str(e)
-
- def test_one_to_many_1t_ntR_delete_error(self) -> None:
- """ delete projector from meeting => Exception"""
- with self.db_connection.cursor() as curs:
- projector_id = curs.execute("SELECT id from projector where used_as_default_projector_for_topic_in_meeting_id = %s", (self.meeting1_id,)).fetchone()["id"]
- with pytest.raises(psycopg.errors.RaiseException) as e:
- with self.db_connection.transaction():
- curs.execute(sql.SQL("DELETE FROM projector where id = %s;"), (projector_id,))
- assert 'Exception: NOT NULL CONSTRAINT VIOLATED' in str(e)
-
- def test_one_to_many_1t_ntR_delete_insert_okay(self) -> None:
- """ first insert, than delete old default projector from meeting => okay"""
- with self.db_connection.cursor() as curs:
- with self.db_connection.transaction():
- projector = curs.execute("SELECT * from projector where used_as_default_projector_for_topic_in_meeting_id = %s", (self.meeting1_id,)).fetchone()
- field_list = ["meeting_id", "used_as_default_projector_for_agenda_item_list_in_meeting_id", "used_as_default_projector_for_topic_in_meeting_id", "used_as_default_projector_for_list_of_speakers_in_meeting_id", "used_as_default_projector_for_current_los_in_meeting_id", "used_as_default_projector_for_motion_in_meeting_id", "used_as_default_projector_for_amendment_in_meeting_id", "used_as_default_projector_for_motion_block_in_meeting_id", "used_as_default_projector_for_assignment_in_meeting_id", "used_as_default_projector_for_mediafile_in_meeting_id", "used_as_default_projector_for_message_in_meeting_id", "used_as_default_projector_for_countdown_in_meeting_id", "used_as_default_projector_for_assignment_poll_in_meeting_id", "used_as_default_projector_for_motion_poll_in_meeting_id", "used_as_default_projector_for_poll_in_meeting_id"]
- data = {fname: projector[fname] for fname in field_list}
- data["sequential_number"] = projector["sequential_number"] + 2
- new_projector_id = DbUtils.insert_wrapper(curs, "projector_t", data)
- curs.execute(sql.SQL("UPDATE meeting_t SET reference_projector_id = %s where id = %s;"), (new_projector_id, projector["meeting_id"]))
- curs.execute(sql.SQL("DELETE FROM projector where id = %s;"), (projector["id"],))
- assert new_projector_id == cast(dict, DbUtils.select_id_wrapper(curs, "meeting", projector["meeting_id"], ["default_projector_topic_ids"]))["default_projector_topic_ids"][0]
-
- """ n:m relation tests """
- """ manual sqls tests"""
- """ all field type tests """
- """ constraint tests """
-
- """ generic-relation tests """
- def test_generic_1GT_1tR(self) -> None:
+ # todo: 1tR:1Gr
+ def test_o2o_generic_1tR_1Gr_okay_with_setval(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
pcl_id = 2
option_id = 3
- curs.execute("select setval(pg_get_serial_sequence('poll_candidate_list_t', 'id'), %s);", (pcl_id,))
- assert pcl_id == DbUtils.insert_wrapper(curs, "poll_candidate_list_t", {"id": pcl_id, "meeting_id": self.meeting1_id})
- curs.execute("select setval(pg_get_serial_sequence('option_t', 'id'), %s);", (option_id,))
- assert option_id == DbUtils.insert_wrapper(curs, "option_t", {"id": option_id, "content_object_id": (content_object_id := f"poll_candidate_list/{pcl_id}"), "meeting_id": self.meeting1_id})
- option_row = DbUtils.select_id_wrapper(curs, "option", option_id, ["id", "content_object_id", "content_object_id_poll_candidate_list_id", "content_object_id_user_id"])
+ curs.execute(
+ "select setval(pg_get_serial_sequence('poll_candidate_list_t', 'id'), %s);",
+ (pcl_id,),
+ )
+ assert (
+ pcl_id
+ == curs.execute(
+ *poll_candidate_list_t.insert(
+ [
+ poll_candidate_list_t.id,
+ poll_candidate_list_t.meeting_id,
+ ],
+ [[pcl_id, self.meeting1_id]],
+ returning=[poll_candidate_list_t.id],
+ )
+ ).fetchone()["id"]
+ )
+ curs.execute(
+ "select setval(pg_get_serial_sequence('option_t', 'id'), %s);",
+ (option_id,),
+ )
+ data = [
+ {
+ "id": option_id,
+ "content_object_id": (
+ content_object_id := f"poll_candidate_list/{pcl_id}"
+ ),
+ "meeting_id": self.meeting1_id,
+ },
+ ]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ option_t, data
+ )
+ assert (
+ option_id
+ == curs.execute(
+ *option_t.insert(columns, values, returning=[option_t.id])
+ ).fetchone()["id"]
+ )
+ columns = DbUtils.get_columns_from_list(
+ option_t,
+ [
+ "id",
+ "content_object_id",
+ "content_object_id_poll_candidate_list_id",
+ "content_object_id_user_id",
+ ],
+ )
+ option_row = curs.execute(
+ *option_t.select(*columns, where=option_t.id == option_id)
+ ).fetchone()
assert option_row["id"] == option_id
assert option_row["content_object_id"] == content_object_id
assert option_row["content_object_id_poll_candidate_list_id"] == pcl_id
- assert option_row["content_object_id_user_id"] == None
+ assert option_row["content_object_id_user_id"] is None
- pcl_row = DbUtils.select_id_wrapper(curs, "poll_candidate_list", pcl_id, ["id", "option_id",])
+ pcl_row = curs.execute(
+ *poll_candidate_list_v.select(
+ poll_candidate_list_v.id,
+ poll_candidate_list_v.option_id,
+ where=poll_candidate_list_v.id == pcl_id,
+ )
+ ).fetchone()
assert pcl_row["id"] == pcl_id
assert pcl_row["option_id"] == option_id
- def test_generic_1GT_nt(self) -> None:
+ # todo: 1tR:1GrR to implement R:R
+ def test_generic_1tR_1GrR_okay(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- option_id = 3
- curs.execute("select setval(pg_get_serial_sequence('poll_candidate_list_t', 'id'), %s);", (option_id,))
- option_id == DbUtils.insert_wrapper(curs, "option_t", {"id": option_id, "content_object_id": (content_object_id := f"user/{self.user1_id}"), "meeting_id": self.meeting1_id})
- option_row = DbUtils.select_id_wrapper(curs, "option", option_id, ["id", "content_object_id", "content_object_id_user_id", "content_object_id_poll_candidate_list_id"])
- assert option_row["id"] == option_id
- assert option_row["content_object_id"] == content_object_id
- assert option_row["content_object_id_user_id"] == self.user1_id
- assert option_row["content_object_id_poll_candidate_list_id"] == None
+ assignment_id = 2
+ los_id = 3
+ curs.execute(
+ "select setval(pg_get_serial_sequence('assignment_t', 'id'), %s);",
+ (assignment_id,),
+ )
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ assignment_t,
+ [
+ {
+ "id": assignment_id,
+ "title": "I am an assignment",
+ "sequential_number": 42,
+ "meeting_id": self.meeting1_id,
+ },
+ ],
+ )
+ assert (
+ assignment_id
+ == curs.execute(
+ *assignment_t.insert(
+ columns, values, returning=[assignment_t.id]
+ )
+ ).fetchone()["id"]
+ )
- user_row = DbUtils.select_id_wrapper(curs, "user_", self.user1_id, ["username", "option_ids",])
- assert user_row["option_ids"] == [option_id]
- assert user_row["username"] == "admin"
+ curs.execute(
+ "select setval(pg_get_serial_sequence('list_of_speakers_t', 'id'), %s);",
+ (los_id,),
+ )
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ list_of_speakers_t,
+ [
+ {
+ "id": los_id,
+ "content_object_id": (
+ content_object_id := f"assignment/{assignment_id}"
+ ),
+ "meeting_id": self.meeting1_id,
+ "sequential_number": 28,
+ }
+ ],
+ )
+ assert (
+ los_id
+ == curs.execute(
+ *list_of_speakers_t.insert(
+ columns, values, returning=[list_of_speakers_t.id]
+ )
+ ).fetchone()["id"]
+ )
- def test_generic_1GTR_1t(self) -> None:
- with self.db_connection.cursor() as curs:
- with self.db_connection.transaction():
- mediafile_id = 2
- los_id = 3
- curs.execute("select setval(pg_get_serial_sequence('mediafile_t', 'id'), %s);", (mediafile_id,))
- assert mediafile_id == DbUtils.insert_wrapper(curs, "mediafile_t", {"id": mediafile_id, "is_public": True, "owner_id": f"meeting/{self.meeting1_id}"})
- curs.execute("select setval(pg_get_serial_sequence('list_of_speakers_t', 'id'), %s);", (los_id,))
- assert los_id == DbUtils.insert_wrapper(curs, "list_of_speakers_t", {"id": los_id, "content_object_id": (content_object_id := f"mediafile/{mediafile_id}"), "meeting_id": self.meeting1_id, "sequential_number": 28})
- los_row = DbUtils.select_id_wrapper(curs, "list_of_speakers", los_id, ["id", "content_object_id", "content_object_id_mediafile_id", "content_object_id_topic_id"])
+ los_row = curs.execute(
+ *list_of_speakers_v.select(
+ list_of_speakers_v.id,
+ list_of_speakers_v.content_object_id,
+ where=list_of_speakers_v.id == los_id,
+ )
+ ).fetchone()
assert los_row["id"] == los_id
assert los_row["content_object_id"] == content_object_id
- assert los_row["content_object_id_mediafile_id"] == mediafile_id
- assert los_row["content_object_id_topic_id"] == None
- mediafile_row = DbUtils.select_id_wrapper(curs, "mediafile", mediafile_id, ["id", "list_of_speakers_id", "owner_id"])
- assert mediafile_row["id"] == mediafile_id
- assert mediafile_row["list_of_speakers_id"] == los_id
+ assignment_row = curs.execute(
+ *assignment_v.select(
+ assignment_v.id,
+ assignment_v.list_of_speakers_id,
+ where=assignment_v.id == assignment_id,
+ )
+ ).fetchone()
+ assert assignment_row["id"] == assignment_id
+ assert assignment_row["list_of_speakers_id"] == los_id
- def test_generic_1GTR_1tR(self) -> None:
+ # todo: nGt:nt only error check nGt
+ def test_generic_nGt_check_constraint_error(self) -> None:
+ with pytest.raises(psycopg.DatabaseError) as e:
+ with self.db_connection.cursor() as curs:
+ with self.db_connection.transaction():
+ tag_id = curs.execute(
+ *organization_tag_t.insert(
+ [organization_tag_t.name, organization_tag_t.color],
+ [["Orga Tag 1", "#ffee13"]],
+ returning=[organization_tag_t.id],
+ )
+ ).fetchone()["id"]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ gm_organization_tag_tagged_ids_t,
+ [
+ {
+ "organization_tag_id": tag_id,
+ "tagged_id": f"committee/{self.committee1_id}",
+ },
+ {
+ "organization_tag_id": tag_id,
+ "tagged_id": f"motion_state/{self.meeting1_id}",
+ },
+ ],
+ )
+ curs.execute(
+ *gm_organization_tag_tagged_ids_t.insert(columns, values)
+ )
+ assert (
+ 'new row for relation "gm_organization_tag_tagged_ids_t" violates check constraint "valid_tagged_id_part1"'
+ in str(e)
+ )
+
+ def test_generic_nGt_unique_constraint_error(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- assignment_id = 2
- los_id = 3
- curs.execute("select setval(pg_get_serial_sequence('assignment_t', 'id'), %s);", (assignment_id,))
- assert assignment_id == DbUtils.insert_wrapper(curs, "assignment_t", {"id": assignment_id, "title": "I am an assignment", "sequential_number": 42, "meeting_id": self.meeting1_id})
- curs.execute("select setval(pg_get_serial_sequence('list_of_speakers_t', 'id'), %s);", (los_id,))
- assert los_id == DbUtils.insert_wrapper(curs, "list_of_speakers_t", {"id": los_id, "content_object_id": (content_object_id := f"assignment/{assignment_id}"), "meeting_id": self.meeting1_id, "sequential_number": 28})
- los_row = DbUtils.select_id_wrapper(curs, "list_of_speakers", los_id, ["id", "content_object_id", "content_object_id_assignment_id", "content_object_id_topic_id"])
- assert los_row["id"] == los_id
- assert los_row["content_object_id"] == content_object_id
- assert los_row["content_object_id_assignment_id"] == assignment_id
- assert los_row["content_object_id_topic_id"] == None
+ tag_id = curs.execute(
+ *organization_tag_t.insert(
+ [organization_tag_t.name, organization_tag_t.color],
+ [["Orga Tag 1", "#ffee13"]],
+ returning=[organization_tag_t.id],
+ )
+ ).fetchone()["id"]
+ curs.execute(
+ *gm_organization_tag_tagged_ids_t.insert(
+ [
+ gm_organization_tag_tagged_ids_t.organization_tag_id,
+ gm_organization_tag_tagged_ids_t.tagged_id,
+ ],
+ [[tag_id, f"committee/{self.committee1_id}"]],
+ returning=[gm_organization_tag_tagged_ids_t.id],
+ )
+ ).fetchone()["id"]
+ with pytest.raises(psycopg.DatabaseError) as e:
+ with self.db_connection.transaction():
+ curs.execute(
+ *gm_organization_tag_tagged_ids_t.insert(
+ [
+ gm_organization_tag_tagged_ids_t.organization_tag_id,
+ gm_organization_tag_tagged_ids_t.tagged_id,
+ ],
+ [[tag_id, f"committee/{self.committee1_id}"]],
+ returning=[gm_organization_tag_tagged_ids_t.id],
+ )
+ )
+ assert "duplicate key value violates unique constraint" in str(e)
- assignment_row = DbUtils.select_id_wrapper(curs, "assignment", assignment_id, ["id", "list_of_speakers_id"])
- assert assignment_row["id"] == assignment_id
- assert assignment_row["list_of_speakers_id"] == los_id
+ # todo: nr
+ # todo: nt:1Gr
+ def test_generic_nt_1Gr(self) -> None:
+ with self.db_connection.cursor() as curs:
+ with self.db_connection.transaction():
+ option_id = 3
+ curs.execute(
+ "select setval(pg_get_serial_sequence('poll_candidate_list_t', 'id'), %s);",
+ (option_id,),
+ )
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ option_t,
+ [
+ {
+ "id": option_id,
+ "content_object_id": (
+ content_object_id := f"user/{self.user1_id}"
+ ),
+ "meeting_id": self.meeting1_id,
+ },
+ ],
+ )
+ option_id = curs.execute(
+ *option_t.insert(columns, values, returning=[option_t.id])
+ ).fetchone()["id"]
+ option_row = curs.execute(
+ *option_t.select(
+ *DbUtils.get_columns_from_list(
+ option_t,
+ [
+ "id",
+ "content_object_id",
+ "content_object_id_user_id",
+ "content_object_id_poll_candidate_list_id",
+ ],
+ ),
+ where=option_t.id == option_id,
+ )
+ ).fetchone()
+ assert option_row["id"] == option_id
+ assert option_row["content_object_id"] == content_object_id
+ assert option_row["content_object_id_user_id"] == self.user1_id
+ assert option_row["content_object_id_poll_candidate_list_id"] is None
- def test_generic_1GTR_nt(self) -> None:
+ user_row = curs.execute(
+ *user_v.select(
+ user_v.username, user_v.option_ids, where=user_v.id == self.user1_id
+ )
+ ).fetchone()
+ assert user_row["option_ids"] == [option_id]
+ assert user_row["username"] == "admin"
+
+ # todo: nt:1GrR
+ def test_o2m_generic_nt_1GrR_okay(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- DbUtils.insert_many_wrapper(curs, "mediafile_t", [
- {
- "is_public": True,
- "owner_id": f"meeting/{self.meeting1_id}"
- },
- {
- "is_public": True,
- "owner_id": f"organization/{self.organization_id}"
- },
- {
- "is_public": True,
- "owner_id": f"meeting/{self.meeting1_id}"
- },
- {
- "is_public": True,
- "owner_id": f"organization/{self.organization_id}"
- },
- ])
- rows = DbUtils.select_id_wrapper(curs, "mediafile", field_names=["owner_id", "owner_id_meeting_id", "owner_id_organization_id"])
- expected_results = (("meeting/1", 1, None), ("organization/1", None, 1), ("meeting/1", 1, None), ("organization/1", None, 1))
- for i, row in enumerate (rows):
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ mediafile_t,
+ [
+ {"is_public": True, "owner_id": f"meeting/{self.meeting1_id}"},
+ {
+ "is_public": True,
+ "owner_id": f"organization/{self.organization_id}",
+ },
+ {"is_public": True, "owner_id": f"meeting/{self.meeting1_id}"},
+ {
+ "is_public": True,
+ "owner_id": f"organization/{self.organization_id}",
+ },
+ ],
+ )
+ curs.execute(*mediafile_t.insert(columns, values))
+ rows = curs.execute(
+ *mediafile_v.select(
+ mediafile_v.owner_id,
+ mediafile_v.owner_id_meeting_id,
+ mediafile_v.owner_id_organization_id,
+ )
+ ).fetchall()
+ expected_results = (
+ ("meeting/1", 1, None),
+ ("organization/1", None, 1),
+ ("meeting/1", 1, None),
+ ("organization/1", None, 1),
+ )
+ for i, row in enumerate(rows):
assert tuple(row.values()) == expected_results[i]
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["mediafile_ids"])
+ meeting_row = curs.execute(
+ *meeting_v.select(
+ meeting_v.mediafile_ids, where=meeting_v.id == self.meeting1_id
+ )
+ ).fetchone()
assert meeting_row["mediafile_ids"] == [1, 3]
- organization_row = DbUtils.select_id_wrapper(curs, "organization", self.organization_id, ["mediafile_ids"])
+ organization_row = curs.execute(
+ *organization_v.select(organization_v.mediafile_ids)
+ ).fetchone()
assert organization_row["mediafile_ids"] == [2, 4]
- def test_generic_1Gt_check_constraint_error(self) -> None:
- with pytest.raises(psycopg.DatabaseError) as e:
- with self.db_connection.cursor() as curs:
- with self.db_connection.transaction():
- DbUtils.insert_wrapper(curs, "mediafile_t", {
- "is_public": True,
- "owner_id": f"motion_state/{self.meeting1_id}"
- })
- assert 'motion_state/1' in str(e)
-
- """ generic-relation-list tests """
- def test_generic_nGt_nt(self) -> None:
+ # todo: nt:1r
+ # todo: nt:1rR
+ # todo: nt:nGt
+ def test_n2m_generic_nt_nGt_okay(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- tag_ids = DbUtils.insert_many_wrapper(curs, "organization_tag_t", [
- {
- "name": "Orga Tag 1",
- "color": "#ffee13"
- },
- {
- "name": "Orga Tag 2",
- "color": "#12ee13"
- },
- {
- "name": "Orga Tag 3",
- "color": "#00ee13"
- },
- ])
- DbUtils.insert_many_wrapper(curs, "gm_organization_tag_tagged_ids_t", [
- {"organization_tag_id": tag_ids[0], "tagged_id": f"committee/{self.committee1_id}"},
- {"organization_tag_id": tag_ids[0], "tagged_id": f"meeting/{self.meeting1_id}"},
- {"organization_tag_id": tag_ids[1], "tagged_id": f"committee/{self.committee1_id}"},
- {"organization_tag_id": tag_ids[2], "tagged_id": f"meeting/{self.meeting1_id}"},
- ])
- rows = DbUtils.select_id_wrapper(curs, "gm_organization_tag_tagged_ids_t", field_names=["id", "organization_tag_id", "tagged_id", "tagged_id_committee_id", "tagged_id_meeting_id"])
- expected_results = ((1, 1, "committee/1", 1, None), (2, 1, "meeting/1", None, 1), (3, 2, "committee/1", 1, None), (4, 3, "meeting/1", None, 1))
- for i, row in enumerate (rows):
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ organization_tag_t,
+ [
+ {"name": "Orga Tag 1", "color": "#ffee13"},
+ {"name": "Orga Tag 2", "color": "#12ee13"},
+ {"name": "Orga Tag 3", "color": "#00ee13"},
+ ],
+ )
+
+ curs.execute(
+ *organization_tag_t.insert(
+ columns, values, returning=[organization_tag_t.id]
+ )
+ )
+ tag_ids = [x["id"] for x in curs]
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ gm_organization_tag_tagged_ids_t,
+ [
+ {
+ "organization_tag_id": tag_ids[0],
+ "tagged_id": f"committee/{self.committee1_id}",
+ },
+ {
+ "organization_tag_id": tag_ids[0],
+ "tagged_id": f"meeting/{self.meeting1_id}",
+ },
+ {
+ "organization_tag_id": tag_ids[1],
+ "tagged_id": f"committee/{self.committee1_id}",
+ },
+ {
+ "organization_tag_id": tag_ids[2],
+ "tagged_id": f"meeting/{self.meeting1_id}",
+ },
+ ],
+ )
+ curs.execute(*gm_organization_tag_tagged_ids_t.insert(columns, values))
+ rows = curs.execute(
+ *gm_organization_tag_tagged_ids_t.select(
+ *DbUtils.get_columns_from_list(
+ gm_organization_tag_tagged_ids_t,
+ [
+ "id",
+ "organization_tag_id",
+ "tagged_id",
+ "tagged_id_committee_id",
+ "tagged_id_meeting_id",
+ ],
+ )
+ )
+ ).fetchall()
+ expected_results = (
+ (1, 1, "committee/1", 1, None),
+ (2, 1, "meeting/1", None, 1),
+ (3, 2, "committee/1", 1, None),
+ (4, 3, "meeting/1", None, 1),
+ )
+ for i, row in enumerate(rows):
assert tuple(row.values()) == expected_results[i]
- committee_row = DbUtils.select_id_wrapper(curs, "committee", self.committee1_id, ["organization_tag_ids"])
+ committee_row = curs.execute(
+ *committee_v.select(
+ committee_v.organization_tag_ids,
+ where=committee_v.id == self.committee1_id,
+ )
+ ).fetchone()
assert committee_row["organization_tag_ids"] == [1, 2]
- meeting_row = DbUtils.select_id_wrapper(curs, "meeting", self.meeting1_id, ["organization_tag_ids"])
+ meeting_row = curs.execute(
+ *meeting_v.select(
+ meeting_v.organization_tag_ids,
+ where=meeting_v.id == self.meeting1_id,
+ )
+ ).fetchone()
assert meeting_row["organization_tag_ids"] == [1, 3]
- def test_generic_nGt_check_constraint_error(self) -> None:
- with pytest.raises(psycopg.DatabaseError) as e:
- with self.db_connection.cursor() as curs:
+ # todo: nt:nt
+ # todo: ntR:1r
+ def test_o2m_ntR_1r_update_okay(self) -> None:
+ """Update sets new default projector before 2nd removes old default projector"""
+ with self.db_connection.cursor() as curs:
+ projector_ids = curs.execute(
+ *projector_t.select(
+ projector_t.id, where=projector_t.meeting_id == self.meeting1_id
+ )
+ ).fetchall()
+ with self.db_connection.transaction():
+ curs.execute(
+ *projector_t.update(
+ [projector_t.used_as_default_projector_for_topic_in_meeting_id],
+ [self.meeting1_id],
+ where=projector_t.id == [projector_ids[1]["id"]],
+ )
+ )
+ curs.execute(
+ *projector_t.update(
+ [projector_t.used_as_default_projector_for_topic_in_meeting_id],
+ [None],
+ where=projector_t.id == [projector_ids[0]["id"]],
+ )
+ )
+ assert (
+ projector_ids[1]["id"]
+ == curs.execute(
+ *meeting_v.select(
+ meeting_v.default_projector_topic_ids,
+ where=meeting_v.id == self.meeting1_id,
+ )
+ ).fetchone()["default_projector_topic_ids"][0]
+ )
+
+ def test_o2m_ntR_1r_update_error(self) -> None:
+ """update removes default projector => Exception"""
+ with self.db_connection.cursor() as curs:
+ with pytest.raises(psycopg.errors.RaiseException) as e:
+ projector_id = curs.execute(
+ *projector_t.select(
+ projector_t.id,
+ where=projector_t.used_as_default_projector_for_topic_in_meeting_id
+ == self.meeting1_id,
+ )
+ ).fetchone()["id"]
with self.db_connection.transaction():
- tag_id = DbUtils.insert_wrapper(curs, "organization_tag_t", {"name": "Orga Tag 1", "color": "#ffee13"})
- DbUtils.insert_many_wrapper(curs, "gm_organization_tag_tagged_ids_t", [
- {"organization_tag_id": tag_id, "tagged_id": f"committee/{self.committee1_id}"},
- {"organization_tag_id": tag_id, "tagged_id": f"motion_state/{self.meeting1_id}"},
- ])
- assert 'motion_state/1' in str(e)
+ curs.execute(
+ *projector_t.update(
+ [
+ projector_t.used_as_default_projector_for_topic_in_meeting_id
+ ],
+ [
+ None,
+ ],
+ where=projector_t.id == projector_id,
+ )
+ )
+ assert (
+ "Exception: NOT NULL CONSTRAINT VIOLATED for meeting.default_projector_topic_ids"
+ in str(e)
+ )
- def test_generic_nGt_unique_constraint_error(self) -> None:
+ def test_o2m_ntR_1r_delete_error(self) -> None:
+ """delete projector from meeting => Exception"""
with self.db_connection.cursor() as curs:
- with self.db_connection.transaction():
- tag_id = DbUtils.insert_wrapper(curs, "organization_tag_t", {"name": "Orga Tag 1", "color": "#ffee13"})
- DbUtils.insert_wrapper(curs, "gm_organization_tag_tagged_ids_t", {"organization_tag_id": tag_id, "tagged_id": f"committee/{self.committee1_id}"})
- with pytest.raises(psycopg.DatabaseError) as e:
+ projector_id = curs.execute(
+ "SELECT id from projector where used_as_default_projector_for_topic_in_meeting_id = %s",
+ (self.meeting1_id,),
+ ).fetchone()["id"]
+ with pytest.raises(psycopg.errors.RaiseException) as e:
with self.db_connection.transaction():
- DbUtils.insert_wrapper(curs, "gm_organization_tag_tagged_ids_t", {"organization_tag_id": tag_id, "tagged_id": f"committee/{self.committee1_id}"})
- assert 'duplicate key value violates unique constraint' in str(e)
+ curs.execute(
+ sql.SQL("DELETE FROM projector where id = %s;"), (projector_id,)
+ )
+ assert "Exception: NOT NULL CONSTRAINT VIOLATED" in str(e)
+
+ def test_o2m_ntR_1r_insert_delete_okay(self) -> None:
+ """first insert, than delete old default projector from meeting => okay"""
+ with self.db_connection.cursor() as curs:
+ with self.db_connection.transaction():
+ columns = DbUtils.get_columns_from_list(
+ projector_t,
+ [
+ "id",
+ "meeting_id",
+ "used_as_default_projector_for_agenda_item_list_in_meeting_id",
+ "used_as_default_projector_for_topic_in_meeting_id",
+ "used_as_default_projector_for_list_of_speakers_in_meeting_id",
+ "used_as_default_projector_for_current_los_in_meeting_id",
+ "used_as_default_projector_for_motion_in_meeting_id",
+ "used_as_default_projector_for_amendment_in_meeting_id",
+ "used_as_default_projector_for_motion_block_in_meeting_id",
+ "used_as_default_projector_for_assignment_in_meeting_id",
+ "used_as_default_projector_for_mediafile_in_meeting_id",
+ "used_as_default_projector_for_message_in_meeting_id",
+ "used_as_default_projector_for_countdown_in_meeting_id",
+ "used_as_default_projector_for_assignment_poll_in_meeting_id",
+ "used_as_default_projector_for_motion_poll_in_meeting_id",
+ "used_as_default_projector_for_poll_in_meeting_id",
+ "sequential_number",
+ ],
+ )
+ projector: dict[str, Any] = curs.execute(
+ *projector_t.select(
+ *columns,
+ where=projector_t.used_as_default_projector_for_topic_in_meeting_id
+ == self.meeting1_id,
+ )
+ ).fetchone()
+ projector_old_id = projector.pop("id")
+ projector["sequential_number"] += 2
+ columns, values = DbUtils.get_columns_and_values_for_insert(
+ projector_t, [projector]
+ )
+ projector_new_id = curs.execute(
+ *projector_t.insert(columns, values, returning=[projector_t.id])
+ ).fetchone()["id"]
+ curs.execute(
+ *meeting_t.update(
+ [meeting_t.reference_projector_id], [projector_new_id]
+ )
+ )
+ curs.execute(
+ *projector_t.delete(where=projector_t.id == projector_old_id)
+ )
+ assert (
+ projector_new_id
+ == curs.execute(
+ *meeting_v.select(
+ meeting_v.default_projector_topic_ids,
+ where=projector_new_id == meeting_v.reference_projector_id,
+ )
+ ).fetchone()["default_projector_topic_ids"][0]
+ )
+
+ # todo: nts:nts
+
class EnumTests(BaseTestCase):
def test_correct_singular_values_in_meeting(self) -> None:
meeting_t = Table("meeting_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- meeting = curs.execute(*meeting_t.select(meeting_t.language, meeting_t.export_pdf_fontsize, where=meeting_t.id==1)).fetchone()
+ meeting = curs.execute(
+ *meeting_t.select(
+ meeting_t.language,
+ meeting_t.export_pdf_fontsize,
+ where=meeting_t.id == 1,
+ )
+ ).fetchone()
assert meeting["language"] == "en"
assert meeting["export_pdf_fontsize"] == 10
- meeting = curs.execute(*meeting_t.update([meeting_t.language, meeting_t.export_pdf_fontsize], ["de", 11], where=meeting_t.id==1, returning=[meeting_t.id, meeting_t.language])).fetchone()
+ meeting = curs.execute(
+ *meeting_t.update(
+ [meeting_t.language, meeting_t.export_pdf_fontsize],
+ ["de", 11],
+ where=meeting_t.id == 1,
+ returning=[meeting_t.id, meeting_t.language],
+ )
+ ).fetchone()
assert meeting["language"] == "de"
def test_wrong_language_in_meeting(self) -> None:
@@ -315,7 +856,11 @@ def test_wrong_language_in_meeting(self) -> None:
with self.db_connection.cursor() as curs:
with pytest.raises(psycopg.DatabaseError) as e:
with self.db_connection.transaction():
- curs.execute(*meeting_t.update([meeting_t.language], ["xx"], where=meeting_t.id==1))
+ curs.execute(
+ *meeting_t.update(
+ [meeting_t.language], ["xx"], where=meeting_t.id == 1
+ )
+ )
assert 'violates check constraint "enum_meeting_language"' in str(e)
def test_wrong_pdf_fontsize_in_meeting(self) -> None:
@@ -323,20 +868,37 @@ def test_wrong_pdf_fontsize_in_meeting(self) -> None:
with self.db_connection.cursor() as curs:
with pytest.raises(psycopg.DatabaseError) as e:
with self.db_connection.transaction():
- curs.execute(*meeting_t.update([meeting_t.export_pdf_fontsize], [22], where=meeting_t.id==1))
+ curs.execute(
+ *meeting_t.update(
+ [meeting_t.export_pdf_fontsize],
+ [22],
+ where=meeting_t.id == 1,
+ )
+ )
assert 'violates check constraint "enum_meeting_export_pdf_fontsize"' in str(e)
def test_correct_permissions_in_group(self) -> None:
group_t = Table("group_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- group = curs.execute(*group_t.select(group_t.permissions, where=group_t.id==1)).fetchone()
+ group: dict[str, Any] = curs.execute(
+ *group_t.select(group_t.permissions, where=group_t.id == 1)
+ ).fetchone()
assert "agenda_item.can_see_internal" in group["permissions"]
assert "user.can_see" in group["permissions"]
assert "chat.can_manage" not in group["permissions"]
group["permissions"].remove("user.can_see")
group["permissions"].append("chat.can_manage")
- sql = tuple(group_t.update([group_t.permissions], [DbUtils.get_pg_array_for_cu(group["permissions"]),], where=group_t.id==1, returning=[group_t.permissions]))
+ sql = tuple(
+ group_t.update(
+ [group_t.permissions],
+ [
+ DbUtils.get_pg_array_for_cu(group["permissions"]),
+ ],
+ where=group_t.id == 1,
+ returning=[group_t.permissions],
+ )
+ )
group = curs.execute(*sql).fetchone()
assert "agenda_item.can_see_internal" in group["permissions"]
assert "user.can_see" not in group["permissions"]
@@ -348,16 +910,32 @@ def test_wrong_permissions_in_group(self) -> None:
with self.db_connection.transaction():
with pytest.raises(psycopg.DatabaseError) as e:
group = {"permissions": ["user.can_see", "invalid permission"]}
- sql = tuple(group_t.update([group_t.permissions], [DbUtils.get_pg_array_for_cu(group["permissions"]),], where=group_t.id==1, returning=[group_t.permissions]))
+ sql = tuple(
+ group_t.update(
+ [group_t.permissions],
+ [
+ DbUtils.get_pg_array_for_cu(group["permissions"]),
+ ],
+ where=group_t.id == 1,
+ returning=[group_t.permissions],
+ )
+ )
group = curs.execute(*sql).fetchone()
assert 'violates check constraint "enum_group_permissions"' in str(e)
+
class DataTypeTests(BaseTestCase):
def test_color_type_correct(self) -> None:
orga_tag_t = Table("organization_tag_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- orga_tags = curs.execute(*orga_tag_t.insert(columns=[orga_tag_t.name, orga_tag_t.color], values=[['Foo', '#ff12cc'], ["Bar", "#1234AA"]], returning=[orga_tag_t.id, orga_tag_t.name, orga_tag_t.color])).fetchall()
+ orga_tags = curs.execute(
+ *orga_tag_t.insert(
+ columns=[orga_tag_t.name, orga_tag_t.color],
+ values=[["Foo", "#ff12cc"], ["Bar", "#1234AA"]],
+ returning=[orga_tag_t.id, orga_tag_t.name, orga_tag_t.color],
+ )
+ ).fetchall()
assert orga_tags[0] == {"id": 1, "name": "Foo", "color": "#ff12cc"}
assert orga_tags[1] == {"id": 2, "name": "Bar", "color": "#1234AA"}
@@ -366,15 +944,36 @@ def test_color_type_not_null_error(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
with pytest.raises(psycopg.DatabaseError) as e:
- curs.execute(*orga_tag_t.insert(columns=[orga_tag_t.name, orga_tag_t.color], values=[['Foo', None]], returning=[orga_tag_t.id, orga_tag_t.name, orga_tag_t.color])).fetchone()
- assert 'null value in column "color" of relation "organization_tag_t" violates not-null constraint' in str(e)
+ curs.execute(
+ *orga_tag_t.insert(
+ columns=[orga_tag_t.name, orga_tag_t.color],
+ values=[["Foo", None]],
+ returning=[
+ orga_tag_t.id,
+ orga_tag_t.name,
+ orga_tag_t.color,
+ ],
+ )
+ ).fetchone()
+ assert (
+ 'null value in column "color" of relation "organization_tag_t" violates not-null constraint'
+ in str(e)
+ )
def test_color_type_null_correct(self) -> None:
sl_t = Table("structure_level_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
- sl_id = curs.execute(*sl_t.insert(columns=[sl_t.name, sl_t.color, sl_t.meeting_id], values=[['Foo', None, 1]], returning=[sl_t.id])).fetchone()["id"]
- structure_level = curs.execute(*sl_t.select(sl_t.id, sl_t.color, where=sl_t.id==sl_id)).fetchone()
+ sl_id = curs.execute(
+ *sl_t.insert(
+ columns=[sl_t.name, sl_t.color, sl_t.meeting_id],
+ values=[["Foo", None, 1]],
+ returning=[sl_t.id],
+ )
+ ).fetchone()["id"]
+ structure_level = curs.execute(
+ *sl_t.select(sl_t.id, sl_t.color, where=sl_t.id == sl_id)
+ ).fetchone()
assert structure_level == {"id": sl_id, "color": None}
def test_color_type_empty_string_error(self) -> None:
@@ -382,21 +981,53 @@ def test_color_type_empty_string_error(self) -> None:
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
with pytest.raises(psycopg.DatabaseError) as e:
- curs.execute(*sl_t.insert(columns=[sl_t.name, sl_t.color, sl_t.meeting_id], values=[['Foo', '', 1]], returning=[sl_t.id])).fetchone()["id"]
- assert """new row for relation "structure_level_t" violates check constraint "structure_level_t_color_check""" in str(e)
+ curs.execute(
+ *sl_t.insert(
+ columns=[sl_t.name, sl_t.color, sl_t.meeting_id],
+ values=[["Foo", "", 1]],
+ returning=[sl_t.id],
+ )
+ ).fetchone()["id"]
+ assert (
+ """new row for relation "structure_level_t" violates check constraint "structure_level_t_color_check"""
+ in str(e)
+ )
def test_color_type_wrong_string_error(self) -> None:
sl_t = Table("structure_level_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
with pytest.raises(psycopg.DatabaseError) as e:
- curs.execute(*sl_t.insert(columns=[sl_t.name, sl_t.color, sl_t.meeting_id], values=[['Foo', 'xxx', 1]], returning=[sl_t.id])).fetchone()["id"]
- assert """new row for relation "structure_level_t" violates check constraint "structure_level_t_color_check""" in str(e)
+ curs.execute(
+ *sl_t.insert(
+ columns=[sl_t.name, sl_t.color, sl_t.meeting_id],
+ values=[["Foo", "xxx", 1]],
+ returning=[sl_t.id],
+ )
+ ).fetchone()["id"]
+ assert (
+ """new row for relation "structure_level_t" violates check constraint "structure_level_t_color_check"""
+ in str(e)
+ )
def test_color_type_to_long_string_error(self) -> None:
sl_t = Table("structure_level_t")
with self.db_connection.cursor() as curs:
with self.db_connection.transaction():
with pytest.raises(psycopg.DatabaseError) as e:
- curs.execute(*sl_t.insert(columns=[sl_t.name, sl_t.color, sl_t.meeting_id], values=[['Foo', '#1234567', 1]], returning=[sl_t.id])).fetchone()["id"]
+ curs.execute(
+ *sl_t.insert(
+ columns=[sl_t.name, sl_t.color, sl_t.meeting_id],
+ values=[["Foo", "#1234567", 1]],
+ returning=[sl_t.id],
+ )
+ ).fetchone()["id"]
assert """value too long for type character varying(7)""" in str(e)
+
+
+class ManualSqlTests(BaseTestCase):
+ pass
+
+
+class ConstraintTests(BaseTestCase):
+ """foreign keys etc."""
diff --git a/models.yml b/models.yml
index 8309b899..454cbddf 100644
--- a/models.yml
+++ b/models.yml
@@ -66,7 +66,6 @@
# ("1t", "1rR"): (FieldSqlErrorType.SQL, False),
# ("1tR", "1Gr"): (FieldSqlErrorType.SQL, False),
# ("1tR", "1GrR"): (FieldSqlErrorType.SQL, False),
-# ("1rR", "1t"): (FieldSqlErrorType.FIELD, False),
# ("nGt", "nt"): (FieldSqlErrorType.SQL, True),
# ("nr", ""): (FieldSqlErrorType.SQL, True),
# ("nt", "1Gr"): (FieldSqlErrorType.SQL, False),