Skip to content

Commit

Permalink
refactor(oonimeasurements): redesign measurements service (#895)
Browse files Browse the repository at this point in the history
* init measurements refactor

* refactor: measurement partial

* Remove the SQLAlchemy models for fastpath tables

* Align the queries to be closer to original implementation

* Rename migration files so that they are applied in the correct order

* Fix typing of measurement_uid

* Add more debug info when migrations fail

* Start fixing some of the tests

* Use random port for clickhouse

* reformat

* More progress on fixing broken tests

* Fix all the tests

* Remove duplicate test_measurements from tests

---------

Co-authored-by: Arturo Filastò <[email protected]>
  • Loading branch information
DecFox and hellais committed Jan 8, 2025
1 parent b208992 commit d2d3dc4
Show file tree
Hide file tree
Showing 17 changed files with 643 additions and 1,119 deletions.
4 changes: 4 additions & 0 deletions ooniapi/common/src/common/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import clickhouse_sqlalchemy


# Shared declarative base for all ClickHouse-backed ORM models in this
# package; model classes elsewhere subclass `Base` to register their tables.
Base = clickhouse_sqlalchemy.get_declarative_base()
1 change: 1 addition & 0 deletions ooniapi/common/src/common/routers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date, datetime
from typing import Union
from pydantic import BaseModel as PydandicBaseModel
from pydantic import ConfigDict

Expand Down
2 changes: 2 additions & 0 deletions ooniapi/common/src/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def commasplit(p: str) -> List[str]:
def convert_to_csv(r) -> str:
"""Convert aggregation result dict/list to CSV"""
csvf = StringIO()
if len(r) == 0:
return ""
if isinstance(r, dict):
# 0-dimensional data
fieldnames = sorted(r.keys())
Expand Down
1 change: 1 addition & 0 deletions ooniapi/services/oonimeasurements/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dependencies = [
"fastapi ~= 0.108.0",
"psycopg2 ~= 2.9.5",
"clickhouse-driver ~= 0.2.6",
"clickhouse-sqlalchemy ~= 0.3.2",
"sqlalchemy ~= 2.0.27",
"ujson ~= 5.9.0",
"urllib3 ~= 2.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
from .common.config import Settings
from .common.dependencies import get_settings


def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]):
    """FastAPI dependency yielding a ClickHouse client for one request.

    Connects using ``settings.clickhouse_url`` and always disconnects when
    the request scope ends, even if the route handler raises.
    """
    db = Clickhouse.from_url(settings.clickhouse_url)
    try:
        yield db
    finally:
        # Bug fix: the original text had a duplicated `finally:` line,
        # which is a syntax error; a single finally guarantees cleanup.
        db.disconnect()
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

from prometheus_fastapi_instrumentator import Instrumentator

from .routers import aggregation, measurements
from .routers.v1 import aggregation
from .routers.v1 import measurements

from .dependencies import get_clickhouse_session
from .common.dependencies import get_settings
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
"""
Aggregation API
The routes are mounted under /api
"""

from datetime import datetime, timedelta, date
from datetime import datetime, timedelta, date, timezone
from typing import List, Any, Dict, Optional, Union
import logging

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from typing_extensions import Annotated

from clickhouse_driver import Client as ClickhouseClient
Expand All @@ -20,8 +18,8 @@

from oonimeasurements.common.clickhouse_utils import query_click, query_click_one_row
from oonimeasurements.common.utils import jerror, commasplit, convert_to_csv
from ..dependencies import get_clickhouse_session

from ...dependencies import get_clickhouse_session
from ...common.routers import BaseModel

router = APIRouter()

Expand Down Expand Up @@ -116,7 +114,7 @@ class AggregationResult(BaseModel):
failure_count: int
ok_count: int
measurement_count: int
measurement_start_day: Optional[date] = None
measurement_start_day: Optional[str] = None
blocking_type: Optional[str] = None
category_code: Optional[str] = None
domain: Optional[str] = None
Expand All @@ -134,8 +132,9 @@ class MeasurementAggregation(BaseModel):


@router.get(
"/v1/aggregation",
response_model_exclude_none=True
"/v1/aggregation",
response_model_exclude_none=True,
response_model=MeasurementAggregation,
)
async def get_measurements(
response: Response,
Expand Down Expand Up @@ -247,7 +246,9 @@ async def get_measurements(
int(i[2:]) if i.startswith("AS") else i for i in commasplit(probe_asn)
]
except ValueError:
raise HTTPException(status_code=400, detail="Invalid ASN value in parameter probe_asn")
raise HTTPException(
status_code=400, detail="Invalid ASN value in parameter probe_asn"
)

probe_cc_s = []
if probe_cc:
Expand Down Expand Up @@ -342,12 +343,16 @@ async def get_measurements(
group_by: List = []
try:
if axis_x == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_x:
add_axis(axis_x, cols, colnames, group_by)

if axis_y == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_y:
add_axis(axis_y, cols, colnames, group_by)

Expand All @@ -372,7 +377,17 @@ async def get_measurements(

try:
if dimension_cnt > 0:
r: Any = list(query_click(db, query, query_params, query_prio=4))
str_format = "%Y-%m-%d"
if time_grain == "hour":
str_format = "%Y-%m-%dT%H:%M:%SZ"
r: Any = []
for row in query_click(db, query, query_params, query_prio=4):
## Handle the difference in formatting between hourly and daily measurement_start_day
if "measurement_start_day" in row:
row["measurement_start_day"] = row[
"measurement_start_day"
].strftime(str_format)
r.append(row)
else:
r = query_click_one_row(db, query, query_params, query_prio=4)

Expand Down Expand Up @@ -410,7 +425,8 @@ async def get_measurements(
elapsed_seconds=pq.elapsed,
),
result=r,
).model_dump(exclude_none=True)
)

except Exception as e:
print(e)
raise HTTPException(status_code=400, detail=str(e))
Loading

0 comments on commit d2d3dc4

Please sign in to comment.