0

0

Python怎样操作Apache Druid?pydruid查询

雪夜

雪夜

发布时间:2025-08-13 13:46:01

|

680人浏览过

|

来源于php中文网

原创

首先,使用pydruid库操作apache druid需构建json查询并发送至druid集群;1. 安装pydruid:pip install pydruid;2. 使用querybuilder或直接构造json发送请求;3. 查询包含datasource、intervals、granularity、aggregations和dimensions等核心字段;4. 针对查询慢问题,优化方法包括:优化索引、合理分片、避免全表扫描、减少返回列数、使用limit、启用近似查询、开启缓存、调优资源、优化数据模型、避免复杂join、升级pydruid版本;5. 处理时间戳需注意iso 8601格式、数据摄入时配置timestampspec、查询时使用datetime对象、处理时区转换、合理设置时间粒度、处理null值及使用bound过滤;6. 对于复杂聚合如百分位数,需构造含quantilesdoublessketch聚合器和quantilesdoublessketchtoquantiles后聚合器的json查询,并确保datasketches扩展已加载;7. 可考虑使用druidapi等更高级库简化复杂查询构建。

Python怎样操作Apache Druid?pydruid查询

Python操作Apache Druid,核心在于构建查询并发送给Druid集群。通常,你会使用

pydruid
这个库,它简化了与Druid交互的过程。
pydruid
查询的重点在于构造合适的JSON查询体,并处理Druid返回的结果。

解决方案

首先,你需要安装

pydruid

立即学习Python免费学习笔记(深入)”;

pip install pydruid

然后,就可以开始构建查询了。一个基本的Druid查询包含以下几个部分:

dataSource
(数据源),
intervals
(时间范围),
granularity
(时间粒度),
aggregations
(聚合操作)和
dimensions
(维度)。

例如,要查询名为

wikipedia
的数据源,统计过去24小时内每个小时的事件数量,可以这样写:

from pydruid.client import *
from pydruid.query import QueryBuilder
import datetime

client = PyDruid('http://your_druid_host:8082', 'druid/v2') # 替换为你的Druid Coordinator地址

q = QueryBuilder()
q.datasource('wikipedia')
q.intervals(datetime.datetime.now() - datetime.timedelta(days=1), datetime.datetime.now())
q.granularity('hour')
q.aggregator('count', 'events')

results = client.query(q)

for row in results:
    print(row)

这段代码首先连接到Druid Coordinator,然后使用

QueryBuilder
构建一个查询。
dataSource
指定了数据源,
intervals
指定了查询的时间范围,
granularity
指定了时间粒度为小时,
aggregator
指定了聚合操作为统计事件数量。最后,使用
client.query(q)
执行查询,并打印结果。

除了

QueryBuilder
,你也可以直接构建JSON查询体,然后发送给Druid。这种方式更灵活,可以支持更复杂的查询。

import requests
import json

druid_url = 'http://your_druid_host:8082/druid/v2/?pretty' # 替换为你的Druid Coordinator地址

query = {
    "queryType": "groupBy",
    "dataSource": "wikipedia",
    "intervals": [
        f"{datetime.datetime.now() - datetime.timedelta(days=1)}/{datetime.datetime.now()}"
    ],
    "granularity": "hour",
    "dimensions": [],
    "aggregations": [
        {"type": "count", "name": "events"}
    ],
    "limit": 10
}

headers = {'Content-Type': 'application/json'}
response = requests.post(druid_url, data=json.dumps(query), headers=headers)

if response.status_code == 200:
    results = response.json()
    for row in results:
        print(row)
else:
    print(f"Error: {response.status_code} - {response.text}")

这段代码直接构建了一个JSON查询体,并使用

requests
库发送给Druid。注意,你需要根据你的Druid集群配置修改
druid_url

pydruid查询慢,有哪些优化方法?

  1. 索引优化: Druid的性能很大程度上取决于索引。确保你的数据源配置了合适的索引,特别是针对经常用于过滤和分组的维度。检查

    indexSpec
    ,确保使用了合适的bitmap索引和column索引。例如,
    string
    类型的维度列,可以考虑使用
    invertedIndex

  2. 数据分片: Druid通过segment来组织数据。合理的分片策略可以提高查询性能。过大或过小的segment都会影响性能。一般来说,每个segment的大小在300MB到700MB之间比较合适。检查你的

    segmentGranularity
    配置。

  3. 查询优化: 避免全表扫描。尽量使用时间范围过滤,缩小查询范围。尽量减少返回的列数,只选择需要的列。合理使用

    limit
    ,避免返回过多的数据。

  4. 资源调优: 确保你的Druid集群有足够的资源。增加Historical节点的数量,提高查询并发能力。调整Historical节点的内存大小,提高数据缓存能力。检查Coordinator和Overlord节点的配置,确保它们能够及时处理任务。

  5. 使用近似查询: 对于一些不需要精确结果的查询,可以使用近似查询,例如

    approxCountDistinct
    。近似查询可以显著提高查询性能。

  6. 缓存: Druid有两级缓存:query cache和result cache。确保query cache开启,可以缓存最近的查询结果。Result cache可以缓存更细粒度的结果,但需要额外的配置。

  7. 监控和诊断: 使用Druid的监控工具,例如Druid Console,监控查询性能。分析查询日志,找出慢查询的原因。使用Druid的查询分析工具,例如

    explain
    命令,分析查询计划。

  8. 数据建模: 优化数据模型。如果你的数据源包含多个维度,可以考虑使用rollup,预先计算一些聚合结果。Rollup可以显著提高查询性能,但会增加数据摄入的复杂度。

  9. 避免使用复杂的JOIN操作: Druid本身并不擅长复杂的JOIN操作。尽量在数据摄入阶段完成JOIN操作,或者使用lookup external。

    OneAI
    OneAI

    将生成式AI技术打包为API,整合到企业产品和服务中

    下载
  10. Pydruid版本: 确保使用的

    pydruid
    是最新版本,新版本可能包含性能优化。

如何处理pydruid查询中的时间戳问题?

时间戳在Druid中至关重要,因为它用于数据分片和查询过滤。在

pydruid
中处理时间戳,需要注意以下几点:

  1. Druid的时间戳格式: Druid默认使用ISO 8601格式的时间戳,例如

    2023-10-27T10:00:00.000Z
    。确保你的时间戳数据符合这个格式。

  2. 数据摄入: 在数据摄入阶段,需要指定时间戳列。通常,你会使用

    timestampSpec
    来配置时间戳列的名称和格式。如果你的时间戳数据不是ISO 8601格式,你需要指定
    format
    参数。

    {
      "type": "index_parallel",
      "spec": {
        "dataSchema": {
          "dataSource": "your_data_source",
          "timestampSpec": {
            "column": "your_timestamp_column",
            "format": "yyyy-MM-dd HH:mm:ss"
          },
          "dimensionsSpec": {
            "dimensions": [
              "dimension1",
              "dimension2"
            ]
          },
          "metricsSpec": [
            {
              "type": "count",
              "name": "count"
            }
          ]
        },
        "ioConfig": {
          "type": "index_parallel",
          "inputSource": {
            "type": "local",
            "baseDir": "/path/to/your/data",
            "filter": "*.csv"
          },
          "inputFormat": {
            "type": "csv",
            "columns": [
              "your_timestamp_column",
              "dimension1",
              "dimension2"
            ]
          }
        },
        "tuningConfig": {
          "type": "index_parallel",
          "maxRowsInMemory": 75000,
          "forceGuaranteedRollup": true
        }
      }
    }
  3. 查询时间范围:

    pydruid
    查询中,你需要使用
    datetime
    对象来指定时间范围。
    pydruid
    会自动将
    datetime
    对象转换为Druid需要的ISO 8601格式。

    from pydruid.client import *
    from pydruid.query import QueryBuilder
    import datetime
    
    client = PyDruid('http://your_druid_host:8082', 'druid/v2')
    
    q = QueryBuilder()
    q.datasource('your_data_source')
    q.intervals(datetime.datetime(2023, 10, 26), datetime.datetime(2023, 10, 27))
    q.granularity('day')
    q.aggregator('count', 'events')
    
    results = client.query(q)
    
    for row in results:
        print(row)
  4. 时区问题: Druid内部使用UTC时间。如果你的时间戳数据不是UTC时间,你需要进行时区转换。可以在数据摄入阶段进行时区转换,也可以在查询阶段进行时区转换。

  5. 时间粒度: 在查询时,你需要指定时间粒度。时间粒度决定了Druid如何聚合数据。常见的时间粒度包括

    all
    year
    month
    day
    hour
    minute
    second

  6. 处理Null时间戳: 有时,你的数据可能包含Null时间戳。你需要决定如何处理这些Null时间戳。可以选择忽略这些数据,也可以选择使用默认时间戳填充。

  7. 时间戳过滤: 在查询时,可以使用时间戳过滤来缩小查询范围。可以使用

    bound
    过滤器来指定时间范围。

    {
      "type": "timeseries",
      "dataSource": "your_data_source",
      "intervals": [
        "2023-10-26T00:00:00.000Z/2023-10-27T00:00:00.000Z"
      ],
      "granularity": "day",
      "aggregations": [
        {
          "type": "count",
          "name": "events"
        }
      ],
      "filters": [
        {
          "type": "bound",
          "dimension": "__time",
          "lower": "2023-10-26T12:00:00.000Z",
          "upper": "2023-10-26T18:00:00.000Z",
          "ordering": "numeric"
        }
      ]
    }

pydruid如何进行更复杂的聚合查询,例如计算百分位数?

对于更复杂的聚合查询,例如计算百分位数,

pydruid
可能不够直接。你需要构造更底层的JSON查询,利用Druid的
quantilesDoublesSketch
聚合器。

首先,你需要确保你的Druid集群已经加载了

datasketches
扩展。

然后,你可以构建如下的JSON查询:

import requests
import json
import datetime

druid_url = 'http://your_druid_host:8082/druid/v2/?pretty'

query = {
    "queryType": "groupBy",
    "dataSource": "your_data_source",
    "intervals": [
        f"{datetime.datetime.now() - datetime.timedelta(days=1)}/{datetime.datetime.now()}"
    ],
    "granularity": "all",
    "dimensions": [],
    "aggregations": [
        {
            "type": "quantilesDoublesSketch",
            "name": "value_sketch",
            "fieldName": "your_value_column",
            "k": 128  # 可选,控制精度,默认值是128
        }
    ],
    "postAggregations": [
        {
            "type": "quantilesDoublesSketchToQuantiles",
            "name": "quantiles",
            "field": {
                "type": "fieldAccess",
                "fieldName": "value_sketch"
            },
            "fractions": [0.25, 0.5, 0.75, 0.9, 0.99]  # 要计算的百分位数
        }
    ]
}

headers = {'Content-Type': 'application/json'}
response = requests.post(druid_url, data=json.dumps(query), headers=headers)

if response.status_code == 200:
    results = response.json()
    for row in results:
        print(row)
else:
    print(f"Error: {response.status_code} - {response.text}")

这个查询首先使用

quantilesDoublesSketch
聚合器计算
your_value_column
的sketch。然后,使用
quantilesDoublesSketchToQuantiles
post-aggregator计算指定的百分位数。

fieldName
指定了要计算百分位数的列。
fractions
指定了要计算的百分位数,例如
[0.25, 0.5, 0.75]
表示计算25%,50%和75%的百分位数。

注意,

k
参数控制了
quantilesDoublesSketch
的精度。
k
越大,精度越高,但内存消耗也越大。

除了百分位数,Druid还支持其他的复杂聚合操作,例如

approxCountDistinct
(近似去重计数),
thetaSketch
(用于集合操作)等。你可以根据你的需求选择合适的聚合器。

使用

pydruid
构建复杂的JSON查询可能比较繁琐。你可以考虑使用其他的Python库,例如
druidapi
,它提供了更高级的API,可以更方便地构建复杂的查询。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

760

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

639

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

762

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

618

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1265

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

549

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

579

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

709

2023.08.11

高德地图升级方法汇总
高德地图升级方法汇总

本专题整合了高德地图升级相关教程,阅读专题下面的文章了解更多详细内容。

72

2026.01.16

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 4.6万人学习

Django 教程
Django 教程

共28课时 | 3.2万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号