From 8feb0ebb0fc2fba6d415d41f97bc4112ed69d52c Mon Sep 17 00:00:00 2001
From: lixiliu <36629962+lixiliu@users.noreply.github.com>
Date: Sat, 21 Dec 2024 00:19:06 -0800
Subject: [PATCH] add roll_time_interval, wrap_timestamps

---
 src/chronify/models.py                        |   4 +
 src/chronify/sqlalchemy/functions.py          |  11 +-
 src/chronify/store.py                         |   2 +-
 src/chronify/time_series_checker.py           |  13 +-
 src/chronify/time_series_mapper_base.py       |  95 ++++++++++
 src/chronify/time_series_mapper_datetime.py   |  63 ++++---
 .../time_series_mapper_representative.py      |  88 ++-------
 src/chronify/time_utils.py                    |  41 +++++
 tests/test_checker_representative_time.py     |   2 +-
 tests/test_mapper_datetime_to_datetime.py     | 169 ++++++++++++++++++
 ..._mapper_representative_time_to_datetime.py |   2 +-
 tests/test_models.py                          |   4 +-
 tests/test_time_series_checker.py             |   2 +-
 13 files changed, 383 insertions(+), 113 deletions(-)
 create mode 100644 tests/test_mapper_datetime_to_datetime.py

diff --git a/src/chronify/models.py b/src/chronify/models.py
index 51d0bca..0663cc7 100644
--- a/src/chronify/models.py
+++ b/src/chronify/models.py
@@ -62,6 +62,10 @@ class TableSchema(TableSchemaBase):
     @classmethod
     def check_name(cls, name: str) -> str:
         _check_name(name)
+        if name.lower() == "table":
+            # Avoid "table" as a name; it is a reserved word in DuckDB.
+            msg = f"Table schema cannot use {name=}."
+            raise ValueError(msg)
         return name
 
     @field_validator("value_column")
diff --git a/src/chronify/sqlalchemy/functions.py b/src/chronify/sqlalchemy/functions.py
index 2ea746a..d5372f7 100644
--- a/src/chronify/sqlalchemy/functions.py
+++ b/src/chronify/sqlalchemy/functions.py
@@ -44,10 +44,12 @@ def write_database(
     df: pd.DataFrame,
     conn: Connection,
     table_name: str,
-    config: TimeBaseModel,
+    configs: list[TimeBaseModel],
     if_table_exists: DbWriteMode = "append",
 ) -> None:
-    """Write a Pandas DataFrame to the database."""
+    """Write a Pandas DataFrame to the database.
+    configs allows sqlite formatting for more than one datetime column.
+    """
     match conn.engine.name:
         case "duckdb":
             assert conn._dbapi_connection is not None
@@ -67,8 +69,9 @@ def write_database(
                 raise InvalidOperation(msg)
             conn._dbapi_connection.driver_connection.sql(query)
         case "sqlite":
-            if isinstance(config, DatetimeRange):
-                df = _convert_database_input_for_datetime(df, config)
+            for config in configs:
+                if isinstance(config, DatetimeRange):
+                    df = _convert_database_input_for_datetime(df, config)
             pl.DataFrame(df).write_database(
                 table_name, connection=conn, if_table_exists=if_table_exists
             )
diff --git a/src/chronify/store.py b/src/chronify/store.py
index d55c88f..78852cb 100644
--- a/src/chronify/store.py
+++ b/src/chronify/store.py
@@ -688,7 +688,7 @@ def _ingest_table(
         created_table = False
 
     try:
-        write_database(df, conn, schema.name, schema.time_config)
+        write_database(df, conn, schema.name, [schema.time_config])
     except Exception:
         if created_table:
             table.drop(self._engine)
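[editor note — illustrative, not part of the patch] The write_database signature change above is the core plumbing of this commit: a list of time configs lets a table with more than one datetime column (e.g., a mapping table carrying both from- and to-timestamps) have every such column formatted for sqlite. A minimal sketch of the new call shape, with hypothetical table and column names:

    from datetime import datetime, timedelta

    import pandas as pd
    from sqlalchemy import create_engine

    from chronify.sqlalchemy.functions import write_database
    from chronify.time import TimeIntervalType
    from chronify.time_configs import DatetimeRange

    df = pd.DataFrame(
        {
            "from_timestamp": pd.date_range("2024-01-01 00:00", periods=3, freq="1h"),
            "timestamp": pd.date_range("2024-01-01 01:00", periods=3, freq="1h"),
        }
    )
    configs = [
        DatetimeRange(
            start=datetime(2024, 1, 1, hour),
            resolution=timedelta(hours=1),
            length=3,
            interval_type=TimeIntervalType.PERIOD_BEGINNING,
            time_column=col,
        )
        for hour, col in ((0, "from_timestamp"), (1, "timestamp"))
    ]
    with create_engine("sqlite:///:memory:").connect() as conn:
        # Both DatetimeRange configs get sqlite datetime formatting applied.
        write_database(df, conn, "example_table", configs, if_table_exists="replace")
        conn.commit()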
diff --git a/src/chronify/time_series_checker.py b/src/chronify/time_series_checker.py
index 3ec6bff..c052a33 100644
--- a/src/chronify/time_series_checker.py
+++ b/src/chronify/time_series_checker.py
@@ -115,11 +115,12 @@ def check_timestamp_lists(actual: list[pd.Timestamp], expected: list[pd.Timestam
     match = actual == expected
     if not match:
         if len(actual) != len(expected):
-            msg1 = f"Mismatch number of timestamps: actual: {len(actual)} vs. expected: {len(expected)}"
-            raise InvalidTable(msg1)
+            msg = f"Mismatch number of timestamps: actual: {len(actual)} vs. expected: {len(expected)}\n"
+        else:
+            msg = ""
         missing = [x for x in expected if x not in actual]
         extra = [x for x in actual if x not in expected]
-        msg2 = "Actual timestamps do not match expected timestamps. \n"
-        msg2 += f"Missing: {missing} \n"
-        msg2 += f"Extra: {extra}"
-        raise InvalidTable(msg2)
+        msg += "Actual timestamps do not match expected timestamps. \n"
+        msg += f"Missing: {missing} \n"
+        msg += f"Extra: {extra}"
+        raise InvalidTable(msg)
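[editor note — illustrative, not part of the patch] The checker now reports everything in one InvalidTable error instead of short-circuiting on a length mismatch: the count difference and the missing/extra timestamps arrive together. A quick sketch of the combined message:

    import pandas as pd

    from chronify.exceptions import InvalidTable
    from chronify.time_series_checker import check_timestamp_lists

    actual = list(pd.date_range("2024-01-01", periods=2, freq="1h"))
    expected = list(pd.date_range("2024-01-01", periods=3, freq="1h"))
    try:
        check_timestamp_lists(actual, expected)
    except InvalidTable as exc:
        print(exc)
        # Mismatch number of timestamps: actual: 2 vs. expected: 3
        # Actual timestamps do not match expected timestamps.
        # Missing: [Timestamp('2024-01-01 02:00:00')]
        # Extra: []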
diff --git a/src/chronify/time_series_mapper_base.py b/src/chronify/time_series_mapper_base.py
index f9bd699..a30b688 100644
--- a/src/chronify/time_series_mapper_base.py
+++ b/src/chronify/time_series_mapper_base.py
@@ -1,5 +1,100 @@
 import abc
+import logging
+from functools import reduce
+from operator import and_
+
+import pandas as pd
+from sqlalchemy import Engine, MetaData, Table, select, text
+
+from chronify.sqlalchemy.functions import write_database
+from chronify.models import TableSchema
+from chronify.exceptions import TableAlreadyExists
+from chronify.utils.sqlalchemy_table import create_table
+from chronify.time_series_checker import check_timestamps
+from chronify.time_configs import DatetimeRange
+
+logger = logging.getLogger(__name__)
 
 
 class TimeSeriesMapperBase(abc.ABC):
     """Maps time series data from one configuration to another."""
+
+
+def apply_mapping(
+    df_mapping: pd.DataFrame,
+    mapping_table_name: str,
+    from_schema: TableSchema,
+    to_schema: TableSchema,
+    engine: Engine,
+    metadata: MetaData,
+) -> None:
+    """
+    Apply the mapping to create the result table, cleaning up and rolling back if checks fail.
+    """
+    if mapping_table_name in metadata.tables:
+        msg = (
+            f"table {mapping_table_name} already exists, delete it or use a different table name."
+        )
+        raise TableAlreadyExists(msg)
+
+    time_configs = [to_schema.time_config]
+    if isinstance(from_schema.time_config, DatetimeRange):
+        from_time_config = from_schema.time_config.model_copy()
+        from_time_config.time_column = "from_" + from_time_config.time_column
+        time_configs.append(from_time_config)
+    try:
+        with engine.connect() as conn:
+            write_database(
+                df_mapping, conn, mapping_table_name, time_configs, if_table_exists="fail"
+            )
+            metadata.reflect(engine, views=True)
+            _apply_mapping(mapping_table_name, from_schema, to_schema, engine, metadata)
+            mapped_table = Table(to_schema.name, metadata)
+            try:
+                check_timestamps(conn, mapped_table, to_schema)
+            except Exception:
+                logger.exception(
+                    "check_timestamps failed on mapped table %s. Dropping it.", to_schema.name
+                )
+                conn.rollback()
+                raise
+            conn.commit()
+    finally:
+        if mapping_table_name in metadata.tables:
+            with engine.connect() as conn:
+                conn.execute(text(f"DROP TABLE {mapping_table_name}"))
+                conn.commit()
+            metadata.remove(Table(mapping_table_name, metadata))
+
+
+def _apply_mapping(
+    mapping_table_name: str,
+    from_schema: TableSchema,
+    to_schema: TableSchema,
+    engine: Engine,
+    metadata: MetaData,
+) -> None:
+    """Apply mapping to create the result as a table according to to_schema
+    - Columns used to join the from_table are prefixed with "from_" in the mapping table
+    """
+    left_table = Table(from_schema.name, metadata)
+    right_table = Table(mapping_table_name, metadata)
+    left_table_columns = [x.name for x in left_table.columns]
+    right_table_columns = [x.name for x in right_table.columns]
+
+    final_cols = set(to_schema.list_columns())
+    right_cols = set(right_table_columns).intersection(final_cols)
+    left_cols = final_cols - right_cols
+
+    select_stmt = [left_table.c[x] for x in left_cols]
+    select_stmt += [right_table.c[x] for x in right_cols]
+
+    keys = from_schema.time_config.list_time_columns()
+    # infer the use of time_zone from the mapping table's columns
+    if "from_time_zone" in right_table_columns:
+        keys.append("time_zone")
+        assert "time_zone" in left_table_columns, f"time_zone not in table={from_schema.name}"
+
+    on_stmt = reduce(and_, (left_table.c[x] == right_table.c["from_" + x] for x in keys))
+    query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
+    create_table(to_schema.name, query, engine, metadata)
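[editor note — illustrative, not part of the patch] For orientation, the join that _apply_mapping assembles is roughly the pandas merge below, written with hypothetical columns: each time column (plus time_zone, whenever the mapping table carries from_time_zone) is matched against its "from_"-prefixed counterpart, and only the columns listed by to_schema survive into the result.

    import pandas as pd

    from_table = pd.DataFrame(
        {
            "timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
            "id": [1, 1],
            "value": [10.0, 20.0],
        }
    )
    mapping_table = pd.DataFrame(
        {
            "from_timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 01:00"]),
            "timestamp": pd.to_datetime(["2024-01-01 01:00", "2024-01-01 02:00"]),
        }
    )
    # Equivalent of: JOIN ON from_table.timestamp == mapping_table.from_timestamp
    result = from_table.rename(columns={"timestamp": "from_timestamp"}).merge(
        mapping_table, on="from_timestamp"
    )[["id", "timestamp", "value"]]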
diff --git a/src/chronify/time_series_mapper_datetime.py b/src/chronify/time_series_mapper_datetime.py
index 35e25c5..dd9197e 100644
--- a/src/chronify/time_series_mapper_datetime.py
+++ b/src/chronify/time_series_mapper_datetime.py
@@ -1,16 +1,18 @@
 import logging
 
-from sqlalchemy import Engine, MetaData, Table
+import pandas as pd
+from sqlalchemy import Engine, MetaData
 
 from chronify.models import TableSchema
 from chronify.exceptions import (
     ConflictingInputsError,
     InvalidParameter,
 )
-from chronify.time_series_mapper_base import TimeSeriesMapperBase
-from chronify.utils.sqlalchemy_table import create_table
+from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping
 from chronify.time_configs import DatetimeRange
 from chronify.time import TimeIntervalType
+from chronify.time_range_generator_factory import make_time_range_generator
+from chronify.time_utils import roll_time_interval
 
 logger = logging.getLogger(__name__)
@@ -71,31 +73,36 @@ def _check_time_resolution(self) -> None:
 
     def map_time(self) -> None:
         """Convert time columns with from_schema to to_schema configuration."""
+        if self._from_schema == self._to_schema:
+            msg = f"From table schema is the same as to table schema. Nothing to do.\n{self._from_schema}"
+            logger.info(msg)
+            return
         self.check_schema_consistency()
+        map_table_name = "mapping_table"
+        df = self._create_mapping()
+        apply_mapping(
+            df, map_table_name, self._from_schema, self._to_schema, self._engine, self._metadata
+        )
+        # TODO - add handling for changing resolution
+
+    def _create_mapping(self) -> pd.DataFrame:
+        """Create mapping dataframe
+        Handles time interval type
+        """
+        from_time_col = "from_" + self._from_time_config.time_column
+        to_time_col = self._to_time_config.time_column
+        to_time_data = make_time_range_generator(self._to_time_config).list_timestamps()
+        df = pd.DataFrame(
+            {
+                from_time_col: make_time_range_generator(self._from_time_config).list_timestamps(),
+                to_time_col: to_time_data,
+            }
+        )
         if self._from_time_config.interval_type != self._to_time_config.interval_type:
-            self._shift_time_interval()
-        # TODO - add handling for changing resolution
-
-    def _shift_time_interval(self) -> None:
-        table = Table(self._from_schema.name, self._metadata)
-        time_col = self._from_time_config.time_column
-        new_time_col = self._to_time_config.time_column
-        if new_time_col == time_col:
-            other_cols = [x.name for x in table.columns if x != time_col]
-        else:
-            other_cols = table.columns
-
-        match (self._from_time_config.interval_type, self._to_time_config.interval_type):
-            case (TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING):
-                op = "+"
-            case (TimeIntervalType.PERIOD_ENDING, TimeIntervalType.PERIOD_BEGINNING):
-                op = "-"
-            case _:
-                msg = f"Cannot handle from {self._from_time_config.interval_type} to {self._to_time_config.interval_type}"
-                raise InvalidParameter(msg)
-
-        delta_sec = self._from_time_config.resolution.seconds
-        query = f"SELECT {other_cols}, {time_col} {op} INTERVAL {delta_sec} SECONDS AS {new_time_col} FROM {self._from_schema.name}"
-
-        create_table(self._to_schema.name, query, self._engine, self._metadata)
+            df[to_time_col] = roll_time_interval(
+                df[to_time_col],
+                self._from_time_config.interval_type,
+                self._to_time_config.interval_type,
+            )
+        return df
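[editor note — illustrative, not part of the patch] In miniature, _create_mapping materializes both datetime ranges, pairs them positionally, and rolls the to-column when the interval types differ (hypothetical three-step range):

    import pandas as pd

    from chronify.time import TimeIntervalType
    from chronify.time_utils import roll_time_interval

    ts = pd.Series(pd.date_range("2024-01-01 00:00", periods=3, freq="1h"))
    df = pd.DataFrame({"from_timestamp": ts, "timestamp": ts})
    df["timestamp"] = roll_time_interval(
        df["timestamp"],
        TimeIntervalType.PERIOD_BEGINNING,
        TimeIntervalType.PERIOD_ENDING,
    )
    # from_timestamp       timestamp
    # 00:00                01:00
    # 01:00                02:00
    # 02:00                00:00   <- wrapped back into range by np.roll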
diff --git a/src/chronify/time_series_mapper_representative.py b/src/chronify/time_series_mapper_representative.py
index 9339bc4..5ca93d8 100644
--- a/src/chronify/time_series_mapper_representative.py
+++ b/src/chronify/time_series_mapper_representative.py
@@ -1,23 +1,18 @@
 import logging
-from functools import reduce
-from operator import and_
 
 import pandas as pd
-from sqlalchemy import Engine, MetaData, Table, select, text
+from sqlalchemy import Engine, MetaData, Table, select
 
-from chronify.sqlalchemy.functions import read_database, write_database
+from chronify.sqlalchemy.functions import read_database
 from chronify.models import TableSchema
 from chronify.exceptions import (
     MissingParameter,
     ConflictingInputsError,
-    TableAlreadyExists,
     InvalidParameter,
 )
 from chronify.time_range_generator_factory import make_time_range_generator
-from chronify.time_series_mapper_base import TimeSeriesMapperBase
-from chronify.utils.sqlalchemy_table import create_table
+from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping
 from chronify.representative_time_range_generator import RepresentativePeriodTimeGenerator
-from chronify.time_series_checker import check_timestamps
 from chronify.time_configs import DatetimeRange, RepresentativePeriodTime
 from chronify.time_utils import shift_time_interval
 from chronify.time import TimeIntervalType
@@ -88,41 +83,16 @@ def map_time(self) -> None:
         if not is_tz_naive:
             self._check_source_table_has_time_zone()
 
-        map_table_name = "map_table"
-        dfm = self._create_mapping(is_tz_naive)
-        if map_table_name in self._metadata.tables:
-            msg = (
-                f"table {map_table_name} already exists, delete it or use a different table name."
-            )
-            raise TableAlreadyExists(msg)
-
-        try:
-            with self._engine.connect() as conn:
-                write_database(
-                    dfm, conn, map_table_name, self._to_time_config, if_table_exists="fail"
-                )
-                self._metadata.reflect(self._engine, views=True)
-                self._apply_mapping(map_table_name)
-                mapped_table = Table(self._to_schema.name, self._metadata)
-                try:
-                    check_timestamps(conn, mapped_table, self._to_schema)
-                except Exception:
-                    logger.exception(
-                        "check_timestamps failed on mapped table {}. Drop it", self._to_schema.name
-                    )
-                    conn.rollback()
-                    raise
-                conn.commit()
-        finally:
-            if map_table_name in self._metadata.tables:
-                with self._engine.connect() as conn:
-                    conn.execute(text(f"DROP TABLE {map_table_name}"))
-                    conn.commit()
-                self._metadata.remove(Table(map_table_name, self._metadata))
+        map_table_name = "mapping_table"
+        df = self._create_mapping(is_tz_naive)
+        apply_mapping(
+            df, map_table_name, self._from_schema, self._to_schema, self._engine, self._metadata
+        )
 
     def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
         """Create mapping dataframe
-        Handles time interval type
+        - Handles time interval type adjustment
+        - Columns used to join the from_table are prefixed with "from_"
         """
 
         timestamp_generator = make_time_range_generator(self._to_time_config)
@@ -131,6 +101,8 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
 
         if self._from_time_config.interval_type != self._to_time_config.interval_type:
             time_col = "mapping_" + to_time_col
+            # mapping works backward for representative time
+            # shift is correct here for extracting time info
             dft[time_col] = shift_time_interval(
                 dft[to_time_col],
                 self._to_time_config.interval_type,
@@ -139,8 +111,9 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
         else:
             time_col = to_time_col
 
+        from_columns = self._from_time_config.list_time_columns()
         if is_tz_naive:
-            dfm = self._generator.create_tz_naive_mapping_dataframe(dft, time_col)
+            df = self._generator.create_tz_naive_mapping_dataframe(dft, time_col)
         else:
             with self._engine.connect() as conn:
                 table = Table(self._from_schema.name, self._metadata)
@@ -152,31 +125,8 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
                 time_zones = read_database(stmt, conn, self._from_time_config)[
                     "time_zone"
                 ].to_list()
-            dfm = self._generator.create_tz_aware_mapping_dataframe(dft, time_col, time_zones)
-        return dfm
-
-    def _apply_mapping(self, map_table_name: str) -> None:
-        """Apply mapping to create result as a table according to_schema"""
-        left_table = Table(self._from_schema.name, self._metadata)
-        right_table = Table(map_table_name, self._metadata)
-        left_table_columns = [x.name for x in left_table.columns]
-        right_table_columns = [x.name for x in right_table.columns]
-
-        final_cols = set(self._to_schema.list_columns())
-        left_cols = set(left_table_columns).intersection(final_cols)
-        right_cols = final_cols - left_cols
-
-        select_stmt = [left_table.c[x] for x in left_cols]
-        select_stmt += [right_table.c[x] for x in right_cols]
-
-        keys = self._from_time_config.list_time_columns()
-        if not self._to_time_config.is_time_zone_naive():
-            keys.append("time_zone")
-            assert (
-                "time_zone" in left_table_columns
-            ), f"time_zone not in table={self._from_schema.name}"
-            assert "time_zone" in right_table_columns, f"time_zone not in table={map_table_name}"
-
-        on_stmt = reduce(and_, (left_table.c[x] == right_table.c[x] for x in keys))
-        query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
-        create_table(self._to_schema.name, query, self._engine, self._metadata)
+            df = self._generator.create_tz_aware_mapping_dataframe(dft, time_col, time_zones)
+            from_columns.append("time_zone")
+
+        df = df.rename(columns={x: "from_" + x for x in from_columns})
+        return df
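[editor note — illustrative, not part of the patch] With the "from_" renaming above, a representative-period mapping frame looks something like the sketch below (hypothetical values, month/day-of-week/hour layout); the un-prefixed names stay free for the source table, which is what lets the shared _apply_mapping in time_series_mapper_base.py join both mappers' tables the same way.

    import pandas as pd

    df_mapping = pd.DataFrame(
        {
            "from_month": [1, 1],
            "from_day_of_week": [0, 0],
            "from_hour": [0, 1],
            "from_time_zone": ["US/Eastern", "US/Eastern"],
            "timestamp": pd.to_datetime(
                ["2024-01-01 00:00-05:00", "2024-01-01 01:00-05:00"]
            ),
        }
    )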
diff --git a/src/chronify/time_utils.py b/src/chronify/time_utils.py
index b10dbfa..0d8ac7e 100644
--- a/src/chronify/time_utils.py
+++ b/src/chronify/time_utils.py
@@ -37,3 +37,44 @@ def shift_time_interval(
             msg = f"Cannot handle from {from_interval_type} to {to_interval_type}"
             raise InvalidParameter(msg)
     return df + freq * mult
+
+
+def roll_time_interval(
+    df: pd.Series, from_interval_type: TimeIntervalType, to_interval_type: TimeIntervalType
+) -> pd.Series:
+    """Roll pandas timeseries by time interval based on interval type with np.roll(),
+    which includes time-wrapping.
+    """
+    assert (
+        from_interval_type != to_interval_type
+    ), f"from_ and to_interval_type are the same: {from_interval_type}"
+    match (from_interval_type, to_interval_type):
+        case (TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING):
+            # shift time forward, 1am pd-beginning >> 2am pd-ending
+            shift = -1
+        case (TimeIntervalType.PERIOD_ENDING, TimeIntervalType.PERIOD_BEGINNING):
+            # shift time backward
+            shift = 1
+        case _:
+            msg = f"Cannot handle from {from_interval_type} to {to_interval_type}"
+            raise InvalidParameter(msg)
+    return np.roll(df, shift)
+
+
+def wrap_timestamps(df: pd.Series, to_timestamps: list[pd.Timestamp]) -> pd.Series:
+    """Wrap pandas timeseries so it conforms to a list of timestamps."""
+    arr = np.sort(to_timestamps)
+    freqs = set((np.roll(arr, -1) - arr)[:-1])
+    assert len(freqs) == 1, f"Timeseries has more than one frequency: {freqs}"
+    freq = list(freqs)[0]
+
+    tmin, tmax = arr[0], arr[-1]
+    tdelta = tmax - tmin + freq
+    df2 = df.copy()
+    lower_cond = df < tmin
+    if lower_cond.sum() > 0:
+        df2.loc[lower_cond] += tdelta
+    upper_cond = df > tmax
+    if upper_cond.sum() > 0:
+        df2.loc[upper_cond] -= tdelta
+    return df2
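[editor note — illustrative, not part of the patch] The relationship between the three helpers, in numbers: roll_time_interval is equivalent to shift_time_interval followed by wrap_timestamps, which is exactly what test_roll_time_using_shift_and_wrap below asserts over a full year. A four-hour sketch:

    import pandas as pd

    from chronify.time import TimeIntervalType
    from chronify.time_utils import roll_time_interval, shift_time_interval, wrap_timestamps

    ts = pd.Series(pd.date_range("2024-01-01 00:00", periods=4, freq="1h"))
    shifted = shift_time_interval(
        ts, TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING
    )
    # [01:00, 02:00, 03:00, 04:00] -- the last timestamp leaves the range
    wrapped = wrap_timestamps(shifted, list(ts))
    # [01:00, 02:00, 03:00, 00:00] -- 04:00 wrapped back to 00:00
    rolled = roll_time_interval(
        ts, TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING
    )
    # same values as `wrapped`, produced directly by np.roll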
diff --git a/tests/test_checker_representative_time.py b/tests/test_checker_representative_time.py
index 349b428..7c94f43 100644
--- a/tests/test_checker_representative_time.py
+++ b/tests/test_checker_representative_time.py
@@ -14,7 +14,7 @@ def ingest_data_and_check(
 ) -> None:
     metadata = MetaData()
     with engine.connect() as conn:
-        write_database(df, conn, schema.name, schema.time_config, if_table_exists="replace")
+        write_database(df, conn, schema.name, [schema.time_config], if_table_exists="replace")
         conn.commit()
     metadata.reflect(engine, views=True)
 
diff --git a/tests/test_mapper_datetime_to_datetime.py b/tests/test_mapper_datetime_to_datetime.py
new file mode 100644
index 0000000..61265ea
--- /dev/null
+++ b/tests/test_mapper_datetime_to_datetime.py
@@ -0,0 +1,169 @@
+from zoneinfo import ZoneInfo
+import pytest
+from datetime import datetime, timedelta
+from typing import Any
+import numpy as np
+
+import pandas as pd
+from sqlalchemy import Engine, MetaData
+
+from chronify.sqlalchemy.functions import read_database, write_database
+from chronify.time_series_mapper import map_time
+from chronify.time_configs import DatetimeRange
+from chronify.models import TableSchema
+from chronify.time import TimeIntervalType, MeasurementType
+from chronify.exceptions import ConflictingInputsError, InvalidParameter
+from chronify.datetime_range_generator import DatetimeRangeGenerator
+from chronify.time_utils import shift_time_interval, roll_time_interval, wrap_timestamps
+
+
+def generate_datetime_data(time_config: DatetimeRange) -> pd.Series:
+    return pd.to_datetime(list(DatetimeRangeGenerator(time_config).iter_timestamps()))
+
+
+def generate_datetime_dataframe(schema: TableSchema) -> pd.DataFrame:
+    df = pd.DataFrame({schema.time_config.time_column: generate_datetime_data(schema.time_config)})
+
+    for i, x in enumerate(schema.time_array_id_columns):
+        df[x] = i
+    df[schema.value_column] = np.random.rand(len(df))
+    return df
+
+
+def get_datetime_schema(
+    year: int, tzinfo: ZoneInfo | None, interval_type: TimeIntervalType, name: str
+) -> TableSchema:
+    start = datetime(year=year, month=1, day=1, tzinfo=tzinfo)
+    end = datetime(year=year + 1, month=1, day=1, tzinfo=tzinfo)
+    resolution = timedelta(hours=1)
+    length = (end - start) / resolution + 1
+    schema = TableSchema(
+        name=name,
+        time_config=DatetimeRange(
+            start=start,
+            resolution=resolution,
+            length=length,
+            interval_type=interval_type,
+            time_column="timestamp",
+        ),
+        time_array_id_columns=["id"],
+        value_column="value",
+    )
+    return schema
+
+
+def check_dataframes(
+    dfi: pd.DataFrame, dfo: pd.DataFrame, from_schema: TableSchema, to_schema: TableSchema
+) -> None:
+    assert (
+        dfo[to_schema.time_config.time_column] == dfi[from_schema.time_config.time_column]
+    ).all()
+    match from_schema.time_config.interval_type, to_schema.time_config.interval_type:
+        case TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING:
+            shift = 1
+        case TimeIntervalType.PERIOD_ENDING, TimeIntervalType.PERIOD_BEGINNING:
+            shift = -1
+    assert (np.array(dfo["value"]) == np.roll(dfi["value"], shift)).all()
+
+
+def run_test(
+    engine: Engine,
+    df: pd.DataFrame,
+    from_schema: TableSchema,
+    to_schema: TableSchema,
+    error: tuple[Any, str],
+) -> None:
+    # Ingest
+    metadata = MetaData()
+    with engine.connect() as conn:
+        write_database(
+            df, conn, from_schema.name, [from_schema.time_config], if_table_exists="replace"
+        )
+        conn.commit()
+    metadata.reflect(engine, views=True)
+
+    # Map
+    if error:
+        with pytest.raises(error[0], match=error[1]):
+            map_time(engine, metadata, from_schema, to_schema)
+    else:
+        map_time(engine, metadata, from_schema, to_schema)
+
+        # Check mapped table
+        with engine.connect() as conn:
+            query = f"select * from {to_schema.name}"
+            queried = read_database(query, conn, to_schema.time_config)
+            queried = queried.sort_values(by=["id", "timestamp"]).reset_index(drop=True)[df.columns]
+            assert not queried.equals(df)
+            check_dataframes(df, queried, from_schema, to_schema)
+
+
+def test_roll_time_using_shift_and_wrap(iter_engines: Engine) -> None:
+    from_schema = get_datetime_schema(2024, None, TimeIntervalType.PERIOD_ENDING, "from_table")
+    data = generate_datetime_data(from_schema.time_config)
+    df = generate_datetime_dataframe(from_schema)
+    to_schema = get_datetime_schema(2024, None, TimeIntervalType.PERIOD_BEGINNING, "to_table")
+
+    df["rolled"] = roll_time_interval(
+        df[from_schema.time_config.time_column],
+        from_schema.time_config.interval_type,
+        to_schema.time_config.interval_type,
+    )
+    df["rolled2"] = shift_time_interval(
+        df[from_schema.time_config.time_column],
+        from_schema.time_config.interval_type,
+        to_schema.time_config.interval_type,
+    )
+    df["rolled2"] = wrap_timestamps(
+        df["rolled2"],
+        data,
+    )
+    assert df["rolled"].equals(df["rolled2"])
+    assert set(data) == set(df["rolled"].tolist())
+
+
+@pytest.mark.parametrize("tzinfo", [ZoneInfo("US/Eastern"), None])
+def test_time_interval_shift(
+    iter_engines: Engine,
+    tzinfo: ZoneInfo | None,
+) -> None:
+    from_schema = get_datetime_schema(
+        2020, tzinfo, TimeIntervalType.PERIOD_BEGINNING, "from_table"
+    )
+    df = generate_datetime_dataframe(from_schema)
+    to_schema = get_datetime_schema(2020, tzinfo, TimeIntervalType.PERIOD_ENDING, "to_table")
+
+    error = ()
+    run_test(iter_engines, df, from_schema, to_schema, error)
+
+
+def test_instantaneous_interval_type(
+    iter_engines: Engine,
+) -> None:
+    from_schema = get_datetime_schema(2020, None, TimeIntervalType.INSTANTANEOUS, "from_table")
+    df = generate_datetime_dataframe(from_schema)
+    to_schema = get_datetime_schema(2020, None, TimeIntervalType.PERIOD_ENDING, "to_table")
+    error = (InvalidParameter, "Cannot handle")
+    run_test(iter_engines, df, from_schema, to_schema, error)
+
+
+def test_schema_compatibility(
+    iter_engines: Engine,
+) -> None:
+    from_schema = get_datetime_schema(2020, None, TimeIntervalType.PERIOD_BEGINNING, "from_table")
+    df = generate_datetime_dataframe(from_schema)
+    to_schema = get_datetime_schema(2020, None, TimeIntervalType.PERIOD_ENDING, "to_table")
+    to_schema.time_array_id_columns += ["extra_column"]
+    error = (ConflictingInputsError, ".* cannot produce the columns")
+    run_test(iter_engines, df, from_schema, to_schema, error)
+
+
+def test_measurement_type_consistency(
+    iter_engines: Engine,
+) -> None:
+    from_schema = get_datetime_schema(2020, None, TimeIntervalType.PERIOD_BEGINNING, "from_table")
+    df = generate_datetime_dataframe(from_schema)
+    to_schema = get_datetime_schema(2020, None, TimeIntervalType.PERIOD_ENDING, "to_table")
+    to_schema.time_config.measurement_type = MeasurementType.MAX
+    error = (ConflictingInputsError, "Inconsistent measurement_types")
+    run_test(iter_engines, df, from_schema, to_schema, error)
diff --git a/tests/test_mapper_representative_time_to_datetime.py b/tests/test_mapper_representative_time_to_datetime.py
index c6a9fbc..cb5322f 100644
--- a/tests/test_mapper_representative_time_to_datetime.py
+++ b/tests/test_mapper_representative_time_to_datetime.py
@@ -57,7 +57,7 @@ def run_test(
     metadata = MetaData()
     with engine.connect() as conn:
         write_database(
-            df, conn, from_schema.name, from_schema.time_config, if_table_exists="replace"
+            df, conn, from_schema.name, [from_schema.time_config], if_table_exists="replace"
         )
         conn.commit()
     metadata.reflect(engine, views=True)
diff --git a/tests/test_models.py b/tests/test_models.py
index 927c023..18a05c3 100644
--- a/tests/test_models.py
+++ b/tests/test_models.py
@@ -4,7 +4,7 @@
 from chronify.models import ColumnDType, _check_name
 
 
-def test_column_dtypes():
+def test_column_dtypes() -> None:
     ColumnDType(name="col1", dtype=Integer())
     for dtype in (BigInteger, Boolean, DateTime, Double, String):
         ColumnDType(name="col1", dtype=dtype())
@@ -16,6 +16,6 @@ def test_column_dtypes():
         ColumnDType(name="col1", dtype="invalid")
 
 
-def test_invalid_column_name():
+def test_invalid_column_name() -> None:
     with pytest.raises(ValueError):
         _check_name(name="invalid - name")
diff --git a/tests/test_time_series_checker.py b/tests/test_time_series_checker.py
index 39c735c..13f0b8c 100644
--- a/tests/test_time_series_checker.py
+++ b/tests/test_time_series_checker.py
@@ -78,7 +78,7 @@ def _run_test(
         value_column="value",
     )
     with engine.connect() as conn:
-        write_database(df, conn, schema.name, schema.time_config, if_table_exists="replace")
+        write_database(df, conn, schema.name, [schema.time_config], if_table_exists="replace")
         conn.commit()
     metadata.reflect(engine)