0

0

手把手教你使用Flask搭建ES搜索引擎(预备篇)

Go语言进阶学习

Go语言进阶学习

发布时间:2023-07-25 17:27:28

|

1327人浏览过

|

来源于Go语言进阶学习

转载

/1 前言/

    Elasticsearch 是一个开源的搜索引擎,建立在一个全文搜索引擎库 Apache Lucene™ 基础之上。


手把手教你使用Flask搭建ES搜索引擎(预备篇)

    那么如何实现 elasticsearchpython  的对接成为我们所关心的问题了 (怎么什么都要和 python 关联啊)。

/2 Python 交互/

所以,Python 也就提供了可以对接 Elasticsearch的依赖库。


pip install elasticsearch


初始化连接一个 Elasticsearch  操作对象。

def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):

    # self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
    self.es = Elasticsearch("localhost:9200")
    self.index_type = index_type
    self.index_name = index_name

默认端口 9200,初始化前请确保本地已搭建好 Elasticsearch的所属环境。

根据 ID 获取文档数据


def get_doc(self, uid):
    return self.es.get(index=self.index_name, id=uid)


插入文档数据


def insert_one(self, doc: dict):
    self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

def insert_array(self, docs: list):
    for doc in docs:
        self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)


搜索文档数据


def search(self, query, count: int = 30):
    dsl = {
        "query": {
            "multi_match": {
                "query": query,
                "fields": ["title", "content", "link"]
            }
        },
        "highlight": {
            "fields": {
                "title": {}
            }
        }
    }
    match_data = self.es.search(index=self.index_name, body=dsl, size=count)
    return match_data

def __search(self, query: dict, count: int = 20): # count: 返回的数据大小
    results = []
    params = {
        'size': count
    }
    match_data = self.es.search(index=self.index_name, body=query, params=params)
    for hit in match_data['hits']['hits']:
        results.append(hit['_source'])

    return results

删除文档数据


def delete_index(self):
    try:
        self.es.indices.delete(index=self.index_name)
    except:
        pass

好啊,封装 search 类也是为了方便调用,整体贴一下。

from elasticsearch import Elasticsearch


class elasticSearch():

    def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):

        # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
        self.es = Elasticsearch("localhost:9200")
        self.index_type = index_type
        self.index_name = index_name

    def create_index(self):
        if self.es.indices.exists(index=self.index_name) is True:
            self.es.indices.delete(index=self.index_name)
        self.es.indices.create(index=self.index_name, ignore=400)

    def delete_index(self):
        try:
            self.es.indices.delete(index=self.index_name)
        except:
            pass

    def get_doc(self, uid):
        return self.es.get(index=self.index_name, id=uid)

    def insert_one(self, doc: dict):
        self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def insert_array(self, docs: list):
        for doc in docs:
            self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def search(self, query, count: int = 30):
        dsl = {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["title", "content", "link"]
                }
            },
            "highlight": {
                "fields": {
                    "title": {}
                }
            }
        }
        match_data = self.es.search(index=self.index_name, body=dsl, size=count)
        return match_data

尝试一下把 Mongodb 中的数据插入到 ES 中。

import json
from datetime import datetime
import pymongo
from app.elasticsearchClass import elasticSearch

client = pymongo.MongoClient('127.0.0.1', 27017)
db = client['spider']
sheet = db.get_collection('Spider').find({}, {'_id': 0, })

es = elasticSearch(index_type="spider_data",index_name="spider")
es.create_index()

for i in sheet:
    data = {
            'title': i["title"],
            'content':i["data"],
            'link': i["link"],
            'create_time':datetime.now()
        }

    es.insert_one(doc=data)

到 ES 中查看一下,启动 elasticsearch-head 插件。

如果是 npm 安装的那么 cd 到根目录之后直接 npm run start 就跑起来了。

本地访问  http://localhost:9100/

手把手教你使用Flask搭建ES搜索引擎(预备篇)

发现新加的 spider 数据文档确实已经进去了。

/3 爬虫入库/

要想实现 ES 搜索,首先要有数据支持,而海量的数据往往来自爬虫。

Mokker AI
Mokker AI

AI产品图添加背景

下载

为了节省时间,编写一个最简单的爬虫,抓取 百度百科

简单粗暴一点,先 递归获取 很多很多的 url 链接


import requests
import re
import time

exist_urls = []
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
}

def get_link(url):
    try:
        response = requests.get(url=url, headers=headers)
        response.encoding = 'UTF-8'
        html = response.text
        link_lists = re.findall('.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>', html)
        return link_lists
    except Exception as e:
        pass
    finally:
        exist_urls.append(url)


# 当爬取深度小于10层时,递归调用主函数,继续爬取第二层的所有链接
def main(start_url, depth=1):
    link_lists = get_link(start_url)
    if link_lists:
        unique_lists = list(set(link_lists) - set(exist_urls))
        for unique_url in unique_lists:
            unique_url = 'https://baike.baidu.com/item/' + unique_url

            with open('url.txt', 'a+') as f:
                f.write(unique_url + '\n')
                f.close()
        if depth < 10:
            main(unique_url, depth + 1)

if __name__ == '__main__':
    start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
    main(start_url)


把全部 url 存到 url.txt 文件中之后,然后启动任务。


# parse.py
from celery import Celery
import requests
from lxml import etree
import pymongo
app = Celery('tasks', broker='redis://localhost:6379/2')
client = pymongo.MongoClient('localhost',27017)
db = client['baike']
@app.task
def get_url(link):
    item = {}
    headers = {'User-Agent':'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
    res = requests.get(link,headers=headers)
    res.encoding = 'UTF-8'
    doc = etree.HTML(res.text)
    content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
    print(res.status_code)
    print(link,'\t','++++++++++++++++++++')
    item['link'] = link
    data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
    item['data'] = data
    if db['Baike'].insert(dict(item)):
        print("is OK ...")
    else:
        print('Fail')

run.py 飞起来


from parse import get_url

def main(url):
    result = get_url.delay(url)
    return result

def run():
    with open('./url.txt', 'r') as f:
        for url in f.readlines():
            main(url.strip('\n'))

if __name__ == '__main__':
    run()


黑窗口键入


celery -A parse worker -l info -P gevent -c 10

哦豁 !!   你居然使用了 Celery 任务队列,gevent 模式,-c 就是10个线程刷刷刷就干起来了,速度杠杠的 !!

啥?分布式? 那就加多几台机器啦,直接把代码拷贝到目标服务器,通过 redis 共享队列协同多机抓取。

这里是先将数据存储到了 MongoDB 上(个人习惯),你也可以直接存到 ES 中,但是单条单条的插入速度堪忧(接下来会讲到优化,哈哈)。

使用前面的例子将 Mongo 中的数据批量导入到 ES 中,OK !!!

手把手教你使用Flask搭建ES搜索引擎(预备篇)

到这一个简单的数据抓取就已经完毕了。

好啦,现在 ES 中已经有了数据啦,接下来就应该是 Flask web 的操作啦,当然,DjangoFastAPI 也很优秀。嘿嘿,你喜欢 !!

关于FastAPI 的文章可以看这个系列文章:

1、(入门篇)简析Python web框架FastAPI——一个比Flask和Tornada更高性能的API 框架

2、(进阶篇)Python web框架FastAPI——一个比Flask和Tornada更高性能的API 框架

3、(完结篇)Python web框架FastAPI——一个比Flask和Tornada更高性能的API 框架

/4 Flask 项目结构/

手把手教你使用Flask搭建ES搜索引擎(预备篇)


这样一来前期工作就差不多了,接下来剩下的工作主要集中于 Flask 的实际开发中,蓄力中 !!

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

WorkBuddy
WorkBuddy

腾讯云推出的AI原生桌面智能体工作台

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
Python Web 框架 Django 深度开发
Python Web 框架 Django 深度开发

本专题系统讲解 Python Django 框架的核心功能与进阶开发技巧,包括 Django 项目结构、数据库模型与迁移、视图与模板渲染、表单与认证管理、RESTful API 开发、Django 中间件与缓存优化、部署与性能调优。通过实战案例,帮助学习者掌握 使用 Django 快速构建功能全面的 Web 应用与全栈开发能力。

166

2026.02.04

Python Flask框架
Python Flask框架

本专题专注于 Python 轻量级 Web 框架 Flask 的学习与实战,内容涵盖路由与视图、模板渲染、表单处理、数据库集成、用户认证以及RESTful API 开发。通过博客系统、任务管理工具与微服务接口等项目实战,帮助学员掌握 Flask 在快速构建小型到中型 Web 应用中的核心技能。

104

2025.08.25

Python Flask Web框架与API开发
Python Flask Web框架与API开发

本专题系统介绍 Python Flask Web框架的基础与进阶应用,包括Flask路由、请求与响应、模板渲染、表单处理、安全性加固、数据库集成(SQLAlchemy)、以及使用Flask构建 RESTful API 服务。通过多个实战项目,帮助学习者掌握使用 Flask 开发高效、可扩展的 Web 应用与 API。

81

2025.12.15

什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

407

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

251

2023.10.07

Python FastAPI异步API开发_Python怎么用FastAPI构建异步API
Python FastAPI异步API开发_Python怎么用FastAPI构建异步API

Python FastAPI 异步开发利用 async/await 关键字,通过定义异步视图函数、使用异步数据库库 (如 databases)、异步 HTTP 客户端 (如 httpx),并结合后台任务队列(如 Celery)和异步依赖项,实现高效的 I/O 密集型 API,显著提升吞吐量和响应速度,尤其适用于处理数据库查询、网络请求等耗时操作,无需阻塞主线程。

28

2025.12.22

Python 微服务架构与 FastAPI 框架
Python 微服务架构与 FastAPI 框架

本专题系统讲解 Python 微服务架构设计与 FastAPI 框架应用,涵盖 FastAPI 的快速开发、路由与依赖注入、数据模型验证、API 文档自动生成、OAuth2 与 JWT 身份验证、异步支持、部署与扩展等。通过实际案例,帮助学习者掌握 使用 FastAPI 构建高效、可扩展的微服务应用,提高服务响应速度与系统可维护性。

251

2026.02.06

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

765

2023.08.10

Python异步编程与Asyncio高并发应用实践
Python异步编程与Asyncio高并发应用实践

本专题围绕 Python 异步编程模型展开,深入讲解 Asyncio 框架的核心原理与应用实践。内容包括事件循环机制、协程任务调度、异步 IO 处理以及并发任务管理策略。通过构建高并发网络请求与异步数据处理案例,帮助开发者掌握 Python 在高并发场景中的高效开发方法,并提升系统资源利用率与整体运行性能。

37

2026.03.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Python Web框架Flask进阶视频教程
Python Web框架Flask进阶视频教程

共12课时 | 3万人学习

Python Web框架Flask入门视频教程
Python Web框架Flask入门视频教程

共7课时 | 2.6万人学习

Flask实战视频教程
Flask实战视频教程

共9课时 | 2.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号