Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
improve openai connection handling (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored May 14, 2024
1 parent a8d071a commit 286f4f3
Showing 1 changed file with 10 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.resources.ConnectionProvider;

@Slf4j
public class TransformFunctionUtil {
Expand All @@ -95,47 +96,6 @@ public class TransformFunctionUtil {
Arrays.asList(
"value", "key", "destinationTopic", "messageKey", "topicName", "eventTime");

public static OpenAIClient buildOpenAIClient(OpenAIConfig openAIConfig) {
if (openAIConfig == null) {
return null;
}

OpenAIClientBuilder openAIClientBuilder = new OpenAIClientBuilder();

// Configure credentials based on provider
if (openAIConfig.getProvider() == OpenAIProvider.AZURE) {
openAIClientBuilder.credential(new AzureKeyCredential(openAIConfig.getAccessKey()));
} else {
openAIClientBuilder.credential(new KeyCredential(openAIConfig.getAccessKey()));
}

// Set the endpoint if it is specified
if (openAIConfig.getUrl() != null && !openAIConfig.getUrl().isEmpty()) {
openAIClientBuilder.endpoint(openAIConfig.getUrl());
}

// Setting up the Netty HTTP client with specific timeout settings
NettyAsyncHttpClientBuilder httpClientBuilder =
new NettyAsyncHttpClientBuilder()
.connectTimeout(
Duration.ofSeconds(3)) // Set connection timeout to 3 seconds
.readTimeout(Duration.ofSeconds(3)); // Set read timeout to 3 seconds

// Special handling for localhost (testing scenario)
if (openAIConfig.getUrl() != null && openAIConfig.getUrl().startsWith("http://localhost")) {
HttpPipeline httpPipeline =
new HttpPipelineBuilder()
.httpClient(new MockHttpClient(openAIConfig.getAccessKey()))
.build();
openAIClientBuilder.pipeline(httpPipeline);
} else {
// Use the custom-configured HTTP client for non-testing scenarios
openAIClientBuilder.httpClient(httpClientBuilder.build());
}

return openAIClientBuilder.buildClient();
}

public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig) {
if (openAIConfig == null) {
return null;
Expand All @@ -158,7 +118,9 @@ public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig
NettyAsyncHttpClientBuilder httpClientBuilder =
new NettyAsyncHttpClientBuilder()
.connectTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds()))
.readTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds()));
.readTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds()))
.writeTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds()))
.responseTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds()));

// Special handling for localhost (testing scenario)
if (openAIConfig.getUrl() != null && openAIConfig.getUrl().startsWith("http://localhost")) {
Expand All @@ -168,6 +130,12 @@ public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig
.build();
openAIClientBuilder.pipeline(httpPipeline);
} else {
ConnectionProvider provider = ConnectionProvider.builder("openai-connection-provider")
.maxIdleTime(Duration.ofSeconds(20))
.maxLifeTime(Duration.ofSeconds(60))
.pendingAcquireTimeout(Duration.ofSeconds(60))
.evictInBackground(Duration.ofSeconds(120)).build();
httpClientBuilder.connectionProvider(provider);
// Apply the custom-configured HTTP client for non-testing scenarios
openAIClientBuilder.httpClient(httpClientBuilder.build());
}
Expand Down

0 comments on commit 286f4f3

Please sign in to comment.