场景描述:
现有许多行日志文本,按天压缩成一个个TB级的gzip文件。
使用流对每个压缩文件的数据段进行传输然后解压,对解压出的文本分词并索引
以后查到这个词时,定位到这个词所在的文件和段,再用流传输并解压
(实际上是想利用已有的压缩文件构造一个类似ES的搜索引擎)
现在的问题是,因为接收到的不是完整的压缩文件而是块状二进制数据,所以接收的数据由于信息不完全而无法解压
现在想实现这样的功能:首先将接收到的流数据解压还原为完整的数据(原始日志数据以换行符分隔,能得到每段流数据压缩前的文本和对应文件的偏移量就好),然后考虑到传输和存储等过程可能使数据出错,所以针对每段数据流,在出错的情况下解压出尽可能多的数据。
部分相关代码如下:(改自https://stackoverflow.com/que...)
import zlib
import traceback
CHUNKSIZE=30
d = zlib.decompressobj(16 + zlib.MAX_WBITS)
f = open('test.py.gz','rb')
buffer = f.read(CHUNKSIZE)
i = 0
while buffer :
i += 1
try:
#skip two chunk
if i < 3 or i > 4:
outstr = d.decompress(buffer)
print('*'*10 + outstr + '#'*10)
except Exception, e:
print(traceback.print_exc())
finally:
buffer = f.read(CHUNKSIZE)
outstr = d.flush()
print(outstr)
f.close()
当i>=3以后,每次循环均报错
我的结论是 若流不连续(跳过接收部分数据),则之后的数据都无法解压。
问题1:如果做到能正确的解压出收到的每部分数据?(因为可能牵涉到gzip压缩的算法和数据结构,我正在看相关代码。如果可以通过追加传输头部的某一chuck或者需要解压的数据的前后某些chuck能解决问题也算可以)
问题2:
如果不能正确的解压接收到的每部分数据,那么如何做到解压出尽可能多的数据?
我觉得可以做一个出错重新续传的功能,传输前备份当前这一段数据流,你得判断出当前传输的这一段数据流是否传输完整了。这就要求传送端和接收端之间的传输协议是你能改动的,出现错误就立刻反馈fail给传输端,从刚才这段重新续传,没有错误就反馈OK,继续传输下一段。这样就能保证数据的完整性。如果文件太大,可以在内存中备份多些数据段,做些细节性的判断。
不太确定你描述的问题,不过在stackoverflow 有些问答或许有帮助。
How can I decompress a gzip stream with zlib?
Python decompressing gzip chunk-by-chunk