Previously I could crawl and index webpages for my search engine without any issues, until my database grew past what a RabbitMQ message could hold. If a message exceeds the broker's default maximum size, RabbitMQ throws an error and panics. I could raise that default, but it won't scale if my database keeps growing, so I needed a way for users to crawl webpages without having to worry about the message broker crashing.
I've implemented a function that creates segments with a maximum segment size, or MSS, the same principle TCP uses when it creates segments. Each segment carries an 8-byte header, where each 4 bytes of that header hold the sequence number and the total segment count respectively, and the rest of the body is the payload of the segmented database data.
```typescript
// MSS is in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from the database
  MSS: number,
): Array<Buffer> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;

  // Math.ceil avoids an empty trailing segment when data_length
  // is an exact multiple of MSS
  const segmentCount = Math.ceil(data_length / MSS);
  const segments: Array<Buffer> = [];

  let currentIndex = 0;
  let pointerPosition = MSS;

  for (let i = 0; i < segmentCount; i++) {
    const slicedArray = encoded_text.slice(currentIndex, pointerPosition);
    currentIndex += slicedArray.byteLength;

    // Advance the pointer by a full MSS, or by whatever remains
    // of the data if that is smaller
    pointerPosition += Math.min(MSS, data_length - currentIndex);

    segments.push(newSegment(i, segmentCount, Buffer.from(slicedArray)));
  }

  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): Buffer {
  // 8-byte header: 4 bytes for sequenceNum, 4 bytes for the total segment count
  const sequenceNumBuffer = convertIntToBuffer(sequenceNum);
  const segmentCountBuffer = convertIntToBuffer(segmentCount);
  return Buffer.concat([sequenceNumBuffer, segmentCountBuffer, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  // little-endian, to match the consumer's binary.LittleEndian.Uint32
  bytes.writeUInt32LE(int, 0);
  return bytes;
}
```
This approach of breaking a large dataset into small segments helps database queries scale even as the database grows.
Now, how does the search engine parse the buffers and turn the segments back into an array of webpages?
First, extract the segment header, which contains two fields: the sequence number and the total segment count.
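The parsing code below relies on a SegmentHeader type that isn't shown in the post; a minimal definition matching the 8-byte layout described above would be:

```go
// SegmentHeader mirrors the segment's 8-byte header:
//   bytes 0-3: little-endian uint32 sequence number
//   bytes 4-7: little-endian uint32 total segment count
type SegmentHeader struct {
	SequenceNum   uint32
	TotalSegments uint32
}
```

With that in place, extracting the header and the payload looks like this: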
```go
import (
	"encoding/binary"
	"fmt"
)

func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
	// the header is always the first 8 bytes: two little-endian uint32s
	if len(buf) < 8 {
		return nil, fmt.Errorf("segment too short: %d bytes", len(buf))
	}
	return &SegmentHeader{
		SequenceNum:   binary.LittleEndian.Uint32(buf[0:4]),
		TotalSegments: binary.LittleEndian.Uint32(buf[4:8]),
	}, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
	const headerOffset = 8
	if len(buf) < headerOffset {
		return nil, fmt.Errorf("segment too short to contain a payload")
	}
	return buf[headerOffset:], nil
}
```
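As a quick sanity check on the layout, here's a round-trip sketch of my own (assuming the functions above live in the same package): build a segment by hand, then parse it back.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// hand-build a segment: sequence 0 of 3 total, payload "hello"
	segment := make([]byte, 8)
	binary.LittleEndian.PutUint32(segment[0:4], 0) // sequence number
	binary.LittleEndian.PutUint32(segment[4:8], 3) // total segment count
	segment = append(segment, []byte("hello")...)

	header, err := GetSegmentHeader(segment)
	if err != nil {
		panic(err)
	}
	payload, _ := GetSegmentPayload(segment)

	fmt.Println(header.SequenceNum, header.TotalSegments, string(payload))
	// prints: 0 3 hello
}
```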
The sequence number is used for retransmission/requeuing of segments: if the sequence number received is not the one expected, requeue every segment starting from the current one.
```go
// for retransmission/requeuing
if segmentHeader.SequenceNum != expectedSequenceNum {
	ch.Nack(data.DeliveryTag, true, true)
	log.Printf("Expected Sequence number %d, got %d\n",
		expectedSequenceNum, segmentHeader.SequenceNum)
	continue
}
```
The total segment count is used to tell the search engine when to stop listening to the producer (the database service): if the number of segments received equals the total the database service said it would send, break out of the loop and parse the aggregated segment buffer. Otherwise, keep listening and append each segment's payload to the webpage buffer, accumulating the bytes from all incoming segments.
```go
segmentCounter++
fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
fmt.Printf("current segments : %d\n", segmentCounter)
expectedSequenceNum++
ch.Ack(data.DeliveryTag, false)
webpageBytes = append(webpageBytes, segmentPayload...)
fmt.Printf("Byte Length: %d\n", len(webpageBytes))

if segmentCounter == segmentHeader.TotalSegments {
	log.Printf("Got all segments from Database %d", segmentCounter)
	break
}
```
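To tie the fragments together, here is a minimal sketch of the whole consume-and-reassemble loop, ending with the step described above but not shown: decoding the aggregated buffer back into webpages with json.Unmarshal. The amqp091-go client, the queue name, and the Webpage fields are all my assumptions, not code from the post.

```go
package main

import (
	"encoding/json"
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Webpage's fields are a guess; the post never shows the struct.
type Webpage struct {
	URL   string `json:"url"`
	Title string `json:"title"`
}

func consumeWebpages(ch *amqp.Channel) ([]Webpage, error) {
	// queue name is an assumption; autoAck=false so we control ack/nack
	msgs, err := ch.Consume("webpage_segments", "", false, false, false, false, nil)
	if err != nil {
		return nil, err
	}

	var webpageBytes []byte
	var segmentCounter, expectedSequenceNum uint32

	for data := range msgs {
		segmentHeader, err := GetSegmentHeader(data.Body)
		if err != nil {
			ch.Nack(data.DeliveryTag, false, true)
			continue
		}
		segmentPayload, _ := GetSegmentPayload(data.Body)

		// out-of-order segment: requeue everything from here on
		if segmentHeader.SequenceNum != expectedSequenceNum {
			ch.Nack(data.DeliveryTag, true, true)
			continue
		}

		segmentCounter++
		expectedSequenceNum++
		ch.Ack(data.DeliveryTag, false)
		webpageBytes = append(webpageBytes, segmentPayload...)

		// all segments received: stop listening and parse
		if segmentCounter == segmentHeader.TotalSegments {
			break
		}
	}

	// the aggregated buffer is the JSON the producer encoded
	var webpages []Webpage
	if err := json.Unmarshal(webpageBytes, &webpages); err != nil {
		return nil, err
	}
	log.Printf("decoded %d webpages", len(webpages))
	return webpages, nil
}
```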
Btw, I use vim.
Thanks for coming to my TED talk. More features and fixes are coming to zensearch.