


Verwendung von Python zur Implementierung des Multithread-Crawlings von Zhihu-Benutzern
需要用到的包:
beautifulsoup4
html5lib
image
requests
redis
PyMySQL
pip安装所有依赖包:
pip install \ Image \ requests \ beautifulsoup4 \ html5lib \ redis \ PyMySQL
运行环境需要支持中文
测试运行环境python3.5,不保证其他运行环境能完美运行
需要安装mysql和redis
配置
config.ini
文件,设置好mysql和redis,并且填写你的知乎帐号
向数据库导入
init.sql
Run
开始抓取数据:
python get_user.py
查看抓取数量:
python check_redis.py
效果
总体思路
1.首先是模拟登陆知乎,利用保存登陆的cookie信息
2.抓取知乎页面的html代码,留待下一步继续进行分析提取信息
3.分析提取页面中用户的个性化url,放入redis(这里特别说明一下redis的思路用法,将提取到的用户的个性化url放入redis的一个名为already_get_user的hash table,表示已抓取的用户,对于已抓取过的用户判断是否存在于already_get_user以去除重复抓取,同时将个性化url放入user_queue的队列中,需要抓取新用户时pop队列获取新的用户)
4.获取用户的关注列表和粉丝列表,继续插入到redis
5.从redis的user_queue队列中获取新用户继续重复步骤3
模拟登陆知乎
首先是登陆,登陆功能作为一个包封装了在login里面,方便整合调用
header部分,这里Connection最好设为close,不然可能会碰到max retireve exceed的错误
原因在于普通的连接是keep-alive的但是却又没有关闭
# http请求的header headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.143 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", "Host": "www.zhihu.com", "Referer": "https://www.zhihu.com/", "Origin": "https://www.zhihu.com/", "Upgrade-Insecure-Requests": "1", "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "Pragma": "no-cache", "Accept-Encoding": "gzip, deflate, br", 'Connection': 'close' } # 验证是否登陆 def check_login(self): check_url = 'https://www.zhihu.com/settings/profile' try: login_check = self.__session.get(check_url, headers=self.headers, timeout=35) except Exception as err: print(traceback.print_exc()) print(err) print("验证登陆失败,请检查网络") sys.exit() print("验证登陆的http status code为:" + str(login_check.status_code)) if int(login_check.status_code) == 200: return True else: return False
进入首页查看http状态码来验证是否登陆,200为已经登陆,一般304就是被重定向所以就是没有登陆
# 获取验证码 def get_captcha(self): t = str(time.time() * 1000) captcha_url = 'http://www.zhihu.com/captcha.gif?r=' + t + "&type=login" r = self.__session.get(captcha_url, headers=self.headers, timeout=35) with open('captcha.jpg', 'wb') as f: f.write(r.content) f.close() # 用pillow 的 Image 显示验证码 # 如果没有安装 pillow 到源代码所在的目录去找到验证码然后手动输入 '''try: im = Image.open('captcha.jpg') im.show() im.close() except:''' print(u'请到 %s 目录找到captcha.jpg 手动输入' % os.path.abspath('captcha.jpg')) captcha = input("请输入验证码\n>") return captcha
获取验证码的方法。当登录次数太多有可能会要求输入验证码,这里实现这个功能
# 获取xsrf def get_xsrf(self): index_url = 'http://www.zhihu.com' # 获取登录时需要用到的_xsrf try: index_page = self.__session.get(index_url, headers=self.headers, timeout=35) except: print('获取知乎页面失败,请检查网络连接') sys.exit() html = index_page.text # 这里的_xsrf 返回的是一个list BS = BeautifulSoup(html, 'html.parser') xsrf_input = BS.find(attrs={'name': '_xsrf'}) pattern = r'value=\"(.*?)\"' print(xsrf_input) self.__xsrf = re.findall(pattern, str(xsrf_input)) return self.__xsrf[0]
获取xsrf,为什么要获取xsrf呢,因为xsrf是一种防止跨站攻击的手段,具体介绍可以看这里csrf
在获取到xsrf之后把xsrf存入cookie当中,并且在调用api的时候带上xsrf作为头部,不然的话知乎会返回403
# 进行模拟登陆 def do_login(self): try: # 模拟登陆 if self.check_login(): print('您已经登录') return else: if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "password"): self.username = self.config.get("zhihu_account", "username") self.password = self.config.get("zhihu_account", "password") else: self.username = input('请输入你的用户名\n> ') self.password = input("请输入你的密码\n> ") except Exception as err: print(traceback.print_exc()) print(err) sys.exit() if re.match(r"^1\d{10}$", self.username): print("手机登陆\n") post_url = 'http://www.zhihu.com/login/phone_num' postdata = { '_xsrf': self.get_xsrf(), 'password': self.password, 'remember_me': 'true', 'phone_num': self.username, } else: print("邮箱登陆\n") post_url = 'http://www.zhihu.com/login/email' postdata = { '_xsrf': self.get_xsrf(), 'password': self.password, 'remember_me': 'true', 'email': self.username, } try: login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35) login_text = json.loads(login_page.text.encode('latin-1').decode('unicode-escape')) print(postdata) print(login_text) # 需要输入验证码 r = 0为登陆成功代码 if login_text['r'] == 1: sys.exit() except: postdata['captcha'] = self.get_captcha() login_page = self.__session.post(post_url, postdata, headers=self.headers, timeout=35) print(json.loads(login_page.text.encode('latin-1').decode('unicode-escape'))) # 保存登陆cookie self.__session.cookies.save()
这个就是核心的登陆功能啦,非常关键的就是用到了requests库,非常方便的保存到session
我们这里全局都是用单例模式,统一使用同一个requests.session对象进行访问功能,保持登录状态的一致性
最后主要调用登陆的代码为
# 创建login对象 lo = login.login.Login(self.session) # 模拟登陆 if lo.check_login(): print('您已经登录') else: if self.config.get("zhihu_account", "username") and self.config.get("zhihu_account", "username"): username = self.config.get("zhihu_account", "username") password = self.config.get("zhihu_account", "password") else: username = input('请输入你的用户名\n> ') password = input("请输入你的密码\n> ") lo.do_login(username, password)
知乎模拟登陆到此就完成啦
知乎用户抓取
def __init__(self, threadID=1, name=''): # 多线程 print("线程" + str(threadID) + "初始化") threading.Thread.__init__(self) self.threadID = threadID self.name = name try: print("线程" + str(threadID) + "初始化成功") except Exception as err: print(err) print("线程" + str(threadID) + "开启失败") self.threadLock = threading.Lock() # 获取配置 self.config = configparser.ConfigParser() self.config.read("config.ini") # 初始化session requests.adapters.DEFAULT_RETRIES = 5 self.session = requests.Session() self.session.cookies = cookielib.LWPCookieJar(filename='cookie') self.session.keep_alive = False try: self.session.cookies.load(ignore_discard=True) except: print('Cookie 未能加载') finally: pass # 创建login对象 lo = Login(self.session) lo.do_login() # 初始化redis连接 try: redis_host = self.config.get("redis", "host") redis_port = self.config.get("redis", "port") self.redis_con = redis.Redis(host=redis_host, port=redis_port, db=0) # 刷新redis库 # self.redis_con.flushdb() except: print("请安装redis或检查redis连接配置") sys.exit() # 初始化数据库连接 try: db_host = self.config.get("db", "host") db_port = int(self.config.get("db", "port")) db_user = self.config.get("db", "user") db_pass = self.config.get("db", "password") db_db = self.config.get("db", "db") db_charset = self.config.get("db", "charset") self.db = pymysql.connect(host=db_host, port=db_port, user=db_user, passwd=db_pass, db=db_db, charset=db_charset) self.db_cursor = self.db.cursor() except: print("请检查数据库配置") sys.exit() # 初始化系统设置 self.max_queue_len = int(self.config.get("sys", "max_queue_len"))
这个是get_user.py的构造函数,主要功能就是初始化mysql连接、redis连接、验证登陆、生成全局的session对象、导入系统配置、开启多线程。
# 获取首页html def get_index_page(self): index_url = 'https://www.zhihu.com/' try: index_html = self.session.get(index_url, headers=self.headers, timeout=35) except Exception as err: # 出现异常重试 print("获取页面失败,正在重试......") print(err) traceback.print_exc() return None finally: pass return index_html.text # 获取单个用户详情页面 def get_user_page(self, name_url): user_page_url = 'https://www.zhihu.com' + str(name_url) + '/about' try: index_html = self.session.get(user_page_url, headers=self.headers, timeout=35) except Exception as err: # 出现异常重试 print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户") print(err) traceback.print_exc() return None finally: pass return index_html.text # 获取粉丝页面 def get_follower_page(self, name_url): user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers' try: index_html = self.session.get(user_page_url, headers=self.headers, timeout=35) except Exception as err: # 出现异常重试 print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户") print(err) traceback.print_exc() return None finally: pass return index_html.text def get_following_page(self, name_url): user_page_url = 'https://www.zhihu.com' + str(name_url) + '/followers' try: index_html = self.session.get(user_page_url, headers=self.headers, timeout=35) except Exception as err: # 出现异常重试 print("失败name_url:" + str(name_url) + "获取页面失败,放弃该用户") print(err) traceback.print_exc() return None finally: pass return index_html.text # 获取首页上的用户列表,存入redis def get_index_page_user(self): index_html = self.get_index_page() if not index_html: return BS = BeautifulSoup(index_html, "html.parser") self.get_xsrf(index_html) user_a = BS.find_all("a", class_="author-link") # 获取用户的a标签 for a in user_a: if a: self.add_wait_user(a.get('href')) else: continue
这一部分的代码就是用于抓取各个页面的html代码
# 加入带抓取用户队列,先用redis判断是否已被抓取过 def add_wait_user(self, name_url): # 判断是否已抓取 self.threadLock.acquire() if not self.redis_con.hexists('already_get_user', name_url): self.counter += 1 print(name_url + " 加入队列") self.redis_con.hset('already_get_user', name_url, 1) self.redis_con.lpush('user_queue', name_url) print("添加用户 " + name_url + "到队列") self.threadLock.release() # 获取页面出错移出redis def del_already_user(self, name_url): self.threadLock.acquire() if not self.redis_con.hexists('already_get_user', name_url): self.counter -= 1 self.redis_con.hdel('already_get_user', name_url) self.threadLock.release()
用户加入redis的操作,在数据库插入出错时我们调用del_already_user删除插入出错的用户
# 分析粉丝页面获取用户的所有粉丝用户 # @param follower_page get_follower_page()中获取到的页面,这里获取用户hash_id请求粉丝接口获取粉丝信息 def get_all_follower(self, name_url): follower_page = self.get_follower_page(name_url) # 判断是否获取到页面 if not follower_page: return BS = BeautifulSoup(follower_page, 'html.parser') # 获取关注者数量 follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text()) # 获取用户的hash_id hash_id = \ json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[ 'params'][ 'hash_id'] # 获取关注者列表 self.get_xsrf(follower_page) # 获取xsrf post_url = 'https://www.zhihu.com/node/ProfileFollowersListV2' # 开始获取所有的关注者 math.ceil(follower_num/20)*20 for i in range(0, math.ceil(follower_num / 20) * 20, 20): post_data = { 'method': 'next', 'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id}) } try: j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode( 'latin-1').decode( 'unicode-escape') pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL) j = pattern.findall(j) for user in j: user = user.replace('\\', '') self.add_wait_user(user) # 保存到redis except Exception as err: print("获取正在关注失败") print(err) traceback.print_exc() pass # 获取正在关注列表 def get_all_following(self, name_url): following_page = self.get_following_page(name_url) # 判断是否获取到页面 if not following_page: return BS = BeautifulSoup(following_page, 'html.parser') # 获取关注者数量 following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text()) # 获取用户的hash_id hash_id = \ json.loads(BS.select("#zh-profile-follows-list")[0].select(".zh-general-list")[0].get('data-init'))[ 'params'][ 'hash_id'] # 获取关注者列表 self.get_xsrf(following_page) # 获取xsrf post_url = 'https://www.zhihu.com/node/ProfileFolloweesListV2' # 开始获取所有的关注者 math.ceil(follower_num/20)*20 for i in range(0, math.ceil(following_num / 20) * 20, 20): post_data = { 'method': 'next', 'params': json.dumps({"offset": i, "order_by": "created", "hash_id": hash_id}) } try: j = self.session.post(post_url, params=post_data, headers=self.headers, timeout=35).text.encode( 'latin-1').decode( 'unicode-escape') pattern = re.compile(r"class=\"zm-item-link-avatar\"[^\"]*\"([^\"]*)", re.DOTALL) j = pattern.findall(j) for user in j: user = user.replace('\\', '') self.add_wait_user(user) # 保存到redis except Exception as err: print("获取正在关注失败") print(err) traceback.print_exc() pass
调用知乎的API,获取所有的关注用户列表和粉丝用户列表,递归获取用户
这里需要注意的是头部要记得带上xsrf不然会抛出403
# 分析about页面,获取用户详细资料 def get_user_info(self, name_url): about_page = self.get_user_page(name_url) # 判断是否获取到页面 if not about_page: print("获取用户详情页面失败,跳过,name_url:" + name_url) return self.get_xsrf(about_page) BS = BeautifulSoup(about_page, 'html.parser') # 获取页面的具体数据 try: nickname = BS.find("a", class_="name").get_text() if BS.find("a", class_="name") else '' user_type = name_url[1:name_url.index('/', 1)] self_domain = name_url[name_url.index('/', 1) + 1:] gender = 2 if BS.find("i", class_="icon icon-profile-female") else (1 if BS.find("i", class_="icon icon-profile-male") else 3) follower_num = int(BS.find('span', text='关注者').find_parent().find('strong').get_text()) following_num = int(BS.find('span', text='关注了').find_parent().find('strong').get_text()) agree_num = int(re.findall(r'<strong>(.*)</strong>.*赞同', about_page)[0]) appreciate_num = int(re.findall(r'<strong>(.*)</strong>.*感谢', about_page)[0]) star_num = int(re.findall(r'<strong>(.*)</strong>.*收藏', about_page)[0]) share_num = int(re.findall(r'<strong>(.*)</strong>.*分享', about_page)[0]) browse_num = int(BS.find_all("span", class_="zg-gray-normal")[2].find("strong").get_text()) trade = BS.find("span", class_="business item").get('title') if BS.find("span", class_="business item") else '' company = BS.find("span", class_="employment item").get('title') if BS.find("span", class_="employment item") else '' school = BS.find("span", class_="education item").get('title') if BS.find("span", class_="education item") else '' major = BS.find("span", class_="education-extra item").get('title') if BS.find("span", class_="education-extra item") else '' job = BS.find("span", class_="position item").get_text() if BS.find("span", class_="position item") else '' location = BS.find("span", class_="location item").get('title') if BS.find("span", class_="location item") else '' description = BS.find("p", class_="bio ellipsis").get('title') if BS.find("p", class_="bio ellipsis") else '' ask_num = int(BS.find_all("a", class_='item')[1].find("span").get_text()) if \ BS.find_all("a", class_='item')[ 1] else int(0) answer_num = int(BS.find_all("a", class_='item')[2].find("span").get_text()) if \ BS.find_all("a", class_='item')[ 2] else int(0) article_num = int(BS.find_all("a", class_='item')[3].find("span").get_text()) if \ BS.find_all("a", class_='item')[3] else int(0) collect_num = int(BS.find_all("a", class_='item')[4].find("span").get_text()) if \ BS.find_all("a", class_='item')[4] else int(0) public_edit_num = int(BS.find_all("a", class_='item')[5].find("span").get_text()) if \ BS.find_all("a", class_='item')[5] else int(0) replace_data = \ (pymysql.escape_string(name_url), nickname, self_domain, user_type, gender, follower_num, following_num, agree_num, appreciate_num, star_num, share_num, browse_num, trade, company, school, major, job, location, pymysql.escape_string(description), ask_num, answer_num, article_num, collect_num, public_edit_num) replace_sql = '''REPLACE INTO user(url,nickname,self_domain,user_type, gender, follower,following,agree_num,appreciate_num,star_num,share_num,browse_num, trade,company,school,major,job,location,description, ask_num,answer_num,article_num,collect_num,public_edit_num) VALUES(%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s,%s,%s, %s,%s,%s,%s,%s)''' try: print("获取到数据:") print(replace_data) self.db_cursor.execute(replace_sql, replace_data) self.db.commit() except Exception as err: print("插入数据库出错") print("获取到数据:") print(replace_data) print("插入语句:" + self.db_cursor._last_executed) self.db.rollback() print(err) traceback.print_exc() except Exception as err: print("获取数据出错,跳过用户") self.redis_con.hdel("already_get_user", name_url) self.del_already_user(name_url) print(err) traceback.print_exc() pass
最后,到用户的about页面,分析页面元素,利用正则或者beatifulsoup分析抓取页面的数据
这里我们SQL语句用REPLACE INTO而不用INSERT INTO,这样可以很好的防止数据重复问题
# 开始抓取用户,程序总入口 def entrance(self): while 1: if int(self.redis_con.llen("user_queue")) <p style="text-align: left;"><br>最后,入口</p><pre class="brush:php;toolbar:false">if __name__ == '__main__': login = GetUser(999, "登陆线程") threads = [] for i in range(0, 4): m = GetUser(i, "thread" + str(i)) threads.append(m) for i in range(0, 4): threads[i].start() for i in range(0, 4): threads[i].join()
这里就是多线程的开启,需要开启多少个线程就把4换成多少就可以了
Docker
嫌麻烦的可以参考一下我用docker简单的搭建一个基础环境:
mysql和redis都是官方镜像
docker run --name mysql -itd mysql:latest docker run --name redis -itd mysql:latest
再利用docker-compose运行python镜像,我的python的docker-compose.yml:
python: container_name: python build: . ports: - "84:80" external_links: - memcache:memcache - mysql:mysql - redis:redis volumes: - /docker_containers/python/www:/var/www/html tty: true stdin_open: true extra_hosts: - "python:192.168.102.140" environment: PYTHONIOENCODING: utf-8
Das obige ist der detaillierte Inhalt vonVerwendung von Python zur Implementierung des Multithread-Crawlings von Zhihu-Benutzern. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

Video Face Swap
Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen



Lösung für Erlaubnisprobleme beim Betrachten der Python -Version in Linux Terminal Wenn Sie versuchen, die Python -Version in Linux Terminal anzuzeigen, geben Sie Python ein ...

Wie lehre ich innerhalb von 10 Stunden die Grundlagen für Computer -Anfänger für Programmierungen? Wenn Sie nur 10 Stunden Zeit haben, um Computer -Anfänger zu unterrichten, was Sie mit Programmierkenntnissen unterrichten möchten, was würden Sie dann beibringen ...

Bei der Verwendung von Pythons Pandas -Bibliothek ist das Kopieren von ganzen Spalten zwischen zwei Datenrahmen mit unterschiedlichen Strukturen ein häufiges Problem. Angenommen, wir haben zwei Daten ...

Wie kann man nicht erkannt werden, wenn Sie Fiddlereverywhere für Man-in-the-Middle-Lesungen verwenden, wenn Sie FiddLereverywhere verwenden ...

Wie hört Uvicorn kontinuierlich auf HTTP -Anfragen an? Uvicorn ist ein leichter Webserver, der auf ASGI basiert. Eine seiner Kernfunktionen ist es, auf HTTP -Anfragen zu hören und weiterzumachen ...

In dem Artikel werden beliebte Python-Bibliotheken wie Numpy, Pandas, Matplotlib, Scikit-Learn, TensorFlow, Django, Flask und Anfragen erörtert, die ihre Verwendung in wissenschaftlichen Computing, Datenanalyse, Visualisierung, maschinellem Lernen, Webentwicklung und h beschreiben

Wie erstellt in Python ein Objekt dynamisch über eine Zeichenfolge und ruft seine Methoden auf? Dies ist eine häufige Programmieranforderung, insbesondere wenn sie konfiguriert oder ausgeführt werden muss ...

Verwenden Sie Python im Linux -Terminal ...
