diff --git a/apps/transport/lib/jobs/expiration_admin_producer_notification_job.ex b/apps/transport/lib/jobs/expiration_admin_producer_notification_job.ex deleted file mode 100644 index ad493add7b..0000000000 --- a/apps/transport/lib/jobs/expiration_admin_producer_notification_job.ex +++ /dev/null @@ -1,79 +0,0 @@ -defmodule Transport.Jobs.ExpirationAdminProducerNotificationJob do - @moduledoc """ - This module is in charge of sending notifications to admins and producers when data is outdated. - It is similar to `Transport.Jobs.ExpirationNotificationJob`, dedicated to reusers. - Both could be merged in the future. - """ - - use Oban.Worker, max_attempts: 3, tags: ["notifications"] - import Ecto.Query - - @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} - @expiration_reason Transport.NotificationReason.reason(:expiration) - # If delay < 0, the resource is already expired - @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] - - @impl Oban.Worker - - def perform(%Oban.Job{id: job_id}) do - outdated_data(job_id) - :ok - end - - def outdated_data(job_id) do - for delay <- possible_delays(), - date = Date.add(Date.utc_today(), delay) do - {delay, gtfs_datasets_expiring_on(date)} - end - |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) - |> send_outdated_data_admin_mail() - |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) - end - - @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] - def gtfs_datasets_expiring_on(%Date{} = date) do - DB.Dataset.base_query() - |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) - |> where( - [metadata: m, resource: r], - fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" - ) - |> select([dataset: d, resource: r], {d, r}) - |> distinct(true) - |> DB.Repo.all() - |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) - |> Enum.to_list() - end - - def possible_delays do - @default_outdated_data_delays - |> Enum.uniq() - |> Enum.sort() - end - - # A different email is sent to producers for every delay, containing all datasets expiring on this given delay - @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok - def send_outdated_data_producer_notifications({delay, records}, job_id) do - Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> - @expiration_reason - |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) - |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> - contact - |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) - |> Transport.Mailer.deliver() - - DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) - end) - end) - end - - @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] - defp send_outdated_data_admin_mail([] = _records), do: [] - - defp send_outdated_data_admin_mail(records) do - Transport.AdminNotifier.expiration(records) - |> Transport.Mailer.deliver() - - records - end -end diff --git a/apps/transport/lib/jobs/expiration_notification_job.ex b/apps/transport/lib/jobs/expiration_notification_job.ex index 09f4f0dfce..75dcb27c94 100644 --- a/apps/transport/lib/jobs/expiration_notification_job.ex +++ b/apps/transport/lib/jobs/expiration_notification_job.ex @@ -3,11 +3,10 @@ defmodule Transport.Jobs.ExpirationNotificationJob do This job sends daily digests to reusers about the expiration of their favorited datasets. The expiration delays is the same for all reusers and cannot be customized for now. - It has 2 `perform/1` methods: + It has 3 `perform/1` methods: - a dispatcher one in charge of identifying contacts we should get in touch with today - another in charge of building the daily digest for a specific contact (with only their favorited datasets) - - It is similar to `Transport.Jobs.ExpirationAdminProducerNotificationJob`, dedicated to producers and admins. + - and one to send to admins and producers """ use Oban.Worker, max_attempts: 3, @@ -23,11 +22,23 @@ defmodule Transport.Jobs.ExpirationNotificationJob do import Ecto.Query @notification_reason Transport.NotificationReason.reason(:expiration) + @expiration_reason Transport.NotificationReason.reason(:expiration) + @type delay_and_records :: {integer(), [{DB.Dataset.t(), [DB.Resource.t()]}]} # If delay < 0, the resource is already expired - @default_outdated_data_delays [-30, -7, 0, 7, 14] + @default_outdated_data_delays_reuser [-30, -7, 0, 7, 14] + @default_outdated_data_delays [-90, -60, -30, -45, -15, -7, -3, 0, 7, 14] @impl Oban.Worker - def perform(%Oban.Job{id: job_id, args: %{"contact_id" => contact_id, "digest_date" => digest_date}}) do + + def perform(%Oban.Job{id: job_id, args: %{"role" => "admin_and_producer"}}) do + outdated_data(job_id) + :ok + end + + def perform(%Oban.Job{ + id: job_id, + args: %{"contact_id" => contact_id, "digest_date" => digest_date, "role" => "reuser"} + }) do contact = DB.Repo.get!(DB.Contact, contact_id) subscribed_dataset_ids = subscribed_dataset_ids_for_expiration(contact) @@ -56,10 +67,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do DB.Repo.transaction( fn -> + new(%{"role" => "admin_and_producer"}) |> Oban.insert() + dataset_ids |> contact_ids_subscribed_to_dataset_ids() |> Stream.chunk_every(100) - |> Stream.each(fn contact_ids -> insert_jobs(contact_ids, target_date) end) + |> Stream.each(fn contact_ids -> insert_reuser_jobs(contact_ids, target_date) end) |> Stream.run() end, timeout: :timer.seconds(60) @@ -138,12 +151,12 @@ defmodule Transport.Jobs.ExpirationNotificationJob do def delay_str(-1), do: "périmé depuis hier" def delay_str(d) when d <= -2, do: "périmés depuis #{-d} jours" - defp insert_jobs(contact_ids, %Date{} = target_date) do + defp insert_reuser_jobs(contact_ids, %Date{} = target_date) do # Oban caveat: can't use [insert_all/2](https://hexdocs.pm/oban/Oban.html#insert_all/2): # > Only the Smart Engine in Oban Pro supports bulk unique jobs and automatic batching. # > With the basic engine, you must use insert/3 for unique support. Enum.each(contact_ids, fn contact_id -> - %{"contact_id" => contact_id, "digest_date" => target_date} + %{"contact_id" => contact_id, "digest_date" => target_date, "role" => "reuser"} |> new() |> Oban.insert() end) @@ -172,9 +185,9 @@ defmodule Transport.Jobs.ExpirationNotificationJob do Transport.Cache.fetch( to_string(__MODULE__) <> ":gtfs_expiring_on_target_dates:#{reference_date}", fn -> - delays_and_dates = delays_and_dates(reference_date) - dates_and_delays = Map.new(delays_and_dates, fn {key, value} -> {value, key} end) - expiring_dates = Map.values(delays_and_dates) + delays_and_date_reuser = delays_and_date_reuser(reference_date) + dates_and_delays = Map.new(delays_and_date_reuser, fn {key, value} -> {value, key} end) + expiring_dates = Map.values(delays_and_date_reuser) DB.Dataset.base_query() |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) @@ -201,7 +214,7 @@ defmodule Transport.Jobs.ExpirationNotificationJob do end @doc """ - iex> delays_and_dates(~D[2024-05-21]) + iex> delays_and_date_reuser(~D[2024-05-21]) %{ -30 => ~D[2024-04-21], -7 => ~D[2024-05-14], @@ -210,8 +223,67 @@ defmodule Transport.Jobs.ExpirationNotificationJob do 14 => ~D[2024-06-04] } """ - @spec delays_and_dates(Date.t()) :: %{delay() => Date.t()} - def delays_and_dates(%Date{} = date) do - Map.new(@default_outdated_data_delays, fn delay -> {delay, Date.add(date, delay)} end) + @spec delays_and_date_reuser(Date.t()) :: %{delay() => Date.t()} + def delays_and_date_reuser(%Date{} = date) do + Map.new(@default_outdated_data_delays_reuser, fn delay -> {delay, Date.add(date, delay)} end) + end + + #### HERE CODE FROM ADMIN AND PRODUCER JOB #### + + def outdated_data(job_id) do + for delay <- possible_delays(), + date = Date.add(Date.utc_today(), delay) do + {delay, gtfs_datasets_expiring_on(date)} + end + |> Enum.reject(fn {_, records} -> Enum.empty?(records) end) + |> send_outdated_data_admin_mail() + |> Enum.map(&send_outdated_data_producer_notifications(&1, job_id)) + end + + @spec gtfs_datasets_expiring_on(Date.t()) :: [{DB.Dataset.t(), [DB.Resource.t()]}] + def gtfs_datasets_expiring_on(%Date{} = date) do + DB.Dataset.base_query() + |> DB.Dataset.join_from_dataset_to_metadata(Transport.Validators.GTFSTransport.validator_name()) + |> where( + [metadata: m, resource: r], + fragment("TO_DATE(?->>'end_date', 'YYYY-MM-DD')", m.metadata) == ^date and r.format == "GTFS" + ) + |> select([dataset: d, resource: r], {d, r}) + |> distinct(true) + |> DB.Repo.all() + |> Enum.group_by(fn {%DB.Dataset{} = d, _} -> d end, fn {_, %DB.Resource{} = r} -> r end) + |> Enum.to_list() + end + + def possible_delays do + @default_outdated_data_delays + |> Enum.uniq() + |> Enum.sort() + end + + # A different email is sent to producers for every delay, containing all datasets expiring on this given delay + @spec send_outdated_data_producer_notifications(delay_and_records(), integer()) :: :ok + def send_outdated_data_producer_notifications({delay, records}, job_id) do + Enum.each(records, fn {%DB.Dataset{} = dataset, resources} -> + @expiration_reason + |> DB.NotificationSubscription.subscriptions_for_reason_dataset_and_role(dataset, :producer) + |> Enum.each(fn %DB.NotificationSubscription{contact: %DB.Contact{} = contact} = subscription -> + contact + |> Transport.UserNotifier.expiration_producer(dataset, resources, delay) + |> Transport.Mailer.deliver() + + DB.Notification.insert!(dataset, subscription, %{delay: delay, job_id: job_id}) + end) + end) + end + + @spec send_outdated_data_admin_mail([delay_and_records()]) :: [delay_and_records()] + defp send_outdated_data_admin_mail([] = _records), do: [] + + defp send_outdated_data_admin_mail(records) do + Transport.AdminNotifier.expiration(records) + |> Transport.Mailer.deliver() + + records end end diff --git a/apps/transport/test/transport/jobs/expiration_admin_producer_notification_job_test.exs b/apps/transport/test/transport/jobs/expiration_admin_producer_notification_job_test.exs deleted file mode 100644 index c456bace18..0000000000 --- a/apps/transport/test/transport/jobs/expiration_admin_producer_notification_job_test.exs +++ /dev/null @@ -1,172 +0,0 @@ -defmodule Transport.Test.Transport.Jobs.ExpirationAdminProducerNotificationJobTest do - use ExUnit.Case, async: true - import DB.Factory - import Swoosh.TestAssertions - use Oban.Testing, repo: DB.Repo - - setup do - Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) - end - - test "sends email to our team + relevant contact before expiry" do - %DB.Dataset{id: dataset_id} = - dataset = - insert(:dataset, is_active: true, custom_title: "Dataset custom title", custom_tags: ["loi-climat-resilience"]) - - assert DB.Dataset.climate_resilience_bill?(dataset) - # fake a resource expiring today - %DB.Resource{id: resource_id} = - resource = insert(:resource, dataset: dataset, format: "GTFS", title: resource_title = "Super GTFS") - - multi_validation = - insert(:multi_validation, - validator: Transport.Validators.GTFSTransport.validator_name(), - resource_history: insert(:resource_history, resource: resource) - ) - - insert(:resource_metadata, - multi_validation_id: multi_validation.id, - metadata: %{"end_date" => Date.utc_today()} - ) - - assert [{%DB.Dataset{id: ^dataset_id}, [%DB.Resource{id: ^resource_id}]}] = - Date.utc_today() |> Transport.Jobs.ExpirationAdminProducerNotificationJob.gtfs_datasets_expiring_on() - - %DB.Contact{id: contact_id, email: email} = contact = insert_contact() - - %DB.NotificationSubscription{id: ns_id} = - insert(:notification_subscription, %{ - reason: :expiration, - source: :admin, - role: :producer, - contact_id: contact_id, - dataset_id: dataset.id - }) - - # Should be ignored, this subscription is for a reuser - %DB.Contact{id: reuser_id} = insert_contact() - - insert(:notification_subscription, %{ - reason: :expiration, - source: :user, - role: :reuser, - contact_id: reuser_id, - dataset_id: dataset.id - }) - - assert :ok == perform_job(Transport.Jobs.ExpirationAdminProducerNotificationJob, %{}) - - # a first mail to our team - - assert_email_sent(fn %Swoosh.Email{ - from: {"transport.data.gouv.fr", "contact@transport.data.gouv.fr"}, - to: [{"", "contact@transport.data.gouv.fr"}], - subject: "Jeux de données arrivant à expiration", - text_body: nil, - html_body: body - } -> - assert body =~ ~r/Jeux de données périmant demain :/ - - assert body =~ - ~s|