Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Agrégat brut IRVE #4397

Draft
wants to merge 63 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
2991ace
Add explorer for data frames
thbar Oct 29, 2024
543a715
Create data-frame.exs
thbar Oct 29, 2024
e3bcb39
Create schema-irve-statique.json
thbar Oct 29, 2024
dd99d43
Leverage IRVE schema to build Explorer dtypes
thbar Oct 29, 2024
c9e0633
Merge branch 'master' into irve-dataframe
thbar Nov 4, 2024
1f2849d
Add static IRVE factory
thbar Nov 4, 2024
a5d3888
Promote IRVE DataFrame to app code
thbar Nov 4, 2024
f0569a6
Save WIP tests
thbar Nov 4, 2024
d31f3ea
Fix broken test
thbar Nov 4, 2024
dcd590d
Split independent modules, add tests
thbar Nov 4, 2024
3a6f583
Mix format
thbar Nov 4, 2024
aea430e
Merge branch 'master' into irve-dataframe
thbar Nov 4, 2024
af7a11b
Remove bogus characters
thbar Nov 4, 2024
48b0656
Merge branch 'master' into irve-dataframe
thbar Nov 29, 2024
f1a9bcf
Add @moduledoc (credo)
thbar Nov 29, 2024
975c661
Document field parsing typing via DocTests
thbar Nov 29, 2024
3988269
Test & document behaviour on unspecified fields
thbar Nov 29, 2024
1409c5c
Update doc
thbar Nov 29, 2024
7f0cef1
Reformat for clarity
thbar Nov 29, 2024
a35805c
Merge branch 'master' into irve-dataframe
thbar Nov 29, 2024
c744aa7
Fix credits
thbar Nov 30, 2024
85a27a0
Improve code
thbar Nov 30, 2024
17e3910
Rename for improved clarity
thbar Nov 30, 2024
1c89d3b
Restructure as module
thbar Dec 1, 2024
ab0b2fa
Merge branch 'master' into irve-packing
thbar Dec 12, 2024
0f97e4e
Merge branch 'master' into irve-packing
thbar Dec 16, 2024
80c1e05
Fix incorrect mapping (number is float)
thbar Dec 17, 2024
50ab453
Update data-frame.exs
thbar Dec 17, 2024
680663d
Implement lax/strict
thbar Dec 18, 2024
1a64001
Bubble up organisation id (to filter out data gouv)
thbar Dec 18, 2024
0d7ca66
Fix doctests
thbar Dec 18, 2024
5f1e49b
Add probe to filter out incorrect content
thbar Dec 18, 2024
1696090
Adapt code - eat the data, but in "lax mode"
thbar Dec 18, 2024
5da4682
Raise a clear error if data appears to be not in the correct format
thbar Dec 18, 2024
c37dda0
Exclude data-gouv consolidation (which we are rewriting here)
thbar Dec 18, 2024
79e2501
Dump file to disk
thbar Dec 18, 2024
124da7d
Add code to unpack coordinates
thbar Dec 18, 2024
bfaa361
Avoid errors being inspect'ed
thbar Dec 18, 2024
f44da21
Add probe for v1 of schema
thbar Dec 18, 2024
5b3772c
Raise for non-utf8, v1, and important column missing
thbar Dec 18, 2024
5c82810
Stop inspecting
thbar Dec 18, 2024
f0e3f48
Introduce structure for reporting
thbar Dec 18, 2024
67f3a82
Track down which resources we are processing
thbar Dec 18, 2024
278e7da
Comment this for now
thbar Dec 18, 2024
c04a732
Preprocess data to extract x/y
thbar Dec 18, 2024
c21e871
Avoid getting nil if there are spaces
thbar Dec 18, 2024
cb0ea45
Fix lat/lon for 22k points
thbar Dec 18, 2024
75d4436
Add a way to discover where nils are
thbar Dec 18, 2024
e91fea7
Increase precision to avoid loss
thbar Dec 19, 2024
7b5cfd8
Implement "out-of-process" boolean mapping for now
thbar Dec 19, 2024
7eb1a4c
Remove obsolete stuff
thbar Dec 19, 2024
991d960
DRY code
thbar Dec 19, 2024
b9c1572
Use safer map access
thbar Dec 19, 2024
6af0163
Safe map access, mix format
thbar Dec 19, 2024
6cfeb5d
Safer map access
thbar Dec 19, 2024
64d2b32
Mix format
thbar Dec 19, 2024
3f7eeb0
Only log specific errors here
thbar Dec 19, 2024
d810319
Map most fields
thbar Dec 19, 2024
ad43ef7
Add more todos
thbar Dec 19, 2024
b161bff
Serialize report as well
thbar Dec 20, 2024
951931b
Update .gitignore
thbar Dec 20, 2024
768b506
Fix serialization issue
thbar Dec 20, 2024
4153633
Merge branch 'master' into irve-packing
thbar Jan 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,5 @@ livebook/cache-dir
apps/transport/priv/repo/structure.sql

pg.list
consolidation.csv
report.csv
101 changes: 95 additions & 6 deletions apps/transport/lib/irve/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Transport.IRVE.DataFrame do
@moduledoc """
Tooling supporting the parsing of an IRVE static file into `Explorer.DataFrame`
"""
require Explorer.DataFrame

@doc """
Helper function to convert TableSchema types into DataFrame ones.

In strict mode (the default), types map directly:

iex> Transport.IRVE.DataFrame.remap_schema_type(:geopoint)
:string
iex> Transport.IRVE.DataFrame.remap_schema_type(:number)
{:f, 64}
iex> Transport.IRVE.DataFrame.remap_schema_type(:literally_anything)
:literally_anything

In lax mode (`strict = false`), booleans are additionally kept as strings, so
that non-canonical values found in real-world files (e.g. `"0"`/`"1"`) do not
make the CSV load fail; they can be normalised afterwards
(see `preprocess_boolean/2`):

iex> Transport.IRVE.DataFrame.remap_schema_type(:boolean, false)
:string
"""
def remap_schema_type(input_type, strict \\ true)

# Strict mapping: geopoints are kept as raw strings (unpacked later by
# `preprocess_data/1`), numbers become 64-bit floats to avoid precision
# loss; all other types pass through unchanged.
def remap_schema_type(input_type, true) do
  case input_type do
    :geopoint -> :string
    :number -> {:f, 64}
    type -> type
  end
end

# Lax mapping: same as strict, except booleans stay strings.
def remap_schema_type(input_type, false) do
  case remap_schema_type(input_type, true) do
    :boolean -> :string
    type -> type
  end
end
Expand Down Expand Up @@ -81,18 +91,97 @@ defmodule Transport.IRVE.DataFrame do

Congratulations for reading this far.
"""
@doc """
Parse the body of an IRVE static CSV file into an `Explorer.DataFrame`.

Column dtypes are derived from the TableSchema `schema` (the static IRVE
schema by default), remapped via `remap_schema_type/2`. With `strict` set to
`false`, boolean columns are loaded as strings so they can be normalised
out-of-band afterwards.

Raises the underlying `Explorer` error on failure.
"""
def dataframe_from_csv_body!(body, schema \\ Transport.IRVE.StaticIRVESchema.schema_content(), strict \\ true) do
  dtypes =
    schema
    |> Map.fetch!("fields")
    |> Enum.map(fn %{"name" => name, "type" => type} ->
      # NOTE(review): `String.to_atom/1` is acceptable here because the
      # schema is trusted, bounded internal data — it would be unsafe on
      # unbounded external input (atoms are never garbage-collected).
      {
        String.to_atom(name),
        String.to_atom(type)
        |> Transport.IRVE.DataFrame.remap_schema_type(strict)
      }
    end)

  # Deliberately not `load_csv!`: the bang version `inspect`s the error
  # before raising, which garbles the message. Raise the error ourselves.
  case Explorer.DataFrame.load_csv(body, dtypes: dtypes) do
    {:ok, df} -> df
    {:error, error} -> raise(error)
  end
end

@doc """
iex> Explorer.DataFrame.new([%{coordonneesXY: "[47.39,0.80]"}]) |> Transport.IRVE.DataFrame.preprocess_data()
#Explorer.DataFrame<
Polars[1 x 2]
x f64 [47.39]
y f64 [0.8]
>

We must also support cases where there are extra spaces.

iex> Explorer.DataFrame.new([%{coordonneesXY: "[43.958037, 4.764347]"}]) |> Transport.IRVE.DataFrame.preprocess_data()
#Explorer.DataFrame<
Polars[1 x 2]
x f64 [43.958037]
y f64 [4.764347]
>

But wait, there is more. Leading and trailing spaces can also occur.

iex> Explorer.DataFrame.new([%{coordonneesXY: " [6.128405 , 48.658737] "}]) |> Transport.IRVE.DataFrame.preprocess_data()
#Explorer.DataFrame<
Polars[1 x 2]
x f64 [6.128405]
y f64 [48.658737]
>
"""
# Unpacks the `coordonneesXY` string column (e.g. `"[47.39,0.80]"`, possibly
# with stray spaces around brackets or commas) into two `{:f, 64}` columns
# `x` and `y`, then drops the original column.
# NOTE(review): assumes the first component maps to `x` and the second to
# `y` — confirm against the schema's "[longitude, latitude]" convention.
def preprocess_data(df) do
df
# remove surrounding brackets and any leading/trailing spaces
|> Explorer.DataFrame.mutate(coordonneesXY: coordonneesXY |> strip("[] "))
# split "a,b" into a struct column with :x and :y fields
|> Explorer.DataFrame.mutate_with(fn df ->
%{
coords: Explorer.Series.split_into(df[:coordonneesXY], ",", [:x, :y])
}
end)
|> Explorer.DataFrame.unnest(:coords)
# required or we'll get `nil` values
|> Explorer.DataFrame.mutate(x: x |> strip(" "))
|> Explorer.DataFrame.mutate(y: y |> strip(" "))
# finally cast the cleaned-up strings to 64-bit floats
|> Explorer.DataFrame.mutate_with(fn df ->
[
x: Explorer.Series.cast(df[:x], {:f, 64}),
y: Explorer.Series.cast(df[:y], {:f, 64})
]
end)
|> Explorer.DataFrame.discard(:coordonneesXY)
end

# Accepted textual representations of booleans seen in the wild so far.
# Keys are matched after `String.downcase/1`, so "TRUE"/"True"/"true" (etc.)
# all resolve to the same entry; unknown values raise (see below).
@boolean_mappings %{
  nil => nil,
  "" => nil,
  "0" => false,
  "1" => true,
  "true" => true,
  "false" => false
}

@doc """
Remap a string column `field_name` holding boolean-ish values
("0"/"1"/"true"/"TRUE"/...) into an actual boolean column of the same name.

Raises `KeyError` for values outside the known mappings, surfacing
unexpected source data instead of silently dropping it.
"""
# experimental, I think Explorer lacks a feature to allow this operation within Polars.
# For now, using `transform`, which is a costly operation comparatively
# https://hexdocs.pm/explorer/Explorer.DataFrame.html#transform/3
def preprocess_boolean(df, field_name) do
  df
  |> Explorer.DataFrame.transform([names: [field_name]], fn row ->
    %{
      (field_name <> "_remapped") => remap_boolean(row[field_name])
    }
  end)
  |> Explorer.DataFrame.discard(field_name)
  |> Explorer.DataFrame.rename(%{(field_name <> "_remapped") => field_name})
end

# Case-insensitive lookup; covers all capitalisation variants without
# listing each one explicitly. `nil` is handled first since it cannot
# be downcased.
defp remap_boolean(nil), do: nil
defp remap_boolean(value), do: Map.fetch!(@boolean_mappings, String.downcase(value))
end
5 changes: 4 additions & 1 deletion apps/transport/lib/irve/extractor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Transport.IRVE.Extractor do

The code fetches datasets, then unpack resources belonging to each dataset.
"""
def resources(pagination_options \\ []) do
def datagouv_resources(pagination_options \\ []) do
@static_irve_datagouv_url
|> Transport.IRVE.Fetcher.pages(pagination_options)
|> Task.async_stream(&process_data_gouv_page/1, on_timeout: :kill_task, max_concurrency: 10)
Expand Down Expand Up @@ -53,6 +53,8 @@ defmodule Transport.IRVE.Extractor do
x
|> Map.put(:dataset_id, fetch_in!(dataset, ["id"]))
|> Map.put(:dataset_title, fetch_in!(dataset, ["title"]))
# a dataset organisation can be nil (in which case an "owner" will be there)
|> Map.put(:dataset_organisation_id, get_in(dataset, ["organization", "id"]) || "???")
|> Map.put(:dataset_organisation_name, get_in(dataset, ["organization", "name"]) || "???")
|> Map.put(:dataset_organisation_url, get_in(dataset, ["organization", "page"]) || "???")
end)
Expand All @@ -69,6 +71,7 @@ defmodule Transport.IRVE.Extractor do
resource_title: fetch_in!(resource, ["title"]),
dataset_id: fetch_in!(resource, [:dataset_id]),
dataset_title: fetch_in!(resource, [:dataset_title]),
dataset_organisation_id: fetch_in!(resource, [:dataset_organisation_id]),
dataset_organisation_name: fetch_in!(resource, [:dataset_organisation_name]),
dataset_organisation_url: fetch_in!(resource, [:dataset_organisation_url]),
valid: get_in(resource, ["extras", "validation-report:valid_resource"]),
Expand Down
2 changes: 1 addition & 1 deletion apps/transport/lib/jobs/analyze_irve_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Transport.Jobs.AnalyzeIRVEJob do
try do
Logger.info("IRVE: starting global analyse...")
send(job_pid, {:progress, 0})
resources = Transport.IRVE.Extractor.resources() |> Enum.into([])
resources = Transport.IRVE.Extractor.datagouv_resources() |> Enum.into([])

count = resources |> length()
Logger.info("IRVE: processing #{count} resources...")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ defmodule Transport.IRVE.ExtractorTest do
}
end)

assert Transport.IRVE.Extractor.resources(page_size: 2) == [
assert Transport.IRVE.Extractor.datagouv_resources(page_size: 2) == [
%{
dataset_id: "the-dataset-id",
dataset_title: "the-dataset-title",
Expand Down
208 changes: 200 additions & 8 deletions scripts/irve/data-frame.exs
Original file line number Diff line number Diff line change
@@ -1,11 +1,203 @@
# https://www.data.gouv.fr/fr/datasets/623ca46c13130c3228abd018/ - Electra dataset (mid-sized)
# https://www.data.gouv.fr/fr/datasets/623ca46c13130c3228abd018/#/resources/e9bb3424-77cd-40ba-8bbd-5a19362d0365
defmodule Demo do
# Electra dataset (mid-sized), used as a known-good sample:
# https://www.data.gouv.fr/fr/datasets/623ca46c13130c3228abd018/
# https://www.data.gouv.fr/fr/datasets/623ca46c13130c3228abd018/#/resources/e9bb3424-77cd-40ba-8bbd-5a19362d0365
@sample_url "https://www.data.gouv.fr/fr/datasets/r/e9bb3424-77cd-40ba-8bbd-5a19362d0365"

@doc """
Fetch one known IRVE resource, parse it, and print its `id_pdc_itinerance`
column — a quick smoke test for the whole pipeline.
"""
def show_one() do
  # Note: cached in development if you set `irve_consolidation_caching: true` in `dev.secret.exs`
  %Req.Response{status: 200, body: body} =
    Transport.IRVE.Fetcher.get!(@sample_url, compressed: false, decode_body: false)

  Transport.IRVE.DataFrame.dataframe_from_csv_body!(body)
  |> Explorer.DataFrame.select("id_pdc_itinerance")
  |> IO.inspect(IEx.inspect_opts())
end

# Returns the content of `body` up to (and excluding) the first newline;
# the whole string when it contains no newline.
def first_line(body) do
  [header | _rest] = String.split(body, "\n", parts: 2)
  header
end

@doc """
A quick probe to evaluate if a content is likely to be "modern" schema-irve-statique data
(i.e. its header line declares the `id_pdc_itinerance` column).
"""
def has_id_pdc_itinerance(body) do
  String.contains?(first_line(body), "id_pdc_itinerance")
end

@doc """
Relying on a field that was in v1 of the schema, and not in v2, try to hint about old files.

See https://github.com/etalab/schema-irve/compare/v1.0.3...v2.0.0#diff-9fcde326d127f74194f70e563bdf2c118c51b719c308f015b8eb0204a9a552fbL72
"""
def probably_v1_schema(body) do
  header = first_line(body)

  # NOTE: do not use `n_amenageur`, because it will match in both v1 and v2 due to `siren_amenageur`
  String.contains?(header, "n_operateur") and not String.contains?(header, "nom_operateur")
end

# IRVE static schema boolean columns, normalised out-of-band because source
# files encode booleans inconsistently ("0"/"1"/"TRUE"/...).
@boolean_fields [
  "prise_type_ef",
  "prise_type_2",
  "prise_type_combo_ccs",
  "prise_type_chademo",
  "prise_type_autre",
  "gratuit",
  "paiement_acte",
  "paiement_cb",
  "paiement_autre",
  "reservation",
  "station_deux_roues"
]

@doc """
Download and parse one IRVE resource (`row.url`).

Returns `{:ok, df}` with a conformed `Explorer.DataFrame`, or
`{:error, error}` when the content is rejected (not valid UTF-8,
probably a v1 schema, missing `id_pdc_itinerance`) or any step fails.
"""
def process_one(row) do
  try do
    %{status: 200, body: body} = Transport.IRVE.Fetcher.get!(row.url, compressed: false, decode_body: false)

    # My assumptions on this method are being checked
    if !String.valid?(body) do
      raise("string is not valid (likely utf-8 instead of latin1)")
    end

    if probably_v1_schema(body) do
      raise("looks like a v1 irve")
    end

    if !has_id_pdc_itinerance(body) do
      raise("content has no id_pdc_itinerance in first line")
    end

    df =
      # TODO: preprocess/conform booleans (`0` -> `FALSE`) so that we can use `strict = true`
      # TODO: be smooth about `cable_t2_attache` - only added in v2.1.0 (https://github.com/etalab/schema-irve/releases/tag/v2.1.0)
      # and often not provided
      Transport.IRVE.DataFrame.dataframe_from_csv_body!(body, Transport.IRVE.StaticIRVESchema.schema_content(), _strict = false)
      # TODO: rename accordingly
      |> Transport.IRVE.DataFrame.preprocess_data()
      # normalise every boolean column (kept as strings by lax mode above)
      |> then(fn df ->
        Enum.reduce(@boolean_fields, df, fn field, df ->
          Transport.IRVE.DataFrame.preprocess_boolean(df, field)
        end)
      end)
      |> Explorer.DataFrame.select(
        [
          "nom_amenageur",
          "siren_amenageur",
          "contact_amenageur",
          "nom_operateur",
          "contact_operateur",
          "telephone_operateur",
          "nom_enseigne",
          "id_station_itinerance",
          "id_station_local",
          "nom_station",
          "implantation_station",
          "adresse_station",
          "code_insee_commune",
          # "coordonneesXY",
          "nbre_pdc",
          "id_pdc_itinerance",
          "id_pdc_local",
          "puissance_nominale",
          "prise_type_ef",
          "prise_type_2",
          "prise_type_combo_ccs",
          "prise_type_chademo",
          "prise_type_autre",
          "gratuit",
          "paiement_acte",
          "paiement_cb",
          "paiement_autre",
          "tarification",
          "condition_acces",
          "reservation",
          "horaires",
          "accessibilite_pmr",
          "restriction_gabarit",
          "station_deux_roues",
          "raccordement",
          "num_pdl",
          "date_mise_en_service",
          "observations",
          "date_maj",
          # "cable_t2_attache",
          # extracted
          "x",
          "y"
        ]
      )

    # Sanity check: the key columns must be fully populated; report any
    # resource where nils slipped through (does not fail the resource).
    nil_counts = Explorer.DataFrame.nil_count(df)
    nil_counts = {
      Explorer.Series.at(nil_counts[:id_pdc_itinerance], 0),
      Explorer.Series.at(nil_counts[:x], 0),
      Explorer.Series.at(nil_counts[:y], 0)
    }
    unless nil_counts == {0, 0, 0} do
      IO.puts(row.url)
      IO.inspect(nil_counts, IEx.inspect_opts())
    end

    {:ok, df}
  rescue
    # Broad rescue is deliberate in this WIP consolidation script: any
    # failure is recorded in the report rather than aborting the run.
    error ->
      # Only log KeyError (usually a missing column) with its row, for
      # debugging. Matching the struct is more robust than the previous
      # string search over `inspect(error)` output.
      if match?(%KeyError{}, error) do
        IO.inspect(%{error: error, row: row}, IEx.inspect_opts())
      end

      {:error, error}
  end
end

# Accumulating concat: a `nil` accumulator means "first frame seen",
# subsequent frames are appended row-wise.
def concat_rows(acc, df) do
  if is_nil(acc) do
    df
  else
    Explorer.DataFrame.concat_rows(acc, df)
  end
end

defmodule ReportItem do
@moduledoc """
One line of the consolidation report: identifies the processed resource
(dataset id, resource id, url) and carries the error it produced, if any
(`nil` on success).
"""
@enforce_keys [:dataset_id, :resource_id, :resource_url]
defstruct [:dataset_id, :resource_id, :resource_url, :error]
end

# Consolidates all data.gouv IRVE resources into a single data frame,
# writing `consolidation.csv` (the data) and `report.csv` (one ReportItem
# per processed resource, with its error if any) to the current directory.
def show_more() do
output = Transport.IRVE.Extractor.datagouv_resources()
# exclude data gouv generated consolidation
|> Enum.reject(fn r -> r.dataset_organisation_id == "646b7187b50b2a93b1ae3d45" end)
# stable processing order, for reproducible runs/diffs
|> Enum.sort_by(fn r -> [r.dataset_id, r.resource_id] end)
# debugging helpers — narrow the run down to a slice or a single resource
# |> Stream.drop(1001)
# |> Stream.take(10)
# |> Enum.filter(&(&1.resource_id == "cbd64933-26df-4ab5-b9e8-104f9af9a16c"))
# single pass: accumulate both the growing data frame and the report
|> Enum.reduce(%{df: nil, report: []}, fn row, %{df: main_df, report: report} ->
{main_df, error} =
case process_one(row) do
{:ok, df} -> {concat_rows(main_df, df), nil}
{:error, error} -> {main_df, error}
end

description = %ReportItem{
dataset_id: row.dataset_id,
resource_id: row.resource_id,
resource_url: row.url,
error: error
}

# report is built in reverse (prepend) and reversed once below
%{
df: main_df,
report: [description | report]
}
end)

output.df
|> Explorer.DataFrame.to_csv!("consolidation.csv")

output.report
|> Enum.reverse()
|> Enum.map(&Map.from_struct/1)
# errors are arbitrary exception structs — serialize via `inspect` for CSV
|> Enum.map(fn(x) -> Map.put(x, :error, x.error |> inspect) end)
|> Explorer.DataFrame.new()
|> Explorer.DataFrame.to_csv!("report.csv")
end
end

# IO.puts("========== just one sample ==========")

# Demo.show_one()

# IO.puts("========== go further ==========")

# Script entry point: runs the full consolidation (network-heavy), producing
# consolidation.csv and report.csv. Uncomment `Demo.show_one()` above for a
# quick single-resource smoke test instead.
Demo.show_more()
Loading
Loading