| | from typing import Dict, Any |
| |
|
| | from flows.base_flows import CircularFlow |
| | from flows.utils import logging |
| |
|
| | from .ControllerAtomicFlow import ControllerAtomicFlow |
| |
|
| | logging.set_verbosity_debug() |
| | log = logging.get_logger(__name__) |
| |
|
| |
|
| | class ControllerExecutorFlow(CircularFlow): |
| | def _on_reach_max_round(self): |
| | self._state_update_dict({ |
| | "answer": "The maximum amount of rounds was reached before the model found an answer.", |
| | "status": "unfinished" |
| | }) |
| |
|
| | @CircularFlow.output_msg_payload_processor |
| | def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]: |
| | command = output_payload["command"] |
| | if command == "finish": |
| | return { |
| | "EARLY_EXIT": True, |
| | "answer": output_payload["command_args"]["answer"], |
| | "status": "finished" |
| | } |
| | else: |
| | return output_payload |
| |
|