From c370ba25f7bc54b4f37b0d6cc3aa35148dedac5f Mon Sep 17 00:00:00 2001 From: "Petter A. Urkedal" Date: Wed, 30 Aug 2023 10:01:18 +0200 Subject: [PATCH] Support additional Kafka parameters for ftp and dcap. This is an alternative to the configs property prefix for cells which are not implemented in terms of the spring framework. --- .../diskCacheV111/doors/DcapDoorSettings.java | 27 +++++++++++++++++++ .../org/dcache/ftp/door/FtpDoorSettings.java | 27 +++++++++++++++++++ skel/share/defaults/dcap.properties | 3 +++ skel/share/defaults/ftp.properties | 3 +++ skel/share/defaults/kafka.properties | 4 +++ skel/share/services/dcap.batch | 1 + skel/share/services/ftp.batch | 1 + 7 files changed, 66 insertions(+) diff --git a/modules/dcache-dcap/src/main/java/diskCacheV111/doors/DcapDoorSettings.java b/modules/dcache-dcap/src/main/java/diskCacheV111/doors/DcapDoorSettings.java index 4f33f178a2d..6b86cf39184 100644 --- a/modules/dcache-dcap/src/main/java/diskCacheV111/doors/DcapDoorSettings.java +++ b/modules/dcache-dcap/src/main/java/diskCacheV111/doors/DcapDoorSettings.java @@ -26,6 +26,9 @@ import dmg.cells.nucleus.CellAddressCore; import dmg.cells.nucleus.CellEndpoint; import dmg.cells.nucleus.CellPath; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Properties; @@ -42,8 +45,11 @@ import org.dcache.poolmanager.PoolManagerStub; import org.dcache.services.login.RemoteLoginStrategy; import org.dcache.util.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DcapDoorSettings { + private static final Logger LOGGER = LoggerFactory.getLogger(DcapDoorSettings.class); @Option(name = "authorization") protected String auth; @@ -99,6 +105,9 @@ public class DcapDoorSettings { @Option(name = "kafka-clientid") protected String kafkaclientid; + @Option(name = "kafka-config-file") + protected String kafkaConfigFile; + @Option(name = "hsm", description = "Cell address of hsm manager", @@ -332,6 +341,24 @@ public PoolManagerStub createPoolManagerStub(CellEndpoint cellEndpoint, CellAddr public KafkaProducer createKafkaProducer() { Properties props = new Properties(); + if (kafkaConfigFile != null && !kafkaConfigFile.trim().isEmpty()) { + try { + FileInputStream fis = new FileInputStream(kafkaConfigFile); + try { + props.load(fis); + } catch (IOException ex) { + LOGGER.error("failed to load configuration ", ex); + } finally { + try { + fis.close(); + } catch (IOException ex) { + LOGGER.error("failed to close " + kafkaConfigFile, ex); + } + } + } catch (FileNotFoundException ex) { + LOGGER.error(ex.toString()); + } + } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaclientid); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, diff --git a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/FtpDoorSettings.java b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/FtpDoorSettings.java index 5142513e295..bac1282c333 100644 --- a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/FtpDoorSettings.java +++ b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/FtpDoorSettings.java @@ -4,6 +4,9 @@ import dmg.cells.nucleus.CellAddressCore; import dmg.cells.nucleus.CellEndpoint; import dmg.cells.nucleus.CellPath; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; @@ -14,6 +17,8 @@ import org.dcache.util.Option; import org.dcache.util.OptionParser; import org.dcache.util.PortRange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Object holding configuration options for FTP doors. @@ -21,6 +26,7 @@ * Settings can be injected using {@link OptionParser}. */ public class FtpDoorSettings { + private static final Logger LOGGER = LoggerFactory.getLogger(FtpDoorSettings.class); @Option(name = "poolManager", description = "Well known name of the pool manager", @@ -67,6 +73,9 @@ public class FtpDoorSettings { @Option(name = "kafka-clientid") protected String kafkaclientid; + @Option(name = "kafka-config-file") + protected String kafkaConfigFile; + @Option(name = "clientDataPortRange", defaultValue = "0") @@ -388,6 +397,24 @@ public PoolManagerStub createPoolManagerStub(CellEndpoint cellEndpoint, public KafkaProducer createKafkaProducer() { Properties props = new Properties(); + if (kafkaConfigFile != null && !kafkaConfigFile.trim().isEmpty()) { + try { + FileInputStream fis = new FileInputStream(kafkaConfigFile); + try { + props.load(fis); + } catch (IOException ex) { + LOGGER.error("failed to load configuration ", ex); + } finally { + try { + fis.close(); + } catch (IOException ex) { + LOGGER.error("failed to close " + kafkaConfigFile, ex); + } + } + } catch (FileNotFoundException ex) { + LOGGER.error(ex.toString()); + } + } props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaclientid); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, diff --git a/skel/share/defaults/dcap.properties b/skel/share/defaults/dcap.properties index 2f633054026..89bcc78286e 100644 --- a/skel/share/defaults/dcap.properties +++ b/skel/share/defaults/dcap.properties @@ -221,3 +221,6 @@ dcap.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers} # Kafka topic name dcap.kafka.topic = ${dcache.kafka.topic} + +# File from which to load additional Kafka properties. +dcap.kafka.config-file = ${dcache.kafka.config-file} diff --git a/skel/share/defaults/ftp.properties b/skel/share/defaults/ftp.properties index dd25cc25817..faf7a47f872 100644 --- a/skel/share/defaults/ftp.properties +++ b/skel/share/defaults/ftp.properties @@ -396,3 +396,6 @@ ftp.kafka.bootstrap-servers = ${dcache.kafka.bootstrap-servers} # Kafka topic name ftp.kafka.topic = ${dcache.kafka.topic} + +# File from which to load additional Kafka properties. +ftp.kafka.config-file = ${dcache.kafka.config-file} diff --git a/skel/share/defaults/kafka.properties b/skel/share/defaults/kafka.properties index e947533ac19..4402eba82a2 100644 --- a/skel/share/defaults/kafka.properties +++ b/skel/share/defaults/kafka.properties @@ -24,6 +24,10 @@ dcache.kafka.bootstrap-servers = localhost:9092 # Kafka topic name dcache.kafka.topic = billing +# Optional file from which to load additional Kafka properties for ftp and +# dcap. For other services, use dcache.kafka.configs. +dcache.kafka.config-file = + #--------------------Default Properties------------------------------ diff --git a/skel/share/services/dcap.batch b/skel/share/services/dcap.batch index 795d2e97d23..b2d489da092 100644 --- a/skel/share/services/dcap.batch +++ b/skel/share/services/dcap.batch @@ -119,6 +119,7 @@ create dmg.cells.services.login.LoginManager ${dcap.cell.name} \ -kafka-max-block-units=${dcap.kafka.maximum-block.unit}\ -retries-kafka=0 \ -kafka-clientid=\"${dcap.cell.name}\" \ + -kafka-config-file=\"${dcap.kafka.config-file}\" \ -stageConfigurationFilePath=\"${dcap.authz.staging}\" \ -allowAnonymousStaging=\"${dcap.authz.anonymous-staging}\" \ -io-queue=${dcap.mover.queue} \ diff --git a/skel/share/services/ftp.batch b/skel/share/services/ftp.batch index 2b818f4694b..3013c7c77c9 100644 --- a/skel/share/services/ftp.batch +++ b/skel/share/services/ftp.batch @@ -94,6 +94,7 @@ create dmg.cells.services.login.LoginManager ${ftp.cell.name} \ -billing=\"${ftp.service.billing}\" \ -kafka=\"${ftp.enable.kafka}\" \ -kafka-clientid=\"${ftp.cell.name}\" \ + -kafka-config-file=\"${ftp.kafka.config-file}\" \ -bootstrap-server-kafka=\"${ftp.kafka.bootstrap-servers}\" \ -kafka-topic=\"${ftp.kafka.topic}\" \ -kafka-max-block=${ftp.kafka.maximum-block}\