
本文档旨在解决在Snowflake中使用已保存的编码器进行模型推理时遇到的 "ufunc 'isnan' not supported" 错误。我们将深入探讨问题的原因,并提供一种可行的解决方案,包括代码示例和关键步骤,以确保成功地将编码器和模型部署到Snowflake并进行推理。
问题分析
在Snowflake中部署机器学习模型时,一个常见的挑战是如何正确地保存和加载预处理步骤,例如One-Hot Encoding和Ordinal Encoding。当模型和编码器被保存到Snowflake并尝试在UDF(用户定义函数)中使用时,可能会遇到 "ufunc 'isnan' not supported" 错误。
这个错误通常表明在编码器的转换过程中,输入数据包含了编码器在训练时未遇到的值,或者数据类型不匹配,导致isnan函数无法处理。具体来说,问题可能源于以下几个方面:
- 数据类型不一致: UDF中接收的数据类型与模型训练时使用的数据类型不一致。
- 缺失值处理不当: 编码器在训练时没有正确处理缺失值,导致在推理时遇到缺失值时出错。
- 未知值处理: 推理数据中包含编码器在训练时未遇到的类别,且编码器没有正确配置handle_unknown参数。
- API调用错误: 混淆了Snowpark API和scikit-learn API的使用,导致数据转换方式不正确。
解决方案
以下步骤提供了一种解决该问题的完整方案:
1. 编码器和模型的保存
首先,确保使用joblib库正确地保存编码器和模型。以下代码展示了如何将OneHotEncoder、OrdinalEncoder和XGBoost模型保存到Snowflake的stage中:
from joblib import dump
def save_object(object_,filename,stagename,auto_compress=True):
dump(object_, filename)
session.file.put(filename, stagename, overwrite=True,auto_compress=auto_compress)
return
# Extract model object
xgb_model = xgb.to_xgboost()
ohe_obj = ohe.to_sklearn()
oe_obj = oe.to_sklearn()
save_object(xgb_model,'xgb_model.joblib','@AM_TEST_MODELS')
save_object(ohe_obj,'one_hot_encode.joblib','@AM_TEST_MODELS',auto_compress=False)
save_object(oe_obj,'ordinal_encode.joblib','@AM_TEST_MODELS',auto_compress=False)注意:
- auto_compress=False 可以避免压缩编码器文件,有时可以解决加载问题。
- 使用@AM_TEST_MODELS作为stage名称,请根据实际情况修改。
2. UDF的创建和配置
创建UDF时,需要注意以下几点:
- 导入必要的库: 在UDF中导入pandas、joblib、sklearn和xgboost等库。
- 加载编码器和模型: 使用joblib.load从Snowflake的import directory加载编码器和模型。
- 数据类型转换: 确保UDF中接收的数据类型与模型训练时使用的数据类型一致。
- 错误处理: 添加适当的错误处理机制,以便在出现问题时能够及时发现和解决。
以下是一个UDF的示例代码:
import cachetools
@cachetools.cached(cache={})
def read_file(filename):
import sys
import os
import joblib
# Get the "path" of where files added through iport are avalible
import_dir = sys._xoptions.get("snowflake_import_directory")
if import_dir:
with open(os.path.join(import_dir, filename), 'rb') as file:
m = joblib.load(file)
return m
@F.udf(
name='predict_package_mix_p',session=session,replace=True,
is_permanent=True,stage_location='@AM_TEST_UDFS',
)
def predict_package_mix_p(
df:PandasDataFrame[int,str,str,str,str,str,int]
) -> PandasSeries[float]:
import sys
import pandas as pd
from joblib import load
import sklearn
import xgboost as xgb
import json
import snowflake.ml.modeling
def transform_simple_target_encode_manual(
df,transform_col,transform_df
):
df = df.merge(transform_df, on=transform_col)
return df
def remove_space(df):
cols = df.columns
space_cols = [x for x in cols if ' ' in x]
for c in space_cols:
new_col = c.replace(" ","_")
df = df.rename(columns={c:new_col})
return df
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
ohe = read_file('one_hot_encode.pkl')
oe = read_file('ordinal_encode.pkl')
te = pd.read_csv(import_dir + 'target_encoding.csv.gz')
model = read_file('xgb_model.pkl.gz')
print('loaded models')
features = [
"LS1_FLIGHT_ID","DEPARTURE_AIRPORT_CODE","ARRIVAL_AIRPORT_CODE",
"ROUTE_CATEGORY_NAME","DEPARTURE_DATETIME_LOCAL",
"ARRIVAL_DATETIME_LOCAL","CAPACITY"
]
df.columns = features
print('loaded dataframe')
# transform data for one hot and ordinal encodings
df_ohe = ohe.transform(df[['ROUTE_CATEGORY_NAME']])
encoded_df = pd.DataFrame(df_ohe, columns=ohe.categories_)
encoded_df.columns = encoded_df.columns.get_level_values(0)
encoded_df = encoded_df.add_prefix('ROUTE_NAME_OHE_')
df = pd.concat([df, encoded_df], axis=1)
df['DEPART_CODE_ENCODE'] = oe.transform(df[['DEPARTURE_AIRPORT_CODE']])
print('transformed via one hot and ordinal')
# transform using pre-set target encoding
df_te = transform_simple_target_encode_manual(df,'ARRIVAL_AIRPORT_CODE',te)
df_final = remove_space(df_te)
print('transformed via target encode')
# change date cols to datetime
df_final.loc[:,'DEPARTURE_DATETIME_LOCAL'] = pd.to_datetime(
df_final.loc[:,'DEPARTURE_DATETIME_LOCAL'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
)
df_final['ARRIVAL_DATETIME_LOCAL'] = pd.to_datetime(
df_final['ARRIVAL_DATETIME_LOCAL'],format='%Y-%m-%d %H:%M:%S',yearfirst=True
)
print('transformed dates')
df_final['DEPART_HOUR'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.hour
# snowpark function goes from 1-7 whereas pandas goes from 0-6
df_final['DEPART_WEEKDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day_of_week + 1
df_final['DEPART_MONTHDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day
df_final['DEPART_YEARDAY'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.day_of_year
df_final['DEPART_MONTH'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.month
df_final['DEPART_YEAR'] = df_final['DEPARTURE_DATETIME_LOCAL'].dt.year
df_final['ARRIVE_HOUR'] = df_final['ARRIVAL_DATETIME_LOCAL'].dt.hour
print('created features')
pm = pd.Series(model.predict(df_final[
["DEPART_CODE_ENCODE","ROUTE_NAME_OHE_CITY","ROUTE_NAME_OHE_FAR_SUN",
"ROUTE_NAME_OHE_SKI","ROUTE_NAME_OHE_SUN","CAPACITY",
"ARRIVAL_AIRPORT_CODE_ENCODED","DEPART_HOUR",
"DEPART_WEEKDAY","DEPART_MONTHDAY","DEPART_YEARDAY",
"DEPART_MONTH","DEPART_YEAR","ARRIVE_HOUR"]
]))
return pm关键点:
- @cachetools.cached装饰器: 使用cachetools库的@cached装饰器可以缓存加载的编码器和模型,避免每次调用UDF时都重新加载,提高性能。
- sys._xoptions: 使用sys._xoptions获取import directory,确保可以正确加载Snowflake stage中的文件。
- API调用: 确保在UDF中使用的是scikit-learn API(例如ohe.transform(df[['ROUTE_CATEGORY_NAME']])),而不是Snowpark API。
- 列名转换: 注意Snowflake会将列名转换为大写,因此在UDF中需要使用大写列名。
- 数据预处理: 在UDF中完成所有必要的数据预处理步骤,包括One-Hot Encoding、Ordinal Encoding和日期时间特征提取。
3. 推理
创建UDF后,可以使用Snowpark DataFrame调用UDF进行推理:
test_df = session.create_dataframe(
[[979152,"LBA","ALC","SUN","2023-11-24 08:30:00","2023-11-24 12:25:00",189],
[987073,"LBA","FAO","SUN","2023-12-13 16:15:00","2023-12-13 11:25:00",189],
[951384,"STN","FNC","FAR SUN","2023-12-05 09:40:00","2023-12-05 13:35:00",189],
[952380,"MAN","LPA","FAR SUN","2023-12-22 19:45:00","2023-12-22 14:30:00",235],
[963602,"MAN","FUE","FAR SUN","2023-12-29 10:30:00","2023-12-29 15:05:00",235]],
schema=[
"LS1_FLIGHT_ID","DEPARTURE_AIRPORT_CODE","ARRIVAL_AIRPORT_CODE",
"ROUTE_CATEGORY_NAME","DEPARTURE_DATETIME_LOCAL","ARRIVAL_DATETIME_LOCAL","CAPACITY"
]
)
test_df.withColumn(
'PREDICTED_PACKAGE_MIX',
predict_package_mix_p([*test_df])).show()注意:
- session.add_import 用于添加UDF所需的依赖文件。
- session.add_packages 用于添加UDF所需的Python包。
- 确保添加所有必需的依赖项,否则UDF可能会失败。
总结
通过遵循以上步骤,可以解决在Snowflake中使用已保存的编码器进行模型推理时遇到的 "ufunc 'isnan' not supported" 错误。关键在于正确地保存和加载编码器和模型,并在UDF中进行必要的数据预处理和类型转换。此外,使用cachetools库可以提高UDF的性能。
注意事项:
- 确保本地环境和Snowflake环境的Python包版本一致。
- 在部署UDF之前,务必进行充分的测试,以确保其正常工作。
- 监控UDF的性能,并根据需要进行优化。
希望本文档能够帮助您成功地在Snowflake中部署机器学习模型并进行推理。










