Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Compiler Pass Ordering RL Environment — Core Logic. | |
| Simulates the compiler optimization problem: given a program's Intermediate | |
| Representation (IR), select a sequence of optimization passes to minimize | |
| estimated runtime cost. This is a real problem solved by GCC/LLVM today using | |
| hand-tuned heuristics (-O2, -O3). RL finds better orderings. | |
| Why RL is necessary (not greedy): | |
| Passes interact via prerequisite gates. A gated pass applied without its prerequisites | |
| fires at 0.3x effectiveness. With all prerequisites met, it fires at 2–7x. | |
| A greedy agent never applies low-value enabler passes (alias_analysis = 0.01 | |
| base effect), so it never unlocks the high-value chains. RL learns to sacrifice | |
| early reward to unlock these chains — exactly the credit assignment problem RL | |
| is designed for. | |
| Three tasks: | |
| Task 1 (easy): One synergy chain. alias_analysis + dead_code_elimination → vectorization. | |
| Grader threshold: >25% cost reduction. | |
| Task 2 (medium): Two independent chains. Agent must find both. | |
| Grader threshold: >35% cost reduction. | |
| Task 3 (hard): Full program. Multiple chains, suppressions, program diversity. | |
| Grader threshold: >42% cost reduction. | |
| """ | |
| import random | |
| from uuid import uuid4 | |
| import numpy as np | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import ( | |
| CompilerOptAction, CompilerOptObservation, | |
| PASS_NAMES, NUM_PASSES, MAX_STEPS, | |
| TASK_EASY, TASK_MEDIUM, TASK_HARD, | |
| ) | |
| except ImportError: | |
| from models import ( | |
| CompilerOptAction, CompilerOptObservation, | |
| PASS_NAMES, NUM_PASSES, MAX_STEPS, | |
| TASK_EASY, TASK_MEDIUM, TASK_HARD, | |
| ) | |
# ---------------------------------------------------------------------------
# Pass base effects: fractional cost reduction on an "average" program,
# indexed by pass id.
# Enabler passes (alias_analysis, interprocedural_analysis) have very low base
# effects; their value comes from satisfying the prerequisites of other passes.
# ---------------------------------------------------------------------------
BASE_PASS_EFFECTS = np.array(
    [
        0.08,  # 0: dead_code_elimination
        0.06,  # 1: constant_folding
        0.10,  # 2: loop_unrolling
        0.03,  # 3: function_inlining (weak alone, needs interprocedural)
        0.02,  # 4: vectorization (very weak alone, needs alias + DCE)
        0.06,  # 5: loop_invariant_motion
        0.05,  # 6: strength_reduction
        0.07,  # 7: common_subexpr_elimination
        0.04,  # 8: tail_call_optimization
        0.03,  # 9: branch_prediction_hints
        0.05,  # 10: register_allocation
        0.04,  # 11: instruction_scheduling
        0.06,  # 12: memory_coalescing (weak alone, needs alias)
        0.01,  # 13: alias_analysis (sacrificial enabler, very low base)
        0.01,  # 14: interprocedural_analysis (sacrificial enabler, very low base)
    ],
    dtype=float,
)
# ---------------------------------------------------------------------------
# Prerequisite gates, keyed by pass id: (prerequisite pass ids, multiplier).
# A gated pass fires at its full multiplier ONLY if ALL listed prerequisite
# passes have already been applied; otherwise it fires at 0.3x.
#
# These are the "unlock chains" the RL agent must discover:
#   alias(13) + DCE(0)        -> vectorization(4):     7.0x  [key chain 1]
#   interprocedural(14)       -> function_inlining(3): 5.0x  [key chain 2]
#   alias(13)                 -> memory_coalescing(12): 3.5x [secondary chain]
#   alias(13) + const_fold(1) -> strength_red(6):      3.0x  [secondary chain]
#   reg_alloc(10)             -> instr_scheduling(11): 2.5x
#   LIM(5)                    -> loop_unrolling(2):    2.0x
# ---------------------------------------------------------------------------
PREREQ_GATES = {
    4: ([13, 0], 7.0),   # vectorization: alias_analysis + DCE required
    3: ([14], 5.0),      # function_inlining: interprocedural required
    12: ([13], 3.5),     # memory_coalescing: alias_analysis required
    6: ([13, 1], 3.0),   # strength_reduction: alias + constant_folding required
    11: ([10], 2.5),     # instruction_scheduling: register_allocation required
    2: ([5], 2.0),       # loop_unrolling: loop_invariant_motion required
}
# Suppression matrix: PASS_SYNERGY[i, j] is the factor that applying pass i
# multiplies into the running suppression state of pass j (1.0 = no effect).
# This is meant to penalize greedy's natural sequence: it picks high-base
# passes early, which suppresses other passes it would have picked later.
# The matrix is unconditional: an entry takes effect whenever pass i is
# applied, regardless of which other passes were applied before it.
PASS_SYNERGY = np.ones((NUM_PASSES, NUM_PASSES), dtype=float)
PASS_SYNERGY[2, 5] = 0.3  # loop_unrolling suppresses later loop_invariant_motion
PASS_SYNERGY[3, 0] = 0.3  # function_inlining suppresses later dead_code_elimination
PASS_SYNERGY[3, 7] = 0.3  # function_inlining suppresses later common_subexpr_elimination
# ---------------------------------------------------------------------------
# Program templates: different programs favour different passes, so the
# agent cannot simply memorize a single fixed sequence.
# Fields: (name, instructions, loops, branches, functions, loop_depth, cost_scale)
# ---------------------------------------------------------------------------
ALL_PROGRAMS = [
    ("loop_heavy", 500, 20, 10, 5, 4, 1.8),
    ("branch_heavy", 400, 5, 40, 8, 2, 1.4),
    ("compute_heavy", 800, 15, 15, 3, 3, 2.0),
    ("small_utility", 150, 3, 8, 12, 1, 1.1),
    ("recursive", 300, 8, 20, 6, 6, 1.6),
    ("vectorizable", 600, 25, 5, 4, 3, 1.9),
    ("inlining_heavy", 450, 10, 12, 20, 2, 1.5),
]

# Task 1: the two loop-oriented templates ("vectorizable" and "loop_heavy"),
# where the single vectorization chain is the one to discover.
TASK1_PROGRAMS = [p for p in ALL_PROGRAMS if p[0] in ("vectorizable", "loop_heavy")]

# Task 2: programs where both key chains matter.
TASK2_PROGRAMS = [
    p for p in ALL_PROGRAMS
    if p[0] in ("compute_heavy", "inlining_heavy", "recursive")
]

# Task 3: all programs (same list object as ALL_PROGRAMS).
TASK3_PROGRAMS = ALL_PROGRAMS
# Human-readable task briefs, keyed by task id; surfaced to the agent through
# the observation's task_description field.
TASK_DESCRIPTIONS = {
    TASK_EASY: (
        "Apply passes to a vectorizable program. "
        "Discover the alias_analysis → vectorization unlock chain. "
        "Target: >25% cost reduction."
    ),
    TASK_MEDIUM: (
        "Apply passes to a compute/inlining-heavy program. "
        "Discover two independent synergy chains. "
        "Target: >35% cost reduction."
    ),
    TASK_HARD: (
        "Apply passes to any program type. "
        "Discover and sequence all synergy chains optimally. "
        "Target: >42% cost reduction."
    ),
}
# Per-program modifiers: program name -> {pass id: effectiveness multiplier}.
# Passes not listed for a program use a multiplier of 1.0 at lookup time.
PROGRAM_MODIFIERS = {
    "loop_heavy": {2: 1.5, 5: 1.4, 4: 1.3},
    "branch_heavy": {9: 1.5, 0: 1.3, 6: 1.2},
    "compute_heavy": {4: 1.6, 12: 1.4, 10: 1.3},
    "small_utility": {1: 1.4, 7: 1.3, 8: 1.2},
    "recursive": {3: 1.5, 14: 1.4, 8: 1.3},
    "vectorizable": {4: 1.8, 13: 1.5, 12: 1.4},
    "inlining_heavy": {3: 1.6, 14: 1.5, 0: 1.2},
}
# Grader thresholds per task, expressed as FRACTIONS of baseline cost removed
# (0.25 == 25%). An episode below its task's "pass" tier is graded 0.
GRADER_THRESHOLDS = {
    TASK_EASY: {"excellent": 0.35, "good": 0.25, "pass": 0.15},
    TASK_MEDIUM: {"excellent": 0.45, "good": 0.35, "pass": 0.22},
    TASK_HARD: {"excellent": 0.52, "good": 0.42, "pass": 0.28},
}
class CompilerOptEnvironment(Environment):
    """
    Compiler pass ordering RL environment built on the OpenEnv ``Environment`` base.

    Each episode:
        1. ``reset`` samples a program template for the requested task.
        2. The agent calls ``step`` with one pass id per step, for up to
           ``MAX_STEPS`` steps or until every pass has been applied.
        3. Per-step reward = marginal improvement (as a fraction of the
           baseline cost) minus a fixed step penalty.
        4. ``grade`` scores the finished episode in the range 0.0-1.0.

    The effect of a pass is the product of its base effect, its prerequisite
    gate multiplier, the suppression accumulated against it by earlier passes,
    and the per-program modifier.

    NOTE(review): earlier notes in this file quote ~19-24% improvement for a
    greedy baseline and ~42-50% for a trained agent; those figures are not
    derivable from this class and should be re-measured after any change to
    the dynamics.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Reward for a step that changes nothing (repeated or invalid pass id).
    _WASTED_STEP_REWARD = -0.3
    # Flat cost charged on every step that applies a pass.
    _STEP_PENALTY = 0.02
    # Multiplier for a gated pass whose prerequisites are not all applied.
    _UNMET_GATE_MULTIPLIER = 0.3

    def __init__(self):
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._program = None  # selected program tuple; None until reset()
        self._task_id = TASK_HARD
        self._baseline_cost = 0.0
        self._current_cost = 0.0
        self._passes_applied: list[int] = []
        # Per-pass running product of suppression factors from PASS_SYNERGY.
        self._synergy_state = np.ones(NUM_PASSES)

    # ------------------------------------------------------------------
    # OpenEnv interface
    # ------------------------------------------------------------------
    def reset(self, task_id: int = TASK_HARD) -> CompilerOptObservation:
        """
        Start a new episode.

        Args:
            task_id: One of TASK_EASY / TASK_MEDIUM / TASK_HARD. Any other
                value is treated as TASK_HARD so that the program pool, task
                description and grader thresholds all agree.

        Returns:
            Initial CompilerOptObservation with baseline cost and program features.
        """
        programs_by_task = {
            TASK_EASY: TASK1_PROGRAMS,
            TASK_MEDIUM: TASK2_PROGRAMS,
            TASK_HARD: TASK3_PROGRAMS,
        }
        # Normalise unknown ids here: storing them verbatim would make the
        # later TASK_DESCRIPTIONS / GRADER_THRESHOLDS lookups raise KeyError.
        if task_id not in programs_by_task:
            task_id = TASK_HARD

        self._task_id = task_id
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._passes_applied = []
        self._synergy_state = np.ones(NUM_PASSES)

        self._program = random.choice(programs_by_task[task_id])
        cost_scale = self._program[-1]
        # +/-10% jitter so the baseline is not identical across episodes.
        noise = random.uniform(0.9, 1.1)
        self._baseline_cost = 1000.0 * cost_scale * noise
        self._current_cost = self._baseline_cost
        return self._make_observation(reward=0.0, done=False, last_pass_id=None)

    def step(self, action: CompilerOptAction) -> CompilerOptObservation:
        """
        Apply an optimization pass.

        - Invalid pass id (not an integer in ``[0, NUM_PASSES)``) or a pass
          that was already applied: penalty reward, no cost change.
        - Prerequisites not met: the pass fires at 0.3x.
        - Prerequisites met: the pass fires at its full gate multiplier.
        - In all cases the pass's effect is further scaled by the suppression
          accumulated against it by previously applied passes.

        When the step ends the episode, the terminal bonus is added to the
        step's reward.

        Args:
            action: Object exposing ``pass_id`` and, optionally, ``task_id``
                (only consulted when ``reset`` has never been called).

        Returns:
            CompilerOptObservation with updated cost, synergy state, and reward.
        """
        # Auto-reset BEFORE counting the step: reset() replaces the State
        # object, which would otherwise discard this step's increment.
        if self._program is None:
            self.reset(getattr(action, "task_id", self._task_id))
        self._state.step_count += 1

        pass_id = action.pass_id
        is_valid = self._is_valid_pass_id(pass_id)

        # Wasted step: nothing changes except the step counter.
        if not is_valid or pass_id in self._passes_applied:
            done = self._state.step_count >= MAX_STEPS
            reward = self._WASTED_STEP_REWARD
            if done:
                reward += self._terminal_bonus(self._total_improvement())
            return self._make_observation(
                reward=reward,
                done=done,
                last_pass_id=pass_id if is_valid else None,
            )

        pass_id = int(pass_id)

        # Effectiveness = base * gate multiplier * accumulated suppression * program modifier.
        effectiveness = (
            float(BASE_PASS_EFFECTS[pass_id])
            * self._compute_gated_synergy(pass_id)
            * float(self._synergy_state[pass_id])
            * self._program_modifier(pass_id)
        )
        cost_reduction = max(0.0, effectiveness * self._current_cost)
        prev_cost = self._current_cost
        self._current_cost = max(0.0, prev_cost - cost_reduction)

        # Fold this pass's suppression row into the state seen by later passes.
        self._synergy_state *= PASS_SYNERGY[pass_id]
        self._passes_applied.append(pass_id)

        # Per-step shaped reward.
        marginal_improvement = (prev_cost - self._current_cost) / self._baseline_cost
        reward = marginal_improvement - self._STEP_PENALTY

        done = (
            self._state.step_count >= MAX_STEPS
            or len(self._passes_applied) >= NUM_PASSES
        )
        if done:
            reward += self._terminal_bonus(self._total_improvement())
        return self._make_observation(reward=reward, done=done, last_pass_id=pass_id)

    def state(self) -> State:
        """Return current episode state (episode_id, step_count)."""
        # NOTE(review): this is a plain method. If the OpenEnv base class
        # declares `state` as a property, framework code reading `env.state`
        # would get a bound method instead of a State - confirm against the
        # Environment interface before changing.
        return self._state

    # ------------------------------------------------------------------
    # Grader: called externally to score a completed episode
    # ------------------------------------------------------------------
    def grade(self) -> float:
        """
        Score the agent's performance on this episode. Returns 0.0-1.0.

        Scoring rubric (thresholds come from GRADER_THRESHOLDS for the task):
            1.0 if improvement >= "excellent"
            0.7 if improvement >= "good"
            0.4 if improvement >= "pass"
            0.0 otherwise (returned immediately, no efficiency deduction)

        For non-zero base scores, 0.05 is deducted for every applied pass
        beyond the task's minimum step count. Only distinct applied passes
        are counted; wasted steps (repeats / invalid ids) are not. The result
        is rounded to 3 decimals and floored at 0.0.
        """
        improvement = self._total_improvement()
        thresholds = GRADER_THRESHOLDS[self._task_id]

        if improvement >= thresholds["excellent"]:
            base_score = 1.0
        elif improvement >= thresholds["good"]:
            base_score = 0.7
        elif improvement >= thresholds["pass"]:
            base_score = 0.4
        else:
            return 0.0

        # Efficiency deduction: penalize using more passes than necessary.
        extra_steps = max(0, len(self._passes_applied) - self._minimum_steps_needed())
        return max(0.0, round(base_score - extra_steps * 0.05, 3))

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _is_valid_pass_id(pass_id) -> bool:
        """Return True if pass_id is a non-bool integer in [0, NUM_PASSES)."""
        # Negative ints would silently index arrays from the end, and bools
        # would trigger NumPy boolean-mask indexing, so both are rejected.
        if isinstance(pass_id, bool) or not isinstance(pass_id, (int, np.integer)):
            return False
        return 0 <= pass_id < NUM_PASSES

    def _compute_gated_synergy(self, pass_id: int) -> float:
        """
        Return the prerequisite-gate multiplier for pass_id given pass history.

        - No gate defined for the pass: 1.0 (base effect only).
        - Gate defined and ALL prerequisites already applied: the gate's
          full multiplier.
        - Gate defined but some prerequisite missing: 0.3.
        """
        gate = PREREQ_GATES.get(pass_id)
        if gate is None:
            return 1.0
        prereqs, multiplier = gate
        if set(prereqs).issubset(self._passes_applied):
            return multiplier
        return self._UNMET_GATE_MULTIPLIER

    def _program_modifier(self, pass_id: int) -> float:
        """Per-program-type effectiveness modifier for the given pass (default 1.0)."""
        if self._program is None:
            return 1.0
        program_name = self._program[0]
        return PROGRAM_MODIFIERS.get(program_name, {}).get(pass_id, 1.0)

    def _total_improvement(self) -> float:
        """Fraction of baseline cost eliminated so far (0.0 if no baseline yet)."""
        if self._baseline_cost <= 0:
            return 0.0
        return (self._baseline_cost - self._current_cost) / self._baseline_cost

    def _minimum_steps_needed(self) -> int:
        """
        Step budget per task that the grader treats as "no efficiency penalty".

        Falls back to the hard-task value for an unknown task id.
        """
        return {TASK_EASY: 3, TASK_MEDIUM: 5, TASK_HARD: 7}.get(self._task_id, 7)

    def _terminal_bonus(self, total_improvement: float) -> float:
        """Bonus reward at episode end based on total optimization level."""
        if total_improvement >= 0.50:
            return 0.6
        if total_improvement >= 0.40:
            return 0.4
        if total_improvement >= 0.30:
            return 0.2
        if total_improvement >= 0.20:
            return 0.05
        if total_improvement > 0.0:
            return 0.0
        # No improvement at all is actively penalized.
        return -0.2

    def _make_observation(self, reward: float, done: bool, last_pass_id) -> CompilerOptObservation:
        """
        Construct a full CompilerOptObservation from current state.

        Returns a default-constructed observation if no program has been
        selected yet. When ``done`` is True the observation also carries the
        grader score for the episode.
        """
        if self._program is None:
            return CompilerOptObservation()

        name, instructions, loops, branches, functions, loop_depth, _ = self._program
        improvement = self._total_improvement()
        grader_score = self.grade() if done else None
        applied = set(self._passes_applied)

        return CompilerOptObservation(
            estimated_cost=round(self._current_cost, 2),
            baseline_cost=round(self._baseline_cost, 2),
            num_instructions=instructions,
            num_loops=loops,
            num_branches=branches,
            num_functions=functions,
            loop_depth=loop_depth,
            program_type=name,
            passes_applied=list(self._passes_applied),
            passes_available=[i for i in range(NUM_PASSES) if i not in applied],
            step_count=self._state.step_count,
            max_steps=MAX_STEPS,
            synergy_state=[round(float(x), 3) for x in self._synergy_state],
            task_id=self._task_id,
            task_description=TASK_DESCRIPTIONS[self._task_id],
            done=done,
            reward=round(reward, 4),
            improvement_pct=round(improvement * 100, 2),
            last_pass_name=PASS_NAMES.get(last_pass_id) if last_pass_id is not None else None,
            grader_score=grader_score,
        )