Text Generation
Transformers
English
agents
offline-first
edge-computing
context-aware
global-south
low-resource-nlp
Instructions to use tflux2011/contextual-engineering-framework with libraries, inference providers, notebooks, and local apps. Follow these links to get started.
- Libraries
- Transformers
How to use tflux2011/contextual-engineering-framework with Transformers:
```python
# Use a pipeline as a high-level helper
from transformers import pipeline

pipe = pipeline("text-generation", model="tflux2011/contextual-engineering-framework")
```

```python
# Load model directly
from transformers import AutoModel

model = AutoModel.from_pretrained("tflux2011/contextual-engineering-framework", dtype="auto")
```
- Notebooks
- Google Colab
- Kaggle
- Local Apps
- vLLM
How to use tflux2011/contextual-engineering-framework with vLLM:
Install from pip and serve model
```bash
# Install vLLM from pip:
pip install vllm

# Start the vLLM server:
vllm serve "tflux2011/contextual-engineering-framework"

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:8000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "tflux2011/contextual-engineering-framework",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
Use Docker
docker model run hf.co/tflux2011/contextual-engineering-framework
- SGLang
How to use tflux2011/contextual-engineering-framework with SGLang:
Install from pip and serve model
```bash
# Install SGLang from pip:
pip install sglang

# Start the SGLang server:
python3 -m sglang.launch_server \
  --model-path "tflux2011/contextual-engineering-framework" \
  --host 0.0.0.0 \
  --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "tflux2011/contextual-engineering-framework",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
Use Docker images
```bash
docker run --gpus all \
  --shm-size 32g \
  -p 30000:30000 \
  -v ~/.cache/huggingface:/root/.cache/huggingface \
  --env "HF_TOKEN=<secret>" \
  --ipc=host \
  lmsysorg/sglang:latest \
  python3 -m sglang.launch_server \
    --model-path "tflux2011/contextual-engineering-framework" \
    --host 0.0.0.0 \
    --port 30000

# Call the server using curl (OpenAI-compatible API):
curl -X POST "http://localhost:30000/v1/completions" \
  -H "Content-Type: application/json" \
  --data '{
    "model": "tflux2011/contextual-engineering-framework",
    "prompt": "Once upon a time,",
    "max_tokens": 512,
    "temperature": 0.5
  }'
```
- Docker Model Runner
How to use tflux2011/contextual-engineering-framework with Docker Model Runner:
docker model run hf.co/tflux2011/contextual-engineering-framework
| # ------------------------------------------------------------------------------ | |
| # Contextual Engineering Patterns | |
| # Copyright (c) 2025 Tobi Lekan Adeosun | |
| # Licensed under the MIT License. | |
| # | |
| # From Chapter 3: "The Disconnected Agent" | |
| # ------------------------------------------------------------------------------ | |
| import uuid | |
| import time | |
| import json | |
| import hashlib | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Dict, Optional | |
| import logging | |
# Logging: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration constants.
# Multiplier used to convert an action's TTL in hours into seconds.
SECONDS_PER_HOUR = 3600
# Default for OfflineAction.ttl_hours.
DEFAULT_TTL_HOURS = 24
# Default for OfflineAction.min_battery_level; compared against the battery
# level passed to can_sync (logged there as a percentage).
DEFAULT_MIN_BATTERY_LEVEL = 15
class ActionStatus(Enum):
    """Lifecycle states of an offline action.

    Each member's value is its own name as a string, which is what gets
    written into the serialized JSON and looked up again on load.
    """

    # Initial state assigned to a newly created action.
    PENDING = "PENDING"
    # Set by OfflineAction.mark_synced.
    SYNCED = "SYNCED"
    # Set by OfflineAction.mark_failed.
    FAILED = "FAILED"
    # Set by OfflineAction.mark_expired.
    EXPIRED = "EXPIRED"
def generate_hash(data: str) -> str:
    """Return the SHA-256 hex digest of ``data`` encoded as UTF-8.

    The digest depends only on the input text, so equal inputs always
    produce the same value; callers use it to derive idempotency keys.

    Args:
        data: Text to hash.

    Returns:
        The 64-character lowercase hexadecimal digest.
    """
    hasher = hashlib.sha256()
    hasher.update(data.encode("utf-8"))
    return hasher.hexdigest()
@dataclass
class OfflineAction:
    """Represents an action to be synced when connectivity is restored.

    Instances are created with an action type and a JSON-serializable
    payload. The ``id``, ``timestamp`` and ``status`` fields are filled in
    automatically unless supplied (as ``deserialize`` does), and
    ``idempotency_key`` is always derived in ``__post_init__``.
    """

    action_type: str
    payload: Dict[str, Any]
    priority: int = 1
    ttl_hours: int = DEFAULT_TTL_HOURS
    min_battery_level: int = DEFAULT_MIN_BATTERY_LEVEL

    # Auto-generated fields
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = field(default_factory=time.time)
    status: ActionStatus = field(default=ActionStatus.PENDING)
    # Derived in __post_init__; never accepted as a constructor argument.
    idempotency_key: str = field(default="", init=False)

    def __post_init__(self) -> None:
        """Generate idempotency key after initialization."""
        # CRITICAL: The Idempotency Key ensures that if the
        # network flutters and sends the request twice,
        # the server only executes it once.
        # sort_keys makes the payload JSON independent of dict insertion
        # order, so the same payload + timestamp always hashes identically.
        key_data = f"{json.dumps(self.payload, sort_keys=True)}:{self.timestamp}"
        self.idempotency_key = generate_hash(key_data)

    @property
    def expiry_timestamp(self) -> float:
        """Calculate the expiry timestamp based on TTL.

        Exposed as a property because ``is_expired`` and ``serialize`` read
        it as a plain attribute.
        """
        return self.timestamp + (self.ttl_hours * SECONDS_PER_HOUR)

    def is_expired(self) -> bool:
        """Check if the action has exceeded its TTL."""
        return time.time() > self.expiry_timestamp

    def can_sync(self, current_battery_level: int) -> bool:
        """Check if conditions allow syncing this action.

        Args:
            current_battery_level: Current battery level, compared against
                ``min_battery_level``.

        Returns:
            ``False`` (after logging a warning) when the action has expired
            or the battery is below the required minimum; ``True`` otherwise.
        """
        if self.is_expired():
            logger.warning("Action %s has expired", self.id)
            return False
        if current_battery_level < self.min_battery_level:
            logger.warning(
                "Battery too low (%s%%) to sync action %s",
                current_battery_level,
                self.id,
            )
            return False
        return True

    def mark_synced(self) -> None:
        """Mark the action as successfully synced."""
        self.status = ActionStatus.SYNCED
        logger.info("Action %s marked as SYNCED", self.id)

    def mark_failed(self, reason: Optional[str] = None) -> None:
        """Mark the action as failed.

        Args:
            reason: Optional description of the failure, included in the
                error log entry.
        """
        self.status = ActionStatus.FAILED
        logger.error("Action %s marked as FAILED: %s", self.id, reason)

    def mark_expired(self) -> None:
        """Mark the action as expired."""
        self.status = ActionStatus.EXPIRED
        logger.warning("Action %s marked as EXPIRED", self.id)

    def serialize(self) -> str:
        """Convert to JSON for storage in local SQLite.

        Returns:
            A JSON object string. ``expiry_timestamp`` is included for
            convenience only; ``deserialize`` recomputes it from
            ``timestamp`` and ``ttl_hours``.
        """
        data = {
            "id": self.id,
            "action_type": self.action_type,
            "payload": self.payload,
            "priority": self.priority,
            "timestamp": self.timestamp,
            "status": self.status.value,
            "ttl_hours": self.ttl_hours,
            "expiry_timestamp": self.expiry_timestamp,
            # Persisted so a reloaded action keeps its battery threshold
            # instead of silently reverting to the default.
            "min_battery_level": self.min_battery_level,
            # Note: idempotency_key intentionally excluded from serialization
            # to prevent exposure; regenerate on deserialization
        }
        return json.dumps(data)

    @classmethod
    def deserialize(cls, json_str: str) -> "OfflineAction":
        """Create an OfflineAction from JSON string.

        Args:
            json_str: A JSON object string as produced by ``serialize``.
                ``id``, ``action_type``, ``payload`` and ``timestamp`` are
                required; the remaining keys fall back to their defaults.

        Returns:
            The reconstructed action. Its ``idempotency_key`` matches the
            one the original instance had, because the stored timestamp is
            passed to the constructor before the key is derived.

        Raises:
            ValueError: If the input is not valid JSON, is not a JSON
                object, lacks a required key, or has an unknown status.
        """
        try:
            data = json.loads(json_str)
            # id/timestamp/status go through the constructor (rather than
            # being assigned afterwards) so __post_init__ hashes the stored
            # timestamp, not the time of deserialization.
            return cls(
                action_type=data["action_type"],
                payload=data["payload"],
                priority=data.get("priority", 1),
                ttl_hours=data.get("ttl_hours", DEFAULT_TTL_HOURS),
                min_battery_level=data.get(
                    "min_battery_level", DEFAULT_MIN_BATTERY_LEVEL
                ),
                id=data["id"],
                timestamp=data["timestamp"],
                status=ActionStatus(data.get("status", "PENDING")),
            )
        except (KeyError, TypeError, AttributeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass; TypeError and
            # AttributeError cover non-string input and JSON that is not an
            # object.
            logger.exception("Failed to deserialize OfflineAction")
            raise ValueError(f"Invalid JSON data for OfflineAction: {str(e)}") from e