Skip to content

Commit

Permalink
add roll_time_interval, wrap_timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
lixiliu committed Dec 21, 2024
1 parent 45d5ddb commit 8feb0eb
Show file tree
Hide file tree
Showing 13 changed files with 383 additions and 113 deletions.
4 changes: 4 additions & 0 deletions src/chronify/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class TableSchema(TableSchemaBase):
@classmethod
def check_name(cls, name: str) -> str:
    """Validate the schema's table name.

    Runs the shared ``_check_name`` validation, then rejects the literal
    name "table" (case-insensitive), which causes problems in DuckDB.

    Raises
    ------
    ValueError
        If the name is the reserved word "table".
    """
    _check_name(name)
    if name.lower() == "table":
        # Disallow the literal name "table" to avoid conflicts in DuckDB.
        msg = f"Table schema cannot use {name=}."
        raise ValueError(msg)
    return name

@field_validator("value_column")
Expand Down
11 changes: 7 additions & 4 deletions src/chronify/sqlalchemy/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ def write_database(
df: pd.DataFrame,
conn: Connection,
table_name: str,
config: TimeBaseModel,
configs: list[TimeBaseModel],
if_table_exists: DbWriteMode = "append",
) -> None:
"""Write a Pandas DataFrame to the database."""
"""Write a Pandas DataFrame to the database.
configs allows sqlite formatting for more than one datetime column.
"""
match conn.engine.name:
case "duckdb":
assert conn._dbapi_connection is not None
Expand All @@ -67,8 +69,9 @@ def write_database(
raise InvalidOperation(msg)
conn._dbapi_connection.driver_connection.sql(query)
case "sqlite":
if isinstance(config, DatetimeRange):
df = _convert_database_input_for_datetime(df, config)
for config in configs:
if isinstance(config, DatetimeRange):
df = _convert_database_input_for_datetime(df, config)
pl.DataFrame(df).write_database(
table_name, connection=conn, if_table_exists=if_table_exists
)
Expand Down
2 changes: 1 addition & 1 deletion src/chronify/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ def _ingest_table(
created_table = False

try:
write_database(df, conn, schema.name, schema.time_config)
write_database(df, conn, schema.name, [schema.time_config])
except Exception:
if created_table:
table.drop(self._engine)
Expand Down
13 changes: 7 additions & 6 deletions src/chronify/time_series_checker.py
Original file line number Diff line number Diff line change
def check_timestamp_lists(actual: list[pd.Timestamp], expected: list[pd.Timestamp]) -> None:
    """Check that the actual timestamps exactly match the expected timestamps.

    Parameters
    ----------
    actual
        Timestamps found in the table.
    expected
        Timestamps the time config says should be present.

    Raises
    ------
    InvalidTable
        If the lists differ; the message reports a count mismatch (if any)
        plus the missing and extra timestamps.
    """
    match = actual == expected
    if not match:
        if len(actual) != len(expected):
            msg = f"Mismatch number of timestamps: actual: {len(actual)} vs. expected: {len(expected)}\n"
        else:
            msg = ""
        # Use set membership for O(1) lookups instead of O(n^2) list scans,
        # while preserving input order in the reported lists.
        actual_set = set(actual)
        expected_set = set(expected)
        missing = [x for x in expected if x not in actual_set]
        extra = [x for x in actual if x not in expected_set]
        msg += "Actual timestamps do not match expected timestamps. \n"
        msg += f"Missing: {missing} \n"
        msg += f"Extra: {extra}"
        raise InvalidTable(msg)
95 changes: 95 additions & 0 deletions src/chronify/time_series_mapper_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,100 @@
import abc
import logging
from functools import reduce
from operator import and_

import pandas as pd
from sqlalchemy import Engine, MetaData, Table, select, text

from chronify.sqlalchemy.functions import write_database
from chronify.models import TableSchema
from chronify.exceptions import TableAlreadyExists
from chronify.utils.sqlalchemy_table import create_table
from chronify.time_series_checker import check_timestamps
from chronify.time_configs import DatetimeRange

logger = logging.getLogger(__name__)


class TimeSeriesMapperBase(abc.ABC):
    """Maps time series data from one configuration to another."""

    # NOTE(review): no @abstractmethod is declared yet; concrete mappers in
    # this package implement map_time() — consider declaring it abstract here.


def apply_mapping(
    df_mapping: pd.DataFrame,
    mapping_table_name: str,
    from_schema: TableSchema,
    to_schema: TableSchema,
    engine: Engine,
    metadata: MetaData,
) -> None:
    """Apply the mapping dataframe to produce the result table, rolling back
    and cleaning up the temporary mapping table if any check fails.

    Parameters
    ----------
    df_mapping
        Mapping dataframe joining from-table rows to to-table timestamps.
    mapping_table_name
        Name for the temporary mapping table; must not already exist.
    from_schema / to_schema
        Schemas of the source and destination tables.
    engine / metadata
        Database engine and reflected metadata.

    Raises
    ------
    TableAlreadyExists
        If mapping_table_name is already present in the metadata.
    """
    if mapping_table_name in metadata.tables:
        msg = (
            f"table {mapping_table_name} already exists, delete it or use a different table name."
        )
        raise TableAlreadyExists(msg)

    # The destination time config always needs formatting; if the source uses
    # DatetimeRange, its (prefixed) time column in the mapping table does too.
    time_configs = [to_schema.time_config]
    if isinstance(from_schema.time_config, DatetimeRange):
        from_time_config = from_schema.time_config.model_copy()
        from_time_config.time_column = "from_" + from_time_config.time_column
        time_configs.append(from_time_config)
    try:
        with engine.connect() as conn:
            write_database(
                df_mapping, conn, mapping_table_name, time_configs, if_table_exists="fail"
            )
            metadata.reflect(engine, views=True)
            _apply_mapping(mapping_table_name, from_schema, to_schema, engine, metadata)
            mapped_table = Table(to_schema.name, metadata)
            try:
                check_timestamps(conn, mapped_table, to_schema)
            except Exception:
                # BUG FIX: stdlib logging uses %-style placeholders, not "{}";
                # the original message never interpolated the table name.
                logger.exception(
                    "check_timestamps failed on mapped table %s. Drop it", to_schema.name
                )
                conn.rollback()
                raise
            conn.commit()
    finally:
        # Always drop the temporary mapping table, even when a check failed.
        if mapping_table_name in metadata.tables:
            with engine.connect() as conn:
                conn.execute(text(f"DROP TABLE {mapping_table_name}"))
                conn.commit()
            metadata.remove(Table(mapping_table_name, metadata))


def _apply_mapping(
    mapping_table_name: str,
    from_schema: TableSchema,
    to_schema: TableSchema,
    engine: Engine,
    metadata: MetaData,
) -> None:
    """Join the source table to the mapping table and materialize the result
    as a new table matching to_schema.

    The mapping table's join columns carry a "from_" prefix; any result column
    not provided by the mapping table is pulled from the source table.
    """
    src_table = Table(from_schema.name, metadata)
    map_table = Table(mapping_table_name, metadata)
    src_col_names = [col.name for col in src_table.columns]
    map_col_names = [col.name for col in map_table.columns]

    wanted = set(to_schema.list_columns())
    # Columns satisfied by the mapping table take priority; the rest come
    # from the source table.
    from_map = wanted & set(map_col_names)
    from_src = wanted - from_map

    selected = [src_table.c[name] for name in from_src]
    selected.extend(map_table.c[name] for name in from_map)

    join_keys = from_schema.time_config.list_time_columns()
    # infer the use of time_zone
    if "from_time_zone" in map_col_names:
        join_keys.append("time_zone")
        assert "time_zone" in src_col_names, f"time_zone not in table={from_schema.name}"

    # Combine one equality clause per join key into a single AND condition.
    on_clause = None
    for key in join_keys:
        equality = src_table.c[key] == map_table.c["from_" + key]
        on_clause = equality if on_clause is None else and_(on_clause, equality)

    query = select(*selected).select_from(src_table).join(map_table, on_clause)
    create_table(to_schema.name, query, engine, metadata)
63 changes: 35 additions & 28 deletions src/chronify/time_series_mapper_datetime.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
import logging

from sqlalchemy import Engine, MetaData, Table
import pandas as pd
from sqlalchemy import Engine, MetaData

from chronify.models import TableSchema
from chronify.exceptions import (
ConflictingInputsError,
InvalidParameter,
)
from chronify.time_series_mapper_base import TimeSeriesMapperBase
from chronify.utils.sqlalchemy_table import create_table
from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping
from chronify.time_configs import DatetimeRange
from chronify.time import TimeIntervalType
from chronify.time_range_generator_factory import make_time_range_generator
from chronify.time_utils import roll_time_interval

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,31 +73,36 @@ def _check_time_resolution(self) -> None:

def map_time(self) -> None:
"""Convert time columns with from_schema to to_schema configuration."""
if self._from_schema == self._to_schema:
msg = "From table schema is the same as to table schema. Nothing to do.\n{self._from_schema}"
logger.info(msg)
return
self.check_schema_consistency()

map_table_name = "mapping_table"
df = self._create_mapping()
apply_mapping(
df, map_table_name, self._from_schema, self._to_schema, self._engine, self._metadata
)
# TODO - add handling for changing resolution

def _create_mapping(self) -> pd.DataFrame:
"""Create mapping dataframe
Handles time interval type
"""
from_time_col = "from_" + self._from_time_config.time_column
to_time_col = self._to_time_config.time_column
to_time_data = make_time_range_generator(self._to_time_config).list_timestamps()
df = pd.DataFrame(
{
from_time_col: make_time_range_generator(self._from_time_config).list_timestamps(),
to_time_col: to_time_data,
}
)
if self._from_time_config.interval_type != self._to_time_config.interval_type:
self._shift_time_interval()
# TODO - add handling for changing resolution

def _shift_time_interval(self) -> None:
table = Table(self._from_schema.name, self._metadata)
time_col = self._from_time_config.time_column
new_time_col = self._to_time_config.time_column
if new_time_col == time_col:
other_cols = [x.name for x in table.columns if x != time_col]
else:
other_cols = table.columns

match (self._from_time_config.interval_type, self._to_time_config.interval_type):
case (TimeIntervalType.PERIOD_BEGINNING, TimeIntervalType.PERIOD_ENDING):
op = "+"
case (TimeIntervalType.PERIOD_ENDING, TimeIntervalType.PERIOD_BEGINNING):
op = "-"
case _:
msg = f"Cannot handle from {self._from_time_config.interval_type} to {self._to_time_config.interval_type}"
raise InvalidParameter(msg)

delta_sec = self._from_time_config.resolution.seconds
query = f"SELECT {other_cols}, {time_col} {op} INTERVAL {delta_sec} SECONDS AS {new_time_col} FROM {self._from_schema.name}"

create_table(self._to_schema.name, query, self._engine, self._metadata)
df[to_time_col] = roll_time_interval(
df[to_time_col],
self._from_time_config.interval_type,
self._to_time_config.interval_type,
)
return df
88 changes: 19 additions & 69 deletions src/chronify/time_series_mapper_representative.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
import logging
from functools import reduce
from operator import and_

import pandas as pd
from sqlalchemy import Engine, MetaData, Table, select, text
from sqlalchemy import Engine, MetaData, Table, select

from chronify.sqlalchemy.functions import read_database, write_database
from chronify.sqlalchemy.functions import read_database
from chronify.models import TableSchema
from chronify.exceptions import (
MissingParameter,
ConflictingInputsError,
TableAlreadyExists,
InvalidParameter,
)
from chronify.time_range_generator_factory import make_time_range_generator
from chronify.time_series_mapper_base import TimeSeriesMapperBase
from chronify.utils.sqlalchemy_table import create_table
from chronify.time_series_mapper_base import TimeSeriesMapperBase, apply_mapping
from chronify.representative_time_range_generator import RepresentativePeriodTimeGenerator
from chronify.time_series_checker import check_timestamps
from chronify.time_configs import DatetimeRange, RepresentativePeriodTime
from chronify.time_utils import shift_time_interval
from chronify.time import TimeIntervalType
Expand Down Expand Up @@ -88,41 +83,16 @@ def map_time(self) -> None:
if not is_tz_naive:
self._check_source_table_has_time_zone()

map_table_name = "map_table"
dfm = self._create_mapping(is_tz_naive)
if map_table_name in self._metadata.tables:
msg = (
f"table {map_table_name} already exists, delete it or use a different table name."
)
raise TableAlreadyExists(msg)

try:
with self._engine.connect() as conn:
write_database(
dfm, conn, map_table_name, self._to_time_config, if_table_exists="fail"
)
self._metadata.reflect(self._engine, views=True)
self._apply_mapping(map_table_name)
mapped_table = Table(self._to_schema.name, self._metadata)
try:
check_timestamps(conn, mapped_table, self._to_schema)
except Exception:
logger.exception(
"check_timestamps failed on mapped table {}. Drop it", self._to_schema.name
)
conn.rollback()
raise
conn.commit()
finally:
if map_table_name in self._metadata.tables:
with self._engine.connect() as conn:
conn.execute(text(f"DROP TABLE {map_table_name}"))
conn.commit()
self._metadata.remove(Table(map_table_name, self._metadata))
map_table_name = "mapping_table"
df = self._create_mapping(is_tz_naive)
apply_mapping(
df, map_table_name, self._from_schema, self._to_schema, self._engine, self._metadata
)

def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
"""Create mapping dataframe
Handles time interval type
- Handles time interval type adjustment
- Columns used to join the from_table are prefixed with "from_"
"""
timestamp_generator = make_time_range_generator(self._to_time_config)

Expand All @@ -131,6 +101,8 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:

if self._from_time_config.interval_type != self._to_time_config.interval_type:
time_col = "mapping_" + to_time_col
# mapping works backward for representative time
# shift is correct here for extracting time info
dft[time_col] = shift_time_interval(
dft[to_time_col],
self._to_time_config.interval_type,
Expand All @@ -139,8 +111,9 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
else:
time_col = to_time_col

from_columns = self._from_time_config.list_time_columns()
if is_tz_naive:
dfm = self._generator.create_tz_naive_mapping_dataframe(dft, time_col)
df = self._generator.create_tz_naive_mapping_dataframe(dft, time_col)
else:
with self._engine.connect() as conn:
table = Table(self._from_schema.name, self._metadata)
Expand All @@ -152,31 +125,8 @@ def _create_mapping(self, is_tz_naive: bool) -> pd.DataFrame:
time_zones = read_database(stmt, conn, self._from_time_config)[
"time_zone"
].to_list()
dfm = self._generator.create_tz_aware_mapping_dataframe(dft, time_col, time_zones)
return dfm

def _apply_mapping(self, map_table_name: str) -> None:
"""Apply mapping to create result as a table according to_schema"""
left_table = Table(self._from_schema.name, self._metadata)
right_table = Table(map_table_name, self._metadata)
left_table_columns = [x.name for x in left_table.columns]
right_table_columns = [x.name for x in right_table.columns]

final_cols = set(self._to_schema.list_columns())
left_cols = set(left_table_columns).intersection(final_cols)
right_cols = final_cols - left_cols

select_stmt = [left_table.c[x] for x in left_cols]
select_stmt += [right_table.c[x] for x in right_cols]

keys = self._from_time_config.list_time_columns()
if not self._to_time_config.is_time_zone_naive():
keys.append("time_zone")
assert (
"time_zone" in left_table_columns
), f"time_zone not in table={self._from_schema.name}"
assert "time_zone" in right_table_columns, f"time_zone not in table={map_table_name}"

on_stmt = reduce(and_, (left_table.c[x] == right_table.c[x] for x in keys))
query = select(*select_stmt).select_from(left_table).join(right_table, on_stmt)
create_table(self._to_schema.name, query, self._engine, self._metadata)
df = self._generator.create_tz_aware_mapping_dataframe(dft, time_col, time_zones)
from_columns.append("time_zone")

df = df.rename(columns={x: "from_" + x for x in from_columns})
return df
Loading

0 comments on commit 8feb0eb

Please sign in to comment.