分治(pide&Conquer),參考大數據演算法:對5億資料進行排序
對這個一個500000000行的total.txt 進行排序,該檔案大小4.6G。
每讀10000行就排序並寫入到一個新的子檔案裡(這裡使用的是快速排序)。
#!/usr/bin/python2.7 import time def readline_by_yield(bfile): with open(bfile, 'r') as rf: for line in rf: yield line def quick_sort(lst): if len(lst) < 2: return lst pivot = lst[0] left = [ ele for ele in lst[1:] if ele < pivot ] right = [ ele for ele in lst[1:] if ele >= pivot ] return quick_sort(left) + [pivot,] + quick_sort(right) def split_bfile(bfile): count = 0 nums = [] for line in readline_by_yield(bfile): num = int(line) if num not in nums: nums.append(num) if 10000 == len(nums): nums = quick_sort(nums) with open('subfile/subfile{}.txt'.format(count+1),'w') as wf: wf.write('\n'.join([ str(i) for i in nums ])) nums[:] = [] count += 1 print count now = time.time() split_bfile('total.txt') run_t = time.time()-now print 'Runtime : {}'.format(run_t)
會產生 50000 個小文件,每個小文件大小約在 96K左右。
程式在執行過程中,記憶體佔用一直處在 5424kB #左右
#整個檔案分割完耗時
94146
秒。
2.合併#!/usr/bin/python2.7 # -*- coding: utf-8 -*- import os import time testdir = '/ssd/subfile' now = time.time() # Step 1 : 获取全部文件描述符 fds = [] for f in os.listdir(testdir): ff = os.path.join(testdir,f) fds.append(open(ff,'r')) # Step 2 : 每个文件获取第一行,即当前文件最小值 nums = [] tmp_nums = [] for fd in fds: num = int(fd.readline()) tmp_nums.append(num) # Step 3 : 获取当前最小值放入暂存区,并读取对应文件的下一行;循环遍历。 count = 0 while 1: val = min(tmp_nums) nums.append(val) idx = tmp_nums.index(val) next = fds[idx].readline() # 文件读完了 if not next: del fds[idx] del tmp_nums[idx] else: tmp_nums[idx] = int(next) # 暂存区保存1000个数,一次性写入硬盘,然后清空继续读。 if 1000 == len(nums): with open('final_sorted.txt','a') as wf: wf.write('\n'.join([ str(i) for i in nums ]) + '\n') nums[:] = [] if 499999999 == count: break count += 1 with open('runtime.txt','w') as wf: wf.write('Runtime : {}'.format(time.time()-now))
240M左右
跑了38個小時左右,才合併完不到5千萬行資料...
雖然降低了記憶體使用,但時間複雜度太高了;可以透過減少檔案數(每個小檔案儲存行數增加)來進一步降低記憶體使用。
問題二:一個檔案有一千億行數據,每行是IP位址,需要對IP位址進行排序。 IP位址轉換成數字# 方法一:手动计算 In [62]: ip Out[62]: '10.3.81.150' In [63]: ip.split('.')[::-1] Out[63]: ['150', '81', '3', '10'] In [64]: [ '{}-{}'.format(idx,num) for idx,num in enumerate(ip.split('.')[::-1]) ] Out[64]: ['0-150', '1-81', '2-3', '3-10'] In [65]: [256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])] Out[65]: [150, 20736, 196608, 167772160] In [66]: sum([256**idx*int(num) for idx,num in enumerate(ip.split('.')[::-1])]) Out[66]: 167989654 In [67]: # 方法二:使用C扩展库来计算 In [71]: import socket,struct In [72]: socket.inet_aton(ip) Out[72]: b'\n\x03Q\x96' In [73]: struct.unpack("!I", socket.inet_aton(ip)) # !表示使用网络字节顺序解析, 后面的I表示unsigned int, 对应Python里的integer or long Out[73]: (167989654,) In [74]: struct.unpack("!I", socket.inet_aton(ip))[0] Out[74]: 167989654 In [75]: socket.inet_ntoa(struct.pack("!I", 167989654)) Out[75]: '10.3.81.150' In [76]:
基本概念
:迭代讀大文件,把大文件分割成多個小文件;最後再歸併這些小文件。分割的規則
:迭代讀取大文件,記憶體中維護字典,key是字串,value是該字串出現的次數;
當字典維護的字串種類達到10000(可自訂)的時候,把該字典依照key從小到大排序
,然後寫入小文件,每行是key\tvalue; 然後清空字典,繼續往下讀,直到大檔案讀完。歸併的規則:
首先取得全部小檔案的檔案描述子
,然後各自讀出第一行(即每個小檔案字串ascii值最小的字串),進行比較。 找出ascii值最小的字串,如果有重複的,這把各自出現的次數累加起來,然後把當前字串和總次數儲存到記憶體中的一個列表。 接著把最小字串所在的檔案的讀取指標向下移,也就是從對應小檔案再讀出一行進行下一輪比較。 當記憶體中的列表個數達到10000時,則一次把該列表內容寫到一個最終檔案儲存到硬碟上。同時清空列表,進行之後的比較。, | 最後迭代去讀這個最終文件,找出重複次數最多的即可。 | 1. 分割 | 2. 歸併 | 歸併結果分析: | |
分割時記憶體中維護的字典大小 | 分割的小檔案個數 | 歸併時需維護的檔案描述子數 | 歸併時記憶體佔用 | 歸併耗時 | |
10000 | 9000 | 9000 ~ 0 | 200M | 歸併速度慢,暫未統計完成時間 |
900
900 ~ 027M######歸併速度快,只需2572秒#################3. 找出出現次數最多的字串及其次數######import time def read_line(filepath): with open(filepath,'r') as rf: for line in rf: yield line start_ts = time.time() max_str = None max_count = 0 for line in read_line('merged.txt'): string,count = line.strip().split('\t') if int(count) > max_count: max_count = int(count) max_str = string print(max_str,max_count) print('Runtime {}'.format(time.time()-start_ts))
以上是一文了解大文件排序/外存排序問題的詳細內容。更多資訊請關注PHP中文網其他相關文章!