| import inspect |
| import logging |
| from typing import Awaitable, Callable, get_type_hints |
|
|
| from open_webui.apps.webui.models.tools import Tools |
| from open_webui.apps.webui.models.users import UserModel |
| from open_webui.apps.webui.utils import load_toolkit_module_by_id |
| from open_webui.utils.schemas import json_schema_to_model |
|
|
| log = logging.getLogger(__name__) |
|
|
|
|
| def apply_extra_params_to_tool_function( |
| function: Callable, extra_params: dict |
| ) -> Callable[..., Awaitable]: |
| sig = inspect.signature(function) |
| extra_params = { |
| key: value for key, value in extra_params.items() if key in sig.parameters |
| } |
| is_coroutine = inspect.iscoroutinefunction(function) |
|
|
| async def new_function(**kwargs): |
| extra_kwargs = kwargs | extra_params |
| if is_coroutine: |
| return await function(**extra_kwargs) |
| return function(**extra_kwargs) |
|
|
| return new_function |
|
|
|
|
| |
| def get_tools( |
| webui_app, tool_ids: list[str], user: UserModel, extra_params: dict |
| ) -> dict[str, dict]: |
| tools = {} |
| for tool_id in tool_ids: |
| toolkit = Tools.get_tool_by_id(tool_id) |
| if toolkit is None: |
| continue |
|
|
| module = webui_app.state.TOOLS.get(tool_id, None) |
| if module is None: |
| module, _ = load_toolkit_module_by_id(tool_id) |
| webui_app.state.TOOLS[tool_id] = module |
|
|
| extra_params["__id__"] = tool_id |
| if hasattr(module, "valves") and hasattr(module, "Valves"): |
| valves = Tools.get_tool_valves_by_id(tool_id) or {} |
| module.valves = module.Valves(**valves) |
|
|
| if hasattr(module, "UserValves"): |
| extra_params["__user__"]["valves"] = module.UserValves( |
| **Tools.get_user_valves_by_id_and_user_id(tool_id, user.id) |
| ) |
|
|
| for spec in toolkit.specs: |
| |
| for val in spec.get("parameters", {}).get("properties", {}).values(): |
| if val["type"] == "str": |
| val["type"] = "string" |
| function_name = spec["name"] |
|
|
| |
| original_func = getattr(module, function_name) |
| callable = apply_extra_params_to_tool_function(original_func, extra_params) |
| if hasattr(original_func, "__doc__"): |
| callable.__doc__ = original_func.__doc__ |
|
|
| |
| tool_dict = { |
| "toolkit_id": tool_id, |
| "callable": callable, |
| "spec": spec, |
| "pydantic_model": json_schema_to_model(spec), |
| "file_handler": hasattr(module, "file_handler") and module.file_handler, |
| "citation": hasattr(module, "citation") and module.citation, |
| } |
|
|
| |
| if function_name in tools: |
| log.warning(f"Tool {function_name} already exists in another toolkit!") |
| log.warning(f"Collision between {toolkit} and {tool_id}.") |
| log.warning(f"Discarding {toolkit}.{function_name}") |
| else: |
| tools[function_name] = tool_dict |
| return tools |
|
|
|
|
| def doc_to_dict(docstring): |
| lines = docstring.split("\n") |
| description = lines[1].strip() |
| param_dict = {} |
|
|
| for line in lines: |
| if ":param" in line: |
| line = line.replace(":param", "").strip() |
| param, desc = line.split(":", 1) |
| param_dict[param.strip()] = desc.strip() |
| ret_dict = {"description": description, "params": param_dict} |
| return ret_dict |
|
|
|
|
| def get_tools_specs(tools) -> list[dict]: |
| function_list = [ |
| {"name": func, "function": getattr(tools, func)} |
| for func in dir(tools) |
| if callable(getattr(tools, func)) |
| and not func.startswith("__") |
| and not inspect.isclass(getattr(tools, func)) |
| ] |
|
|
| specs = [] |
| for function_item in function_list: |
| function_name = function_item["name"] |
| function = function_item["function"] |
|
|
| function_doc = doc_to_dict(function.__doc__ or function_name) |
| specs.append( |
| { |
| "name": function_name, |
| |
| "description": function_doc.get("description", function_name), |
| "parameters": { |
| "type": "object", |
| "properties": { |
| param_name: { |
| "type": param_annotation.__name__.lower(), |
| **( |
| { |
| "enum": ( |
| str(param_annotation.__args__) |
| if hasattr(param_annotation, "__args__") |
| else None |
| ) |
| } |
| if hasattr(param_annotation, "__args__") |
| else {} |
| ), |
| "description": function_doc.get("params", {}).get( |
| param_name, param_name |
| ), |
| } |
| for param_name, param_annotation in get_type_hints( |
| function |
| ).items() |
| if param_name != "return" |
| and not ( |
| param_name.startswith("__") and param_name.endswith("__") |
| ) |
| }, |
| "required": [ |
| name |
| for name, param in inspect.signature( |
| function |
| ).parameters.items() |
| if param.default is param.empty |
| and not (name.startswith("__") and name.endswith("__")) |
| ], |
| }, |
| } |
| ) |
|
|
| return specs |
|
|