場景描述:
现有许多行日志文本,按天压缩成一个个TB级的gzip文件。
使用流对每个压缩文件的数据段进行传输然后解压,对解压出的文本分词并索引
以后查到这个词时,定位到这个词所在的文件和段,再用流传输并解压
(实际上是想利用已有的压缩文件构造一个类似ES的搜索引擎)
現在的問題是,因為接收到的不是完整的壓縮檔案而是塊狀二進位數據,所以接收的數據由於資訊不完全而無法解壓縮
現在想實現這樣的功能:首先將接收到的流資料解壓縮還原為完整的資料(原始日誌資料以換行符分隔,能得到每段流資料壓縮前的文字和對應文件的偏移量就好),然後考慮到傳輸和儲存等過程可能會使資料出錯,所以針對每段資料流,在出錯的情況下解壓縮出盡可能多的資料。
部分相關程式碼如下:(改自https://stackoverflow.com/que...)
import zlib
import traceback
CHUNKSIZE=30
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
f = open('test.py.gz','rb')
buffer = f.read(CHUNKSIZE)
i = 0
while buffer :
i += 1
try:
#skip two chunk
if i < 3 or i > 4:
outstr = d.decompress(buffer)
print('*'*10 + outstr + '#'*10)
except Exception, e:
print(traceback.print_exc())
finally:
buffer = f.read(CHUNKSIZE)
outstr = d.flush()
print(outstr)
f.close()
當i>=3以後,每次循環均報錯
我的結論是 若流不連續(跳過接收部分資料),則之後的資料都無法解壓縮。
問題1:如果做到能正確的解壓縮出收到的每部分資料? (因為可能牽涉到gzip壓縮的演算法和資料結構,我正在看相關程式碼。如果可以透過追加傳輸頭部的某一chuck或需要解壓縮的資料的前後某些chuck能解決問題也算可以)
問題2:
如果無法正確的解壓縮接收到的每部分數據,那麼如何做到解壓縮出盡可能多的數據?
我覺得可以做一個出錯重新續傳的功能,傳輸前備份當前這一段資料流,你得判斷出當前傳輸的這一段資料流是否傳輸完整了。這就要求傳送端和接收端之間的傳輸協定是你能改動的,出現錯誤就立刻反饋fail給傳輸端,從剛才這段重新續傳,沒有錯誤就反饋OK,繼續傳輸下一段。這樣就能保證資料的完整性。如果檔案太大,可以在記憶體中備份多些資料段,做些細節性的判斷。
不太確定你描述的問題,不過在stackoverflow 有些問答或許有幫助。
How can I decompress a gzip stream with zlib?
Python decompressing gzip chunk-by-chunk