towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
随着大型语言模型(LLMs)的广泛应用,其部署与托管方式也在不断演进。由于模型体积庞大以及对硬件资源高效利用的需求,LLM 推理服务在实际应用中面临诸多挑战。同时,不同场景下的延迟需求也存在差异——部分任务要求即时响应,而另一些则可以接受接近实时的处理时间。
针对非严格低延迟、尤其是偏向离线或半离线性质的推理任务,
SageMaker 异步推理 提供了一个高效的解决方案。顾名思义,该功能适用于那些不需要亚秒级响应、但仍需按需触发并具备可扩展性的应用场景。在 LLM 的使用中,诸如文本摘要、内容生成和编辑等任务正越来越多地采用此类模式。这些任务虽无需即时返回结果,但又不适合完全通过批处理方式执行,因此异步端点成为理想选择,它填补了 SageMaker 批处理转换 与实时服务之间的空白。
本文将演示如何结合使用 HuggingFace 文本生成推理服务器(TGI) 与 Amazon SageMaker 的异步推理能力,来部署 Flan-T5-XXL 模型。
注意:本文默认读者已掌握 Python 编程基础,并了解大型语言模型及 Amazon SageMaker 的基本概念。初次接触 SageMaker 推理功能的用户可参考官方入门指南进行学习。虽然我们会简要介绍异步推理的核心机制,但更深入的内容建议查阅相关示例文档,本文实现也将在此基础上展开。
免责声明:作者为 AWS 机器学习架构师,文中观点仅代表个人立场,不代表公司官方态度。
Amazon SageMaker 目前提供四种主要的推理部署模式,可根据具体业务需求灵活选用。其中三项依赖于持续运行的端点,另一项则专为完全离线任务设计:
异步推理恰好处于批量处理与实时服务之间,兼具两者的优点:既不像批量转换那样完全断开连接,也不像实时端点那样始终占用资源。对于诸如文章摘要、创意写作、文本润色等 LLM 应用而言,这类任务通常由事件驱动、调用时间不确定,但又不能容忍长时间等待,因此非常适合采用异步推理架构。
从性能和经济性角度综合考量,异步推理为中等延迟容忍度的模型服务提供了平衡之选。
以本次实践为例,我们将部署广受欢迎的 Flan 系列模型之一 —— Flan-T5-XXL,借助 SageMaker 异步端点完成部署。创建流程与实时端点类似,关键区别在于输入输出方式:异步调用需指定 S3 存储路径用于读取输入数据和写入推理结果,而非直接传递 JSON 负载。
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
如上图所示,这是典型的异步推理架构(作者提供)。值得注意的是,SageMaker 异步端点内部包含一个请求队列。每当提交新的推理任务,系统会将其加入队列,并在处理完成后将输出写入指定的 S3 位置。此外,当配置了自动扩展策略后,系统将依据队列中的请求数量动态调整计算实例的数量。
为进一步提升可观测性,您还可以选择集成 SNS 主题,以便在推理成功或失败时接收通知,避免频繁轮询 S3 输出路径。
现在我们已经对异步推理的基本原理有了清晰认识,接下来进入具体实现环节。
本实验将在全新的 SageMaker Studio 环境中进行,选用 Base Python3 内核,并以 ml.c5.xlarge 实例作为开发主机。整个部署流程将通过 Boto3 AWS SDK 和高级封装的 SageMaker Python SDK 来协调完成。
在进行异步推理时,首先需要设定一个用于存储输出结果的 S3 路径。以下代码将创建一个默认的 S3 存储路径:
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
bucket_prefix = "async-llm-output"
async_output_path = f"s3://{default_bucket}/{bucket_prefix}/output"
print(f"My model inference outputs will be stored at this S3 path: {async_output_path}")
完成路径定义后,接下来配置异步推理参数。通过 AsyncInferenceConfig 可以指定输出路径及并发调用限制。此处未设置 SNS 主题,但可根据需求选择性添加,以便接收推理成功或失败的通知信息。
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
async_config = AsyncInferenceConfig(
output_path=async_output_path,
max_concurrent_invocations_per_instance=10,
# notification_config = {
# "SuccessTopic": "arn:aws:sns:<aws-region>:<account-id>:<topic-name>",
# "ErrorTopic": "arn:aws:sns:<aws-region>:<account-id>:<topic-name>",
# }
)
随后,我们可以从 HuggingFace Hub 直接获取 Flan T5-XXL 模型,并构建适用于 SageMaker 部署的模型对象。该过程使用了基于 文本生成推理 的模型服务器,支持如张量并行等内置优化机制。
# 获取 HuggingFace 模型部署代码并集成异步配置
hub = {
'HF_MODEL_ID': 'google/flan-t5-xxl',
'SM_NUM_GPUS': json.dumps(4)
}
huggingface_model = HuggingFaceModel(
image_uri=get_huggingface_llm_image_uri("huggingface", version="1.1.0"),
env=hub,
role=role,
)
接着,结合已定义的模型和异步配置,即可部署生成端点。部署过程中指定了实例类型、初始实例数量以及容器启动健康检查超时时间。
# 将模型部署至 SageMaker 推理环境
predictor = huggingface_model.deploy(
initial_instance_count=1,
instance_type="ml.g5.12xlarge",
container_startup_health_check_timeout=300,
async_inference_config=async_config
)
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
端点创建完成后,可在 AWS 控制台中确认其类型为“异步推理端点”。
b. 发起异步推理请求
对于单次请求,可以使用与实时推理相同的 .predict 方法来调用异步端点。以下示例展示了如何构造输入数据并提交推理任务:
# 单次调用示例
payload = "What is the capitol of the United States?"
input_data = {
"inputs": payload,
"parameters": {
"early_stopping": True,
"length_penalty": 2.0,
"max_new_tokens": 50,
"temperature": 1,
"min_length": 10,
"no_repeat_ngram_size": 3,
},
}
predictor.predict(input_data)
为了将异步推理应用于实际场景,我们需要通过 S3 中的数据来调用推理端点。异步推理的核心机制是:用户可将多个请求文件存入指定的输入 S3 存储桶中,随后发起调用,系统会为每个数据点生成对应的输出结果文件,并保存至指定的 S3 输出路径。需要注意的是,这种模式与批量转换不同——在批量转换中,整个数据集被一次性处理且无法按需触发;而异步推理支持按需调用端点,具备更高的灵活性。
接下来,我们将构建一个包含相同数据样本的合成数据集,并将其上传至 S3,用于演示该流程。首先,以下代码将创建一个本地目录,用于存放输入文件:
import json
import os
output_directory = 'inputs'
os.makedirs(output_directory, exist_ok=True)
for i in range(1, 20):
json_data = [input_data.copy()]
file_path = os.path.join(output_directory, f'input_{i}.jsonl')
with open(file_path, 'w') as input_file:
for line in json_data:
json.dump(line, input_file)
input_file.write('\n')
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
接着,使用如下定义的工具函数,将这些本地生成的文件上传至 S3,以便后续进行异步推理。该函数利用 SageMaker 会话对象执行上传操作,并确保内容类型正确设置为 JSON 格式:
def upload_file(input_location):
prefix = f"{bucket_prefix}/input"
return sagemaker_session.upload_data(
input_location,
bucket=default_bucket,
key_prefix=prefix,
extra_args={"ContentType": "application/json"}
)
我们选取其中一个文件进行上传示例:
sample_data_point = upload_file("inputs/input_1.jsonl")
print(f"Sample data point uploaded: {sample_data_point}")
[此处为图片2]
完成上传后,即可通过 Boto3 提供的 invoke_endpoint_async API 在指定的 S3 路径上启动异步推理任务。具体调用方式如下:
import boto3
runtime = boto3.client("sagemaker-runtime")
response = runtime.invoke_endpoint_async(
EndpointName=predictor.endpoint_name,
InputLocation=sample_data_point,
Accept='application/json',
ContentType="application/json"
)
output_location = response["OutputLocation"]
print(f"OutputLocation: {output_location}")
[此处为图片3]
最后,为了获取推理结果,我们引入一个轮询函数来持续检查 S3 输出路径,直到目标文件生成为止。由于大语言模型执行推理可能需要一定时间,因此该函数会在文件尚未就绪时等待并重试。其实现逻辑如下:
import urllib, time
from botocore.exceptions import ClientError
def get_output(output_location):
output_url = urllib.parse.urlparse(output_location)
bucket = output_url.netloc
key = output_url.path[1:]
while True:
try:
return sagemaker_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
print("waiting for output...")
time.sleep(60)
continue
raise
output = get_output(output_location)
[此处为图片4]
inferences = []
for i in range(1,20):
input_file = f"inputs/input_{i}.jsonl"
input_file_s3_location = upload_file(input_file)
print(f"Invoking Endpoint with {input_file}")
async_response = predictor.predict_async(input_path=input_file_s3_location)
output_location = async_response.output_path
print(output_location)
inferences += [(input_file, output_location)]
time.sleep(0.5)
for input_file, output_location in inferences:
output = get_output(output_location)
print(f"Input File: {input_file}, Output: {output}")
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
在异步推理场景中,自动扩展功能同样是通过 应用程序自动扩展(Application Auto Scaling) 进行管理的,这一点与实时推理类似。然而,关键区别在于可用于触发扩展的监控指标不同。
异步推理内置了一个任务队列机制。因此,在自动扩展策略中,我们可以依据该队列中的待处理任务数量来动态调整实例规模。这一指标在 CloudWatch 中体现为 “ApproximateBackLogSize”,它反映了当前尚未完成处理的请求总量——包括正在处理和等待处理的项目。
我们可以通过 Boto3 SDK 来配置相应的扩展策略,方式与实时推理基本一致。值得注意的是,异步端点支持将最小实例数设置为零,从而实现完全按需启动,节省资源成本。
client = boto3.client(
"application-autoscaling"
) # Application Auto Scaling 客户端,适用于 SageMaker 等服务
resource_id = (
"endpoint/" + predictor.endpoint_name + "/variant/" + "AllTraffic"
) # 自动扩展系统引用端点的标准格式
# 配置可扩展目标:异步端点支持实例数缩至零
response = client.register_scalable_target(
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
MinCapacity=0,
MaxCapacity=5,
)
设定好实例的最小和最大容量后,接下来需要配置伸缩策略的冷却时间和目标追踪逻辑。在此处,我们将监控的指标明确指定为 “ApproximateBackLogSize”。
response = client.put_scaling_policy(
PolicyName="Invocations-ScalingPolicy",
ServiceNamespace="sagemaker",
ResourceId=resource_id,
ScalableDimension="sagemaker:variant:DesiredInstanceCount",
PolicyType="TargetTrackingScaling",
TargetTrackingScalingPolicyConfiguration={
# 此处省略具体配置项,如目标值、预定义/自定义指标等
}
)
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
{
"TargetValue": 5.0,
# 目标值设定为5,用于控制指标 SageMakerVariantInvocationsPerInstance 的自动扩缩行为。
"CustomizedMetricSpecification": {
"MetricName": "ApproximateBacklogSizePerInstance",
"Namespace": "AWS/SageMaker",
"Dimensions": [
{
"Name": "EndpointName",
"Value": predictor.endpoint_name
}
],
"Statistic": "Average"
},
"ScaleInCooldown": 600,
# 缩容冷却时间设置为600秒,确保在一次缩容操作完成后,
# 系统会等待一段时间再进行下一次缩容,以便观察前序操作的影响。
# 可根据实例启动耗时或应用实际需求调整该值。
"ScaleOutCooldown": 100
# 扩容冷却时间为100秒,表示一次扩容动作结束后,
# 需等待100秒才能触发新的扩容流程。
# 'DisableScaleIn': True|False —— 控制是否禁用目标追踪策略的缩容功能。
# 若设为 true,则策略不会减少可扩展资源的容量。
}
为了验证自动扩展机制的有效性,我们可以在指定时间段内持续发送预测请求。需要特别说明的是,依据上述配置的扩展策略,系统将监控队列中待处理或正在处理的请求数量,并将其维持在目标值5左右——即每个实例平均承载约5个未完成调用。
towardsdatascience.com/deploying-large-language-models-with-sagemaker-asynchronous-inference-c00038b70b3e
以下代码段展示了如何在15分钟内循环发起推理请求:
request_duration = 60 * 15 # 持续15分钟
end_time = time.time() + request_duration
print(f"test will run for {request_duration} seconds")
while time.time() < end_time:
predictor.predict(input_data)
当停止发送新请求后,经过一段过渡期,系统会自动检测到负载下降,随后逐步缩减实例数量直至归零,同时后端队列中的积压请求也将被完全处理清空。
[此处为图片2]
关于更多参考资料与总结说明:
完整示例代码可参考 GitHub 项目路径:
SageMaker-Deployment/LLM-Hosting/Async-Inference-LLM/async-llm-tgi.ipynb at master ·…
本文介绍了 Amazon SageMaker 的异步推理功能,适用于那些既非严格实时、也不属于传统批量处理的 LLM 部署场景。通过此方法,能够更灵活地实现大语言模型的大规模推理服务部署。希望本篇内容为您提供了新的思路,也欢迎持续关注该领域后续的技术演进。
感谢您的阅读,欢迎提出宝贵意见与建议。
扫码加好友,拉您进群



收藏
