diff --git a/lib/concentrate/group_filter/cancelled_trip.ex b/lib/concentrate/group_filter/cancelled_trip.ex index bafdbe51..374284aa 100644 --- a/lib/concentrate/group_filter/cancelled_trip.ex +++ b/lib/concentrate/group_filter/cancelled_trip.ex @@ -4,12 +4,17 @@ defmodule Concentrate.GroupFilter.CancelledTrip do """ @behaviour Concentrate.GroupFilter alias Concentrate.Filter.Alert.CancelledTrips + alias Concentrate.GTFS.Routes alias Concentrate.{StopTimeUpdate, TripDescriptor} @impl Concentrate.GroupFilter - def filter(trip_group, module \\ CancelledTrips) + def filter(trip_group, module \\ CancelledTrips, routes_module \\ Routes) - def filter({%TripDescriptor{} = td, _vps, [stu | _] = stop_time_updates} = group, module) do + def filter( + {%TripDescriptor{} = td, _vps, [stu | _] = stop_time_updates} = group, + module, + routes_module + ) do trip_id = TripDescriptor.trip_id(td) route_id = TripDescriptor.route_id(td) time = StopTimeUpdate.time(stu) @@ -18,7 +23,7 @@ defmodule Concentrate.GroupFilter.CancelledTrip do TripDescriptor.schedule_relationship(td) == :CANCELED -> cancel_group(group) - Enum.all?(stop_time_updates, &StopTimeUpdate.skipped?(&1)) -> + bus_block_waiver?(stop_time_updates, routes_module.route_type(route_id)) -> cancel_group(group) is_nil(time) -> @@ -35,7 +40,13 @@ defmodule Concentrate.GroupFilter.CancelledTrip do end end - def filter(other, _module), do: other + def filter(other, _module, _trips_module), do: other + + defp bus_block_waiver?(stop_time_updates, 3) do + Enum.all?(stop_time_updates, &StopTimeUpdate.skipped?(&1)) + end + + defp bus_block_waiver?(_, _), do: false defp cancel_group({td, vps, stus}) do td = TripDescriptor.cancel(td) diff --git a/lib/concentrate/gtfs/routes.ex b/lib/concentrate/gtfs/routes.ex new file mode 100644 index 00000000..9d6426cc --- /dev/null +++ b/lib/concentrate/gtfs/routes.ex @@ -0,0 +1,48 @@ +defmodule Concentrate.GTFS.Routes do + @moduledoc """ + Server which maintains a list of route_id -> route_type mappings. + """ + use GenStage + require Logger + import :binary, only: [copy: 1] + @table __MODULE__ + + def start_link(opts) do + GenStage.start_link(__MODULE__, opts, name: __MODULE__) + end + + def route_type(route_id) do + hd(:ets.lookup_element(@table, route_id, 2)) + rescue + ArgumentError -> nil + end + + def init(opts) do + @table = :ets.new(@table, [:named_table, :public, :duplicate_bag]) + {:consumer, %{}, opts} + end + + def handle_events(events, _from, state) do + inserts = + for event <- events, + {"routes.txt", route_body} <- event, + lines = String.split(route_body, "\n"), + {:ok, row} <- CSV.decode(lines, headers: true) do + {copy(row["route_id"]), String.to_integer(row["route_type"])} + end + + _ = + if inserts == [] do + :ok + else + true = :ets.delete_all_objects(@table) + :ets.insert(@table, inserts) + + Logger.info(fn -> + "#{__MODULE__}: updated with #{length(inserts)} records" + end) + end + + {:noreply, [], state, :hibernate} + end +end diff --git a/test/concentrate/group_filter/cancelled_trip_test.exs b/test/concentrate/group_filter/cancelled_trip_test.exs index e3c4bcfa..78bdd00a 100644 --- a/test/concentrate/group_filter/cancelled_trip_test.exs +++ b/test/concentrate/group_filter/cancelled_trip_test.exs @@ -5,6 +5,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do alias Concentrate.{StopTimeUpdate, TripDescriptor} @module Concentrate.Filter.FakeCancelledTrips + @fake_routes_module Concentrate.GTFS.FakeRoutes describe "filter/2" do test "cancels the group if the trip is cancelled by an alert" do @@ -21,7 +22,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu]} - {new_td, [], [new_stu]} = filter(group, @module) + {new_td, [], [new_stu]} = filter(group, @module, @fake_routes_module) assert TripDescriptor.schedule_relationship(new_td) == :CANCELED assert StopTimeUpdate.schedule_relationship(new_stu) == :SKIPPED end @@ -41,7 +42,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu]} - {new_td, [], [new_stu]} = filter(group, @module) + {new_td, [], [new_stu]} = filter(group, @module, @fake_routes_module) assert TripDescriptor.schedule_relationship(new_td) == :CANCELED assert StopTimeUpdate.schedule_relationship(new_stu) == :SKIPPED end @@ -61,13 +62,14 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu]} - {_td, [], [stu]} = filter(group, @module) + {_td, [], [stu]} = filter(group, @module, @fake_routes_module) assert StopTimeUpdate.schedule_relationship(stu) == :SKIPPED end test "cancels the group if all stop updates have already been given a skipped status" do td = TripDescriptor.new( + route_id: "1", trip_id: "trip", start_date: {1970, 1, 2} ) @@ -81,15 +83,39 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu, stu]} - {td_actual, [], [stu_actual1, stu_actual2]} = filter(group, @module) + {td_actual, [], [stu_actual1, stu_actual2]} = filter(group, @module, @fake_routes_module) assert TripDescriptor.schedule_relationship(td_actual) == :CANCELED assert StopTimeUpdate.schedule_relationship(stu_actual1) == :SKIPPED assert StopTimeUpdate.schedule_relationship(stu_actual2) == :SKIPPED end + test "does not cancel the group if all stop updates have already been given a skipped status but route_type is not 3" do + td = + TripDescriptor.new( + route_id: "Red", + trip_id: "red_trip", + start_date: {1970, 1, 2} + ) + + stu = + StopTimeUpdate.new( + trip_id: "red_trip", + status: :SCHEDULED, + arrival_time: 87_000, + schedule_relationship: :SKIPPED + ) + + group = {td, [], [stu, stu]} + {td_actual, [], [stu_actual1, stu_actual2]} = filter(group, @module, @fake_routes_module) + assert TripDescriptor.schedule_relationship(td_actual) == :SCHEDULED + assert StopTimeUpdate.schedule_relationship(stu_actual1) == :SKIPPED + assert StopTimeUpdate.schedule_relationship(stu_actual2) == :SKIPPED + end + test "does not cancel the group if only some stop updates have already been given a skipped schedule relationship" do td = TripDescriptor.new( + route_id: "86", trip_id: "trip", start_date: {1970, 1, 2} ) @@ -109,7 +135,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu1, stu2]} - {td_actual, [], [stu_actual1, stu_actual2]} = filter(group, @module) + {td_actual, [], [stu_actual1, stu_actual2]} = filter(group, @module, @fake_routes_module) assert TripDescriptor.schedule_relationship(td_actual) == :SCHEDULED assert StopTimeUpdate.schedule_relationship(stu_actual1) == :SKIPPED assert StopTimeUpdate.schedule_relationship(stu_actual2) == :SCHEDULED @@ -129,7 +155,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu]} - assert filter(group, @module) == group + assert filter(group, @module, @fake_routes_module) == group end test "does not cancel the group if the route was cancelled at a different time" do @@ -147,7 +173,7 @@ defmodule Concentrate.GroupFilter.CancelledTripTest do ) group = {td, [], [stu]} - assert filter(group, @module) == group + assert filter(group, @module, @fake_routes_module) == group end test "other values are returned as-is" do diff --git a/test/concentrate/gtfs/routes_test.exs b/test/concentrate/gtfs/routes_test.exs new file mode 100644 index 00000000..357982bc --- /dev/null +++ b/test/concentrate/gtfs/routes_test.exs @@ -0,0 +1,37 @@ +defmodule Concentrate.GTFS.RoutesTest do + @moduledoc false + use ExUnit.Case + import Concentrate.GTFS.Routes + + @body """ + route_id,route_type + route_1,0 + route_2,1 + route_3,2 + """ + + defp supervised(_) do + start_supervised(Concentrate.GTFS.Routes) + event = [{"routes.txt", @body}] + # relies on being able to update the table from a different process + handle_events([event], :ignored, :ignored) + :ok + end + + describe "route_type/1" do + setup :supervised + + test "returns the route_type for the given route_id" do + assert route_type("route_1") == 0 + assert route_type("route_2") == 1 + assert route_type("route_3") == 2 + assert route_type("unknown") == nil + end + end + + describe "missing ETS table" do + test "route_type/1 returns nil" do + assert route_type("route_1") == nil + end + end +end diff --git a/test/support/filter/fakes.ex b/test/support/filter/fakes.ex index e0d17526..4937dd7a 100644 --- a/test/support/filter/fakes.ex +++ b/test/support/filter/fakes.ex @@ -7,6 +7,18 @@ defmodule Concentrate.GTFS.FakeTrips do def direction_id(_), do: nil end +defmodule Concentrate.GTFS.FakeRoutes do + @moduledoc "Fake implementation of GTFS.Routes" + def route_type(route_id) when is_binary(route_id) do + case Integer.parse(route_id) do + :error -> nil + _ -> 3 + end + end + + def route_type(_), do: nil +end + defmodule Concentrate.Filter.FakeCancelledTrips do @moduledoc "Fake implementation of Filter.Alerts.CancelledTrips" def route_cancelled?("route", {1970, 1, 2}) do