Previously, I could crawl web pages and index them for my search engine without any problems, until my database grew past what a RabbitMQ message queue can accommodate. If a message exceeds the broker's maximum message size, RabbitMQ throws an error and panics. I could raise the default limit, but that doesn't scale as my database keeps growing, so I needed a way for users to crawl web pages without having to worry about the message broker crashing.
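For context, this limit is configurable in `rabbitmq.conf` via `max_message_size` (available in modern RabbitMQ releases), but the broker enforces a hard upper cap on the setting itself, which is exactly why raising it only postpones the problem:

```ini
# rabbitmq.conf: raise the per-message limit (default is 128 MiB).
# RabbitMQ caps this setting at 512 MiB, so it cannot grow with the
# database forever; segmentation is the scalable fix.
max_message_size = 268435456
```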
I implemented a function that creates segments with a maximum segment size, or MSS, borrowing the idea from how TCP builds its segments. Each segment carries an 8-byte header: the first 4 bytes hold the sequence number, the next 4 bytes hold the total number of segments, and the rest of the body is a chunk of the serialized database payload.
```typescript
import { Buffer } from "node:buffer";

// MSS is the maximum segment size, in bytes
function createSegments(
  webpages: Array<Webpage>, // webpages queried from the database
  MSS: number,
): Array<Buffer> {
  const text_encoder = new TextEncoder();
  const encoded_text = text_encoder.encode(JSON.stringify(webpages));
  const data_length = encoded_text.byteLength;
  // ceil instead of trunc + 1, so a payload that is an exact
  // multiple of MSS doesn't produce a trailing empty segment
  const segmentCount = Math.ceil(data_length / MSS);
  const segments: Array<Buffer> = [];

  for (let i = 0; i < segmentCount; i++) {
    // slice() clamps the end index, so the last segment simply
    // carries whatever bytes remain
    const payload = encoded_text.slice(i * MSS, (i + 1) * MSS);
    segments.push(newSegment(i, segmentCount, Buffer.from(payload)));
  }
  return segments;
}

function newSegment(
  sequenceNum: number,
  segmentCount: number,
  payload: Buffer,
): Buffer {
  // 4 bytes for sequenceNum, 4 bytes for the total segment count
  const header = Buffer.concat([
    convertIntToBuffer(sequenceNum),
    convertIntToBuffer(segmentCount),
  ]);
  return Buffer.concat([header, payload]);
}

function convertIntToBuffer(int: number): Buffer {
  const bytes = Buffer.alloc(4);
  bytes.writeIntLE(int, 0, 4);
  return bytes;
}
```
Breaking a large dataset into small segments like this helps database queries keep scaling even as the database grows.
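To make the framing concrete, here is a minimal round-trip sketch in Go (the names `makeSegments` and `reassemble` are mine for illustration; zensearch's producer side is the TypeScript above). It splits a byte slice into MSS-sized payloads behind the 8-byte little-endian header, then stitches them back together:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const headerSize = 8 // 4 bytes sequence number + 4 bytes total segment count

// makeSegments splits data into MSS-sized payloads, each prefixed with the
// 8-byte header (little-endian, matching writeIntLE on the producer side).
func makeSegments(data []byte, mss int) [][]byte {
	total := (len(data) + mss - 1) / mss // ceiling division
	segments := make([][]byte, 0, total)
	for i := 0; i < total; i++ {
		start := i * mss
		end := start + mss
		if end > len(data) {
			end = len(data) // last segment carries the remainder
		}
		seg := make([]byte, headerSize+end-start)
		binary.LittleEndian.PutUint32(seg[0:4], uint32(i))
		binary.LittleEndian.PutUint32(seg[4:8], uint32(total))
		copy(seg[headerSize:], data[start:end])
		segments = append(segments, seg)
	}
	return segments
}

// reassemble concatenates the payloads of in-order segments.
func reassemble(segments [][]byte) []byte {
	var out []byte
	for _, seg := range segments {
		out = append(out, seg[headerSize:]...)
	}
	return out
}

func main() {
	data := []byte(`[{"url":"https://example.com"}]`) // 31 bytes
	segs := makeSegments(data, 10)
	fmt.Println(len(segs))                                // 4
	fmt.Println(string(reassemble(segs)) == string(data)) // true
}
```

No matter how large the queried dataset gets, each individual message stays at or under MSS plus 8 bytes of header.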
Now, how does the search engine parse the buffers and turn the segments back into an array of webpages?
First, extract the segment header, since it carries the two attributes we need: the sequence number and the total segment count.
```go
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) {
	byteReader := bytes.NewBuffer(buf)
	headerOffsets := []int{0, 4}
	newSegmentHeader := SegmentHeader{}

	for i := range headerOffsets {
		buffer := make([]byte, 4)
		_, err := byteReader.Read(buffer)
		if err != nil {
			return &SegmentHeader{}, err
		}
		value := binary.LittleEndian.Uint32(buffer)

		// this feels disgusting but i dont feel like bothering with this
		if i == 0 {
			newSegmentHeader.SequenceNum = value
			continue
		}
		newSegmentHeader.TotalSegments = value
	}
	return &newSegmentHeader, nil
}

func GetSegmentPayload(buf []byte) ([]byte, error) {
	headerOffset := 8
	byteReader := bytes.NewBuffer(buf[headerOffset:])
	return byteReader.Bytes(), nil
}
```
The sequence number is used for retransmission/requeuing of segments: if the received sequence number is not the expected one, every segment from the current one onward gets requeued.
```go
// for retransmission/requeuing
if segmentHeader.SequenceNum != expectedSequenceNum {
	ch.Nack(data.DeliveryTag, true, true)
	log.Printf("Expected Sequence number %d, got %d\n",
		expectedSequenceNum, segmentHeader.SequenceNum)
	continue
}
```
The total segment count is used to stop listening to the producer (the database service). Once the number of segments the search engine has received equals the total the database service said it would send, we break out of the loop and parse the aggregated segment buffer. Otherwise, we keep listening and append each segment's payload to the webpage buffer, accumulating the bytes from all incoming segments.
```go
segmentCounter++
fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments)
fmt.Printf("current segments : %d\n", segmentCounter)

expectedSequenceNum++
ch.Ack(data.DeliveryTag, false)
webpageBytes = append(webpageBytes, segmentPayload...)
fmt.Printf("Byte Length: %d\n", len(webpageBytes))

if segmentCounter == segmentHeader.TotalSegments {
	log.Printf("Got all segments from Database %d", segmentCounter)
	break
}
```
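Putting the pieces together, the consumer loop can be simulated without RabbitMQ at all. In this sketch (names `frame` and `drain` are mine), an in-memory queue stands in for the channel: an out-of-order segment is pushed to the back of the queue, mimicking `ch.Nack` with requeue, and the loop breaks once every segment has arrived:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

const headerSize = 8

// frame builds one segment: 4-byte LE sequence number,
// 4-byte LE total count, then the payload.
func frame(seq, total uint32, payload []byte) []byte {
	seg := make([]byte, headerSize+len(payload))
	binary.LittleEndian.PutUint32(seg[0:4], seq)
	binary.LittleEndian.PutUint32(seg[4:8], total)
	copy(seg[headerSize:], payload)
	return seg
}

// drain mimics the consumer loop: out-of-order segments are "nacked" by
// pushing them to the back of the queue; we stop once every segment arrived.
func drain(queue [][]byte) []byte {
	var out []byte
	var expected, received uint32
	for len(queue) > 0 {
		seg := queue[0]
		queue = queue[1:]
		seq := binary.LittleEndian.Uint32(seg[0:4])
		total := binary.LittleEndian.Uint32(seg[4:8])
		if seq != expected {
			queue = append(queue, seg) // stand-in for ch.Nack(..., requeue=true)
			continue
		}
		out = append(out, seg[headerSize:]...) // stand-in for ch.Ack + append
		expected++
		received++
		if received == total {
			break
		}
	}
	return out
}

func main() {
	// deliver segment 1 before segment 0 to exercise the requeue path
	queue := [][]byte{
		frame(1, 2, []byte(" world")),
		frame(0, 2, []byte("hello")),
	}
	fmt.Println(string(drain(queue))) // hello world
}
```

In the real consumer the requeue goes back through the broker rather than a local slice, but the sequencing logic is the same.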
Btw, I use vim.

Thanks for coming to my TED talk. More features and fixes for zensearch are on the way.
That's the full rundown of scaling Zensearch's ability to query the entire database.