| | from aiflows.base_flows import CompositeFlow |
| | from aiflows.utils import logging |
| | from aiflows.messages import FlowMessage |
| | from aiflows.interfaces import KeyInterface |
| | from aiflows.data_transformations import RegexFirstOccurrenceExtractor,EndOfInteraction |
| | log = logging.get_logger(f"aiflows.{__name__}") |
| |
|
| |
|
class ChatHumanFlowModule(CompositeFlow):
    """Composite flow that alternates between two subflows in a loop,
    implementing a chat between a human and an assistant:

    - ``User`` subflow: produces queries for the assistant (e.g. a
      HumanStandardInputFlow reading the user's input).
    - ``Assistant`` subflow: answers those queries (e.g. a ChatAtomicFlow
      backed by an LLM).

    The interaction ends when the user's input contains the end-of-interaction
    marker (``<END>`` by default) or when the configured round limit is hit.

    Message circulation::

        |------> User Flow -----------> |
        ^                               |
        |                               |
        |                               v
        |<------ Assistant Flow <-------|

    *Configuration Parameters*:

    - `name` (str): The name of the flow. Default: "ChatHumanFlowModule"
    - `description` (str): A description of the flow, used to generate its
      help message. Default: "Flow that enables chatting between a
      ChatAtomicFlow and a user providing the input."
    - `max_rounds` (int): Maximum number of rounds the flow can run for.
      Default: None (no limit).
    - `early_exit_key` (str): The key used to exit the flow.
      Default: "end_of_interaction"
    - `subflows_config` (Dict[str,Any]): Configurations of the subflows:
        - `Assistant Flow`: By default a ChatAtomicFlow; its default
          parameters are defined in ChatAtomicFlowModule.
        - `User Flow`: By default a HumanStandardInputFlow; its default
          parameters are defined in HumanStandardInputFlowModule.
    - `topology` (List[Dict[str,Any]]): The (circular) topology of the flow.
      By default it is the one shown in the illustration above (also
      described in ChatHumanFlowModule.yaml).

    *Input Interface*:

    - None. By default, the input interface doesn't expect any input.

    *Output Interface*:

    - `end_of_interaction` (bool): Whether the interaction is finished or not.

    :param \**kwargs: Arguments to be passed to the parent class CircularFlow constructor.
    :type \**kwargs: Dict[str, Any]
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        # Transformations applied to every message headed to the Assistant:
        # pull the first regex match out of the raw human input, then detect
        # the end-of-interaction marker.
        self.regex_extractor = RegexFirstOccurrenceExtractor(
            **self.flow_config["regex_first_occurrence_extractor"]
        )
        self.end_of_interaction = EndOfInteraction(
            **self.flow_config["end_of_interaction"]
        )

        self.input_interface_assistant = KeyInterface(
            keys_to_rename={"human_input": "query"},
            additional_transformations=[self.regex_extractor, self.end_of_interaction],
        )

    def set_up_flow_state(self):
        """Initialize the mutable per-run state of the flow (called on execution)."""
        super().set_up_flow_state()
        # Which subflow handled the previous message (None before the first one).
        self.flow_state["last_flow_called"] = None
        # The message that triggered this run; the final reply is addressed to it.
        self.flow_state["input_message"] = None
        # Conversation transcript, split per speaker.
        self.flow_state["user_inputs"] = []
        self.flow_state["assistant_outputs"] = []
        self.flow_state["current_round"] = 0
        self.flow_state["end_of_interaction"] = False

    @classmethod
    def type(cls):
        """Return the type identifier of the flow."""
        # NOTE(review): identifier differs from the class name — kept as-is for
        # backward compatibility; confirm whether this is intentional.
        return "OpenAIChatHumanFlowModule"

    def max_rounds_reached(self):
        """Return True when a round limit is configured and has been reached."""
        limit = self.flow_config["max_rounds"]
        return limit is not None and self.flow_state["current_round"] >= limit

    def generate_reply(self):
        """Package the accumulated conversation and reply to the original caller."""
        state = self.flow_state
        response = {
            "user_inputs": state["user_inputs"],
            "assistant_outputs": state["assistant_outputs"],
            "end_of_interaction": state["end_of_interaction"],
        }
        reply = self._package_output_message(
            input_message=state["input_message"],
            response=response,
        )
        self.reply_to_message(reply=reply, to=state["input_message"])

    def call_to_user(self, input_message):
        """Record the assistant's answer and forward it to the User subflow,
        or finish the run when the round limit has been reached."""
        self.flow_state["assistant_outputs"].append(input_message.data["api_output"])

        if self.max_rounds_reached():
            self.generate_reply()
        else:
            self.subflows["User"].send_message_async(
                input_message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "User"

        # One round = one assistant answer, counted whether or not we continue.
        self.flow_state["current_round"] += 1

    def call_to_assistant(self, input_message):
        """Route a user (or initial) message to the Assistant subflow,
        or finish the run when the end-of-interaction marker was typed."""
        # Must run before reading input_message.data["query"] below: the
        # interface renames "human_input" -> "query" and applies the
        # extraction / end-of-interaction transformations.
        message = self.input_interface_assistant(input_message)

        if self.flow_state["last_flow_called"] is None:
            # First message of the run: keep it so the final reply can be
            # addressed back to the original caller.
            self.flow_state["input_message"] = input_message
        else:
            self.flow_state["user_inputs"].append(input_message.data["query"])

        if message.data["end_of_interaction"]:
            self.flow_state["end_of_interaction"] = True
            self.generate_reply()
        else:
            self.subflows["Assistant"].send_message_async(
                message, pipe_to=self.flow_config["flow_ref"]
            )
            self.flow_state["last_flow_called"] = "Assistant"

    def run(self, input_message: FlowMessage):
        """Run one step of the flow: dispatch the incoming message to
        whichever side of the conversation speaks next.

        :param input_message: The input message to the flow.
        :type input_message: FlowMessage
        """
        previous = self.flow_state["last_flow_called"]

        if previous == "Assistant":
            self.call_to_user(input_message=input_message)
        elif previous is None or previous == "User":
            self.call_to_assistant(input_message=input_message)
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|