import random
from typing import Optional

import gym
import numpy as np
import pandas as pd
from gym.spaces import Box

# Seed the stdlib and NumPy global RNGs at import time so that the battery
# assignment (random.choice in SolarSys.__init__) and the day / initial
# state-of-charge sampling (np.random in SolarSys.reset) are reproducible.
random.seed(42)
np.random.seed(42)

class SolarSys(gym.Env):
    """
    Flat (non-hierarchical) OpenAI Gym environment for multi-agent energy
    management in a residential cluster, featuring complex P2P pricing and
    reward structures similar to the low-level agents in the Hierarchical model.

    One episode is one randomly chosen day of the dataset.

    Observation (per agent, 8 columns, see ``_get_obs``):
        0 demand, 1 solar generation, 2 battery state of charge as a fraction
        of capacity (-1 for agents without a battery), 3 grid price,
        4 peer (P2P) price, 5 total demand of the other agents,
        6 total solar of the other agents, 7 hour of day.

    Action (per agent, 6 columns in [0, 1], see ``step``):
        0 sell-to-grid, 1 buy-from-grid, 2 sell-to-peers, 3 buy-from-peers,
        4 charge-battery, 5 discharge-battery.

    NOTE(review): ``reset`` returns ``(obs, info)`` while ``step`` returns the
    4-tuple ``(obs, rewards, done, info)``. Callers must handle this mixed
    convention; it is kept as-is for backward compatibility.
    """

    # Width of the per-agent action and observation vectors.
    _ACTION_DIM = 6
    _OBS_DIM = 8

    def __init__(
        self,
        data_path: str = "./data/training/simulated_data.csv",
        state: str = "region_a",
        time_freq: str = "15T",
    ):
        """
        Load the dataset and build the static description of the cluster.

        Args:
            data_path: CSV file with a ``local_15min`` timestamp column plus
                one ``grid_<house_id>`` and one ``total_solar_<house_id>``
                column per house.
            state: Pricing region key (case-insensitive); one of the keys of
                ``self._pricing_info``.
            time_freq: pandas offset alias the data is resampled to.
                NOTE(review): the ``"T"`` minute alias is deprecated in recent
                pandas releases in favour of ``"min"`` -- confirm against the
                pinned pandas version.

        Raises:
            FileNotFoundError: ``data_path`` does not exist.
            ValueError: unsupported ``state``, unreadable/empty data, missing
                columns, or less than one full day of data.
        """
        super().__init__()
        self.data_path = data_path
        self.time_freq = time_freq
        self.state = state.lower()

        # Per-region tariff configuration and time-of-use price function.
        self._pricing_info = {
            "region_a": {
                "max_grid_price": 0.2112,
                "feed_in_tariff": 0.04,
                "price_function": self._get_region_a_price,
            },
            "region_b": {
                "max_grid_price": 0.32,
                "feed_in_tariff": 0.055,
                "price_function": self._get_region_b_price,
            },
            "region_c": {
                "max_grid_price": 0.12505,
                "feed_in_tariff": 0.06,
                "price_function": self._get_region_c_price,
            },
        }

        if self.state not in self._pricing_info:
            raise ValueError(
                f"State '{self.state}' is not supported. "
                f"Available states: {list(self._pricing_info.keys())}"
            )

        state_config = self._pricing_info[self.state]
        self.max_grid_price = state_config["max_grid_price"]
        self.feed_in_tariff = state_config["feed_in_tariff"]
        self._get_price_function = state_config["price_function"]

        all_data = self._load_data(data_path, time_freq)
        self.all_data = all_data

        # Derive the house ids by stripping the prefix, so ids that themselves
        # contain underscores are kept intact.
        grid_prefix = "grid_"
        grid_cols = [c for c in all_data.columns if c.startswith(grid_prefix)]
        if not grid_cols:
            raise ValueError("No 'grid_<house_id>' columns found in the dataset.")
        self.house_ids = [c[len(grid_prefix):] for c in grid_cols]
        self.num_agents = len(self.house_ids)

        # Every house needs its matching solar column; build the list in
        # house order so grid and solar arrays are aligned column by column.
        solar_cols = [f"total_solar_{hid}" for hid in self.house_ids]
        missing_cols = [c for c in solar_cols if c not in all_data.columns]
        if missing_cols:
            raise ValueError(f"Missing solar columns in the dataset: {missing_cols}")

        # Length of a simulated day in steps, derived from the resample frequency.
        freq_offset = pd.tseries.frequencies.to_offset(time_freq)
        minutes_per_step = freq_offset.nanos / 1e9 / 60.0
        self.steps_per_day = int(24 * 60 // minutes_per_step)
        if self.steps_per_day < 1:
            raise ValueError(
                f"time_freq '{time_freq}' is longer than one day; cannot build daily episodes."
            )

        # NOTE(review): days are cut as consecutive blocks of `steps_per_day`
        # rows from the first row, which presumes the data starts on a day
        # boundary -- confirm against the dataset.
        total_rows = len(self.all_data)
        self.total_days = total_rows // self.steps_per_day
        if self.total_days < 1:
            raise ValueError("Dataset has less than a single day of data.")

        # Resampling leaves NaN in empty bins; treat them as zero (as `reset`
        # does) so the global maxima are not poisoned by NaN.
        all_grid = all_data[grid_cols].fillna(0.0).values
        all_solar = all_data[solar_cols].fillna(0.0).values
        self.global_max_demand = float((all_grid + all_solar).max()) + 1e-8
        self.global_max_solar = float(all_solar.max()) + 1e-8

        # Baseline grid import per house (positive part of the grid column),
        # used by `step` as the "no P2P" reference. NaN is filled with zero
        # so it cannot leak into the episode metrics.
        self.original_no_p2p_import = {
            hid: self.all_data[col].fillna(0.0).clip(lower=0.0).values
            for hid, col in zip(self.house_ids, grid_cols)
        }

        # Group 1 = house with any solar generation in the dataset, else 0.
        solar_sums = self.all_data[solar_cols].sum(axis=0).to_dict()
        self.agent_groups = [1 if solar_sums[col] > 0 else 0 for col in solar_cols]
        self.solar_houses = [
            hid for hid, group in zip(self.house_ids, self.agent_groups) if group == 1
        ]

        self.battery_options = {
            "teslapowerwall": {
                "max_capacity": 13.5,
                "charge_efficiency": 0.95,
                "discharge_efficiency": 0.90,
                "max_charge_rate": 5.0,
                "max_discharge_rate": 5.0,
                "degradation_cost_per_kwh": 0.005,
            },
            "enphase": {
                "max_capacity": 5.0,
                "charge_efficiency": 0.95,
                "discharge_efficiency": 0.90,
                "max_charge_rate": 2.0,
                "max_discharge_rate": 2.0,
                "degradation_cost_per_kwh": 0.005,
            },
            "franklin": {
                "max_capacity": 15.0,
                "charge_efficiency": 0.95,
                "discharge_efficiency": 0.90,
                "max_charge_rate": 6.0,
                "max_discharge_rate": 6.0,
                "degradation_cost_per_kwh": 0.005,
            },
        }

        # Per-agent battery parameters as flat vectors (zeros = no battery)
        # so that `step` can be fully vectorised.
        self.batteries = {}
        self.has_battery = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_capacity = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_charge_efficiency = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_discharge_efficiency = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_charge_rate = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_max_discharge_rate = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_degradation_cost = np.zeros(self.num_agents, dtype=np.float32)
        self.battery_soc = np.zeros(self.num_agents, dtype=np.float32)

        # Every solar house gets a randomly chosen battery model.
        battery_names = list(self.battery_options)
        for i, hid in enumerate(self.house_ids):
            if self.agent_groups[i] != 1:
                continue
            specs = self.battery_options[random.choice(battery_names)]
            # Store a copy so editing one house's specs cannot alter the catalogue.
            self.batteries[hid] = dict(specs)

            self.has_battery[i] = 1.0
            self.battery_max_capacity[i] = specs["max_capacity"]
            self.battery_charge_efficiency[i] = specs["charge_efficiency"]
            self.battery_discharge_efficiency[i] = specs["discharge_efficiency"]
            self.battery_max_charge_rate[i] = specs["max_charge_rate"]
            self.battery_max_discharge_rate[i] = specs["max_discharge_rate"]
            self.battery_degradation_cost[i] = specs["degradation_cost_per_kwh"]

        self.observation_space = Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.num_agents, self._OBS_DIM),
            dtype=np.float32,
        )
        self.action_space = Box(
            low=0.0,
            high=1.0,
            shape=(self.num_agents, self._ACTION_DIM),
            dtype=np.float32,
        )

        self.episode_metrics = {}
        self._initialize_episode_metrics()

        # Per-episode state; populated by `reset`.
        self.data = None
        self.demands_day = None
        self.solars_day = None
        self.hours_day = None
        self.current_step = 0
        self.num_steps = self.steps_per_day
        self.previous_actions = np.zeros(
            (self.num_agents, self._ACTION_DIM), dtype=np.float32
        )

    @staticmethod
    def _load_data(data_path, time_freq):
        """Read the CSV, index it by timestamp and resample it to ``time_freq`` (mean)."""
        try:
            frame = pd.read_csv(data_path)
            frame["local_15min"] = pd.to_datetime(frame["local_15min"], utc=True)
            frame.set_index("local_15min", inplace=True)
            return frame.resample(time_freq).mean()
        except FileNotFoundError as err:
            raise FileNotFoundError(f"Data file {data_path} not found.") from err
        except pd.errors.EmptyDataError as err:
            raise ValueError(f"Data file {data_path} is empty.") from err
        except Exception as err:
            raise ValueError(f"Error loading data: {err}") from err

    def _initialize_episode_metrics(self):
        """Initialize or reset all metrics tracked over a single episode."""
        self.cumulative_grid_reduction = 0.0
        self.cumulative_grid_reduction_peak = 0.0
        self.cumulative_degradation_cost = 0.0
        self.agent_cost_savings = np.zeros(self.num_agents, dtype=np.float32)
        self.degradation_cost_timeseries = []
        self.cost_savings_timeseries = []
        self.grid_reduction_timeseries = []

    def get_grid_price(self, step_idx):
        """Return grid price for the given step of the day."""
        return self._get_price_function(step_idx)

    def _hour_of_step(self, step_idx):
        """
        Map a step index within the day to an integer hour in [0, 23].

        NOTE(review): the hour is derived from the step index alone (step 0 is
        hour 0), independently of the timestamps in ``hours_day`` -- confirm
        that day slices really start at local midnight.
        """
        minutes_per_step = 24 * 60 / self.steps_per_day
        return int((step_idx * minutes_per_step) // 60) % 24

    def _get_region_a_price(self, step_idx):
        """Two-tier tariff: peak price from hour 14 up to (not including) 19."""
        hour = self._hour_of_step(step_idx)
        if 14 <= hour < 19:
            return 0.2112
        return 0.0434

    def _get_region_b_price(self, step_idx):
        """Three-tier tariff: peak 15-19, shoulder 13-15, off-peak otherwise."""
        hour = self._hour_of_step(step_idx)
        if 15 <= hour < 19:
            return 0.32
        if 13 <= hour < 15:
            return 0.22
        return 0.12

    def _get_region_c_price(self, step_idx):
        """Three-tier tariff: peak 13-21, lowest price 23-6, mid price otherwise."""
        hour = self._hour_of_step(step_idx)
        if 13 <= hour < 21:
            return 0.125048
        if hour >= 23 or hour < 6:
            return 0.057014
        return 0.079085

    def get_peer_price(self, step_idx, total_surplus, total_shortfall):
        """
        Calculates P2P price based on supply/demand ratio (Arctangent-log approach).
        This matches the logic used in the Hierarchical model's coordination layer.

        Args:
            step_idx: Step of the day, used to look up the grid price.
            total_surplus: Cluster-wide surplus energy (supply side).
            total_shortfall: Cluster-wide unmet demand (demand side).

        Returns:
            The peer price as a float, clipped to
            ``[feed_in_tariff, grid_price]``.
        """
        grid_price = self.get_grid_price(step_idx)
        feed_in_tariff = self.feed_in_tariff

        # Price at perfectly balanced supply and demand, and the scale of the
        # swing around it.
        p_balance = (grid_price * 0.80) + (feed_in_tariff * 0.20)
        p_con = (grid_price - feed_in_tariff) / (1.5 * np.pi)
        k = 1.5
        epsilon = 1e-6  # keeps the ratio and its log finite when a side is zero
        supply = total_surplus + epsilon
        demand = total_shortfall + epsilon

        log_ratio = np.log(demand / supply)
        # Sign-preserving power: a negative base cannot be raised to 1.5 directly.
        if log_ratio < 0:
            power_term = -(np.abs(log_ratio) ** k)
        else:
            power_term = log_ratio ** k

        price_offset = 2 * np.pi * p_con * np.arctan(power_term)
        peer_price = p_balance + price_offset

        return float(np.clip(peer_price, feed_in_tariff, grid_price))

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """
        Start a new episode on a randomly selected day.

        If the previous episode advanced at least one step, its summary is
        stored in ``self.episode_metrics`` before the counters are cleared.

        Args:
            seed: Optional seed for NumPy's global RNG, which this method uses
                to pick the day and the initial state of charge. ``None``
                leaves the RNG state untouched.
            options: Accepted for API compatibility; currently unused.

        Returns:
            ``(obs, info)`` where ``obs`` has shape ``(num_agents, 8)`` and
            ``info`` is an empty dict.
        """
        if seed is not None:
            np.random.seed(seed)

        if self.current_step > 0:
            positive_savings = self.agent_cost_savings[self.agent_cost_savings > 0]
            if len(positive_savings) > 1:
                fairness_on_savings = float(self._compute_jains_index(positive_savings))
            else:
                fairness_on_savings = 0.0
            self.episode_metrics = {
                "total_cost_savings": float(np.sum(self.agent_cost_savings)),
                "fairness_on_cost_savings": fairness_on_savings,
                "battery_degradation_cost_total": float(self.cumulative_degradation_cost),
            }

        # Pick the day and slice its rows out of the full dataset.
        self.day_index = np.random.randint(0, self.total_days)
        start_row = self.day_index * self.steps_per_day
        end_row = start_row + self.steps_per_day
        day_data = self.all_data.iloc[start_row:end_row].copy()
        self.data = day_data

        # Demand is reconstructed as grid + solar and floored at zero.
        demand_list = []
        solar_list = []
        for hid in self.house_ids:
            grid_series = day_data[f"grid_{hid}"].fillna(0.0)
            solar_series = day_data[f"total_solar_{hid}"].fillna(0.0).clip(lower=0.0)
            demand_list.append(np.clip(grid_series.values + solar_series.values, 0.0, None))
            solar_list.append(solar_series.values)

        self.demands_day = np.stack(demand_list, axis=1).astype(np.float32)
        self.solars_day = np.stack(solar_list, axis=1).astype(np.float32)
        # NOTE(review): timestamps were parsed with utc=True, so this is the
        # hour of the UTC-converted index -- confirm that is the intended clock.
        self.hours_day = (self.data.index.hour + self.data.index.minute / 60.0).values

        self.no_p2p_import_day = np.stack(
            [self.original_no_p2p_import[hid][start_row:end_row] for hid in self.house_ids],
            axis=1,
        )

        self.current_step = 0
        self._initialize_episode_metrics()
        self.previous_actions = np.zeros(
            (self.num_agents, self._ACTION_DIM), dtype=np.float32
        )

        # Start every battery between 30% and 70% of capacity; agents without
        # a battery have zero capacity and are additionally masked to zero.
        lows = 0.30 * self.battery_max_capacity
        highs = 0.70 * self.battery_max_capacity
        self.battery_soc = np.random.uniform(low=lows, high=highs)
        self.battery_soc *= self.has_battery

        obs = self._get_obs()
        return obs, {}

    def step(self, actions):
        """
        Advance the simulation by one time step.

        Order of operations: battery discharge towards own shortfall, battery
        charge from own surplus, proportional P2P matching, then grid
        settlement of whatever is left.

        Args:
            actions: Array-like of shape ``(num_agents, 6)`` (or a flat vector
                of ``num_agents * 6`` values in row-major order). Values are
                clipped to ``[0, 1]``.

        Returns:
            ``(obs_next, rewards_list, done, info)``; ``rewards_list`` holds
            one reward per agent and ``info`` the per-agent energy flows.

        Raises:
            RuntimeError: ``reset`` has not been called yet.
            ValueError: ``actions`` has an incompatible shape.
        """
        if self.demands_day is None:
            raise RuntimeError("reset() must be called before step().")

        actions = np.asarray(actions, dtype=np.float32)
        expected_shape = (self.num_agents, self._ACTION_DIM)
        if actions.shape != expected_shape:
            # Accept a flattened action vector, reject everything else instead
            # of letting it broadcast silently.
            if actions.ndim != 1 or actions.size != self.num_agents * self._ACTION_DIM:
                raise ValueError(
                    f"actions must have shape {expected_shape}, got {actions.shape}"
                )
            actions = actions.reshape(expected_shape)
        actions = np.clip(actions, 0.0, 1.0)

        a_sellGrid, a_buyGrid, a_sellPeers, a_buyPeers, a_chargeBatt, a_dischargeBatt = actions.T

        demands = self.demands_day[self.current_step]
        solars = self.solars_day[self.current_step]

        # Per-agent imbalance before any battery or trading action.
        final_shortfall = np.maximum(demands - solars, 0.0)
        final_surplus = np.maximum(solars - demands, 0.0)

        # Prices are set from the raw cluster imbalance.
        total_surplus = final_surplus.sum()
        total_shortfall = final_shortfall.sum()
        peer_price = self.get_peer_price(self.current_step, total_surplus, total_shortfall)
        grid_price = self.get_grid_price(self.current_step)
        feed_in_tariff = self.feed_in_tariff

        # 1) Discharge the battery to cover the agent's own shortfall.
        available_from_batt = self.battery_soc * self.battery_discharge_efficiency
        desired_discharge = a_dischargeBatt * self.battery_max_discharge_rate
        discharge_amount = np.minimum.reduce(
            [desired_discharge, available_from_batt, final_shortfall]
        )
        discharge_amount *= self.has_battery

        # The state of charge drops by more than the delivered energy because
        # of the discharge efficiency.
        self.battery_soc -= (
            discharge_amount / (self.battery_discharge_efficiency + 1e-9)
        ) * self.has_battery
        self.battery_soc = np.maximum(0.0, self.battery_soc)
        final_shortfall -= discharge_amount

        # 2) Charge the battery from the agent's own surplus.
        cap_left = self.battery_max_capacity - self.battery_soc
        desired_charge = a_chargeBatt * self.battery_max_charge_rate
        charge_limit = cap_left / (self.battery_charge_efficiency + 1e-9)
        charge_amount = np.minimum.reduce([desired_charge, charge_limit, final_surplus])
        charge_amount *= self.has_battery

        self.battery_soc += charge_amount * self.battery_charge_efficiency
        final_surplus -= charge_amount

        # 3) P2P market. Sellers may offer leftover solar plus stored energy.
        # NOTE(review): the battery part of the offer is not bounded by
        # max_discharge_rate -- confirm this is intended.
        battery_offer = (self.battery_soc * self.battery_discharge_efficiency) * self.has_battery
        effective_surplus = final_surplus + battery_offer

        netPeer = a_buyPeers - a_sellPeers
        p2p_buy_request = np.maximum(0, netPeer) * final_shortfall
        p2p_sell_offer = np.maximum(0, -netPeer) * effective_surplus

        total_sell = np.sum(p2p_sell_offer)
        total_buy = np.sum(p2p_buy_request)
        matched = min(total_sell, total_buy)

        # The matched volume is shared pro rata among sellers and buyers.
        if matched > 1e-9:
            sell_fraction = p2p_sell_offer / (total_sell + 1e-12)
            buy_fraction = p2p_buy_request / (total_buy + 1e-12)
            actual_sold = matched * sell_fraction
            actual_bought = matched * buy_fraction
        else:
            actual_sold = np.zeros(self.num_agents, dtype=np.float32)
            actual_bought = np.zeros(self.num_agents, dtype=np.float32)

        # Sold energy is drawn from the battery first, then from solar surplus.
        from_batt_p2p = np.minimum(actual_sold, battery_offer)
        from_solar_p2p = actual_sold - from_batt_p2p

        final_surplus -= from_solar_p2p
        final_shortfall -= actual_bought
        # Guard against tiny negative residuals caused by float round-off.
        np.maximum(final_surplus, 0.0, out=final_surplus)
        np.maximum(final_shortfall, 0.0, out=final_shortfall)

        soc_reduction_p2p = (
            from_batt_p2p / (self.battery_discharge_efficiency + 1e-9)
        ) * self.has_battery
        self.battery_soc -= soc_reduction_p2p
        self.battery_soc = np.maximum(0.0, self.battery_soc)

        # 4) Grid settlement.
        netGrid = a_buyGrid - a_sellGrid
        grid_import = np.maximum(0, netGrid) * final_shortfall
        grid_export = np.maximum(0, -netGrid) * final_surplus

        # Demand must always be met: anything the agent did not choose to buy
        # is imported anyway, so grid_import ends up equal to the remaining
        # shortfall regardless of a_buyGrid.
        forced_import = np.maximum(final_shortfall - grid_import, 0.0)
        grid_import += forced_import

        costs = (
            (grid_import * grid_price)
            - (grid_export * feed_in_tariff)
            + (actual_bought * peer_price)
            - (actual_sold * peer_price)
        )

        final_rewards = self._compute_rewards(
            grid_import, grid_export, actual_sold, actual_bought,
            charge_amount, discharge_amount, costs, grid_price, peer_price,
        )

        # Episode metrics, measured against the "no P2P" baseline import.
        no_p2p_import_this_step = self.no_p2p_import_day[self.current_step]

        step_grid_reduction = np.sum(no_p2p_import_this_step - grid_import)
        self.cumulative_grid_reduction += step_grid_reduction
        self.grid_reduction_timeseries.append(step_grid_reduction)
        # A step counts as peak when the price is within 1% of the region maximum.
        if grid_price >= self.max_grid_price * 0.99:
            self.cumulative_grid_reduction_peak += step_grid_reduction

        cost_no_p2p = no_p2p_import_this_step * grid_price
        step_cost_savings_per_agent = cost_no_p2p - costs
        self.agent_cost_savings += step_cost_savings_per_agent
        self.cost_savings_timeseries.append(np.sum(step_cost_savings_per_agent))

        # NOTE(review): battery energy sold via P2P (from_batt_p2p) is not
        # included in the degradation throughput -- confirm this is intended.
        degradation_cost_agent = (charge_amount + discharge_amount) * self.battery_degradation_cost
        step_degradation_cost = np.sum(degradation_cost_agent)
        self.cumulative_degradation_cost += step_degradation_cost
        self.degradation_cost_timeseries.append(step_degradation_cost)

        info = {
            "p2p_buy": actual_bought,
            "p2p_sell": actual_sold,
            "grid_import_with_p2p": grid_import,
            "grid_import_no_p2p": no_p2p_import_this_step,
            "grid_export": grid_export,
            "costs": costs,
            "charge_amount": charge_amount,
            "discharge_amount": discharge_amount,
            "step": self.current_step,
            "agent_rewards": final_rewards,
        }

        self.current_step += 1
        done = self.current_step >= self.num_steps
        obs_next = self._get_obs()

        rewards_list = list(final_rewards)
        return obs_next, rewards_list, done, info

    def _get_obs(self):
        """
        Build the ``(num_agents, 8)`` float32 observation for the current step.

        After the last step the index is clamped, so the final observation
        repeats the last row of the day.
        """
        step = min(self.current_step, self.num_steps - 1)
        demands = self.demands_day[step]
        solars = self.solars_day[step]

        total_surplus = float(np.maximum(solars - demands, 0.0).sum())
        total_shortfall = float(np.maximum(demands - solars, 0.0).sum())

        grid_price = self.get_grid_price(step)
        peer_price = self.get_peer_price(step, total_surplus, total_shortfall)
        hour = self.hours_day[step]

        # Agents without a battery are flagged with -1 instead of a fraction.
        soc_frac = self.battery_soc / (self.battery_max_capacity + 1e-9)
        soc_frac = np.where(self.has_battery == 1, soc_frac, -1.0)

        obs = np.stack(
            [
                demands,
                solars,
                soc_frac,
                np.full(self.num_agents, grid_price),
                np.full(self.num_agents, peer_price),
                demands.sum() - demands,  # demand of all other agents
                solars.sum() - solars,    # solar of all other agents
                np.full(self.num_agents, hour),
            ],
            axis=1,
        ).astype(np.float32)

        return obs

    def _compute_jains_index(self, usage_array):
        """
        Simple Jain's Fairness Index: ``(sum x)^2 / (n * sum x^2)``.

        A small constant in the denominator makes an all-zero or empty input
        evaluate to 0 instead of dividing by zero.
        """
        x = np.array(usage_array, dtype=np.float32)
        numerator = (np.sum(x)) ** 2
        denominator = len(x) * np.sum(x ** 2) + 1e-8
        return numerator / denominator

    def _compute_rewards(
        self, grid_import, grid_export, actual_sold, actual_bought,
        charge_amount, discharge_amount, costs, grid_price, peer_price
    ):
        """
        Calculates the weighted, combined reward for all agents (vectorized).

        Returns:
            Array with one reward per agent.
        """
        w1 = 0.3   # grid-import penalty
        w2 = 0.5   # P2P selling bonus
        w3 = 0.5   # P2P buying bonus (only when cheaper than the grid)
        w4 = 0.1   # penalty for state of charge far from 50%
        w5 = 0.05  # battery degradation penalty
        w6 = 0.4   # fairness bonus, shared by all agents
        w7 = 1.0   # weight of the raw energy cost

        # Fairness of the traded P2P volume across agents.
        jfi = self._compute_jains_index(actual_bought + actual_sold)

        # Prices normalised by the region's maximum grid price.
        p_grid_norm = grid_price / self.max_grid_price
        p_peer_norm = peer_price / self.max_grid_price

        rewards = -costs * w7
        rewards -= w1 * grid_import * p_grid_norm
        rewards += w2 * actual_sold * p_peer_norm

        buy_bonus_factor = (grid_price - peer_price) / self.max_grid_price
        buy_bonus = w3 * actual_bought * buy_bonus_factor
        rewards += np.where(peer_price < grid_price, buy_bonus, 0.0)

        # Uses the state of charge after this step's battery and P2P actions.
        soc_frac = self.battery_soc / (self.battery_max_capacity + 1e-9)
        soc_penalties = w4 * ((soc_frac - 0.5) ** 2) * self.has_battery
        rewards -= soc_penalties

        degrad_penalties = w5 * (charge_amount + discharge_amount) * self.battery_degradation_cost
        rewards -= degrad_penalties

        rewards += w6 * jfi

        return rewards

    def get_episode_metrics(self):
        """
        Return performance metrics for the last completed episode.

        The metrics are only refreshed inside ``reset``, so call this after
        the ``reset`` that follows the episode of interest.
        """
        return self.episode_metrics