Skip to content

Commit

Permalink
[controller][gRPC] Migrate Venice Controller APIs to gRPC
Browse files Browse the repository at this point in the history
- Add controller handler service and gRPC server with secure mode
- Implement create store, get store, getAllStores, list stores APIs
- Add proto definitions for Stores and empty push
- Enable configuration to toggle gRPC server on/off
- Fix error handling and transport in ControllerClient for test compatibility
- Add additional routes: AdminCommandExecution, AdminTopicMetadata, Cluster, Version
- Update Docker setup for gRPC compatibility

Add experimental changes

Add integration test
  • Loading branch information
sushantmane committed Dec 17, 2024
1 parent 6771f7d commit a9e4293
Show file tree
Hide file tree
Showing 56 changed files with 3,144 additions and 240 deletions.
11 changes: 9 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
id 'maven-publish'
id 'com.diffplug.spotless' version '6.12.0'
id 'com.dorongold.task-tree' version '2.1.0'
id 'com.github.johnrengelman.shadow' version '6.1.0' apply false
id 'com.github.johnrengelman.shadow' version '7.1.2' apply false
id 'com.github.spotbugs' version '4.8.0' apply false
id 'org.gradle.test-retry' version '1.5.0' apply false
id 'com.form.diff-coverage' version '0.9.5' apply false
Expand Down Expand Up @@ -87,6 +87,7 @@ ext.libraries = [
grpcProtobuf: "io.grpc:grpc-protobuf:${grpcVersion}",
grpcServices: "io.grpc:grpc-services:${grpcVersion}",
grpcStub: "io.grpc:grpc-stub:${grpcVersion}",
grpcTesting: "io.grpc:grpc-testing:${grpcVersion}",
hadoopCommon: "org.apache.hadoop:hadoop-common:${hadoopVersion}",
hadoopHdfs: "org.apache.hadoop:hadoop-hdfs:${hadoopVersion}",
httpAsyncClient: 'org.apache.httpcomponents:httpasyncclient:4.1.5',
Expand Down Expand Up @@ -515,6 +516,12 @@ subprojects {
value = 'COVEREDRATIO'
minimum = threshold
}
// Ignore generate files
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect { fileTree(dir: it, exclude: [
'**/com/linkedin/venice/protocols/**',
])}))
}
}
}
}
Expand Down Expand Up @@ -818,4 +825,4 @@ task verifyJdkVersion {
gradle.taskGraph.whenReady {
// Ensure the JDK version is verified before any other tasks
verifyJdkVersion
}
}
5 changes: 5 additions & 0 deletions clients/venice-admin-tool/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ jar {
}
}

shadowJar {
// Enable merging service files from different dependencies. Required to make gRPC based clients work.
mergeServiceFiles()
}

ext {
jacocoCoverageThreshold = 0.00
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
import io.grpc.ChannelCredentials;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -74,7 +72,7 @@ public GrpcTransportClient(GrpcClientConfig grpcClientConfig) {
this.port = port;
this.serverGrpcChannels = new VeniceConcurrentHashMap<>();
this.stubCache = new VeniceConcurrentHashMap<>();
this.channelCredentials = buildChannelCredentials(sslFactory);
this.channelCredentials = GrpcUtils.buildChannelCredentials(sslFactory);
}

@Override
Expand All @@ -99,25 +97,6 @@ public void close() throws IOException {
r2TransportClientForNonStorageOps.close();
}

@VisibleForTesting
ChannelCredentials buildChannelCredentials(SSLFactory sslFactory) {
// TODO: Evaluate if this needs to fail instead since it depends on plain text support on server
if (sslFactory == null) {
return InsecureChannelCredentials.create();
}

try {
TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder()
.keyManager(GrpcUtils.getKeyManagers(sslFactory))
.trustManager(GrpcUtils.getTrustManagers(sslFactory));
return tlsBuilder.build();
} catch (Exception e) {
throw new VeniceClientException(
"Failed to initialize SSL channel credentials for Venice gRPC Transport Client",
e);
}
}

@VisibleForTesting
VeniceClientRequest buildVeniceClientRequest(String[] requestParts, byte[] requestBody, boolean isSingleGet) {
VeniceClientRequest.Builder requestBuilder = VeniceClientRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package com.linkedin.venice.fastclient.transport;

import static org.mockito.Mockito.*;
import static org.testng.Assert.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import com.google.common.collect.ImmutableMap;
import com.linkedin.r2.transport.common.Client;
import com.linkedin.venice.HttpMethod;
import com.linkedin.venice.client.exceptions.VeniceClientException;
import com.linkedin.venice.client.store.transport.TransportClient;
import com.linkedin.venice.client.store.transport.TransportClientResponse;
import com.linkedin.venice.fastclient.GrpcClientConfig;
import com.linkedin.venice.protocols.VeniceClientRequest;
import com.linkedin.venice.protocols.VeniceReadServiceGrpc;
import com.linkedin.venice.protocols.VeniceServerResponse;
import com.linkedin.venice.security.SSLFactory;
import io.grpc.ChannelCredentials;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -61,14 +68,6 @@ public void setUp() {
grpcTransportClient = new GrpcTransportClient(mockClientConfig);
}

@Test(expectedExceptions = VeniceClientException.class)
public void testBuildChannelCredentials() {
ChannelCredentials actualChannelCredentials = grpcTransportClient.buildChannelCredentials(null);
assertNotNull(actualChannelCredentials, "Null ssl factory should default to insecure channel credentials");

grpcTransportClient.buildChannelCredentials(mock(SSLFactory.class));
}

@Test
public void testBuildVeniceClientRequestForSingleGet() {
VeniceClientRequest clientRequest =
Expand Down
6 changes: 4 additions & 2 deletions gradle/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,10 @@
</Match>
<Match>
<!--Ignore SpotBug Checks for the following package as classes are generated by protoc compiler-->
<Package name="com.linkedin.venice.protocols"/>
<Bug pattern="MS_EXPOSE_REP"/>
<Or>
<Package name="com.linkedin.venice.protocols"/>
<Package name="com.linkedin.venice.protocols.controller"/>
</Or>
</Match>
<Match>
<Bug pattern="SE_INNER_CLASS"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ private ConfigKeys() {
// What tags to assign to a controller instance
public static final String CONTROLLER_INSTANCE_TAG_LIST = "controller.instance.tag.list";

/**
* Whether to enable gRPC server in controller or not.
*/
public static final String CONTROLLER_GRPC_SERVER_ENABLED = "controller.grpc.server.enabled";

/**
* A port for the controller to listen on for incoming requests. On this port, the controller will
* server non-ssl requests.
*/
public static final String CONTROLLER_ADMIN_GRPC_PORT = "controller.admin.grpc.port";
/**
* A port for the controller to listen on for incoming requests. On this port, the controller will
* only serve ssl requests.
*/
public static final String CONTROLLER_ADMIN_SECURE_GRPC_PORT = "controller.admin.secure.grpc.port";

/**
* Number of threads to use for the gRPC server in controller.
*/
public static final String CONTROLLER_GRPC_SERVER_THREAD_COUNT = "controller.grpc.server.thread.count";

/** List of forbidden admin paths */
public static final String CONTROLLER_DISABLED_ROUTES = "controller.cluster.disabled.routes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ public class ControllerApiConstants {
public static final String SOURCE_GRID_FABRIC = "source_grid_fabric";
public static final String BATCH_JOB_HEARTBEAT_ENABLED = "batch_job_heartbeat_enabled";

public static final String NAME = "store_name";
public static final String STORE_NAME = "store_name";
/**
* @deprecated Use {@link #STORE_NAME} instead.
*/
public static final String NAME = STORE_NAME;
public static final String STORE_PARTITION = "store_partition";
public static final String STORE_VERSION = "store_version";
public static final String OWNER = "owner";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ public class LeaderControllerResponse
private String cluster;
private String url;
private String secureUrl = null;
private String grpcUrl = null;
private String secureGrpcUrl = null;

public String getCluster() {
return cluster;
Expand All @@ -29,4 +31,20 @@ public String getSecureUrl() {
public void setSecureUrl(String url) {
this.secureUrl = url;
}

public void setGrpcUrl(String url) {
this.grpcUrl = url;
}

public String getGrpcUrl() {
return grpcUrl;
}

public void setSecureGrpcUrl(String url) {
this.secureGrpcUrl = url;
}

public String getSecureGrpcUrl() {
return secureGrpcUrl;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.linkedin.venice.controllerapi.request;

public class ClusterDiscoveryRequest extends ControllerRequest {
private static final String CLUSTER_NAME_PLACEHOLDER = "UNKNOWN";

public ClusterDiscoveryRequest(String storeName) {
super(CLUSTER_NAME_PLACEHOLDER, storeName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.linkedin.venice.controllerapi.request;

import static com.linkedin.venice.controllerapi.ControllerApiConstants.CLUSTER;
import static com.linkedin.venice.controllerapi.ControllerApiConstants.STORE_NAME;


/**
* Base class for request objects used in controller endpoints.
*
* Extend this class to ensure required parameters are validated in the constructor of the extending class.
* This class is intended for use on both the client and server sides.
* All required parameters should be passed to and validated within the constructor of the extending class.
*/
public class ControllerRequest {
protected String clusterName;
protected String storeName;

public ControllerRequest(String clusterName) {
this.clusterName = validateParam(clusterName, CLUSTER);
this.storeName = null;
}

public ControllerRequest(String clusterName, String storeName) {
this.clusterName = validateParam(clusterName, CLUSTER);
this.storeName = validateParam(storeName, STORE_NAME);
}

public String getClusterName() {
return clusterName;
}

public String getStoreName() {
return storeName;
}

public static String validateParam(String param, String paramName) {
if (param == null || param.isEmpty()) {
throw new IllegalArgumentException("The request is missing the " + paramName + ", which is a mandatory field.");
}
return param;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.linkedin.venice.controllerapi.request;

/**
* Represents a request to create a new store in the specified Venice cluster with the provided parameters.
* This class encapsulates all necessary details for the creation of a store, including its name, owner,
* schema definitions, and access permissions.
*/
public class CreateNewStoreRequest extends ControllerRequest {
public static final String DEFAULT_STORE_OWNER = "";

private final String owner;
private final String keySchema;
private final String valueSchema;
private final boolean isSystemStore;

// a JSON string representing the access permissions for the store
private final String accessPermissions;

public CreateNewStoreRequest(
String clusterName,
String storeName,
String owner,
String keySchema,
String valueSchema,
String accessPermissions,
boolean isSystemStore) {
super(clusterName, storeName);
this.keySchema = validateParam(keySchema, "Key schema");
this.valueSchema = validateParam(valueSchema, "Value schema");
this.owner = owner == null ? DEFAULT_STORE_OWNER : owner;
this.accessPermissions = accessPermissions;
this.isSystemStore = isSystemStore;
}

public String getOwner() {
return owner;
}

public String getKeySchema() {
return keySchema;
}

public String getValueSchema() {
return valueSchema;
}

public String getAccessPermissions() {
return accessPermissions;
}

public boolean isSystemStore() {
return isSystemStore;
}
}
Loading

0 comments on commit a9e4293

Please sign in to comment.