Python crawler: multithreading with a queue, but the consumer threads can't take data out of the queue.
高洛峰 2017-04-18 09:26:42

All replies (2)
大家讲道理

Try calling join() on the consumer threads, and inside each consumer check the condition: if the queue is empty, break out of the loop.
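A minimal sketch of this suggestion, assuming the producer fills the queue completely before the consumers start. The names `consumer` and `run` and the doubling "work" are hypothetical. It uses `get_nowait()` and catches `queue.Empty` rather than testing `q.empty()` first, because another consumer can take the last item between the check and the `get()`.

```python
import queue
import threading


def consumer(q, results, lock):
    """Take items until the queue is empty, then return (ending the thread)."""
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking forever.
            # Checking q.empty() and then calling q.get() would be racy:
            # another consumer could take the last item in between.
            item = q.get_nowait()
        except queue.Empty:
            break  # queue drained: leave the loop, so the thread ends
        with lock:
            results.append(item * 2)  # stand-in for real work on the item


def run(items, n_workers=3):
    q = queue.Queue()
    for item in items:  # fill the queue BEFORE starting the consumers
        q.put(item)
    results, lock = [], threading.Lock()
    workers = [threading.Thread(target=consumer, args=(q, results, lock))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()  # wait for every consumer to drain the queue and exit
    return results


if __name__ == "__main__":
    print(sorted(run(range(10))))
```

This only works when "queue is empty" really means "no more work". In a crawler where producers keep adding URLs while consumers run, the queue can be briefly empty and consumers would quit too early.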

阿神

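If self.Queue is empty, a non-blocking self.Queue.get(False), or a get() whose timeout expires, raises queue.Empty (Queue.Empty in Python 2); a plain get() with no timeout blocks instead. If that exception is not caught, it ends the thread's run() method, so the worker threads die until none are left.
Wrap the body of the while loop in run() in try...except and catch the exception. The code is roughly as follows: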

while True:
    try:
        task = self.Queue.get(timeout=5)  # choose a timeout that suits your workload
    except queue.Empty:  # task queue stayed empty: end this thread (Python 2: Queue.Empty)
        break
    # process task here
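To see when `queue.Empty` is actually raised, here is a small sketch (the function `get_outcomes` is hypothetical) that exercises the three ways of calling `get()` on Python 3's `queue.Queue`:

```python
import queue


def get_outcomes():
    """Show how queue.Queue.get behaves on an empty queue in each mode."""
    q = queue.Queue()
    outcomes = []
    try:
        q.get_nowait()  # equivalent to q.get(block=False)
    except queue.Empty:
        outcomes.append("get_nowait -> Empty")
    try:
        q.get(timeout=0.1)  # waits up to 0.1 s, then raises queue.Empty
    except queue.Empty:
        outcomes.append("get(timeout) -> Empty")
    # A bare q.get() at this point would block forever: nothing is ever put.
    q.put("task-1")
    outcomes.append(q.get())  # an item is available, so this returns at once
    return outcomes


if __name__ == "__main__":
    print(get_outcomes())
```

So a consumer stuck in a bare `get()` looks "hung" rather than crashed; adding a timeout turns that wait into a catchable exception.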
        

==========================================================================
First: regarding your point that MySQL does not support multi-threaded writes, I wrote a quick test program, and MySQL does accept writes from several threads. (Note: in real code, writes from multiple threads need a lock around a shared connection, or a separate connection per thread; that is simplified away here.) The code is as follows:


#coding: utf-8
# Python 3 version; MySQLdb is provided by the mysqlclient package.

import threading

import MySQLdb
import MySQLdb.cursors

class MySql(object):
    def __init__(self, host, user, pwd, db_name, port=3306):
        self.host = host
        self.user = user
        self.pwd = pwd
        self.db_name = db_name
        self.port = port
        self.conn = None
        self.cursor = None
        self.connect()

    def connect(self):
        try:
            self.conn = MySQLdb.connect(host=self.host, user=self.user, passwd=self.pwd, db=self.db_name,
                                        port=self.port)
            self.cursor = self.conn.cursor(cursorclass=MySQLdb.cursors.DictCursor)
        except Exception as err:
            print('connect: %s' % err)
        return self.conn

    def execute(self, sql, args=None):
        # Run a query and return all rows; returns () on error
        rs = ()
        try:
            self.cursor.execute(sql, args)
            rs = self.cursor.fetchall()
        except Exception as err:
            print('execute: %s' % err)
        return rs

    def exec_and_result(self, sql, args=None):
        # Run a write statement, commit it, and return the new row id (-1 on error)
        ret_id = -1
        try:
            self.cursor.execute(sql, args)
            self.conn.commit()
            ret_id = self.cursor.lastrowid
        except Exception as err:
            print('exec_and_result: %s' % err)
        return ret_id

    def close(self):
        try:
            self.cursor.close()
            self.conn.close()
        except Exception:
            pass

db = {
    'ip': 'xxx.xxx.xxx.xxx',
    'port': 3306,  # xxx: 3306 is MySQL's default; use your server's port
    'user': 'xxx',
    'pwd': 'xxx',
    'db_name': 'xxx'
}
mysql = MySql(db['ip'], db['user'], db['pwd'], db['db_name'], int(db['port']))
threads = []

def do(name):
    # Simplified: all threads share one connection without a lock (see the note above).
    # %s placeholders let the driver escape `name`, avoiding SQL injection.
    sql = ("insert into site(name, status, create_time, update_time, update_user_account, comment) "
           "values(%s, 0, NOW(), NOW(), 'daiyapeng', 'test')")
    rid = mysql.exec_and_result(sql, (name,))
    print(rid)

for i in ['test-0', 'test-1', 'test-2']:
    t = threading.Thread(target=do, args=(i,))
    threads.append(t)

for t in threads:
    t.start()

for t in threads:
    t.join()

mysql.close()
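The note above says real code needs a lock around a shared connection. A minimal sketch of that idea follows. To keep it runnable without a MySQL server, it swaps in Python's built-in `sqlite3` with an in-memory database and a reduced two-column `site` table; `LockedDB` and `main` are hypothetical names. With MySQLdb, the same pattern would wrap `cursor.execute()` and `conn.commit()` in the lock.

```python
import sqlite3
import threading


class LockedDB:
    """Hypothetical helper: one shared connection, guarded by a lock."""

    def __init__(self, path=":memory:"):
        # check_same_thread=False allows other threads to use this
        # connection; the lock is what keeps that use safe.
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self.lock = threading.Lock()
        with self.lock:
            self.conn.execute(
                "CREATE TABLE site (id INTEGER PRIMARY KEY, name TEXT)")

    def insert(self, name):
        with self.lock:  # only one thread executes + commits at a time
            cur = self.conn.execute(
                "INSERT INTO site(name) VALUES (?)", (name,))  # parameterized
            self.conn.commit()
            return cur.lastrowid

    def count(self):
        with self.lock:
            return self.conn.execute("SELECT COUNT(*) FROM site").fetchone()[0]

    def close(self):
        self.conn.close()


def main(n_threads=3, rows_per_thread=100):
    db = LockedDB()

    def worker(tag):
        for i in range(rows_per_thread):
            db.insert("%s-%d" % (tag, i))

    threads = [threading.Thread(target=worker, args=("test-%d" % n,))
               for n in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    total = db.count()
    db.close()
    return total


if __name__ == "__main__":
    print(main())
```

The other common option is one connection per thread (for example via `threading.local()`), which avoids the lock at the cost of more connections.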
            
            
            

In addition: since I don't know the details of your code, I can't pin down the problem exactly. I wrote a small simulation myself, and it did not reproduce what you are seeing. I hope it helps. The code is as follows:


#coding: utf-8
# Python 3 version (Python 2 used "import Queue" and xrange).

import queue
import threading

class MyThread(threading.Thread):
    def __init__(self, q):
        super().__init__()
        self.daemon = True  # replaces the deprecated setDaemon(True)
        self.queue = q

    def run(self):
        while True:
            try:
                task = self.queue.get(timeout=2)
            except queue.Empty:
                # Nothing arrived for 2 seconds: assume the producer is done
                break
            print('task: %s' % task)
            # process task here

if __name__ == '__main__':
    threads = []
    q = queue.Queue()
    for i in range(3):
        thread = MyThread(q)
        threads.append(thread)

    for t in threads:
        t.start()

    for i in range(30):
        q.put(i)

    for t in threads:
        t.join()

    print('====== done ======')
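The simulation above ends consumers with a 2-second timeout. An alternative, not what the answer uses, is the standard `queue.Queue` pattern of `task_done()` / `join()` plus one sentinel per worker: consumers block in `get()` with no timeout to tune and exit only when told to, which suits a crawler where the queue can be momentarily empty while pages are still being fetched. `SENTINEL`, `worker`, and `run` are hypothetical names in this sketch.

```python
import queue
import threading

SENTINEL = None  # hypothetical "no more work" marker


def worker(q, results, lock):
    while True:
        task = q.get()  # blocks until a task (or the sentinel) arrives
        try:
            if task is SENTINEL:
                return  # producer signalled shutdown: leave the loop
            with lock:
                results.append(task)  # placeholder for real processing
        finally:
            q.task_done()  # always mark the item handled, even on error


def run(n_tasks=30, n_workers=3):
    q = queue.Queue()
    results, lock = [], threading.Lock()
    threads = [threading.Thread(target=worker, args=(q, results, lock))
               for _ in range(n_workers)]
    for t in threads:
        t.start()
    for i in range(n_tasks):
        q.put(i)
    for _ in threads:
        q.put(SENTINEL)  # one sentinel per worker
    q.join()  # returns once every put() item has been task_done()
    for t in threads:
        t.join()
    return sorted(results)


if __name__ == "__main__":
    print(run())
```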
            