diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eb98185..2e89c58b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -106,7 +106,7 @@ ### Features * **api:** Add new configuration provider which enables default values ([a973cd1](https://github.com/wuespace/telestion-core/commit/a973cd1f0d30513bcfcae655db156138f74b145a)) -* **connection:** Add configuration of baud rate to `SerialConn` verticle. Usages of the `de.wuespace.telestion.services.connection.rework.serial.SerialConn` now requires an additional parameter: `int baudRate`. To migrate, add this parameter to any application configuration using this verticle. The value used before is `9600`. Therefore, you can use `"baudRate": 9600` to match the old default configuration. ([ae4dad2](https://github.com/wuespace/telestion-core/commit/ae4dad2c9732047551ea74cca5b35b45bfd47f83)) +* **connection:** Add configuration of baud rate to `SerialConn` verticle. Usages of the `de.wuespace.telestion.services.connection.serial.SerialConn` now requires an additional parameter: `int baudRate`. To migrate, add this parameter to any application configuration using this verticle. The value used before is `9600`. Therefore, you can use `"baudRate": 9600` to match the old default configuration. ([ae4dad2](https://github.com/wuespace/telestion-core/commit/ae4dad2c9732047551ea74cca5b35b45bfd47f83)) ## [0.3.0](https://github.com/wuespace/telestion-core/compare/v0.2.1...v0.3.0) (2021-05-08) diff --git a/modules/telestion-api/build.gradle b/modules/telestion-api/build.gradle index 4a4c82e6..047d2db9 100644 --- a/modules/telestion-api/build.gradle +++ b/modules/telestion-api/build.gradle @@ -1,6 +1,6 @@ application { // Define the main class for the application. - mainClass.set('de.wuespace.telestion.application.Application') + mainClass.set('de.wuespace.telestion.application.Telestion') } java { diff --git a/modules/telestion-application/build.gradle b/modules/telestion-application/build.gradle index 93360fd1..d3266cb3 100644 --- a/modules/telestion-application/build.gradle +++ b/modules/telestion-application/build.gradle @@ -1,6 +1,6 @@ application { // Define the main class for the application. - mainClass.set('de.wuespace.telestion.application.Application') + mainClass.set('de.wuespace.telestion.application.Telestion') } java { diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java deleted file mode 100644 index 3f09e9ac..00000000 --- a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Application.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.wuespace.telestion.application; - -//import org.telestion.example.RandomPositionPublisher; -//import org.telestion.launcher.launcher.Launcher; - -/** - * Starts the Telestion-Project as a standalone Application. - * - * @author Jan von Pichowski, Cedric Boes - * @version 1.0 - */ -public class Application { - - /** - * Calls the Launcher for a specific Testcase (at the moment).
- * Real functionality will be added later. - * - * @param args unused at the moment - */ - public static void main(String[] args) { - /*Launcher.start(new MessageLogger(), new RandomPositionPublisher(), - new EventbusTcpBridge("localhost", 9870, - Collections.singletonList(Address.incoming(DataService.class, "find")), - Collections.singletonList(Address.outgoing(RandomPositionPublisher.class, "MockPos"))), - new WebServer(8080), - new DataService(), - new MongoDatabaseService("raketenpraktikum", "raketenpraktikumPool"));*/ - } - -} diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java index 8004a37d..b275e0f5 100644 --- a/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java +++ b/modules/telestion-application/src/main/java/de/wuespace/telestion/application/Telestion.java @@ -5,7 +5,9 @@ import io.vertx.core.DeploymentOptions; import io.vertx.core.Promise; import io.vertx.core.Vertx; + import java.util.Collections; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import de.wuespace.telestion.services.config.Configuration; @@ -16,9 +18,6 @@ * @author Jan von Pichowski */ public final class Telestion extends AbstractVerticle { - - private static final Logger logger = LoggerFactory.getLogger(Telestion.class); - /** * Deploys this Telestion verticle. * @@ -38,18 +37,30 @@ public void start(Promise startPromise) { startPromise.fail(configRes.cause()); return; } - var conf = configRes.result().getJsonObject("org.telestion.configuration").mapTo(Configuration.class); - conf.verticles().stream().flatMap(c -> Collections.nCopies(c.magnitude(), c).stream()).forEach(v -> { - logger.info("Deploying {}", v.name()); - var future = vertx.deployVerticle(v.verticle(), new DeploymentOptions().setConfig(v.jsonConfig())); - future.onFailure(Throwable::printStackTrace); - }); + + var conf = configRes + .result() + .getJsonObject("de.wuespace.telestion.configuration") + .mapTo(Configuration.class); + + conf.verticles().stream() + .flatMap(c -> Collections.nCopies(c.magnitude(), c).stream()).forEach(v -> { + logger.info("Deploying {}", v.name()); + var future = vertx.deployVerticle( + v.verticle(), + new DeploymentOptions().setConfig(v.jsonConfig()) + ); + future.onFailure(Throwable::printStackTrace); + }); + startPromise.complete(); }); } @Override - public void stop(Promise stopPromise) throws Exception { - + public void stop(Promise stopPromise) { + stopPromise.complete(); } + + private static final Logger logger = LoggerFactory.getLogger(Telestion.class); } diff --git a/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java b/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java index 72aac043..3ff05278 100644 --- a/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java +++ b/modules/telestion-application/src/main/java/de/wuespace/telestion/launcher/Launcher.java @@ -1,9 +1,13 @@ package de.wuespace.telestion.launcher; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; import io.vertx.core.Verticle; import io.vertx.core.Vertx; + import java.time.Duration; import java.util.Arrays; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,19 +40,13 @@ public static void main(String[] args) { public static void start(String... verticleNames) { logger.info("Deploying {} verticles", verticleNames.length); var vertx = Vertx.vertx(); - // vertx.eventBus().registerDefaultCodec(Position.class, JsonMessageCodec.Instance(Position.class)); - Arrays.stream(verticleNames).forEach(verticleName -> { - logger.info("Deploying verticle {}", verticleName); - vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), timerId -> { - vertx.deployVerticle(verticleName, res -> { - if (res.failed()) { - logger.error("Failed to deploy verticle {} retrying in 5s", verticleName, res.cause()); - return; - } - logger.info("Deployed verticle {} with id {}", verticleName, res.result()); - vertx.cancelTimer(timerId); - }); - }); + + Arrays.stream(verticleNames).forEach(name -> { + logger.info("Deploying verticle {}", name); + vertx.setPeriodic( + Duration.ofSeconds(5).toMillis(), + timerId -> vertx.deployVerticle(name, deploymentHandler(vertx, name, timerId)) + ); }); } @@ -61,19 +59,27 @@ public static void start(String... verticleNames) { public static void start(Verticle... verticles) { logger.info("Deploying {} verticles", verticles.length); var vertx = Vertx.vertx(); - // vertx.eventBus().registerDefaultCodec(Position.class, JsonMessageCodec.Instance(Position.class)); - Arrays.stream(verticles).forEach(verticleName -> { - logger.info("Deploying verticle {}", verticleName); - vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), timerId -> { - vertx.deployVerticle(verticleName, res -> { - if (res.failed()) { - logger.error("Failed to deploy verticle {} retrying in 5s", verticleName, res.cause()); - return; - } - logger.info("Deployed verticle {} with id {}", verticleName, res.result()); - vertx.cancelTimer(timerId); - }); - }); + + Arrays.stream(verticles).forEach(instance -> { + logger.info("Deploying verticle {}", instance); + vertx.setPeriodic( + Duration.ofSeconds(5).toMillis(), + timerId -> vertx.deployVerticle( + instance, + deploymentHandler(vertx, instance.getClass().getName(), timerId) + ) + ); }); } + + private static Handler> deploymentHandler(Vertx vertx, String verticle, Long timerId) { + return res -> { + if (res.failed()) { + logger.error("Failed to deploy verticle {} retrying in 5s", verticle, res.cause()); + return; + } + logger.info("Deployed verticle {} with id {}", verticle, res.result()); + vertx.cancelTimer(timerId); + }; + } } diff --git a/modules/telestion-services/build.gradle b/modules/telestion-services/build.gradle index 8ca8e652..817fc9f0 100644 --- a/modules/telestion-services/build.gradle +++ b/modules/telestion-services/build.gradle @@ -1,6 +1,6 @@ application { // Define the main class for the application. - mainClass.set('de.wuespace.telestion.application.Application') + mainClass.set('de.wuespace.telestion.application.Telestion') } java { @@ -19,8 +19,6 @@ ext { description = 'Services and service components that are re-usable in any Telestion Project Application' dependencies { - // implementation name: 'engine' - implementation project(':modules:telestion-api') implementation group: 'com.fazecast', name: 'jSerialComm', version: '2.9.1' diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java new file mode 100644 index 00000000..967c58b4 --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Broadcaster.java @@ -0,0 +1,103 @@ +package de.wuespace.telestion.services.connection; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.api.message.JsonMessage; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.Promise; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +public final class Broadcaster extends AbstractVerticle { + + public final static int NO_BROADCASTING = -1; + public final static int DEFAULT_ID = 0; + + public final record Configuration(@JsonProperty + String inAddress, + @JsonProperty + int id) implements JsonMessage { + public Configuration() { + this(null, DEFAULT_ID); + } + } + + @Override + public void start(Promise startPromise) { + this.config = Config.get(this.config, new Configuration(), this.config(), Configuration.class); + + if (broadcasterMap.containsKey(this.config.id())) { + startPromise.fail("The broadcasters id #%d is already taken!".formatted(this.config.id())); + return; + } + + broadcasterMap.put(this.config.id(), this); + + startPromise.complete(); + } + + @Override + public void stop(Promise stopPromise) { + stopPromise.complete(); + } + + /** + * Registers a new address for the broadcaster with the given id if it was previously specified in the config. + * + * @param broadcasterId 0 = default broadcaster, must be >= 0 + * @param address Address on VertX bus it must be sent to + * @return if registration process was successful + */ + public static boolean register(int broadcasterId, String address) { + if (broadcasterId == NO_BROADCASTING) { + return true; + } + + if (!broadcasterMap.containsKey(broadcasterId) || broadcasterId < 0) { + logger.error("Setup invalid!"); + return false; + } + + var broadcaster = broadcasterMap.get(broadcasterId); + broadcaster.addressList.add(address); + + broadcaster.getVertx().eventBus() + .consumer(broadcaster.config.inAddress(), raw -> { + if (!JsonMessage.on(RawMessage.class, raw, broadcaster::send)) { + if (!JsonMessage.on(SenderData.class, + raw, + msg -> broadcaster.send(new RawMessage(msg.rawData())))) { + JsonMessage.on(ConnectionData.class, + raw, + msg -> broadcaster.send(new RawMessage(msg.rawData()))); + } + } + }); + return true; + } + + public String[] getAddresses() { + return addressList.toArray(String[]::new); + } + + public Broadcaster() { + this(null); + } + + public Broadcaster(Configuration config) { + this.config = config; + this.addressList = new HashSet<>(); + } + + private void send(RawMessage msg) { + addressList.forEach(addr -> vertx.eventBus().publish(addr, msg.json())); + } + + private Configuration config; + private final Set addressList; + + private static final Map broadcasterMap = new HashMap<>(); + private static Logger logger = LoggerFactory.getLogger(Broadcaster.class); +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java similarity index 95% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java index ff73190d..d340a130 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionData.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionData.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java similarity index 80% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java index 527d5d99..93b62f42 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/ConnectionDetails.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/ConnectionDetails.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonTypeInfo; import de.wuespace.telestion.api.message.JsonMessage; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java new file mode 100644 index 00000000..e5463779 --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/IpDetails.java @@ -0,0 +1,13 @@ +package de.wuespace.telestion.services.connection; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; + +public record IpDetails(@JsonProperty + String ip, + @JsonProperty + int port) implements JsonMessage { + public IpDetails() { + this("0.0.0.0", 0); + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java similarity index 72% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java index c79cd109..25667d32 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/RawMessage.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/RawMessage.java @@ -1,10 +1,9 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; public record RawMessage(@JsonProperty byte[] data) implements JsonMessage { - @SuppressWarnings("unused") private RawMessage() { this(null); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java similarity index 96% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java index c5a2c0bb..a011f0a9 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Receiver.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Receiver.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import io.vertx.core.AbstractVerticle; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java similarity index 96% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java index d076a328..a648d3b7 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Sender.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/Sender.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java similarity index 95% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java index 3df45160..e53f5ea5 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/SenderData.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SenderData.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java deleted file mode 100644 index 5ec5a6c6..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialData.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.wuespace.telestion.services.connection; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.api.message.JsonMessage; - -@Deprecated(since = "v0.1.3", forRemoval = true) -public record SerialData(@JsonProperty byte[] data) implements JsonMessage { - - @SuppressWarnings("unused") - private SerialData(){ - this(null); - } -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java similarity index 70% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java index 3e52dca8..76ff4436 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/StaticSender.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/StaticSender.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework; +package de.wuespace.telestion.services.connection; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; @@ -8,19 +8,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class StaticSender extends AbstractVerticle { +public final class StaticSender extends AbstractVerticle { + + public record Configuration(@JsonProperty String inAddress, + @JsonProperty String outAddress, + @JsonProperty ConnectionDetails staticDetails) implements JsonMessage { + private Configuration() { + this(null, null, null); + } + } @Override public void start(Promise startPromise) { config = Config.get(config, config(), Configuration.class); - vertx.eventBus().consumer(config.inAddress(), raw -> { - JsonMessage.on(RawMessage.class, raw, msg -> { - logger.debug("Sending static message"); - vertx.eventBus().publish(config.outAddress(), new ConnectionData(msg.data(), - config.staticDetails()).json()); - }); - }); + vertx.eventBus().consumer(config.inAddress(), raw -> JsonMessage.on(RawMessage.class, raw, msg -> { + logger.debug("Sending static message"); + vertx.eventBus().publish(config.outAddress(), new ConnectionData(msg.data(), + config.staticDetails()).json()); + })); startPromise.complete(); } @@ -30,16 +36,6 @@ public void stop(Promise stopPromise) { stopPromise.complete(); } - public record Configuration(@JsonProperty String inAddress, - @JsonProperty String outAddress, - @JsonProperty ConnectionDetails staticDetails) implements JsonMessage { - - @SuppressWarnings("unused") - private Configuration() { - this(null, null, null); - } - } - public StaticSender() { this(null); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java similarity index 89% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java index 949cfaec..52797ccd 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/SerialConn.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialConn.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection; +package de.wuespace.telestion.services.connection.legacy; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.services.monitoring.MessageLogger; @@ -15,6 +15,13 @@ import de.wuespace.telestion.api.message.JsonMessage; import com.fazecast.jSerialComm.*; +/** + * Implements a serial connection like UART. This module is part of the old connection api which is planned to be + * removed in v0.7. + * + * @author Jan von Pichowski (jvpichowski) + * @deprecated will be removed in v0.7 + */ @Deprecated(since = "v0.1.3", forRemoval = true) public final class SerialConn extends AbstractVerticle { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java new file mode 100644 index 00000000..92bba02d --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/SerialData.java @@ -0,0 +1,21 @@ +package de.wuespace.telestion.services.connection.legacy; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.message.JsonMessage; + +/** + * Connection data from the serial connection ({@link SerialConn}) which will be serialized into json to put it on the + * vertx event bus.
+ * It is part of the old connection api and will be removed in v0.7. + * + * @author Jan von Pichowski (jvpichowski) + * @deprecated will be removed in v0.7 + */ +@Deprecated(since = "v0.1.3", forRemoval = true) +public record SerialData(@JsonProperty byte[] data) implements JsonMessage { + + @SuppressWarnings("unused") + private SerialData(){ + this(null); + } +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java similarity index 96% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java index e9e60ea4..50d854d3 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/TcpConn.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/TcpConn.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection; +package de.wuespace.telestion.services.connection.legacy; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; @@ -19,7 +19,11 @@ * This class opens a tcp connection. This could either be a tcp client to a host or a host which accepts new clients. * It is configured with the file based config pattern. After a connection is established the {@link Participant} is * published on the event bus. Incoming messages are published to the event bus too. The connection listens to the event - * bus and send incoming messages over tcp to the participant. All addresses are defined in the configuration. + * bus and send incoming messages over tcp to the participant. All addresses are defined in the configuration.
+ * It is planned to be removed in v0.7. + * + * @author Jan von Pichowski (jvpichowski) + * @deprecated will be removed in v0.7 */ @Deprecated(since = "v0.1.3", forRemoval = true) public final class TcpConn extends AbstractVerticle { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java new file mode 100644 index 00000000..de0c9dce --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/legacy/package-info.java @@ -0,0 +1,22 @@ +/** + *

Old connection api

+ *

+ * Classes from the old connection api.
+ * The old connection api contains the following network interfaces: + *

+ *
    + *
  • Serial ({@link de.wuespace.telestion.services.connection.legacy.SerialConn})
  • + *
  • Tcp ({@link de.wuespace.telestion.services.connection.legacy.TcpConn})
  • + *
+ *

+ * Disclaimer:
+ * Classes within this package are all marked as deprecated and will be removed soon. + * There will be no new features and bugs will not be fixed.
+ *

+ * + * @since 0.6 + * @author Cedric Boes + * @version 1.0 + * @deprecated the old connection api was replaced with the new connection api and will be removed in v0.7 + */ +package de.wuespace.telestion.services.connection.legacy; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java new file mode 100644 index 00000000..8d5e9e5f --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/package-info.java @@ -0,0 +1,91 @@ +/** + *

Connection API

+ * + *

Short description:

+ *

+ * This module contains the connection api.
+ * It provides a simplified interface for the most commonly used network interfaces in ground-stations and is + * optimized for the VertX framework. + *

+ * + *

Supported network interfaces:

+ *
    + *
  • TCP
  • + *
  • UDP
  • + *
  • Serial connection (e.g. UART)
  • + *
+ * + *

General Concepts:

+ *

+ * The general concept of the connection api is to allow verticles to implement networking code without knowing the + * specifics of the interface itself. This is achieved by the following concepts: + *

+ *
    + *
  • Sender-Api
  • + *
  • Receiver-Api
  • + *
  • Static-Sending
  • + *
  • Special Concepts
  • + *
  • Manual Mode
  • + *
+ * + *

{@link de.wuespace.telestion.services.connection.Sender Sender-Api}:

+ *

+ * {@link de.wuespace.telestion.services.connection.ConnectionData} or + * {@link de.wuespace.telestion.services.connection.SenderData} sent to this class will be rerouted to the + * designated network interface or their dispatchers. If a connection is not, yet, established the default + * behaviour is to try to create a new connection to the target. + *

+ *

+ * + * Note:
+ * This can fail and the package can be dropped if the targeted network interface controller does not support + * informing about failed packet delivery! + *
+ *

+ * + *

{@link de.wuespace.telestion.services.connection.Receiver Receiver-Api}:

+ *

+ * If this interface is used all incoming {@link de.wuespace.telestion.services.connection.ConnectionData packages} + * will be routed to its output address.
+ * This allows parsers to listen on multiple network connections at once. Answers then can be sent to the + * Sender-Api again. + *

+ * + *

{@link de.wuespace.telestion.services.connection.StaticSender Static-Sending}:

+ *

+ * In cases where the receiver is already known at start-up, the + * {@link de.wuespace.telestion.services.connection.StaticSender} which can be fed with + * {@link de.wuespace.telestion.services.connection.RawMessage RawMessages} which do not require network interface + * information.
+ * The static sender will automatically send the packages to the connection specified in its config. + *

+ * + *

Special Concepts:

+ *

+ * There are more complex concepts than the previously described ones. This allows for more advanced or network + * specific structures. + *

+ *
Connection established:
+ *

+ * There are cases when one needs to know when a new connection has been established. In this case the XXXX will + * yield the {@link de.wuespace.telestion.services.connection.ConnectionDetails} specifying the connection. + *

+ * + *
{@link de.wuespace.telestion.services.connection.Broadcaster Broadcasting}:
+ *

+ * To send information to all active network interfaces (which support broadcasting), a + * {@link de.wuespace.telestion.services.connection.Broadcaster} verticle needs to be added. + *

+ * + *

Manual Mode:

+ *

+ * In rare cases where the previously described abstractions cannot be applied, + * there is still the possibility to use the network interfaces directly. For this refer to the package + * descriptions of the designated packages. + *

+ * + * @since 0.2.0 + * @version 1.0 + * @author Cedric Boes (cb0s) + */ +package de.wuespace.telestion.services.connection; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java deleted file mode 100644 index dc7fd27e..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/Tuple.java +++ /dev/null @@ -1,29 +0,0 @@ -package de.wuespace.telestion.services.connection.rework; - -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * A simple implementation of a Value-Pair whose values are linked together without any key - value relationship. - * - * @param Type 1 - * @param Type 2 - * - * @param value1 Value of Type 1 - * @param value2 Value of Type 2 - * - * @author Cedric Boes - * @version 1.0 - */ -public record Tuple( - @JsonProperty T1 value1, - @JsonProperty T2 value2) { - - /** - * This is only for reflection of Config-Loading! - */ - @SuppressWarnings("unused") - private Tuple() { - this(null, null); - } - -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java deleted file mode 100644 index f4bec650..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpClient.java +++ /dev/null @@ -1,146 +0,0 @@ -package de.wuespace.telestion.services.connection.rework.tcp; - -import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.api.config.Config; -import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.connection.rework.ConnectionData; -import de.wuespace.telestion.services.connection.rework.Tuple; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Promise; -import io.vertx.core.buffer.Buffer; -import io.vertx.core.net.NetClient; -import io.vertx.core.net.NetClientOptions; -import io.vertx.core.net.NetSocket; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.concurrent.atomic.AtomicInteger; - -public final class TcpClient extends AbstractVerticle { - - @Override - public void start(Promise startPromise) { - config = Config.get(config, config(), Configuration.class); - - var options = new NetClientOptions(); - options.setIdleTimeout(config.timeout() == TcpTimeouts.NO_RESPONSES - ? (int) TcpTimeouts.NO_TIMEOUT : (int) config.timeout()); - currentClient = vertx.createNetClient(options); - - activeClients = new HashMap<>(); - - vertx.eventBus().consumer(config.inAddress(), raw -> { - if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) { - JsonMessage.on(ConnectionData.class, raw, this::handleMsg); - } - }); - - startPromise.complete(); - } - - @Override - public void stop(Promise stopPromise) { - stopPromise.complete(); - } - - public record Configuration(@JsonProperty String inAddress, - @JsonProperty String outAddress, - @JsonProperty long timeout) implements JsonMessage { - /** - * For json - */ - @SuppressWarnings("unused") - private Configuration() { - this(null, null, 0L); - } - } - - public TcpClient() { - this(null); - } - - public TcpClient(Configuration config) { - this.config = config; - } - - private void handleDispatchedMsg(TcpData tcpData) { - var details = tcpData.details(); - - // Checks if socket already exists, if not connects to Server and if successful sends data asynchronously - vertx.executeBlocking(handler -> { - if (!activeClients.containsKey(new Tuple<>(details.ip(), details.port()))) { - currentClient.connect(details.port(), details.ip(), h -> { - if (h.succeeded()) { - onConnected(h.result()); - handler.complete(); - } else { - logger.error("Failed to create new NetClient with {}:{}:", details.ip(), details.port(), - h.cause()); - handler.fail(h.cause()); - } - }); - } else { - handler.complete(); - } - }, handler -> { - if (handler.succeeded()) { - logger.debug("Sending data to {}:{}", details.ip(), details.port()); - activeClients.get(new Tuple<>(details.ip(), details.port())).write(Buffer.buffer(tcpData.data())); - } else { - logger.warn("Due to an error the packet to {}:{} will be dropped", details.ip(), details.port()); - } - }); - } - - /** - * Called when a new socket is opened. - * - * @param socket the newly connected socket - */ - private void onConnected(NetSocket socket) { - var ip = socket.remoteAddress().host(); - var port = socket.remoteAddress().port(); - var key = new Tuple<>(ip, port); - - activeClients.put(key, socket); - - logger.info("Connection established ({}:{})", ip, port); - - var packetIdCounter = new AtomicInteger(0); - - socket.handler(buffer -> { - var packetId = packetIdCounter.getAndIncrement(); - logger.debug("New message received from Server ({}:{}, packetId={})", ip, port, packetId); - vertx.eventBus().publish(config.outAddress(), new ConnectionData(buffer.getBytes(), new TcpDetails(ip, port, - packetId)).json()); - }); - - socket.exceptionHandler(error -> { - logger.error("Encountered an unexpected error (Server: {}:{})", ip, port, - error); - activeClients.remove(key); - }); - - socket.closeHandler(handler -> { - logger.info("Closing remote connection with Server ({}:{})", ip, port); - activeClients.remove(key); - }); - } - - private void handleMsg(ConnectionData data) { - var details = data.conDetails(); - if (details instanceof TcpDetails tcpDet) { - handleDispatchedMsg(new TcpData(data.rawData(), tcpDet)); - } else { // Shouldn't happen due to Dispatcher - logger.warn("Wrong connection detail type received. Packet will be dropped."); - // If there will be ever a logger for broken packets, send this - } - } - - private Configuration config; - private HashMap, NetSocket> activeClients; - private NetClient currentClient; - - private static final Logger logger = LoggerFactory.getLogger(TcpClient.class); -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java deleted file mode 100644 index 135c082e..00000000 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpTimeouts.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.wuespace.telestion.services.connection.rework.tcp; - -import java.time.Duration; - -public class TcpTimeouts { - public static final long NO_RESPONSES = -1; // Close tcp connection after first received packet - public static final long NO_TIMEOUT = 0; // Never actively close active tcp connection - public static final long DEFAULT_TIMEOUT = Duration.ofSeconds(30).toMillis(); // 30 secs. -} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java similarity index 82% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java index 11f3bce8..6db783b2 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialConn.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialConn.java @@ -1,11 +1,12 @@ -package de.wuespace.telestion.services.connection.rework.serial; +package de.wuespace.telestion.services.connection.serial; import com.fasterxml.jackson.annotation.JsonProperty; import com.fazecast.jSerialComm.SerialPort; import de.wuespace.telestion.api.config.Config; import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.connection.rework.ConnectionData; -import de.wuespace.telestion.services.connection.rework.SenderData; +import de.wuespace.telestion.services.connection.Broadcaster; +import de.wuespace.telestion.services.connection.ConnectionData; +import de.wuespace.telestion.services.connection.SenderData; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import org.slf4j.Logger; @@ -22,7 +23,7 @@ public final class SerialConn extends AbstractVerticle { @Override public void start(Promise startPromise) { - config = Config.get(config, config(), Configuration.class); + config = Config.get(config, new Configuration(), config(), Configuration.class); logger.info("Opening Connection on {}", config.serialPort()); @@ -33,6 +34,8 @@ public void start(Promise startPromise) { } // TODO: Add more feature-rich implementation using serialPort.setComPortParameters() + Broadcaster.register(config.broadcasterId(), config.inAddress()); + // In vertx.setPeriodic(config.sampleTime(), event -> { try { @@ -80,14 +83,11 @@ public record Configuration(@JsonProperty String inAddress, @JsonProperty String outAddress, @JsonProperty String serialPort, @JsonProperty int baudRate, - @JsonProperty long sampleTime) implements JsonMessage { + @JsonProperty long sampleTime, + @JsonProperty int broadcasterId) implements JsonMessage { @SuppressWarnings("unused") - private Configuration() { - this(null, null, null, 9600, 0L); - } - - public Configuration(String inAddress, String outAddress, String serialPort) { - this(inAddress, outAddress, serialPort, 9600, 100); + public Configuration() { + this(null, null, null, 9_600, 0L, Broadcaster.NO_BROADCASTING); } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java similarity index 70% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java index 697daccf..52dbc13f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/serial/SerialDetails.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/serial/SerialDetails.java @@ -1,7 +1,7 @@ -package de.wuespace.telestion.services.connection.rework.serial; +package de.wuespace.telestion.services.connection.serial; import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.services.connection.rework.ConnectionDetails; +import de.wuespace.telestion.services.connection.ConnectionDetails; /** * diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java new file mode 100644 index 00000000..d33fdd5a --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/BaseTcpVerticle.java @@ -0,0 +1,30 @@ +package de.wuespace.telestion.services.connection.tcp; + +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.services.connection.ConnectionData; +import de.wuespace.telestion.services.connection.IpDetails; +import de.wuespace.telestion.services.connection.RawMessage; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.eventbus.Message; + +import java.util.Map; + +public abstract class BaseTcpVerticle extends AbstractVerticle { + + protected void consumer(Message raw, Map activeClients) { + if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) { + if (!JsonMessage.on(ConnectionData.class, raw, this::handleMsg)) { + // Broadcasting + JsonMessage.on(RawMessage.class, raw, + // Why not send directly? Well, we want logging and potential future updates + msg -> activeClients.keySet().forEach( + k -> this.handleDispatchedMsg( + new TcpData(msg.data(), + TcpDetails.fromIpDetails(k))))); + } + } + } + + protected abstract void handleMsg(ConnectionData data); + protected abstract void handleDispatchedMsg(TcpData tcpData); +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java new file mode 100644 index 00000000..54742fbf --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpClient.java @@ -0,0 +1,166 @@ +package de.wuespace.telestion.services.connection.tcp; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.wuespace.telestion.api.config.Config; +import de.wuespace.telestion.api.message.JsonMessage; +import de.wuespace.telestion.services.connection.Broadcaster; +import de.wuespace.telestion.services.connection.ConnectionData; +import de.wuespace.telestion.services.connection.IpDetails; +import de.wuespace.telestion.services.connection.RawMessage; +import io.reactivex.annotations.NonNull; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Promise; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public final class TcpClient extends BaseTcpVerticle { + + public final record Configuration(@JsonProperty String inAddress, + @JsonProperty String outAddress, + @JsonProperty long timeout, + @JsonProperty int broadcasterId) implements JsonMessage { + public Configuration() { + this(null, null, TcpTimeouts.DEFAULT_TIMEOUT, Broadcaster.DEFAULT_ID); + } + } + + @Override + public void start(Promise startPromise) { + config = Config.get(config, new Configuration(), config(), Configuration.class); + + var options = new NetClientOptions(); + options.setIdleTimeout(config.timeout() == TcpTimeouts.NO_RESPONSES + ? (int) TcpTimeouts.NO_TIMEOUT : (int) config.timeout()); + + currentClient = vertx.createNetClient(options); + activeClients = new HashMap<>(); + + Broadcaster.register(config.broadcasterId(), config.inAddress()); + + vertx.eventBus().consumer(config.inAddress(), handler -> this.consumer(handler, activeClients)); + startPromise.complete(); + } + + @Override + public void stop(Promise stopPromise) { + var composite = CompositeFuture.all( + activeClients.values().stream().map(NetSocket::close).collect(Collectors.toList()) + ); + + composite.onComplete(result -> { + if (result.failed()) { + stopPromise.fail(result.cause()); + return; + } + + stopPromise.complete(); + }); + } + + public TcpClient() { + this(null); + } + + public TcpClient(Configuration config) { + this.config = config; + } + + protected void handleMsg(ConnectionData data) { + var details = data.conDetails(); + if (details instanceof TcpDetails tcpDet) { + handleDispatchedMsg(new TcpData(data.rawData(), tcpDet)); + } else { // Shouldn't happen due to Dispatcher + logger.warn("Wrong connection detail type received. Packet will be dropped."); + // If there will ever be a logger for broken packets, send this to it + } + } + + protected void handleDispatchedMsg(TcpData tcpData) { + var details = tcpData.details(); + var key = new IpDetails(details.ip(), details.port()); + + // Checks if socket already exists. + // If not, connects to Server and if successful sends data asynchronously. + vertx.executeBlocking(promise -> { + if (activeClients.containsKey(key)) { + promise.complete(); + return; + } + + currentClient.connect(details.port(), details.ip(), result -> { + if (result.succeeded()) { + onConnected(result.result()); + promise.complete(); + return; + } + + logger.error("Failed to create new NetClient with {}:{}:", + details.ip(), details.port(), result.cause()); + promise.fail(result.cause()); + }); + }, result -> { + if (result.failed()) { + logger.warn("Due to an error the packet to {}:{} will be dropped", details.ip(), details.port()); + return; + } + + logger.debug("Sending data to {}:{}", details.ip(), details.port()); + // TcpClient.write() is non-blocking and returns a Future which theoretically could be used to monitor the + // success of this process if needed in the future. (maybe inform if dropped?) + activeClients + .get(new IpDetails(details.ip(), details.port())) + .write(Buffer.buffer(tcpData.data())); + }); + } + + /** + * Called when a new socket is opened. + * + * @param socket the newly connected socket + */ + private void onConnected(NetSocket socket) { + var ip = socket.remoteAddress().host(); + var port = socket.remoteAddress().port(); + var key = new IpDetails(ip, port); + + activeClients.put(key, socket); + logger.info("Connection established ({}:{})", ip, port); + + var packetIdCounter = new AtomicInteger(0); + + socket.handler(buffer -> { + var packetId = packetIdCounter.getAndIncrement(); + logger.debug("New message received from Server ({}:{}, packetId={})", ip, port, packetId); + vertx.eventBus().publish( + config.outAddress(), + new ConnectionData(buffer.getBytes(), new TcpDetails(ip, port, packetId)).json() + ); + }); + + socket.exceptionHandler(error -> { + logger.error("Encountered an unexpected error (Server: {}:{})", ip, port, error); + activeClients.remove(key); + }); + + socket.closeHandler(handler -> { + logger.info("Closing remote connection with Server ({}:{})", ip, port); + activeClients.remove(key); + }); + } + + private Configuration config; + private Map activeClients; + private NetClient currentClient; + + private static final Logger logger = LoggerFactory.getLogger(TcpClient.class); +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java similarity index 84% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java index 69e43cfb..74b89713 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpData.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpData.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework.tcp; +package de.wuespace.telestion.services.connection.tcp; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java similarity index 55% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java index b18fa9ff..adabfd7f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDetails.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDetails.java @@ -1,7 +1,8 @@ -package de.wuespace.telestion.services.connection.rework.tcp; +package de.wuespace.telestion.services.connection.tcp; import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.services.connection.rework.ConnectionDetails; +import de.wuespace.telestion.services.connection.ConnectionDetails; +import de.wuespace.telestion.services.connection.IpDetails; public record TcpDetails(@JsonProperty String ip, @JsonProperty int port, @@ -18,4 +19,8 @@ private TcpDetails() { public TcpDetails(String ip, int port) { this(ip, port, 0); } + + public static TcpDetails fromIpDetails(IpDetails details) { + return new TcpDetails(details.ip(), details.port()); + } } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java similarity index 53% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java index bf84dcf4..499a45af 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpDispatcher.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpDispatcher.java @@ -1,32 +1,40 @@ -package de.wuespace.telestion.services.connection.rework.tcp; +package de.wuespace.telestion.services.connection.tcp; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.connection.rework.ConnectionData; -import de.wuespace.telestion.services.connection.rework.SenderData; -import de.wuespace.telestion.services.connection.rework.Tuple; +import de.wuespace.telestion.services.connection.*; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; +import java.text.BreakIterator; import java.util.Arrays; -public class TcpDispatcher extends AbstractVerticle { +public final class TcpDispatcher extends AbstractVerticle { @Override public void start(Promise startPromise) { - config = Config.get(config, config(), Configuration.class); + config = Config.get(config, new Configuration(), config(), Configuration.class); + + Broadcaster.register(config.broadcasterId(), config.inAddress()); vertx.eventBus().consumer(config.inAddress(), raw -> { if (!JsonMessage.on(SenderData.class, raw, msg -> Arrays.stream(msg.conDetails()) - .filter(det -> det instanceof TcpDetails) - .map(det -> (TcpDetails) det) - .forEach(det -> handle(msg.rawData(), det)))) { - JsonMessage.on(ConnectionData.class, raw, msg -> { + .filter(details -> details instanceof TcpDetails) + .map(details -> (TcpDetails) details) + .forEach(details -> handle(msg.rawData(), details)))) { + if (!JsonMessage.on(ConnectionData.class, raw, msg -> { if (msg.conDetails() instanceof TcpDetails det) { handle(msg.rawData(), det); } - }); + })) { + // Broadcasting + JsonMessage.on(RawMessage.class, raw, msg -> { + Arrays.stream(servers).forEach(s -> + vertx.eventBus().publish(s.getConfig().inAddress(), msg.json())); + vertx.eventBus().publish(config.outAddress(), msg.json()); + }); + } } }); startPromise.complete(); @@ -38,13 +46,15 @@ public void stop(Promise stopPromise) { } public record Configuration(@JsonProperty String inAddress, - @JsonProperty String outAddress) implements JsonMessage { + @JsonProperty String outAddress, + @JsonProperty int broadcasterId) implements JsonMessage { /** * For config */ + // The broadcasting-registration is usually done in the client and server verticle. @SuppressWarnings("unused") - private Configuration() { - this(null, null); + public Configuration() { + this(null, null, Broadcaster.NO_BROADCASTING); } } @@ -59,7 +69,7 @@ public TcpDispatcher(Configuration config, TcpServer... servers) { private void handle(byte[] bytes, TcpDetails details) { for (var server : servers) { - if (server.isActiveCon(new Tuple<>(details.ip(), details.port()))) { + if (server.isActiveCon(new IpDetails(details.ip(), details.port()))) { vertx.eventBus().publish(server.getConfig().inAddress(), new TcpData(bytes, details).json()); } else { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java similarity index 53% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java index f7d445f1..94c9b103 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/tcp/TcpServer.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpServer.java @@ -1,14 +1,14 @@ -package de.wuespace.telestion.services.connection.rework.tcp; +package de.wuespace.telestion.services.connection.tcp; import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.config.Config; import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.connection.rework.ConnectionData; -import de.wuespace.telestion.services.connection.rework.Tuple; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.Promise; +import de.wuespace.telestion.services.connection.Broadcaster; +import de.wuespace.telestion.services.connection.ConnectionData; +import de.wuespace.telestion.services.connection.IpDetails; +import de.wuespace.telestion.services.connection.RawMessage; +import io.reactivex.annotations.NonNull; +import io.vertx.core.*; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; @@ -18,14 +18,16 @@ import java.time.Duration; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** *

- * An implementation of an unencrypted TCP-Server. + * An implementation of an unencrypted TCP-Server. *

* + *

* Features are: *

    *
  • Opening new connections to TCP-Clients
  • @@ -35,80 +37,90 @@ *
  • Sending answers back to the Client if the connections are still open
  • *
*/ -public final class TcpServer extends AbstractVerticle { +public final class TcpServer extends BaseTcpVerticle { + + /** + * Configuration for this Verticle which can be loaded from a config.
+ * An optional timeout can be specified which is the consecutive time without any packets incoming or outgoing after + * which a client will be disconnected. Special timeouts can be found in {@link TcpTimeouts}. + * + * @param inAddress address on which the verticle listens on + * @param outAddress address on which the verticle publishes + * @param host host on which the Server-Socket should run + * @param port port on which the Server-Socket should run + * @param clientTimeout time until timeout + * @param broadcasterId id of the broadcaster you want to use + */ + public final record Configuration(@JsonProperty String inAddress, + @JsonProperty String outAddress, + @JsonProperty String host, + @JsonProperty int port, + @JsonProperty long clientTimeout, + @JsonProperty int broadcasterId) implements JsonMessage { + public Configuration() { + this(null, + null, + "0.0.0.0", + 0, + TcpTimeouts.DEFAULT_TIMEOUT, + Broadcaster.DEFAULT_ID); + } + } @Override public void start(Promise startPromise) throws Exception { - config = Config.get(config, config(), Configuration.class); + config = Config.get(config, new Configuration(), config(), Configuration.class); // Unencrypted -> TODO: Implement functionality for encryption var serverOptions = new NetServerOptions(); - serverOptions.setHost(config.hostAddress()); + serverOptions.setHost(config.host()); serverOptions.setPort(config.port()); serverOptions.setIdleTimeout(config.clientTimeout() == TcpTimeouts.NO_RESPONSES ? (int) TcpTimeouts.NO_TIMEOUT : (int) config.clientTimeout()); serverOptions.setIdleTimeoutUnit(TimeUnit.MILLISECONDS); - activeCons = new HashMap<>(); + activeCons = new HashMap<>(); server = vertx.createNetServer(serverOptions); + + // setup server.connectHandler(this::onConnected); - server.exceptionHandler(handler -> logger.error("Encountered an unexpected error (Config: {})", config.json(), - handler)); - server.listen(h -> complete(h, startPromise, - r -> logger.info("Successfully started. Running on {}:{}", config.hostAddress(), r.actualPort()))); - - vertx.eventBus().consumer(config.inAddress, raw -> { - if (!JsonMessage.on(TcpData.class, raw, this::handleDispatchedMsg)) { - JsonMessage.on(ConnectionData.class, raw, this::handleMsg); + server.exceptionHandler(error -> + logger.error("Encountered an unexpected error (Config: {})", config.json(), error)); + + server.listen(handler -> { + if (handler.failed()) { + logger.warn("Could not start server on {}:{}", config.host(), config.port()); + startPromise.fail(handler.cause()); + return; } + + var port = handler.result().actualPort(); + logger.info("Successfully started. Running on {}:{}", config.host(), port); + startPromise.complete(); }); - startPromise.complete(); + Broadcaster.register(config.broadcasterId(), config.inAddress()); + + vertx.eventBus().consumer(config.inAddress, handler -> this.consumer(handler, activeCons)); } @Override public void stop(Promise stopPromise) throws Exception { - if (server != null) { - activeCons.values().forEach(net -> net.close(handler -> { - if (handler.failed()) { - stopPromise.fail(handler.cause()); - } - })); - logger.info("Closing Server on {}:{}", config.hostAddress(), config.port()); - server.close(); + if (server == null) { + stopPromise.complete(); + return; } - // Wait for all Tcp connections to be closed successfully or fail in the process - vertx.setTimer(Duration.ofSeconds(2).toMillis(), handler -> stopPromise.tryComplete()); - } - /** - * Configuration for this Verticle which can be loaded from a config.
- * An optional timeout can be specified which is the consecutive time without any packets incoming or outgoing after - * which a client will be disconnected. Special timeouts can be found in {@link TcpTimeouts}. - * - * @param inAddress address on which the verticle listens on - * @param outAddress address on which the verticle publishes - * @param hostAddress host-address on which the Server-Socket should run - * @param port port on which the Server-Socket should run - * @param clientTimeout time until timeout - */ - public record Configuration(@JsonProperty String inAddress, - @JsonProperty String outAddress, - @JsonProperty String hostAddress, - @JsonProperty int port, - @JsonProperty long clientTimeout) implements JsonMessage { - - /** - * Used for reflection. - */ - @SuppressWarnings("unused") - private Configuration() { - this(null, null, null, 0, 0L); - } + logger.info("Closing Server on {}:{}", config.host(), config.port()); + server.close(handler -> { + if (handler.failed()) { + logger.warn("Cannot close server on {}:{}", config.host(), config.port()); + stopPromise.fail(handler.cause()); + return; + } - public Configuration(String inAddress, String outAddress, String hostAddress, int port) { - this(inAddress, outAddress, hostAddress, port, TcpTimeouts.DEFAULT_TIMEOUT); - } + stopPromise.complete(); + }); } public TcpServer() { @@ -116,10 +128,6 @@ public TcpServer() { } public TcpServer(Configuration config) { - if (config != null && (config.hostAddress() == null || config.hostAddress().equals(""))) { - config = new Configuration(config.inAddress(), config.outAddress(), "localhost", config.port(), - config.clientTimeout()); - } this.config = config; } @@ -128,15 +136,14 @@ public Configuration getConfig() { } // Public for own Dispatcher implementations - public boolean isActiveCon(Tuple key) { + public boolean isActiveCon(IpDetails key) { return activeCons.containsKey(key); } - // @jvpichowski this is similar to TcpClient#onConnected, do you want to put this into one method? private void onConnected(NetSocket netSocket) { var ip = netSocket.remoteAddress().hostAddress(); var port = netSocket.remoteAddress().port(); - var key = new Tuple<>(ip, port); + var key = new IpDetails(ip, port); logger.info("Connection established with {}:{}", ip, port); activeCons.put(key, netSocket); @@ -165,27 +172,20 @@ private void onConnected(NetSocket netSocket) { }); } - /** - * Completes a promise based on the success of a {@link AsyncResult}. If it was successful a handler will be called. - * - * @param result the result which is observed - * @param promise the promise which will be completed - * @param handler the handler which will be executed on a successful result - * @param the type of the result - */ - private static void complete(AsyncResult result, Promise promise, Handler handler) { - if (result.failed()) { - promise.fail(result.cause()); - return; + protected void handleMsg(ConnectionData data) { + var details = data.conDetails(); + if (details instanceof TcpDetails tcpDetails) { + handleDispatchedMsg(new TcpData(data.rawData(), tcpDetails)); + } else { // Shouldn't happen due to Dispatcher + logger.warn("Wrong connection detail type received. Packet will be dropped."); + // If there will be ever a logger for broken packets, send this } - handler.handle(result.result()); - promise.tryComplete(); } - private void handleDispatchedMsg(TcpData data) { + protected void handleDispatchedMsg(TcpData data) { var details = data.details(); - var element = activeCons.get(new Tuple<>(details.ip(), details.port())); + var element = activeCons.get(new IpDetails(details.ip(), details.port())); // Might be useful to send this to the TCP-Client however the config must be varied for this (future update?) if (element == null) { @@ -205,19 +205,9 @@ private void handleDispatchedMsg(TcpData data) { element.write(Buffer.buffer(data.data())); } - private void handleMsg(ConnectionData data) { - var details = data.conDetails(); - if (details instanceof TcpDetails tcpDet) { - handleDispatchedMsg(new TcpData(data.rawData(), tcpDet)); - } else { // Shouldn't happen due to Dispatcher - logger.warn("Wrong connection detail type received. Packet will be dropped."); - // If there will be ever a logger for broken packets, send this - } - } - private Configuration config; private NetServer server; - private HashMap, NetSocket> activeCons; + private Map activeCons; private static final Logger logger = LoggerFactory.getLogger(TcpServer.class); } diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java new file mode 100644 index 00000000..7f01e37c --- /dev/null +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/tcp/TcpTimeouts.java @@ -0,0 +1,20 @@ +package de.wuespace.telestion.services.connection.tcp; + +import java.time.Duration; + +public final class TcpTimeouts { + /** + * Close tcp connection after first received packet. + */ + public static final long NO_RESPONSES = -1; + + /** + * Never actively close active tcp connections. + */ + public static final long NO_TIMEOUT = 0; + + /** + * The default TCP connection timeout. + */ + public static final long DEFAULT_TIMEOUT = Duration.ofSeconds(30).toMillis(); +} diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java similarity index 62% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java index 5cf3eecb..62beed10 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpConn.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpConn.java @@ -1,4 +1,4 @@ -package de.wuespace.telestion.services.connection.rework.udp; +package de.wuespace.telestion.services.connection.udp; public class UdpConn { public UdpConn() { diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java similarity index 64% rename from modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java rename to modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java index 54ad7070..d5b0c51f 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/rework/udp/UdpDetails.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/connection/udp/UdpDetails.java @@ -1,7 +1,7 @@ -package de.wuespace.telestion.services.connection.rework.udp; +package de.wuespace.telestion.services.connection.udp; import com.fasterxml.jackson.annotation.JsonProperty; -import de.wuespace.telestion.services.connection.rework.ConnectionDetails; +import de.wuespace.telestion.services.connection.ConnectionDetails; public record UdpDetails(@JsonProperty String ip, @JsonProperty int port, diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java index 5a59b8f2..5a70e9f7 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/monitoring/HystrixMetrics.java @@ -38,7 +38,7 @@ public void start(Promise startPromise) throws Exception { logger.info("Started {} with config {}", HystrixMetrics.class.getSimpleName(), config); } - @SuppressWarnings({ "preview", "unused" }) + @SuppressWarnings({"unused" }) private static record Configuration(@JsonProperty int port, @JsonProperty String path) { private Configuration() { this(8080, "/hystrix-metrics"); diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/protocol/.gitkeep b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/protocol/.gitkeep deleted file mode 100644 index e69de29b..00000000 diff --git a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java index 7e02a5d4..d26475e8 100644 --- a/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java +++ b/modules/telestion-services/src/main/java/de/wuespace/telestion/services/web/WebServer.java @@ -17,7 +17,7 @@ */ public final class WebServer extends AbstractVerticle { - private Configuration forcedConfig; + private final Configuration forcedConfig; /** * Creating a WebServer on the specified port. @@ -60,7 +60,7 @@ public void start(Promise startPromise) throws Exception { * * @param port the port to bind to */ - @SuppressWarnings({ "preview", "unused" }) + @SuppressWarnings({"unused" }) private static record Configuration(@JsonProperty int port) { private Configuration() { this(8080); diff --git a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java index af2719de..20a5a2be 100644 --- a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java +++ b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/ConnApiTest.java @@ -2,7 +2,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import de.wuespace.telestion.api.message.JsonMessage; -import de.wuespace.telestion.services.connection.rework.*; +import de.wuespace.telestion.services.connection.*; import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext; diff --git a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java index cfe42ee2..257649d4 100644 --- a/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java +++ b/modules/telestion-services/src/test/java/de/wuespace/telestion/services/connection/TcpConnTest.java @@ -3,6 +3,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import de.wuespace.telestion.services.connection.legacy.TcpConn; import io.vertx.core.Vertx; import io.vertx.junit5.VertxExtension; import io.vertx.junit5.VertxTestContext;