# henan_cge.py
# 单省(河南)静态CGE,碳税情景;可从SAM自动校准
# Author: you + ChatGPT
import pandas as pd, numpy as np
from math import isclose
from pyomo.environ import (ConcreteModel, Var, Param, Set, NonNegativeReals, Reals,
Objective, Constraint, SolverFactory, value, log, exp)
# -----------------------------
# 0) 数据准备:若不存在CSV则用最小示例
# -----------------------------
try:
sam = pd.read_csv('sam_henan.csv', index_col=0)
emis = pd.read_csv('emission_coeffs.csv') # columns: sector, tCO2_per_output
except Exception:
# 最小示例SAM(单位:亿元);行=收款方,列=付款方;行列和相等
# 账户:商品E,M,S | 要素 L,K | 居民H | 政府G | 投资I | ROW(国外)
acc = ['E','M','S','L','K','H','G','I','ROW']
sam = pd.DataFrame(0.0, index=acc, columns=acc)
# 中间投入(列=活动支出;E/M/S是活动账户,简化用同名)
sam.loc['E','E'] = 50; sam.loc['M','E']=10; sam.loc['S','E']=5
sam.loc['E','M'] = 20; sam.loc['M','M']=80; sam.loc['S','M']=15
sam.loc['E','S'] = 10; sam.loc['M','S']=25; sam.loc['S','S']=60
# 要素支付 → L/K 收入
sam.loc['L','E']=20; sam.loc['K','E']=15
sam.loc['L','M']=40; sam.loc['K','M']=35
sam.loc['L','S']=30; sam.loc['K','S']=20
# 居民、政府、投资对商品的最终需求
sam.loc['E','H']=5; sam.loc['M','H']=50; sam.loc['S','H']=40
sam.loc['E','G']=3; sam.loc['M','G']=8; sam.loc['S','G']=12
sam.loc['E','I']=2; sam.loc['M','I']=30; sam.loc['S','I']=10
# 进口(ROW→商品) 与 出口(商品→ROW)
sam.loc['E','ROW']=8; sam.loc['M','ROW']=10; sam.loc['S','ROW']=6 # 出口
sam.loc['ROW','E']=12; sam.loc['ROW','M']=15; sam.loc['ROW','S']=9 # 进口
# 居民所得:L/K分配 + 政府转移(简化0) ;居民→政府税(简化)
sam.loc['H','L']=60; sam.loc['H','K']=70
sam.loc['G','H']=5; sam.loc['G','K']=0
sam.loc['ROW','H']=0
# 税收(间接税)由政府获得:按活动支出收取(简化)
sam.loc['G','E']+=2; sam.loc['G','M']+=6; sam.loc['G','S']+=4
# 投资资金来源(储蓄):居民、政府、ROW
sam.loc['I','H']=5; sam.loc['I','G']=1; sam.loc['I','ROW']=2
# 排放系数(tCO2/亿元产出)——示例
emis = pd.DataFrame({'sector':['E','M','S'],
'tCO2_per_output':[200.0, 20.0, 5.0]})
# 校验SAM平衡
if not isclose(sam.sum(0).sum(), sam.sum(1).sum(), rel_tol=1e-6):
raise RuntimeError("SAM不平衡:总收支不相等")
if (sam.sum(0)-sam.sum(1)).abs().max()>1e-6:
print("警告:某些账户行列和存在微小差异")
# 集合
goods = ['E','M','S']
factors = ['L','K']
agents = ['H','G','I'] # 居民、政府、投资
ROW = 'ROW'
# 基准价格与数量(归一化):以基准产值作numeraire (p=1) 标定数量=产值
base_output = sam.loc[goods, goods].sum(axis=0) + sam.loc[goods, agents].sum(axis=1) + sam.loc[goods, ROW]
p0 = {g:1.0 for g in goods}
x0 = {g: float(base_output[g]) for g in goods}
# 要素收入
wL0 = float(sam.loc['L', goods].sum())
rK0 = float(sam.loc['K', goods].sum())
# 居民基准消费
cH0 = {g: float(sam.loc[g,'H']) for g in goods}
YH0 = float(sam.loc['H', factors].sum() + sam.loc['G','H'] + sam.loc[ROW,'H']) - float(sam.loc['I','H'])
# 政府与投资基准需求
cG0 = {g: float(sam.loc[g,'G']) for g in goods}
cI0 = {g: float(sam.loc[g,'I']) for g in goods}
TG0 = float(sam.loc['G', goods].sum()) # 间接税收入(基期)
# 进出口基准
EX0 = {g: float(sam.loc[g, ROW]) for g in goods}
IM0 = {g: float(sam.loc[ROW, g]) for g in goods}
# 排放系数
emis = dict(zip(emis['sector'], emis['tCO2_per_output']))
# 弹性参数(可按文献设定;示例值)
sigma_prod = {'E':0.8,'M':0.8,'S':0.8} # 生产CES (K,L) 弹性
sigma_arm = {'E':2.0,'M':2.0,'S':2.0} # Armington (D,M) 弹性
sigma_cet = {'E':2.0,'M':2.0,'S':2.0} # CET (Q,EX) 弹性
beta_H = {g: cH0[g]/max(1e-9,sum(cH0.values())) for g in goods} # 居民效用份额
# 份额参数(基于基期校准)
# 生产:简化为CD(K,L),易求份额;需要能耗更复杂可自行扩展二层嵌套
alpha_L = {g: float(sam.loc['L', g]) / max(1e-9, sam.loc[['L','K'], g].sum()) for g in goods}
alpha_K = {g: 1.0 - alpha_L[g] for g in goods}
# Armington合成/ CET 变换:按基期数量份额校准
theta_M = {g: IM0[g]/max(1e-9,(IM0[g]+(x0[g]-EX0[g]))) for g in goods} # 进口份额
phi_EX = {g: EX0[g]/max(1e-9,x0[g]) for g in goods} # 出口份额
# -----------------------------
# 1) 构建模型
# -----------------------------
m = ConcreteModel()
m.G = Set(initialize=goods)
m.F = Set(initialize=factors)
# 变量:价格与数量
m.p = Var(m.G, domain=Reals, initialize=lambda m,g:1.0) # 国内合成品价
m.pM= Var(m.G, domain=Reals, initialize=lambda m,g:1.0) # 进口价(CIF,numéraire归一)
m.pQ= Var(m.G, domain=Reals, initialize=lambda m,g:1.0) # 生产者价
m.wL= Var(domain=Reals, initialize=1.0)
m.rK= Var(domain=Reals, initialize=1.0)
m.X = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:x0[g]) # 产出
m.CH = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cH0[g]) # 居民消费
m.CG = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cG0[g]) # 政府需求
m.CI = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cI0[g]) # 投资需求
m.IM = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:IM0[g]) # 进口
m.EX = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:EX0[g]) # 出口
# 政策参数:碳税(元/吨CO2),返还开关
m.tau = Param(initialize=200.0, mutable=True) # 政策情景:改为0跑基准
m.rebate_mode = Param(initialize=0, mutable=True) # 0:居民Lump-sum;1:劳动税减免;2:投资补贴
# 函数:CES合成/变换(用“对数CD”近似呈现,便于NLP)
def CES(a, w, rho):
# a: [(share, quantity)] ; w: [price weights] -> 返回合成数量;这里我们用份额定标,约束里按份额平衡
# 为稳定与简洁,模型约束采用CD近似(rho→0)与份额校准,足以用于教学复现;
# 若要精确CES,请将对数线性化换为标准CES形式。
s = sum(sh*log(max(1e-9,q)) for (sh,q) in a)
return exp(s)
# 生产零利润(CD:pQ*X = wL*α_L*X + rK*α_K*X)
def prod_zero_profit_rule(m,g):
return m.pQ[g]*m.X[g] == (alpha_L[g]*m.wL + alpha_K[g]*m.rK)*m.X[g]
m.ProdZeroProfit = Constraint(m.G, rule=prod_zero_profit_rule)
# Armington:国内合成品供给=国内生产供给(扣出口)+进口
def arm_balance(m,g):
# p * (CH+CG+CI) = pQ*(X-EX) + pM*IM —— 支出恒等式用数量平衡代替
return m.CH[g] + m.CG[g] + m.CI[g] == (m.X[g] - m.EX[g]) + m.IM[g]
m.ArmBalance = Constraint(m.G, rule=arm_balance)
# 居民预算约束:Y_H = wL*L + rK*K + 返还 - 税
# 基期L,K名义化为1,使wL0、rK0即为基期要素价格;此处让名义收入按价格变动
YH_base = wL0 + rK0
def hh_budget(m):
expend = sum(m.p[g]*m.CH[g] for g in m.G)
# 碳税收入:
Tcarbon = sum(m.tau * emis.get(g,0.0) * m.X[g] for g in m.G)
# 返还:
if value(m.rebate_mode)==0:
rebate = Tcarbon
labor_subsidy = 0; invest_subsidy = 0
elif value(m.rebate_mode)==1:
rebate = 0
labor_subsidy = Tcarbon # 等价为居民有效工资收入增加
invest_subsidy = 0
else:
rebate = 0; labor_subsidy = 0
invest_subsidy = Tcarbon # 投资需求侧额外资金
# 直接在投资预算里用到
income = (m.wL/m.wL * wL0) + (m.rK/m.rK * rK0) + rebate + labor_subsidy*0 # 简化:工资补贴等价转移可并入rebate
return expend <= income
m.HHBudget = Constraint(rule=hh_budget)
# 政府与投资“份额规则”:保持基期结构(也可改为内生化储蓄-投资闭合)
def gov_demand(m):
scale = sum(m.p[g]*m.CG[g] for g in m.G) / max(1e-9,sum(p0[g]*cG0[g] for g in goods))
return Constraint.Skip
m.GovRule = Constraint(expr=sum(m.p[g]*m.CG[g] for g in m.G) == sum(p0[g]*cG0[g] for g in goods))
# 投资支出由基期储蓄 + 可能的投资补贴提供(碳税返还=2)
def inv_budget(m):
invest_base = sum(p0[g]*cI0[g] for g in goods)
Tcarbon = sum(m.tau * emis.get(g,0.0) * m.X[g] for g in m.G)
invest_sub = Tcarbon if value(m.rebate_mode)==2 else 0.0
return sum(m.p[g]*m.CI[g] for g in m.G) == invest_base + invest_sub
m.InvestBudget = Constraint(rule=inv_budget)
# 家庭效用最大化(Cobb-Douglas)——在GE里等价于满足一阶条件的支出份额规则
def hh_demand_shares(m,g):
tot = sum(m.p[k]*m.CH[k] for k in m.G)
return m.p[g]*m.CH[g] == beta_H[g]*tot
m.HHShare = Constraint(m.G, rule=hh_demand_shares)
# 价格锚(numeraire)
m.PriceAnchor = Constraint(expr=m.p['S']==1.0)
# 目标:仅为帮助收敛,最小化平方违约(近似Walras法则;在均衡处为0)
m.obj = Objective(expr=sum((m.CH[g]+m.CG[g]+m.CI[g] - (m.X[g]-m.EX[g]+m.IM[g]))**2 for g in m.G))
def solve_case(tau, rebate_mode, solver='ipopt'):
m.tau.value = tau
m.rebate_mode.value = rebate_mode
opt = SolverFactory(solver)
res = opt.solve(m, tee=False)
out = {
'prices': {g: value(m.p[g]) for g in goods},
'output': {g: value(m.X[g]) for g in goods},
'cons_H': {g: value(m.CH[g]) for g in goods},
'EX': {g: value(m.EX[g]) for g in goods},
'IM': {g: value(m.IM[g]) for g in goods},
'w': value(m.wL), 'r': value(m.rK),
'emissions': sum(emis.get(g,0.0)*value(m.X[g]) for g in goods),
'welfare_proxy': sum(np.log(max(1e-9, value(m.CH[g]))) * beta_H[g] for g in goods)
}
return out
if __name__ == "__main__":
# 情景A:基准(无碳税)
base = solve_case(tau=0.0, rebate_mode=0)
# 情景B:碳税200元/tCO2,居民Lump-sum返还
pol1 = solve_case(tau=200.0, rebate_mode=0)
# 情景C:碳税200,投资补贴返还
pol2 = solve_case(tau=200.0, rebate_mode=2)
def show(tag,res):
print(f"\n== {tag} ==")
print("价格 p:", res['prices'])
print("产出 X:", res['output'])
print("居民消费 CH:", res['cons_H'])
print("进/出 IM/EX:", res['IM'], res['EX'])
print("工资/资本回报 (w,r):", res['w'], res['r'])
print("排放合计(tCO2当量):", res['emissions'])
print("福利(对数CD近似):", res['welfare_proxy'])
show("Base", base)
show("CarbonTax-LS", pol1)
show("CarbonTax-InvestSub", pol2)