Skip to content

Commit

Permalink
Refactor VeniceController
Browse files Browse the repository at this point in the history
  • Loading branch information
sushantmane committed Dec 17, 2024
1 parent 3450e01 commit 405b106
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class ZkServerWrapper extends ProcessWrapper {
* The tick time can be low because this Zookeeper instance is intended to be used locally.
*/
private static final int TICK_TIME = 200;
private static final int MAX_SESSION_TIMEOUT = 6000 * Time.MS_PER_SECOND;
private static final int MAX_SESSION_TIMEOUT = 60 * Time.MS_PER_SECOND;
private static final int NUM_CONNECTIONS = 5000;

private static final String CLIENT_PORT_PROP = "clientPort";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class VeniceController {
private Optional<StoreGraveyardCleanupService> storeGraveyardCleanupService;
private Optional<SystemStoreRepairService> systemStoreRepairService;

private VeniceControllerRequestHandler secureRequestHandler;
private VeniceControllerRequestHandler unsecureRequestHandler;
private ThreadPoolExecutor grpcExecutor = null;
private static final String CONTROLLER_GRPC_SERVER_THREAD_NAME = "ControllerGrpcServer";

private final boolean sslEnabled;
private final VeniceControllerMultiClusterConfig multiClusterConfigs;
private final MetricsRepository metricsRepository;
Expand All @@ -80,10 +85,9 @@ public class VeniceController {
private final Optional<ClientConfig> routerClientConfig;
private final Optional<ICProvider> icProvider;
private final Optional<SupersetSchemaGenerator> externalSupersetSchemaGenerator;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private final PubSubTopicRepository pubSubTopicRepository;
private final PubSubClientsFactory pubSubClientsFactory;
static final String CONTROLLER_SERVICE_NAME = "venice-controller";
private ThreadPoolExecutor grpcExecutor = null;

/**
* Allocates a new {@code VeniceController} object.
Expand Down Expand Up @@ -147,11 +151,26 @@ public VeniceController(VeniceControllerContext ctx) {
long serviceDiscoveryRegistrationRetryMS = multiClusterConfigs.getServiceDiscoveryRegistrationRetryMS();
this.asyncRetryingServiceDiscoveryAnnouncer =
new AsyncRetryingServiceDiscoveryAnnouncer(serviceDiscoveryAnnouncers, serviceDiscoveryRegistrationRetryMS);
createServices();
this.pubSubTopicRepository = new PubSubTopicRepository();
this.controllerService = createControllerService();
this.adminServer = createAdminServer(false);
this.secureAdminServer = sslEnabled ? createAdminServer(true) : null;
this.topicCleanupService = createTopicCleanupService();
this.storeBackupVersionCleanupService = createStoreBackupVersionCleanupService();
this.disabledPartitionEnablerService = createDisabledPartitionEnablerService();
this.unusedValueSchemaCleanupService = createUnusedValueSchemaCleanupService();
this.storeGraveyardCleanupService = createStoreGraveyardCleanupService();
this.systemStoreRepairService = createSystemStoreRepairService();
if (multiClusterConfigs.isGrpcServerEnabled()) {
initializeGrpcServer();
}

// Run before enabling controller in helix so leadership won't hand back to this controller during schema requests.
initializeSystemSchema(controllerService.getVeniceHelixAdmin());
}

private void createServices() {
controllerService = new VeniceControllerService(
private VeniceControllerService createControllerService() {
VeniceControllerService veniceControllerService = new VeniceControllerService(
multiClusterConfigs,
metricsRepository,
sslEnabled,
Expand All @@ -164,109 +183,109 @@ private void createServices() {
externalSupersetSchemaGenerator,
pubSubTopicRepository,
pubSubClientsFactory);
adminServer = new AdminSparkServer(
// no need to pass the hostname, we are binding to all the addresses
multiClusterConfigs.getAdminPort(),
Admin admin = veniceControllerService.getVeniceHelixAdmin();
if (multiClusterConfigs.isParent() && !(admin instanceof VeniceParentHelixAdmin)) {
throw new VeniceException(
"'VeniceParentHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in parent mode");
}
unsecureRequestHandler = new VeniceControllerRequestHandler(buildRequestHandlerDependencies(false));
if (sslEnabled) {
secureRequestHandler = new VeniceControllerRequestHandler(buildRequestHandlerDependencies(true));
}
return veniceControllerService;
}

AdminSparkServer createAdminServer(boolean secure) {
return new AdminSparkServer(
secure ? multiClusterConfigs.getAdminSecurePort() : multiClusterConfigs.getAdminPort(),
controllerService.getVeniceHelixAdmin(),
metricsRepository,
multiClusterConfigs.getClusters(),
multiClusterConfigs.isControllerEnforceSSLOnly(),
Optional.empty(),
false,
Optional.empty(),
secure || multiClusterConfigs.isControllerEnforceSSLOnly(),
secure ? multiClusterConfigs.getSslConfig() : Optional.empty(),
secure && multiClusterConfigs.adminCheckReadMethodForKafka(),
accessController,
multiClusterConfigs.getDisabledRoutes(),
multiClusterConfigs.getCommonConfig().getJettyConfigOverrides(),
// TODO: Builder pattern or just pass the config object here?
multiClusterConfigs.getCommonConfig().isDisableParentRequestTopicForStreamPushes(),
pubSubTopicRepository,
new VeniceControllerRequestHandler(buildRequestHandlerDependencies(false)));
if (sslEnabled) {
/**
* SSL enabled AdminSparkServer uses a different port number than the regular service.
*/
secureAdminServer = new AdminSparkServer(
multiClusterConfigs.getAdminSecurePort(),
controllerService.getVeniceHelixAdmin(),
metricsRepository,
multiClusterConfigs.getClusters(),
true,
multiClusterConfigs.getSslConfig(),
multiClusterConfigs.adminCheckReadMethodForKafka(),
accessController,
multiClusterConfigs.getDisabledRoutes(),
multiClusterConfigs.getCommonConfig().getJettyConfigOverrides(),
multiClusterConfigs.getCommonConfig().isDisableParentRequestTopicForStreamPushes(),
pubSubTopicRepository,
new VeniceControllerRequestHandler(buildRequestHandlerDependencies(true)));
}
storeBackupVersionCleanupService = Optional.empty();
storeGraveyardCleanupService = Optional.empty();
systemStoreRepairService = Optional.empty();
disabledPartitionEnablerService = Optional.empty();
unusedValueSchemaCleanupService = Optional.empty();
secure ? secureRequestHandler : unsecureRequestHandler);
}

private TopicCleanupService createTopicCleanupService() {
Admin admin = controllerService.getVeniceHelixAdmin();
if (multiClusterConfigs.isParent()) {
topicCleanupService = new TopicCleanupServiceForParentController(
return new TopicCleanupServiceForParentController(
admin,
multiClusterConfigs,
pubSubTopicRepository,
new TopicCleanupServiceStats(metricsRepository),
pubSubClientsFactory);
if (!(admin instanceof VeniceParentHelixAdmin)) {
throw new VeniceException(
"'VeniceParentHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in parent mode");
}
storeGraveyardCleanupService =
Optional.of(new StoreGraveyardCleanupService((VeniceParentHelixAdmin) admin, multiClusterConfigs));
LOGGER.info("StoreGraveyardCleanupService is enabled");
if (multiClusterConfigs.getCommonConfig().isParentSystemStoreRepairServiceEnabled()) {
systemStoreRepairService = Optional
.of(new SystemStoreRepairService((VeniceParentHelixAdmin) admin, multiClusterConfigs, metricsRepository));
LOGGER.info("SystemStoreRepairServiceEnabled is enabled");
}
this.unusedValueSchemaCleanupService =
Optional.of(new UnusedValueSchemaCleanupService(multiClusterConfigs, (VeniceParentHelixAdmin) admin));
} else {
topicCleanupService = new TopicCleanupService(
return new TopicCleanupService(
admin,
multiClusterConfigs,
pubSubTopicRepository,
new TopicCleanupServiceStats(metricsRepository),
pubSubClientsFactory);
if (!(admin instanceof VeniceHelixAdmin)) {
throw new VeniceException(
"'VeniceHelixAdmin' is expected of the returned 'Admin' from 'VeniceControllerService#getVeniceHelixAdmin' in child mode");
}
storeBackupVersionCleanupService = Optional
}
}

private Optional<StoreBackupVersionCleanupService> createStoreBackupVersionCleanupService() {
if (!multiClusterConfigs.isParent()) {
Admin admin = controllerService.getVeniceHelixAdmin();
return Optional
.of(new StoreBackupVersionCleanupService((VeniceHelixAdmin) admin, multiClusterConfigs, metricsRepository));
LOGGER.info("StoreBackupVersionCleanupService is enabled");
}
return Optional.empty();
}

disabledPartitionEnablerService =
Optional.of(new DisabledPartitionEnablerService((VeniceHelixAdmin) admin, multiClusterConfigs));
private Optional<DisabledPartitionEnablerService> createDisabledPartitionEnablerService() {
if (!multiClusterConfigs.isParent()) {
Admin admin = controllerService.getVeniceHelixAdmin();
return Optional.of(new DisabledPartitionEnablerService((VeniceHelixAdmin) admin, multiClusterConfigs));
}
// Run before enabling controller in helix so leadership won't hand back to this controller during schema requests.
initializeSystemSchema(controllerService.getVeniceHelixAdmin());
return Optional.empty();
}

// if gRpc server is not enabled, return early
if (multiClusterConfigs.isGrpcServerEnabled()) {
LOGGER.info("gRPC server is enabled in controller. Initializing gRPC server...");
initializeGrpcServer();
private Optional<StoreGraveyardCleanupService> createStoreGraveyardCleanupService() {
if (multiClusterConfigs.isParent()) {
Admin admin = controllerService.getVeniceHelixAdmin();
return Optional.of(new StoreGraveyardCleanupService((VeniceParentHelixAdmin) admin, multiClusterConfigs));
}
return Optional.empty();
}

private Optional<SystemStoreRepairService> createSystemStoreRepairService() {
if (multiClusterConfigs.isParent()
&& multiClusterConfigs.getCommonConfig().isParentSystemStoreRepairServiceEnabled()) {
Admin admin = controllerService.getVeniceHelixAdmin();
return Optional
.of(new SystemStoreRepairService((VeniceParentHelixAdmin) admin, multiClusterConfigs, metricsRepository));
}
return Optional.empty();
}

private Optional<UnusedValueSchemaCleanupService> createUnusedValueSchemaCleanupService() {
if (multiClusterConfigs.isParent()) {
Admin admin = controllerService.getVeniceHelixAdmin();
return Optional.of(new UnusedValueSchemaCleanupService(multiClusterConfigs, (VeniceParentHelixAdmin) admin));
}
return Optional.empty();
}

// package-private for testing
private void initializeGrpcServer() {
LOGGER.info("Initializing gRPC server as it is enabled for the controller...");
ParentControllerRegionValidationInterceptor parentControllerRegionValidationInterceptor =
new ParentControllerRegionValidationInterceptor(controllerService.getVeniceHelixAdmin());
List<ServerInterceptor> interceptors = new ArrayList<>(2);
interceptors.add(parentControllerRegionValidationInterceptor);

VeniceControllerGrpcServiceImpl grpcService =
new VeniceControllerGrpcServiceImpl(new VeniceControllerRequestHandler(buildRequestHandlerDependencies(false)));

VeniceControllerGrpcServiceImpl grpcService = new VeniceControllerGrpcServiceImpl(unsecureRequestHandler);
grpcExecutor = ThreadPoolFactory.createThreadPool(
multiClusterConfigs.getGrpcServerThreadCount(),
"ControllerGrpcServer",
CONTROLLER_GRPC_SERVER_THREAD_NAME,
Integer.MAX_VALUE,
BlockingQueueType.LINKED_BLOCKING_QUEUE);

Expand All @@ -282,8 +301,7 @@ private void initializeGrpcServer() {
SSLFactory sslFactory = SslUtils.getSSLFactory(
multiClusterConfigs.getSslConfig().get().getSslProperties(),
multiClusterConfigs.getSslFactoryClassName());
VeniceControllerGrpcServiceImpl secureGrpcService = new VeniceControllerGrpcServiceImpl(
new VeniceControllerRequestHandler(buildRequestHandlerDependencies(true)));
VeniceControllerGrpcServiceImpl secureGrpcService = new VeniceControllerGrpcServiceImpl(secureRequestHandler);
adminSecureGrpcServer = new VeniceGrpcServer(
new VeniceGrpcServerConfig.Builder().setPort(multiClusterConfigs.getAdminSecureGrpcPort())
.setService(secureGrpcService)
Expand All @@ -294,7 +312,8 @@ private void initializeGrpcServer() {
}
}

private ControllerRequestHandlerDependencies buildRequestHandlerDependencies(boolean secure) {
// package-private for testing
ControllerRequestHandlerDependencies buildRequestHandlerDependencies(boolean secure) {
ControllerRequestHandlerDependencies.Builder builder =
new ControllerRequestHandlerDependencies.Builder().setAdmin(controllerService.getVeniceHelixAdmin())
.setMetricsRepository(metricsRepository)
Expand Down Expand Up @@ -466,4 +485,41 @@ private static void addShutdownHook(VeniceController controller, String zkAddres
D2ClientFactory.release(zkAddress);
}));
}

// helper method to aid in testing
AdminSparkServer getSecureAdminServer() {
return secureAdminServer;
}

VeniceGrpcServer getAdminSecureGrpcServer() {
return adminSecureGrpcServer;
}

AdminSparkServer getAdminServer() {
return adminServer;
}

VeniceGrpcServer getAdminGrpcServer() {
return adminGrpcServer;
}

TopicCleanupService getTopicCleanupService() {
return topicCleanupService;
}

Optional<StoreBackupVersionCleanupService> getStoreBackupVersionCleanupService() {
return storeBackupVersionCleanupService;
}

Optional<DisabledPartitionEnablerService> getDisabledPartitionEnablerService() {
return disabledPartitionEnablerService;
}

Optional<UnusedValueSchemaCleanupService> getUnusedValueSchemaCleanupService() {
return unusedValueSchemaCleanupService;
}

Optional<StoreGraveyardCleanupService> getStoreGraveyardCleanupService() {
return storeGraveyardCleanupService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public void errOnMissingNodes() {
VeniceControllerClusterConfig.parseClusterMap(builder.build(), REGION_ALLOW_LIST);
}

private Properties getBaseSingleRegionProperties(boolean includeMultiRegionConfig) {
protected static Properties getBaseSingleRegionProperties(boolean includeMultiRegionConfig) {
Properties props = TestUtils.getPropertiesForControllerConfig();
String clusterName = props.getProperty(CLUSTER_NAME);
props.put(LOCAL_REGION_NAME, "dc-0");
Expand Down

0 comments on commit 405b106

Please sign in to comment.