
Teach you step by step how to use Flask to build an ES search engine (preparatory part)

Release: 2023-07-25 17:27:28

/1 Introduction/

Elasticsearch is an open-source search engine built on top of Apache Lucene™, a full-text search engine library.



So how to connect Elasticsearch to Python becomes the question we care about (why does everything have to be tied to Python?).

/2 Python Interaction/

Python has a client library for connecting to Elasticsearch. Install it with pip:


    pip install elasticsearch


Initialize an Elasticsearch client object.

    def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
        # With authentication enabled (older client versions):
        # self.es = Elasticsearch([ip], http_auth=('username', 'password'), port=9200)
        self.es = Elasticsearch(f"http://{ip}:9200")  # use the ip argument rather than a hard-coded host
        self.index_type = index_type
        self.index_name = index_name

The default port is 9200. Make sure a local Elasticsearch environment is up and running before initializing the client.
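
Before creating the client, it can help to confirm that something is actually answering on port 9200. The sketch below is one way to do that with only the standard library. The helper name `es_version` and the injectable `opener` argument are assumptions made for illustration, and it assumes an unsecured local node speaking plain HTTP (a node with security enabled would need credentials and HTTPS).

```python
import json
import urllib.error
import urllib.request


def es_version(url="http://localhost:9200", opener=urllib.request.urlopen):
    """Return the version string reported by the node at `url`, or None if unreachable.

    `opener` is injectable only so the function can be exercised without a live node.
    """
    try:
        with opener(url, timeout=3) as resp:
            info = json.loads(resp.read().decode("utf-8"))
    except (urllib.error.URLError, OSError, ValueError):
        return None
    # The root endpoint answers with a JSON body that includes version.number
    return info.get("version", {}).get("number")
```

Calling `es_version()` returns something like a version string when the node is up and `None` otherwise, so it can guard the `Elasticsearch(...)` call above.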

Get a document by its ID

    def get_doc(self, uid):
        # Fetch a single document from the index by its ID
        return self.es.get(index=self.index_name, id=uid)

Insert document data


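    def insert_one(self, doc: dict):
        # doc_type relies on mapping types, which are deprecated since Elasticsearch 7.x;
        # newer clients may reject this argument
        self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

    def insert_array(self, docs: list):
        # Index the documents one request at a time
        for doc in docs:
            self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)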


Search document data


    def search(self, query, count: int = 30):
        # Match the query text against several fields and highlight matches in the title
        dsl = {
            "query": {
                "multi_match": {
                    "query": query,
                    "fields": ["title", "content", "link"]
                }
            },
            "highlight": {
                "fields": {
                    "title": {}
                }
            }
        }
        match_data = self.es.search(index=self.index_name, body=dsl, size=count)
        return match_data

    def __search(self, query: dict, count: int = 20):  # count: number of results to return
        results = []
        params = {
            'size': count
        }
        match_data = self.es.search(index=self.index_name, body=query, params=params)
        # Keep only the stored document of each hit
        for hit in match_data['hits']['hits']:
            results.append(hit['_source'])

        return results
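
The request body used by `search` is an ordinary dictionary, so it can be built and inspected without a running cluster. The sketch below factors it into a small function. The name `build_search_dsl` and its parameters are hypothetical, introduced only to show how the fields and the highlighted field could be varied.

```python
def build_search_dsl(query, fields=("title", "content", "link"), highlight_field="title"):
    """Build the request body for a multi_match search with highlighting."""
    return {
        "query": {"multi_match": {"query": query, "fields": list(fields)}},
        "highlight": {"fields": {highlight_field: {}}},
    }


dsl = build_search_dsl("python")
print(dsl["query"]["multi_match"]["fields"])
```

`multi_match` also accepts per-field boosts such as `"title^2"` in the `fields` list, which is one way to weight title matches above body matches.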

Delete document data (this drops the whole index)


    def delete_index(self):
        try:
            self.es.indices.delete(index=self.index_name)
        except Exception:
            # Ignore failures such as the index not existing
            pass

Good. Wrapping all of this in a search class simply makes it easier to call, so here is the whole thing.

    from elasticsearch import Elasticsearch


    class elasticSearch():

        def __init__(self, index_type: str, index_name: str, ip="127.0.0.1"):
            # self.es = Elasticsearch([ip], http_auth=('elastic', 'password'), port=9200)
            self.es = Elasticsearch(f"http://{ip}:9200")  # use the ip argument rather than a hard-coded host
            self.index_type = index_type
            self.index_name = index_name

        def create_index(self):
            # Start from an empty index: drop it first if it already exists
            if self.es.indices.exists(index=self.index_name):
                self.es.indices.delete(index=self.index_name)
            self.es.indices.create(index=self.index_name, ignore=400)

        def delete_index(self):
            try:
                self.es.indices.delete(index=self.index_name)
            except Exception:
                # Ignore failures such as the index not existing
                pass

        def get_doc(self, uid):
            return self.es.get(index=self.index_name, id=uid)

        def insert_one(self, doc: dict):
            # doc_type relies on mapping types, which are deprecated since Elasticsearch 7.x
            self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

        def insert_array(self, docs: list):
            for doc in docs:
                self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)

        def search(self, query, count: int = 30):
            # Match the query text against several fields and highlight matches in the title
            dsl = {
                "query": {
                    "multi_match": {
                        "query": query,
                        "fields": ["title", "content", "link"]
                    }
                },
                "highlight": {
                    "fields": {
                        "title": {}
                    }
                }
            }
            match_data = self.es.search(index=self.index_name, body=dsl, size=count)
            return match_data
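
The `search` method returns the raw response, so the caller still has to dig out the hits and the highlight fragments. A minimal sketch of that step follows. The function name `summarize_hits` and the hand-written `sample` response are assumptions for illustration; the sample only mimics the general shape of a search response (`hits.hits[]` with `_score`, `_source` and an optional `highlight`).

```python
def summarize_hits(response, highlight_field="title"):
    """Turn a raw search response into a list of small dicts for display."""
    rows = []
    for hit in response["hits"]["hits"]:
        source = hit.get("_source", {})
        # Highlight fragments come back per field as a list of strings
        fragments = hit.get("highlight", {}).get(highlight_field)
        rows.append({
            "title": fragments[0] if fragments else source.get(highlight_field, ""),
            "link": source.get("link", ""),
            "score": hit.get("_score"),
        })
    return rows


# A hand-written response for illustration only, not real cluster output
sample = {
    "hits": {
        "hits": [
            {"_score": 1.2,
             "_source": {"title": "Python", "link": "https://example.com/a"},
             "highlight": {"title": ["<em>Python</em>"]}},
            {"_score": 0.4,
             "_source": {"title": "Flask", "link": "https://example.com/b"}},
        ]
    }
}

for row in summarize_hits(sample):
    print(row["score"], row["title"], row["link"])
```

Falling back to the stored title when a hit has no highlight keeps the output usable for matches that came from `content` or `link` rather than `title`.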

Let's try inserting the data stored in MongoDB into ES.

    from datetime import datetime

    import pymongo

    from app.elasticsearchClass import elasticSearch

    client = pymongo.MongoClient('127.0.0.1', 27017)
    db = client['spider']
    # Read every record, leaving out MongoDB's own _id field
    sheet = db.get_collection('Spider').find({}, {'_id': 0, })

    es = elasticSearch(index_type="spider_data", index_name="spider")
    es.create_index()

    # Map each MongoDB record to the document layout used in the index
    for i in sheet:
        data = {
            'title': i["title"],
            'content': i["data"],
            'link': i["link"],
            'create_time': datetime.now()
        }

        es.insert_one(doc=data)
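
The loop above indexes `i["title"]`, `i["data"]` and `i["link"]` directly, so a record that lacks one of those keys would raise `KeyError` and stop the import. A tolerant mapping step might look like the sketch below. The function name `to_es_doc` and the `now` parameter are hypothetical, and defaulting missing fields to empty strings is an assumption rather than the author's behaviour.

```python
from datetime import datetime


def to_es_doc(record, now=None):
    """Map one MongoDB record to the document layout used for the index.

    Missing fields become empty strings instead of raising KeyError.
    """
    return {
        "title": record.get("title", ""),
        "content": record.get("data", ""),
        "link": record.get("link", ""),
        "create_time": now or datetime.now(),
    }


fixed = datetime(2023, 7, 25)
print(to_es_doc({"data": "some text", "link": "https://example.com/a"}, now=fixed))
```

Inside the loop this would be used as `es.insert_one(doc=to_es_doc(i))`.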

Take a look inside ES by starting the elasticsearch-head plugin.

If you installed it with npm, cd into its root directory and run npm run start to bring it up.

Then open http://localhost:9100/ locally.


You can see that the newly added spider documents are indeed in there.

/3 Crawling Data into the Database/

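To implement ES search you first need data behind it, and large volumes of data usually come from crawlers.

To save time, we write the simplest possible crawler and scrape Baidu Baike.

Keeping things simple and crude, first recursively collect lots and lots of URL links.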


    import re

    import requests

    exist_urls = set()  # full URLs that have already been recorded or fetched
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.62 Safari/537.36',
    }


    def get_link(url):
        """Return the /item/ names linked from a page, or an empty list on failure."""
        try:
            response = requests.get(url=url, headers=headers, timeout=10)
            response.encoding = 'UTF-8'
            html = response.text
            return re.findall('.*?<a target=_blank href="/item/([^:#=<>]*?)".*?</a>', html)
        except requests.RequestException:
            return []
        finally:
            exist_urls.add(url)


    # While the crawl depth is below 10, call main recursively to crawl the links of the next level.
    # This fans out quickly, so lower the limit for a trial run.
    def main(start_url, depth=1):
        base = 'https://baike.baidu.com/item/'
        # Compare full URLs so that links already recorded are really skipped
        new_urls = {base + name for name in get_link(start_url)} - exist_urls
        with open('url.txt', 'a+') as f:
            for unique_url in new_urls:
                f.write(unique_url + '\n')
        exist_urls.update(new_urls)
        if depth < 10:
            for unique_url in new_urls:
                main(unique_url, depth + 1)


    if __name__ == '__main__':
        start_url = 'https://baike.baidu.com/item/%E7%99%BE%E5%BA%A6%E7%99%BE%E7%A7%91'
        main(start_url)
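
Deep recursion over a link graph can also be written as a loop over a queue. The sketch below is a breadth-first variant, named plainly as a different technique from the recursive crawler above, not the author's method. The function `crawl`, its `get_links` callback and the `fake_site` graph are all hypothetical stand-ins so that the example runs without network access.

```python
from collections import deque


def crawl(start_url, get_links, max_depth=2):
    """Breadth-first crawl: return every URL reached within max_depth hops."""
    seen = {start_url}
    queue = deque([(start_url, 0)])
    while queue:
        url, depth = queue.popleft()
        if depth >= max_depth:
            continue  # do not expand pages that sit at the depth limit
        for link in get_links(url):
            if link not in seen:
                seen.add(link)
                queue.append((link, depth + 1))
    return seen


# Stand-in link graph so the sketch runs offline
fake_site = {"a": ["b", "c"], "b": ["c", "d"], "c": ["a"], "d": ["e"]}
found = crawl("a", lambda url: fake_site.get(url, []), max_depth=2)
print(sorted(found))
```

Because every URL enters `seen` before it is queued, no page is expanded twice, and the depth limit bounds the crawl without relying on the interpreter's recursion limit.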


Once all the URLs have been saved to url.txt, start the tasks.


    # parse.py
    from celery import Celery
    import requests
    from lxml import etree
    import pymongo

    app = Celery('tasks', broker='redis://localhost:6379/2')
    client = pymongo.MongoClient('localhost', 27017)
    db = client['baike']


    @app.task
    def get_url(link):
        item = {}
        headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.131 Safari/537.36'}
        res = requests.get(link, headers=headers)
        res.encoding = 'UTF-8'
        doc = etree.HTML(res.text)
        # Text fragments of the entry's summary paragraphs
        content = doc.xpath("//div[@class='lemma-summary']/div[@class='para']//text()")
        print(res.status_code)
        print(link, '\t', '++++++++++++++++++++')
        item['link'] = link
        # Join the fragments and strip spaces, tabs and line breaks
        data = ''.join(content).replace(' ', '').replace('\t', '').replace('\n', '').replace('\r', '')
        item['data'] = data
        # insert_one replaces Collection.insert, which newer PyMongo versions no longer provide
        if db['Baike'].insert_one(dict(item)).acknowledged:
            print("is OK ...")
        else:
            print('Fail')
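
The chain of `.replace()` calls in the task removes spaces, tabs and line breaks from the joined text. The same cleanup can be expressed with one regular expression, as in the sketch below; the helper name `clean_text` is hypothetical.

```python
import re

# Spaces, tabs, carriage returns and newlines: the same characters the task strips
_WHITESPACE = re.compile(r"[ \t\r\n]+")


def clean_text(fragments):
    """Join text fragments and drop spaces, tabs and line breaks."""
    return _WHITESPACE.sub("", "".join(fragments))


print(clean_text(["  Baidu \n", "\tBaike\r\n"]))
```

Removing every space suits Chinese text, where words are not separated by spaces; for English content it would run words together, so collapsing whitespace to a single space may be the better choice there.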

Then let run.py fly:


    from parse import get_url


    def main(url):
        # Queue one crawl task on the Celery broker
        result = get_url.delay(url)
        return result


    def run():
        # Dispatch one task per line of url.txt
        with open('./url.txt', 'r') as f:
            for url in f.readlines():
                main(url.strip('\n'))


    if __name__ == '__main__':
        run()
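
Since the crawler appends to url.txt on every run, the file may contain blank lines or the same URL more than once, and each extra line becomes an extra task. A small loader that filters these out before dispatching could look like the sketch below. The function name `load_urls` is hypothetical, and the demonstration writes a throwaway file rather than touching a real url.txt.

```python
import os
import tempfile


def load_urls(path):
    """Read URLs from a text file, skipping blank lines and repeats while keeping order."""
    seen = set()
    urls = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            url = line.strip()
            if url and url not in seen:
                seen.add(url)
                urls.append(url)
    return urls


# Demonstration with a temporary file
with tempfile.TemporaryDirectory() as tmp:
    demo = os.path.join(tmp, "url.txt")
    with open(demo, "w", encoding="utf-8") as f:
        f.write("https://example.com/a\n\nhttps://example.com/b\nhttps://example.com/a\n")
    demo_urls = load_urls(demo)

print(demo_urls)
```

In `run()`, the loop would then iterate over `load_urls('./url.txt')` and call `main(url)` for each entry.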


In a terminal window, type:


    celery -A parse worker -l info -P gevent -c 10

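Oh ho! You are actually using the Celery task queue in gevent mode. The -c 10 option sets the concurrency to ten (greenlets rather than OS threads under the gevent pool), so ten workers get going at once, and it is really fast!

What? Distributed? Then just add a few more machines: copy the code to the target servers and let them crawl cooperatively through the shared Redis queue.

Here the data is stored in MongoDB first (a personal habit). You could also write it straight into ES, but inserting documents one at a time is worryingly slow (optimization will be covered later, haha).

Use the earlier example to import the data from Mongo into ES in bulk, and OK!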
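
One common way to avoid a request per document is the bulk helper that ships with the Python client, `elasticsearch.helpers.bulk`. The sketch below is a minimal illustration under that assumption, not necessarily the optimization the author has in mind for later. The function names `build_actions` and `bulk_insert` are hypothetical, and `bulk_insert` expects an already connected client.

```python
def build_actions(docs, index_name):
    """Yield one bulk action per document, in the dict form the bulk helper accepts."""
    for doc in docs:
        yield {"_index": index_name, "_source": doc}


def bulk_insert(es, docs, index_name):
    """Send the documents in batched bulk requests instead of one request each.

    `es` is an Elasticsearch client instance. The import is local so that
    build_actions can be tried without the client library installed.
    """
    from elasticsearch import helpers
    return helpers.bulk(es, build_actions(docs, index_name))


actions = list(build_actions([{"title": "a"}, {"title": "b"}], "spider"))
print(actions)
```

With the class from earlier, a call would look like `bulk_insert(es.es, documents, "spider")`, where `documents` is the list of dictionaries built from the MongoDB records.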


With that, a simple data crawl is complete.

Now that ES has data in it, the next step is the Flask web side. Of course, Django and FastAPI are excellent too. Use whichever you like!

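For articles about FastAPI, see this series:

1. (Getting Started) A Brief Look at the Python Web Framework FastAPI: An API Framework with Higher Performance than Flask and Tornado

2. (Advanced) The Python Web Framework FastAPI: An API Framework with Higher Performance than Flask and Tornado

3. (Finale) The Python Web Framework FastAPI: An API Framework with Higher Performance than Flask and Tornado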

/4 Flask Project Structure/



With that, the preliminary work is more or less done. What remains is mostly the actual Flask development. Gathering strength!

source:Go语言进阶学习