| import os |
| import re |
| import subprocess |
| import sys |
| from importlib import util |
| import types |
| import tempfile |
|
|
| from open_webui.apps.webui.models.functions import Functions |
| from open_webui.apps.webui.models.tools import Tools |
| from open_webui.config import FUNCTIONS_DIR, TOOLS_DIR |
|
|
|
|
def extract_frontmatter(content):
    """
    Parse the leading frontmatter docstring of a source string into a dict.

    The first line of ``content`` must consist of a bare triple double-quote
    (surrounding whitespace allowed); otherwise an empty dict is returned.
    Every following line shaped like ``key: value`` (keys made of letters and
    underscores, matched case-insensitively) is stored with both parts
    stripped. Scanning stops at the first later line that contains a triple
    double-quote; that line itself is never parsed.

    Any exception raised while parsing (for example when ``content`` is not
    a string) is printed and results in an empty dict.
    """
    delimiter = '"""'
    pair_re = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE)
    parsed = {}

    try:
        rows = content.splitlines()
        if not rows or rows[0].strip() != delimiter:
            # No opening delimiter on the very first line: no frontmatter.
            return {}

        for row in rows[1:]:
            if delimiter in row:
                # Closing delimiter reached; nothing past it is frontmatter.
                break
            hit = pair_re.match(row)
            if hit is None:
                continue
            name, text = hit.groups()
            parsed[name.strip()] = text.strip()
    except Exception as e:
        print(f"An error occurred: {e}")
        return {}

    return parsed
|
|
|
|
# Matches ``from <pkg>`` only when <pkg> is exactly one of the legacy
# top-level names. The word boundaries keep unrelated modules that merely
# share a prefix (``configparser``, ``mainframe``, ``utils_extra``) untouched.
_LEGACY_IMPORT_RE = re.compile(r"\bfrom (utils|apps|main|config)\b")


def replace_imports(content):
    """
    Rewrite legacy top-level imports to their ``open_webui``-prefixed paths.

    ``from utils``, ``from apps``, ``from main`` and ``from config`` become
    ``from open_webui.utils`` etc. Only whole package names are rewritten, so
    ``from configparser import ...`` is left alone. Already-prefixed imports
    do not match, which makes the function idempotent.

    Args:
        content: Source text to rewrite.

    Returns:
        The rewritten source text.
    """
    return _LEGACY_IMPORT_RE.sub(r"from open_webui.\1", content)
|
|
|
|
def load_toolkit_module_by_id(toolkit_id, content=None):
    """
    Execute a toolkit's source as a module and instantiate its ``Tools`` class.

    When ``content`` is None the source is read with ``Tools.get_tool_by_id``,
    passed through ``replace_imports`` and written back with
    ``Tools.update_tool_by_id``. When ``content`` is supplied it is used as
    given, and the ``requirements`` entry of its frontmatter is installed
    before the source runs.

    The source is executed in a fresh module registered in ``sys.modules``
    as ``tool_<toolkit_id>``. While it runs, ``__file__`` names a temporary
    copy of the source; that file is deleted before this function returns.

    Returns:
        A ``(module.Tools(), frontmatter)`` tuple.

    Raises:
        Exception: if the toolkit id is unknown, executing the source fails,
            or the source defines no ``Tools`` class. On a load failure the
            module is removed from ``sys.modules`` before re-raising.
    """
    if content is None:
        tool = Tools.get_tool_by_id(toolkit_id)
        if not tool:
            raise Exception(f"Toolkit not found: {toolkit_id}")

        content = replace_imports(tool.content)
        Tools.update_tool_by_id(toolkit_id, {"content": content})
        frontmatter = extract_frontmatter(content)
    else:
        frontmatter = extract_frontmatter(content)
        # Requirements are only installed for explicitly supplied content.
        install_frontmatter_requirements(frontmatter.get("requirements", ""))

    module_name = f"tool_{toolkit_id}"
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Give the module a ``__file__`` that names a real file on disk for the
    # duration of the load; the file is removed in the ``finally`` below.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, "w", encoding="utf-8") as f:
            f.write(content)
        module.__dict__["__file__"] = temp_file.name

        # SECURITY: this runs the toolkit source in-process with no
        # sandboxing. NOTE(review): confirm callers only pass trusted content.
        exec(content, module.__dict__)
        print(f"Loaded module: {module.__name__}")

        if not hasattr(module, "Tools"):
            raise Exception("No Tools class found in the module")
        return module.Tools(), frontmatter
    except Exception as e:
        print(f"Error loading module: {toolkit_id}: {e}")
        # pop() instead of del: the executed source may already have removed
        # its own entry, and a KeyError here would mask the real failure.
        sys.modules.pop(module_name, None)
        raise
    finally:
        os.unlink(temp_file.name)
|
|
|
|
def load_function_module_by_id(function_id, content=None):
    """
    Execute a function's source as a module and instantiate its entry class.

    When ``content`` is None the source is read with
    ``Functions.get_function_by_id``, passed through ``replace_imports`` and
    written back with ``Functions.update_function_by_id``. When ``content``
    is supplied it is used as given, and the ``requirements`` entry of its
    frontmatter is installed before the source runs.

    The source is executed in a fresh module registered in ``sys.modules``
    as ``function_<function_id>``. While it runs, ``__file__`` names a
    temporary copy of the source; that file is deleted before this function
    returns.

    Returns:
        A ``(instance, kind, frontmatter)`` tuple, where ``kind`` is
        ``"pipe"``, ``"filter"`` or ``"action"`` according to the first of
        the classes ``Pipe``, ``Filter``, ``Action`` found in the module
        (checked in that order).

    Raises:
        Exception: if the function id is unknown, executing the source fails,
            or none of the three classes is defined. On a load failure the
            module is removed from ``sys.modules`` and the function is marked
            ``is_active: False`` before re-raising.
    """
    if content is None:
        function = Functions.get_function_by_id(function_id)
        if not function:
            raise Exception(f"Function not found: {function_id}")

        content = replace_imports(function.content)
        Functions.update_function_by_id(function_id, {"content": content})
        frontmatter = extract_frontmatter(content)
    else:
        frontmatter = extract_frontmatter(content)
        # Requirements are only installed for explicitly supplied content.
        install_frontmatter_requirements(frontmatter.get("requirements", ""))

    module_name = f"function_{function_id}"
    module = types.ModuleType(module_name)
    sys.modules[module_name] = module

    # Give the module a ``__file__`` that names a real file on disk for the
    # duration of the load; the file is removed in the ``finally`` below.
    temp_file = tempfile.NamedTemporaryFile(delete=False)
    temp_file.close()
    try:
        with open(temp_file.name, "w", encoding="utf-8") as f:
            f.write(content)
        module.__dict__["__file__"] = temp_file.name

        # SECURITY: this runs the function source in-process with no
        # sandboxing. NOTE(review): confirm callers only pass trusted content.
        exec(content, module.__dict__)
        print(f"Loaded module: {module.__name__}")

        # Precedence matters when a module defines more than one class.
        if hasattr(module, "Pipe"):
            return module.Pipe(), "pipe", frontmatter
        if hasattr(module, "Filter"):
            return module.Filter(), "filter", frontmatter
        if hasattr(module, "Action"):
            return module.Action(), "action", frontmatter
        raise Exception("No Function class found in the module")
    except Exception as e:
        print(f"Error loading module: {function_id}: {e}")
        # pop() instead of del: the executed source may already have removed
        # its own entry; a KeyError here would mask the real failure and
        # skip the deactivation below.
        sys.modules.pop(module_name, None)

        # Deactivate the function so the broken source is not kept active.
        Functions.update_function_by_id(function_id, {"is_active": False})
        raise
    finally:
        os.unlink(temp_file.name)
|
|
|
|
def install_frontmatter_requirements(requirements):
    """
    Install comma-separated requirements with pip for the running interpreter.

    Each entry is stripped of surrounding whitespace. Blank entries (from a
    trailing comma, a doubled comma, or a whitespace-only string) are
    skipped rather than handed to pip as an empty requirement. When nothing
    remains, a notice is printed and pip is not invoked.

    Args:
        requirements: Comma-separated requirement specifiers, or a falsy
            value (``""`` / ``None``) when there are none.

    Raises:
        subprocess.CalledProcessError: if a ``pip install`` call exits with
            a non-zero status.
    """
    entries = (part.strip() for part in (requirements or "").split(","))
    req_list = [req for req in entries if req]

    if not req_list:
        print("No requirements found in frontmatter.")
        return

    for req in req_list:
        print(f"Installing requirement: {req}")
        # NOTE(review): entries are passed to pip verbatim as one argument;
        # confirm the frontmatter comes from a trusted source.
        subprocess.check_call([sys.executable, "-m", "pip", "install", req])
|
|