| import time |
| import asyncio |
| from typing import Any, Dict, Optional, Tuple, List |
| import logging |
|
|
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
|
|
class TTLCache:
    """
    Async-safe TTL cache for provider responses.

    Features:
    - Time-To-Live expiration (expired entries are dropped lazily on access,
      or in bulk via ``cleanup``)
    - Async get/set
    - Invalidation

    Entries are stored as ``key -> (value, deadline)``. The deadline is a
    ``time.monotonic()`` reading, so adjustments of the system wall clock
    neither shorten nor extend an entry's lifetime.
    """

    def __init__(self, default_ttl: int = 60):
        """Create an empty cache.

        Args:
            default_ttl: Lifetime in seconds used by ``set`` when the caller
                does not pass an explicit ``ttl``.
        """
        self._cache: Dict[str, Tuple[Any, float]] = {}
        self._lock = asyncio.Lock()
        self.default_ttl = default_ttl

    def _lookup(self, key: str) -> Optional[Any]:
        """Return the live value for ``key``, or ``None`` if absent or expired.

        An expired entry is evicted as a side effect. This helper takes no
        lock itself; it is the single lookup path shared by ``get`` (which
        holds the lock) and ``get_sync`` (which does not).
        """
        entry = self._cache.get(key)
        if entry is None:
            return None
        value, deadline = entry
        if time.monotonic() < deadline:
            return value
        # pop() with a default cannot raise if the key is already gone.
        self._cache.pop(key, None)
        return None

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache if not expired.

        Returns:
            The cached value, or ``None`` when the key is missing or its TTL
            has elapsed. A cached value of ``None`` is therefore
            indistinguishable from a miss.
        """
        async with self._lock:
            return self._lookup(key)

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """Set value in cache with TTL.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Lifetime in seconds; ``None`` falls back to ``default_ttl``.
                A zero or negative TTL stores an entry that is already
                expired.
        """
        lifetime = self.default_ttl if ttl is None else ttl
        # The deadline is fixed before waiting for the lock, so the TTL is
        # measured from the moment set() was called.
        deadline = time.monotonic() + lifetime
        async with self._lock:
            self._cache[key] = (value, deadline)

    async def delete(self, key: str) -> None:
        """Delete specific key; a missing key is ignored."""
        async with self._lock:
            self._cache.pop(key, None)

    async def clear(self) -> None:
        """Clear all cache."""
        async with self._lock:
            self._cache.clear()

    async def cleanup(self) -> int:
        """Remove expired items.

        Returns:
            The number of entries that were removed.
        """
        async with self._lock:
            # Read the clock after the lock is held so the snapshot is not
            # stale, and collect keys first because a dict must not be
            # mutated while it is being iterated.
            now = time.monotonic()
            expired = [
                key
                for key, (_, deadline) in self._cache.items()
                if now >= deadline
            ]
            for key in expired:
                del self._cache[key]
        return len(expired)

    def get_sync(self, key: str) -> Optional[Any]:
        """Synchronous get for non-async contexts (use with caution regarding race conditions).

        Does not acquire the lock. Returns the cached value, or ``None`` when
        the key is missing or expired (an expired entry is evicted).
        """
        return self._lookup(key)
|
|
| |
# Shared module-level cache instance; entries default to a 60-second TTL.
ttl_cache = TTLCache(default_ttl=60)
|
|