diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java index 21ba1f829..5ce40a4c3 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java @@ -87,6 +87,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.resources.ConnectionProvider; @Slf4j public class TransformFunctionUtil { @@ -95,47 +96,6 @@ public class TransformFunctionUtil { Arrays.asList( "value", "key", "destinationTopic", "messageKey", "topicName", "eventTime"); - public static OpenAIClient buildOpenAIClient(OpenAIConfig openAIConfig) { - if (openAIConfig == null) { - return null; - } - - OpenAIClientBuilder openAIClientBuilder = new OpenAIClientBuilder(); - - // Configure credentials based on provider - if (openAIConfig.getProvider() == OpenAIProvider.AZURE) { - openAIClientBuilder.credential(new AzureKeyCredential(openAIConfig.getAccessKey())); - } else { - openAIClientBuilder.credential(new KeyCredential(openAIConfig.getAccessKey())); - } - - // Set the endpoint if it is specified - if (openAIConfig.getUrl() != null && !openAIConfig.getUrl().isEmpty()) { - openAIClientBuilder.endpoint(openAIConfig.getUrl()); - } - - // Setting up the Netty HTTP client with specific timeout settings - NettyAsyncHttpClientBuilder httpClientBuilder = - new NettyAsyncHttpClientBuilder() - .connectTimeout( - Duration.ofSeconds(3)) // Set connection timeout to 3 seconds - .readTimeout(Duration.ofSeconds(3)); // Set read timeout to 3 seconds - - // Special handling for localhost (testing scenario) - if (openAIConfig.getUrl() != null && openAIConfig.getUrl().startsWith("http://localhost")) { - HttpPipeline httpPipeline = - new HttpPipelineBuilder() - .httpClient(new MockHttpClient(openAIConfig.getAccessKey())) - .build(); - openAIClientBuilder.pipeline(httpPipeline); - } else { - // Use the custom-configured HTTP client for non-testing scenarios - openAIClientBuilder.httpClient(httpClientBuilder.build()); - } - - return openAIClientBuilder.buildClient(); - } - public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig) { if (openAIConfig == null) { return null; @@ -158,7 +118,9 @@ public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig NettyAsyncHttpClientBuilder httpClientBuilder = new NettyAsyncHttpClientBuilder() .connectTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds())) - .readTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds())); + .readTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds())) + .writeTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds())) + .responseTimeout(Duration.ofSeconds(openAIConfig.getTimeoutInSeconds())); // Special handling for localhost (testing scenario) if (openAIConfig.getUrl() != null && openAIConfig.getUrl().startsWith("http://localhost")) { @@ -168,6 +130,12 @@ public static OpenAIAsyncClient buildOpenAsyncAIClient(OpenAIConfig openAIConfig .build(); openAIClientBuilder.pipeline(httpPipeline); } else { + ConnectionProvider provider = ConnectionProvider.builder("openai-connection-provider") + .maxIdleTime(Duration.ofSeconds(20)) + .maxLifeTime(Duration.ofSeconds(60)) + .pendingAcquireTimeout(Duration.ofSeconds(60)) + .evictInBackground(Duration.ofSeconds(120)).build(); + httpClientBuilder.connectionProvider(provider); // Apply the custom-configured HTTP client for non-testing scenarios openAIClientBuilder.httpClient(httpClientBuilder.build()); }