| from typing import Dict, Any |
| import os |
|
|
| from flows.base_flows import AtomicFlow |
| from flows.utils.general_helpers import get_pyfile_functions_metadata_from_file |
|
|
|
|
class MemoryReadingAtomicFlow(AtomicFlow):
    """A flow that reads memory contents from the given files.

    Any composite flow that uses this flow should keep
    ``memory_files: Dict[str, str]`` in its flow_state, mapping each memory
    name to the location of its memory file.

    *Input Interface*:
        - `memory_files`: Dict which maps the memory name to its file
          location, e.g. {"plan": "examples/JARVIS/plan.txt"}

    *Output Interface*:
        One entry per memory that was read, keyed by the memory name:
        - `plan` / `logs`: the full text of the file.
        - `code_library`: a textual summary of the functions reported for
          the file; only produced when the path ends with ``.py``.
    """

    def __init__(self, **kwargs):
        """Forward all keyword arguments to the parent flow and record the supported memory names."""
        super().__init__(**kwargs)
        self.supported_mem_name = ["plan", "logs", "code_library"]

    def _check_input_data(self, input_data: Dict[str, Any]):
        """Validate ``input_data`` before any file is read.

        Args:
            input_data: must contain ``memory_files``, a mapping from memory
                name to file path.

        Raises:
            AssertionError: if ``memory_files`` is missing, a memory name is
                not in ``self.supported_mem_name``, or a path does not exist
                or is not a regular file.
        """
        # Explicit raises instead of ``assert`` statements: asserts are
        # stripped when Python runs with -O, which would silently disable
        # this validation. AssertionError is kept as the exception type so
        # existing callers that catch it keep working.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to MemoryReadingAtomicFlow")

        for mem_name, mem_path in input_data["memory_files"].items():
            if mem_name not in self.supported_mem_name:
                raise AssertionError(
                    f"{mem_name} is not supported in MemoryReadingAtomicFlow, "
                    f"supported names are: {self.supported_mem_name}"
                )
            if not os.path.exists(mem_path):
                raise AssertionError(f"{mem_path} does not exist.")
            if not os.path.isfile(mem_path):
                raise AssertionError(f"{mem_path} is not a file.")

    def _read_text(self, file_location):
        """Return the whole content of ``file_location`` decoded as UTF-8."""
        with open(file_location, 'r', encoding='utf-8') as file:
            return file.read()

    def _read_py_code_library(self, file_location):
        """Summarize the functions reported for a Python file.

        Args:
            file_location: path handed to
                ``get_pyfile_functions_metadata_from_file``.

        Returns:
            ``"No functions yet."`` when no metadata is returned; otherwise
            one four-line description per function (name, documentation,
            parameters, default values), separated by blank lines.
        """
        # NOTE(review): each metadata entry is assumed to be a dict with a
        # 'name' key and optional 'docstring', 'parameters' (iterable of
        # str) and 'defaults' keys - confirm against the helper.
        metadata = get_pyfile_functions_metadata_from_file(file_location)
        if not metadata:
            return "No functions yet."

        descriptions = []
        for function in metadata:
            descriptions.append(
                "\n".join(
                    [
                        f"Function Name: {function['name']}",
                        f"Documentation: {function.get('docstring', 'No documentation')}",
                        f"Parameters: {', '.join(function.get('parameters', []))}",
                        f"Default Values: {function.get('defaults', 'None')}",
                    ]
                )
            )
        return "\n\n".join(descriptions)

    def run(
            self,
            input_data: Dict[str, Any]):
        """Read every requested memory file and return the contents by memory name.

        Args:
            input_data: must contain ``memory_files`` (memory name -> path).

        Returns:
            Dict mapping each memory name that was read to its content.

        Raises:
            AssertionError: if ``input_data`` fails ``_check_input_data``.
        """
        self._check_input_data(input_data)
        response = {}
        for mem_name, mem_path in input_data["memory_files"].items():
            if mem_name in ['plan', 'logs']:
                response[mem_name] = self._read_text(mem_path)
            elif mem_name == 'code_library' and mem_path.endswith('.py'):
                response[mem_name] = self._read_py_code_library(mem_path)
            # A 'code_library' path that does not end with '.py' is skipped,
            # so the response has no entry for it.
        return response
|
|