From 4c7989df195b197b4774014ab9cd3bad590662bd Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:45:54 +0200
Subject: [PATCH 01/16] chore: Fix application class in gradle configs
---
modules/telestion-api/build.gradle | 2 +-
modules/telestion-application/build.gradle | 2 +-
modules/telestion-services/build.gradle | 4 +---
3 files changed, 3 insertions(+), 5 deletions(-)
diff --git a/modules/telestion-api/build.gradle b/modules/telestion-api/build.gradle
index 4a4c82e6..047d2db9 100644
--- a/modules/telestion-api/build.gradle
+++ b/modules/telestion-api/build.gradle
@@ -1,6 +1,6 @@
application {
// Define the main class for the application.
- mainClass.set('de.wuespace.telestion.application.Application')
+ mainClass.set('de.wuespace.telestion.application.Telestion')
}
java {
diff --git a/modules/telestion-application/build.gradle b/modules/telestion-application/build.gradle
index 93360fd1..d3266cb3 100644
--- a/modules/telestion-application/build.gradle
+++ b/modules/telestion-application/build.gradle
@@ -1,6 +1,6 @@
application {
// Define the main class for the application.
- mainClass.set('de.wuespace.telestion.application.Application')
+ mainClass.set('de.wuespace.telestion.application.Telestion')
}
java {
diff --git a/modules/telestion-services/build.gradle b/modules/telestion-services/build.gradle
index 8ca8e652..817fc9f0 100644
--- a/modules/telestion-services/build.gradle
+++ b/modules/telestion-services/build.gradle
@@ -1,6 +1,6 @@
application {
// Define the main class for the application.
- mainClass.set('de.wuespace.telestion.application.Application')
+ mainClass.set('de.wuespace.telestion.application.Telestion')
}
java {
@@ -19,8 +19,6 @@ ext {
description = 'Services and service components that are re-usable in any Telestion Project Application'
dependencies {
- // implementation name: 'engine'
-
implementation project(':modules:telestion-api')
implementation group: 'com.fazecast', name: 'jSerialComm', version: '2.9.1'
From 5fa764a03cf7e9c97ece8bc3b77681b92739fcf3 Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:46:56 +0200
Subject: [PATCH 02/16] feat(application): Delete deprecated Application class
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: Cedric Bös <16071970+cb0s@users.noreply.github.com>
---
.../telestion/application/Application.java | 30 -------------------
1 file changed, 30 deletions(-)
delete mode 100644 modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java
diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java
deleted file mode 100644
index 3f09e9ac..00000000
--- a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package de.wuespace.telestion.application;
-
-//import org.telestion.example.RandomPositionPublisher;
-//import org.telestion.launcher.launcher.Launcher;
-
-/**
- * Starts the Telestion-Project as a standalone Application.
- *
- * @author Jan von Pichowski, Cedric Boes
- * @version 1.0
- */
-public class Application {
-
- /**
- * Calls the Launcher for a specific Testcase (at the moment).
- * Real functionality will be added later.
- *
- * @param args unused at the moment
- */
- public static void main(String[] args) {
- /*Launcher.start(new MessageLogger(), new RandomPositionPublisher(),
- new EventbusTcpBridge("localhost", 9870,
- Collections.singletonList(Address.incoming(DataService.class, "find")),
- Collections.singletonList(Address.outgoing(RandomPositionPublisher.class, "MockPos"))),
- new WebServer(8080),
- new DataService(),
- new MongoDatabaseService("raketenpraktikum", "raketenpraktikumPool"));*/
- }
-
-}
From bc4239335246f7b315fa7f9b50e925853db9f98b Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:48:13 +0200
Subject: [PATCH 03/16] feat(application): Read JSON configuration from
`"de.wuespace.telestion.configuration"` and refactor
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: Cedric Bös <16071970+cb0s@users.noreply.github.com>
---
.../telestion/application/Telestion.java | 33 ++++++++++++-------
1 file changed, 22 insertions(+), 11 deletions(-)
diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java
index 8004a37d..b275e0f5 100644
--- a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java
+++ b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java
@@ -5,7 +5,9 @@
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
+
import java.util.Collections;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import de.wuespace.telestion.services.config.Configuration;
@@ -16,9 +18,6 @@
* @author Jan von Pichowski
*/
public final class Telestion extends AbstractVerticle {
-
- private static final Logger logger = LoggerFactory.getLogger(Telestion.class);
-
/**
* Deploys this Telestion verticle.
*
@@ -38,18 +37,30 @@ public void start(Promise startPromise) {
startPromise.fail(configRes.cause());
return;
}
- var conf = configRes.result().getJsonObject("org.telestion.configuration").mapTo(Configuration.class);
- conf.verticles().stream().flatMap(c -> Collections.nCopies(c.magnitude(), c).stream()).forEach(v -> {
- logger.info("Deploying {}", v.name());
- var future = vertx.deployVerticle(v.verticle(), new DeploymentOptions().setConfig(v.jsonConfig()));
- future.onFailure(Throwable::printStackTrace);
- });
+
+ var conf = configRes
+ .result()
+ .getJsonObject("de.wuespace.telestion.configuration")
+ .mapTo(Configuration.class);
+
+ conf.verticles().stream()
+ .flatMap(c -> Collections.nCopies(c.magnitude(), c).stream()).forEach(v -> {
+ logger.info("Deploying {}", v.name());
+ var future = vertx.deployVerticle(
+ v.verticle(),
+ new DeploymentOptions().setConfig(v.jsonConfig())
+ );
+ future.onFailure(Throwable::printStackTrace);
+ });
+
startPromise.complete();
});
}
@Override
- public void stop(Promise stopPromise) throws Exception {
-
+ public void stop(Promise stopPromise) {
+ stopPromise.complete();
}
+
+ private static final Logger logger = LoggerFactory.getLogger(Telestion.class);
}
From ea4fa773a9b075aa74ef49b4f448991f584ac4d0 Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:49:03 +0200
Subject: [PATCH 04/16] refactor(application): Combine duplicated code in
Launcher class
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: Cedric Bös <16071970+cb0s@users.noreply.github.com>
---
.../wuespace/telestion/launcher/Launcher.java | 58 ++++++++++---------
1 file changed, 32 insertions(+), 26 deletions(-)
diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java
index 72aac043..3ff05278 100644
--- a/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java
+++ b/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java
@@ -1,9 +1,13 @@
package de.wuespace.telestion.launcher;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
+
import java.time.Duration;
import java.util.Arrays;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,19 +40,13 @@ public static void main(String[] args) {
public static void start(String... verticleNames) {
logger.info("Deploying {} verticles", verticleNames.length);
var vertx = Vertx.vertx();
- // vertx.eventBus().registerDefaultCodec(Position.class, JsonMessageCodec.Instance(Position.class));
- Arrays.stream(verticleNames).forEach(verticleName -> {
- logger.info("Deploying verticle {}", verticleName);
- vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), timerId -> {
- vertx.deployVerticle(verticleName, res -> {
- if (res.failed()) {
- logger.error("Failed to deploy verticle {} retrying in 5s", verticleName, res.cause());
- return;
- }
- logger.info("Deployed verticle {} with id {}", verticleName, res.result());
- vertx.cancelTimer(timerId);
- });
- });
+
+ Arrays.stream(verticleNames).forEach(name -> {
+ logger.info("Deploying verticle {}", name);
+ vertx.setPeriodic(
+ Duration.ofSeconds(5).toMillis(),
+ timerId -> vertx.deployVerticle(name, deploymentHandler(vertx, name, timerId))
+ );
});
}
@@ -61,19 +59,27 @@ public static void start(String... verticleNames) {
public static void start(Verticle... verticles) {
logger.info("Deploying {} verticles", verticles.length);
var vertx = Vertx.vertx();
- // vertx.eventBus().registerDefaultCodec(Position.class, JsonMessageCodec.Instance(Position.class));
- Arrays.stream(verticles).forEach(verticleName -> {
- logger.info("Deploying verticle {}", verticleName);
- vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), timerId -> {
- vertx.deployVerticle(verticleName, res -> {
- if (res.failed()) {
- logger.error("Failed to deploy verticle {} retrying in 5s", verticleName, res.cause());
- return;
- }
- logger.info("Deployed verticle {} with id {}", verticleName, res.result());
- vertx.cancelTimer(timerId);
- });
- });
+
+ Arrays.stream(verticles).forEach(instance -> {
+ logger.info("Deploying verticle {}", instance);
+ vertx.setPeriodic(
+ Duration.ofSeconds(5).toMillis(),
+ timerId -> vertx.deployVerticle(
+ instance,
+ deploymentHandler(vertx, instance.getClass().getName(), timerId)
+ )
+ );
});
}
+
+ private static Handler> deploymentHandler(Vertx vertx, String verticle, Long timerId) {
+ return res -> {
+ if (res.failed()) {
+ logger.error("Failed to deploy verticle {} retrying in 5s", verticle, res.cause());
+ return;
+ }
+ logger.info("Deployed verticle {} with id {}", verticle, res.result());
+ vertx.cancelTimer(timerId);
+ };
+ }
}
From 3fddff019e3602955618e21577406293ecfb68e9 Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:50:29 +0200
Subject: [PATCH 05/16] refactor(services): First cleanup of Connection API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: Cedric Bös <16071970+cb0s@users.noreply.github.com>
---
.../connection/rework/RawMessage.java | 1 -
.../connection/rework/StaticSender.java | 30 ++++++++-----------
.../connection/rework/tcp/TcpDispatcher.java | 6 ++--
.../connection/rework/tcp/TcpTimeouts.java | 17 +++++++++--
4 files changed, 30 insertions(+), 24 deletions(-)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java
index c79cd109..a987e636 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java
@@ -4,7 +4,6 @@
import de.wuespace.telestion.api.message.JsonMessage;
public record RawMessage(@JsonProperty byte[] data) implements JsonMessage {
- @SuppressWarnings("unused")
private RawMessage() {
this(null);
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java
index 3e52dca8..32971826 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java
@@ -10,17 +10,23 @@
public class StaticSender extends AbstractVerticle {
+ public record Configuration(@JsonProperty String inAddress,
+ @JsonProperty String outAddress,
+ @JsonProperty ConnectionDetails staticDetails) implements JsonMessage {
+ private Configuration() {
+ this(null, null, null);
+ }
+ }
+
@Override
public void start(Promise startPromise) {
config = Config.get(config, config(), Configuration.class);
- vertx.eventBus().consumer(config.inAddress(), raw -> {
- JsonMessage.on(RawMessage.class, raw, msg -> {
- logger.debug("Sending static message");
- vertx.eventBus().publish(config.outAddress(), new ConnectionData(msg.data(),
- config.staticDetails()).json());
- });
- });
+ vertx.eventBus().consumer(config.inAddress(), raw -> JsonMessage.on(RawMessage.class, raw, msg -> {
+ logger.debug("Sending static message");
+ vertx.eventBus().publish(config.outAddress(), new ConnectionData(msg.data(),
+ config.staticDetails()).json());
+ }));
startPromise.complete();
}
@@ -30,16 +36,6 @@ public void stop(Promise stopPromise) {
stopPromise.complete();
}
- public record Configuration(@JsonProperty String inAddress,
- @JsonProperty String outAddress,
- @JsonProperty ConnectionDetails staticDetails) implements JsonMessage {
-
- @SuppressWarnings("unused")
- private Configuration() {
- this(null, null, null);
- }
- }
-
public StaticSender() {
this(null);
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java
index bf84dcf4..491626b9 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java
@@ -19,9 +19,9 @@ public void start(Promise startPromise) {
vertx.eventBus().consumer(config.inAddress(), raw -> {
if (!JsonMessage.on(SenderData.class, raw, msg -> Arrays.stream(msg.conDetails())
- .filter(det -> det instanceof TcpDetails)
- .map(det -> (TcpDetails) det)
- .forEach(det -> handle(msg.rawData(), det)))) {
+ .filter(details -> details instanceof TcpDetails)
+ .map(details -> (TcpDetails) details)
+ .forEach(details -> handle(msg.rawData(), details)))) {
JsonMessage.on(ConnectionData.class, raw, msg -> {
if (msg.conDetails() instanceof TcpDetails det) {
handle(msg.rawData(), det);
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java
index 135c082e..602b3144 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java
@@ -3,7 +3,18 @@
import java.time.Duration;
public class TcpTimeouts {
- public static final long NO_RESPONSES = -1; // Close tcp connection after first received packet
- public static final long NO_TIMEOUT = 0; // Never actively close active tcp connection
- public static final long DEFAULT_TIMEOUT = Duration.ofSeconds(30).toMillis(); // 30 secs.
+ /**
+ * Close tcp connection after first received packet.
+ */
+ public static final long NO_RESPONSES = -1;
+
+ /**
+ * Never actively close active tcp connection.
+ */
+ public static final long NO_TIMEOUT = 0;
+
+ /**
+ * The default TCP connection timeout.
+ */
+ public static final long DEFAULT_TIMEOUT = Duration.ofSeconds(30).toMillis();
}
From 9ed979f82977b6c6991838a052da15118a3759dd Mon Sep 17 00:00:00 2001
From: Ludwig Richter
Date: Mon, 2 Aug 2021 00:53:12 +0200
Subject: [PATCH 06/16] feat(services): Both the TCP server and client now wait
until every connection is closed.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Co-authored-by: Cedric Bös <16071970+cb0s@users.noreply.github.com>
---
.../connection/rework/tcp/TcpClient.java | 107 ++++++++-----
.../connection/rework/tcp/TcpServer.java | 143 ++++++++----------
2 files changed, 130 insertions(+), 120 deletions(-)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java
index f4bec650..0490c897 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java
@@ -5,7 +5,10 @@
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.services.connection.rework.ConnectionData;
import de.wuespace.telestion.services.connection.rework.Tuple;
+import io.reactivex.annotations.NonNull;
import io.vertx.core.AbstractVerticle;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
@@ -15,19 +18,33 @@
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
public final class TcpClient extends AbstractVerticle {
+ public final record Configuration(@JsonProperty String inAddress,
+ @JsonProperty String outAddress,
+ @JsonProperty long timeout) implements JsonMessage {
+ private Configuration() {
+ this(null, null, TcpTimeouts.DEFAULT_TIMEOUT);
+ }
+
+ public Configuration(@NonNull String inAddress, @NonNull String outAddress) {
+ this(inAddress, outAddress, TcpTimeouts.DEFAULT_TIMEOUT);
+ }
+ }
+
@Override
public void start(Promise startPromise) {
- config = Config.get(config, config(), Configuration.class);
+ config = Config.get(config, new Configuration(), config(), Configuration.class);
var options = new NetClientOptions();
options.setIdleTimeout(config.timeout() == TcpTimeouts.NO_RESPONSES
? (int) TcpTimeouts.NO_TIMEOUT : (int) config.timeout());
- currentClient = vertx.createNetClient(options);
+ currentClient = vertx.createNetClient(options);
activeClients = new HashMap<>();
vertx.eventBus().consumer(config.inAddress(), raw -> {
@@ -41,19 +58,18 @@ public void start(Promise startPromise) {
@Override
public void stop(Promise stopPromise) {
- stopPromise.complete();
- }
+ var composite = CompositeFuture.all(
+ activeClients.values().stream().map(NetSocket::close).collect(Collectors.toList())
+ );
+
+ composite.onComplete(result -> {
+ if (result.failed()) {
+ stopPromise.fail(result.cause());
+ return;
+ }
- public record Configuration(@JsonProperty String inAddress,
- @JsonProperty String outAddress,
- @JsonProperty long timeout) implements JsonMessage {
- /**
- * For json
- */
- @SuppressWarnings("unused")
- private Configuration() {
- this(null, null, 0L);
- }
+ stopPromise.complete();
+ });
}
public TcpClient() {
@@ -66,30 +82,37 @@ public TcpClient(Configuration config) {
private void handleDispatchedMsg(TcpData tcpData) {
var details = tcpData.details();
-
- // Checks if socket already exists, if not connects to Server and if successful sends data asynchronously
- vertx.executeBlocking(handler -> {
- if (!activeClients.containsKey(new Tuple<>(details.ip(), details.port()))) {
- currentClient.connect(details.port(), details.ip(), h -> {
- if (h.succeeded()) {
- onConnected(h.result());
- handler.complete();
- } else {
- logger.error("Failed to create new NetClient with {}:{}:", details.ip(), details.port(),
- h.cause());
- handler.fail(h.cause());
- }
- });
- } else {
- handler.complete();
+ var key = new Tuple<>(details.ip(), details.port());
+
+ // Checks if socket already exists.
+ // If not, connects to Server and if successful sends data asynchronously.
+ vertx.executeBlocking(promise -> {
+ if (activeClients.containsKey(key)) {
+ promise.complete();
+ return;
}
- }, handler -> {
- if (handler.succeeded()) {
- logger.debug("Sending data to {}:{}", details.ip(), details.port());
- activeClients.get(new Tuple<>(details.ip(), details.port())).write(Buffer.buffer(tcpData.data()));
- } else {
+
+ currentClient.connect(details.port(), details.ip(), result -> {
+ if (result.succeeded()) {
+ onConnected(result.result());
+ promise.complete();
+ return;
+ }
+
+ logger.error("Failed to create new NetClient with {}:{}:",
+ details.ip(), details.port(), result.cause());
+ promise.fail(result.cause());
+ });
+ }, result -> {
+ if (result.failed()) {
logger.warn("Due to an error the packet to {}:{} will be dropped", details.ip(), details.port());
+ return;
}
+
+ logger.debug("Sending data to {}:{}", details.ip(), details.port());
+ activeClients
+ .get(new Tuple<>(details.ip(), details.port()))
+ .write(Buffer.buffer(tcpData.data()));
});
}
@@ -104,7 +127,6 @@ private void onConnected(NetSocket socket) {
var key = new Tuple<>(ip, port);
activeClients.put(key, socket);
-
logger.info("Connection established ({}:{})", ip, port);
var packetIdCounter = new AtomicInteger(0);
@@ -112,13 +134,14 @@ private void onConnected(NetSocket socket) {
socket.handler(buffer -> {
var packetId = packetIdCounter.getAndIncrement();
logger.debug("New message received from Server ({}:{}, packetId={})", ip, port, packetId);
- vertx.eventBus().publish(config.outAddress(), new ConnectionData(buffer.getBytes(), new TcpDetails(ip, port,
- packetId)).json());
+ vertx.eventBus().publish(
+ config.outAddress(),
+ new ConnectionData(buffer.getBytes(), new TcpDetails(ip, port, packetId)).json()
+ );
});
socket.exceptionHandler(error -> {
- logger.error("Encountered an unexpected error (Server: {}:{})", ip, port,
- error);
+ logger.error("Encountered an unexpected error (Server: {}:{})", ip, port, error);
activeClients.remove(key);
});
@@ -132,14 +155,14 @@ private void handleMsg(ConnectionData data) {
var details = data.conDetails();
if (details instanceof TcpDetails tcpDet) {
handleDispatchedMsg(new TcpData(data.rawData(), tcpDet));
- } else { // Shouldn't happen due to Dispatcher
+ } else { // Shouldn't happen due to Dispatcher
logger.warn("Wrong connection detail type received. Packet will be dropped.");
// If there will be ever a logger for broken packets, send this
}
}
private Configuration config;
- private HashMap, NetSocket> activeClients;
+ private Map, NetSocket> activeClients;
private NetClient currentClient;
private static final Logger logger = LoggerFactory.getLogger(TcpClient.class);
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java
index f7d445f1..5762fd34 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java
@@ -5,10 +5,8 @@
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.services.connection.rework.ConnectionData;
import de.wuespace.telestion.services.connection.rework.Tuple;
-import io.vertx.core.AbstractVerticle;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
-import io.vertx.core.Promise;
+import io.reactivex.annotations.NonNull;
+import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
@@ -18,14 +16,16 @@
import java.time.Duration;
import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
- * An implementation of an unencrypted TCP-Server.
+ * An implementation of an unencrypted TCP-Server.
*
*
+ *
* Features are:
*
* - Opening new connections to TCP-Clients
@@ -37,78 +37,87 @@
*/
public final class TcpServer extends AbstractVerticle {
+ /**
+ * Configuration for this Verticle which can be loaded from a config.
+ * An optional timeout can be specified which is the consecutive time without any packets incoming or outgoing after
+ * which a client will be disconnected. Special timeouts can be found in {@link TcpTimeouts}.
+ *
+ * @param inAddress address on which the verticle listens on
+ * @param outAddress address on which the verticle publishes
+ * @param host host on which the Server-Socket should run
+ * @param port port on which the Server-Socket should run
+ * @param clientTimeout time until timeout
+ */
+ public final record Configuration(@JsonProperty String inAddress,
+ @JsonProperty String outAddress,
+ @JsonProperty String host,
+ @JsonProperty int port,
+ @JsonProperty long clientTimeout) implements JsonMessage {
+ private Configuration() {
+ this(null, null, "0.0.0.0", 0, TcpTimeouts.DEFAULT_TIMEOUT);
+ }
+
+ public Configuration(@NonNull String inAddress, @NonNull String outAddress, @NonNull String host, int port) {
+ this(inAddress, outAddress, host, port, TcpTimeouts.DEFAULT_TIMEOUT);
+ }
+ }
+
@Override
public void start(Promise startPromise) throws Exception {
- config = Config.get(config, config(), Configuration.class);
+ config = Config.get(config, new Configuration(), config(), Configuration.class);
// Unencrypted -> TODO: Implement functionality for encryption
var serverOptions = new NetServerOptions();
- serverOptions.setHost(config.hostAddress());
+ serverOptions.setHost(config.host());
serverOptions.setPort(config.port());
serverOptions.setIdleTimeout(config.clientTimeout() == TcpTimeouts.NO_RESPONSES
? (int) TcpTimeouts.NO_TIMEOUT : (int) config.clientTimeout());
serverOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS);
- activeCons = new HashMap<>();
+ activeCons = new HashMap<>();
server = vertx.createNetServer(serverOptions);
+
+ // setup
server.connectHandler(this::onConnected);
- server.exceptionHandler(handler -> logger.error("Encountered an unexpected error (Config: {})", config.json(),
- handler));
- server.listen(h -> complete(h, startPromise,
- r -> logger.info("Successfully started. Running on {}:{}", config.hostAddress(), r.actualPort())));
+ server.exceptionHandler(error ->
+ logger.error("Encountered an unexpected error (Config: {})", config.json(), error));
+
+ server.listen(handler -> {
+ if (handler.failed()) {
+ logger.warn("Could not start server on {}:{}", config.host(), config.port());
+ startPromise.fail(handler.cause());
+ return;
+ }
+
+ var port = handler.result().actualPort();
+ logger.info("Successfully started. Running on {}:{}", config.host(), port);
+ startPromise.complete();
+ });
vertx.eventBus().consumer(config.inAddress, raw -> {
if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) {
JsonMessage.on(ConnectionData.class, raw, this::handleMsg);
}
});
-
- startPromise.complete();
}
@Override
public void stop(Promise stopPromise) throws Exception {
- if (server != null) {
- activeCons.values().forEach(net -> net.close(handler -> {
- if (handler.failed()) {
- stopPromise.fail(handler.cause());
- }
- }));
- logger.info("Closing Server on {}:{}", config.hostAddress(), config.port());
- server.close();
+ if (server == null) {
+ stopPromise.complete();
+ return;
}
- // Wait for all Tcp connections to be closed successfully or fail in the process
- vertx.setTimer(Duration.ofSeconds(2).toMillis(), handler -> stopPromise.tryComplete());
- }
- /**
- * Configuration for this Verticle which can be loaded from a config.
- * An optional timeout can be specified which is the consecutive time without any packets incoming or outgoing after
- * which a client will be disconnected. Special timeouts can be found in {@link TcpTimeouts}.
- *
- * @param inAddress address on which the verticle listens on
- * @param outAddress address on which the verticle publishes
- * @param hostAddress host-address on which the Server-Socket should run
- * @param port port on which the Server-Socket should run
- * @param clientTimeout time until timeout
- */
- public record Configuration(@JsonProperty String inAddress,
- @JsonProperty String outAddress,
- @JsonProperty String hostAddress,
- @JsonProperty int port,
- @JsonProperty long clientTimeout) implements JsonMessage {
-
- /**
- * Used for reflection.
- */
- @SuppressWarnings("unused")
- private Configuration() {
- this(null, null, null, 0, 0L);
- }
+ logger.info("Closing Server on {}:{}", config.host(), config.port());
+ server.close(handler -> {
+ if (handler.failed()) {
+ logger.warn("Cannot close server on {}:{}", config.host(), config.port());
+ stopPromise.fail(handler.cause());
+ return;
+ }
- public Configuration(String inAddress, String outAddress, String hostAddress, int port) {
- this(inAddress, outAddress, hostAddress, port, TcpTimeouts.DEFAULT_TIMEOUT);
- }
+ stopPromise.complete();
+ });
}
public TcpServer() {
@@ -116,10 +125,6 @@ public TcpServer() {
}
public TcpServer(Configuration config) {
- if (config != null && (config.hostAddress() == null || config.hostAddress().equals(""))) {
- config = new Configuration(config.inAddress(), config.outAddress(), "localhost", config.port(),
- config.clientTimeout());
- }
this.config = config;
}
@@ -132,7 +137,6 @@ public boolean isActiveCon(Tuple key) {
return activeCons.containsKey(key);
}
- // @jvpichowski this is similar to TcpClient#onConnected, do you want to put this into one method?
private void onConnected(NetSocket netSocket) {
var ip = netSocket.remoteAddress().hostAddress();
var port = netSocket.remoteAddress().port();
@@ -165,23 +169,6 @@ private void onConnected(NetSocket netSocket) {
});
}
- /**
- * Completes a promise based on the success of a {@link AsyncResult}. If it was successful a handler will be called.
- *
- * @param result the result which is observed
- * @param promise the promise which will be completed
- * @param handler the handler which will be executed on a successful result
- * @param the type of the result
- */
- private static void complete(AsyncResult result, Promise> promise, Handler handler) {
- if (result.failed()) {
- promise.fail(result.cause());
- return;
- }
- handler.handle(result.result());
- promise.tryComplete();
- }
-
private void handleDispatchedMsg(TcpData data) {
var details = data.details();
@@ -207,9 +194,9 @@ private void handleDispatchedMsg(TcpData data) {
private void handleMsg(ConnectionData data) {
var details = data.conDetails();
- if (details instanceof TcpDetails tcpDet) {
- handleDispatchedMsg(new TcpData(data.rawData(), tcpDet));
- } else { // Shouldn't happen due to Dispatcher
+ if (details instanceof TcpDetails tcpDetails) {
+ handleDispatchedMsg(new TcpData(data.rawData(), tcpDetails));
+ } else { // Shouldn't happen due to Dispatcher
logger.warn("Wrong connection detail type received. Packet will be dropped.");
// If there will be ever a logger for broken packets, send this
}
@@ -217,7 +204,7 @@ private void handleMsg(ConnectionData data) {
private Configuration config;
private NetServer server;
- private HashMap, NetSocket> activeCons;
+ private Map, NetSocket> activeCons;
private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);
}
From 5665d236313c795a8ca25929b8de4e8324438108 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Sun, 14 Nov 2021 13:21:39 +0100
Subject: [PATCH 07/16] feat(connection): Enforce deprecation of old con. api
by module design
Move deprecated connection modules to legacy package and remove rework package to allow for one final breaking change once the new connection api is finished.
---
CHANGELOG.md | 2 +-
.../services/connection/{rework => }/ConnectionData.java | 2 +-
.../connection/{rework => }/ConnectionDetails.java | 2 +-
.../services/connection/{rework => }/RawMessage.java | 2 +-
.../services/connection/{rework => }/Receiver.java | 2 +-
.../services/connection/{rework => }/Sender.java | 2 +-
.../services/connection/{rework => }/SenderData.java | 2 +-
.../services/connection/{rework => }/StaticSender.java | 2 +-
.../telestion/services/connection/{rework => }/Tuple.java | 2 +-
.../services/connection/{ => legacy}/SerialConn.java | 2 +-
.../services/connection/{ => legacy}/SerialData.java | 2 +-
.../services/connection/{ => legacy}/TcpConn.java | 2 +-
.../connection/{rework => }/serial/SerialConn.java | 6 +++---
.../connection/{rework => }/serial/SerialDetails.java | 4 ++--
.../services/connection/{rework => }/tcp/TcpClient.java | 7 +++----
.../services/connection/{rework => }/tcp/TcpData.java | 2 +-
.../services/connection/{rework => }/tcp/TcpDetails.java | 4 ++--
.../connection/{rework => }/tcp/TcpDispatcher.java | 8 ++++----
.../services/connection/{rework => }/tcp/TcpServer.java | 6 +++---
.../services/connection/{rework => }/tcp/TcpTimeouts.java | 2 +-
.../services/connection/{rework => }/udp/UdpConn.java | 2 +-
.../services/connection/{rework => }/udp/UdpDetails.java | 4 ++--
22 files changed, 34 insertions(+), 35 deletions(-)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/ConnectionData.java (95%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/ConnectionDetails.java (80%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/RawMessage.java (79%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/Receiver.java (96%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/Sender.java (96%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/SenderData.java (95%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/StaticSender.java (95%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/Tuple.java (90%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{ => legacy}/SerialConn.java (97%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{ => legacy}/SerialData.java (84%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{ => legacy}/TcpConn.java (99%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/serial/SerialConn.java (93%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/serial/SerialDetails.java (70%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpClient.java (95%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpData.java (84%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpDetails.java (73%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpDispatcher.java (87%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpServer.java (97%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/tcp/TcpTimeouts.java (86%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/udp/UdpConn.java (62%)
rename modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/{rework => }/udp/UdpDetails.java (64%)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2eb98185..2e89c58b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -106,7 +106,7 @@
### Features
* **api:** Add new configuration provider which enables default values ([a973cd1](https://github.com/wuespace/telestion-core/commit/a973cd1f0d30513bcfcae655db156138f74b145a))
-* **connection:** Add configuration of baud rate to `SerialConn` verticle. Usages of the `de.wuespace.telestion.services.connection.rework.serial.SerialConn` now requires an additional parameter: `int baudRate`. To migrate, add this parameter to any application configuration using this verticle. The value used before is `9600`. Therefore, you can use `"baudRate": 9600` to match the old default configuration. ([ae4dad2](https://github.com/wuespace/telestion-core/commit/ae4dad2c9732047551ea74cca5b35b45bfd47f83))
+* **connection:** Add configuration of baud rate to `SerialConn` verticle. Usages of the `de.wuespace.telestion.services.connection.serial.SerialConn` now requires an additional parameter: `int baudRate`. To migrate, add this parameter to any application configuration using this verticle. The value used before is `9600`. Therefore, you can use `"baudRate": 9600` to match the old default configuration. ([ae4dad2](https://github.com/wuespace/telestion-core/commit/ae4dad2c9732047551ea74cca5b35b45bfd47f83))
## [0.3.0](https://github.com/wuespace/telestion-core/compare/v0.2.1...v0.3.0) (2021-05-08)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java
similarity index 95%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java
index ff73190d..d340a130 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java
similarity index 80%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java
index 527d5d99..93b62f42 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java
similarity index 79%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java
index a987e636..25667d32 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java
similarity index 96%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java
index c5a2c0bb..a011f0a9 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.vertx.core.AbstractVerticle;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java
similarity index 96%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java
index d076a328..a648d3b7 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java
similarity index 95%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java
index 3df45160..e53f5ea5 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
similarity index 95%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
index 32971826..a1fa268c 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
similarity index 90%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
index dc7fd27e..05e3d9f1 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework;
+package de.wuespace.telestion.services.connection;
import com.fasterxml.jackson.annotation.JsonProperty;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
similarity index 97%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
index 949cfaec..5170309a 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection;
+package de.wuespace.telestion.services.connection.legacy;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.services.monitoring.MessageLogger;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
similarity index 84%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
index 5ec5a6c6..c3f74a77 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection;
+package de.wuespace.telestion.services.connection.legacy;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
similarity index 99%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
index e9e60ea4..751adbc8 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection;
+package de.wuespace.telestion.services.connection.legacy;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
similarity index 93%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
index 11f3bce8..cb9eb868 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
@@ -1,11 +1,11 @@
-package de.wuespace.telestion.services.connection.rework.serial;
+package de.wuespace.telestion.services.connection.serial;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fazecast.jSerialComm.SerialPort;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.rework.ConnectionData;
-import de.wuespace.telestion.services.connection.rework.SenderData;
+import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.SenderData;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import org.slf4j.Logger;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java
similarity index 70%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java
index 697daccf..52dbc13f 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java
@@ -1,7 +1,7 @@
-package de.wuespace.telestion.services.connection.rework.serial;
+package de.wuespace.telestion.services.connection.serial;
import com.fasterxml.jackson.annotation.JsonProperty;
-import de.wuespace.telestion.services.connection.rework.ConnectionDetails;
+import de.wuespace.telestion.services.connection.ConnectionDetails;
/**
*
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
similarity index 95%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
index 0490c897..b8bc2fde 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
@@ -1,14 +1,13 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.rework.ConnectionData;
-import de.wuespace.telestion.services.connection.rework.Tuple;
+import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.Tuple;
import io.reactivex.annotations.NonNull;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
-import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java
similarity index 84%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java
index 69e43cfb..74b89713 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
similarity index 73%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
index b18fa9ff..ad952063 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
@@ -1,7 +1,7 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import com.fasterxml.jackson.annotation.JsonProperty;
-import de.wuespace.telestion.services.connection.rework.ConnectionDetails;
+import de.wuespace.telestion.services.connection.ConnectionDetails;
public record TcpDetails(@JsonProperty String ip,
@JsonProperty int port,
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
similarity index 87%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
index 491626b9..a1c65f3b 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
@@ -1,11 +1,11 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.rework.ConnectionData;
-import de.wuespace.telestion.services.connection.rework.SenderData;
-import de.wuespace.telestion.services.connection.rework.Tuple;
+import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.SenderData;
+import de.wuespace.telestion.services.connection.Tuple;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
similarity index 97%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
index 5762fd34..fdb561a1 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
@@ -1,10 +1,10 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.rework.ConnectionData;
-import de.wuespace.telestion.services.connection.rework.Tuple;
+import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.Tuple;
import io.reactivex.annotations.NonNull;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
similarity index 86%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
index 602b3144..53c439bc 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework.tcp;
+package de.wuespace.telestion.services.connection.tcp;
import java.time.Duration;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java
similarity index 62%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java
index 5cf3eecb..62beed10 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java
@@ -1,4 +1,4 @@
-package de.wuespace.telestion.services.connection.rework.udp;
+package de.wuespace.telestion.services.connection.udp;
public class UdpConn {
public UdpConn() {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java
similarity index 64%
rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java
rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java
index 54ad7070..d5b0c51f 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java
@@ -1,7 +1,7 @@
-package de.wuespace.telestion.services.connection.rework.udp;
+package de.wuespace.telestion.services.connection.udp;
import com.fasterxml.jackson.annotation.JsonProperty;
-import de.wuespace.telestion.services.connection.rework.ConnectionDetails;
+import de.wuespace.telestion.services.connection.ConnectionDetails;
public record UdpDetails(@JsonProperty String ip,
@JsonProperty int port,
From 2588ecd19d7b8461797abb56085ccbdb68d7441c Mon Sep 17 00:00:00 2001
From: Cedric
Date: Sun, 14 Nov 2021 14:06:13 +0100
Subject: [PATCH 08/16] test(connection): Correct imports of old connection-api
tests
---
.../de/wuespace/telestion/services/connection/ConnApiTest.java | 2 +-
.../de/wuespace/telestion/services/connection/TcpConnTest.java | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java
index af2719de..20a5a2be 100644
--- a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java
+++ b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java
@@ -2,7 +2,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.rework.*;
+import de.wuespace.telestion.services.connection.*;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
diff --git a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java
index cfe42ee2..257649d4 100644
--- a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java
+++ b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java
@@ -3,6 +3,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import de.wuespace.telestion.services.connection.legacy.TcpConn;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
From 78e4175cffa47e6d93137433b5471b5fc4c2d6f8 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Sun, 14 Nov 2021 18:44:45 +0100
Subject: [PATCH 09/16] docs(connection): Add docs to old con api and prepare
it for removal
---
.../connection/legacy/SerialConn.java | 7 ++++++
.../connection/legacy/SerialData.java | 8 +++++++
.../services/connection/legacy/TcpConn.java | 6 ++++-
.../connection/legacy/package-info.java | 22 +++++++++++++++++++
4 files changed, 42 insertions(+), 1 deletion(-)
create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
index 5170309a..25c318a5 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
@@ -15,6 +15,13 @@
import de.wuespace.telestion.api.message.JsonMessage;
import com.fazecast.jSerialComm.*;
+/**
+ * Implements a serial connection like UART. This module is part of the old connection api which is planned to be
+ * removed in v0.7
+ *
+ * @author Jan von Pichowski
+ * @deprecated will be removed in v0.7
+ */
@Deprecated(since = "v0.1.3", forRemoval = true)
public final class SerialConn extends AbstractVerticle {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
index c3f74a77..30208a23 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
@@ -3,6 +3,14 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.message.JsonMessage;
+/**
+ * Connection data from the serial connection ({@link SerialConn}) which will be serialized into json to put it on the
+ * vertx event bus.
+ * It is part of the old connection api and will be removed in v0.7
+ *
+ * @author Jan von Pichowski
+ * @deprecated will be removed in v0.7
+ */
@Deprecated(since = "v0.1.3", forRemoval = true)
public record SerialData(@JsonProperty byte[] data) implements JsonMessage {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
index 751adbc8..c33f3432 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
@@ -19,7 +19,11 @@
* This class opens a tcp connection. This could either be a tcp client to a host or a host which accepts new clients.
* It is configured with the file based config pattern. After a connection is established the {@link Participant} is
* published on the event bus. Incoming messages are published to the event bus too. The connection listens to the event
- * bus and send incoming messages over tcp to the participant. All addresses are defined in the configuration.
+ * bus and send incoming messages over tcp to the participant. All addresses are defined in the configuration.
+ * It is planned to be removed in v0.7.
+ *
+ * @author Jan von Pichowski
+ * @deprecated will be removed in v0.7
*/
@Deprecated(since = "v0.1.3", forRemoval = true)
public final class TcpConn extends AbstractVerticle {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
new file mode 100644
index 00000000..6fdcbbff
--- /dev/null
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Old connection api
+ *
+ * Classes from the old connection api.
+ * The old connection api contains the following network interfaces:
+ *
+ * - Serial ({@link de.wuespace.telestion.services.connection.legacy.SerialConn})
+ * - Tcp ({@link de.wuespace.telestion.services.connection.legacy.TcpConn})
+ *
+ *
+ *
+ * Disclaimer:
+ * Classes within this package are all marked as deprecated and will be removed soon.
+ * There will be no new features and bugs will not be fixed.
+ *
+ *
+ * @since 0.6
+ * @author Cedric Boes
+ * @version 1.0
+ * @deprecated the old connection api was replaced with the new connection api and will be removed in v0.7
+ */
+package de.wuespace.telestion.services.connection.legacy;
From 8d8ee73fee5a3e41101539b531e1b616438473eb Mon Sep 17 00:00:00 2001
From: Cedric
Date: Mon, 15 Nov 2021 13:42:31 +0100
Subject: [PATCH 10/16] docs(connection): Add documentation to legacy API
Adds basics documentation to the class of the legacy API.
---
.../telestion/services/connection/legacy/SerialConn.java | 4 ++--
.../telestion/services/connection/legacy/SerialData.java | 4 ++--
.../telestion/services/connection/legacy/TcpConn.java | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
index 25c318a5..52797ccd 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java
@@ -17,9 +17,9 @@
/**
* Implements a serial connection like UART. This module is part of the old connection api which is planned to be
- * removed in v0.7
+ * removed in v0.7.
*
- * @author Jan von Pichowski
+ * @author Jan von Pichowski (jvpichowski)
* @deprecated will be removed in v0.7
*/
@Deprecated(since = "v0.1.3", forRemoval = true)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
index 30208a23..92bba02d 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java
@@ -6,9 +6,9 @@
/**
* Connection data from the serial connection ({@link SerialConn}) which will be serialized into json to put it on the
* vertx event bus.
- * It is part of the old connection api and will be removed in v0.7
+ * It is part of the old connection api and will be removed in v0.7.
*
- * @author Jan von Pichowski
+ * @author Jan von Pichowski (jvpichowski)
* @deprecated will be removed in v0.7
*/
@Deprecated(since = "v0.1.3", forRemoval = true)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
index c33f3432..50d854d3 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java
@@ -22,7 +22,7 @@
* bus and send incoming messages over tcp to the participant. All addresses are defined in the configuration.
* It is planned to be removed in v0.7.
*
- * @author Jan von Pichowski
+ * @author Jan von Pichowski (jvpichowski)
* @deprecated will be removed in v0.7
*/
@Deprecated(since = "v0.1.3", forRemoval = true)
From 045717c62ddb9583c80a18baee11ad57cf029573 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Mon, 15 Nov 2021 14:53:29 +0100
Subject: [PATCH 11/16] chore(services): Delete unused package
`de.wuespace.telestion.services.protocol`
---
.../main/java/de/wuespace/telestion/services/protocol/.gitkeep | 0
1 file changed, 0 insertions(+), 0 deletions(-)
delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/protocol/.gitkeep
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/protocol/.gitkeep b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/protocol/.gitkeep
deleted file mode 100644
index e69de29b..00000000
From 18f127a907a31326fe0954bff4d3447de08a41d6 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 16 Nov 2021 01:04:21 +0100
Subject: [PATCH 12/16] feat(connection): Replace Tuple with IpDetails which is
optimized for this specific usecase
---
.../services/connection/IpDetails.java | 13 +++++++++
.../telestion/services/connection/Tuple.java | 29 -------------------
.../services/connection/tcp/TcpClient.java | 10 +++----
.../connection/tcp/TcpDispatcher.java | 4 +--
.../services/connection/tcp/TcpServer.java | 10 +++----
5 files changed, 25 insertions(+), 41 deletions(-)
create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java
delete mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java
new file mode 100644
index 00000000..e5463779
--- /dev/null
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java
@@ -0,0 +1,13 @@
+package de.wuespace.telestion.services.connection;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import de.wuespace.telestion.api.message.JsonMessage;
+
+public record IpDetails(@JsonProperty
+ String ip,
+ @JsonProperty
+ int port) implements JsonMessage {
+ public IpDetails() {
+ this("0.0.0.0", 0);
+ }
+}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
deleted file mode 100644
index 05e3d9f1..00000000
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Tuple.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.wuespace.telestion.services.connection;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * A simple implementation of a Value-Pair whose values are linked together without any key - value relationship.
- *
- * @param Type 1
- * @param Type 2
- *
- * @param value1 Value of Type 1
- * @param value2 Value of Type 2
- *
- * @author Cedric Boes
- * @version 1.0
- */
-public record Tuple(
- @JsonProperty T1 value1,
- @JsonProperty T2 value2) {
-
- /**
- * This is only for reflection of Config-Loading!
- */
- @SuppressWarnings("unused")
- private Tuple() {
- this(null, null);
- }
-
-}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
index b8bc2fde..2bb241ee 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
@@ -4,7 +4,7 @@
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.services.connection.ConnectionData;
-import de.wuespace.telestion.services.connection.Tuple;
+import de.wuespace.telestion.services.connection.IpDetails;
import io.reactivex.annotations.NonNull;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
@@ -81,7 +81,7 @@ public TcpClient(Configuration config) {
private void handleDispatchedMsg(TcpData tcpData) {
var details = tcpData.details();
- var key = new Tuple<>(details.ip(), details.port());
+ var key = new IpDetails(details.ip(), details.port());
// Checks if socket already exists.
// If not, connects to Server and if successful sends data asynchronously.
@@ -110,7 +110,7 @@ private void handleDispatchedMsg(TcpData tcpData) {
logger.debug("Sending data to {}:{}", details.ip(), details.port());
activeClients
- .get(new Tuple<>(details.ip(), details.port()))
+ .get(new IpDetails(details.ip(), details.port()))
.write(Buffer.buffer(tcpData.data()));
});
}
@@ -123,7 +123,7 @@ private void handleDispatchedMsg(TcpData tcpData) {
private void onConnected(NetSocket socket) {
var ip = socket.remoteAddress().host();
var port = socket.remoteAddress().port();
- var key = new Tuple<>(ip, port);
+ var key = new IpDetails(ip, port);
activeClients.put(key, socket);
logger.info("Connection established ({}:{})", ip, port);
@@ -161,7 +161,7 @@ private void handleMsg(ConnectionData data) {
}
private Configuration config;
- private Map, NetSocket> activeClients;
+ private Map activeClients;
private NetClient currentClient;
private static final Logger logger = LoggerFactory.getLogger(TcpClient.class);
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
index a1c65f3b..8f8741a5 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
@@ -4,8 +4,8 @@
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.IpDetails;
import de.wuespace.telestion.services.connection.SenderData;
-import de.wuespace.telestion.services.connection.Tuple;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
@@ -59,7 +59,7 @@ public TcpDispatcher(Configuration config, TcpServer... servers) {
private void handle(byte[] bytes, TcpDetails details) {
for (var server : servers) {
- if (server.isActiveCon(new Tuple<>(details.ip(), details.port()))) {
+ if (server.isActiveCon(new IpDetails(details.ip(), details.port()))) {
vertx.eventBus().publish(server.getConfig().inAddress(),
new TcpData(bytes, details).json());
} else {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
index fdb561a1..fea01ed0 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
@@ -4,7 +4,7 @@
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
import de.wuespace.telestion.services.connection.ConnectionData;
-import de.wuespace.telestion.services.connection.Tuple;
+import de.wuespace.telestion.services.connection.IpDetails;
import io.reactivex.annotations.NonNull;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
@@ -133,14 +133,14 @@ public Configuration getConfig() {
}
// Public for own Dispatcher implementations
- public boolean isActiveCon(Tuple key) {
+ public boolean isActiveCon(IpDetails key) {
return activeCons.containsKey(key);
}
private void onConnected(NetSocket netSocket) {
var ip = netSocket.remoteAddress().hostAddress();
var port = netSocket.remoteAddress().port();
- var key = new Tuple<>(ip, port);
+ var key = new IpDetails(ip, port);
logger.info("Connection established with {}:{}", ip, port);
activeCons.put(key, netSocket);
@@ -172,7 +172,7 @@ private void onConnected(NetSocket netSocket) {
private void handleDispatchedMsg(TcpData data) {
var details = data.details();
- var element = activeCons.get(new Tuple<>(details.ip(), details.port()));
+ var element = activeCons.get(new IpDetails(details.ip(), details.port()));
// Might be useful to send this to the TCP-Client however the config must be varied for this (future update?)
if (element == null) {
@@ -204,7 +204,7 @@ private void handleMsg(ConnectionData data) {
private Configuration config;
private NetServer server;
- private Map, NetSocket> activeCons;
+ private Map activeCons;
private static final Logger logger = LoggerFactory.getLogger(TcpServer.class);
}
From 9e55dea905f5f5fcce404ab622026d3cc6c75aaf Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 16 Nov 2021 01:08:33 +0100
Subject: [PATCH 13/16] feat(connection): Add Broadcaster verticle and improve
java docs
---
.../services/connection/Broadcaster.java | 84 +++++++++++++++++
.../connection/legacy/package-info.java | 8 +-
.../services/connection/package-info.java | 91 +++++++++++++++++++
3 files changed, 179 insertions(+), 4 deletions(-)
create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
new file mode 100644
index 00000000..7fbf4eb1
--- /dev/null
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
@@ -0,0 +1,84 @@
+package de.wuespace.telestion.services.connection;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import de.wuespace.telestion.api.config.Config;
+import de.wuespace.telestion.api.message.JsonMessage;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Promise;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class Broadcaster extends AbstractVerticle {
+
+ public final static int DEFAULT_ID = 0;
+
+ public final record Configuration(@JsonProperty
+ String inAddress,
+ @JsonProperty
+ int id) implements JsonMessage {
+ public Configuration() {
+ this(null, DEFAULT_ID);
+ }
+ }
+
+ @Override
+ public void start(Promise startPromise) {
+ this.config = Config.get(this.config, new Configuration(), this.config(), Configuration.class);
+
+ if (broadcasterMap.containsKey(this.config.id())) {
+ startPromise.fail("The broadcasters id #%d is already taken!".formatted(this.config.id()));
+ return;
+ }
+
+ startPromise.complete();
+ }
+
+ @Override
+ public void stop(Promise stopPromise) {
+ stopPromise.complete();
+ }
+
+ public static boolean register(int broadcasterId, String address) {
+ if (!broadcasterMap.containsKey(broadcasterId)) {
+ return false;
+ }
+
+ var broadcaster = broadcasterMap.get(broadcasterId);
+ broadcaster.addressList.add(address);
+
+ broadcaster.getVertx().eventBus()
+ .consumer(broadcaster.config.inAddress(), raw -> {
+ if (!JsonMessage.on(RawMessage.class, raw, broadcaster::send)) {
+ if (!JsonMessage.on(SenderData.class, raw, broadcaster::send)) {
+ JsonMessage.on(ConnectionData.class, raw, broadcaster::send);
+ }
+ }
+ });
+ return true;
+ }
+
+ public String[] getAddresses() {
+ return addressList.toArray(String[]::new);
+ }
+
+ public Broadcaster() {
+ this(null);
+ }
+
+ public Broadcaster(Configuration config) {
+ this.config = config;
+ this.addressList = new HashSet<>();
+ }
+
+ private void send(JsonMessage msg) {
+ addressList.forEach(addr -> vertx.eventBus().publish(addr, msg.json()));
+ }
+
+ private Configuration config;
+ private final Set addressList;
+
+ private static final Map broadcasterMap = new HashMap<>();
+}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
index 6fdcbbff..de0c9dce 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java
@@ -3,11 +3,11 @@
*
* Classes from the old connection api.
* The old connection api contains the following network interfaces:
- *
- * - Serial ({@link de.wuespace.telestion.services.connection.legacy.SerialConn})
- * - Tcp ({@link de.wuespace.telestion.services.connection.legacy.TcpConn})
- *
*
+ *
+ * - Serial ({@link de.wuespace.telestion.services.connection.legacy.SerialConn})
+ * - Tcp ({@link de.wuespace.telestion.services.connection.legacy.TcpConn})
+ *
*
* Disclaimer:
* Classes within this package are all marked as deprecated and will be removed soon.
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java
new file mode 100644
index 00000000..8d5e9e5f
--- /dev/null
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java
@@ -0,0 +1,91 @@
+/**
+ * Connection API
+ *
+ * Short description:
+ *
+ * This module contains the connection api.
+ * It provides a simplified interface for the most commonly used network interfaces in ground-stations and is
+ * optimized for the VertX framework.
+ *
+ *
+ * Supported network interfaces:
+ *
+ * - TCP
+ * - UDP
+ * - Serial connection (e.g. UART)
+ *
+ *
+ * General Concepts:
+ *
+ * The general concept of the connection api is to allow verticles to implement networking code without knowing the
+ * specifics of the interface itself. This is achieved by the following concepts:
+ *
+ *
+ * - Sender-Api
+ * - Receiver-Api
+ * - Static-Sending
+ * - Special Concepts
+ * - Manual Mode
+ *
+ *
+ * {@link de.wuespace.telestion.services.connection.Sender Sender-Api}:
+ *
+ * {@link de.wuespace.telestion.services.connection.ConnectionData} or
+ * {@link de.wuespace.telestion.services.connection.SenderData} sent to this class will be rerouted to the
+ * designated network interface or their dispatchers. If a connection is not, yet, established the default
+ * behaviour is to try to create a new connection to the target.
+ *
+ *
+ *
+ * Note:
+ * This can fail and the package can be dropped if the targeted network interface controller does not support
+ * informing about failed packet delivery!
+ *
+ *
+ *
+ * {@link de.wuespace.telestion.services.connection.Receiver Receiver-Api}:
+ *
+ * If this interface is used all incoming {@link de.wuespace.telestion.services.connection.ConnectionData packages}
+ * will be routed to its output address.
+ * This allows parsers to listen on multiple network connections at once. Answers then can be sent to the
+ * Sender-Api again.
+ *
+ *
+ * {@link de.wuespace.telestion.services.connection.StaticSender Static-Sending}:
+ *
+ * In cases where the receiver is already known at start-up, the
+ * {@link de.wuespace.telestion.services.connection.StaticSender} which can be fed with
+ * {@link de.wuespace.telestion.services.connection.RawMessage RawMessages} which do not require network interface
+ * information.
+ * The static sender will automatically send the packages to the connection specified in its config.
+ *
+ *
+ * Special Concepts:
+ *
+ * There are more complex concepts than the previously described ones. This allows for more advanced or network
+ * specific structures.
+ *
+ * Connection established:
+ *
+ * There are cases when one needs to know when a new connection has been established. In this case the XXXX will
+ * yield the {@link de.wuespace.telestion.services.connection.ConnectionDetails} specifying the connection.
+ *
+ *
+ * {@link de.wuespace.telestion.services.connection.Broadcaster Broadcasting}:
+ *
+ * To send information to all active network interfaces (which support broadcasting), a
+ * {@link de.wuespace.telestion.services.connection.Broadcaster} verticle needs to be added.
+ *
+ *
+ * Manual Mode:
+ *
+ * In rare cases where the previously described abstractions cannot be applied,
+ * there is still the possibility to use the network interfaces directly. For this refer to the package
+ * descriptions of the designated packages.
+ *
+ *
+ * @since 0.2.0
+ * @version 1.0
+ * @author Cedric Boes (cb0s)
+ */
+package de.wuespace.telestion.services.connection;
From 9d60adb602fb9b8ca095eeb1d569ba0033aa2fd7 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 16 Nov 2021 01:31:31 +0100
Subject: [PATCH 14/16] chore(services): Implement code optimizations proposed
by IntelliJ
---
.../telestion/services/monitoring/HystrixMetrics.java | 2 +-
.../java/de/wuespace/telestion/services/web/WebServer.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java
index 5a59b8f2..5a70e9f7 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java
@@ -38,7 +38,7 @@ public void start(Promise startPromise) throws Exception {
logger.info("Started {} with config {}", HystrixMetrics.class.getSimpleName(), config);
}
- @SuppressWarnings({ "preview", "unused" })
+ @SuppressWarnings({"unused" })
private static record Configuration(@JsonProperty int port, @JsonProperty String path) {
private Configuration() {
this(8080, "/hystrix-metrics");
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java
index 7e02a5d4..d26475e8 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java
@@ -17,7 +17,7 @@
*/
public final class WebServer extends AbstractVerticle {
- private Configuration forcedConfig;
+ private final Configuration forcedConfig;
/**
* Creating a WebServer on the specified port.
@@ -60,7 +60,7 @@ public void start(Promise startPromise) throws Exception {
*
* @param port the port to bind to
*/
- @SuppressWarnings({ "preview", "unused" })
+ @SuppressWarnings({"unused" })
private static record Configuration(@JsonProperty int port) {
private Configuration() {
this(8080);
From 5970abafe42a057e516f7808a601125e08a701b3 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 16 Nov 2021 16:28:02 +0100
Subject: [PATCH 15/16] feat(connection): Modify the network interfaces to
support broadcasting
---
.../services/connection/Broadcaster.java | 35 ++++++++++---
.../connection/serial/SerialConn.java | 16 +++---
.../connection/tcp/BaseTcpVerticle.java | 30 +++++++++++
.../services/connection/tcp/TcpClient.java | 46 ++++++++---------
.../services/connection/tcp/TcpDetails.java | 5 ++
.../connection/tcp/TcpDispatcher.java | 28 ++++++----
.../services/connection/tcp/TcpServer.java | 51 ++++++++++---------
.../services/connection/tcp/TcpTimeouts.java | 2 +-
8 files changed, 139 insertions(+), 74 deletions(-)
create mode 100644 modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
index 7fbf4eb1..2cb6f77a 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
@@ -5,14 +5,14 @@
import de.wuespace.telestion.api.message.JsonMessage;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
public class Broadcaster extends AbstractVerticle {
+ public final static int NO_BROADCASTING = -1;
public final static int DEFAULT_ID = 0;
public final record Configuration(@JsonProperty
@@ -33,6 +33,8 @@ public void start(Promise startPromise) {
return;
}
+ broadcasterMap.put(this.config.id(), this);
+
startPromise.complete();
}
@@ -41,8 +43,20 @@ public void stop(Promise stopPromise) {
stopPromise.complete();
}
+ /**
+ * Registers a new address for the broadcaster with the given id if it was previously specified in the config.
+ *
+ * @param broadcasterId 0 = default broadcaster, must be >= 0
+ * @param address Address on VertX bus it must be sent to
+ * @return if registration process was successful
+ */
public static boolean register(int broadcasterId, String address) {
- if (!broadcasterMap.containsKey(broadcasterId)) {
+ if (broadcasterId == NO_BROADCASTING) {
+ return true;
+ }
+
+ if (!broadcasterMap.containsKey(broadcasterId) || broadcasterId < 0) {
+ logger.error("Setup invalid!");
return false;
}
@@ -52,8 +66,12 @@ public static boolean register(int broadcasterId, String address) {
broadcaster.getVertx().eventBus()
.consumer(broadcaster.config.inAddress(), raw -> {
if (!JsonMessage.on(RawMessage.class, raw, broadcaster::send)) {
- if (!JsonMessage.on(SenderData.class, raw, broadcaster::send)) {
- JsonMessage.on(ConnectionData.class, raw, broadcaster::send);
+ if (!JsonMessage.on(SenderData.class,
+ raw,
+ msg -> broadcaster.send(new RawMessage(msg.rawData())))) {
+ JsonMessage.on(ConnectionData.class,
+ raw,
+ msg -> broadcaster.send(new RawMessage(msg.rawData())));
}
}
});
@@ -73,7 +91,7 @@ public Broadcaster(Configuration config) {
this.addressList = new HashSet<>();
}
- private void send(JsonMessage msg) {
+ private void send(RawMessage msg) {
addressList.forEach(addr -> vertx.eventBus().publish(addr, msg.json()));
}
@@ -81,4 +99,5 @@ private void send(JsonMessage msg) {
private final Set addressList;
private static final Map broadcasterMap = new HashMap<>();
+ private static Logger logger = LoggerFactory.getLogger(Broadcaster.class);
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
index cb9eb868..6db783b2 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java
@@ -4,6 +4,7 @@
import com.fazecast.jSerialComm.SerialPort;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
+import de.wuespace.telestion.services.connection.Broadcaster;
import de.wuespace.telestion.services.connection.ConnectionData;
import de.wuespace.telestion.services.connection.SenderData;
import io.vertx.core.AbstractVerticle;
@@ -22,7 +23,7 @@ public final class SerialConn extends AbstractVerticle {
@Override
public void start(Promise startPromise) {
- config = Config.get(config, config(), Configuration.class);
+ config = Config.get(config, new Configuration(), config(), Configuration.class);
logger.info("Opening Connection on {}", config.serialPort());
@@ -33,6 +34,8 @@ public void start(Promise startPromise) {
}
// TODO: Add more feature-rich implementation using serialPort.setComPortParameters()
+ Broadcaster.register(config.broadcasterId(), config.inAddress());
+
// In
vertx.setPeriodic(config.sampleTime(), event -> {
try {
@@ -80,14 +83,11 @@ public record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
@JsonProperty String serialPort,
@JsonProperty int baudRate,
- @JsonProperty long sampleTime) implements JsonMessage {
+ @JsonProperty long sampleTime,
+ @JsonProperty int broadcasterId) implements JsonMessage {
@SuppressWarnings("unused")
- private Configuration() {
- this(null, null, null, 9600, 0L);
- }
-
- public Configuration(String inAddress, String outAddress, String serialPort) {
- this(inAddress, outAddress, serialPort, 9600, 100);
+ public Configuration() {
+ this(null, null, null, 9_600, 0L, Broadcaster.NO_BROADCASTING);
}
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java
new file mode 100644
index 00000000..d33fdd5a
--- /dev/null
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java
@@ -0,0 +1,30 @@
+package de.wuespace.telestion.services.connection.tcp;
+
+import de.wuespace.telestion.api.message.JsonMessage;
+import de.wuespace.telestion.services.connection.ConnectionData;
+import de.wuespace.telestion.services.connection.IpDetails;
+import de.wuespace.telestion.services.connection.RawMessage;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.eventbus.Message;
+
+import java.util.Map;
+
+public abstract class BaseTcpVerticle extends AbstractVerticle {
+
+ protected void consumer(Message raw, Map activeClients) {
+ if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) {
+ if (!JsonMessage.on(ConnectionData.class, raw, this::handleMsg)) {
+ // Broadcasting
+ JsonMessage.on(RawMessage.class, raw,
+ // Why not send directly? Well, we want logging and potential future updates
+ msg -> activeClients.keySet().forEach(
+ k -> this.handleDispatchedMsg(
+ new TcpData(msg.data(),
+ TcpDetails.fromIpDetails(k)))));
+ }
+ }
+ }
+
+ protected abstract void handleMsg(ConnectionData data);
+ protected abstract void handleDispatchedMsg(TcpData tcpData);
+}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
index 2bb241ee..54742fbf 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java
@@ -3,8 +3,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
+import de.wuespace.telestion.services.connection.Broadcaster;
import de.wuespace.telestion.services.connection.ConnectionData;
import de.wuespace.telestion.services.connection.IpDetails;
+import de.wuespace.telestion.services.connection.RawMessage;
import io.reactivex.annotations.NonNull;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.CompositeFuture;
@@ -21,17 +23,14 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public final class TcpClient extends AbstractVerticle {
+public final class TcpClient extends BaseTcpVerticle {
public final record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
- @JsonProperty long timeout) implements JsonMessage {
- private Configuration() {
- this(null, null, TcpTimeouts.DEFAULT_TIMEOUT);
- }
-
- public Configuration(@NonNull String inAddress, @NonNull String outAddress) {
- this(inAddress, outAddress, TcpTimeouts.DEFAULT_TIMEOUT);
+ @JsonProperty long timeout,
+ @JsonProperty int broadcasterId) implements JsonMessage {
+ public Configuration() {
+ this(null, null, TcpTimeouts.DEFAULT_TIMEOUT, Broadcaster.DEFAULT_ID);
}
}
@@ -46,12 +45,9 @@ public void start(Promise startPromise) {
currentClient = vertx.createNetClient(options);
activeClients = new HashMap<>();
- vertx.eventBus().consumer(config.inAddress(), raw -> {
- if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) {
- JsonMessage.on(ConnectionData.class, raw, this::handleMsg);
- }
- });
+ Broadcaster.register(config.broadcasterId(), config.inAddress());
+ vertx.eventBus().consumer(config.inAddress(), handler -> this.consumer(handler, activeClients));
startPromise.complete();
}
@@ -79,7 +75,17 @@ public TcpClient(Configuration config) {
this.config = config;
}
- private void handleDispatchedMsg(TcpData tcpData) {
+ protected void handleMsg(ConnectionData data) {
+ var details = data.conDetails();
+ if (details instanceof TcpDetails tcpDet) {
+ handleDispatchedMsg(new TcpData(data.rawData(), tcpDet));
+ } else { // Shouldn't happen due to Dispatcher
+ logger.warn("Wrong connection detail type received. Packet will be dropped.");
+ // If there will ever be a logger for broken packets, send this to it
+ }
+ }
+
+ protected void handleDispatchedMsg(TcpData tcpData) {
var details = tcpData.details();
var key = new IpDetails(details.ip(), details.port());
@@ -109,6 +115,8 @@ private void handleDispatchedMsg(TcpData tcpData) {
}
logger.debug("Sending data to {}:{}", details.ip(), details.port());
+ // TcpClient.write() is non-blocking and returns a Future which theoretically could be used to monitor the
+ // success of this process if needed in the future. (maybe inform if dropped?)
activeClients
.get(new IpDetails(details.ip(), details.port()))
.write(Buffer.buffer(tcpData.data()));
@@ -150,16 +158,6 @@ private void onConnected(NetSocket socket) {
});
}
- private void handleMsg(ConnectionData data) {
- var details = data.conDetails();
- if (details instanceof TcpDetails tcpDet) {
- handleDispatchedMsg(new TcpData(data.rawData(), tcpDet));
- } else { // Shouldn't happen due to Dispatcher
- logger.warn("Wrong connection detail type received. Packet will be dropped.");
- // If there will be ever a logger for broken packets, send this
- }
- }
-
private Configuration config;
private Map activeClients;
private NetClient currentClient;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
index ad952063..adabfd7f 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java
@@ -2,6 +2,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.services.connection.ConnectionDetails;
+import de.wuespace.telestion.services.connection.IpDetails;
public record TcpDetails(@JsonProperty String ip,
@JsonProperty int port,
@@ -18,4 +19,8 @@ private TcpDetails() {
public TcpDetails(String ip, int port) {
this(ip, port, 0);
}
+
+ public static TcpDetails fromIpDetails(IpDetails details) {
+ return new TcpDetails(details.ip(), details.port());
+ }
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
index 8f8741a5..156706b0 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
@@ -3,30 +3,38 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
-import de.wuespace.telestion.services.connection.ConnectionData;
-import de.wuespace.telestion.services.connection.IpDetails;
-import de.wuespace.telestion.services.connection.SenderData;
+import de.wuespace.telestion.services.connection.*;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
+import java.text.BreakIterator;
import java.util.Arrays;
public class TcpDispatcher extends AbstractVerticle {
@Override
public void start(Promise startPromise) {
- config = Config.get(config, config(), Configuration.class);
+ config = Config.get(config, new Configuration(), config(), Configuration.class);
+
+ Broadcaster.register(config.broadcasterId(), config.inAddress());
vertx.eventBus().consumer(config.inAddress(), raw -> {
if (!JsonMessage.on(SenderData.class, raw, msg -> Arrays.stream(msg.conDetails())
.filter(details -> details instanceof TcpDetails)
.map(details -> (TcpDetails) details)
.forEach(details -> handle(msg.rawData(), details)))) {
- JsonMessage.on(ConnectionData.class, raw, msg -> {
+ if (!JsonMessage.on(ConnectionData.class, raw, msg -> {
if (msg.conDetails() instanceof TcpDetails det) {
handle(msg.rawData(), det);
}
- });
+ })) {
+ // Broadcasting
+ JsonMessage.on(RawMessage.class, raw, msg -> {
+ Arrays.stream(servers).forEach(s ->
+ vertx.eventBus().publish(s.getConfig().inAddress(), msg.json()));
+ vertx.eventBus().publish(config.outAddress(), msg.json());
+ });
+ }
}
});
startPromise.complete();
@@ -38,13 +46,15 @@ public void stop(Promise stopPromise) {
}
public record Configuration(@JsonProperty String inAddress,
- @JsonProperty String outAddress) implements JsonMessage {
+ @JsonProperty String outAddress,
+ @JsonProperty int broadcasterId) implements JsonMessage {
/**
* For config
*/
+ // The broadcasting-registration is usually done in the client and server verticle.
@SuppressWarnings("unused")
- private Configuration() {
- this(null, null);
+ public Configuration() {
+ this(null, null, Broadcaster.NO_BROADCASTING);
}
}
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
index fea01ed0..94c9b103 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java
@@ -3,8 +3,10 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import de.wuespace.telestion.api.config.Config;
import de.wuespace.telestion.api.message.JsonMessage;
+import de.wuespace.telestion.services.connection.Broadcaster;
import de.wuespace.telestion.services.connection.ConnectionData;
import de.wuespace.telestion.services.connection.IpDetails;
+import de.wuespace.telestion.services.connection.RawMessage;
import io.reactivex.annotations.NonNull;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
@@ -35,7 +37,7 @@
* Sending answers back to the Client if the connections are still open
*
*/
-public final class TcpServer extends AbstractVerticle {
+public final class TcpServer extends BaseTcpVerticle {
/**
* Configuration for this Verticle which can be loaded from a config.
@@ -47,18 +49,21 @@ public final class TcpServer extends AbstractVerticle {
* @param host host on which the Server-Socket should run
* @param port port on which the Server-Socket should run
* @param clientTimeout time until timeout
+ * @param broadcasterId id of the broadcaster you want to use
*/
public final record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
@JsonProperty String host,
@JsonProperty int port,
- @JsonProperty long clientTimeout) implements JsonMessage {
- private Configuration() {
- this(null, null, "0.0.0.0", 0, TcpTimeouts.DEFAULT_TIMEOUT);
- }
-
- public Configuration(@NonNull String inAddress, @NonNull String outAddress, @NonNull String host, int port) {
- this(inAddress, outAddress, host, port, TcpTimeouts.DEFAULT_TIMEOUT);
+ @JsonProperty long clientTimeout,
+ @JsonProperty int broadcasterId) implements JsonMessage {
+ public Configuration() {
+ this(null,
+ null,
+ "0.0.0.0",
+ 0,
+ TcpTimeouts.DEFAULT_TIMEOUT,
+ Broadcaster.DEFAULT_ID);
}
}
@@ -94,11 +99,9 @@ public void start(Promise startPromise) throws Exception {
startPromise.complete();
});
- vertx.eventBus().consumer(config.inAddress, raw -> {
- if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) {
- JsonMessage.on(ConnectionData.class, raw, this::handleMsg);
- }
- });
+ Broadcaster.register(config.broadcasterId(), config.inAddress());
+
+ vertx.eventBus().consumer(config.inAddress, handler -> this.consumer(handler, activeCons));
}
@Override
@@ -169,7 +172,17 @@ private void onConnected(NetSocket netSocket) {
});
}
- private void handleDispatchedMsg(TcpData data) {
+ protected void handleMsg(ConnectionData data) {
+ var details = data.conDetails();
+ if (details instanceof TcpDetails tcpDetails) {
+ handleDispatchedMsg(new TcpData(data.rawData(), tcpDetails));
+ } else { // Shouldn't happen due to Dispatcher
+ logger.warn("Wrong connection detail type received. Packet will be dropped.");
+ // If there will be ever a logger for broken packets, send this
+ }
+ }
+
+ protected void handleDispatchedMsg(TcpData data) {
var details = data.details();
var element = activeCons.get(new IpDetails(details.ip(), details.port()));
@@ -192,16 +205,6 @@ private void handleDispatchedMsg(TcpData data) {
element.write(Buffer.buffer(data.data()));
}
- private void handleMsg(ConnectionData data) {
- var details = data.conDetails();
- if (details instanceof TcpDetails tcpDetails) {
- handleDispatchedMsg(new TcpData(data.rawData(), tcpDetails));
- } else { // Shouldn't happen due to Dispatcher
- logger.warn("Wrong connection detail type received. Packet will be dropped.");
- // If there will be ever a logger for broken packets, send this
- }
- }
-
private Configuration config;
private NetServer server;
private Map activeCons;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
index 53c439bc..c28bb27d 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
@@ -9,7 +9,7 @@ public class TcpTimeouts {
public static final long NO_RESPONSES = -1;
/**
- * Never actively close active tcp connection.
+ * Never actively close active tcp connections.
*/
public static final long NO_TIMEOUT = 0;
From b64a731e1c71a8e60ad08ca38824391649550a54 Mon Sep 17 00:00:00 2001
From: Cedric
Date: Tue, 16 Nov 2021 17:25:30 +0100
Subject: [PATCH 16/16] fix(connection): Make various classes final as they
should be
---
.../de/wuespace/telestion/services/connection/Broadcaster.java | 2 +-
.../de/wuespace/telestion/services/connection/StaticSender.java | 2 +-
.../telestion/services/connection/tcp/TcpDispatcher.java | 2 +-
.../wuespace/telestion/services/connection/tcp/TcpTimeouts.java | 2 +-
4 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
index 2cb6f77a..967c58b4 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java
@@ -10,7 +10,7 @@
import java.util.*;
-public class Broadcaster extends AbstractVerticle {
+public final class Broadcaster extends AbstractVerticle {
public final static int NO_BROADCASTING = -1;
public final static int DEFAULT_ID = 0;
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
index a1fa268c..76ff4436 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java
@@ -8,7 +8,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StaticSender extends AbstractVerticle {
+public final class StaticSender extends AbstractVerticle {
public record Configuration(@JsonProperty String inAddress,
@JsonProperty String outAddress,
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
index 156706b0..499a45af 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java
@@ -10,7 +10,7 @@
import java.text.BreakIterator;
import java.util.Arrays;
-public class TcpDispatcher extends AbstractVerticle {
+public final class TcpDispatcher extends AbstractVerticle {
@Override
public void start(Promise startPromise) {
diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
index c28bb27d..7f01e37c 100644
--- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
+++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java
@@ -2,7 +2,7 @@
import java.time.Duration;
-public class TcpTimeouts {
+public final class TcpTimeouts {
/**
* Close tcp connection after first received packet.
*/