Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add WS station fetcher #135

Merged
merged 12 commits into from
Apr 22, 2024
Merged
5 changes: 5 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ dependencies {
'org.eclipse.jetty.websocket:websocket-jetty-api:' + jettyVersion,
'org.eclipse.jetty.websocket:websocket-jetty-server:' + jettyVersion,
)

// Websocket client libs
compileOnly 'jakarta.websocket:jakarta.websocket-client-api:2.2.0-M1'
implementation 'org.glassfish.tyrus.bundles:tyrus-standalone-client:2.2.0-M1'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this dependency provide?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It adds implementation for some jakarta classes, I've replaced it with the jersey one to make it more in line with the impl's we are using for our ws server


// Database
implementation('com.h2database:h2:2.2.220')
implementation('org.postgresql:postgresql:42.7.3')
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/telraam/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import telraam.logic.lapper.robust.RobustLapper;
import telraam.logic.positioner.Positioner;
import telraam.logic.positioner.simple.SimplePositioner;
import telraam.station.Fetcher;
import telraam.station.FetcherFactory;
import telraam.station.websocket.WebsocketFetcher;
import telraam.util.AcceptedLapsUtil;
import telraam.websocket.WebSocketConnection;

Expand Down Expand Up @@ -142,9 +143,10 @@ public void run(AppConfiguration configuration, Environment environment) {
positioners.add(new SimplePositioner(this.database));

// Start fetch thread for each station
FetcherFactory fetcherFactory = new FetcherFactory(this.database, lappers, positioners);
StationDAO stationDAO = this.database.onDemand(StationDAO.class);
for (Station station : stationDAO.getAll()) {
new Thread(() -> new Fetcher(this.database, station, lappers, positioners).fetch()).start();
new Thread(() -> fetcherFactory.create(station).fetch()).start();
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/telraam/database/daos/DetectionDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id
@GetGeneratedKeys({"id"})
int insertAll(@BindBean List<Detection> detection);

@SqlBatch("""
INSERT INTO detection (station_id, baton_id, timestamp, rssi, battery, remote_id, uptime_ms, timestamp_ingestion) \
VALUES (:stationId, (SELECT id FROM baton WHERE mac = :batonMac), :timestamp, :rssi, :battery, :remoteId, :uptimeMs, :timestampIngestion)
""")
@GetGeneratedKeys({"id", "baton_id"})
@RegisterBeanMapper(Detection.class)
List<Detection> insertAllWithoutBaton(@BindBean List<Detection> detection, @Bind("batonMac") List<String> batonMac);

@SqlQuery("SELECT * FROM detection WHERE id = :id")
@RegisterBeanMapper(Detection.class)
Optional<Detection> getById(@Bind("id") int id);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/telraam/database/models/Detection.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import java.sql.Timestamp;

@Setter @Getter @NoArgsConstructor
@Setter
@Getter
@NoArgsConstructor
public class Detection {
private Integer id;
private Integer batonId;
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/telraam/database/models/Team.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter @Setter @NoArgsConstructor
import java.util.Objects;

@Getter
@Setter
@NoArgsConstructor
public class Team {
private Integer id;
private String name;
Expand All @@ -19,4 +23,8 @@ public Team(String name, int batonId) {
this.name = name;
this.batonId = batonId;
}

public boolean equals(Team obj) {
return Objects.equals(id, obj.getId());
}
}
35 changes: 17 additions & 18 deletions src/main/java/telraam/logic/positioner/simple/SimplePositioner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,27 @@
import telraam.logic.positioner.PositionSender;
import telraam.logic.positioner.Positioner;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public class SimplePositioner implements Positioner {
private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName());
private final int QUEUE_SIZE = 50;
private final int MIN_RSSI = -85;
private final int DEBOUNCE_TIMEOUT = 1;
private boolean debounceScheduled;
private final ScheduledExecutorService scheduler;
private static final Logger logger = Logger.getLogger(SimplePositioner.class.getName());
private final PositionSender positionSender;
private final Map<Integer, Team> batonIdToTeam;
private final Map<Team, CircularQueue<Detection>> teamDetections;
private final Map<Integer, CircularQueue<Detection>> teamDetections;
private final List<Integer> stations;
private final Map<Team, Position> teamPositions;
private final Map<Integer, Position> teamPositions;

public SimplePositioner(Jdbi jdbi) {
this.debounceScheduled = false;
Expand All @@ -45,14 +45,14 @@ public SimplePositioner(Jdbi jdbi) {

TeamDAO teamDAO = jdbi.onDemand(TeamDAO.class);
List<Team> teams = teamDAO.getAll();
for (Team team: teams) {
teamDetections.put(team, new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team, new Position(team.getId()));
for (Team team : teams) {
teamDetections.put(team.getId(), new CircularQueue<>(QUEUE_SIZE));
teamPositions.put(team.getId(), new Position(team.getId()));
}
List<BatonSwitchover> switchovers = jdbi.onDemand(BatonSwitchoverDAO.class).getAll();
switchovers.sort(Comparator.comparing(BatonSwitchover::getTimestamp));

for (BatonSwitchover switchover: switchovers) {
for (BatonSwitchover switchover : switchovers) {
batonIdToTeam.put(switchover.getNewBatonId(), teamDAO.getById(switchover.getTeamId()).get());
}

Expand All @@ -63,13 +63,13 @@ public SimplePositioner(Jdbi jdbi) {

public void calculatePositions() {
logger.info("SimplePositioner: Calculating positions...");
for (Map.Entry<Team, CircularQueue<Detection>> entry: teamDetections.entrySet()) {
for (Map.Entry<Integer, CircularQueue<Detection>> entry : teamDetections.entrySet()) {
List<Detection> detections = teamDetections.get(entry.getKey());
detections.sort(Comparator.comparing(Detection::getTimestamp));

int currentStationRssi = MIN_RSSI;
int currentStationPosition = 0;
for (Detection detection: detections) {
for (Detection detection : detections) {
if (detection.getRssi() > currentStationRssi) {
currentStationRssi = detection.getRssi();
currentStationPosition = detection.getStationId();
Expand All @@ -84,21 +84,20 @@ public void calculatePositions() {
logger.info("SimplePositioner: Done calculating positions");
}

public void handle(Detection detection) {
public synchronized void handle(Detection detection) {
Team team = batonIdToTeam.get(detection.getBatonId());
teamDetections.get(team).add(detection);
teamDetections.get(team.getId()).add(detection);

if (! debounceScheduled) {
if (!debounceScheduled) {
debounceScheduled = true;
scheduler.schedule(() -> {
try {
calculatePositions();
} catch (Exception e) {
logger.severe(e.getMessage());
logger.log(Level.SEVERE, e.getMessage(), e);
}
debounceScheduled = false;
}, DEBOUNCE_TIMEOUT, TimeUnit.SECONDS);
}
}

}
182 changes: 7 additions & 175 deletions src/main/java/telraam/station/Fetcher.java
Original file line number Diff line number Diff line change
@@ -1,182 +1,14 @@
package telraam.station;

import org.jdbi.v3.core.Jdbi;
import telraam.database.daos.BatonDAO;
import telraam.database.daos.DetectionDAO;
import telraam.database.daos.StationDAO;
import telraam.database.models.Baton;
import telraam.database.models.Detection;
import telraam.database.models.Station;
import telraam.logic.lapper.Lapper;
import telraam.logic.positioner.Positioner;
import telraam.station.models.RonnyDetection;
import telraam.station.models.RonnyResponse;

import java.io.IOException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpConnectTimeoutException;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class Fetcher {
private final Set<Lapper> lappers;
private final Set<Positioner> positioners;
private Station station;

private final BatonDAO batonDAO;
private final DetectionDAO detectionDAO;
private final StationDAO stationDAO;

private final HttpClient client = HttpClient.newHttpClient();
private final Logger logger = Logger.getLogger(Fetcher.class.getName());

public interface Fetcher {
//Timeout to wait for before sending the next request after an error.
private final static int ERROR_TIMEOUT_MS = 2000;
int ERROR_TIMEOUT_MS = 2000;
//Timeout for a request to a station.
private final static int REQUEST_TIMEOUT_S = 10;
int REQUEST_TIMEOUT_S = 10;
//Full batch size, if this number of detections is reached, more are probably available immediately.
private final static int FULL_BATCH_SIZE = 1000;
int FULL_BATCH_SIZE = 1000;
//Timeout when result has less than a full batch of detections.
private final static int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds


public Fetcher(Jdbi database, Station station, Set<Lapper> lappers, Set<Positioner> positioners) {
this.batonDAO = database.onDemand(BatonDAO.class);
this.detectionDAO = database.onDemand(DetectionDAO.class);
this.stationDAO = database.onDemand(StationDAO.class);

this.lappers = lappers;
this.positioners = positioners;
this.station = station;
}

public void fetch() {
logger.info("Running Fetcher for station(" + this.station.getId() + ")");
JsonBodyHandler<RonnyResponse> bodyHandler = new JsonBodyHandler<>(RonnyResponse.class);

while (true) {
//Update the station to account for possible changes in the database
this.stationDAO.getById(station.getId()).ifPresentOrElse(
station -> this.station = station,
() -> this.logger.severe("Can't update station from database.")
);

//Get last detection id
int lastDetectionId = 0;
Optional<Detection> lastDetection = detectionDAO.latestDetectionByStationId(this.station.getId());
if (lastDetection.isPresent()) {
lastDetectionId = lastDetection.get().getRemoteId();
}

//Create URL
URI url;
try {
url = new URI(station.getUrl() + "/detections/" + lastDetectionId);
} catch (URISyntaxException ex) {
this.logger.severe(ex.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
continue;
}

//Create request
HttpRequest request;
try {
request = HttpRequest.newBuilder()
.uri(url)
.version(HttpClient.Version.HTTP_1_1)
.timeout(Duration.ofSeconds(Fetcher.REQUEST_TIMEOUT_S))
.build();
} catch (IllegalArgumentException e) {
logger.severe(e.getMessage());
try {
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
} catch (InterruptedException ex) {
logger.severe(ex.getMessage());
}
continue;
}

//Do request
HttpResponse<Supplier<RonnyResponse>> response;
try {
try {
response = this.client.send(request, bodyHandler);
} catch (ConnectException | HttpConnectTimeoutException ex) {
this.logger.severe("Could not connect to " + request.uri());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
} catch (IOException e) {
logger.severe(e.getMessage());
Thread.sleep(Fetcher.ERROR_TIMEOUT_MS);
continue;
}
} catch (InterruptedException e) {
logger.severe(e.getMessage());
continue;
}

//Check response state
if (response.statusCode() != 200) {
this.logger.warning(
"Unexpected status code(" + response.statusCode() + ") when requesting " + url + " for station(" + this.station.getName() + ")"
);
continue;
}

//Fetch all batons and create a map by batonMAC
Map<String, Baton> baton_mac_map = batonDAO.getAll().stream()
.collect(Collectors.toMap(b -> b.getMac().toUpperCase(), Function.identity()));

//Insert detections
List<Detection> new_detections = new ArrayList<>();
List<RonnyDetection> detections = response.body().get().detections;
for (RonnyDetection detection : detections) {
if (baton_mac_map.containsKey(detection.mac.toUpperCase())) {
var baton = baton_mac_map.get(detection.mac.toUpperCase());
new_detections.add(new Detection(
baton.getId(),
station.getId(),
detection.rssi,
detection.battery,
detection.uptimeMs,
detection.id,
new Timestamp((long) (detection.detectionTimestamp * 1000)),
new Timestamp(System.currentTimeMillis())
));
}
}
if (!new_detections.isEmpty()) {
detectionDAO.insertAll(new_detections);
new_detections.forEach((detection) -> {
lappers.forEach((lapper) -> lapper.handle(detection));
positioners.forEach((positioner) -> positioner.handle(detection));
});
}

this.logger.finer("Fetched " + detections.size() + " detections from " + station.getName() + ", Saved " + new_detections.size());
int IDLE_TIMEOUT_MS = 4000; // Wait 4 seconds

//If few detections are retrieved from the station, wait for some time.
if (detections.size() < Fetcher.FULL_BATCH_SIZE) {
try {
Thread.sleep(Fetcher.IDLE_TIMEOUT_MS);
} catch (InterruptedException e) {
logger.severe(e.getMessage());
}
}
}
}
}
void fetch();
}
Loading
Loading