Skip to content

Commit

Permalink
Add mTLS support
Browse files Browse the repository at this point in the history
Added mTLS support to set up OpenSearch connection with client certificate

Signed-off-by: Andrey Pleskach <[email protected]>
  • Loading branch information
willyborankin committed May 9, 2024
1 parent 8ed2745 commit 8d35532
Show file tree
Hide file tree
Showing 10 changed files with 800 additions and 87 deletions.
10 changes: 10 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ plugins {
id "com.diffplug.spotless" version "6.25.0"
}

idea {
module {
downloadSources = true
}
}

wrapper {
distributionType = 'ALL'
doLast {
Expand Down Expand Up @@ -173,6 +179,9 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
implementation "com.google.code.gson:gson:2.10.1"
implementation "org.opensearch.client:opensearch-rest-high-level-client:$openSearchVersion"
implementation "org.bouncycastle:bcprov-jdk18on:1.78.1"
implementation "org.bouncycastle:bcpkix-jdk18on:1.78.1"


testImplementation "org.junit.jupiter:junit-jupiter:5.10.2"
testImplementation "org.mockito:mockito-core:5.11.0"
Expand All @@ -184,6 +193,7 @@ dependencies {
testImplementation "com.fasterxml.jackson.core:jackson-core:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-databind:2.17.0"
testImplementation "com.fasterxml.jackson.core:jackson-annotations:2.17.0"
testImplementation "org.apache.commons:commons-lang3:3.14.0"
testRuntimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion"

integrationTestImplementation "org.testcontainers:junit-jupiter:$testcontainersVersion"
Expand Down
74 changes: 74 additions & 0 deletions docs/opensearch-sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,78 @@ Authentication
* Default: null
* Importance: medium

SSL Client Settings
^^^^^^^^^^^^^^^^^^^

``connection.ssl.protocol.type``
SSL protocol type. Default value is TLSv1.3, supported are: TLSv1.2, TLSv1.3

* Type: string
* Default: TLSv1.3
* Valid Values: TLSv1.2, TLSv1.3
* Importance: medium

``connection.access.key.password``
User access key password

* Type: password
* Default: null
* Importance: medium

``connection.ca.certificate.location``
Path to X.509 root CAs file (PEM format)

* Type: string
* Default: null
* Importance: medium

``connection.access.certificate.location``
Path to X.509 user access certificate file (PEM format)

* Type: string
* Default: null
* Importance: medium

``connection.access.key.location``
Path to the user certificate’s keys (PKCS #8) file (PEM format)

* Type: string
* Default: null
* Importance: medium

``connection.truststore.location``
Path to the Truststore file (JKS format)

* Type: string
* Default: null
* Importance: medium

``connection.truststore.password``
Truststore password

* Type: password
* Default: null
* Importance: medium

``connection.keystore.location``
Path to the Keystore file (PKCS12/PFX format)

* Type: string
* Default: null
* Importance: medium

``connection.keystore.type``
Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX

* Type: string
* Default: JKS
* Importance: medium

``connection.keystore.password``
Keystore password

* Type: password
* Default: null
* Importance: medium


Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/
package io.aiven.kafka.connect.opensearch;

import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;
import static io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_PASSWORD_CONFIG;
import static io.aiven.kafka.connect.opensearch.auth.OpensearchBasicAuthConfigurator.CONNECTION_USERNAME_CONFIG;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Copyright 2019 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aiven.kafka.connect.opensearch.auth;

import static io.aiven.kafka.connect.opensearch.auth.SSLContextBuilder.SUPPORTED_PROTOCOLS;

import java.util.Objects;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;

import io.aiven.kafka.connect.opensearch.OpensearchSinkConnectorConfig;
import io.aiven.kafka.connect.opensearch.spi.ConfigDefContributor;
import io.aiven.kafka.connect.opensearch.spi.OpensearchClientConfigurator;

import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;

/**
* Adds basic authentication to the {@index HttpAsyncClientBuilder} for Opensearch client if configured.
*/
public class OpensearchBasicAuthConfigurator implements OpensearchClientConfigurator, ConfigDefContributor {

private final static String SSL_SETTINGS_GROUP_NAME = "SSL Client Settings";

public static final String CONNECTION_USERNAME_CONFIG = "connection.username";
private static final String CONNECTION_USERNAME_DOC = "The username used to authenticate with OpenSearch. "
+ "The default is the null, and authentication will only be performed if "
+ " both the username and password are non-null.";
public static final String CONNECTION_PASSWORD_CONFIG = "connection.password";
private static final String CONNECTION_PASSWORD_DOC = "The password used to authenticate with OpenSearch. "
+ "The default is the null, and authentication will only be performed if "
+ " both the username and password are non-null.";

public static final String CLIENT_SSL_PROTOCOL_TYPE = "connection.ssl.protocol.type";
public static final String CLIENT_SSL_PROTOCOL_TYPE_DOC = "SSL protocol type. Default value is "
+ SSLContextBuilder.TLS_1_3 + ", supported are: " + SSLContextBuilder.TLS_1_2 + ", "
+ SSLContextBuilder.TLS_1_3;

public static final String CLIENT_SSL_CA_CERTIFICATE_LOCATION = "connection.ca.certificate.location";
private static final String CLIENT_SSL_CA_CERTIFICATE_LOCATION_DOC = "Path to X.509 root CAs file (PEM format)";

public static final String CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION = "connection.access.certificate.location";
private static final String CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION_DOC = "Path to X.509 user access certificate file (PEM format)";

public static final String CLIENT_SSL_ACCESS_KEY_LOCATION = "connection.access.key.location";
private static final String CLIENT_SSL_ACCESS_KEY_LOCATION_DOC = "Path to the user certificate’s keys (PKCS #8) file (PEM format)";

public static final String CLIENT_SSL_ACCESS_KEY_PASSWORD = "connection.access.key.password";
private static final String CLIENT_SSL_ACCESS_KEY_PASSWORD_DOC = "User access key password";

public static final String CLIENT_SSL_TRUSTSTORE_LOCATION = "connection.truststore.location";
private static final String CLIENT_SSL_TRUSTSTORE_LOCATION_DOC = "Path to the Truststore file (JKS format)";

public static final String CLIENT_SSL_TRUSTSTORE_PASSWORD = "connection.truststore.password";
private static final String CLIENT_SSL_TRUSTSTORE_PASSWORD_DOC = "Truststore password";

public static final String CLIENT_SSL_KEYSTORE_LOCATION = "connection.keystore.location";
private static final String CLIENT_SSL_KEYSTORE_LOCATION_DOC = "Path to the Keystore file (PKCS12/PFX format)";

public static final String CLIENT_SSL_KEYSTORE_TYPE = "connection.keystore.type";
private static final String CLIENT_SSL_KEYSTORE_TYPE_DOC = "Keystore type. The default is JKS. Supported values are: JKS, PKCS12 or PFX";

public static final String CLIENT_SSL_KEYSTORE_PASSWORD = "connection.keystore.password";
private static final String CLIENT_SSL_KEYSTORE_PASSWORD_DOC = "Keystore password";

private static final String SUPPORTED_SSL_PROTOCOLS_MESSAGE = String.join(", ", SUPPORTED_PROTOCOLS);

@Override
public boolean apply(final OpensearchSinkConnectorConfig config, final HttpAsyncClientBuilder builder) {
if (!isAuthenticatedConnection(config)) {
return false;
}

final var credentialsProvider = new BasicCredentialsProvider();
for (final var httpHost : config.httpHosts()) {
credentialsProvider.setCredentials(new AuthScope(httpHost),
new UsernamePasswordCredentials(connectionUsername(config), connectionPassword(config).value()));
}
SSLContextBuilder.buildSSLContext(config).map(builder::setSSLContext);
return true;
}

@Override
public void addConfig(final ConfigDef config) {
int order = -1;
config.define(CONNECTION_USERNAME_CONFIG, Type.STRING, null, Importance.MEDIUM, CONNECTION_USERNAME_DOC,
"Authentication", order++, Width.SHORT, "Connection Username")
.define(CONNECTION_PASSWORD_CONFIG, Type.PASSWORD, null, Importance.MEDIUM, CONNECTION_PASSWORD_DOC,
"Authentication", order++, Width.SHORT, "Connection Password");

// Common SSL settings
config.define(CLIENT_SSL_PROTOCOL_TYPE, Type.STRING, SSLContextBuilder.TLS_1_3, new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
assert value instanceof String;
final var s = (String) value;
if (!SSLContextBuilder.TLS_1_3.equalsIgnoreCase(s) && !SSLContextBuilder.TLS_1_2.equalsIgnoreCase(s)) {
throw new ConfigException("Unsupported SSL protocol type " + s + ". Supported are: "
+ SUPPORTED_SSL_PROTOCOLS_MESSAGE);
}
}

@Override
public String toString() {
return SUPPORTED_SSL_PROTOCOLS_MESSAGE;
}
}, Importance.MEDIUM, CLIENT_SSL_PROTOCOL_TYPE_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"SSL protocol type")
.define(CLIENT_SSL_ACCESS_KEY_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_SSL_ACCESS_KEY_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"User access key password");
// PEM Certificates settings
config.define(CLIENT_SSL_CA_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_SSL_CA_CERTIFICATE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT, "Root CAs")
.define(CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_SSL_ACCESS_CERTIFICATE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"User access certificate")
.define(CLIENT_SSL_ACCESS_KEY_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_SSL_ACCESS_KEY_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"User certificate’s key");
// KeyStore and TrustStore files settings
config.define(CLIENT_SSL_TRUSTSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_SSL_TRUSTSTORE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"Trust store location")
.define(CLIENT_SSL_TRUSTSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_SSL_TRUSTSTORE_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT,
"Trust store password")
.define(CLIENT_SSL_KEYSTORE_LOCATION, Type.STRING, null, Importance.MEDIUM,
CLIENT_SSL_KEYSTORE_LOCATION_DOC, SSL_SETTINGS_GROUP_NAME, order, Width.SHORT,
"Key store location")
.define(CLIENT_SSL_KEYSTORE_TYPE, Type.STRING, "JKS", Importance.MEDIUM, CLIENT_SSL_KEYSTORE_TYPE_DOC,
SSL_SETTINGS_GROUP_NAME, order++, Width.SHORT, "Key store type")
.define(CLIENT_SSL_KEYSTORE_PASSWORD, Type.PASSWORD, null, Importance.MEDIUM,
CLIENT_SSL_KEYSTORE_PASSWORD_DOC, SSL_SETTINGS_GROUP_NAME, order + 1, Width.SHORT,
"Key store password");

}

private static boolean isAuthenticatedConnection(final OpensearchSinkConnectorConfig config) {
return Objects.nonNull(connectionUsername(config)) && Objects.nonNull(connectionPassword(config));
}

private static String connectionUsername(final OpensearchSinkConnectorConfig config) {
return config.getString(CONNECTION_USERNAME_CONFIG);
}

private static Password connectionPassword(final OpensearchSinkConnectorConfig config) {
return config.getPassword(CONNECTION_PASSWORD_CONFIG);
}

}
Loading

0 comments on commit 8d35532

Please sign in to comment.