有個需求場景是這樣的,使用redis控制scrapy運行的數量。設定係統後台為4後,scrapy最多只能啟動4個任務,多餘的任務將被排隊等待。
最近做了一個django scrapy celery redis 的爬蟲系統,客戶購買的主機除了跑其他程序外,還要跑我開發的這套程序,所以需要手動控制scrapy的實例數量,避免過多的爬蟲對系統造成負擔。
1、爬蟲任務由使用者以請求的方式發起,所有的使用者的請求統一進入到celery進行排隊;
2、任務數量控制的執行就交給reids,經由celery保存到redis,包含了爬蟲啟動所需的必要信息,從redis取一條信息即可啟動一個爬蟲;
3、通過scrapyd的接口來獲取當前在運行的爬蟲數量,以便決定下一步流程:如果小於4,則從redis中取相應數量的信息來啟動爬蟲,如果大於等於4,則繼續等待;
4、如果在運行爬蟲的數量有所減少,則及時從reids中取相應數量的資訊來啟動爬蟲。
業務程式碼有點複雜和囉嗦,這裡使用偽程式碼來示範
import redis # 实例化一个redis连接池 pool = redis.ConnectionPool(host='127.0.0.1', port=6379, decode_responses=True, db=4, password='') r = redis.Redis(connection_pool=pool) # 爬虫实例限制为4 即只允许4个scrapy实例在运行 limited = 4 # 声明redis的乐观锁 lock = r.Lock() # lock.acquire中有while循环,即它会线程阻塞,直到当前线程获得redis的lock,才会继续往下执行代码 if lock.acquire(): # 1、从reids中取一条爬虫信息 info = redis.get() # 2、while循环监听爬虫运行的数量 while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() # 统计当前有多少个爬虫在运行 running = req.get('running') + req.get('pending') # 3、判断是否等待还是要增加爬虫数量 # 3.1 如果在运行的数量大于等于设置到量 则继续等待 if running >= limited: continue # 3.2 如果小于 则启动爬虫 start_scrapy(info) # 3.3 将info从redis中删除 redis.delete(info) # 3.4 释放锁 lock.release() break
目前,這只是偽程式碼而已,實際的業務邏輯可能是非常複雜的,如:
@shared_task def scrapy_control(key_uuid): r = redis.Redis(connection_pool=pool) db = MysqlDB() speed_limited = db.fetch_config('REPTILE_SPEED') speed_limited = int(speed_limited[0]) keywords_num = MysqlDB().fetch_config('SEARCH_RANDOM') keywords_num = int(keywords_num[0]) # while True: lock = r.lock('lock') with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 进入处理环节' + '\n') try: # acquire默认阻塞 如果获取不到锁时 会一直阻塞在这个函数的while循环中 if lock.acquire(): with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁' + '\n') # 1 从redis中获取信息 redis_obj = json.loads(r.get(key_uuid)) user_id = redis_obj.get('user_id') contents = redis_obj.get('contents') # 2 使用while循环处理核心逻辑 is_hold_print = True while True: req = requests.get('http://127.0.0.1:6800/daemonstatus.json').json() running = req.get('running') + req.get('pending') # 3 如果仍然有足够的爬虫在运行 则hold住redis锁,等待有空余的爬虫位置让出 if running >= speed_limited: if is_hold_print: with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 爬虫在运行,线程等待中' + '\n') is_hold_print = False time.sleep(1) continue # 4 有空余的爬虫位置 则往下走 # 4.1 处理完所有的内容后 释放锁 if len(contents) == 0: r.delete(key_uuid) with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 任务已完成,从redis中删除' + '\n') lock.release() with open('log/celery/info.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 释放锁' + '\n') break # 4.2 创建task任务 task_uuid = str(uuid.uuid4()) article_obj = contents.pop() article_id = article_obj.get('article_id') article = article_obj.get('content') try: Task.objects.create( task_uuid = task_uuid, user_id = user_id, article_id = article_id, content = article ) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + '->' + str(task_uuid) + ' 创建Task出错: ' + str(e) + '\n') # finally: # 4.3 启动爬虫任务 即便创建task失败也会启动 try: task_chain(user_id, article, task_uuid, keywords_num) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 启动任务链失败: ' + str(e) + '\n') # 加入sleep 防止代码执行速度快于爬虫启动速度而导致当前线程启动额外的爬虫 time.sleep(5) except Exception as e: with open('log/celery/error.log', 'a') as f: f.write(str(datetime.datetime.now()) + '--' + str(key_uuid) + ' 获得锁之后的操作出错: ' + str(e) + '\n') lock.release()
小坑
scrapy啟動速度相對較慢,所以while循環中,程式碼中執行到了爬蟲的啟動,需要sleep一下再去透過scrapyd介面取得爬蟲運行的數量,如果立刻讀取,可能會造成誤判。
以上是基於redis樂觀鎖怎麼實現並發排隊的詳細內容。更多資訊請關注PHP中文網其他相關文章!