Improvements and features (#260)
v1r3n authored May 2, 2024
1 parent 6ba04d6 commit ff7f24e
Showing 18 changed files with 508 additions and 16 deletions.
2 changes: 1 addition & 1 deletion examples/helloworld/greetings_worker.py
@@ -1,7 +1,7 @@
"""
This file contains a Simple Worker that can be used in any workflow.
For detailed information, see https://github.com/conductor-sdk/conductor-python/blob/main/README.md#step-2-write-worker
""""
"""
from conductor.client.worker.worker_task import worker_task


File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
26 changes: 26 additions & 0 deletions examples/orkes/http_poll.py
@@ -0,0 +1,26 @@
import uuid

from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.http_poll_task import HttpPollTask, HttpPollInput


def main():
    workflow_executor = OrkesClients().get_workflow_executor()
    workflow = ConductorWorkflow(executor=workflow_executor, name='http_poll_example_' + str(uuid.uuid4()))
    http_poll = HttpPollTask(task_ref_name='http_poll_ref',
                             http_input=HttpPollInput(
                                 uri='https://orkes-api-tester.orkesconductor.com/api',
                                 polling_strategy='EXPONENTIAL_BACKOFF',
                                 polling_interval=1000,
                                 termination_condition='(function(){ return $.output.response.body.randomInt < 10;})();'),
                             )
    workflow >> http_poll

    # execute the workflow and wait up to 10 seconds for the results
    result = workflow.execute(workflow_input={}, wait_for_seconds=10)
    print(f'result: {result.output}')


if __name__ == '__main__':
    main()
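For comparison, a fixed-interval poll is configured the same way. This is a minimal sketch that assumes 'FIXED' is among the supported polling strategies; the ref name and interval are illustrative:

http_poll_fixed = HttpPollTask(task_ref_name='http_poll_fixed_ref',
                               http_input=HttpPollInput(
                                   uri='https://orkes-api-tester.orkesconductor.com/api',
                                   polling_strategy='FIXED',  # assumed strategy name
                                   polling_interval=5000,  # poll every 5 seconds
                                   termination_condition='(function(){ return $.output.response.body.randomInt < 10;})();'),
                               )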
218 changes: 218 additions & 0 deletions examples/orkes/multiagent_chat.py
@@ -0,0 +1,218 @@
import time

from conductor.client.ai.orchestrator import AIOrchestrator
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.do_while_task import LoopTask
from conductor.client.workflow.task.llm_tasks.llm_chat_complete import LlmChatComplete, ChatMessage
from conductor.client.workflow.task.set_variable_task import SetVariableTask
from conductor.client.workflow.task.simple_task import SimpleTask
from conductor.client.workflow.task.switch_task import SwitchTask
from conductor.client.workflow.task.timeout_policy import TimeoutPolicy


def main():
    # agent1 can also run on OpenAI; the Mistral settings below take its place
    # agent1_provider = 'openai_v1'
    # agent1_model = 'gpt-4'
    agent1_provider = 'mistral'
    agent1_model = 'mistral-large-latest'

    agent2_provider = 'anthropic_cloud'
    agent2_model = 'claude-3-sonnet-20240229'
    # anthropic_model = 'claude-3-opus-20240229'

    moderator_provider = 'cohere_saas'
    moderator_model = 'command-r'

    api_config = Configuration()

    clients = OrkesClients(configuration=api_config)
    workflow_executor = clients.get_workflow_executor()
    workflow_client = clients.get_workflow_client()

    moderator = 'moderator'
    moderator_text = """You are very good at moderating debates and discussions. In this discussion, there are 2 panelists, ${ua1} and ${ua2}.
    As a moderator, you summarize the discussion so far, pick one of the panelists ${ua1} or ${ua2}, and ask them a relevant question to continue the discussion.
    You are also an expert in formatting results into structured JSON and you only output valid JSON as a response.
    You answer in RFC8259 compliant JSON format ONLY, with two fields: result and user. You can effectively manage a hot discussion while keeping it
    quite civil, and at the same time move the discussion forward by encouraging participants and their views.
    Your answer MUST be a JSON dictionary with keys "result" and "user". Before answering, check the output for correctness of the JSON format.
    The values MUST NOT have new lines or special characters that are not escaped. The JSON must be RFC8259 compliant.
    You produce the output with the following JSON keys:
    {
        "result": ACTUAL_MESSAGE,
        "user": USER_WHO_SHOULD_RESPOND_NEXT --> One of ${ua1} or ${ua2}
    }
    "result" should summarize the conversation so far and add the last message in the conversation.
    "user" should be the one who should respond next.
    Be fair in giving a chance to all participants, alternating between ${ua1} and ${ua2}.
    The last person to talk was ${last_user}.
    Do not repeat what you have said before and do not summarize the discussion each time;
    just use first person voice to ask questions that move the discussion forward.
    Do not use filler sentences like 'in this discussion...'
    JSON:
    """

    agent1 = 'agent_1'
    agent1_text = """
    You are ${ua1} and you reason and think like ${ua1}. Your language reflects your persona.
    You are very good at analyzing the content and coming up with insights and questions on the subject and the context.
    You are in a panel with other participants discussing a specific event/topic as set in the context.
    You avoid repeating arguments or discussion points that you have already made.
    Here is the context of the conversation; add a follow-up with your insights and questions.
    Do not mention that you are an AI model.
    ${context}
    You answer in a very clear way; do not add any preamble to the response:
    """

    agent2 = 'agent_2'
    agent2_text = """
    You are ${ua2} and you reason and think like ${ua2}. Your language reflects your persona.
    You are very good at continuing the conversation with more insightful questions.
    You are in a panel with other participants discussing a specific event/topic as set in the context.
    You bring your contrarian views to the conversation and always challenge the norms.
    You avoid repeating arguments or discussion points that you have already made.
    Your responses are at times extreme and a bit hyperbolic.
    When given the history of the conversation, you ask a meaningful follow-up question that continues the conversation
    and dives deeper into the topic.
    Do not mention that you are an AI model.
    Here is the context of the conversation:
    ${context}
    You answer in a very clear way; do not add any preamble to the response:
    """

    orchestrator = AIOrchestrator(api_configuration=api_config)

    orchestrator.add_prompt_template(moderator, moderator_text, 'moderator instructions')
    orchestrator.associate_prompt_template(moderator, moderator_provider, [moderator_model])

    orchestrator.add_prompt_template(agent1, agent1_text, 'agent1 instructions')
    orchestrator.associate_prompt_template(agent1, agent1_provider, [agent1_model])

    orchestrator.add_prompt_template(agent2, agent2_text, 'agent2 instructions')
    orchestrator.associate_prompt_template(agent2, agent2_provider, [agent2_model])

    get_context = SimpleTask(task_reference_name='get_document', task_def_name='GET_DOCUMENT')
    get_context.input_parameter('url', '${workflow.input.url}')

    wf_input = {'ua1': 'donald trump', 'ua2': 'joe biden', 'last_user': '${workflow.variables.last_user}',
                'url': 'https://www.foxnews.com/media/billionaire-mark-cuban-dodges-question-asking-pays-fair-share-taxes-pay-owe'}

    template_vars = {
        'context': get_context.output('result'),
        'ua1': '${workflow.input.ua1}',
        'ua2': '${workflow.input.ua2}',
    }

    max_tokens = 500
    moderator_task = LlmChatComplete(task_ref_name='moderator_ref',
                                     max_tokens=2000,
                                     llm_provider=moderator_provider, model=moderator_model,
                                     instructions_template=moderator,
                                     messages='${workflow.variables.history}',
                                     template_variables={
                                         'ua1': '${workflow.input.ua1}',
                                         'ua2': '${workflow.input.ua2}',
                                         'last_user': '${workflow.variables.last_user}'
                                     })

    agent1_task = LlmChatComplete(task_ref_name='agent1_ref',
                                  max_tokens=max_tokens,
                                  llm_provider=agent1_provider, model=agent1_model,
                                  instructions_template=agent1,
                                  messages=[ChatMessage(role='user', message=moderator_task.output('result'))],
                                  template_variables=template_vars)

    set_variable1 = (SetVariableTask(task_ref_name='task_ref_name1')
                     .input_parameter('history',
                                      [
                                          ChatMessage(role='assistant', message=moderator_task.output('result')),
                                          ChatMessage(role='user',
                                                      message='[' + '${workflow.input.ua1}] ' + f'{agent1_task.output("result")}')
                                      ])
                     .input_parameter('_merge', True)
                     .input_parameter('last_user', '${workflow.input.ua1}'))

    agent2_task = LlmChatComplete(task_ref_name='agent2_ref',
                                  max_tokens=max_tokens,
                                  llm_provider=agent2_provider, model=agent2_model,
                                  instructions_template=agent2,
                                  messages=[ChatMessage(role='user', message=moderator_task.output('result'))],
                                  template_variables=template_vars)

    set_variable2 = (SetVariableTask(task_ref_name='task_ref_name2')
                     .input_parameter('history', [
                         ChatMessage(role='assistant', message=moderator_task.output('result')),
                         ChatMessage(role='user', message='[' + '${workflow.input.ua2}] ' + f'{agent2_task.output("result")}')
                     ])
                     .input_parameter('_merge', True)
                     .input_parameter('last_user', '${workflow.input.ua2}'))

    init = SetVariableTask(task_ref_name='init_ref')
    init.input_parameter('history',
                         [ChatMessage(role='user',
                                      message="""analyze the following context:
                                      BEGIN
                                      ${get_document.output.result}
                                      END """)]
                         )
    init.input_parameter('last_user', '')

    wf = ConductorWorkflow(name='multiparty_chat_tmp', version=1, executor=workflow_executor)

    script = """
    (function(){
        if ($.user == $.ua1) return 'ua1';
        if ($.user == $.ua2) return 'ua2';
        return 'ua1';
    })();
    """
    next_up = SwitchTask(task_ref_name='next_up_ref', case_expression=script, use_javascript=True)
    next_up.switch_case('ua1', [agent1_task, set_variable1])
    next_up.switch_case('ua2', [agent2_task, set_variable2])
    next_up.input_parameter('user', moderator_task.output('user'))
    next_up.input_parameter('ua1', '${workflow.input.ua1}')
    next_up.input_parameter('ua2', '${workflow.input.ua2}')

    loop_tasks = [moderator_task, next_up]
    chat_loop = LoopTask(task_ref_name='loop', iterations=6, tasks=loop_tasks)
    wf >> get_context >> init >> chat_loop

    wf.timeout_seconds(1200).timeout_policy(timeout_policy=TimeoutPolicy.TIME_OUT_WORKFLOW)
    wf.register(overwrite=True)

    result = wf.execute(wait_until_task_ref=agent1_task.task_reference_name, wait_for_seconds=1,
                        workflow_input=wf_input)

    result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True)
    print(f'started workflow {api_config.ui_host}/{result.workflow_id}')
    while result.is_running():
        time.sleep(10)  # wait for 10 seconds; LLMs are slow!
        result = workflow_client.get_workflow_status(result.workflow_id, include_output=True, include_variables=True)
        op = result.variables['history']
        if len(op) > 1:
            print('=======================================')
            print(f'{op[len(op) - 1]["message"]}')
            print('\n')


if __name__ == '__main__':
    main()
36 changes: 36 additions & 0 deletions examples/shell_worker.py
@@ -0,0 +1,36 @@
import subprocess
from typing import List

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.worker.worker_task import worker_task


# @worker_task(task_definition_name='shell')
def execute_shell(command: str, args: List[str]) -> str:
    full_command = [command] + args
    result = subprocess.run(full_command, stdout=subprocess.PIPE)
    return str(result.stdout)


# a trivial worker registered under the task definition 'task_with_retries2'
@worker_task(task_definition_name='task_with_retries2')
def task_with_retries() -> str:
    return 'hello'

def main():
    # defaults to reading the configuration using the following env variables
    # CONDUCTOR_SERVER_URL : conductor server, e.g. https://play.orkes.io/api
    # CONDUCTOR_AUTH_KEY : API Authentication Key
    # CONDUCTOR_AUTH_SECRET: API Auth Secret
    api_config = Configuration()

    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()
    task_handler.join_processes()


if __name__ == '__main__':
    main()
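A workflow can invoke the shell worker by its task definition name. This is a minimal sketch, assuming the @worker_task decorator on execute_shell is re-enabled and a worker process is polling; the workflow name is illustrative:

from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.task.simple_task import SimpleTask


def run_shell_workflow():
    executor = OrkesClients().get_workflow_executor()
    wf = ConductorWorkflow(executor=executor, name='shell_example')  # illustrative name
    shell = SimpleTask(task_reference_name='shell_ref', task_def_name='shell')
    shell.input_parameter('command', 'ls')
    shell.input_parameter('args', ['-l'])
    wf >> shell
    result = wf.execute(workflow_input={}, wait_for_seconds=10)
    print(f'result: {result.output}')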
48 changes: 48 additions & 0 deletions examples/untrusted_host.py
@@ -0,0 +1,48 @@
import requests
import urllib3

from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from conductor.client.configuration.settings.authentication_settings import AuthenticationSettings
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.client.orkes.orkes_task_client import OrkesTaskClient
from conductor.client.orkes.orkes_workflow_client import OrkesWorkflowClient
from conductor.client.worker.worker_task import worker_task
from conductor.client.workflow.conductor_workflow import ConductorWorkflow
from conductor.client.workflow.executor.workflow_executor import WorkflowExecutor
from greetings_workflow import greetings_workflow


def register_workflow(workflow_executor: WorkflowExecutor) -> ConductorWorkflow:
    workflow = greetings_workflow(workflow_executor=workflow_executor)
    workflow.register(True)
    return workflow


@worker_task(task_definition_name='hello')
def hello(name: str) -> str:
    print(f'executing.... {name}')
    return f'Hello {name}'


def main():
    urllib3.disable_warnings()

    # points to http://localhost:8080/api by default
    api_config = Configuration()
    api_config.http_connection = requests.Session()
    api_config.http_connection.verify = False

    metadata_client = OrkesMetadataClient(api_config)
    task_client = OrkesTaskClient(api_config)
    workflow_client = OrkesWorkflowClient(api_config)

    task_handler = TaskHandler(configuration=api_config)
    task_handler.start_processes()

    # task_handler.stop_processes()


if __name__ == '__main__':
    main()
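If the server also requires credentials, the same Configuration accepts them explicitly. A minimal sketch using AuthenticationSettings; the URL and key values are placeholders:

api_config = Configuration(server_api_url='https://localhost:8443/api',  # placeholder URL
                           authentication_settings=AuthenticationSettings(key_id='<key>', key_secret='<secret>'))
api_config.http_connection = requests.Session()
api_config.http_connection.verify = False  # accept the untrusted certificate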
1 change: 1 addition & 0 deletions src/conductor/client/http/models/__init__.py
@@ -52,3 +52,4 @@
from conductor.client.http.models.integration import Integration
from conductor.client.http.models.integration_api import IntegrationApi
from conductor.client.http.models.state_change_event import StateChangeEvent, StateChangeConfig, StateChangeEventType
from conductor.client.http.models.workflow_task import CacheConfig
18 changes: 18 additions & 0 deletions src/conductor/client/http/models/workflow_status.py
@@ -3,6 +3,9 @@

import six

terminal_status = ('COMPLETED', 'FAILED', 'TIMED_OUT', 'TERMINATED')
successful_status = ('PAUSED', 'COMPLETED')
running_status = ('RUNNING', 'PAUSED')

class WorkflowStatus(object):
"""NOTE: This class is auto generated by the swagger code generator program.
@@ -189,6 +192,21 @@ def to_dict(self):

        return result

    def is_completed(self) -> bool:
        """Checks if the workflow has reached a terminal state
        :return: True if the workflow status is COMPLETED, FAILED, TIMED_OUT or TERMINATED
        """
        return self._status in terminal_status

    def is_successful(self) -> bool:
        """Checks if the workflow has completed in a successful state
        :return: True if the workflow status is PAUSED or COMPLETED
        """
        return self._status in successful_status

    def is_running(self) -> bool:
        """Checks if the workflow is still in progress
        :return: True if the workflow status is RUNNING or PAUSED
        """
        return self._status in running_status

    def to_str(self):
        """Returns the string representation of the model"""
        return pprint.pformat(self.to_dict())
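The new helpers make polling loops like the one in examples/orkes/multiagent_chat.py read naturally. A minimal sketch, assuming a workflow_client and a workflow_id are already in hand:

status = workflow_client.get_workflow_status(workflow_id, include_output=True, include_variables=True)
while status.is_running():  # RUNNING or PAUSED
    time.sleep(5)
    status = workflow_client.get_workflow_status(workflow_id, include_output=True, include_variables=True)
if status.is_successful():
    print('workflow reached a successful state')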