Spaces:
Running
Running
Pulastya B
feat: Add 4 major system improvements - semantic layer, error recovery, token budget, parallel execution
05a3c74
| """ | |
| Parallel Tool Execution with Dependency Detection | |
| Enables concurrent execution of independent tools while respecting | |
| dependencies and avoiding overwhelming system resources. | |
| """ | |
| import asyncio | |
| from typing import Dict, List, Any, Set, Optional, Tuple, Callable | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import time | |
class ToolWeight(Enum):
    """
    Resource-intensity class of a tool.

    The executor uses this value to decide which concurrency limit
    applies to a tool execution.
    """

    # Fast operations (< 1s): profiling, validation
    LIGHT = 1
    # Moderate operations (1-10s): cleaning, encoding
    MEDIUM = 2
    # Expensive operations (> 10s): ML training, large viz
    HEAVY = 3
# Tool weight classification, declared once per weight class.
_TOOLS_BY_WEIGHT = (
    # Light tools (can run many in parallel)
    (ToolWeight.LIGHT, (
        "profile_dataset",
        "detect_data_quality_issues",
        "analyze_correlations",
        "get_smart_summary",
        "smart_type_inference",
    )),
    # Medium tools (limit 2-3 concurrent)
    (ToolWeight.MEDIUM, (
        "clean_missing_values",
        "handle_outliers",
        "encode_categorical",
        "create_time_features",
        "create_interaction_features",
        "create_ratio_features",
        "create_statistical_features",
        "generate_interactive_scatter",
        "generate_interactive_histogram",
        "generate_interactive_box_plots",
        "generate_interactive_correlation_heatmap",
    )),
    # Heavy tools (limit 1 concurrent) - NEVER RUN MULTIPLE HEAVY TOOLS IN PARALLEL
    (ToolWeight.HEAVY, (
        "train_baseline_models",
        "hyperparameter_tuning",
        "perform_cross_validation",
        "train_ensemble_models",
        "auto_ml_pipeline",
        "generate_ydata_profiling_report",
        "generate_combined_eda_report",
        "generate_plotly_dashboard",
        "execute_python_code",  # Unknown code complexity
        "auto_feature_engineering",  # ML-based feature generation
    )),
)

# Flat lookup: tool name -> ToolWeight (insertion order: light, medium, heavy).
TOOL_WEIGHTS = {
    tool_name: weight
    for weight, tool_names in _TOOLS_BY_WEIGHT
    for tool_name in tool_names
}
@dataclass
class ToolExecution:
    """
    Represents a tool execution task.

    Declared as a dataclass so that instances can be built with keyword
    arguments (``ToolExecution(tool_name=..., arguments=..., ...)``); without
    the decorator the annotated names below are only annotations and the
    class accepts no constructor arguments.

    Attributes:
        tool_name: Name of the tool to run.
        arguments: Keyword arguments passed to the tool.
        weight: Resource-intensity class used to pick a concurrency limit.
        dependencies: Other tool names that must complete first.
        execution_id: Identifier of this execution; the hash is derived
            from it.
    """

    tool_name: str
    arguments: Dict[str, Any]
    weight: "ToolWeight"
    dependencies: Set[str]  # Other tool names that must complete first
    execution_id: str

    def __hash__(self):
        # Explicit hash so instances stay usable in sets / as dict keys:
        # @dataclass leaves an explicitly defined __hash__ untouched, while
        # its generated __eq__ compares all fields.
        return hash(self.execution_id)
class ToolDependencyGraph:
    """
    Analyzes tool dependencies based on input/output files.

    Detects dependencies like:
    - clean_missing_values → encode_categorical (same file transformation)
    - profile_dataset → train_baseline_models (uses profiling results)
    - Multiple visualizations (can run in parallel)

    Attributes:
        graph: Dependency map computed by the most recent call to
            detect_dependencies() (execution_id → prerequisite execution_ids).
    """

    # Tool names used by the "training waits for preparation" rule.
    _TRAINING_TOOLS = ("train_baseline_models", "hyperparameter_tuning", "train_ensemble_models")
    _PREP_TOOLS = ("profile_dataset", "clean_missing_values", "encode_categorical")

    def __init__(self):
        self.graph: Dict[str, Set[str]] = {}

    @staticmethod
    def _select_producer(position: int, writers: List[Tuple[int, str]]) -> Optional[str]:
        """
        Pick the execution a reader of some path has to wait for.

        Args:
            position: Index of the reader in the submitted execution list.
            writers: ``(position, execution_id)`` of every execution that
                writes the path, in submission order.

        Returns:
            The execution_id to depend on, or None when there is nothing to
            wait for.
        """
        others = [(pos, exec_id) for pos, exec_id in writers if pos != position]
        if not others:
            return None  # The reader is the only writer of this path
        preceding = [exec_id for pos, exec_id in others if pos < position]
        if preceding:
            # Read the file as left by the closest earlier writer
            return preceding[-1]
        if len(others) != len(writers):
            # The reader rewrites the path itself and no earlier writer
            # exists: it starts the chain, later writers wait for it instead.
            return None
        # Pure reader submitted before every writer: wait for the final writer
        return others[-1][1]

    def detect_dependencies(self, executions: List["ToolExecution"]) -> Dict[str, Set[str]]:
        """
        Detect dependencies between tool executions.

        Rules:
        1. If tool B reads (``file_path``) a file that tool A writes
           (``output_path`` / ``output_file``) → B depends on A.
        2. If several tools write the same file, each reader depends on the
           closest writer submitted before it, so repeated in-place steps on
           one file run sequentially in submission order.
        3. Training tools depend on profiling/cleaning/encoding tools whose
           ``file_path`` compares equal to theirs (two absent values compare
           equal as well, so such pairs are ordered too).
        4. Everything else is independent → parallel.

        The result is also stored in ``self.graph``.

        Args:
            executions: List of tool executions

        Returns:
            Dict mapping execution_id → set of execution_ids it depends on
        """
        dependencies: Dict[str, Set[str]] = {ex.execution_id: set() for ex in executions}

        # Build file I/O map: path → [(position, execution_id)] in submission order
        file_producers: Dict[str, List[Tuple[int, str]]] = {}
        file_consumers: Dict[str, List[Tuple[int, str]]] = {}
        for position, ex in enumerate(executions):
            input_file = ex.arguments.get("file_path")
            if input_file:
                file_consumers.setdefault(input_file, []).append((position, ex.execution_id))
            output_file = ex.arguments.get("output_path") or ex.arguments.get("output_file")
            if output_file:
                file_producers.setdefault(output_file, []).append((position, ex.execution_id))

        # Consumers depend on the producer of the file they read
        for path, readers in file_consumers.items():
            writers = file_producers.get(path)
            if not writers:
                continue
            for position, consumer_id in readers:
                producer_id = self._select_producer(position, writers)
                if producer_id is not None and producer_id != consumer_id:
                    dependencies[consumer_id].add(producer_id)

        # Special rule: training tools depend on profiling/cleaning if they exist
        training_execs = [ex for ex in executions if ex.tool_name in self._TRAINING_TOOLS]
        prep_execs = [ex for ex in executions if ex.tool_name in self._PREP_TOOLS]
        for train_ex in training_execs:
            train_file = train_ex.arguments.get("file_path")
            for prep_ex in prep_execs:
                # Same file? Training depends on prep
                if train_file == prep_ex.arguments.get("file_path"):
                    dependencies[train_ex.execution_id].add(prep_ex.execution_id)

        self.graph = dependencies
        return dependencies

    def get_execution_batches(self, executions: List["ToolExecution"]) -> List[List["ToolExecution"]]:
        """
        Group executions into batches that can run in parallel.

        Batches are built level by level: a batch holds every remaining
        execution whose dependencies are all in earlier batches. If no
        remaining execution is ready (circular dependency), a warning is
        printed and everything left over is emitted as one final batch.

        Returns:
            List of batches, where each batch contains independent tools
        """
        dependencies = self.detect_dependencies(executions)

        batches: List[List["ToolExecution"]] = []
        completed: Set[str] = set()
        remaining = {ex.execution_id: ex for ex in executions}

        while remaining:
            # Find all tools with satisfied dependencies
            ready = [
                ex for exec_id, ex in remaining.items()
                if dependencies[exec_id].issubset(completed)
            ]
            if not ready:
                # Circular dependency or error - add remaining as single batch
                print("⚠️ Warning: Possible circular dependency detected")
                batches.append(list(remaining.values()))
                break

            batches.append(ready)
            # Mark as completed
            for ex in ready:
                completed.add(ex.execution_id)
                del remaining[ex.execution_id]

        return batches
class ParallelToolExecutor:
    """
    Executes tools in parallel while respecting dependencies and resource limits.

    Features:
    - Automatic dependency detection
    - Weight-based resource management (limit heavy tools)
    - Progress reporting for parallel executions
    - Error isolation (one tool failure doesn't crash others, and a failing
      progress callback doesn't change a tool's outcome)
    """

    def __init__(self, max_heavy_concurrent: int = 1, max_medium_concurrent: int = 2,
                 max_light_concurrent: int = 5):
        """
        Initialize parallel executor.

        Args:
            max_heavy_concurrent: Max heavy tools running simultaneously
            max_medium_concurrent: Max medium tools running simultaneously
            max_light_concurrent: Max light tools running simultaneously
        """
        self.max_heavy = max_heavy_concurrent
        self.max_medium = max_medium_concurrent
        self.max_light = max_light_concurrent

        # Semaphores for resource control
        self.heavy_semaphore = asyncio.Semaphore(max_heavy_concurrent)
        self.medium_semaphore = asyncio.Semaphore(max_medium_concurrent)
        self.light_semaphore = asyncio.Semaphore(max_light_concurrent)
        # Event loop the semaphores above were (re)built for; None until the
        # executor is first used from inside a running loop.
        self._semaphore_loop = None

        self.dependency_graph = ToolDependencyGraph()

        print("⚡ Parallel Executor initialized:")
        print(f" Heavy tools: {max_heavy_concurrent} concurrent")
        print(f" Medium tools: {max_medium_concurrent} concurrent")
        print(f" Light tools: {max_light_concurrent} concurrent")

    def _refresh_semaphores(self) -> None:
        """
        Rebuild the semaphores when the running event loop has changed.

        asyncio synchronization primitives belong to a single event loop. One
        executor instance (e.g. the module-level singleton) may be used from
        several loops over its lifetime, such as successive ``asyncio.run()``
        calls, so fresh semaphores are created for each new loop. Outside a
        running loop nothing is changed.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return
        if getattr(self, "_semaphore_loop", None) is loop:
            return
        self.heavy_semaphore = asyncio.Semaphore(self.max_heavy)
        self.medium_semaphore = asyncio.Semaphore(self.max_medium)
        self.light_semaphore = asyncio.Semaphore(self.max_light)
        self._semaphore_loop = loop

    def _get_semaphore(self, weight: ToolWeight) -> asyncio.Semaphore:
        """Get appropriate semaphore for tool weight (anything not heavy/medium is light)."""
        self._refresh_semaphores()
        if weight == ToolWeight.HEAVY:
            return self.heavy_semaphore
        if weight == ToolWeight.MEDIUM:
            return self.medium_semaphore
        return self.light_semaphore

    @staticmethod
    async def _notify(progress_callback: Optional[Callable], message: str, stage: str) -> None:
        """
        Send one progress update without letting the callback affect execution.

        The callback is called as ``progress_callback(message, stage)``; if it
        returns a coroutine or future, that is awaited, so both async and
        plain callables work. Exceptions raised by the callback are printed
        and otherwise ignored.
        """
        if not progress_callback:
            return
        try:
            outcome = progress_callback(message, stage)
            if asyncio.iscoroutine(outcome) or asyncio.isfuture(outcome):
                await outcome
        except Exception as exc:
            print(f"⚠️ Progress callback failed: {exc}")

    async def _execute_single(self, execution: ToolExecution,
                              execute_func: Callable,
                              progress_callback: Optional[Callable] = None) -> Dict[str, Any]:
        """
        Execute a single tool with resource management.

        Args:
            execution: Tool execution details
            execute_func: Function to execute tool (sync); called as
                ``execute_func(tool_name, arguments)`` in the loop's default
                executor
            progress_callback: Optional callback for progress updates,
                called with ``(message, stage)`` where stage is "start",
                "complete" or "error"

        Returns:
            Execution result dict with "execution_id", "tool_name",
            "success", "duration" and either "result" or "error"
        """
        semaphore = self._get_semaphore(execution.weight)
        async with semaphore:
            await self._notify(progress_callback, f"⚡ Executing {execution.tool_name}", "start")
            # Monotonic clock: durations are unaffected by wall-clock changes
            start_time = time.perf_counter()
            try:
                # Run sync function in executor to avoid blocking
                loop = asyncio.get_running_loop()
                result = await loop.run_in_executor(
                    None,
                    execute_func,
                    execution.tool_name,
                    execution.arguments,
                )
            except Exception as e:
                duration = time.perf_counter() - start_time
                await self._notify(
                    progress_callback,
                    f"❌ {execution.tool_name} failed: {str(e)[:100]}",
                    "error",
                )
                return {
                    "execution_id": execution.execution_id,
                    "tool_name": execution.tool_name,
                    "success": False,
                    "error": str(e),
                    "duration": duration,
                }

            # Only the tool call is guarded above, so reporting problems can
            # never turn a successful run into a failure.
            duration = time.perf_counter() - start_time
            await self._notify(
                progress_callback,
                f"✅ {execution.tool_name} completed ({duration:.1f}s)",
                "complete",
            )
            return {
                "execution_id": execution.execution_id,
                "tool_name": execution.tool_name,
                "success": True,
                "result": result,
                "duration": duration,
            }

    async def execute_batch(self, batch: List[ToolExecution],
                            execute_func: Callable,
                            progress_callback: Optional[Callable] = None) -> List[Dict[str, Any]]:
        """
        Execute a batch of independent tools in parallel.

        Args:
            batch: List of tool executions (no dependencies between them)
            execute_func: Sync function to execute tools
            progress_callback: Optional progress callback

        Returns:
            List of execution results, in the same order as ``batch``
        """
        print(f"⚡ Parallel batch: {len(batch)} tools")
        for ex in batch:
            print(f" - {ex.tool_name} ({ex.weight.name})")

        # Execute all in parallel
        tasks = [
            self._execute_single(ex, execute_func, progress_callback)
            for ex in batch
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Convert anything gather captured (including cancellation, which is
        # a BaseException) into a regular failure result.
        processed_results = []
        for ex, result in zip(batch, results):
            if isinstance(result, BaseException):
                processed_results.append({
                    "execution_id": ex.execution_id,
                    "tool_name": ex.tool_name,
                    "success": False,
                    "error": str(result) or type(result).__name__,
                    "duration": 0.0,  # Not measured for this failure path
                })
            else:
                processed_results.append(result)
        return processed_results

    async def execute_all(self, executions: List[ToolExecution],
                          execute_func: Callable,
                          progress_callback: Optional[Callable] = None) -> List[Dict[str, Any]]:
        """
        Execute all tools with automatic dependency resolution and parallelization.

        Batches run one after another; tools inside a batch run concurrently.

        Args:
            executions: List of all tool executions
            execute_func: Sync function to execute tools
            progress_callback: Optional progress callback

        Returns:
            List of all execution results, batch by batch (this can differ
            from the order of ``executions``; match on "execution_id")
        """
        if not executions:
            return []

        # Get execution batches (respecting dependencies)
        batches = self.dependency_graph.get_execution_batches(executions)
        print(f"⚡ Execution plan: {len(batches)} batches for {len(executions)} tools")

        all_results = []
        for i, batch in enumerate(batches):
            print(f"\n📦 Batch {i+1}/{len(batches)}")
            batch_results = await self.execute_batch(batch, execute_func, progress_callback)
            all_results.extend(batch_results)
        return all_results

    def classify_tools(self, tool_calls: List[Dict[str, Any]]) -> List[ToolExecution]:
        """
        Convert tool calls to ToolExecution objects with weights.

        Tools missing from TOOL_WEIGHTS are treated as MEDIUM. The
        execution_id is ``"<tool_name>_<index in tool_calls>"``.

        Args:
            tool_calls: List of tool calls from LLM; each is a dict with the
                tool name under "name" or "tool_name" and optional "arguments"

        Returns:
            List of ToolExecution objects
        """
        executions = []
        for i, call in enumerate(tool_calls):
            tool_name = call.get("name") or call.get("tool_name")
            # An explicit None for "arguments" is treated like a missing key
            arguments = call.get("arguments") or {}
            # Get weight
            weight = TOOL_WEIGHTS.get(tool_name, ToolWeight.MEDIUM)
            executions.append(ToolExecution(
                tool_name=tool_name,
                arguments=arguments,
                weight=weight,
                dependencies=set(),  # Will be computed by dependency graph
                execution_id=f"{tool_name}_{i}",
            ))
        return executions
# Module-wide executor instance, created on first request
_parallel_executor = None


def get_parallel_executor() -> ParallelToolExecutor:
    """Return the shared ParallelToolExecutor, building it on the first call."""
    global _parallel_executor
    if _parallel_executor is not None:
        return _parallel_executor
    _parallel_executor = ParallelToolExecutor()
    return _parallel_executor