これまでは、データベースが RabbitMQ のメッセージ キューが保持できる容量を超えるまでは、問題なく検索エンジンの Web ページをクロールしてインデックスを付けることができました。メッセージ キュー内のメッセージがデフォルトのサイズを超えると、RabbitMQ はエラーをスローしてパニックになります。デフォルトのサイズを変更することもできますが、データベースが大きくなると拡張できなくなります。そのため、ユーザーが心配することなく Web ページをクロールできるようにするためです。メッセージ ブローカーがクラッシュしています。
セグメントを作成するときに TCP と同じ原則に基づいて、最大セグメント サイズまたは MSS を使用してセグメントを作成する関数を実装しました。セグメントには 8 バイトのヘッダーが含まれており、8 バイトのヘッダーの各 4 バイトはシーケンス番号であり、合計セグメント数、本体の残りの部分はセグメント化されたデータベースのペイロードです。
// MSS is number in bytes function createSegments( webpages: Array<Webpage>, // webpages queried from database MSS: number, ): Array<ArrayBufferLike> { const text_encoder = new TextEncoder(); const encoded_text = text_encoder.encode(JSON.stringify(webpages)); const data_length = encoded_text.byteLength; let currentIndex = 0; let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder let segments: Array<ArrayBufferLike> = []; let pointerPosition = MSS; for (let i = 0; i < segmentCount; i++) { let currentDataLength = Math.abs(currentIndex - data_length); let slicedArray = encoded_text.slice(currentIndex, pointerPosition); currentIndex += slicedArray.byteLength; // Add to offset MSS to point to the next segment in the array // manipulate pointerPosition to adjust to lower values using Math.min() // Is current data length enough to fit MSS? // if so add from current position + MSS // else get remaining of the currentDataLength pointerPosition += Math.min(MSS, currentDataLength); const payload = new Uint8Array(slicedArray.length); payload.set(slicedArray); segments.push(newSegment(i, segmentCount, Buffer.from(payload))); } return segments; } function newSegment( sequenceNum: number, segmentCount: number, payload: Buffer, ): ArrayBufferLike { // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount const sequenceNumBuffer = convertIntToBuffer(sequenceNum); const segmentCountBuffer = convertIntToBuffer(segmentCount); const headerBuffer = new ArrayBuffer(8); const header = new Uint8Array(headerBuffer); header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer])); return Buffer.concat([header, payload]); } function convertIntToBuffer(int: number): Buffer { const bytes = Buffer.alloc(4); bytes.writeIntLE(int, 0, 4); console.log(bytes); return bytes; }
大規模なデータセットの小さなセグメントを作成するこの方法は、データベースが拡大した場合でもデータベース クエリを拡張するのに役立ちます。
では、検索エンジンはどのようにしてバッファを解析し、各セグメントを Web ページの配列に変換するのでしょうか?
ヘッダーにはシーケンス番号と合計セグメントという 2 つのプロパティが含まれているため、最初にセグメント ヘッダーを抽出します。
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) { byteReader := bytes.NewBuffer(buf) headerOffsets := []int{0, 4} newSegmentHeader := SegmentHeader{} for i := range headerOffsets { buffer := make([]byte, 4) _, err := byteReader.Read(buffer) if err != nil { return &SegmentHeader{}, err } value := binary.LittleEndian.Uint32(buffer) // this feels disgusting but i dont feel like bothering with this if i == 0 { newSegmentHeader.SequenceNum = value continue } newSegmentHeader.TotalSegments = value } return &newSegmentHeader, nil } func GetSegmentPayload(buf []byte) ([]byte, error) { headerOffset := 8 byteReader := bytes.NewBuffer(buf[headerOffset:]) return byteReader.Bytes(), nil }
シーケンス番号はセグメントの再送信/再キューイングに使用されるため、予期したシーケンス番号が受信したものではない場合は、現在のセグメントから始まるすべてのセグメントを再キューイングします。
// for retransmission/requeuing if segmentHeader.SequenceNum != expectedSequenceNum { ch.Nack(data.DeliveryTag, true, true) log.Printf("Expected Sequence number %d, got %d\n", expectedSequenceNum, segmentHeader.SequenceNum) continue }
検索エンジンが受信したセグメントの合計数がデータベース サービスによって送信されるセグメントの合計の長さと等しい場合、セグメントの合計はプロデューサー (データベース サービス) のリスニングを中断するために使用されます。次に、集約されたセグメント バッファを分割して解析し、そうでない場合はリスニングを続け、セグメント ペイロード バッファを Web ページ バッファに追加して、すべての受信セグメントからのバイトを保持します。
segmentCounter++ fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments) fmt.Printf("current segments : %d\n", segmentCounter) expectedSequenceNum++ ch.Ack(data.DeliveryTag, false) webpageBytes = append(webpageBytes, segmentPayload...) fmt.Printf("Byte Length: %d\n", len(webpageBytes)) if segmentCounter == segmentHeader.TotalSegments { log.Printf("Got all segments from Database %d", segmentCounter) break }
ところで vim を使っています
私の ted トークにお越しいただきありがとうございます。zensearch にはさらに多くの機能と修正を実装する予定です。
以上がZensearch の機能を拡張してデータベース全体をクエリするの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。