Skip to content

Commit

Permalink
chore: improve kafka startup logging
Browse files Browse the repository at this point in the history
Signed-off-by: Lukáš Zapletal <[email protected]>
  • Loading branch information
lzap committed Oct 10, 2023
1 parent 12d1ae5 commit 4cccdb5
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 33 deletions.
8 changes: 3 additions & 5 deletions config/api.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,21 @@
# GCP_PROJECT_ID string
# GCP service account project id (default "")
# KAFKA_AUTH_TYPE string
# kafka authentication type (mtls, sasl or empty) (default "")
# kafka authentication type (MTLS, SASL or empty) (default "")
# KAFKA_BROKERS slice
# kafka hostname:port list of brokers (default "localhost:9092")
# KAFKA_CA_CERT string
# kafka TLS CA certificate path (use the OS cert store when blank) (default "")
# KAFKA_ENABLED bool
# kafka service enabled (default "false")
# KAFKA_PROTOCOL string
# kafka SASL security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL, empty means PLAINTEXT) (default "")
# KAFKA_SASL_MECHANISM string
# kafka SASL mechanism (scram-sha-512, scram-sha-256 or plain) (default "")
# KAFKA_SASL_PASSWORD string
# kafka SASL password (default "")
# KAFKA_SASL_PROTOCOL string
# kafka SASL security protocol (default "")
# KAFKA_SASL_USERNAME string
# kafka SASL username (default "")
# KAFKA_TLS_ENABLED bool
# enable TLS or use plaintext when false (default "false")
# KAFKA_TLS_SKIP_VERIFY bool
# do not verify TLS server certificate (default "false")
# LOGGING_LEVEL string
Expand Down
11 changes: 0 additions & 11 deletions deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,6 @@ objects:
value: ${APP_CACHE_TYPE}
- name: WORKER_QUEUE
value: ${WORKER_QUEUE}
- name: KAFKA_TLS_ENABLED
value: ${KAFKA_TLS_ENABLED}
resources:
limits:
cpu: ${{CPU_LIMIT}}
Expand Down Expand Up @@ -192,8 +190,6 @@ objects:
value: ${APP_INSTANCE_PREFIX}
- name: APP_CACHE_TYPE
value: ${APP_CACHE_TYPE}
- name: KAFKA_TLS_ENABLED
value: ${KAFKA_TLS_ENABLED}
resources:
limits:
cpu: ${{CPU_LIMIT}}
Expand Down Expand Up @@ -243,8 +239,6 @@ objects:
value: ${APP_INSTANCE_PREFIX}
- name: APP_CACHE_TYPE
value: ${APP_CACHE_TYPE}
- name: KAFKA_TLS_ENABLED
value: ${KAFKA_TLS_ENABLED}
resources:
limits:
cpu: ${{CPU_LIMIT}}
Expand Down Expand Up @@ -371,8 +365,6 @@ objects:
value: ${APP_CACHE_TYPE}
- name: WORKER_QUEUE
value: ${WORKER_QUEUE}
- name: KAFKA_TLS_ENABLED
value: ${KAFKA_TLS_ENABLED}
resources:
limits:
cpu: ${{CPU_LIMIT}}
Expand Down Expand Up @@ -582,6 +574,3 @@ parameters:
- description: Notification service enabled
name: APP_NOTIFICATIONS_ENABLED
value: "true"
- description: Kafka TLS connection
name: KAFKA_TLS_ENABLED
value: "false"
25 changes: 12 additions & 13 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,16 @@ var config struct {
Dsn string `env:"DSN" env-default:"" env-description:"data source name (empty value disables Sentry)"`
} `env-prefix:"SENTRY_"`
Kafka struct {
Enabled bool `env:"ENABLED" env-default:"false" env-description:"kafka service enabled"`
TlsEnabled bool `env:"TLS_ENABLED" env-default:"false" env-description:"enable TLS or use plaintext when false"`
TlsSkipVerify bool `env:"TLS_SKIP_VERIFY" env-default:"false" env-description:"do not verify TLS server certificate"`
Brokers []string `env:"BROKERS" env-default:"localhost:9092" env-description:"kafka hostname:port list of brokers"`
AuthType string `env:"AUTH_TYPE" env-default:"" env-description:"kafka authentication type (mtls, sasl or empty)"`
CACert string `env:"CA_CERT" env-default:"" env-description:"kafka TLS CA certificate path (use the OS cert store when blank)"`
SASL struct {
Username string `env:"USERNAME" env-default:"" env-description:"kafka SASL username"`
Password string `env:"PASSWORD" env-default:"" env-description:"kafka SASL password"`
SaslMechanism string `env:"MECHANISM" env-default:"" env-description:"kafka SASL mechanism (scram-sha-512, scram-sha-256 or plain)"`
SecurityProtocol string `env:"PROTOCOL" env-default:"" env-description:"kafka SASL security protocol"`
Enabled bool `env:"ENABLED" env-default:"false" env-description:"kafka service enabled"`
AuthType string `env:"AUTH_TYPE" env-default:"" env-description:"kafka authentication type (MTLS, SASL or empty)"`
SecurityProtocol string `env:"PROTOCOL" env-default:"" env-description:"kafka SASL security protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL, empty means PLAINTEXT)"`
TlsSkipVerify bool `env:"TLS_SKIP_VERIFY" env-default:"false" env-description:"do not verify TLS server certificate"`
Brokers []string `env:"BROKERS" env-default:"localhost:9092" env-description:"kafka hostname:port list of brokers"`
CACert string `env:"CA_CERT" env-default:"" env-description:"kafka TLS CA certificate path (use the OS cert store when blank)"`
SASL struct {
Username string `env:"USERNAME" env-default:"" env-description:"kafka SASL username"`
Password string `env:"PASSWORD" env-default:"" env-description:"kafka SASL password"`
SaslMechanism string `env:"MECHANISM" env-default:"" env-description:"kafka SASL mechanism (scram-sha-512, scram-sha-256 or plain)"`
} `env-prefix:"SASL_"`
} `env-prefix:"KAFKA_"`
}
Expand Down Expand Up @@ -281,7 +280,7 @@ func Initialize(configFiles ...string) {
for i, b := range cfg.Kafka.Brokers {
config.Kafka.Brokers[i] = fmt.Sprintf("%s:%d", b.Hostname, *b.Port)

// assumption: TLS/SASL credentials are always the same for all nodes in a cluster
// assumption: SASL credentials are always the same for all nodes in a cluster
if b.Authtype != nil && *b.Authtype != "" {
config.Kafka.AuthType = string(*b.Authtype)
}
Expand All @@ -290,7 +289,7 @@ func Initialize(configFiles ...string) {
}
if b.Sasl != nil {
if b.SecurityProtocol != nil && *b.SecurityProtocol != "" {
config.Kafka.SASL.SecurityProtocol = *b.SecurityProtocol
config.Kafka.SecurityProtocol = *b.SecurityProtocol
}
if b.Sasl.SaslMechanism != nil && *b.Sasl.SaslMechanism != "" {
config.Kafka.SASL.SaslMechanism = *b.Sasl.SaslMechanism
Expand Down
14 changes: 10 additions & 4 deletions internal/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,14 @@ func NewKafkaBroker(ctx context.Context) (Broker, error) {
var saslMechanism sasl.Mechanism

logger := zerolog.Ctx(ctx)
logger.Debug().Msgf("Setting up Kafka transport: %v", config.Kafka.Brokers)
logger.Info().Msgf("Setting up Kafka transport: %v Verify:%t CA:%t Auth:%s Proto:%sSASLMech:%s",
config.Kafka.Brokers,
!config.Kafka.TlsSkipVerify,
config.Kafka.CACert != "",
config.Kafka.AuthType,
config.Kafka.SecurityProtocol,
config.Kafka.SASL.SaslMechanism,
)

if config.Kafka.CACert != "" {
logger.Debug().Str("cert", config.Kafka.CACert).Msg("Configuring TLS CA pool for Kafka")
Expand All @@ -93,18 +100,17 @@ func NewKafkaBroker(ctx context.Context) (Broker, error) {
}
}

if config.Kafka.TlsEnabled && !config.InEphemeralClowder() {
if strings.Contains(strings.ToUpper(config.Kafka.SecurityProtocol), "SSL") {
logger.Debug().Msg("Configuring Kafka for TLS")

//nolint:gosec
tlsConfig = &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: pool,
InsecureSkipVerify: config.Kafka.TlsSkipVerify,
}
}

if config.Kafka.SASL.SaslMechanism != "" {
if strings.Contains(strings.ToUpper(config.Kafka.AuthType), "SASL") && config.Kafka.SASL.SaslMechanism != "" {
var err error
saslMechanism, err = createSASLMechanism(config.Kafka.SASL.SaslMechanism, config.Kafka.SASL.Username, config.Kafka.SASL.Password)
if err != nil {
Expand Down

0 comments on commit 4cccdb5

Please sign in to comment.