| |
|
| | from typing import Dict, Any |
| | from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow |
| | from aiflows.utils.general_helpers import encode_image,encode_from_buffer |
| | import cv2 |
| |
|
| |
|
| | class VisionAtomicFlow(ChatAtomicFlow): |
| | """ This class implements the atomic flow for the VisionFlowModule. It is a flow that, given a textual input, and a set of images and/or videos, generates a textual output. |
| | It uses the litellm library as a backend. See https://docs.litellm.ai/docs/providers for supported models and APIs. |
| | |
| | *Configuration Parameters*: |
| | |
| | - `name` (str): The name of the flow. Default: "VisionAtomicFlow" |
| | - `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
| | Default: "A flow that, given a textual input, and a set of images and/or videos, generates a textual output." |
| | - enable_cache (bool): If True, the flow will use the cache. Default: True |
| | - `n_api_retries` (int): The number of times to retry the API call in case of failure. Default: 6 |
| | - `wait_time_between_api_retries` (int): The time to wait between API retries in seconds. Default: 20 |
| | - `system_name` (str): The name of the system. Default: "system" |
| | - `user_name` (str): The name of the user. Default: "user" |
| | - `assistant_name` (str): The name of the assistant. Default: "assistant" |
| | - `backend` (Dict[str, Any]): The configuration of the backend which is used to fetch api keys. Default: LiteLLMBackend with the |
| | default parameters of ChatAtomicFlow (see Flow card of ChatAtomicFlowModule). Except for the following parameters |
| | whose default value is overwritten: |
| | - `api_infos` (List[Dict[str, Any]]): The list of api infos. Default: No default value, this parameter is required. |
| | - `model_name` (Union[Dict[str,str],str]): The name of the model to use. |
| | When using multiple API providers, the model_name can be a dictionary of the form |
| | {"provider_name": "model_name"}. |
| | Default: "gpt-4-vision-preview" (the name needs to follow the name of the model in litellm https://docs.litellm.ai/docs/providers). |
| | - `n` (int) : The number of answers to generate. Default: 1 |
| | - `max_tokens` (int): The maximum number of tokens to generate. Default: 2000 |
| | - `temperature` (float): The temperature to use. Default: 0.3 |
| | - `top_p` (float): An alternative to sampling with temperature. It instructs the model to consider the results of |
| | the tokens with top_p probability. Default: 0.2 |
| | - `frequency_penalty` (float): The higher this value, the more likely the model will repeat itself. Default: 0.0 |
| | - `presence_penalty` (float): The higher this value, the less likely the model will talk about a new topic. Default: 0.0 |
| | - `system_message_prompt_template` (Dict[str,Any]): The template of the system message. It is used to generate the system message. |
| | By default its of type aiflows.prompt_template.JinjaPrompt. |
| | None of the parameters of the prompt are defined by default and therefore need to be defined if one wants to use the system prompt. |
| | Default parameters are defined in aiflows.prompt_template.jinja2_prompts.JinjaPrompt. |
| | - `init_human_message_prompt_template` (Dict[str,Any]): The prompt template of the human/user message used to initialize the conversation |
| | (first time in). It is used to generate the human message. It's passed as the user message to the LLM. |
| | By default its of type aiflows.prompt_template.JinjaPrompt. None of the parameters of the prompt are defined by default and therefore need to be defined if one |
| | wants to use the init_human_message_prompt_template. Default parameters are defined in aiflows.prompt_template.jinja2_prompts.JinjaPrompt. |
| | - `previous_messages` (Dict[str,Any]): Defines which previous messages to include in the input of the LLM. Note that if `first_k`and `last_k` are both none, |
| | all the messages of the flows's history are added to the input of the LLM. Default: |
| | - `first_k` (int): If defined, adds the first_k earliest messages of the flow's chat history to the input of the LLM. Default: None |
| | - `last_k` (int): If defined, adds the last_k latest messages of the flow's chat history to the input of the LLM. Default: None |
| | - Other parameters are inherited from the default configuration of ChatAtomicFlow (see Flow card of ChatAtomicFlowModule). |
| | |
| | *Input Interface Initialized (Expected input the first time in flow)*: |
| | |
| | - `query` (str): The textual query to run the model on. |
| | - `data` (Dict[str, Any]): The data (images or video) to run the model on. It can contain the following keys: |
| | - `images` (List[Dict[str, Any]]): A list of images to run the model on. Each image is a dictionary that contains the following keys: |
| | - `type` (str): The type of the image. It can be "local_path" or "url". |
| | - `image` (str): The image. If type is "local_path", it is a local path to the image. If type is "url", it is a url to the image. |
| | - `video` (Dict[str, Any]): A video to run the model on. It is a dictionary that contains the following keys: |
| | - `video_path` (str): The path to the video. |
| | - `resize` (int): The resize we want to apply on the frames of the video. |
| | - `frame_step_size` (int): The step size between the frames of the video (to send to the model). |
| | - `start_frame` (int): The start frame of the video (to send to the model). |
| | - `end_frame` (int): The last frame of the video (to send to the model). |
| | |
| | *Input Interface (Expected input the after the first time in flow)*: |
| | |
| | - `query` (str): The textual query to run the model on. |
| | - `data` (Dict[str, Any]): The data (images or video) to run the model on. It can contain the following keys: |
| | - `images` (List[Dict[str, Any]]): A list of images to run the model on. Each image is a dictionary that contains the following keys: |
| | - `type` (str): The type of the image. It can be "local_path" or "url". |
| | - `image` (str): The image. If type is "local_path", it is a local path to the image. If type is "url", it is a url to the image. |
| | - `video` (Dict[str, Any]): A video to run the model on. It is a dictionary that contains the following keys: |
| | - `video_path` (str): The path to the video. |
| | - `resize` (int): The resize we want to apply on the frames of the video. |
| | - `frame_step_size` (int): The step size between the frames of the video (to send to the model). |
| | - `start_frame` (int): The start frame of the video (to send to the model). |
| | - `end_frame` (int): The last frame of the video (to send to the model). |
| | |
| | *Output Interface*: |
| | |
| | - `api_output`s (str): The api output of the flow to the query and data |
| | |
| | """ |
| | @staticmethod |
| | def get_image(image): |
| | """ This method returns an image in the appropriate format for API. |
| | |
| | :param image: The image dictionary. |
| | :type image: Dict[str, Any] |
| | :return: The image url. |
| | :rtype: Dict[str, Any] |
| | """ |
| | extension_dict = { |
| | "jpg": "jpeg", |
| | "jpeg": "jpeg", |
| | "png": "png", |
| | "webp": "webp", |
| | "gif": "gif" |
| | } |
| | supported_image_types = ["local_path","url"] |
| | assert image.get("type",None) in supported_image_types, f"Must define a valid image type for every image \n your type: {image.get('type',None)} \n supported types{supported_image_types} " |
| | |
| | processed_image = None |
| | url = None |
| | if image["type"] == "local_path": |
| | processed_image = encode_image(image.get("image")) |
| | image_extension_type = image.get("image").split(".")[-1] |
| | url = f"data:image/{extension_dict[image_extension_type]};base64, {processed_image}" |
| | |
| | elif image["type"] == "url": |
| | processed_image = image |
| | url = image.get("image") |
| | |
| | return {"type": "image_url", "image_url": {"url": url}} |
| | |
| | @staticmethod |
| | def get_video(video): |
| | """ This method returns the video in the appropriate format for API. |
| | |
| | :param video: The video dictionary. |
| | :type video: Dict[str, Any] |
| | :return: The video url. |
| | :rtype: Dict[str, Any] |
| | """ |
| | video_path = video["video_path"] |
| | resize = video.get("resize",768) |
| | frame_step_size = video.get("frame_step_size",10) |
| | start_frame = video.get("start_frame",0) |
| | end_frame = video.get("end_frame",None) |
| | base64Frames = [] |
| | video = cv2.VideoCapture(video_path) |
| | while video.isOpened(): |
| | success,frame = video.read() |
| | if not success: |
| | break |
| | _,buffer = cv2.imencode(".jpg",frame) |
| | base64Frames.append(encode_from_buffer(buffer)) |
| | video.release() |
| | return map(lambda x: {"image": x, "resize": resize},base64Frames[start_frame:end_frame:frame_step_size]) |
| |
|
| | @staticmethod |
| | def get_user_message(prompt_template, input_data: Dict[str, Any]): |
| | """ This method constructs the user message to be passed to the API. |
| | |
| | :param prompt_template: The prompt template to use. |
| | :type prompt_template: PromptTemplate |
| | :param input_data: The input data. |
| | :type input_data: Dict[str, Any] |
| | :return: The constructed user message (images , videos and text). |
| | :rtype: Dict[str, Any] |
| | """ |
| | content = VisionAtomicFlow._get_message(prompt_template=prompt_template,input_data=input_data) |
| | media_data = input_data["data"] |
| | if "video" in media_data: |
| | content = [ content[0], *VisionAtomicFlow.get_video(media_data["video"])] |
| | if "images" in media_data: |
| | images = [VisionAtomicFlow.get_image(image) for image in media_data["images"]] |
| | content.extend(images) |
| | return content |
| | |
| | @staticmethod |
| | def _get_message(prompt_template, input_data: Dict[str, Any]): |
| | """ This method constructs the textual message to be passed to the API. |
| | |
| | :param prompt_template: The prompt template to use. |
| | :type prompt_template: PromptTemplate |
| | :param input_data: The input data. |
| | :type input_data: Dict[str, Any] |
| | :return: The constructed textual message. |
| | :rtype: Dict[str, Any] |
| | """ |
| | template_kwargs = {} |
| | for input_variable in prompt_template.input_variables: |
| | template_kwargs[input_variable] = input_data[input_variable] |
| | msg_content = prompt_template.format(**template_kwargs) |
| | return [{"type": "text", "text": msg_content}] |
| |
|
| | def _process_input(self, input_data: Dict[str, Any]): |
| | """ This method processes the input data (prepares the messages to send to the API). |
| | |
| | :param input_data: The input data. |
| | :type input_data: Dict[str, Any] |
| | :return: The processed input data. |
| | :rtype: Dict[str, Any] |
| | """ |
| | if self._is_conversation_initialized(): |
| | |
| | user_message_content = self.get_user_message(self.human_message_prompt_template, input_data) |
| |
|
| | else: |
| | |
| | self._initialize_conversation(input_data) |
| | if getattr(self, "init_human_message_prompt_template", None) is not None: |
| | |
| | user_message_content = self.get_user_message(self.init_human_message_prompt_template, input_data) |
| | else: |
| | user_message_content = self.get_user_message(self.human_message_prompt_template, input_data) |
| |
|
| | self._state_update_add_chat_message(role=self.flow_config["user_name"], |
| | content=user_message_content) |