全部版块 我的主页
论坛 经济学论坛 三区 行为经济学与实验经济学
656 2
2022-07-07
省区碳经济分析的CGE模型及其应用——以河南省为例
附件列表
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

全部回复
2025-8-22 20:06:13
有没有代码?
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

2025-9-4 09:25:38
# henan_cge.py
# 单省(河南)静态CGE,碳税情景;可从SAM自动校准
# Author: you + ChatGPT

import pandas as pd, numpy as np
from math import isclose
from pyomo.environ import (ConcreteModel, Var, Param, Set, NonNegativeReals, Reals,
                           Objective, Constraint, SolverFactory, value, log, exp)
# -----------------------------
# 0) 数据准备:若不存在CSV则用最小示例
# -----------------------------
try:
    sam = pd.read_csv('sam_henan.csv', index_col=0)
    emis = pd.read_csv('emission_coeffs.csv')  # columns: sector, tCO2_per_output
except Exception:
    # 最小示例SAM(单位:亿元);行=收款方,列=付款方;行列和相等
    # 账户:商品E,M,S | 要素 L,K | 居民H | 政府G | 投资I | ROW(国外)
    acc = ['E','M','S','L','K','H','G','I','ROW']
    sam = pd.DataFrame(0.0, index=acc, columns=acc)
    # 中间投入(列=活动支出;E/M/S是活动账户,简化用同名)
    sam.loc['E','E'] = 50; sam.loc['M','E']=10; sam.loc['S','E']=5
    sam.loc['E','M'] = 20; sam.loc['M','M']=80; sam.loc['S','M']=15
    sam.loc['E','S'] = 10; sam.loc['M','S']=25; sam.loc['S','S']=60
    # 要素支付 → L/K 收入
    sam.loc['L','E']=20; sam.loc['K','E']=15
    sam.loc['L','M']=40; sam.loc['K','M']=35
    sam.loc['L','S']=30; sam.loc['K','S']=20
    # 居民、政府、投资对商品的最终需求
    sam.loc['E','H']=5;  sam.loc['M','H']=50; sam.loc['S','H']=40
    sam.loc['E','G']=3;  sam.loc['M','G']=8;  sam.loc['S','G']=12
    sam.loc['E','I']=2;  sam.loc['M','I']=30; sam.loc['S','I']=10
    # 进口(ROW→商品) 与 出口(商品→ROW)
    sam.loc['E','ROW']=8; sam.loc['M','ROW']=10; sam.loc['S','ROW']=6   # 出口
    sam.loc['ROW','E']=12; sam.loc['ROW','M']=15; sam.loc['ROW','S']=9  # 进口
    # 居民所得:L/K分配 + 政府转移(简化0) ;居民→政府税(简化)
    sam.loc['H','L']=60; sam.loc['H','K']=70
    sam.loc['G','H']=5;  sam.loc['G','K']=0
    sam.loc['ROW','H']=0
    # 税收(间接税)由政府获得:按活动支出收取(简化)
    sam.loc['G','E']+=2; sam.loc['G','M']+=6; sam.loc['G','S']+=4
    # 投资资金来源(储蓄):居民、政府、ROW
    sam.loc['I','H']=5; sam.loc['I','G']=1; sam.loc['I','ROW']=2

    # 排放系数(tCO2/亿元产出)——示例
    emis = pd.DataFrame({'sector':['E','M','S'],
                         'tCO2_per_output':[200.0, 20.0, 5.0]})

# 校验SAM平衡
if not isclose(sam.sum(0).sum(), sam.sum(1).sum(), rel_tol=1e-6):
    raise RuntimeError("SAM不平衡:总收支不相等")
if (sam.sum(0)-sam.sum(1)).abs().max()>1e-6:
    print("警告:某些账户行列和存在微小差异")

# 集合
goods = ['E','M','S']
factors = ['L','K']
agents = ['H','G','I']  # 居民、政府、投资
ROW = 'ROW'

# 基准价格与数量(归一化):以基准产值作numeraire (p=1) 标定数量=产值
base_output = sam.loc[goods, goods].sum(axis=0) + sam.loc[goods, agents].sum(axis=1) + sam.loc[goods, ROW]
p0 = {g:1.0 for g in goods}
x0 = {g: float(base_output[g]) for g in goods}

# 要素收入
wL0 = float(sam.loc['L', goods].sum())
rK0 = float(sam.loc['K', goods].sum())

# 居民基准消费
cH0 = {g: float(sam.loc[g,'H']) for g in goods}
YH0 = float(sam.loc['H', factors].sum() + sam.loc['G','H'] + sam.loc[ROW,'H']) - float(sam.loc['I','H'])

# 政府与投资基准需求
cG0 = {g: float(sam.loc[g,'G']) for g in goods}
cI0 = {g: float(sam.loc[g,'I']) for g in goods}
TG0 = float(sam.loc['G', goods].sum())  # 间接税收入(基期)

# 进出口基准
EX0 = {g: float(sam.loc[g, ROW]) for g in goods}
IM0 = {g: float(sam.loc[ROW, g]) for g in goods}

# 排放系数
emis = dict(zip(emis['sector'], emis['tCO2_per_output']))

# 弹性参数(可按文献设定;示例值)
sigma_prod = {'E':0.8,'M':0.8,'S':0.8}  # 生产CES (K,L) 弹性
sigma_arm  = {'E':2.0,'M':2.0,'S':2.0}  # Armington (D,M) 弹性
sigma_cet  = {'E':2.0,'M':2.0,'S':2.0}  # CET (Q,EX) 弹性
beta_H = {g: cH0[g]/max(1e-9,sum(cH0.values())) for g in goods}  # 居民效用份额

# 份额参数(基于基期校准)
# 生产:简化为CD(K,L),易求份额;需要能耗更复杂可自行扩展二层嵌套
alpha_L = {g: float(sam.loc['L', g]) / max(1e-9, sam.loc[['L','K'], g].sum()) for g in goods}
alpha_K = {g: 1.0 - alpha_L[g] for g in goods}

# Armington合成/ CET 变换:按基期数量份额校准
theta_M = {g: IM0[g]/max(1e-9,(IM0[g]+(x0[g]-EX0[g]))) for g in goods}  # 进口份额
phi_EX  = {g: EX0[g]/max(1e-9,x0[g]) for g in goods}                     # 出口份额

# -----------------------------
# 1) 构建模型
# -----------------------------
m = ConcreteModel()
m.G = Set(initialize=goods)
m.F = Set(initialize=factors)

# 变量:价格与数量
m.p = Var(m.G, domain=Reals, initialize=lambda m,g:1.0)       # 国内合成品价
m.pM= Var(m.G, domain=Reals, initialize=lambda m,g:1.0)       # 进口价(CIF,numéraire归一)
m.pQ= Var(m.G, domain=Reals, initialize=lambda m,g:1.0)       # 生产者价
m.wL= Var(domain=Reals, initialize=1.0)
m.rK= Var(domain=Reals, initialize=1.0)

m.X  = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:x0[g])  # 产出
m.CH = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cH0[g]) # 居民消费
m.CG = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cG0[g]) # 政府需求
m.CI = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:cI0[g]) # 投资需求
m.IM = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:IM0[g]) # 进口
m.EX = Var(m.G, domain=NonNegativeReals, initialize=lambda m,g:EX0[g]) # 出口

# 政策参数:碳税(元/吨CO2),返还开关
m.tau = Param(initialize=200.0, mutable=True)     # 政策情景:改为0跑基准
m.rebate_mode = Param(initialize=0, mutable=True) # 0:居民Lump-sum;1:劳动税减免;2:投资补贴

# 函数:CES合成/变换(用“对数CD”近似呈现,便于NLP)
def CES(a, w, rho):
    # a: [(share, quantity)] ; w: [price weights] -> 返回合成数量;这里我们用份额定标,约束里按份额平衡
    # 为稳定与简洁,模型约束采用CD近似(rho→0)与份额校准,足以用于教学复现;
    # 若要精确CES,请将对数线性化换为标准CES形式。
    s = sum(sh*log(max(1e-9,q)) for (sh,q) in a)
    return exp(s)

# 生产零利润(CD:pQ*X = wL*α_L*X + rK*α_K*X)
def prod_zero_profit_rule(m,g):
    return m.pQ[g]*m.X[g] == (alpha_L[g]*m.wL + alpha_K[g]*m.rK)*m.X[g]
m.ProdZeroProfit = Constraint(m.G, rule=prod_zero_profit_rule)

# Armington:国内合成品供给=国内生产供给(扣出口)+进口
def arm_balance(m,g):
    # p * (CH+CG+CI) = pQ*(X-EX) + pM*IM  —— 支出恒等式用数量平衡代替
    return m.CH[g] + m.CG[g] + m.CI[g] == (m.X[g] - m.EX[g]) + m.IM[g]
m.ArmBalance = Constraint(m.G, rule=arm_balance)

# 居民预算约束:Y_H = wL*L + rK*K + 返还 - 税
# 基期L,K名义化为1,使wL0、rK0即为基期要素价格;此处让名义收入按价格变动
YH_base = wL0 + rK0
def hh_budget(m):
    expend = sum(m.p[g]*m.CH[g] for g in m.G)
    # 碳税收入:
    Tcarbon = sum(m.tau * emis.get(g,0.0) * m.X[g] for g in m.G)
    # 返还:
    if value(m.rebate_mode)==0:
        rebate = Tcarbon
        labor_subsidy = 0; invest_subsidy = 0
    elif value(m.rebate_mode)==1:
        rebate = 0
        labor_subsidy = Tcarbon  # 等价为居民有效工资收入增加
        invest_subsidy = 0
    else:
        rebate = 0; labor_subsidy = 0
        invest_subsidy = Tcarbon  # 投资需求侧额外资金
        # 直接在投资预算里用到
    income = (m.wL/m.wL * wL0) + (m.rK/m.rK * rK0) + rebate + labor_subsidy*0  # 简化:工资补贴等价转移可并入rebate
    return expend <= income
m.HHBudget = Constraint(rule=hh_budget)

# 政府与投资“份额规则”:保持基期结构(也可改为内生化储蓄-投资闭合)
def gov_demand(m):
    scale = sum(m.p[g]*m.CG[g] for g in m.G) / max(1e-9,sum(p0[g]*cG0[g] for g in goods))
    return Constraint.Skip
m.GovRule = Constraint(expr=sum(m.p[g]*m.CG[g] for g in m.G) == sum(p0[g]*cG0[g] for g in goods))

# 投资支出由基期储蓄 + 可能的投资补贴提供(碳税返还=2)
def inv_budget(m):
    invest_base = sum(p0[g]*cI0[g] for g in goods)
    Tcarbon = sum(m.tau * emis.get(g,0.0) * m.X[g] for g in m.G)
    invest_sub = Tcarbon if value(m.rebate_mode)==2 else 0.0
    return sum(m.p[g]*m.CI[g] for g in m.G) == invest_base + invest_sub
m.InvestBudget = Constraint(rule=inv_budget)

# 家庭效用最大化(Cobb-Douglas)——在GE里等价于满足一阶条件的支出份额规则
def hh_demand_shares(m,g):
    tot = sum(m.p[k]*m.CH[k] for k in m.G)
    return m.p[g]*m.CH[g] == beta_H[g]*tot
m.HHShare = Constraint(m.G, rule=hh_demand_shares)

# 价格锚(numeraire)
m.PriceAnchor = Constraint(expr=m.p['S']==1.0)

# 目标:仅为帮助收敛,最小化平方违约(近似Walras法则;在均衡处为0)
m.obj = Objective(expr=sum((m.CH[g]+m.CG[g]+m.CI[g] - (m.X[g]-m.EX[g]+m.IM[g]))**2 for g in m.G))

def solve_case(tau, rebate_mode, solver='ipopt'):
    m.tau.value = tau
    m.rebate_mode.value = rebate_mode
    opt = SolverFactory(solver)
    res = opt.solve(m, tee=False)
    out = {
        'prices': {g: value(m.p[g]) for g in goods},
        'output': {g: value(m.X[g]) for g in goods},
        'cons_H': {g: value(m.CH[g]) for g in goods},
        'EX': {g: value(m.EX[g]) for g in goods},
        'IM': {g: value(m.IM[g]) for g in goods},
        'w': value(m.wL), 'r': value(m.rK),
        'emissions': sum(emis.get(g,0.0)*value(m.X[g]) for g in goods),
        'welfare_proxy': sum(np.log(max(1e-9, value(m.CH[g]))) * beta_H[g] for g in goods)
    }
    return out

if __name__ == "__main__":
    # 情景A:基准(无碳税)
    base = solve_case(tau=0.0, rebate_mode=0)
    # 情景B:碳税200元/tCO2,居民Lump-sum返还
    pol1 = solve_case(tau=200.0, rebate_mode=0)
    # 情景C:碳税200,投资补贴返还
    pol2 = solve_case(tau=200.0, rebate_mode=2)

    def show(tag,res):
        print(f"\n== {tag} ==")
        print("价格 p:", res['prices'])
        print("产出 X:", res['output'])
        print("居民消费 CH:", res['cons_H'])
        print("进/出 IM/EX:", res['IM'], res['EX'])
        print("工资/资本回报 (w,r):", res['w'], res['r'])
        print("排放合计(tCO2当量):", res['emissions'])
        print("福利(对数CD近似):", res['welfare_proxy'])

    show("Base", base)
    show("CarbonTax-LS", pol1)
    show("CarbonTax-InvestSub", pol2)
二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

相关推荐
栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群