0

0

Kedro与Streamlit集成:动态数据目录下的管道运行实践

花韻仙語

花韻仙語

发布时间:2025-11-14 12:51:06

|

1037人浏览过

|

来源于php中文网

原创

Kedro与Streamlit集成:动态数据目录下的管道运行实践

本文旨在指导开发者如何在streamlit应用中集成并运行kedro数据管道,重点解决如何动态创建并传递自定义`datacatalog`以处理streamlit加载的数据。文章将阐明常见的错误尝试及其原因,并提供一种健壮的方法,通过`kedrosession.run()`的`data_catalog`参数正确地将运行时数据注入kedro管道,从而实现数据处理的无缝衔接。

在构建交互式数据应用时,将强大的数据管道框架(如Kedro)与灵活的Web应用框架(如Streamlit)结合是一种常见的需求。特别是当数据源是动态的,例如用户通过Streamlit界面上传文件时,我们需要一种机制来将这些运行时加载的数据作为输入,传递给Kedro管道进行处理。本文将详细介绍如何实现这一目标,并纠正在此过程中可能遇到的常见误区。

理解Kedro与Streamlit集成中的数据流

Kedro的核心概念之一是DataCatalog,它定义了数据加载和保存的方式。通常,DataCatalog在conf/base/catalog.yml中静态定义。然而,在Streamlit应用中,数据通常在运行时由用户上传,这意味着我们需要一个动态的DataCatalog来封装这些内存中的DataFrame。

目标是将Streamlit中加载的DataFrame包装成MemoryDataSet,然后构建一个临时的DataCatalog,并将其传递给Kedro管道执行。

常见错误尝试与原因分析

在尝试将自定义DataCatalog传递给Kedro管道时,开发者可能会遇到一些AttributeError。理解这些错误的原因对于正确实现集成至关重要。

错误尝试一:直接修改KedroContext或KedroSession的catalog属性

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import pandas as pd
import streamlit as st

# 假设 df1, df2, ... 是在Streamlit中加载的DataFrame
df1 = pd.DataFrame({'col1': [1, 2]})
df2 = pd.DataFrame({'col2': [3, 4]})

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        # 尝试直接设置context.catalog,这将导致AttributeError
        # context.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # 尝试直接设置session.catalog,同样会导致AttributeError
        # session.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # ...后续管道运行代码

原因分析:AttributeError: can't set attribute 'catalog'

KedroSession和KedroContext的catalog属性在Kedro内部被设计为只读。这意味着您不能在会话或上下文创建之后,通过直接赋值的方式来修改它们引用的DataCatalog对象。DataCatalog在KedroSession初始化时被加载并冻结,以确保管道执行的一致性和可预测性。尝试直接修改它会违反这一设计原则,从而引发AttributeError。

错误尝试二:通过KedroContext访问pipeline_registry

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner
import streamlit as st

# ... (数据加载和catalog创建) ...

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        runner = SequentialRunner()
        # 尝试通过context.pipeline_registry获取管道,这将导致AttributeError
        # runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=custom_catalog) # AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

原因分析:AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

KedroContext对象并不直接拥有pipeline_registry属性。管道注册是KedroSession负责管理的一部分。当您通过KedroSession.run()方法执行管道时,会话会自动处理管道的查找和注册。直接从KedroContext中访问pipeline_registry是不符合Kedro设计模式的。

LobeHub
LobeHub

LobeChat brings you the best user experience of ChatGPT, OLLaMA, Gemini, Claude

下载

正确的方法:通过KedroSession.run()传递自定义DataCatalog

Kedro提供了一种简洁且推荐的方式来在运行时注入自定义DataCatalog,即使用KedroSession.run()方法的data_catalog(或旧版本中的catalog)参数。

步骤一:在Streamlit中加载数据并创建MemoryDataSet

首先,在Streamlit应用中,您需要使用文件上传器或其他方式加载数据,并将其转换为Pandas DataFrame。然后,将这些DataFrame包装成Kedro的MemoryDataSet对象。MemoryDataSet是Kedro提供的一种数据集类型,用于处理内存中的数据,非常适合这种动态场景。

import streamlit as st
import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.framework.session import KedroSession
from pathlib import Path

# 假设你的Kedro项目路径
project_path = Path(__file__).parent / "my_kedro_project" # 根据实际项目结构调整

st.title("Kedro与Streamlit数据处理应用")

uploaded_file1 = st.file_uploader("上传 Reagentes CSV", type=["csv"])
uploaded_file2 = st.file_uploader("上传 Balanço de Massas CSV", type=["csv"])
# ... 可以根据需要添加更多文件上传器

df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None

if uploaded_file1:
    df1 = pd.read_csv(uploaded_file1)
    st.write("Reagentes 数据加载成功:")
    st.dataframe(df1.head())

if uploaded_file2:
    df2 = pd.read_csv(uploaded_file2)
    st.write("Balanço de Massas 数据加载成功:")
    st.dataframe(df2.head())

# 假设还有其他文件加载,这里简化
# df3 = pd.DataFrame(...)
# df4 = pd.DataFrame(...)
# df5 = pd.DataFrame(...)
# df6 = pd.DataFrame(...)

步骤二:构建自定义DataCatalog

当所有必需的DataFrame都加载完毕后,您可以创建一个新的DataCatalog实例,并将这些MemoryDataSet对象作为键值对添加到其中。键名应与您的Kedro管道中期望的数据集名称相匹配。

# ... (承接上一步的代码) ...

if st.button('Processar Dados de Entrada'):
    if df1 is not None and df2 is not None: # 确保所有必要数据都已加载
        # 创建自定义DataCatalog
        custom_catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            # 根据需要添加更多数据集
            # "laboratorio_raw": MemoryDataSet(df3),
            # "laboratorio_raiox_raw": MemoryDataSet(df4),
            # "carta_controle_pims_raw": MemoryDataSet(df5),
            # "blend_raw": MemoryDataSet(df6)
        })

        st.info("正在执行Kedro管道...")

        try:
            # 步骤三:通过KedroSession.run()传递自定义DataCatalog
            with KedroSession.create(project_path=project_path) as session:
                session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

            st.success('数据处理成功!')

            # 步骤四:从自定义DataCatalog中加载处理后的结果
            # 假设管道输出一个名为 "merged_raw_data_process" 的数据集
            if "merged_raw_data_process" in custom_catalog.list():
                merged_data = custom_catalog.load("merged_raw_data_process")
                st.header('结果数据预览')
                st.dataframe(merged_data.head())

                # 假设结果数据中有一个时间戳列
                if 'timestamp_column' in merged_data.columns: # 请替换为实际的时间戳列名
                    last_update = merged_data['timestamp_column'].max()
                    st.write(f"最新数据时间: {last_update.strftime('%d/%m/%Y %H:%M')}")
            else:
                st.warning("管道未生成预期的 'merged_raw_data_process' 数据集。")

        except Exception as e:
            st.error(f"Kedro管道执行失败: {e}")
    else:
        st.warning("请上传所有必要的数据文件。")

步骤三:通过KedroSession.run()传递自定义DataCatalog

这是解决问题的关键步骤。KedroSession.run()方法接受一个data_catalog(或旧版本中的catalog)参数,允许您传入一个临时的DataCatalog实例。这个传入的DataCatalog会与项目默认的DataCatalog合并,或者在某些情况下完全覆盖默认的同名数据集定义,从而将您的内存数据注入到管道执行中。

# ... (代码片段已包含在步骤二中) ...
with KedroSession.create(project_path=project_path) as session:
    session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

请注意,pipeline_name参数用于指定要运行的特定管道。如果您的Kedro项目只有一个默认管道,可以省略此参数。

步骤四:从自定义DataCatalog中加载处理后的结果

管道执行完成后,如果您的管道配置为将结果保存到MemoryDataSet中(例如,通过在catalog.yml中将输出数据集定义为MemoryDataSet,或者在运行时通过custom_catalog覆盖),您可以直接从传入的custom_catalog中加载这些结果。

# ... (代码片段已包含在步骤二中) ...
merged_data = custom_catalog.load("merged_raw_data_process")

注意事项与最佳实践

  1. Kedro项目结构: 确保您的Streamlit应用能够正确找到Kedro项目的根目录(project_path)。通常,Streamlit应用可以放在Kedro项目之外,但需要正确指定project_path。
  2. MemoryDataSet的使用: MemoryDataSet非常适合处理临时数据。如果需要将处理结果持久化,您可以在DataCatalog中定义其他类型的数据集(如ParquetDataSet、CSVDataSet等),或者在管道的末尾手动保存DataFrame。
  3. 管道定义: 确保您的Kedro管道中的节点能够接收和处理MemoryDataSet提供的数据。输入数据集的名称必须与custom_catalog中定义的键名匹配。
  4. 错误处理: 在实际应用中,务必添加适当的错误处理机制,以捕获Kedro管道执行过程中可能出现的异常,并向用户提供友好的反馈。
  5. 性能考虑: 对于大型数据集,MemoryDataSet可能会消耗大量内存。根据您的数据规模和性能需求,可能需要考虑更高效的数据处理策略,例如分块处理或使用更适合大数据的Kedro数据集类型。
  6. Kedro版本兼容性: 本文中的data_catalog参数适用于较新版本的Kedro。如果您使用的是旧版本,可能需要使用catalog参数。请查阅您的Kedro版本文档以确认正确的参数名称。

总结

通过在KedroSession.run()方法中利用data_catalog参数,我们可以优雅地将Streamlit中加载的动态数据注入到Kedro管道中进行处理。这种方法避免了直接修改Kedro内部只读属性的错误,提供了一种符合Kedro设计哲学且易于维护的集成方案。遵循本文介绍的步骤和最佳实践,您将能够构建出功能强大、交互性强的数据处理应用。

相关专题

更多
Python 时间序列分析与预测
Python 时间序列分析与预测

本专题专注讲解 Python 在时间序列数据处理与预测建模中的实战技巧,涵盖时间索引处理、周期性与趋势分解、平稳性检测、ARIMA/SARIMA 模型构建、预测误差评估,以及基于实际业务场景的时间序列项目实操,帮助学习者掌握从数据预处理到模型预测的完整时序分析能力。

52

2025.12.04

无人机驾驶证报考 uom民用无人机综合管理平台官网
无人机驾驶证报考 uom民用无人机综合管理平台官网

无人机驾驶证(CAAC执照)报考需年满16周岁,初中以上学历,身体健康(矫正视力1.0以上,无严重疾病),且无犯罪记录。个人需通过民航局授权的训练机构报名,经理论(法规、原理)、模拟飞行、实操(GPS/姿态模式)及地面站训练后考试合格,通常15-25天拿证。

0

2026.01.21

Python多线程合集
Python多线程合集

本专题整合了Python多线程相关教程,阅读专题下面的文章了解更多详细内容。

0

2026.01.21

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

1

2026.01.21

windows激活码分享 windows一键激活教程指南
windows激活码分享 windows一键激活教程指南

Windows 10/11一键激活可以通过PowerShell脚本或KMS工具实现永久或长期激活。最推荐的简便方法是打开PowerShell(管理员),运行 irm https://get.activated.win | iex 脚本,按提示选择数字激活(选项1)。其他方法包括使用HEU KMS Activator工具进行智能激活。

1

2026.01.21

excel表格操作技巧大全 表格制作excel教程
excel表格操作技巧大全 表格制作excel教程

Excel表格操作的核心技巧在于 熟练使用快捷键、数据处理函数及视图工具,如Ctrl+C/V(复制粘贴)、Alt+=(自动求和)、条件格式、数据验证及数据透视表。掌握这些可大幅提升数据分析与办公效率,实现快速录入、查找、筛选和汇总。

3

2026.01.21

毒蘑菇显卡测试网站入口 毒蘑菇测试官网volumeshader_bm
毒蘑菇显卡测试网站入口 毒蘑菇测试官网volumeshader_bm

毒蘑菇VOLUMESHADER_BM测试网站网址为https://toolwa.com/vsbm/,该平台基于WebGL技术通过渲染高复杂度三维分形图形评估设备图形处理能力,用户可通过拖动彩色物体观察画面流畅度判断GPU与CPU协同性能;测试兼容多种设备,但中低端手机易卡顿或崩溃,高端机型可能因发热降频影响表现,桌面端需启用独立显卡并使用支持WebGL的主流浏览器以确保准确结果

7

2026.01.21

github中文官网入口 github中文版官网网页进入
github中文官网入口 github中文版官网网页进入

github中文官网入口https://docs.github.com/zh/get-started,GitHub 是一种基于云的平台,可在其中存储、共享并与他人一起编写代码。 通过将代码存储在GitHub 上的“存储库”中,你可以: “展示或共享”你的工作。 持续“跟踪和管理”对代码的更改。

4

2026.01.21

windows安全中心怎么关闭打开_windows安全中心操作指南
windows安全中心怎么关闭打开_windows安全中心操作指南

Windows安全中心可以通过系统设置轻松开关。 暂时关闭:打开“设置” -> “隐私和安全性” -> “Windows安全中心” -> “病毒和威胁防护” -> “管理设置”,将“实时保护”关闭。打开:同样路径将开关开启即可。如需彻底关闭,需在组策略(gpedit.msc)或注册表中禁用Windows Defender。

3

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
CSS3 教程
CSS3 教程

共18课时 | 4.7万人学习

PostgreSQL 教程
PostgreSQL 教程

共48课时 | 7.5万人学习

Django 教程
Django 教程

共28课时 | 3.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号