0

0

Dagster资产间数据传递与用户配置管理教程

碧海醫心

碧海醫心

发布时间:2025-11-29 13:05:00

|

500人浏览过

|

来源于php中文网

原创

Dagster资产间数据传递与用户配置管理教程

本教程旨在解决dagster中常见的资产间数据传递和用户自定义配置(config)使用问题。通过详细解析错误案例,展示如何正确地将上游资产的输出作为参数传递给下游资产,并有效利用config对象接收用户定义的运行时参数,从而构建健壮、可配置的dagster数据管道,避免`dagsterinvalidconfigerror`等配置与数据流错误。

在数据工程实践中,我们经常需要构建可配置的数据管道,允许用户在运行时输入参数,例如数据拉取的起始日期或特定的筛选条件。同时,数据管道中的各个步骤(在Dagster中通常表现为“资产”)之间需要高效、明确地传递数据。然而,在Dagster中,如果不正确地处理用户配置和资产间的数据流,可能会遇到诸如DagsterInvalidConfigError之类的错误。本教程将深入探讨如何正确地实现这些功能。

理解Dagster中的资产与配置

Dagster的核心理念之一是“软件定义资产”(Software-Defined Assets)。每个资产都代表数据系统中的一个逻辑实体,并且可以定义其如何被计算。资产之间的依赖关系和数据流是其关键特性。

用户自定义配置 (Config)

Dagster通过Config类提供了一种声明式的方式来定义资产在运行时所需的参数。这些参数可以在Dagster UI中由用户输入,或通过编程方式提供。

from dagster import Config

class FruitConfig(Config):
    fruit_select: str

上述代码定义了一个名为FruitConfig的配置对象,它包含一个字符串类型的参数fruit_select。当一个资产需要此配置时,它会在其函数签名中声明一个类型为FruitConfig的参数。

资产间的数据传递

在Dagster中,资产的输出是其下游资产的输入。这种传递不是通过在下游资产中“调用”上游资产函数来实现的,而是通过将上游资产的输出作为参数注入到下游资产的函数中。

常见错误模式与原因分析

考虑以下不正确的Dagster资产定义,它试图使用用户配置并传递数据:

# 错误示例:不正确的资产定义
import pandas as pd
# ... 其他导入 ...
from dagster import asset, Config

# ... generate_dataset 资产定义 (与正确示例相同,略) ...

class fruit_config(Config):
    fruit_select: str 

@asset(deps=[generate_dataset]) # deps在这里用于数据传递是错误的
def filter_data(config: fruit_config):
    # 错误:不应在此处调用上游资产来获取数据
    df = generate_dataset() 
    df2 = df[df['fruit'] == config.fruit_select]
    print(df2)
    return df2

@asset(deps=[filter_data]) # deps在这里用于数据传递是错误的
def filter_again():
    # 错误:不应在此处调用上游资产来获取数据
    df2 = filter_data() 
    df3 = df2[df2['units'] > 5]
    print(df3)
    return df3

上述代码存在以下主要问题:

  1. 错误的资产数据获取方式: 在filter_data资产中,通过直接调用generate_dataset()来获取上游数据是错误的。在Dagster的资产模型中,上游资产的输出会作为参数自动注入到下游资产中。直接调用会导致每次运行时都重新执行上游资产,并且无法正确建立数据流依赖。同样的问题也存在于filter_again资产中。
  2. deps参数的误用: @asset装饰器中的deps参数用于声明“非数据流”依赖,即一个资产的执行依赖于另一个资产的完成,但不需要其输出数据。如果需要传递数据,应通过函数参数显式声明。
  3. 潜在的配置解析问题: 当filter_data试图在内部调用generate_dataset()时,Dagster的运行时可能无法正确解析filter_data所需的配置,因为其输入签名与实际的数据流期望不符,从而引发DagsterInvalidConfigError。

正确实现:数据流与配置的结合

为了正确地实现用户配置和资产间的数据传递,我们需要遵循Dagster的推荐模式:

意兔-AI漫画相机
意兔-AI漫画相机

照片变漫画手绘,做周边好物

下载
  1. 将上游资产的输出作为参数传递给下游资产。
  2. 为资产函数的参数和返回值添加类型提示,增强可读性和运行时检查。
  3. 将Config对象作为参数传递给需要配置的资产。

以下是修正后的代码示例:

import pandas as pd
import random
from datetime import datetime, timedelta
from dagster import asset, Config, materialize # materialize用于本地测试

# 1. 定义生成原始数据集的资产
@asset 
def generate_dataset() -> pd.DataFrame:
    """
    生成一个包含水果、单位和日期的随机数据集。
    """
    def random_dates(start_date, end_date, n=10):
        date_range = end_date - start_date
        return [start_date + timedelta(days=random.randint(0, date_range.days)) for _ in range(n)]

    random.seed(42) # 保证可复现性
    num_rows = 100
    fruits = ['Apple', 'Banana', 'Orange', 'Grapes', 'Kiwi']

    df = pd.DataFrame({
        'fruit': [random.choice(fruits) for _ in range(num_rows)],
        'units': [random.randint(1, 10) for _ in range(num_rows)],
        'date': random_dates(datetime(2022, 1, 1), datetime(2022, 12, 31), num_rows)
    })
    print("Generated Dataset:")
    print(df.head())
    return df

# 2. 定义用户配置类
class FruitConfig(Config):
    """
    用户自定义配置,用于选择要筛选的水果。
    """
    fruit_select: str 

# 3. 定义筛选数据的资产,接收上游资产输出和用户配置
@asset 
def filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
    """
    根据用户配置的fruit_select筛选数据集。

    Args:
        generate_dataset (pd.DataFrame): 上游资产generate_dataset的输出。
        config (FruitConfig): 用户提供的配置对象。

    Returns:
        pd.DataFrame: 筛选后的数据集。
    """
    # 直接使用传入的generate_dataset参数
    df_filtered = generate_dataset[generate_dataset['fruit'] == config.fruit_select]
    print(f"\nFiltered Data for '{config.fruit_select}':")
    print(df_filtered.head())
    return df_filtered

# 4. 定义再次筛选的资产,接收上游资产输出
@asset
def filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
    """
    对上游filter_data资产的输出进行二次筛选(单位大于5)。

    Args:
        filter_data (pd.DataFrame): 上游资产filter_data的输出。

    Returns:
        pd.DataFrame: 再次筛选后的数据集。
    """
    # 直接使用传入的filter_data参数
    df_final = filter_data[filter_data['units'] > 5]
    print("\nFinal Filtered Data (units > 5):")
    print(df_final.head())
    return df_final

# 示例:如何在本地运行包含配置的资产
if __name__ == "__main__":
    # 使用materialize函数在本地运行资产
    # 传递Config的方式是嵌套在'ops'字典中,对应资产名和'config'键
    result = materialize(
        [generate_dataset, filter_data, filter_again],
        run_config={
            "ops": {
                "filter_data": {
                    "config": {
                        "fruit_select": "Banana" # 用户在此处定义参数
                    }
                }
            }
        }
    )
    assert result.success
    print("\nPipeline executed successfully!")

代码解析与最佳实践

  1. 资产函数签名:

    • filter_data(generate_dataset: pd.DataFrame, config: FruitConfig) -> pd.DataFrame:
      • generate_dataset: pd.DataFrame:明确表示filter_data资产依赖于名为generate_dataset的上游资产的输出,并且该输出的类型是pd.DataFrame。Dagster运行时会自动将generate_dataset资产的返回值注入到此参数中。
      • config: FruitConfig:声明此资产需要一个FruitConfig类型的配置对象。用户在Dagster UI或通过run_config提供的值将填充此对象。
      • -> pd.DataFrame:这是Python的类型提示,表明filter_data资产将返回一个pd.DataFrame对象。这对于Dagster理解资产的输出类型至关重要,也增强了代码的可读性。
    • filter_again(filter_data: pd.DataFrame) -> pd.DataFrame:
      • 同样地,filter_again资产接收filter_data资产的输出作为其输入参数。
  2. 移除deps参数:

    • 在正确实现中,@asset装饰器不再需要deps参数来表示数据流依赖。当一个资产函数的参数与另一个资产的名称匹配时,Dagster会自动识别并建立数据流依赖。
  3. 本地运行与配置:

    • 在if __name__ == "__main__":块中,展示了如何使用materialize函数在本地运行这些资产。
    • run_config字典用于提供运行时配置。对于资产级别的配置,它需要嵌套在"ops"键下,然后是资产名称,再是"config"键,最后是配置参数。例如,{"ops": {"filter_data": {"config": {"fruit_select": "Banana"}}}}。

总结

通过本教程,我们学习了在Dagster中构建可配置数据管道的关键原则:

  • 明确的资产输入/输出: 使用函数参数来接收上游资产的输出和用户配置。
  • 类型提示: 强烈建议为资产函数的参数和返回值添加类型提示,这不仅提高了代码的可读性,也帮助Dagster在运行时进行验证。
  • 正确使用Config: 将Config对象作为资产函数的参数,Dagster会自动处理配置的解析和注入。
  • 避免在下游资产中直接调用上游资产: 这种做法违背了Dagster的数据流模型,会导致错误和低效。

遵循这些最佳实践,可以有效地避免常见的配置和数据流错误,构建出更加健壮、可维护的Dagster数据管道。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
if什么意思
if什么意思

if的意思是“如果”的条件。它是一个用于引导条件语句的关键词,用于根据特定条件的真假情况来执行不同的代码块。本专题提供if什么意思的相关文章,供大家免费阅读。

847

2023.08.22

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

761

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

221

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1570

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

651

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

1228

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

1205

2024.04.29

go语言字符串相关教程
go语言字符串相关教程

本专题整合了go语言字符串相关教程,阅读专题下面的文章了解更多详细内容。

193

2025.07.29

TypeScript类型系统进阶与大型前端项目实践
TypeScript类型系统进阶与大型前端项目实践

本专题围绕 TypeScript 在大型前端项目中的应用展开,深入讲解类型系统设计与工程化开发方法。内容包括泛型与高级类型、类型推断机制、声明文件编写、模块化结构设计以及代码规范管理。通过真实项目案例分析,帮助开发者构建类型安全、结构清晰、易维护的前端工程体系,提高团队协作效率与代码质量。

49

2026.03.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 22.5万人学习

Django 教程
Django 教程

共28课时 | 5万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号