이 글에서는 Python의 멀티스레드 웹 페이지 크롤링 기능을 주로 소개하며, 구체적인 예제를 바탕으로 Python 멀티스레드 프로그래밍의 관련 운영 기술과 주의 사항을 자세히 분석합니다. 또한 구현 방법을 제공하는 데모 예제도 함께 제공됩니다. 멀티 스레드 웹 페이지 크롤링 , 필요한 친구는 참고할 수 있습니다
이 기사에서는 Python의 멀티 스레드 웹 페이지 크롤링 기능 구현 예를 설명합니다. 참고하실 수 있도록 모든 사람과 공유하세요. 자세한 내용은 다음과 같습니다.
최근에 웹 크롤러와 관련된 일을 하고 있습니다. 오픈소스 C++로 작성된 라빈 크롤러를 살펴보고, 디자인 아이디어와 몇 가지 핵심 기술의 구현을 주의 깊게 읽었습니다.
1. Larbin의 URL 재사용 해제는 매우 효율적인 블룸 필터 알고리즘입니다.
2. DNS 처리는 adns 비동기 오픈 소스 구성 요소를 사용합니다.
3. URL 대기열 처리의 경우 일부는 파일에 저장됩니다. 수입 전략.
4. Larbin은 파일 관련 작업을 많이 수행했습니다. 5. larbin에는 연결 풀이 있습니다. 소켓을 생성하여 대상 사이트에 HTTP 프로토콜의 GET 메서드를 보내고 콘텐츠를 가져옵니다. 그런 다음 헤더 등을 구문 분석합니다.
6. 폴링 방법을 통해 I/O 다중화에 많은 수의 설명자가 사용됩니다. 이는 매우 효율적입니다. 7. Larbin은 매우 구성 가능합니다. 작가님이 밑바닥부터 직접 작성하셔서 기본적으로 쓸모가 없는 것들이 많아요.
지난 이틀 동안 저는 Python으로 멀티스레드 페이지 다운로드 프로그램을 작성했습니다. I/O 집약적인 애플리케이션의 경우 멀티스레딩이 확실히 좋은 솔루션입니다. 방금 작성한 스레드 풀도 사용할 수 있습니다. 실제로 Python을 사용하여 페이지를 크롤링하는 것은 매우 간단합니다. 사용하기 매우 편리하고 기본적으로 두세 줄의 코드로 수행할 수 있는 urllib2 모듈이 있습니다. 타사 모듈을 사용하면 문제를 매우 편리하게 해결할 수 있지만 핵심 알고리즘은 자신이 아닌 다른 사람이 구현하므로 세부 사항을 많이 알 수 없기 때문에 개인 기술 축적에는 도움이 되지 않습니다. 기술 전문가로서 우리는 다른 사람이 작성한 모듈이나 API를 사용할 수 없으며 더 많은 것을 배울 수 있도록 직접 구현해야 합니다.
저는 GET 프로토콜을 캡슐화하고 헤더를 구문 분석하는 소켓에서 시작하기로 결정했습니다. DNS 캐싱과 같은 DNS 구문 분석 프로세스도 별도로 처리할 수 있으므로 직접 작성하면 제어가 더 용이해지고 더 좋아질 것입니다. 확장에 도움이 됩니다. 타임아웃 처리의 경우 전역 5초 타임아웃 처리를 사용합니다. 재배치(301 또는 302) 처리의 경우 최대 재배치는 3회입니다. 이전 테스트 과정에서 많은 사이트의 재배치가 자신에게 리디렉션되는 것을 발견했기 때문입니다. 무한 루프이므로 상한이 설정됩니다. 구체적인 원리는 비교적 간단합니다. 코드를 살펴보세요.
작성을 마친 후 urllib2와 성능을 비교해보니 제가 직접 작성한 글의 효율성이 상대적으로 높았고, urllib2의 오류율이 조금 더 높았던 이유는 모르겠습니다. 인터넷상의 어떤 사람들은 urllib2가 멀티스레드 환경에서 약간의 사소한 문제가 있다고 말하지만, 나는 그 세부 사항에 대해 특별히 명확하지 않습니다.
먼저 코드 게시:
fetchPage.py Http 프로토콜의 Get 메서드를 사용하여 페이지를 다운로드하고 파일로 저장합니다.
''' Created on 2012-3-13 Get Page using GET method Default using HTTP Protocol , http port 80 @author: xiaojay ''' import socket import statistics import datetime import threading socket.setdefaulttimeout(statistics.timeout) class Error404(Exception): '''Can not find the page.''' pass class ErrorOther(Exception): '''Some other exception''' def __init__(self,code): #print 'Code :',code pass class ErrorTryTooManyTimes(Exception): '''try too many times''' pass def downPage(hostname ,filename , trytimes=0): try : #To avoid too many tries .Try times can not be more than max_try_times if trytimes >= statistics.max_try_times : raise ErrorTryTooManyTimes except ErrorTryTooManyTimes : return statistics.RESULTTRYTOOMANY,hostname+filename try: s = socket.socket(socket.AF_INET,socket.SOCK_STREAM) #DNS cache if statistics.DNSCache.has_key(hostname): addr = statistics.DNSCache[hostname] else: addr = socket.gethostbyname(hostname) statistics.DNSCache[hostname] = addr #connect to http server ,default port 80 s.connect((addr,80)) msg = 'GET '+filename+' HTTP/1.0\r\n' msg += 'Host: '+hostname+'\r\n' msg += 'User-Agent:xiaojay\r\n\r\n' code = '' f = None s.sendall(msg) first = True while True: msg = s.recv(40960) if not len(msg): if f!=None: f.flush() f.close() break # Head information must be in the first recv buffer if first: first = False headpos = msg.index("\r\n\r\n") code,other = dealwithHead(msg[:headpos]) if code=='200': #statistics.fetched_url += 1 f = open('pages/'+str(abs(hash(hostname+filename))),'w') f.writelines(msg[headpos+4:]) elif code=='301' or code=='302': #if code is 301 or 302 , try down again using redirect location if other.startswith("http") : hname, fname = parse(other) downPage(hname,fname,trytimes+1)#try again else : downPage(hostname,other,trytimes+1) elif code=='404': raise Error404 else : raise ErrorOther(code) else: if f!=None :f.writelines(msg) s.shutdown(socket.SHUT_RDWR) s.close() return statistics.RESULTFETCHED,hostname+filename except Error404 : return statistics.RESULTCANNOTFIND,hostname+filename except ErrorOther: return statistics.RESULTOTHER,hostname+filename except socket.timeout: return statistics.RESULTTIMEOUT,hostname+filename except Exception, e: return statistics.RESULTOTHER,hostname+filename def dealwithHead(head): '''deal with HTTP HEAD''' lines = head.splitlines() fstline = lines[0] code =fstline.split()[1] if code == '404' : return (code,None) if code == '200' : return (code,None) if code == '301' or code == '302' : for line in lines[1:]: p = line.index(':') key = line[:p] if key=='Location' : return (code,line[p+2:]) return (code,None) def parse(url): '''Parse a url to hostname+filename''' try: u = url.strip().strip('\n').strip('\r').strip('\t') if u.startswith('http://') : u = u[7:] elif u.startswith('https://'): u = u[8:] if u.find(':80')>0 : p = u.index(':80') p2 = p + 3 else: if u.find('/')>0: p = u.index('/') p2 = p else: p = len(u) p2 = -1 hostname = u[:p] if p2>0 : filename = u[p2:] else : filename = '/' return hostname, filename except Exception ,e: print "Parse wrong : " , url print e def PrintDNSCache(): '''print DNS dict''' n = 1 for hostname in statistics.DNSCache.keys(): print n,'\t',hostname, '\t',statistics.DNSCache[hostname] n+=1 def dealwithResult(res,url): '''Deal with the result of downPage''' statistics.total_url+=1 if res==statistics.RESULTFETCHED : statistics.fetched_url+=1 print statistics.total_url , '\t fetched :', url if res==statistics.RESULTCANNOTFIND : statistics.failed_url+=1 print "Error 404 at : ", url if res==statistics.RESULTOTHER : statistics.other_url +=1 print "Error Undefined at : ", url if res==statistics.RESULTTIMEOUT : statistics.timeout_url +=1 print "Timeout ",url if res==statistics.RESULTTRYTOOMANY: statistics.trytoomany_url+=1 print e ,"Try too many times at", url if __name__=='__main__': print 'Get Page using GET method'
''' Created on 2012-3-16 @author: xiaojay ''' import fetchPage import threadpool import datetime import statistics import urllib2 '''one thread''' def usingOneThread(limit): urlset = open("input.txt","r") start = datetime.datetime.now() for u in urlset: if limit <= 0 : break limit-=1 hostname , filename = parse(u) res= fetchPage.downPage(hostname,filename,0) fetchPage.dealwithResult(res) end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total fetched :', statistics.fetched_url '''threadpoll and GET method''' def callbackfunc(request,result): fetchPage.dealwithResult(result[0],result[1]) def usingThreadpool(limit,num_thread): urlset = open("input.txt","r") start = datetime.datetime.now() main = threadpool.ThreadPool(num_thread) for url in urlset : try : hostname , filename = fetchPage.parse(url) req = threadpool.WorkRequest(fetchPage.downPage,args=[hostname,filename],kwds={},callback=callbackfunc) main.putRequest(req) except Exception: print Exception.message while True: try: main.poll() if statistics.total_url >= limit : break except threadpool.NoResultsPending: print "no pending results" break except Exception ,e: print e end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total url :',statistics.total_url print 'Total fetched :', statistics.fetched_url print 'Lost url :', statistics.total_url - statistics.fetched_url print 'Error 404 :' ,statistics.failed_url print 'Error timeout :',statistics.timeout_url print 'Error Try too many times ' ,statistics.trytoomany_url print 'Error Other faults ',statistics.other_url main.stop() '''threadpool and urllib2 ''' def downPageUsingUrlib2(url): try: req = urllib2.Request(url) fd = urllib2.urlopen(req) f = open("pages3/"+str(abs(hash(url))),'w') f.write(fd.read()) f.flush() f.close() return url ,'success' except Exception: return url , None def writeFile(request,result): statistics.total_url += 1 if result[1]!=None : statistics.fetched_url += 1 print statistics.total_url,'\tfetched :', result[0], else: statistics.failed_url += 1 print statistics.total_url,'\tLost :',result[0], def usingThreadpoolUrllib2(limit,num_thread): urlset = open("input.txt","r") start = datetime.datetime.now() main = threadpool.ThreadPool(num_thread) for url in urlset : try : req = threadpool.WorkRequest(downPageUsingUrlib2,args=[url],kwds={},callback=writeFile) main.putRequest(req) except Exception ,e: print e while True: try: main.poll() if statistics.total_url >= limit : break except threadpool.NoResultsPending: print "no pending results" break except Exception ,e: print e end = datetime.datetime.now() print "Start at :\t" , start print "End at :\t" , end print "Total Cost :\t" , end - start print 'Total url :',statistics.total_url print 'Total fetched :', statistics.fetched_url print 'Lost url :', statistics.total_url - statistics.fetched_url main.stop() if __name__ =='__main__': '''too slow''' #usingOneThread(100) '''use Get method''' #usingThreadpool(3000,50) '''use urllib2''' usingThreadpoolUrllib2(3000,50)
실험 데이터: larbin으로 캡처한 3000개의 URL은 Mercator 대기열 모델로 처리되었습니다. (C++로 구현했으며, 확보되면 블로그에 게시하겠습니다.) 미래의 기회) 무작위적이고 대표적인 URL 모음입니다. 50개 스레드로 구성된 스레드 풀을 사용합니다. 실험 환경:
ubuntu10.04, 좋은 네트워크, python2.6저장: 작은 파일, 각 페이지, 하나의 파일 저장PS: 학교의 인터넷 접속 비용은 트래픽에 따라 부과되므로 웹 크롤러를 수행하는 데는 비용이 많이 듭니다. ! ! ! 며칠 안에 대규모 URL 다운로드 실험을 실시하고 수십만 개의 URL을 사용해 시도할 수도 있습니다.
실험 결과:
사용, ThreadpoolUrllib2(3000,50)시작 시간 : 2012-03-16 22:18:20.956054
끝 시간 : 2012-03-1 6 22:22:15.203018총 비용: 0:03:54.246964총 URL: 3001총 가져온 URL: 2442
잃어버린 URL: 559
다운로드 페이지의 물리적 저장 크기: 84088kb
나만의 getPage 사용UsingGet, usingThreadpool(3000,50)
시작 시간 : 2012-03-16 22:23:40.206730
종료 : 2012-03-16 22:26:26.843563
총 URL : 3002
총 가져온 : 2484Lost URL : 518
오류 404 : 94오류 시간 초과 : 312
Error 너무 여러 번 시도 0
Error 기타 오류 112
다운로드 페이지의 물리적 저장 크기: 87168kb
요약: 제가 직접 작성한 다운로드 페이지 프로그램은 매우 효율적이며 손실된 페이지가 적습니다. 하지만 사실 생각해보면 아직 최적화할 수 있는 곳이 많이 있습니다. 예를 들어 파일이 너무 분산되어 있기 때문에 작은 파일을 너무 많이 생성하고 릴리스하면 확실히 성능 오버헤드가 많이 발생하고 프로그램이 실행되지 않습니다. 해시 이름 지정을 사용하므로 계산 측면에서도 많은 문제가 발생합니다. 좋은 전략이 있으면 실제로 이러한 비용을 생략할 수 있습니다. DNS 외에도 Python과 함께 제공되는 DNS 확인을 사용할 필요가 없습니다. 기본 DNS 확인은 동기 작업이고 DNS 확인은 일반적으로 시간이 많이 걸리기 때문에 멀티 스레드 비동기 방식으로 수행할 수 있습니다. 방식을 적절한 DNS 캐싱과 결합하면 효율성이 크게 향상될 수 있습니다. 뿐만 아니라, 실제 페이지 크롤링 과정에서 URL의 양이 많아 한번에 메모리에 저장하는 것은 불가능하며, 대신 특정 전략이나 알고리즘에 따라 합리적으로 할당해야 합니다. 간단히 말해서, 아직 해야 할 일과 컬렉션 페이지에 최적화할 수 있는 일이 많이 있습니다.
위 내용은 Python은 멀티스레딩을 사용하여 웹페이지 정보를 크롤링합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!