我想要一个以 kw 为单位的 EXPORT_CAPACITY 变量,对于每个时间步长(以 30 分钟为增量)。
m.exportLimit = en.Param(initialize=-EXPORT_CAPACITY*TIME_STEP_HOURS)
它是负数,因为导出作为负载 pv 的负净值返回
def E_neg_net_rule(m, t):
return m.negNetLoad[t] == m.negLoad[t] + m.posEInPV[t] + m.negEOutExport[t]
m.E_negNet_cons = en.Constraint(m.Time, rule=E_neg_net_rule)
例如,如果用户将输出限制设置为 100kw,则在每个时间步,
和 m.negEOutExport[t]
之间任何时间步的最大输出将为 -50kw。
我将在此处提供完整的代码以及 github 存储库的链接。
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pyomo.environ as en
from pyomo.opt import SolverFactory
from assests.battery import Battery
import time
import logging
# Set up seaborn style
'axes.linewidth': 1, 'axes.edgecolor': 'black',
'xtick.direction': 'out', 'xtick.major.size': 4.0,
'ytick.direction': 'out', 'ytick.major.size': 4.0,
'axes.facecolor': 'white', 'grid.color': '.8',
'grid.linestyle': '-', 'grid.linewidth': 0.5
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Constants
BIG_M = 500000
# Battery Parameters
Bcapacity = 300 # kWh
Bcharging_power = 75 # kW
Bdischarging_power = -75 # kW
Bcharging_efficiency = 0.95
Bdischarging_efficiency = 0.95
Bmin_soc = 0.5
Bmax_soc = 0.97
Binitial_soc = Bcapacity * Bmin_soc
# Create an instance of the Battery class
battery = Battery(
# Print battery parameters
# Load test data
input_file_path = './input_data/BESS_Input.csv'
testData = pd.read_csv(input_file_path, parse_dates=['timestamp'], index_col='timestamp')
logging.info(f"Loaded data from {input_file_path}")
except FileNotFoundError:
logging.error(f"File not found: {input_file_path}")
# Ensure the index is a DateTimeIndex with the correct format
date_format = "%d/%m/%Y %H:%M"
g_df = testData.copy()
g_df.index = pd.to_datetime(g_df.index, format=date_format, dayfirst=True)
def run_monthly_optimization(start_date, end_date, initial_soc):
Run optimization for a given month.
- start_date: pd.Timestamp, start of the optimization period
- end_date: pd.Timestamp, end of the optimization period
- initial_soc: float, initial state of charge of the battery
- outputVars: np.array, optimization results
- final_soc: float, final state of charge of the battery
- cost_without_batt: float, cost without battery
- cost_with_batt: float, cost with battery
- score: float, optimization score
- money_saved: float, money saved/earned
# month_data = g_df.loc[start_date:end_date].copy()
# month_data['demand'] = month_data['demand'].shift(-1)
# month_data['generation'] = month_data['generation'].shift(-1)
month_data = g_df.loc[start_date:end_date].copy()
month_data['demand'] = month_data['demand']
month_data['generation'] = month_data['generation']
# load = month_data['demand'].values[:-1]
# PV = month_data['generation'].values[:-1]
# sellPrice = month_data['sellPrice'].values[:-1]
# buyPrice = month_data['buyPrice'].values[:-1]
load = month_data['demand'].values
PV = month_data['generation'].values
sellPrice = month_data['sellPrice'].values
buyPrice = month_data['buyPrice'].values
priceDict1 = dict(enumerate(sellPrice))
priceDict2 = dict(enumerate(buyPrice))
net = load - PV
posLoad = np.clip(net, a_min=0, a_max=None)
negLoad = np.clip(net, a_min=None, a_max=0)
posLoadDict = dict(enumerate(posLoad))
negLoadDict = dict(enumerate(negLoad))
m = en.ConcreteModel()
m.Time = en.RangeSet(0, len(net) - 1)
m.SOC = en.Var(m.Time, bounds=(battery.min_depth_of_discharge, battery.max_depth_of_discharge), initialize=initial_soc)
m.posDeltaSOC = en.Var(m.Time, initialize=0)
m.negDeltaSOC = en.Var(m.Time, initialize=0)
m.posEInGrid = en.Var(m.Time, bounds=(0, battery.charging_power_limit * TIME_STEP_HOURS), initialize=0)
m.posEInPV = en.Var(m.Time, bounds=(0, battery.charging_power_limit * TIME_STEP_HOURS), initialize=0)
m.negEOutLocal = en.Var(m.Time, bounds=(battery.discharging_power_limit * TIME_STEP_HOURS, 0), initialize=0)
m.negEOutExport = en.Var(m.Time, bounds=(battery.discharging_power_limit * TIME_STEP_HOURS, 0), initialize=0)
m.posNetLoad = en.Var(m.Time, initialize=posLoadDict)
m.negNetLoad = en.Var(m.Time, initialize=negLoadDict)
m.Bool_char = en.Var(m.Time, within=en.Boolean)
m.Bool_dis = en.Var(m.Time, within=en.Boolean, initialize=0)
m.priceSell = en.Param(m.Time, initialize=priceDict1)
m.priceBuy = en.Param(m.Time, initialize=priceDict2)
m.posLoad = en.Param(m.Time, initialize=posLoadDict)
m.negLoad = en.Param(m.Time, initialize=negLoadDict)
m.etaChg = en.Param(initialize=battery.charging_efficiency)
m.etaDisChg = en.Param(initialize=battery.discharging_efficiency)
m.ChargingLimit = en.Param(initialize=battery.charging_power_limit * TIME_STEP_HOURS)
m.DischargingLimit = en.Param(initialize=battery.discharging_power_limit * TIME_STEP_HOURS)
def Obj_fn(m):
return sum((m.priceBuy[t] * m.posNetLoad[t]) + (m.priceSell[t] * m.negNetLoad[t]) for t in m.Time)
m.total_cost = en.Objective(rule=Obj_fn, sense=en.minimize)
def SOC_rule(m, t):
if t == 0:
return m.SOC[t] == initial_soc + m.posDeltaSOC[t] + m.negDeltaSOC[t]
return m.SOC[t] == m.SOC[t-1] + m.posDeltaSOC[t] + m.negDeltaSOC[t]
m.Batt_SOC = en.Constraint(m.Time, rule=SOC_rule)
def Bool_char_rule_1(m, t):
return m.posDeltaSOC[t] >= -BIG_M * m.Bool_char[t]
m.Batt_ch1 = en.Constraint(m.Time, rule=Bool_char_rule_1)
def Bool_char_rule_2(m, t):
return m.posDeltaSOC[t] <= 0 + BIG_M * (1 - m.Bool_dis[t])
m.Batt_ch2 = en.Constraint(m.Time, rule=Bool_char_rule_2)
def Bool_char_rule_3(m, t):
return m.negDeltaSOC[t] <= BIG_M * m.Bool_dis[t]
m.Batt_cd3 = en.Constraint(m.Time, rule=Bool_char_rule_3)
def Bool_char_rule_4(m, t):
return m.negDeltaSOC[t] >= 0 - BIG_M * (1 - m.Bool_char[t])
m.Batt_cd4 = en.Constraint(m.Time, rule=Bool_char_rule_4)
def Batt_char_dis(m, t):
return m.Bool_char[t] + m.Bool_dis[t] <= 1
m.Batt_char_dis = en.Constraint(m.Time, rule=Batt_char_dis)
def pos_E_in_rule(m, t):
return (m.posEInGrid[t] + m.posEInPV[t]) == m.posDeltaSOC[t] / m.etaChg
m.posEIn_cons = en.Constraint(m.Time, rule=pos_E_in_rule)
def neg_E_out_rule(m, t):
return (m.negEOutLocal[t] + m.negEOutExport[t]) == m.negDeltaSOC[t] * m.etaDisChg
m.negEOut_cons = en.Constraint(m.Time, rule=neg_E_out_rule)
def E_charging_rate_rule(m, t):
return (m.posEInGrid[t] + m.posEInPV[t]) <= m.ChargingLimit
m.chargingLimit_cons = en.Constraint(m.Time, rule=E_charging_rate_rule)
def E_discharging_rate_rule(m, t):
return (m.negEOutLocal[t] + m.negEOutExport[t]) >= m.DischargingLimit
m.dischargingLimit_cons = en.Constraint(m.Time, rule=E_discharging_rate_rule)
def E_solar_charging_rule(m, t):
return m.posEInPV[t] <= -m.negLoad[t]
m.solarChargingLimit_cons = en.Constraint(m.Time, rule=E_solar_charging_rule)
def E_local_discharge_rule(m, t):
return m.negEOutLocal[t] >= -m.posLoad[t]
m.localDischargingLimit_cons = en.Constraint(m.Time, rule=E_local_discharge_rule)
def E_pos_net_rule(m, t):
return m.posNetLoad[t] == m.posLoad[t] + m.posEInGrid[t] + m.negEOutLocal[t]
m.E_posNet_cons = en.Constraint(m.Time, rule=E_pos_net_rule)
def E_neg_net_rule(m, t):
return m.negNetLoad[t] == m.negLoad[t] + m.posEInPV[t] + m.negEOutExport[t]
m.E_negNet_cons = en.Constraint(m.Time, rule=E_neg_net_rule)
opt = SolverFactory("glpk", executable=r"C:\Python\glpk-4.65\w64\glpsol.exe")
t = time.time()
results = opt.solve(m)
elapsed = time.time() - t
logging.info(f'Time elapsed for optimization: {elapsed} seconds')
outputVars = np.zeros((9, len(sellPrice)))
variable_indices = {
'SOC': 0,
'posDeltaSOC': 1,
'negDeltaSOC': 2,
'posEInGrid': 3,
'posEInPV': 4,
'negEOutLocal': 5,
'negEOutExport': 6,
'posNetLoad': 7,
'negNetLoad': 8
for var_name, var_index in variable_indices.items():
varobject = getattr(m, var_name)
for index in varobject:
if index < len(sellPrice):
outputVars[var_index, index] = varobject[index].value
final_soc = outputVars[0, -1]
# Get the total cost for the current month
cost_without_batt = round(np.sum([(buyPrice[i]*posLoad[i] + sellPrice[i]*negLoad[i]) for i in range(len(buyPrice))]))
cost_with_batt = round(np.sum([(buyPrice[i]*outputVars[7, i] + sellPrice[i]*outputVars[8, i]) for i in range(len(buyPrice))]))
# Calculate the score and money saved/earned for the current month
score = (cost_with_batt - cost_without_batt) / np.abs(cost_without_batt) * 100
money_saved = round(abs(cost_without_batt - cost_with_batt), 2)
return outputVars, final_soc, cost_without_batt, cost_with_batt, score, money_saved
def main():
Main function to run the battery optimization for each month of the year.
full_output = []
initial_soc = Binitial_soc
total_cost_without_batt = 0
total_cost_with_batt = 0
total_score = 0
total_money_saved = 0
for month in range(1, 13):
start_date = pd.Timestamp(year=2023, month=month, day=1)
end_date = start_date + pd.offsets.MonthEnd(0) + pd.Timedelta(hours=23.5)
logging.info(f"Running optimization from {start_date.strftime(' %D %Y %H:%M ')} to {end_date.strftime(' %D %Y %H:%M ')}")
monthly_output, initial_soc, cost_without_batt, cost_with_batt, score, money_saved = run_monthly_optimization(start_date, end_date, initial_soc)
total_cost_without_batt += cost_without_batt
total_cost_with_batt += cost_with_batt
total_score = (total_cost_with_batt - total_cost_without_batt) / np.abs(total_cost_without_batt) * 100
total_money_saved += money_saved
logging.info(f"Cost without battery for {start_date.strftime('%B %Y')}: £{cost_without_batt}")
logging.info(f"Cost with battery for {start_date.strftime('%B %Y')}: £{cost_with_batt}")
logging.info(f"Score for {start_date.strftime('%B %Y')}: {score:.4f}%")
logging.info(f"Money saved/earned for {start_date.strftime('%B %Y')}: £{money_saved}")
# Print the total results for the entire year
logging.info(f'Total cost without battery for the entire year: £{total_cost_without_batt}')
logging.info(f'Total cost with battery for the entire year: £{total_cost_with_batt}')
logging.info(f'Total score for the entire year: {total_score:.4f}%')
logging.info(f'Total money saved/earned for the entire year: £{total_money_saved}')
# Concatenate results from all months
full_output = np.concatenate(full_output, axis=1)
# Create datetime index
time_index = pd.date_range(start=g_df.index.min(), periods=len(full_output[0]), freq='30min')
timeDF = pd.DataFrame(index=time_index)
timeDF['demand'] = g_df['demand'].values[:len(full_output[0])]
timeDF['pv'] = g_df['generation'].values[:len(full_output[0])]
timeDF['buyPrice'] = g_df['buyPrice'].values[:len(full_output[0])]
timeDF['sellPrice'] = g_df['sellPrice'].values[:len(full_output[0])]
# Export results to CSV
headers = ['SOC', 'posDeltaSOC', 'negDeltaSOC', 'posEInGrid', 'posEInPV', 'negEOutLocal', 'negEOutExport', 'posNetLoad', 'negNetLoad']
outputDF = pd.DataFrame(full_output.T, columns=headers, index=time_index)
outputDF = pd.concat([timeDF, outputDF], axis=1)
logging.info("Optimization results saved to full_output.csv")
plot_results(outputDF, full_output)
plot_new_daily_profiles(outputDF, sns.color_palette())
def plot_results(outputDF, full_output):
Plot the results of the optimization.
newNetLoad = full_output[7] + full_output[8]
colors = sns.color_palette()
# Convert the index to a datetime format for plotting
outputDF['datetime'] = pd.to_datetime(outputDF.index)
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(14, 12))
# Plot demand, PV, and net load with storage
ax1.plot(outputDF['datetime'], outputDF['demand'], color=colors[0], label='Demand')
ax1.plot(outputDF['datetime'], outputDF['pv'], color=colors[1], label='PV')
ax1.plot(outputDF['datetime'], newNetLoad, color=colors[3], label='With storage')
ax1.set_xlim([outputDF['datetime'].min(), outputDF['datetime'].max()])
ax1.set_title('Demand, PV, and Net Load with Storage')
# Plot buy and sell prices
ax2.plot(outputDF['datetime'], outputDF['buyPrice'], color=colors[3], label='Buy Price')
ax2.plot(outputDF['datetime'], outputDF['sellPrice'], color=colors[4], label='Sell Price')
ax2.set_xlim([outputDF['datetime'].min(), outputDF['datetime'].max()])
ax2.set_title('Buy and Sell Prices')
# Plot battery action and SOC
ax3.plot(outputDF['datetime'], np.sum(full_output[3:7, :], axis=0), color=colors[5], label='Battery action (kWh)')
ax3.plot(outputDF['datetime'], full_output[0], color=colors[4], label='SOC (kWh)')
ax3.set_xlim([outputDF['datetime'].min(), outputDF['datetime'].max()])
ax3.set_title('Battery Action and SOC')
plot_daily_profiles(outputDF, colors)
def plot_daily_profiles(outputDF, colors):
Plot daily profiles of the optimization results.
fig, ax1 = plt.subplots(figsize=(14, 8))
# Resample to daily averages
daily_df = outputDF.drop(columns=['datetime']).resample('D').sum()
ax1.plot(daily_df.index, daily_df['demand'], color=colors[0], label='Demand')
ax1.plot(daily_df.index, daily_df['pv'], color=colors[1], label='PV')
ax1.plot(daily_df.index, (daily_df['posNetLoad'] + daily_df['negNetLoad']), color=colors[3], label='With storage')
ax1.set_xlim([daily_df.index.min(), daily_df.index.max()])
ax1.set_title('Daily Demand, PV, and Net Load with Storage')
def plot_new_daily_profiles(outputDF, colors):
Plot a new daily profile with demand as a line and stacked areas for imports, PV to demand, battery to demand,
and negative stacked areas for exports.
fig, ax = plt.subplots(figsize=(14, 8))
# Resample to daily sums
daily_df = outputDF.drop(columns=['datetime']).resample('D').sum()
# Calculate the daily import, PV to demand, battery to demand, and exports
daily_df['pv_to_demand'] = daily_df['pv'] + daily_df['negNetLoad'] - daily_df['posEInPV']
daily_df['import_to_meet_demand'] = daily_df['posNetLoad'] - daily_df['posEInGrid']
daily_df['battery_to_demand'] = -daily_df['negEOutLocal']
daily_df['import_to_battery'] = daily_df['posEInGrid']
daily_df['total_demand'] = daily_df['demand']
daily_df['export_pv_to_grid'] = daily_df['negNetLoad']
daily_df['export_battery_to_grid'] = daily_df['negEOutExport']
# Plot the total demand as a line
ax.plot(daily_df.index, daily_df['total_demand'], color=colors[0], label='Total Demand', linewidth=2)
# Plot the positive stacked areas
ax.fill_between(daily_df.index, 0, daily_df['import_to_meet_demand'], color=colors[1], label='Import to Meet Demand')
ax.fill_between(daily_df.index, daily_df['import_to_meet_demand'], daily_df['import_to_meet_demand'] + daily_df['pv_to_demand'], color=colors[2], label='PV to Demand')
ax.fill_between(daily_df.index, daily_df['import_to_meet_demand'] + daily_df['pv_to_demand'], daily_df['import_to_meet_demand'] + daily_df['pv_to_demand'] + daily_df['battery_to_demand'], color=colors[3], label='Battery to Demand')
# Plot the negative stacked areas below the x-axis
ax.fill_between(daily_df.index, 0, daily_df['export_battery_to_grid'], color=colors[4], label='Battery Export')
ax.fill_between(daily_df.index, daily_df['export_battery_to_grid'], daily_df['export_battery_to_grid'] + daily_df['export_pv_to_grid'], color=colors[5], label='PV to Grid')
# Formatting the plot
ax.set_xlabel('Date', fontsize=12)
ax.set_ylabel('Energy (kWh)', fontsize=12)
ax.set_title('Daily Energy Balance with Demand and Storage', fontsize=14)
ax.legend(loc='upper left', fontsize=10)
# Optionally, add data labels to stacked areas
# for i in range(len(daily_df)):
# ax.text(daily_df.index[i], daily_df['import_to_meet_demand'][i] / 2, f"{daily_df['import_to_meet_demand'][i]:.1f}", ha='center', va='center', fontsize=8, color='white')
# ax.text(daily_df.index[i], daily_df['import_to_meet_demand'][i] + daily_df['pv_to_demand'][i] / 2, f"{daily_df['pv_to_demand'][i]:.1f}", ha='center', va='center', fontsize=8, color='white')
# ax.text(daily_df.index[i], daily_df['import_to_meet_demand'][i] + daily_df['pv_to_demand'][i] + daily_df['battery_to_demand'][i] / 2, f"{daily_df['battery_to_demand'][i]:.1f}", ha='center', va='center', fontsize=8, color='white')
if __name__ == "__main__":
我不确定你的基本模型是否运行良好。 您在求解后不会检查最佳求解。 当我将你的数据缩减为前 4 个条目时,模型是无界的,所以有些地方出了问题。
考虑围绕解决方案编辑代码来执行此操作(glpk 部分很好,我只使用 CBC):
# opt = SolverFactory("glpk", executable=r"C:\Program Files\glpk-4.65\w64\glpsol.exe")
opt = SolverFactory('cbc')
t = time.time()
results = opt.solve(m)
if not check_optimal_termination(results=results):
print('non-optimal termination')
然后,复制数据并将其缩减为前 4 个值。 我认为这应该可以解决,但事实并非如此。 也许你的一些变量应该是NonNegativeReals