
本文介绍如何将已部署在 vertex ai model registry 中的自定义预测例程(cpr)无缝接入 kubeflow pipelines,通过官方预构建组件完成端-to-end批量预测任务,避免手动构造模型对象导致的类型不匹配问题。
在 Vertex AI 中使用自定义预测例程(Custom Prediction Routine, CPR)时,模型逻辑被封装在容器镜像中,并注册至 Model Registry(而非存储于 GCS),因此无法通过常规 ImporterComponent(依赖 artifact_uri)引入 Pipeline。直接编写自定义 importer 并返回 aiplatform.Model 实例会导致类型错误——因为 ModelBatchPredictOp 期望的是符合 KFP 类型系统规范的 Model Artifact,而非原始 Python SDK 对象。
✅ 正确做法是:从 Model Registry 中以声明式方式获取已注册模型的 Artifact,推荐使用 ModelGetOp(v1.2.0+ 版本推荐)或兼容旧版的 GetVertexModelOp。
✅ 推荐方案:使用 ModelGetOp
该组件会根据模型资源名称(model_name)从 Vertex AI Model Registry 中检索模型,并输出标准 Model Artifact,与 ModelBatchPredictOp 完全兼容:
from google_cloud_pipeline_components.v1.model import ModelGetOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from kfp.dsl import pipeline
@pipeline(name="cpr-batch-prediction-pipeline")
def cpr_batch_prediction_pipeline(
project: str = "your-project-id",
location: str = "us-central1",
model_display_name: str = "my-cpr-model",
gcs_source_uris: list = ["gs://your-bucket/input/data.csv"],
gcs_destination_prefix: str = "gs://your-bucket/predictions/",
):
# Step 1: 从 Model Registry 获取模型(输出标准 Model Artifact)
get_model_task = ModelGetOp(
model_name=f"projects/{project}/locations/{location}/models/{model_display_name}",
project=project,
location=location,
)
# Step 2: 执行批量预测(自动适配 CPR 模型)
batch_predict_task = ModelBatchPredictOp(
job_display_name="cpr-batch-predict",
model=get_model_task.outputs["model"],
gcs_source_uris=gcs_source_uris,
gcs_destination_output_uri_prefix=gcs_destination_prefix,
instances_format="csv", # 支持 csv/jsonl/records等,需与CPR输入解析逻辑一致
predictions_format="jsonl", # 输出格式建议用 jsonl 便于后续解析
starting_replica_count=1,
max_replica_count=3,
machine_type="n1-standard-4",
)? 关键说明:model_name 必须为完整资源路径(如 projects/my-proj/locations/us-central1/models/1234567890123456789),但更推荐使用 display_name 查询后动态构造。若需运行时解析,可在 pipeline 外部预先获取(非 pipeline 内部调用 SDK),或使用 kfp.dsl.importer + artifact_uri 的替代路径(仅当模型启用“导出为 Artifact”功能时适用,CPR 默认不支持)。
⚠️ 注意事项
- ❌ 不要自行创建 aiplatform.Model 实例并试图注入 pipeline —— 这违反 KFP 的 Artifact 类型契约,必然触发 InconsistentTypeException。
- ✅ 确保模型已在 Model Registry 中成功注册(可通过 Console 或 aiplatform.Model.list() 验证)。
- ✅ instances_format 和 predictions_format 必须与 CPR 容器中 predict() 方法的输入/输出协议严格匹配(例如:若 CPR 接收 JSONL 则不可设为 csv)。
- ? 若使用较老版本 google-cloud-pipeline-components(
✅ 总结
将 CPR 集成进 Vertex AI Pipeline 的核心原则是:复用 Model Registry 作为可信模型源,借助官方类型安全的 ModelGetOp 组件桥接 SDK 世界与 Pipeline Artifact 世界。这种方式无需维护额外镜像、不绕过权限控制、天然支持模型版本追踪与审计,是生产级 MLOps 流水线的最佳实践。










