
处理包含十亿行的大规模数据集,是数据科学与分析领域的一大挑战。传统工具(如Pandas)在中小规模、能放入系统内存的数据集上表现出色,但随着数据集规模扩大,它们会变得缓慢、占用大量随机存取内存(RAM),且经常因内存不足(OOM)错误崩溃。
这就是Vaex的用武之地——Vaex是一款用于核外数据处理的高性能Python库。它能让你高效、省内存地检查、修改、可视化和分析大型表格数据集,即便在普通笔记本电脑上也能轻松实现。
Vaex是一款用于惰性、核外Datafr ame的Python库(类似Pandas),专为超出RAM容量的数据集设计。
Vaex通过直接操作磁盘上的数据,仅读取所需部分,避免将整个文件加载到内存,从而高效处理大规模数据集。
Vaex采用惰性求值机制,即仅在实际需要结果时才执行计算;它还能通过内存映射,即时打开按列存储(而非按行存储)的柱状数据库,如HDF5、Apache Arrow和Parquet。
基于优化的C/C++后端,Vaex每秒可对数十亿行数据进行统计计算和操作,即便在普通硬件上也能实现大规模分析的高速运行。
拥有类似Pandas的应用程序编程接口(API),让熟悉Pandas的用户能轻松过渡,无需陡峭的学习曲线就能利用大数据处理能力。
Vaex并非整体与Dask相似,而是与构建在Pandas Datafr ame之上的Dask Datafr ames类似。这意味着Dask继承了Pandas的某些问题,例如在某些场景下需要将数据完全加载到RAM中才能处理——而Vaex则无需如此。
Vaex不会复制Datafr ame,因此能在内存较少的机器上处理更大的Datafr ame。Vaex和Dask均采用惰性处理,但核心区别在于:Vaex仅在需要时计算字段,而Dask需要显式调用compute()函数。
若要充分利用Vaex的优势,数据需为HDF5或Apache Arrow格式。
Pandas等工具在处理前会将整个数据集加载到RAM中。对于超出内存的数据集,这会导致:
性能缓慢
系统崩溃(内存不足错误)
交互性受限
而Vaex从不将整个数据集加载到内存中,而是:
从磁盘流式读取数据
使用虚拟列和惰性求值延迟计算
仅在显式需要时才生成结果
这使得即便在普通硬件上,也能对大规模数据集进行分析。
Vaex通过内存映射,根据需要从磁盘读取数据。这让它能够处理远大于RAM容量的数据文件。
Vaex不会立即执行每个操作,而是构建一个计算图。仅当你请求结果时(例如打印或绘图),才会执行计算。
虚拟列是在数据集上定义的表达式,在计算前不会占用内存。这能节省RAM并加快工作流程。
创建一个干净的虚拟环境:
conda create -n vaex_demo python=3.9
conda activate vaex_demo
使用pip安装Vaex:
pip install vaex-core vaex-hdf5 vaex-viz
升级Vaex:
pip install --upgrade vaex
安装支持库:
pip install pandas numpy matplotlib
Vaex支持多种处理大型数据集的流行存储格式,可直接处理HDF5、Apache Arrow和Parquet文件——这些格式均经过优化,能实现高效的磁盘访问和快速分析。
虽然Vaex也能读取CSV文件,但为了提升处理大型数据集时的性能,它需要先将CSV文件转换为更高效的格式。
打开Parquet文件的方法:
import vaex
df = vaex.open("your_huge_dataset.parquet")
print(df)
现在,你可以在不将数据集加载到内存的情况下,检查其结构。
数据过滤:
filtered = df[df.sales > 1000]
这不会立即计算结果,而是注册过滤器,仅在需要时应用。
分组与聚合:
result = df.groupby("category", agg=vaex.agg.mean("sales"))
print(result)
Vaex使用并行算法和最少内存,高效计算聚合结果。
统计计算:
mean_price = df["price"].mean()
print(mean_price)
Vaex通过分块扫描数据集,即时计算统计结果。
我们将创建一个包含5000万行的真实出租车数据集,以演示Vaex的功能:
import vaex
import numpy as np
import pandas as pd
import time
# 设置随机种子以保证可重复性
np.random.seed(42)
print("Creating 50 million row dataset...")
n = 50_000_000
# 生成真实的出租车行程数据
data = {
'passenger_count': np.random.randint(1, 7, n),
'trip_distance': np.random.exponential(3, n),
'fare_amount': np.random.gamma(10, 1.5, n),
'tip_amount': np.random.gamma(2, 1, n),
'total_amount': np.random.gamma(12, 1.8, n),
'payment_type': np.random.choice(['credit', 'cash', 'mobile'], n),
'pickup_hour': np.random.randint(0, 24, n),
'pickup_day': np.random.randint(1, 8, n),
}
# 创建Vaex Datafr ame
df_vaex = vaex.from_dict(data)
# 导出为HDF5格式(Vaex高效格式)
df_vaex.export_hdf5('taxi_50M.hdf5')
print(f"Created dataset with {n:,} rows")
输出结果:
Shape: (50000000, 8)
Created dataset with 50,000,000 rows
现在我们得到了一个包含5000万行、8列的数据集。
Vaex内存映射打开大型文件:
start = time.time()
df_vaex = vaex.open('taxi_50M.hdf5')
vaex_time = time.time() - start
print(f"Vaex opened {df_vaex.shape[0]:,} rows in {vaex_time:.4f} seconds")
print(f"Memory usage: ~0 MB (memory-mapped)")
输出结果:
Vaex opened 50,000,000 rows in 0.0199 seconds
Memory usage: ~0 MB (memory-mapped)
Pandas:加载到内存(请勿尝试用5000万行数据测试!):
# 大多数机器上会失败
df_pandas = pd.read_hdf('taxi_50M.hdf5')
这会导致内存错误!Vaex几乎能即时打开文件,无论文件大小,因为它不会将数据加载到内存中。
基础聚合:计算5000万行数据的统计信息:
start = time.time()
stats = {
'mean_fare': df_vaex.fare_amount.mean(),
'mean_distance': df_vaex.trip_distance.mean(),
'total_revenue': df_vaex.total_amount.sum(),
'max_fare': df_vaex.fare_amount.max(),
'min_fare': df_vaex.fare_amount.min(),
}
agg_time = time.time() - start
print(f"\nComputed 5 aggregations in {agg_time:.4f} seconds:")
print(f" Mean fare: ${stats['mean_fare']:.2f}")
print(f" Mean distance: {stats['mean_distance']:.2f} miles")
print(f" Total revenue: ${stats['total_revenue']:,.2f}")
print(f" Fare range: ${stats['min_fare']:.2f} - ${stats['max_fare']:.2f}")
输出结果:
Computed 5 aggregations in 0.8771 seconds:
Mean fare: $15.00
Mean distance: 3.00 miles
Total revenue: $1,080,035,827.27
Fare range: $1.25 - $55.30
过滤操作:筛选长途行程:
start = time.time()
long_trips = df_vaex[df_vaex.trip_distance > 10]
filter_time = time.time() - start
print(f"\nFiltered for trips > 10 miles in {filter_time:.4f} seconds")
print(f" Found: {len(long_trips):,} long trips")
print(f" Percentage: {(len(long_trips)/len(df_vaex)*100):.2f}%")
输出结果:
Filtered for trips > 10 miles in 0.0486 seconds
Found: 1,784,122 long trips
Percentage: 3.57%
多条件过滤:
start = time.time()
premium_trips = df_vaex[(df_vaex.trip_distance > 5) &
(df_vaex.fare_amount > 20) &
(df_vaex.payment_type == 'credit')]
multi_filter_time = time.time() - start
print(f"\nMultiple condition filter in {multi_filter_time:.4f} seconds")
print(f" Premium trips (>5mi, >$20, credit): {len(premium_trips):,}")
输出结果:
Multiple condition filter in 0.0582 seconds
Premium trips (>5mi, >$20, credit): 457,191
分组操作:
start = time.time()
by_payment = df_vaex.groupby('payment_type', agg={
'mean_fare': vaex.agg.mean('fare_amount'),
'mean_tip': vaex.agg.mean('tip_amount'),
'total_trips': vaex.agg.count(),
'total_revenue': vaex.agg.sum('total_amount')
})
groupby_time = time.time() - start
print(f"\nGroupBy operation in {groupby_time:.4f} seconds")
print(by_payment.to_pandas_df())
输出结果:
GroupBy operation in 5.6362 seconds
payment_type mean_fare mean_tip total_trips total_revenue
0 credit 15.001817 2.000065 16663623 3.599456e+08
1 mobile 15.001200 1.999679 16667691 3.600165e+08
2 cash 14.999397 2.000115 16668686 3.600737e+08
更复杂的分组操作:
start = time.time()
by_hour = df_vaex.groupby('pickup_hour', agg={
'avg_distance': vaex.agg.mean('trip_distance'),
'avg_fare': vaex.agg.mean('fare_amount'),
'trip_count': vaex.agg.count()
})
complex_groupby_time = time.time() - start
print(f"\nGroupBy by hour in {complex_groupby_time:.4f} seconds")
print(by_hour.to_pandas_df().head(10))
输出结果:
GroupBy by hour in 1.6910 seconds
pickup_hour avg_distance avg_fare trip_count
0 0 2.998120 14.997462 2083481
1 1 3.000969 14.998814 2084650
2 2 3.003834 15.001777 2081962
3 3 3.001263 14.998196 2081715
4 4 2.998343 14.999593 2083882
5 5 2.997586 15.003988 2083421
6 6 2.999887 15.011615 2083213
7 7 3.000240 14.996892 2085156
8 8 3.002640 15.000326 2082704
9 9 2.999857 14.997857 2082284
虚拟列(计算列)允许在不复制数据的情况下添加列:
df_vaex['tip_percentage'] = (df_vaex.tip_amount / df_vaex.fare_amount) * 100
df_vaex['is_generous_tipper'] = df_vaex.tip_percentage > 20
df_vaex['rush_hour'] = (df_vaex.pickup_hour >= 7) & (df_vaex.pickup_hour <= 9) | \
(df_vaex.pickup_hour >= 17) & (df_vaex.pickup_hour <= 19)
这些列即时计算,无内存开销:
print("Added 3 virtual columns with zero memory overhead")
generous_tippers = df_vaex[df_vaex.is_generous_tipper]
print(f"Generous tippers (>20% tip): {len(generous_tippers):,}")
rush_hour_trips = df_vaex[df_vaex.rush_hour]
print(f"Rush hour trips: {len(rush_hour_trips):,}")
输出结果:
VIRTUAL COLUMNS
Added 3 virtual columns with zero memory overhead
Generous tippers (>20% tip): 11,997,433
Rush hour trips: 12,498,848
相关性分析:
corr = df_vaex.correlation(df_vaex.trip_distance, df_vaex.fare_amount)
print(f"Correlation (distance vs fare): {corr:.4f}")
分位数计算:
try:
percentiles = df_vaex.percentile_approx('fare_amount', [25, 50, 75, 90, 95, 99])
except AttributeError:
percentiles = [
df_vaex.fare_amount.quantile(0.25),
df_vaex.fare_amount.quantile(0.50),
df_vaex.fare_amount.quantile(0.75),
df_vaex.fare_amount.quantile(0.90),
df_vaex.fare_amount.quantile(0.95),
df_vaex.fare_amount.quantile(0.99),
]
print(f"\nFare percentiles:")
print(f"25th: ${percentiles[0]:.2f}")
print(f"50th (median): ${percentiles[1]:.2f}")
print(f"75th: ${percentiles[2]:.2f}")
print(f"90th: ${percentiles[3]:.2f}")
print(f"95th: ${percentiles[4]:.2f}")
print(f"99th: ${percentiles[5]:.2f}")
标准差计算:
std_fare = df_vaex.fare_amount.std()
print(f"\nStandard deviation of fares: ${std_fare:.2f}")
其他常用统计信息:
print(f"\nAdditional statistics:")
print(f"Mean: ${df_vaex.fare_amount.mean():.2f}")
print(f"Min: ${df_vaex.fare_amount.min():.2f}")
print(f"Max: ${df_vaex.fare_amount.max():.2f}")
输出结果:
Correlation (distance vs fare): -0.0001
Fare percentiles:
25th: $11.57
50th (median): $nan
75th: $nan
90th: $nan
95th: $nan
99th: $nan
Standard deviation of fares: $4.74
Additional statistics:
Mean: $15.00
Min: $1.25
Max: $55.30
导出过滤后的数据:
# 筛选高价值行程
high_value_trips = df_vaex[df_vaex.total_amount > 50]
导出为多种格式:
start = time.time()
high_value_trips.export_hdf5('high_value_trips.hdf5')
export_time = time.time() - start
print(f"Exported {len(high_value_trips):,} rows to HDF5 in {export_time:.4f}s")
也可导出为CSV、Parquet等格式:
high_value_trips.export_csv('high_value_trips.csv')
high_value_trips.export_parquet('high_value_trips.parquet')
输出结果:
Exported 13,054 rows to HDF5 in 5.4508s
print("VAEX PERFORMANCE SUMMARY")
print(f"Dataset size: {n:,} rows")
print(f"File size on disk: ~2.4 GB")
print(f"RAM usage: ~0 MB (memory-mapped)")
print()
print(f"Open time: {vaex_time:.4f} seconds")
print(f"Single aggregation: {agg_time:.4f} seconds")
print(f"Simple filter: {filter_time:.4f} seconds")
print(f"Complex filter: {multi_filter_time:.4f} seconds")
print(f"GroupBy operation: {groupby_time:.4f} seconds")
print()
print(f"Throughput: ~{n/groupby_time:,.0f} rows/second")
输出结果:
VAEX PERFORMANCE SUMMARY
Dataset size: 50,000,000 rows
File size on disk: ~2.4 GB
RAM usage: ~0 MB (memory-mapped)
Open time: 0.0199 seconds
Single aggregation: 0.8771 seconds
Simple filter: 0.0486 seconds
Complex filter: 0.0582 seconds
GroupBy operation: 5.6362 seconds
Throughput: ~8,871,262 rows/second
当你处理大于1GB、无法放入RAM的大规模数据集、探索大数据、对百万行数据进行特征工程,或构建数据预处理流水线时,Vaex是理想选择。
以下场景不建议使用Vaex:
数据集小于100MB:此时使用Pandas更简单。
需对多个表格进行复杂连接:使用结构化查询语言(SQL)数据库更佳。
需使用完整Pandas API:Vaex的兼容性有限。
处理实时流数据:需使用其他更合适的工具。
Vaex填补了Python数据科学生态系统的一个空白:无需将所有数据加载到内存,就能高效、交互式地处理十亿行数据集。其核外架构、惰性执行模型和优化算法,使其成为即便在笔记本电脑上也能进行大数据探索的强大工具。无论你是在探索海量日志、科学调查数据还是高频时间序列,Vaex都能帮助你在易用性和大数据扩展性之间搭建桥梁。

扫码加好友,拉您进群



收藏
