Zuvor konnte ich problemlos Webseiten für meine Suchmaschine crawlen und indizieren, bis meine Datenbank stärker wuchs, als die Nachrichtenwarteschlange von RabbitMQ aufnehmen konnte. Wenn eine Nachricht in einer Nachrichtenwarteschlange ihre Standardgröße überschreitet, gibt RabbitMQ einen Fehler aus und gerät in Panik. Ich könnte die Standardgröße ändern, aber das würde sich nicht skalieren, wenn meine Datenbank wächst, damit Benutzer Webseiten ohne Bedenken crawlen können Der Nachrichtenbroker stürzt ab.
Ich habe eine Funktion zum Erstellen von Segmenten mit einer maximalen Segmentgröße oder MSS nach den gleichen Prinzipien von TCP implementiert. Beim Erstellen von Segmenten enthält das Segment einen 8-Byte-Header, wobei jedes 4 Byte des 8-Byte-Headers die Sequenznummer ist und die Gesamtsegmentanzahl und der Rest des Körpers ist die Nutzlast der segmentierten Datenbank.
// MSS is number in bytes function createSegments( webpages: Array<Webpage>, // webpages queried from database MSS: number, ): Array<ArrayBufferLike> { const text_encoder = new TextEncoder(); const encoded_text = text_encoder.encode(JSON.stringify(webpages)); const data_length = encoded_text.byteLength; let currentIndex = 0; let segmentCount = Math.trunc(data_length / MSS) + 1; // + 1 to store the remainder let segments: Array<ArrayBufferLike> = []; let pointerPosition = MSS; for (let i = 0; i < segmentCount; i++) { let currentDataLength = Math.abs(currentIndex - data_length); let slicedArray = encoded_text.slice(currentIndex, pointerPosition); currentIndex += slicedArray.byteLength; // Add to offset MSS to point to the next segment in the array // manipulate pointerPosition to adjust to lower values using Math.min() // Is current data length enough to fit MSS? // if so add from current position + MSS // else get remaining of the currentDataLength pointerPosition += Math.min(MSS, currentDataLength); const payload = new Uint8Array(slicedArray.length); payload.set(slicedArray); segments.push(newSegment(i, segmentCount, Buffer.from(payload))); } return segments; } function newSegment( sequenceNum: number, segmentCount: number, payload: Buffer, ): ArrayBufferLike { // 4 bytes for sequenceNum 4 bytes for totalSegmentsCount const sequenceNumBuffer = convertIntToBuffer(sequenceNum); const segmentCountBuffer = convertIntToBuffer(segmentCount); const headerBuffer = new ArrayBuffer(8); const header = new Uint8Array(headerBuffer); header.set(Buffer.concat([sequenceNumBuffer, segmentCountBuffer])); return Buffer.concat([header, payload]); } function convertIntToBuffer(int: number): Buffer { const bytes = Buffer.alloc(4); bytes.writeIntLE(int, 0, 4); console.log(bytes); return bytes; }
Diese Methode zum Erstellen kleiner Segmente eines großen Datensatzes würde dazu beitragen, die Datenbankabfrage zu skalieren, selbst wenn die Datenbank wächst.
Wie analysiert nun die Suchmaschine den Puffer und wandelt jedes Segment in ein Webseiten-Array um?
Extrahieren Sie zunächst den Segment-Header, da der Header zwei Eigenschaften enthält, nämlich Sequenznummer und Gesamtsegmente,
func GetSegmentHeader(buf []byte) (*SegmentHeader, error) { byteReader := bytes.NewBuffer(buf) headerOffsets := []int{0, 4} newSegmentHeader := SegmentHeader{} for i := range headerOffsets { buffer := make([]byte, 4) _, err := byteReader.Read(buffer) if err != nil { return &SegmentHeader{}, err } value := binary.LittleEndian.Uint32(buffer) // this feels disgusting but i dont feel like bothering with this if i == 0 { newSegmentHeader.SequenceNum = value continue } newSegmentHeader.TotalSegments = value } return &newSegmentHeader, nil } func GetSegmentPayload(buf []byte) ([]byte, error) { headerOffset := 8 byteReader := bytes.NewBuffer(buf[headerOffset:]) return byteReader.Bytes(), nil }
Die Sequenznummer wird für die Neuübertragung/Neueinreihung der Segmente verwendet. Wenn also die erwartete Sequenznummer nicht mit der empfangenen übereinstimmt, stellen Sie jedes Segment, beginnend mit dem aktuellen, erneut in die Warteschlange.
// for retransmission/requeuing if segmentHeader.SequenceNum != expectedSequenceNum { ch.Nack(data.DeliveryTag, true, true) log.Printf("Expected Sequence number %d, got %d\n", expectedSequenceNum, segmentHeader.SequenceNum) continue }
Das Gesamtsegment wird zum Ausbrechen des Abhörens des Produzenten (Datenbankdienst) verwendet, wenn die Gesamtzahl der von der Suchmaschine empfangenen Segmente gleich der Länge der Gesamtsegmente ist, die vom Datenbankdienst gesendet werden sollen Brechen Sie dann den aggregierten Segmentpuffer aus und analysieren Sie ihn. Wenn nicht, hören Sie weiter zu und hängen Sie den Segment-Nutzlastpuffer an einen Webseitenpuffer an, um Bytes aus allen eingehenden Segmenten zu speichern.
segmentCounter++ fmt.Printf("Total Segments : %d\n", segmentHeader.TotalSegments) fmt.Printf("current segments : %d\n", segmentCounter) expectedSequenceNum++ ch.Ack(data.DeliveryTag, false) webpageBytes = append(webpageBytes, segmentPayload...) fmt.Printf("Byte Length: %d\n", len(webpageBytes)) if segmentCounter == segmentHeader.TotalSegments { log.Printf("Got all segments from Database %d", segmentCounter) break }
Ich verwende übrigens vim
Vielen Dank, dass Sie zu meinem Ted-Vortrag gekommen sind. Ich werde weitere Funktionen und Korrekturen für Zensearch implementieren.
Das obige ist der detaillierte Inhalt vonSkalierung der Funktionen von Zensearch, um die gesamte Datenbank abzufragen. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!