import os
import sys
from typing import List, Optional, Union

import dotenv
import torch
from tqdm import tqdm
from transformers import AutoConfig, AutoModel, AutoModelForCausalLM, AutoTokenizer

sys.path.insert(0, "./")
from src.utils import full_path


# Load environment variables (e.g. the Hugging Face token) from ./models/.env.
dotenv.load_dotenv("./models/.env")
hf_token = os.getenv("huggingface_token")
def check_model_in_cache(model_name: str) -> str:
    """Resolve a known model alias to its local cache path.

    Raises:
        ValueError: If the alias is not found in the local cache.
    """
    if model_name in ["LLaMA3", "llama3"]:
        return str(full_path("/data/shared/llama3-8b/Meta-Llama-3-8B_shard_size_1GB"))

    if model_name in ["Mistral", "mistral"]:
        return str(full_path("/data/shared/mistral-7b-v03/Mistral-7B-v0.3_shard_size_1GB"))

    if model_name in ["olmo", "OLMo"]:
        return str(full_path("/data/shared/olmo/OLMo-7B_shard_size_2GB"))

    raise ValueError(f"Model '{model_name}' not found in local cache.")

def mean_pooling(model_output, attention_mask):
    """
    Average token embeddings over the sequence, ignoring padded positions.

    Args:
        model_output (torch.Tensor): Token embeddings of shape
            (batch_size, seq_len, hidden_size).
        attention_mask (torch.Tensor): Mask of shape (batch_size, seq_len),
            1 for real tokens and 0 for padding.

    Returns:
        torch.Tensor: Sentence embeddings of shape (batch_size, hidden_size).
    """
    token_embeddings = model_output
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    # Sum the unmasked token embeddings, then divide by the number of real
    # tokens (clamped to avoid division by zero).
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)
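
# Illustrative shape check for mean_pooling (kept as a comment so nothing runs
# on import); the tensors below are hypothetical, not part of the pipeline:
#
#     hs = torch.randn(2, 5, 8)       # (batch, seq_len, hidden)
#     mask = torch.ones(2, 5)         # all tokens real
#     mean_pooling(hs, mask).shape    # -> torch.Size([2, 8])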


class LLMEmbeddings:
    def __init__(self, model_name: str, device: Optional[torch.device] = None):
        """
        Initializes any Hugging Face LLM as an embedding model.

        Args:
            model_name (str): Local-cache alias, path, or Hugging Face repo ID.
            device (torch.device): Device to load the model on (CPU/GPU).
        """
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Prefer a locally cached model; otherwise treat model_name as a
        # path or Hugging Face repo ID.
        try:
            model_dir = check_model_in_cache(model_name)
        except ValueError:
            model_dir = model_name

        self.tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)

        # Choose the model class from the architecture declared in the config.
        config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True)
        self.model_type = config.architectures[0] if config.architectures else ""

        if "CausalLM" in self.model_type:
            self.model = AutoModelForCausalLM.from_pretrained(
                model_dir, trust_remote_code=True, torch_dtype=torch.float16
            ).to(self.device)
        else:
            self.model = AutoModel.from_pretrained(
                model_dir, trust_remote_code=True, torch_dtype=torch.float16
            ).to(self.device)

        # Decoder-only tokenizers often lack a pad token; reuse EOS so that
        # batched padding works.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        self.model.eval()

    def encode(self, text: Union[str, List[str]]):
        """Encodes input sentences into embeddings."""
        inputs = self.tokenizer(
            text, return_tensors="pt", padding=True, truncation=True, max_length=1024, return_token_type_ids=False
        ).to(self.device)

        with torch.no_grad():
            outputs = self.model(**inputs, output_hidden_states=True, use_cache=False)

        # Mean-pool the last hidden layer; squeeze so a single sentence
        # yields a 1-D vector.
        embeddings = mean_pooling(outputs.hidden_states[-1], inputs["attention_mask"]).squeeze()
        return embeddings

    def encode_batch(self, text: Union[str, List[str]], batch_size: int = 32):
        """Encodes input sentences into embeddings using batching."""
        if isinstance(text, str):
            text = [text]

        embeddings_list = []
        for i in tqdm(range(0, len(text), batch_size), desc="Processing Batches"):
            batch_text = text[i:i + batch_size]
            inputs = self.tokenizer(
                batch_text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=1024,
                return_token_type_ids=False,
            ).to(self.device)

            with torch.no_grad():
                outputs = self.model(**inputs, output_hidden_states=True, use_cache=False)

            # Keep the (batch_size, hidden_size) shape: squeezing here would
            # drop the batch dimension for a final batch of size 1 and break
            # the concatenation below.
            batch_embeddings = mean_pooling(outputs.hidden_states[-1], inputs["attention_mask"])
            embeddings_list.append(batch_embeddings)

        embeddings = torch.cat(embeddings_list, dim=0)
        return embeddings


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    llm = LLMEmbeddings(model_name="llama3", device=device)

    embedding = llm.encode("Hugging Face models are powerful!")
    print(embedding.shape)
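
    # Illustrative batched usage: encode_batch returns one row per input
    # sentence. The example sentences here are placeholders.
    batch_embeddings = llm.encode_batch(
        ["First example sentence.", "Second example sentence."],
        batch_size=2,
    )
    print(batch_embeddings.shape)  # (2, hidden_size)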
| print("Done!!") |
|
|