diff --git a/examples/applications/openai-completions/README.md b/examples/applications/openai-completions/README.md
index 0be01f7d8..778af0034 100644
--- a/examples/applications/openai-completions/README.md
+++ b/examples/applications/openai-completions/README.md
@@ -1,6 +1,6 @@
-# OpenAI Completions
+# OpenAI Chat Completions
 
-This sample application shows how to execute completion using the OpenAI library using the Azure OpenAI API endpoint.
+This sample application shows how to execute Chat Completions with the OpenAI library against the Azure OpenAI API endpoint.
 
 ## Configure your OpenAI API Key
 
diff --git a/examples/applications/openai-text-completions/README.md b/examples/applications/openai-text-completions/README.md
new file mode 100644
index 000000000..f428fc0ee
--- /dev/null
+++ b/examples/applications/openai-text-completions/README.md
@@ -0,0 +1,32 @@
+# OpenAI Instruct Completions
+
+This sample application shows how to use the `gpt-3.5-turbo-instruct` OpenAI model.
+
+## Configure your OpenAI API Key
+
+
+```
+export OPENAI_ACCESS_KEY=...
+```
+
+## Deploy the LangStream application
+```
+langstream docker run test -app https://github.com/LangStream/langstream/examples/applications/openai-text-completions -s https://raw.githubusercontent.com/LangStream/langstream/main/examples/secrets/secrets.yaml
+```
+
+## Chat with the model
+
+```
+./bin/langstream gateway chat test -g chat
+```
+
+This model is optimized for single-turn, instruction-style tasks. For example, you can ask it to translate a text into another language.
+
+```
+You:
+> Translate "How are you?" into Italian
+```
+
+
+
+
diff --git a/examples/applications/openai-text-completions/configuration.yaml b/examples/applications/openai-text-completions/configuration.yaml
new file mode 100644
index 000000000..3a000f1bd
--- /dev/null
+++ b/examples/applications/openai-text-completions/configuration.yaml
@@ -0,0 +1,23 @@
+#
+#
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+configuration:
+  resources:
+    - type: "open-ai-configuration"
+      name: "OpenAI configuration"
+      configuration:
+        access-key: "{{ secrets.open-ai.access-key }}"
diff --git a/examples/applications/openai-text-completions/gateways.yaml b/examples/applications/openai-text-completions/gateways.yaml
new file mode 100644
index 000000000..e306f05a6
--- /dev/null
+++ b/examples/applications/openai-text-completions/gateways.yaml
@@ -0,0 +1,26 @@
+#
+#
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+gateways:
+  - id: chat
+    type: chat
+    chat-options:
+      answers-topic: answers
+      questions-topic: questions
+      headers:
+      - key: langstream-client-session-id
+        value-from-parameters: sessionId
diff --git a/examples/applications/openai-text-completions/pipeline.yaml b/examples/applications/openai-text-completions/pipeline.yaml
new file mode 100644
index 000000000..a994dbee1
--- /dev/null
+++ b/examples/applications/openai-text-completions/pipeline.yaml
@@ -0,0 +1,51 @@
+#
+# Copyright DataStax, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+topics:
+  - name: "questions"
+    creation-mode: create-if-not-exists
+  - name: "answers"
+    creation-mode: create-if-not-exists
+  - name: "debug"
+    creation-mode: create-if-not-exists
+pipeline:
+  - name: "convert-to-json"
+    type: "document-to-json"
+    input: "questions"
+    configuration:
+      text-field: "question"
+  - name: "ai-text-completions"
+    type: "ai-text-completions"
+    output: "debug"
+    configuration:
+      model: "{{{secrets.open-ai.text-completions-model}}}"
+      # add a field with the full answer on the output message (sent to the debug topic)
+      completion-field: "value.answer"
+      # also log the prompt that was sent to the LLM
+      log-field: "value.prompt"
+      # streaming behavior:
+      # as soon as the LLM produces a chunk, it is sent to the answers-topic
+      stream-to-topic: "answers"
+      # each streamed chunk is sent as the whole message payload;
+      # the 'value' syntax refers to the whole value of the message
+      stream-response-completion-field: "value"
+      # batch up to 10 chunks per message; to reduce latency for the first tokens,
+      # the agent sends the first message with 1 chunk, then 2 chunks, and so on,
+      # up to the min-chunks-per-message value; bigger messages reduce the
+      # per-message overhead on the topic
+      min-chunks-per-message: 10
+      prompt:
+        - "{{{% value.question}}}"
diff --git a/examples/secrets/secrets.yaml b/examples/secrets/secrets.yaml
index 162902d26..9ce70d65e 100644
--- a/examples/secrets/secrets.yaml
+++ b/examples/secrets/secrets.yaml
@@ -29,6 +29,7 @@ secrets:
       provider: "${OPEN_AI_PROVIDER:-openai}"
       embeddings-model: "${OPEN_AI_EMBEDDINGS_MODEL:-text-embedding-ada-002}"
       chat-completions-model: "${OPEN_AI_CHAT_COMPLETIONS_MODEL:-gpt-3.5-turbo}"
+      text-completions-model: "${OPEN_AI_TEXT_COMPLETIONS_MODEL:-gpt-3.5-turbo-instruct}"
   - id: vertex-ai
     data:
       url: "${VERTEX_AI_URL:-https://us-central1-aiplatform.googleapis.com}"
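Note (editorial, not part of the patch): the `prompt` entries in the pipeline above are Mustache templates that the agent renders against each incoming record; `TextCompletionsStep`, later in this patch, compiles every entry with JMustache and executes it on the record's JSON view. A minimal sketch of that rendering, using a simplified prompt string and a plain `Map` as a stand-in for the record view:

```java
import com.samskivert.mustache.Mustache;
import com.samskivert.mustache.Template;
import java.util.Map;

public class PromptRenderingSketch {
    public static void main(String[] args) {
        // TextCompletionsStep compiles each prompt entry once ...
        Template template =
                Mustache.compiler().compile("Translate into Italian: {{ value.question }}");
        // ... and renders it per record; a Map stands in for the JsonRecord view here.
        Map<String, Object> record = Map.of("value", Map.of("question", "How are you?"));
        System.out.println(template.execute(record));
        // prints: Translate into Italian: How are you?
    }
}
```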
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIAgentCodeProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIAgentCodeProvider.java
index e13c2da02..fc592d0e1 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIAgentCodeProvider.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIAgentCodeProvider.java
@@ -33,6 +33,7 @@ public class GenAIAgentCodeProvider implements AgentCodeProvider {
                     "compute-ai-embeddings",
                     "query",
                     "ai-chat-completions",
+                    "ai-text-completions",
                     "ai-tools" // legacy
                     );
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java
index f9af1842b..75487fa86 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/GenAIToolKitAgent.java
@@ -26,7 +26,6 @@
 import ai.langstream.api.runner.code.SimpleRecord;
 import ai.langstream.api.runner.topics.TopicProducer;
 import ai.langstream.api.runtime.ComponentType;
-import com.datastax.oss.streaming.ai.ChatCompletionsStep;
 import com.datastax.oss.streaming.ai.TransformContext;
 import com.datastax.oss.streaming.ai.TransformStep;
 import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
@@ -35,6 +34,8 @@
 import com.datastax.oss.streaming.ai.model.config.StepConfig;
 import com.datastax.oss.streaming.ai.model.config.TransformStepConfig;
 import com.datastax.oss.streaming.ai.services.ServiceProvider;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumer;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumerFactory;
 import com.datastax.oss.streaming.ai.util.TransformFunctionUtil;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -337,13 +338,13 @@ private static TransformSchemaType getSchemaType(Class<?> javaType) {
     }
 
     private static class TopicProducerStreamingAnswersConsumerFactory
-            implements ChatCompletionsStep.StreamingAnswersConsumerFactory {
+            implements StreamingAnswersConsumerFactory {
         private AgentContext agentContext;
 
         public TopicProducerStreamingAnswersConsumerFactory() {}
 
         @Override
-        public ChatCompletionsStep.StreamingAnswersConsumer create(String topicName) {
+        public StreamingAnswersConsumer create(String topicName) {
             TopicProducer topicProducer =
                     agentContext
                             .getTopicConnectionProvider()
@@ -358,8 +359,7 @@ public void setAgentContext(AgentContext agentContext) {
         }
     }
 
-    private static class TopicStreamingAnswersConsumer
-            implements ChatCompletionsStep.StreamingAnswersConsumer {
+    private static class TopicStreamingAnswersConsumer implements StreamingAnswersConsumer {
         private TopicProducer topicProducer;
 
         public TopicStreamingAnswersConsumer(TopicProducer topicProducer) {
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/HuggingFaceProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/HuggingFaceProvider.java
index 5ccc9ed7a..d3400fc6c 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/HuggingFaceProvider.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/HuggingFaceProvider.java
@@ -161,6 +161,15 @@ public HuggingFaceCompletionsService(
             this.httpClient = HttpClient.newHttpClient();
         }
 
+        @Override
+        public CompletableFuture<String> getTextCompletions(
+                List<String> prompt,
+                StreamingChunksConsumer streamingChunksConsumer,
+                Map<String, Object> options) {
+            return CompletableFuture.failedFuture(
+                    new UnsupportedOperationException("Not implemented"));
+        }
+
         @Override
         @SneakyThrows
         public CompletableFuture<ChatCompletions> getChatCompletions(
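Note (editorial, not part of the patch): only the OpenAI provider implements the new operation; the HuggingFace service above and the Vertex AI service below fail fast with an `UnsupportedOperationException`. For unit tests against the new contract (the full interface appears in `CompletionsService.java` further down), a stub along these lines should suffice; the names are hypothetical:

```java
import com.datastax.oss.streaming.ai.completions.ChatCompletions;
import com.datastax.oss.streaming.ai.completions.ChatMessage;
import com.datastax.oss.streaming.ai.completions.CompletionsService;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Echoes the prompt back as a single final chunk, then completes the future.
CompletionsService stub =
        new CompletionsService() {
            @Override
            public CompletableFuture<ChatCompletions> getChatCompletions(
                    List<ChatMessage> messages,
                    StreamingChunksConsumer streamingChunksConsumer,
                    Map<String, Object> options) {
                return CompletableFuture.failedFuture(new UnsupportedOperationException());
            }

            @Override
            public CompletableFuture<String> getTextCompletions(
                    List<String> prompt,
                    StreamingChunksConsumer streamingChunksConsumer,
                    Map<String, Object> options) {
                String answer = String.join("\n", prompt);
                // Chunk has a single content() method, so a lambda is enough
                streamingChunksConsumer.consumeChunk("answer-0", 1, () -> answer, true);
                return CompletableFuture.completedFuture(answer);
            }
        };
```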
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
index 79ea7d253..4aa3f6f0a 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
@@ -23,14 +23,17 @@
 import com.azure.ai.openai.models.ChatCompletionsOptions;
 import com.azure.ai.openai.models.ChatRole;
 import com.azure.ai.openai.models.CompletionsFinishReason;
+import com.azure.ai.openai.models.CompletionsOptions;
 import com.datastax.oss.streaming.ai.completions.ChatChoice;
 import com.datastax.oss.streaming.ai.completions.ChatCompletions;
 import com.datastax.oss.streaming.ai.completions.ChatMessage;
+import com.datastax.oss.streaming.ai.completions.Chunk;
 import com.datastax.oss.streaming.ai.completions.CompletionsService;
 import java.io.StringWriter;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -207,4 +210,129 @@ public ChatMessage buildTotalAnswerMessage() {
             return new ChatMessage(role.get(), totalAnswer.toString());
         }
     }
+
+    @Override
+    public CompletableFuture<String> getTextCompletions(
+            List<String> prompt,
+            StreamingChunksConsumer streamingChunksConsumer,
+            Map<String, Object> options) {
+        int minChunksPerMessage = getInteger("min-chunks-per-message", 20, options);
+        CompletionsOptions completionsOptions =
+                new CompletionsOptions(prompt)
+                        .setMaxTokens(getInteger("max-tokens", null, options))
+                        .setTemperature(getDouble("temperature", null, options))
+                        .setTopP(getDouble("top-p", null, options))
+                        .setLogitBias((Map<String, Integer>) options.get("logit-bias"))
+                        .setStream(getBoolean("stream", true, options))
+                        .setUser((String) options.get("user"))
+                        .setStop((List<String>) options.get("stop"))
+                        .setPresencePenalty(getDouble("presence-penalty", null, options))
+                        .setFrequencyPenalty(getDouble("frequency-penalty", null, options));
+
+        // this is the default behavior, as it is async
+        // it works even if the streamingChunksConsumer is null
+        if (completionsOptions.isStream()) {
+            CompletableFuture<Void> finished = new CompletableFuture<>();
+            Flux<com.azure.ai.openai.models.Completions> flux =
+                    client.getCompletionsStream((String) options.get("model"), completionsOptions);
+
+            TextCompletionsConsumer textCompletionsConsumer =
+                    new TextCompletionsConsumer(
+                            streamingChunksConsumer, minChunksPerMessage, finished);
+
+            flux.doOnError(
+                            error -> {
+                                log.error(
+                                        "Internal error while processing the streaming response",
+                                        error);
+                                finished.completeExceptionally(error);
+                            })
+                    .doOnNext(textCompletionsConsumer)
+                    .subscribe();
+
+            return finished.thenApply(___ -> textCompletionsConsumer.totalAnswer.toString());
+        } else {
+            com.azure.ai.openai.models.Completions completions =
+                    client.getCompletions((String) options.get("model"), completionsOptions)
+                            .block();
+            final String text = completions.getChoices().get(0).getText();
+            return CompletableFuture.completedFuture(text);
+        }
+    }
+
+    private static class TextCompletionsConsumer
+            implements Consumer<com.azure.ai.openai.models.Completions> {
+        private final StreamingChunksConsumer streamingChunksConsumer;
+        private final CompletableFuture<Void> finished;
+
+        private final AtomicReference<ChatRole> role = new AtomicReference<>();
+        private final StringWriter totalAnswer = new StringWriter();
+
+        private final StringWriter writer = new StringWriter();
+        private final AtomicInteger numberOfChunks = new AtomicInteger();
+        private final int minChunksPerMessage;
+
+        private AtomicInteger currentChunkSize = new AtomicInteger(1);
+        private AtomicInteger index = new AtomicInteger();
+
+        private final AtomicBoolean firstChunk = new AtomicBoolean(true);
+
+        public TextCompletionsConsumer(
+                StreamingChunksConsumer streamingChunksConsumer,
+                int minChunksPerMessage,
+                CompletableFuture<Void> finished) {
+            this.minChunksPerMessage = minChunksPerMessage;
+            this.streamingChunksConsumer =
+                    streamingChunksConsumer != null
+                            ? streamingChunksConsumer
+                            : (answerId, index, chunk, last) -> {};
+            this.finished = finished;
+        }
+
+        @Override
+        @SneakyThrows
+        public synchronized void accept(com.azure.ai.openai.models.Completions completions) {
+            List<com.azure.ai.openai.models.Choice> choices = completions.getChoices();
+            String answerId = completions.getId();
+            if (!choices.isEmpty()) {
+                com.azure.ai.openai.models.Choice first = choices.get(0);
+
+                CompletionsFinishReason finishReason = first.getFinishReason();
+                boolean last = finishReason != null;
+                final String content = first.getText();
+                if (content == null) {
+                    return;
+                }
+                if (firstChunk.compareAndSet(true, false)) {
+                    // Some models return two line breaks at the beginning of the first response,
+                    // even though this is not documented
+                    // https://community.openai.com/t/output-starts-often-with-linebreaks/36333/4
+                    if (content.isBlank()) {
+                        return;
+                    }
+                }
+                writer.write(content);
+                totalAnswer.write(content);
+                numberOfChunks.incrementAndGet();
+
+                // start from 1 chunk, then double the size until we reach minChunksPerMessage;
+                // this gives better latency for the first messages
+                int currentMinChunksPerMessage = currentChunkSize.get();
+
+                if (numberOfChunks.get() >= currentMinChunksPerMessage || last) {
+                    currentChunkSize.set(
+                            Math.min(currentMinChunksPerMessage * 2, minChunksPerMessage));
+                    final String chunkContent = writer.toString();
+                    final Chunk chunk = () -> chunkContent;
+                    streamingChunksConsumer.consumeChunk(
+                            answerId, index.incrementAndGet(), chunk, last);
+                    writer.getBuffer().setLength(0);
+                    numberOfChunks.set(0);
+                }
+                if (last) {
+                    finished.complete(null);
+                }
+            }
+        }
+    }
 }
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
index f5e9fda17..ac98207f1 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
@@ -339,6 +339,15 @@ static class Message {
         }
     }
 
+    @Override
+    public CompletableFuture<String> getTextCompletions(
+            List<String> prompt,
+            StreamingChunksConsumer streamingChunksConsumer,
+            Map<String, Object> options) {
+        return CompletableFuture.failedFuture(
+                new UnsupportedOperationException("Not implemented"));
+    }
+
     @Data
     static class Predictions {
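Note (editorial, not part of the patch): the batching policy in `TextCompletionsConsumer` above is the interesting part: flush after 1 chunk, then 2, then 4, and so on, capped at `min-chunks-per-message`, so the first tokens reach the client quickly while later messages amortize per-message overhead. A self-contained sketch of just that policy:

```java
import java.util.List;

public class ChunkBatchingSketch {
    public static void main(String[] args) {
        int minChunksPerMessage = 10; // cap, as configured on the agent
        int currentChunkSize = 1; // start small for low first-token latency
        int buffered = 0;
        StringBuilder buffer = new StringBuilder();
        List<String> tokens = List.of("Amo", " le", " mac", "chine", " davvero", " tanto", "!");
        for (int i = 0; i < tokens.size(); i++) {
            boolean last = i == tokens.size() - 1;
            buffer.append(tokens.get(i));
            buffered++;
            if (buffered >= currentChunkSize || last) {
                System.out.printf("flush %d chunk(s): '%s'%n", buffered, buffer);
                // double the window up to the configured cap
                currentChunkSize = Math.min(currentChunkSize * 2, minChunksPerMessage);
                buffer.setLength(0);
                buffered = 0;
            }
        }
    }
}
```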
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java
index 82b4b6a66..c0912fb1c 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java
@@ -21,9 +21,12 @@
 import com.datastax.oss.streaming.ai.completions.ChatChoice;
 import com.datastax.oss.streaming.ai.completions.ChatCompletions;
 import com.datastax.oss.streaming.ai.completions.ChatMessage;
+import com.datastax.oss.streaming.ai.completions.Chunk;
 import com.datastax.oss.streaming.ai.completions.CompletionsService;
 import com.datastax.oss.streaming.ai.model.JsonRecord;
 import com.datastax.oss.streaming.ai.model.config.ChatCompletionsConfig;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumer;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumerFactory;
 import com.samskivert.mustache.Mustache;
 import com.samskivert.mustache.Template;
 import java.util.HashMap;
@@ -51,17 +54,6 @@ public class ChatCompletionsStep implements TransformStep {
 
     private StreamingAnswersConsumer streamingAnswersConsumer;
 
-    public interface StreamingAnswersConsumerFactory {
-        StreamingAnswersConsumer create(String topicName);
-    }
-
-    public interface StreamingAnswersConsumer {
-        void streamAnswerChunk(
-                int index, String message, boolean last, TransformContext outputMessage);
-
-        default void close() {}
-    }
-
     // for tests
     public ChatCompletionsStep(
             CompletionsService completionsService, ChatCompletionsConfig config) {
@@ -151,7 +143,7 @@ public CompletableFuture<?> processAsync(TransformContext transformContext) {
                         new CompletionsService.StreamingChunksConsumer() {
                             @Override
                             public void consumeChunk(
-                                    String answerId, int index, ChatChoice chunk, boolean last) {
+                                    String answerId, int index, Chunk chunk, boolean last) {
 
                                 // we must copy the context because the same context is used for all
                                 // chunks
@@ -162,9 +154,10 @@ public void consumeChunk(
                                 copy.getProperties().put("stream-index", index + "");
                                 copy.getProperties().put("stream-last-message", last + "");
 
-                                applyResultFieldToContext(copy, chunk, true);
+                                final String content = chunk.content();
+                                applyResultFieldToContext(copy, content, true);
                                 streamingAnswersConsumer.streamAnswerChunk(
-                                        index, chunk.getMessage().getContent(), last, copy);
+                                        index, content, last, copy);
                             }
                         },
                         options);
@@ -172,7 +165,7 @@ public void consumeChunk(
         return chatCompletionsHandle.thenApply(
                 chatCompletions -> {
                     ChatChoice chatChoice = chatCompletions.getChoices().get(0);
-                    applyResultFieldToContext(transformContext, chatChoice, false);
+                    applyResultFieldToContext(transformContext, chatChoice.content(), false);
 
                     String logField = config.getLogField();
                     if (logField != null && !logField.isEmpty()) {
@@ -192,8 +185,7 @@ public void consumeChunk(
     }
 
     private void applyResultFieldToContext(
-            TransformContext transformContext, ChatChoice chatChoice, boolean streamingAnswer) {
-        String content = chatChoice.getMessage().getContent();
+            TransformContext transformContext, String content, boolean streamingAnswer) {
         String fieldName = config.getFieldName();
 
         // maybe we want a different field in the streaming answer
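Note (editorial, not part of the patch): the refactor above swaps the `ChatChoice`-specific callback for the new `Chunk` interface, so chat and text completions share one streaming path. An illustration of the two shapes a `Chunk` takes in this patch, assuming the two-argument `ChatMessage(role, content)` constructor used elsewhere in the code:

```java
import com.datastax.oss.streaming.ai.completions.ChatChoice;
import com.datastax.oss.streaming.ai.completions.ChatMessage;
import com.datastax.oss.streaming.ai.completions.Chunk;

// A chat chunk wraps a ChatMessage (ChatChoice implements Chunk) ...
Chunk chatChunk =
        ChatChoice.builder().message(new ChatMessage("assistant", "ciao")).build();
// ... while a text chunk can be a plain lambda, as in OpenAICompletionService.
Chunk textChunk = () -> "ciao";
// Consumers only ever call content(), never inspect the concrete type.
assert chatChunk.content().equals(textChunk.content());
```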
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java
new file mode 100644
index 000000000..304080a4d
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai;
+
+import static com.datastax.oss.streaming.ai.util.TransformFunctionUtil.convertToMap;
+
+import com.azure.ai.openai.models.CompletionsOptions;
+import com.datastax.oss.streaming.ai.completions.Chunk;
+import com.datastax.oss.streaming.ai.completions.CompletionsService;
+import com.datastax.oss.streaming.ai.model.JsonRecord;
+import com.datastax.oss.streaming.ai.model.config.TextCompletionsConfig;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumer;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumerFactory;
+import com.samskivert.mustache.Mustache;
+import com.samskivert.mustache.Template;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
+
+@Slf4j
+public class TextCompletionsStep implements TransformStep {
+
+    private final CompletionsService completionsService;
+
+    private final TextCompletionsConfig config;
+
+    private final Map<Schema, Schema> avroValueSchemaCache = new ConcurrentHashMap<>();
+
+    private final Map<Schema, Schema> avroKeySchemaCache = new ConcurrentHashMap<>();
+
+    private final Map<String, Template> messageTemplates = new ConcurrentHashMap<>();
+    private final StreamingAnswersConsumerFactory streamingAnswersConsumerFactory;
+
+    private StreamingAnswersConsumer streamingAnswersConsumer;
+
+    public TextCompletionsStep(
+            CompletionsService completionsService,
+            StreamingAnswersConsumerFactory streamingAnswersConsumerFactory,
+            TextCompletionsConfig config) {
+        this.streamingAnswersConsumerFactory = streamingAnswersConsumerFactory;
+        this.completionsService = completionsService;
+        this.config = config;
+        this.streamingAnswersConsumer = (index, message, last, record) -> {};
+        config.getPrompt().forEach(p -> messageTemplates.put(p, Mustache.compiler().compile(p)));
+    }
+
+    @Override
+    public void start() throws Exception {
+        if (config.getStreamToTopic() != null && !config.getStreamToTopic().isEmpty()) {
+            log.info("Streaming answers to topic {}", config.getStreamToTopic());
+            this.streamingAnswersConsumer =
+                    streamingAnswersConsumerFactory.create(config.getStreamToTopic());
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.streamingAnswersConsumer != null) {
+            this.streamingAnswersConsumer.close();
+        }
+    }
+
+    @Override
+    public CompletableFuture<?> processAsync(TransformContext transformContext) {
+        JsonRecord jsonRecord = transformContext.toJsonRecord();
+
+        List<String> prompt =
+                config.getPrompt().stream()
+                        .map(p -> messageTemplates.get(p).execute(jsonRecord))
+                        .collect(Collectors.toList());
+
+        CompletionsOptions completionsOptions =
+                new CompletionsOptions(List.of())
+                        .setMaxTokens(config.getMaxTokens())
+                        .setTemperature(config.getTemperature())
+                        .setTopP(config.getTopP())
+                        .setLogitBias(config.getLogitBias())
+                        .setStream(config.isStream())
+                        .setUser(config.getUser())
+                        .setStop(config.getStop())
+                        .setPresencePenalty(config.getPresencePenalty())
+                        .setFrequencyPenalty(config.getFrequencyPenalty());
+        Map<String, Object> options = convertToMap(completionsOptions);
+        options.put("model", config.getModel());
+        options.put("min-chunks-per-message", config.getMinChunksPerMessage());
+        options.remove("messages");
+
+        CompletableFuture<String> chatCompletionsHandle =
+                completionsService.getTextCompletions(
+                        prompt,
+                        new CompletionsService.StreamingChunksConsumer() {
+                            @Override
+                            public void consumeChunk(
+                                    String answerId, int index, Chunk chunk, boolean last) {
+
+                                // we must copy the context because the same context is used for all
+                                // chunks
+                                // and also for the final answer
+                                TransformContext copy = transformContext.copy();
+
+                                copy.getProperties().put("stream-id", answerId);
+                                copy.getProperties().put("stream-index", index + "");
+                                copy.getProperties().put("stream-last-message", last + "");
+
+                                final String content = chunk.content();
+                                applyResultFieldToContext(copy, content, true);
+                                streamingAnswersConsumer.streamAnswerChunk(
+                                        index, content, last, copy);
+                            }
+                        },
+                        options);
+
+        return chatCompletionsHandle.thenApply(
+                content -> {
+                    applyResultFieldToContext(transformContext, content, false);
+
+                    String logField = config.getLogField();
+                    if (logField != null && !logField.isEmpty()) {
+                        Map<String, Object> logMap = new HashMap<>();
+                        logMap.put("model", config.getModel());
+                        logMap.put("options", options);
+                        logMap.put("messages", prompt);
+                        transformContext.setResultField(
+                                TransformContext.toJson(logMap),
+                                logField,
+                                Schema.create(Schema.Type.STRING),
+                                avroKeySchemaCache,
+                                avroValueSchemaCache);
+                    }
+                    return null;
+                });
+    }
+
+    private void applyResultFieldToContext(
+            TransformContext transformContext, String content, boolean streamingAnswer) {
+        String fieldName = config.getFieldName();
+
+        // maybe we want a different field in the streaming answer
+        // typically you want to directly stream the answer as the whole "value"
+        if (streamingAnswer
+                && config.getStreamResponseCompletionField() != null
+                && !config.getStreamResponseCompletionField().isEmpty()) {
+            fieldName = config.getStreamResponseCompletionField();
+        }
+        transformContext.setResultField(
+                content,
+                fieldName,
+                Schema.create(Schema.Type.STRING),
+                avroKeySchemaCache,
+                avroValueSchemaCache);
+    }
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/ChatChoice.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/ChatChoice.java
index ca9045e98..ce066af16 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/ChatChoice.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/ChatChoice.java
@@ -24,6 +24,11 @@
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class ChatChoice {
+public class ChatChoice implements Chunk {
     private ChatMessage message;
+
+    @Override
+    public String content() {
+        return message.getContent();
+    }
 }
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/Chunk.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/Chunk.java
new file mode 100644
index 000000000..6b3cc3528
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/Chunk.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.completions;
+
+public interface Chunk {
+    String content();
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/CompletionsService.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/CompletionsService.java
index 38bdc8d18..ccdeddb4c 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/CompletionsService.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/completions/CompletionsService.java
@@ -25,7 +25,12 @@ CompletableFuture<ChatCompletions> getChatCompletions(
             StreamingChunksConsumer streamingChunksConsumer,
             Map<String, Object> options);
 
+    CompletableFuture<String> getTextCompletions(
+            List<String> prompt,
+            StreamingChunksConsumer streamingChunksConsumer,
+            Map<String, Object> options);
+
     interface StreamingChunksConsumer {
-        void consumeChunk(String answerId, int index, ChatChoice chunk, boolean last);
+        void consumeChunk(String answerId, int index, Chunk chunk, boolean last);
     }
 }
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/StepConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/StepConfig.java
index f0edac1e4..34373996d 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/StepConfig.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/StepConfig.java
@@ -34,6 +34,7 @@
             value = ComputeAIEmbeddingsConfig.class,
             name = "compute-ai-embeddings"),
     @JsonSubTypes.Type(value = ChatCompletionsConfig.class, name = "ai-chat-completions"),
+    @JsonSubTypes.Type(value = TextCompletionsConfig.class, name = "ai-text-completions"),
     @JsonSubTypes.Type(value = QueryConfig.class, name = "query")
 })
 @Getter
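Note (editorial, not part of the patch): with the new `@JsonSubTypes` entry, a step map whose `type` is `ai-text-completions` should deserialize into `TextCompletionsConfig`. A sketch; it assumes that the `@JsonTypeInfo` annotation on `StepConfig` (not shown in this hunk) dispatches on the `type` property, which the subtype names suggest:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
import java.util.Map;

ObjectMapper mapper = new ObjectMapper();
Map<String, Object> step =
        Map.of(
                "type", "ai-text-completions",
                "model", "gpt-3.5-turbo-instruct",
                "prompt", List.of("Translate into Italian: {{ value.question }}"),
                "completion-field", "value.answer");
StepConfig config = mapper.convertValue(step, StepConfig.class);
// config should now be a TextCompletionsConfig with model and prompt populated
```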
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TextCompletionsConfig.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TextCompletionsConfig.java
new file mode 100644
index 000000000..dece0e5e8
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/model/config/TextCompletionsConfig.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.model.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import lombok.Data;
+
+@Data
+public class TextCompletionsConfig extends StepConfig {
+
+    @JsonProperty(required = true)
+    private String model;
+
+    @JsonProperty(value = "prompt", required = true)
+    private List<String> prompt;
+
+    @JsonProperty(value = "stream-to-topic")
+    private String streamToTopic;
+
+    @JsonProperty(value = "stream-response-completion-field")
+    private String streamResponseCompletionField;
+
+    @JsonProperty(value = "min-chunks-per-message")
+    private int minChunksPerMessage = 20;
+
+    @JsonProperty(value = "completion-field")
+    private String fieldName;
+
+    @JsonProperty(value = "stream")
+    private boolean stream = true;
+
+    @JsonProperty(value = "log-field")
+    private String logField;
+
+    @JsonProperty(value = "max-tokens")
+    private Integer maxTokens;
+
+    @JsonProperty(value = "temperature")
+    private Double temperature;
+
+    @JsonProperty(value = "top-p")
+    private Double topP;
+
+    @JsonProperty(value = "logit-bias")
+    private Map<String, Integer> logitBias;
+
+    @JsonProperty(value = "user")
+    private String user;
+
+    @JsonProperty(value = "stop")
+    private List<String> stop;
+
+    @JsonProperty(value = "presence-penalty")
+    private Double presencePenalty;
+
+    @JsonProperty(value = "frequency-penalty")
+    private Double frequencyPenalty;
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumer.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumer.java
new file mode 100644
index 000000000..dd38a3faa
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.streaming;
+
+import com.datastax.oss.streaming.ai.TransformContext;
+
+public interface StreamingAnswersConsumer {
+    void streamAnswerChunk(int index, String message, boolean last, TransformContext outputMessage);
+
+    default void close() {}
+}
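Note (editorial, not part of the patch): `StreamingAnswersConsumer` has a single abstract method (`close()` is a default), so a lambda implements it, which is handy in tests; the production implementation (`TopicStreamingAnswersConsumer` in `GenAIToolKitAgent`) writes each chunk to a topic instead:

```java
// A trivial consumer that just logs every streamed chunk.
StreamingAnswersConsumer logging =
        (index, message, last, outputMessage) ->
                System.out.printf("chunk %d (last=%s): %s%n", index, last, message);
```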
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumerFactory.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumerFactory.java
new file mode 100644
index 000000000..0f72b5646
--- /dev/null
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/streaming/StreamingAnswersConsumerFactory.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.streaming.ai.streaming;
+
+public interface StreamingAnswersConsumerFactory {
+    StreamingAnswersConsumer create(String topicName);
+}
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java
index ff3da0ceb..a86880607 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/util/TransformFunctionUtil.java
@@ -35,6 +35,7 @@
 import com.datastax.oss.streaming.ai.FlattenStep;
 import com.datastax.oss.streaming.ai.MergeKeyValueStep;
 import com.datastax.oss.streaming.ai.QueryStep;
+import com.datastax.oss.streaming.ai.TextCompletionsStep;
 import com.datastax.oss.streaming.ai.TransformContext;
 import com.datastax.oss.streaming.ai.TransformStep;
 import com.datastax.oss.streaming.ai.UnwrapKeyValueStep;
@@ -57,9 +58,11 @@
 import com.datastax.oss.streaming.ai.model.config.OpenAIProvider;
 import com.datastax.oss.streaming.ai.model.config.QueryConfig;
 import com.datastax.oss.streaming.ai.model.config.StepConfig;
+import com.datastax.oss.streaming.ai.model.config.TextCompletionsConfig;
 import com.datastax.oss.streaming.ai.model.config.TransformStepConfig;
 import com.datastax.oss.streaming.ai.model.config.UnwrapKeyValueConfig;
 import com.datastax.oss.streaming.ai.services.ServiceProvider;
+import com.datastax.oss.streaming.ai.streaming.StreamingAnswersConsumerFactory;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -169,7 +172,7 @@ public static StepPredicatePair buildStep(
             TransformStepConfig transformConfig,
             ServiceProvider serviceProvider,
             QueryStepDataSource dataSource,
-            ChatCompletionsStep.StreamingAnswersConsumerFactory streamingAnswersConsumerFactory,
+            StreamingAnswersConsumerFactory streamingAnswersConsumerFactory,
             StepConfig step)
             throws Exception {
         TransformStep transformStep;
@@ -208,6 +211,13 @@ public static StepPredicatePair buildStep(
                                 serviceProvider,
                                 streamingAnswersConsumerFactory);
                 break;
+            case "ai-text-completions":
+                transformStep =
+                        newTextCompletionsFunction(
+                                (TextCompletionsConfig) step,
+                                serviceProvider,
+                                streamingAnswersConsumerFactory);
+                break;
             case "query":
                 transformStep = newQuery((QueryConfig) step, dataSource);
                 break;
@@ -315,23 +325,33 @@ public static UnwrapKeyValueStep newUnwrapKeyValueFunction(UnwrapKeyValueConfig
     }
 
     public static Map<String, Object> convertToMap(Object object) {
-        return new ObjectMapper().convertValue(object, Map.class);
+        return OBJECT_MAPPER.convertValue(object, Map.class);
     }
 
     public static <T> T convertFromMap(Map<String, Object> map, Class<T> type) {
-        return new ObjectMapper().convertValue(map, type);
+        return OBJECT_MAPPER.convertValue(map, type);
     }
 
     public static ChatCompletionsStep newChatCompletionsFunction(
             ChatCompletionsConfig config,
             ServiceProvider serviceProvider,
-            ChatCompletionsStep.StreamingAnswersConsumerFactory streamingAnswersConsumerFactory)
+            StreamingAnswersConsumerFactory streamingAnswersConsumerFactory)
             throws Exception {
         CompletionsService completionsService =
                 serviceProvider.getCompletionsService(convertToMap(config));
         return new ChatCompletionsStep(completionsService, streamingAnswersConsumerFactory, config);
     }
 
+    public static TextCompletionsStep newTextCompletionsFunction(
+            TextCompletionsConfig config,
+            ServiceProvider serviceProvider,
+            StreamingAnswersConsumerFactory streamingAnswersConsumerFactory)
+            throws Exception {
+        CompletionsService completionsService =
+                serviceProvider.getCompletionsService(convertToMap(config));
+        return new TextCompletionsStep(completionsService, streamingAnswersConsumerFactory, config);
+    }
+
     public static TransformStep newQuery(QueryConfig config, QueryStepDataSource dataSource) {
         config.getFields()
                 .forEach(
diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/ai.langstream.agents.index b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/ai.langstream.agents.index
index cac5d9022..edd1a1a22 100644
--- a/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/ai.langstream.agents.index
+++ b/langstream-agents/langstream-ai-agents/src/main/resources/META-INF/ai.langstream.agents.index
@@ -7,4 +7,5 @@ drop
 compute
 compute-ai-embeddings
 query
-ai-chat-completions
\ No newline at end of file
+ai-chat-completions
+ai-text-completions
\ No newline at end of file
diff --git a/langstream-agents/langstream-ai-agents/src/main/resources/config-schema.yaml b/langstream-agents/langstream-ai-agents/src/main/resources/config-schema.yaml
index 5fae9830c..928f66759 100644
--- a/langstream-agents/langstream-ai-agents/src/main/resources/config-schema.yaml
+++ b/langstream-agents/langstream-ai-agents/src/main/resources/config-schema.yaml
@@ -60,6 +60,7 @@ components:
           compute: "#/components/schemas/Compute"
           compute-ai-embeddings: "#/components/schemas/ComputeAiEmbeddings"
           ai-chat-completions: "#/components/schemas/AiChatCompletions"
+          ai-text-completions: "#/components/schemas/AiTextCompletions"
           query: "#/components/schemas/Query"
     DropFields:
       allOf:
@@ -371,6 +372,99 @@ components:
       required:
         - model
        - messages
+    AiTextCompletions:
+      allOf:
+        - "$ref": "#/components/schemas/Step"
+        - type: object
+          properties:
+            type:
+              type: string
+              enum:
+                - ai-text-completions
+            model:
+              description: ID of the model to use. See the [model endpoint compatibility](https://platform.openai.com/docs/models/model-endpoint-compatibility) table for details on which models work with the Completions API.
+              example: "gpt-3.5-turbo-instruct"
+              type: string
+            prompt:
+              description: A list of prompts to send. [Example Python code](https://github.com/openai/openai-cookbook/blob/main/examples/How_to_format_inputs_to_ChatGPT_models.ipynb).
+              type: array
+              minItems: 1
+              items:
+                type: string
+            temperature:
+              type: number
+              minimum: 0
+              maximum: 2
+              default: 1
+              example: 1
+              nullable: true
+              description: |
+                What sampling temperature to use, between 0 and 2. Higher values like 0.8 will make the output more random, while lower values like 0.2 will make it more focused and deterministic.
+
+                We generally recommend altering this or `top_p` but not both.
+            top-p:
+              type: number
+              minimum: 0
+              maximum: 1
+              default: 1
+              example: 1
+              nullable: true
+              description: |
+                An alternative to sampling with temperature, called nucleus sampling, where the model considers the results of the tokens with top_p probability mass. So 0.1 means only the tokens comprising the top 10% probability mass are considered.
+
+                We generally recommend altering this or `temperature` but not both.
+            stop:
+              description: |
+                Up to 4 sequences where the API will stop generating further tokens.
+              default: null
+              type: array
+              minItems: 1
+              maxItems: 4
+              items:
+                type: string
+            max-tokens:
+              description: |
+                The maximum number of [tokens](https://platform.openai.com/tokenizer) to generate in the completion.
+
+                The total length of input tokens and generated tokens is limited by the model's context length. [Example Python code](https://github.com/openai/openai-cookbook/blob/main/examples/How_to_count_tokens_with_tiktoken.ipynb) for counting tokens.
+              default: inf
+              type: integer
+            presence-penalty:
+              type: number
+              default: 0
+              minimum: -2
+              maximum: 2
+              nullable: true
+              description: |
+                Number between -2.0 and 2.0. Positive values penalize new tokens based on whether they appear in the text so far, increasing the model's likelihood to talk about new topics.
+
+                [See more information about frequency and presence penalties.](https://platform.openai.com/docs/api-reference/parameter-details)
+            frequency-penalty:
+              type: number
+              default: 0
+              minimum: -2
+              maximum: 2
+              nullable: true
+              description: |
+                Number between -2.0 and 2.0. Positive values penalize new tokens based on their existing frequency in the text so far, decreasing the model's likelihood to repeat the same line verbatim.
+
+                [See more information about frequency and presence penalties.](https://platform.openai.com/docs/api-reference/parameter-details)
+            logit-bias:
+              type: object
+              default: null
+              nullable: true
+              description: |
+                Modify the likelihood of specified tokens appearing in the completion.
+
+                Accepts a json object that maps tokens (specified by their token ID in the tokenizer) to an associated bias value from -100 to 100. Mathematically, the bias is added to the logits generated by the model prior to sampling. The exact effect will vary per model, but values between -1 and 1 should decrease or increase likelihood of selection; values like -100 or 100 should result in a ban or exclusive selection of the relevant token.
+            user:
+              type: string
+              example: user-1234
+              description: |
+                A unique identifier representing your end-user, which can help OpenAI to monitor and detect abuse. [Learn more](https://platform.openai.com/docs/guides/safety-best-practices/end-user-ids).
+      required:
+        - model
+        - prompt
     ChatMessage:
       type: object
diff --git a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java
index 834d7cff8..88c0c733a 100644
--- a/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java
+++ b/langstream-agents/langstream-ai-agents/src/test/java/ai/langstream/ai/agents/services/impl/OpenAIProviderTest.java
@@ -24,6 +24,7 @@
 import com.datastax.oss.streaming.ai.completions.ChatChoice;
 import com.datastax.oss.streaming.ai.completions.ChatCompletions;
 import com.datastax.oss.streaming.ai.completions.ChatMessage;
+import com.datastax.oss.streaming.ai.completions.Chunk;
 import com.datastax.oss.streaming.ai.completions.CompletionsService;
 import com.datastax.oss.streaming.ai.services.ServiceProvider;
 import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo;
@@ -39,7 +40,7 @@ class OpenAIProviderTest {
 
     @Test
-    void testStreamingCompletion(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
+    void testStreamingChatCompletion(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
         stubFor(
                 post("/openai/deployments/gpt-35-turbo/chat/completions?api-version=2023-08-01-preview")
                         .willReturn(
@@ -84,16 +85,14 @@ void testStreamingCompletion(WireMockRuntimeInfo vmRuntimeInfo) throws Exception
                         new CompletionsService.StreamingChunksConsumer() {
                             @Override
                             public void consumeChunk(
-                                    String answerId,
-                                    int index,
-                                    ChatChoice chunk,
-                                    boolean last) {
-                                chunks.add(chunk.getMessage().getContent());
+                                    String answerId, int index, Chunk chunk, boolean last) {
+                                ChatChoice chatChoice = (ChatChoice) chunk;
+                                chunks.add(chunk.content());
                                 log.info(
                                         "chunk: (last={}) {} {}",
                                         last,
-                                        chunk.getMessage().getRole(),
-                                        chunk.getMessage().getContent());
+                                        chatChoice.getMessage().getRole(),
+                                        chatChoice.getMessage().getContent());
                             }
                         },
                         Map.of(
@@ -113,4 +112,72 @@ public void consumeChunk(
         assertEquals(" car is", chunks.get(1));
         assertEquals(" a vehicle", chunks.get(2));
     }
+
+    @Test
+    void testStreamingTextCompletion(WireMockRuntimeInfo vmRuntimeInfo) throws Exception {
+
+        stubFor(
+                post("/openai/deployments/gpt-35-turbo-instruct/completions?api-version=2023-08-01-preview")
+                        .willReturn(
+                                okJson(
+                                        """
+                      data: {"choices":[{"text":"\\n\\n","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":"Am","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":"o","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":" le","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":" mac","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":"chine","index":0,"logprobs":null,"finish_reason":null,"content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: {"choices":[{"text":"","index":0,"logprobs":null,"finish_reason":"stop","content_filter_results":null}],"id":"cmpl-82dWhr1wUJ167k6oYiSZ9MsecCCPI"}
+
+                      data: [DONE]
+                      """)));
+
+        ServiceProviderProvider provider = new OpenAIServiceProvider();
+        ServiceProvider implementation =
+                provider.createImplementation(
+                        Map.of(
+                                "openai",
+                                Map.of(
+                                        "provider",
+                                        "azure",
+                                        "access-key",
+                                        "xxxxxxx",
+                                        "url",
+                                        vmRuntimeInfo.getHttpBaseUrl())));
+
+        List<String> chunks = new CopyOnWriteArrayList<>();
+        CompletionsService service = implementation.getCompletionsService(Map.of());
+        String completions =
+                service.getTextCompletions(
+                                List.of(
+                                        "Translate from English to Italian: \"I love cars\" with quotes"),
+                                new CompletionsService.StreamingChunksConsumer() {
+                                    @Override
+                                    public void consumeChunk(
+                                            String answerId, int index, Chunk chunk, boolean last) {
+                                        chunks.add(chunk.content());
+                                        log.info("chunk: (last={}) {}", last, chunk.content());
+                                    }
+                                },
+                                Map.of(
+                                        "model",
+                                        "gpt-35-turbo-instruct",
+                                        "stream",
+                                        true,
+                                        "min-chunks-per-message",
+                                        3))
+                        .get();
+        log.info("result: {}", completions);
+        assertEquals("Amo le macchine", completions);
+        assertEquals(3, chunks.size());
+        assertEquals("Am", chunks.get(0));
+        assertEquals("o le", chunks.get(1));
+        assertEquals(" macchine", chunks.get(2));
+    }
 }
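Note (editorial, not part of the patch): the test above exercises the streaming path. On the non-streaming branch of `OpenAICompletionService.getTextCompletions`, setting `stream` to `false` blocks on a single `Completions` response and returns the first choice's text. A sketch of that call against the same `service` object:

```java
String answer =
        service.getTextCompletions(
                        List.of("Translate \"I love cars\" into Italian"),
                        null, // the consumer is unused on the non-streaming path
                        Map.of("model", "gpt-35-turbo-instruct", "stream", false))
                .get();
```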