全部版块 我的主页
论坛 数据科学与人工智能 人工智能
70 0
2026-02-11

引言

处理包含十亿行的大规模数据集,是数据科学与分析领域的一大挑战。传统工具(如Pandas)在中小规模、能放入系统内存的数据集上表现出色,但随着数据集规模扩大,它们会变得缓慢、占用大量随机存取内存(RAM),且经常因内存不足(OOM)错误崩溃。

这就是Vaex的用武之地——Vaex是一款用于核外数据处理的高性能Python库。它能让你高效、省内存地检查、修改、可视化和分析大型表格数据集,即便在普通笔记本电脑上也能轻松实现。


什么是Vaex?

Vaex是一款用于惰性、核外Datafr ame的Python库(类似Pandas),专为超出RAM容量的数据集设计。

核心特点

  • Vaex通过直接操作磁盘上的数据,仅读取所需部分,避免将整个文件加载到内存,从而高效处理大规模数据集。

  • Vaex采用惰性求值机制,即仅在实际需要结果时才执行计算;它还能通过内存映射,即时打开按列存储(而非按行存储)的柱状数据库,如HDF5、Apache Arrow和Parquet。

  • 基于优化的C/C++后端,Vaex每秒可对数十亿行数据进行统计计算和操作,即便在普通硬件上也能实现大规模分析的高速运行。

  • 拥有类似Pandas的应用程序编程接口(API),让熟悉Pandas的用户能轻松过渡,无需陡峭的学习曲线就能利用大数据处理能力。


Vaex与Dask的对比

Vaex并非整体与Dask相似,而是与构建在Pandas Datafr ame之上的Dask Datafr ames类似。这意味着Dask继承了Pandas的某些问题,例如在某些场景下需要将数据完全加载到RAM中才能处理——而Vaex则无需如此。

Vaex不会复制Datafr ame,因此能在内存较少的机器上处理更大的Datafr ame。Vaex和Dask均采用惰性处理,但核心区别在于:Vaex仅在需要时计算字段,而Dask需要显式调用compute()函数。

若要充分利用Vaex的优势,数据需为HDF5或Apache Arrow格式。


传统工具为何陷入困境

Pandas等工具在处理前会将整个数据集加载到RAM中。对于超出内存的数据集,这会导致:

  • 性能缓慢

  • 系统崩溃(内存不足错误)

  • 交互性受限

而Vaex从不将整个数据集加载到内存中,而是:

  • 从磁盘流式读取数据

  • 使用虚拟列和惰性求值延迟计算

  • 仅在显式需要时才生成结果

这使得即便在普通硬件上,也能对大规模数据集进行分析。


Vaex的底层工作原理

1. 核外执行(Out-of-Core Execution)

Vaex通过内存映射,根据需要从磁盘读取数据。这让它能够处理远大于RAM容量的数据文件。

2. 惰性求值(Lazy Evaluation)

Vaex不会立即执行每个操作,而是构建一个计算图。仅当你请求结果时(例如打印或绘图),才会执行计算。

3. 虚拟列(Virtual Columns)

虚拟列是在数据集上定义的表达式,在计算前不会占用内存。这能节省RAM并加快工作流程。


Vaex快速入门

1. 安装Vaex

创建一个干净的虚拟环境:

conda create -n vaex_demo python=3.9
conda activate vaex_demo

使用pip安装Vaex:

pip install vaex-core vaex-hdf5 vaex-viz

升级Vaex:

pip install --upgrade vaex

安装支持库:

pip install pandas numpy matplotlib

2. 打开大型数据集

Vaex支持多种处理大型数据集的流行存储格式,可直接处理HDF5、Apache Arrow和Parquet文件——这些格式均经过优化,能实现高效的磁盘访问和快速分析。

虽然Vaex也能读取CSV文件,但为了提升处理大型数据集时的性能,它需要先将CSV文件转换为更高效的格式。

打开Parquet文件的方法:

import vaex

df = vaex.open("your_huge_dataset.parquet")
print(df)

现在,你可以在不将数据集加载到内存的情况下,检查其结构。

3. Vaex核心操作

数据过滤:

filtered = df[df.sales > 1000]

这不会立即计算结果,而是注册过滤器,仅在需要时应用。

分组与聚合:

result = df.groupby("category", agg=vaex.agg.mean("sales"))
print(result)

Vaex使用并行算法和最少内存,高效计算聚合结果。

统计计算:

mean_price = df["price"].mean()
print(mean_price)

Vaex通过分块扫描数据集,即时计算统计结果。

4. 出租车数据集实战演示

我们将创建一个包含5000万行的真实出租车数据集,以演示Vaex的功能:

import vaex
import numpy as np
import pandas as pd
import time

# 设置随机种子以保证可重复性
np.random.seed(42)
print("Creating 50 million row dataset...")
n = 50_000_000

# 生成真实的出租车行程数据
data = {
    'passenger_count': np.random.randint(17, n),
    'trip_distance': np.random.exponential(3, n),
    'fare_amount': np.random.gamma(101.5, n),
    'tip_amount': np.random.gamma(21, n),
    'total_amount': np.random.gamma(121.8, n),
    'payment_type': np.random.choice(['credit''cash''mobile'], n),
    'pickup_hour': np.random.randint(024, n),
    'pickup_day': np.random.randint(18, n),
}

# 创建Vaex Datafr ame
df_vaex = vaex.from_dict(data)

# 导出为HDF5格式(Vaex高效格式)
df_vaex.export_hdf5('taxi_50M.hdf5')
print(f"Created dataset with {n:,} rows")

输出结果:

Shape: (50000000, 8)
Created dataset with 50,000,000 rows

现在我们得到了一个包含5000万行、8列的数据集。

5. Vaex与Pandas性能对比

Vaex内存映射打开大型文件:

start = time.time()
df_vaex = vaex.open('taxi_50M.hdf5')
vaex_time = time.time() - start

print(f"Vaex opened {df_vaex.shape[0]:,} rows in {vaex_time:.4f} seconds")
print(f"Memory usage: ~0 MB (memory-mapped)")

输出结果:

Vaex opened 50,000,000 rows in 0.0199 seconds
Memory usage: ~0 MB (memory-mapped)

Pandas:加载到内存(请勿尝试用5000万行数据测试!):

# 大多数机器上会失败
df_pandas = pd.read_hdf('taxi_50M.hdf5')

这会导致内存错误!Vaex几乎能即时打开文件,无论文件大小,因为它不会将数据加载到内存中。

基础聚合:计算5000万行数据的统计信息:

start = time.time()
stats = {
    'mean_fare': df_vaex.fare_amount.mean(),
    'mean_distance': df_vaex.trip_distance.mean(),
    'total_revenue': df_vaex.total_amount.sum(),
    'max_fare': df_vaex.fare_amount.max(),
    'min_fare': df_vaex.fare_amount.min(),
}
agg_time = time.time() - start

print(f"\nComputed 5 aggregations in {agg_time:.4f} seconds:")
print(f"  Mean fare: ${stats['mean_fare']:.2f}")
print(f"  Mean distance: {stats['mean_distance']:.2f} miles")
print(f"  Total revenue: ${stats['total_revenue']:,.2f}")
print(f"  Fare range: ${stats['min_fare']:.2f} - ${stats['max_fare']:.2f}")

输出结果:

Computed 5 aggregations in 0.8771 seconds:
  Mean fare: $15.00
  Mean distance: 3.00 miles
  Total revenue: $1,080,035,827.27
  Fare range: $1.25 - $55.30

过滤操作:筛选长途行程:

start = time.time()
long_trips = df_vaex[df_vaex.trip_distance > 10]
filter_time = time.time() - start

print(f"\nFiltered for trips > 10 miles in {filter_time:.4f} seconds")
print(f"  Found: {len(long_trips):,} long trips")
print(f"  Percentage: {(len(long_trips)/len(df_vaex)*100):.2f}%")

输出结果:

Filtered for trips > 10 miles in 0.0486 seconds
Found: 1,784,122 long trips
Percentage: 3.57%

多条件过滤:

start = time.time()
premium_trips = df_vaex[(df_vaex.trip_distance > 5) & 
                        (df_vaex.fare_amount > 20) & 
                        (df_vaex.payment_type == 'credit')]
multi_filter_time = time.time() - start

print(f"\nMultiple condition filter in {multi_filter_time:.4f} seconds")
print(f"  Premium trips (>5mi, >$20, credit): {len(premium_trips):,}")

输出结果:

Multiple condition filter in 0.0582 seconds
Premium trips (>5mi, >$20, credit): 457,191

分组操作:

start = time.time()
by_payment = df_vaex.groupby('payment_type', agg={
    'mean_fare': vaex.agg.mean('fare_amount'),
    'mean_tip': vaex.agg.mean('tip_amount'),
    'total_trips': vaex.agg.count(),
    'total_revenue': vaex.agg.sum('total_amount')
})
groupby_time = time.time() - start

print(f"\nGroupBy operation in {groupby_time:.4f} seconds")
print(by_payment.to_pandas_df())

输出结果:

GroupBy operation in 5.6362 seconds
  payment_type  mean_fare  mean_tip  total_trips  total_revenue
0       credit  15.001817  2.000065     16663623   3.599456e+08
1       mobile  15.001200  1.999679     16667691   3.600165e+08
2         cash  14.999397  2.000115     16668686   3.600737e+08

更复杂的分组操作:

start = time.time()
by_hour = df_vaex.groupby('pickup_hour', agg={
    'avg_distance': vaex.agg.mean('trip_distance'),
    'avg_fare': vaex.agg.mean('fare_amount'),
    'trip_count': vaex.agg.count()
})
complex_groupby_time = time.time() - start

print(f"\nGroupBy by hour in {complex_groupby_time:.4f} seconds")
print(by_hour.to_pandas_df().head(10))

输出结果:

GroupBy by hour in 1.6910 seconds
   pickup_hour  avg_distance   avg_fare  trip_count
0            0      2.998120  14.997462     2083481
1            1      3.000969  14.998814     2084650
2            2      3.003834  15.001777     2081962
3            3      3.001263  14.998196     2081715
4            4      2.998343  14.999593     2083882
5            5      2.997586  15.003988     2083421
6            6      2.999887  15.011615     2083213
7            7      3.000240  14.996892     2085156
8            8      3.002640  15.000326     2082704
9            9      2.999857  14.997857     2082284

6. Vaex高级特性

虚拟列(计算列)允许在不复制数据的情况下添加列:

df_vaex['tip_percentage'] = (df_vaex.tip_amount / df_vaex.fare_amount) * 100
df_vaex['is_generous_tipper'] = df_vaex.tip_percentage > 20
df_vaex['rush_hour'] = (df_vaex.pickup_hour >= 7) & (df_vaex.pickup_hour <= 9) | \
                        (df_vaex.pickup_hour >= 17) & (df_vaex.pickup_hour <= 19)

这些列即时计算,无内存开销:

print("Added 3 virtual columns with zero memory overhead")
generous_tippers = df_vaex[df_vaex.is_generous_tipper]
print(f"Generous tippers (>20% tip): {len(generous_tippers):,}")

rush_hour_trips = df_vaex[df_vaex.rush_hour]
print(f"Rush hour trips: {len(rush_hour_trips):,}")

输出结果:

VIRTUAL COLUMNS
Added 3 virtual columns with zero memory overhead
Generous tippers (>20% tip): 11,997,433
Rush hour trips: 12,498,848

相关性分析:

corr = df_vaex.correlation(df_vaex.trip_distance, df_vaex.fare_amount)
print(f"Correlation (distance vs fare): {corr:.4f}")

分位数计算:

try:
    percentiles = df_vaex.percentile_approx('fare_amount', [255075909599])
except AttributeError:
    percentiles = [
        df_vaex.fare_amount.quantile(0.25),
        df_vaex.fare_amount.quantile(0.50),
        df_vaex.fare_amount.quantile(0.75),
        df_vaex.fare_amount.quantile(0.90),
        df_vaex.fare_amount.quantile(0.95),
        df_vaex.fare_amount.quantile(0.99),
    ]

print(f"\nFare percentiles:")
print(f"25th: ${percentiles[0]:.2f}")
print(f"50th (median): ${percentiles[1]:.2f}")
print(f"75th: ${percentiles[2]:.2f}")
print(f"90th: ${percentiles[3]:.2f}")
print(f"95th: ${percentiles[4]:.2f}")
print(f"99th: ${percentiles[5]:.2f}")

标准差计算:

std_fare = df_vaex.fare_amount.std()
print(f"\nStandard deviation of fares: ${std_fare:.2f}")

其他常用统计信息:

print(f"\nAdditional statistics:")
print(f"Mean: ${df_vaex.fare_amount.mean():.2f}")
print(f"Min: ${df_vaex.fare_amount.min():.2f}")
print(f"Max: ${df_vaex.fare_amount.max():.2f}")

输出结果:

Correlation (distance vs fare): -0.0001

Fare percentiles:
  25th: $11.57
  50th (median): $nan
  75th: $nan
  90th: $nan
  95th: $nan
  99th: $nan

Standard deviation of fares: $4.74

Additional statistics:
  Mean: $15.00
  Min: $1.25
  Max: $55.30

7. 数据导出

导出过滤后的数据:

# 筛选高价值行程
high_value_trips = df_vaex[df_vaex.total_amount > 50]

导出为多种格式:

start = time.time()
high_value_trips.export_hdf5('high_value_trips.hdf5')
export_time = time.time() - start
print(f"Exported {len(high_value_trips):,} rows to HDF5 in {export_time:.4f}s")

也可导出为CSV、Parquet等格式:

high_value_trips.export_csv('high_value_trips.csv')
high_value_trips.export_parquet('high_value_trips.parquet')

输出结果:

Exported 13,054 rows to HDF5 in 5.4508s

8. 性能汇总面板

print("VAEX PERFORMANCE SUMMARY")
print(f"Dataset size:           {n:,} rows")
print(f"File size on disk:      ~2.4 GB")
print(f"RAM usage:              ~0 MB (memory-mapped)")
print()
print(f"Open time:              {vaex_time:.4f} seconds")
print(f"Single aggregation:     {agg_time:.4f} seconds")
print(f"Simple filter:          {filter_time:.4f} seconds")
print(f"Complex filter:         {multi_filter_time:.4f} seconds")
print(f"GroupBy operation:      {groupby_time:.4f} seconds")
print()
print(f"Throughput:             ~{n/groupby_time:,.0f} rows/second")

输出结果:

VAEX PERFORMANCE SUMMARY
Dataset size:           50,000,000 rows
File size on disk:      ~2.4 GB
RAM usage:              ~0 MB (memory-mapped)

Open time:              0.0199 seconds
Single aggregation:     0.8771 seconds
Simple filter:          0.0486 seconds
Complex filter:         0.0582 seconds
GroupBy operation:      5.6362 seconds

Throughput:             ~8,871,262 rows/second

总结思考

当你处理大于1GB、无法放入RAM的大规模数据集、探索大数据、对百万行数据进行特征工程,或构建数据预处理流水线时,Vaex是理想选择。

以下场景不建议使用Vaex:

  • 数据集小于100MB:此时使用Pandas更简单。

  • 需对多个表格进行复杂连接:使用结构化查询语言(SQL)数据库更佳。

  • 需使用完整Pandas API:Vaex的兼容性有限。

  • 处理实时流数据:需使用其他更合适的工具。

Vaex填补了Python数据科学生态系统的一个空白:无需将所有数据加载到内存,就能高效、交互式地处理十亿行数据集。其核外架构、惰性执行模型和优化算法,使其成为即便在笔记本电脑上也能进行大数据探索的强大工具。无论你是在探索海量日志、科学调查数据还是高频时间序列,Vaex都能帮助你在易用性和大数据扩展性之间搭建桥梁。

推荐学习书籍 《CDA一级教材》适合CDA一级考生备考,也适合业务及数据分析岗位的从业者提升自我。完整电子版已上线CDA网校,累计已有10万+在读~ !

免费加入阅读:https://edu.cda.cn/goods/show/3151?targetId=5147&preview=0

二维码

扫码加我 拉你入群

请注明:姓名-公司-职位

以便审核进群资格,未注明则拒绝

栏目导航
热门文章
推荐文章

说点什么

分享

扫码加好友,拉您进群
各岗位、行业、专业交流群