
本文介绍如何使用 python 对数据库查询结果中的购物车记录按 `product_id` 和 `quantity` 组合进行分组计数,精准统计同一用户下相同商品及数量的出现频次,适用于合并显示购物车重复项的业务场景。
在电商类应用中,用户可能多次将同一商品(相同 product_id)以相同数量(如 quantity=1000)加入购物车,此时前端通常需合并显示为“2 次 product_id=2 with quantity=1000”,而非重复渲染多条相同条目。虽然 SQL 可通过 GROUP BY product_id, quantity 高效完成统计,但若业务逻辑已获取全部购物车记录(例如 ORM 查询结果列表),推荐使用纯 Python 方式处理,避免额外数据库往返。
以下是一个健壮、可复用的统计函数:
from collections import defaultdict
def count_product_quantity_for_user(records, user_id):
"""
统计指定用户购物车中,(product_id, quantity) 组合的出现次数
:param records: 数据库查询返回的字典列表或类字典对象(如 SQLAlchemy Record)
:param user_id: 目标用户的唯一标识
:return: defaultdict(int),键为元组 (product_id, quantity),值为计数
"""
counts = defaultdict(int)
for record in records:
# 兼容属性访问(如 record.product_id)和键访问(如 record['product_id'])
try:
if getattr(record, 'user_id', record.get('user_id')) == user_id:
pid = getattr(record, 'product_id', record.get('product_id'))
qty = getattr(record, 'quantity', record.get('quantity'))
counts[(pid, qty)] += 1
except (AttributeError, TypeError):
continue # 跳过格式异常的记录,增强鲁棒性
return counts
# 示例调用
user_id = 582042554
# 假设 records 是从数据库查询得到的购物车记录列表
# records = session.query(Cart).filter(Cart.user_id == user_id).all()
counts = count_product_quantity_for_user(records, user_id)
# 格式化输出(可用于日志、调试或 API 响应)
for (product_id, quantity), count in counts.items():
print(f"{count} times product_id={product_id} with quantity={quantity}")注意事项:
- 若 quantity 是 Decimal 类型(如示例中的 Decimal('1000.00')),Python 元组可直接包含该对象,无需转换——Decimal('1000.00') == Decimal('1000') 为 False,因此精度差异会被准确区分;如需忽略小数位差异,建议提前归一化(如转为 float 后四舍五入)。
- 函数内部采用 getattr + dict.get() 双重兼容策略,适配 ORM 对象(如 SQLAlchemy)和字典两种常见数据结构。
- 实际生产环境建议结合数据库层优化:对高频查询添加 (user_id, product_id, quantity) 复合索引,并在必要时改用 SQL GROUP BY 提升性能(尤其当购物车数据量较大时)。
该方案简洁、可读性强,且易于集成至 Flask/FastAPI 等 Web 框架的购物车服务中,是平衡开发效率与运行可靠性的实用选择。










