Home > Database > Redis > body text

How to implement concurrent queuing based on redis optimistic locking

WBOY
Release: 2023-06-04 09:58:09
forward
1606 people have browsed it

There is a demand scenario like this, using redis to control the number of scrapy runs. After setting the system background to 4, scrapy can only start up to 4 tasks, and excess tasks will be queued to wait.

Overview

I recently built a django scrapy celery redis crawler system. In addition to running other programs, the host purchased by the customer also needs to run the set of programs I developed, so scrapy needs to be manually controlled. The number of instances to avoid excessive crawlers burdening the system.

Process Design

1. The crawler task is initiated by the user in the form of a request, and all user requests are uniformly entered into celery for queuing;
2. The execution of task quantity control is handed over For reids, it is saved to redis via celery, which contains the necessary information required for crawler startup. A crawler can be started by taking a piece of information from redis;
3. Get the number of currently running crawlers through the scrapyd interface to make a decision Next step: If it is less than 4, get the corresponding amount of information from redis to start the crawler. If it is greater than or equal to 4, continue to wait;
4. If the number of running crawlers decreases, timely retrieve it from reids Get the corresponding amount of information to start the crawler.

Code implementation

The business code is a bit complicated and long-winded. Pseudocode is used here to demonstrate

import redis

# 实例化一个redis连接池
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='')

r = redis.Redis(connection_pool=pool)
# 爬虫实例限制为4 即只允许4个scrapy实例在运行
limited = 4

# 声明redis的乐观锁
lock = r.Lock()

# lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码
if lock.acquire():
	# 1、从reids中取一条爬虫信息
	info = redis.get() 
	
	# 2、while循环监听爬虫运行的数量
	while True:
		req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
		# 统计当前有多少个爬虫在运行
		running = req.get('running') + req.get('pending')
		
		# 3、判断是否等待还是要增加爬虫数量
		# 3.1 如果在运行的数量大于等于设置到量 则继续等待
		if running >= limited:
			continue
		
		# 3.2 如果小于 则启动爬虫
		start_scrapy(info)
		# 3.3 将info从redis中删除
		redis.delete(info)
		# 3.4 释放锁
		lock.release()
		break		
Copy after login

Currently, this is just pseudocode. The actual business logic may be very Complex ones, such as:

@shared_task
def scrapy_control(key_uuid):

    r = redis.Redis(connection_pool=pool)
    db = MysqlDB()
    speed_limited = db.fetch_config('REPTILE_SPEED')
    speed_limited = int(speed_limited[0])

    keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM')
    keywords_num = int(keywords_num[0])


    # while True:
    lock = r.lock('lock')
    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 进入处理环节' +  '\n')
    try:
        # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中
        if lock.acquire():
            with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁' +  '\n')
            # 1 从redis中获取信息
            redis_obj = json.loads(r.get(key_uuid))
            user_id = redis_obj.get('user_id')
            contents = redis_obj.get('contents')
            
            # 2 使用while循环处理核心逻辑          
            is_hold_print = True
            while True:
                req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json()
                running = req.get('running') + req.get('pending')
                # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出
                if running >= speed_limited:
                    if is_hold_print:
                        with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬虫在运行,线程等待中' +  '\n')
                        is_hold_print = False
                    time.sleep(1)
                    continue
                
                # 4 有空余的爬虫位置 则往下走
                # 4.1 处理完所有的内容后 释放锁
                if len(contents) == 0:
                    r.delete(key_uuid)
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任务已完成,从redis中删除' +  '\n')
                    lock.release()
                    with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 释放锁' +  '\n')
                    break

                # 4.2 创建task任务
                task_uuid = str(uuid.uuid4())
                article_obj = contents.pop()
                article_id = article_obj.get('article_id')
                article = article_obj.get('content')
                try:
                    Task.objects.create(
                        task_uuid = task_uuid,
                        user_id = user_id,
                        article_id = article_id,
                        content = article
                    )
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 创建Task出错: ' + str(e) +  '\n')
                # finally:
                # 4.3 启动爬虫任务 即便创建task失败也会启动
                try:
                    task_chain(user_id, article, task_uuid, keywords_num)
                except Exception as e:
                    with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 启动任务链失败: ' + str(e) +  '\n')
                
                # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫
                time.sleep(5)

    except Exception as e:
        with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁之后的操作出错: ' + str(e) +  '\n')
        lock.release()
Copy after login

小空
scrapy startup speed is relatively slow, so in the while loop, the code is executed to start the crawler, and you need to sleep for a while before getting the crawler through the scrapyd interface. The number of runs, if read immediately, may cause misjudgment.

The above is the detailed content of How to implement concurrent queuing based on redis optimistic locking. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:yisu.com
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template
About us Disclaimer Sitemap
php.cn:Public welfare online PHP training,Help PHP learners grow quickly!