
From Storage to Stream: Delivering MongoDB Data Directly to Users

DDD
Release: 2025-01-04 04:34:38

Step 1: MongoDB Cursor

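Create a Mongoose query cursor on the model that matches the user's service provider, so documents are fetched incrementally instead of being loaded into memory all at once: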

const cursor =
    userObject?.data?.serviceProviderName === 'ZYRO'
        ? zyroTransactionModel.find(query).cursor()
        : finoTransactionModel.find(query).cursor();

console.log("Cursor created successfully");
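The point of a cursor is that documents are produced only as the consumer asks for them. The sketch below illustrates that behaviour with a stand-in async generator rather than a real Mongoose cursor; fakeCursor, takeFirst, and the stats object are hypothetical names used only for this illustration.

```javascript
// Stand-in for a Mongoose cursor (assumption): an async generator that
// records how many documents it has actually produced.
async function* fakeCursor(total, stats) {
    for (let i = 1; i <= total; i++) {
        stats.produced++;
        yield { _id: i };
    }
}

// Read at most `limit` documents, then stop iterating.
async function takeFirst(cursor, limit) {
    const ids = [];
    for await (const doc of cursor) {
        ids.push(doc._id);
        if (ids.length >= limit) break; // Leaving the loop also ends the generator
    }
    return ids;
}

// Usage: only three of the million possible documents are ever produced.
const demoStats = { produced: 0 };
takeFirst(fakeCursor(1000000, demoStats), 3).then((ids) => {
    console.log(ids, demoStats.produced);
});
```

Because iteration stops as soon as the loop exits, the remaining documents are never generated, which is the same property that keeps memory flat when exporting a large collection.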

Step 2: Setting Up the ZIP File
Use the yazl library to stream CSV data into a ZIP file:

const yazl = require('yazl');
const zipfile = new yazl.ZipFile();

reply.raw.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": "attachment; filename=transactions.zip",
});

zipfile.outputStream.pipe(reply.raw);

const cleanup = async () => {
    console.log("Cleaning up resources...");
    zipfile.end(); // Finalize ZIP
    await cursor.close();
};
reply.raw.on("close", cleanup);
reply.raw.on("error", cleanup);
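Since the same handler is registered for both close and error, it can be invoked more than once for a single response. A minimal sketch of a run-once guard follows; it uses a plain EventEmitter as a stand-in for reply.raw, and makeCleanupOnce is a hypothetical helper name, not part of yazl or the web framework.

```javascript
const { EventEmitter } = require("events");

// Wrap a cleanup function so that only the first call has any effect.
function makeCleanupOnce(cleanup) {
    let done = false;
    return (...args) => {
        if (done) return undefined;
        done = true;
        return cleanup(...args);
    };
}

// Stand-in for reply.raw (assumption): any emitter with "close" and "error" events.
const response = new EventEmitter();
let cleanupCalls = 0;
const cleanupOnce = makeCleanupOnce(() => {
    cleanupCalls++;
});

response.on("error", cleanupOnce);
response.on("close", cleanupOnce);

// Simulate a failed connection: an error followed by a close.
response.emit("error", new Error("socket hang up"));
response.emit("close");
console.log(cleanupCalls);
```

Wrapping the real cleanup function the same way would keep the cursor from being closed twice when a broken connection raises both events.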

Step 3: Creating Dynamic CSV Streams
Generate CSV data dynamically and stream it into the ZIP file:

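const { Readable } = require('stream');

const createNewCSVStream = (headers) => {
    const csvStream = new Readable({ read() {} });
    // Assumes headers are plain strings; if getHeaders returns objects, map them to their display names first
    csvStream.push(headers.join(",") + "\n"); // Add headers
    return csvStream;
};

const filteredHeaders = getHeaders(transactionDownloadFields, userObject?.state?.auth?.role);
// Declared with let because Step 5 replaces this stream when a file fills up
let currentCSVStream = createNewCSVStream(filteredHeaders);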

zipfile.addReadStream(currentCSVStream, "transactions_part_1.csv");
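The article never shows transactionDownloadFields or getHeaders, so the following is only a guess at their shape, written to make the header step concrete. The value and transform properties appear in Step 5; the label and roles properties, the sample fields, and the headerLine helper are assumptions made for this sketch.

```javascript
// Hypothetical field configuration. `value` and `transform` are used in Step 5;
// `label` and `roles` are assumed property names.
const transactionDownloadFields = [
    { label: "Transaction ID", value: "txnId", roles: ["admin", "agent"] },
    {
        label: "Amount",
        value: "amount",
        roles: ["admin", "agent"],
        transform: (v) => Number(v).toFixed(2),
    },
    { label: "Commission", value: "commission", roles: ["admin"] },
];

// Hypothetical getHeaders: keep only the fields the given role may download.
function getHeaders(fields, role) {
    return fields.filter((field) => field.roles.includes(role));
}

// Build the CSV header line from the human-readable labels.
function headerLine(headers) {
    return headers.map((h) => `"${h.label}"`).join(",") + "\n";
}

const agentHeaders = getHeaders(transactionDownloadFields, "agent");
console.log(headerLine(agentHeaders));
```

With a configuration like this, the header line is built from labels while the row values are looked up by value, so the two stay aligned column by column.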

Step 4: Streaming MongoDB Data to CSV
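In its simplest form, the cursor's data and end events push each document into the CSV as a row. Step 5 covers the same ground with a for await...of loop that adds transformations and file splitting, so use one approach or the other, not both: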

cursor.on('data', (doc) => {
    // Use ?? rather than || so legitimate falsy values such as 0 or false are kept
    const csvRow = filteredHeaders.map(header => doc[header.key] ?? '').join(',');
    currentCSVStream.push(csvRow + '\n'); // Write row
});

cursor.on('end', () => {
    currentCSVStream.push(null); // End the stream
    zipfile.end(); // Finalize the ZIP
});
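One caveat with the manual push() calls above: a Readable whose read() does nothing buffers every row pushed to it, whether or not the consumer is keeping up. A possible alternative, sketched here under assumptions, is Readable.from with an async generator, which produces the next row only when the consumer asks for it. fakeCursor and csvLines are hypothetical names, and the sample documents are made up for the illustration.

```javascript
const { Readable } = require("stream");

// Stand-in for a Mongoose cursor (assumption): any async iterable of documents.
async function* fakeCursor() {
    yield { id: 1, amount: 250 };
    yield { id: 2, amount: 0 };
}

// Yield the header line first, then one CSV line per document.
async function* csvLines(cursor, keys) {
    yield keys.join(",") + "\n";
    for await (const doc of cursor) {
        yield keys.map((key) => doc[key] ?? "").join(",") + "\n";
    }
}

// Readable.from pulls from the generator lazily, as the consumer reads.
const csvStream = Readable.from(csvLines(fakeCursor(), ["id", "amount"]));
```

A stream built this way could in principle be handed to zipfile.addReadStream in place of the manually pushed stream, though this has not been tested against yazl here.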

Step 5: Processing Data from MongoDB Cursor
Stream documents from the MongoDB cursor, transform them as needed, and dynamically write rows to the CSV stream:

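// Counters and flags used by the loop below
const MAX_ROWS_PER_FILE = 100000; // Illustrative threshold (assumed value); tune for your data
let streamedCount = 0; // Total rows streamed across all files
let rowCount = 0; // Rows written to the current CSV file
let fileIndex = 2; // Index of the next CSV file; part 1 was added in Step 3
let clientDisconnected = false;
reply.raw.on("close", () => { clientDisconnected = true; }); // Stop early if the client goes away

try {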
    for await (const doc of cursor) {
        if (clientDisconnected) {
            console.log("Client disconnected. Stopping processing...");
            break;
        }

        streamedCount++;
        rowCount++;

        let row = "";
        // filteredHeaders was computed once in Step 3; the role does not change mid-request

        for (let i = 0; i < filteredHeaders.length; i++) {
            const field = filteredHeaders[i];

            // Fetch the corresponding field configuration from transactionDownloadFields
            const originalField = transactionDownloadFields.find((f) => f.value === field.value);

            // Get the value from the transaction document
            let value = getValueFromTransaction(doc, field.value);

            // Apply transformation if the field has a transform function
            if (originalField?.transform) {
                value = originalField.transform(value);
            }

            // Enclose the value in double quotes, doubling any embedded quotes so the CSV stays valid
            value = value !== undefined && value !== null
                ? `"${String(value).replace(/"/g, '""')}"`
                : '"N/A"';
            row += (i > 0 ? "," : "") + value;
        }
        row += "\n";
        currentCSVStream.push(row);

        // Check if the row count has reached the threshold for the current CSV file
        if (rowCount >= MAX_ROWS_PER_FILE) {
            console.log(`Threshold reached for file ${fileIndex - 1}. Starting new file...`);
            currentCSVStream.push(null); // End the current CSV stream
            currentCSVStream = createNewCSVStream(filteredHeaders); // Start a new stream with the same headers
            zipfile.addReadStream(currentCSVStream, `transactions_part_${fileIndex}.csv`); // Register the new file in the ZIP
            fileIndex++;
            rowCount = 0; // Reset the row count
        }
    }

    // Finalize the current CSV stream if it has data
    if (currentCSVStream) {
        currentCSVStream.push(null);
    }

    // Finalize the ZIP file
    zipfile.end();
    console.log(`Successfully streamed ${streamedCount} rows across ${fileIndex - 1} files.`);
} catch (error) {
    console.error("Error during processing:", error);
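    if (!reply.raw.headersSent) {
        reply.status(500).send({ error: "Failed to generate ZIP file" });
    } else {
        reply.raw.destroy(error); // Headers already sent: abort the partial download
    }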
} finally {
    // Cleanup: Close the MongoDB cursor
    await cursor.close().catch((err) => console.error("Error closing cursor:", err));
}
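The row-building part of the loop can be pulled out into small pure functions, which makes the quoting rules easier to check in isolation. This is a sketch under assumptions: the article does not show getValueFromTransaction, so the dotted-path lookup here is a guess, and toCsvField, toCsvRow, and the sample document are hypothetical.

```javascript
// Hypothetical helper: resolve a dotted path such as "customer.name" on a document.
function getValueFromTransaction(doc, path) {
    return path
        .split(".")
        .reduce((current, key) => (current == null ? undefined : current[key]), doc);
}

// Quote a single CSV field, doubling embedded quotes as RFC 4180 describes.
function toCsvField(value) {
    if (value === undefined || value === null) return '"N/A"';
    return `"${String(value).replace(/"/g, '""')}"`;
}

// Build one CSV line from a document and a list of field configurations.
function toCsvRow(doc, fields) {
    const cells = fields.map((field) => {
        const raw = getValueFromTransaction(doc, field.value);
        return toCsvField(field.transform ? field.transform(raw) : raw);
    });
    return cells.join(",") + "\n";
}

// Usage with a made-up document and field list.
const sampleDoc = { txnId: "T-1", customer: { name: 'Acme "West", Inc.' }, amount: 12.5 };
const sampleFields = [
    { value: "txnId" },
    { value: "customer.name" },
    { value: "amount", transform: (v) => v.toFixed(2) },
    { value: "remarks" },
];
const sampleRow = toCsvRow(sampleDoc, sampleFields);
console.log(sampleRow);
```

Quoting every field and doubling embedded quotes means commas and quotes inside values, such as a company name, no longer shift columns when the file is opened in a spreadsheet.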

Summary

  • Document Iteration Using for await...of:

Streams documents one by one from the MongoDB cursor efficiently.
Enables real-time processing without loading all data into memory.

  • Dynamic CSV Row Generation:

Constructs each row dynamically by iterating over filteredHeaders.
Applies transformations using a transform function, if defined in transactionDownloadFields.

  • Row Threshold and File Splitting:

Monitors the row count against the threshold (MAX_ROWS_PER_FILE).
Ends the current CSV stream and starts a new one when the threshold is reached.

  • Error Handling:

Logs and sends an error response if an issue occurs during processing.
Ensures proper cleanup by closing the MongoDB cursor in the finally block.

  • Finalizing Streams:

Pushes null to terminate the current CSV stream.
Completes the ZIP file once all rows are processed.
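The row-threshold rule summarized above can be sketched as a small counter that decides which file each row belongs to. createFileRoller is a hypothetical helper, not code from the article; unlike the loop in Step 5, it rolls over only when another row actually arrives, which avoids a trailing file that contains nothing but headers.

```javascript
// Returns a function that names the file each successive row should go to.
function createFileRoller(maxRowsPerFile) {
    let fileIndex = 1; // Files are numbered from 1, matching transactions_part_1.csv
    let rowCount = 0; // Rows already assigned to the current file

    return function fileNameForNextRow() {
        if (rowCount >= maxRowsPerFile) {
            fileIndex++; // Current file is full: move on to the next one
            rowCount = 0;
        }
        rowCount++;
        return `transactions_part_${fileIndex}.csv`;
    };
}

// Usage: five rows with a limit of two rows per file.
const nextFile = createFileRoller(2);
const assignments = [];
for (let i = 0; i < 5; i++) {
    assignments.push(nextFile());
}
console.log(assignments);
```

In the streaming loop, a change in the returned name would be the signal to end the current CSV stream and register a new one with the ZIP file.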
