이 글에서는 주로 JavaScript 멀티 스레드 런타임 라이브러리 Nexus.js의 학습 경험과 코드 공유를 소개합니다. 도움이 필요한 친구들이 참고하여 함께 배울 수 있습니다.
우선, 이 프로젝트에 대해 잘 모르신다면 이전에 쓴 일련의 기사를 읽어 보시기를 권장합니다. 이 글을 읽고 싶지 않다면 걱정하지 마세요. 해당 내용도 여기에서 다루겠습니다.
이제 시작해 보겠습니다.
작년에 저는 Webkit/JavaScript 커널을 기반으로 하는 다중 스레드 서버측 JavaScript 런타임인 Nexus.js를 구현하기 시작했습니다. 나는 한동안 그 일을 포기했습니다. 제가 통제할 수 없는 이유로 여기서는 논의하지 않겠습니다. 주로: 오랫동안 일을 할 수 없었습니다.
이제 Nexus의 아키텍처와 작동 방식에 대해 논의하는 것부터 시작하겠습니다.
이벤트 루프
이벤트 루프가 없습니다
(잠금 없는) 작업 개체가 있는 스레드 풀이 있습니다
setTimeout 또는 setImmediate가 호출되거나 Promise가 생성될 때마다 작업이 대기열에 추가됩니다. 작업 대기열 시계.
작업이 예약될 때마다 사용 가능한 첫 번째 스레드가 작업을 선택하고 실행합니다.
CPU 코어에서 Promise를 처리합니다. Promise.all()을 호출하면 Promise가 병렬로 해결됩니다. ㅋㅋㅋ
CommonJS는 지원되지 않습니다. (require(...) 및 module.exports) 모든 모듈은 ES6 가져오기/내보내기 구문을 사용합니다. import('file-or-packge').then(...)을 통해 동적 가져오기를 지원합니다.
import 를 지원합니다. 메타(예: import.meta.filename 및 import.meta.dirname 등)
추가 기능: 다음과 같은 URL에서 직접 가져오기 지원:
import { h } from 'https://unpkg.com/preact/dist/preact.esm.js';
Nexus는 Promise 기반 EventEmitter 클래스를 구현합니다 이벤트 핸들러는 모든 스레드에서 정렬되며 병렬로 실행됩니다. EventEmitter.emit(...)의 반환 값은 이벤트 핸들러에서 반환 값 배열로 구문 분석될 수 있는 Promise입니다.
예:
class EmitterTest extends Nexus.EventEmitter { constructor() { super(); for(let i = 0; i < 4; i++) this.on('test', value => { console.log(`fired test ${i}!`); console.inspect(value); }); for(let i = 0; i < 4; i++) this.on('returns-a-value', v => `${v + i}`); } } const test = new EmitterTest(); async function start() { await test.emit('test', { payload: 'test 1' }); console.log('first test done!'); await test.emit('test', { payload: 'test 2' }); console.log('second test done!'); const values = await test.emit('returns-a-value', 10); console.log('third test done, returned values are:'); console.inspect(values); } start().catch(console.error);
I/O
모든 입력/출력은 장치, 필터 및 스트림의 세 가지 기본 요소를 통해 수행됩니다.
모든 입력/출력 프리미티브는 EventEmitter 클래스를 구현합니다. Device를 사용하려면 Device 위에 ReadableStream 또는 WritableStream을 만들어야 합니다.
데이터를 조작하려면 ReadableStream 또는 WritableStream에 필터를 추가할 수 있습니다.
마지막으로 source.pipe(...destinationStreams)를 사용하고 source.resume()이 데이터를 처리할 때까지 기다립니다.
모든 입력/출력 작업은 ArrayBuffer 개체를 사용하여 수행됩니다.
Filter는 데이터를 처리하기 위해 process(buffer) 방식을 시도했습니다.
예: 2개의 별도 출력 파일을 사용하여 UTF-8을 UTF6으로 변환합니다. const startTime = Date.now();
try {
const device = new Nexus.IO.FilePushDevice('enwik8');
const stream = new Nexus.IO.ReadableStream(device);
stream.pushFilter(new Nexus.IO.EncodingConversionFilter("UTF-8", "UTF-16LE"));
const wstreams = [0,1,2,3]
.map(i => new Nexus.IO.WritableStream(new Nexus.IO.FileSinkDevice('enwik16-' + i)));
console.log('piping...');
stream.pipe(...wstreams);
console.log('streaming...');
await stream.resume();
await stream.close();
await Promise.all(wstreams.map(stream => stream.close()));
console.log(`finished in ${(Date.now() * startTime) / 1000} seconds!`);
} catch (e) {
console.error('An error occurred: ', e);
}
}
start().catch(console.error);
Nexus.js는 IP 주소/포트 바인딩 및 연결 모니터링을 담당하는 Acceptor 클래스를 제공합니다.
연결 요청이 수신될 때마다 연결 이벤트가 트리거되고 제공됩니다. 소켓 장치.
각 소켓 인스턴스는 전이중 I/O 장치입니다.
ReadableStream과 WritableStream을 사용하여 소켓을 작동할 수 있습니다.
가장 기본적인 예: (클라이언트에 "Hello World" 전송)
const acceptor = new Nexus.Net.TCP.Acceptor(); let count = 0; acceptor.on('connection', (socket, endpoint) => { const connId = count++; console.log(`connection #${connId} from ${endpoint.address}:${endpoint.port}`); const rstream = new Nexus.IO.ReadableStream(socket); const wstream = new Nexus.IO.WritableStream(socket); const buffer = new Uint8Array(13); const message = 'Hello World!\n'; for(let i = 0; i < 13; i++) buffer[i] = message.charCodeAt(i); rstream.pushFilter(new Nexus.IO.UTF8StringFilter()); rstream.on('data', buffer => console.log(`got message: ${buffer}`)); rstream.resume().catch(e => console.log(`client #${connId} at ${endpoint.address}:${endpoint.port} disconnected!`)); console.log(`sending greeting to #${connId}!`); wstream.write(buffer); }); acceptor.bind('127.0.0.1', 10000); acceptor.listen(); console.log('server ready');
Http
Nexus는 기본적으로 TCPAcceptor를 상속하는 Nexus.Net.HTTP.Server 클래스를 제공합니다 기본 인터페이스
When 서버는 들어오는 연결의 기본 HTTP 헤더에 대한 구문 분석/검증을 완료하고 연결 및 동일한 정보를 사용하여 연결 이벤트가 트리거됩니다.
각 연결 인스턴스에는 요청과 응답 개체가 있습니다. 입출력 장치들입니다.
ReadableStream 및 WritableStream을 구성하여 요청/응답을 조작할 수 있습니다.
파이프를 통해 응답 객체에 연결하면 입력 스트림은 청크 인코딩 모드를 사용합니다. 그렇지 않으면 response.write()를 사용하여 일반 문자열을 작성할 수 있습니다.
복잡한 예: (블록 인코딩이 포함된 기본 Http 서버, 자세한 내용은 생략)
.... /** * Creates an input stream from a path. * @param path * @returns {Promise<ReadableStream>} */ async function createInputStream(path) { if (path.startsWith('/')) // If it starts with '/', omit it. path = path.substr(1); if (path.startsWith('.')) // If it starts with '.', reject it. throw new NotFoundError(path); if (path === '/' || !path) // If it's empty, set to index.html. path = 'index.html'; /** * `import.meta.dirname` and `import.meta.filename` replace the old CommonJS `__dirname` and `__filename`. */ const filePath = Nexus.FileSystem.join(import.meta.dirname, 'server_root', path); try { // Stat the target path. const {type} = await Nexus.FileSystem.stat(filePath); if (type === Nexus.FileSystem.FileType.Directory) // If it's a directory, return its 'index.html' return createInputStream(Nexus.FileSystem.join(filePath, 'index.html')); else if (type === Nexus.FileSystem.FileType.Unknown || type === Nexus.FileSystem.FileType.NotFound) // If it's not found, throw NotFound. throw new NotFoundError(path); } catch(e) { if (e.code) throw e; throw new NotFoundError(path); } try { // First, we create a device. const fileDevice = new Nexus.IO.FilePushDevice(filePath); // Then we return a new ReadableStream created using our source device. return new Nexus.IO.ReadableStream(fileDevice); } catch(e) { throw new InternalServerError(e.message); } } /** * Connections counter. */ let connections = 0; /** * Create a new HTTP server. * @type {Nexus.Net.HTTP.Server} */ const server = new Nexus.Net.HTTP.Server(); // A server error means an error occurred while the server was listening to connections. // We can mostly ignore such errors, we display them anyway. server.on('error', e => { console.error(FgRed + Bright + 'Server Error: ' + e.message + '\n' + e.stack, Reset); }); /** * Listen to connections. */ server.on('connection', async (connection, peer) => { // Start with a connection ID of 0, increment with every new connection. const connId = connections++; // Record the start time for this connection. const startTime = Date.now(); // Destructuring is supported, why not use it? const { request, response } = connection; // Parse the URL parts. const { path } = parseURL(request.url); // Here we'll store any errors that occur during the connection. const errors = []; // inStream is our ReadableStream file source, outStream is our response (device) wrapped in a WritableStream. let inStream, outStream; try { // Log the request. console.log(`> #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}"`, Reset); // Set the 'Server' header. response.set('Server', `nexus.js/0.1.1`); // Create our input stream. inStream = await createInputStream(path); // Create our output stream. outStream = new Nexus.IO.WritableStream(response); // Hook all `error` events, add any errors to our `errors` array. inStream.on('error', e => { errors.push(e); }); request.on('error', e => { errors.push(e); }); response.on('error', e => { errors.push(e); }); outStream.on('error', e => { errors.push(e); }); // Set content type and request status. response .set('Content-Type', mimeType(path)) .status(200); // Hook input to output(s). const disconnect = inStream.pipe(outStream); try { // Resume our file stream, this causes the stream to switch to HTTP chunked encoding. // This will return a promise that will only resolve after the last byte (HTTP chunk) is written. await inStream.resume(); } catch (e) { // Capture any errors that happen during the streaming. errors.push(e); } // Disconnect all the callbacks created by `.pipe()`. return disconnect(); } catch(e) { // If an error occurred, push it to the array. errors.push(e); // Set the content type, status, and write a basic message. response .set('Content-Type', 'text/plain') .status(e.code || 500) .send(e.message || 'An error has occurred.'); } finally { // Close the streams manually. This is important because we may run out of file handles otherwise. if (inStream) await inStream.close(); if (outStream) await outStream.close(); // Close the connection, has no real effect with keep-alive connections. await connection.close(); // Grab the response's status. let status = response.status(); // Determine what colour to output to the terminal. const statusColors = { '200': Bright + FgGreen, // Green for 200 (OK), '404': Bright + FgYellow, // Yellow for 404 (Not Found) '500': Bright + FgRed // Red for 500 (Internal Server Error) }; let statusColor = statusColors[status]; if (statusColor) status = statusColor + status + Reset; // Log the connection (and time to complete) to the console. console.log(`< #${FgCyan + connId + Reset} ${Bright + peer.address}:${peer.port + Reset} ${ FgGreen + request.method + Reset} "${FgYellow}${path}${Reset}" ${status} ${(Date.now() * startTime)}ms` + (errors.length ? " " + FgRed + Bright + errors.map(error => error.message).join(', ') + Reset : Reset)); } }); /** * IP and port to listen on. */ const ip = '0.0.0.0', port = 3000; /** * Whether or not to set the `reuse` flag. (optional, default=false) */ const portReuse = true; /** * Maximum allowed concurrent connections. Default is 128 on my system. (optional, system specific) * @type {number} */ const maxConcurrentConnections = 1000; /** * Bind the selected address and port. */ server.bind(ip, port, portReuse); /** * Start listening to requests. */ server.listen(maxConcurrentConnections); /** * Happy streaming! */ console.log(FgGreen + `Nexus.js HTTP server listening at ${ip}:${port}` + Reset);
Benchmarks 지금까지 구현한 모든 내용을 다룬 것 같습니다. 그럼 이제 성능에 대해 이야기해보겠습니다. 다음은 100개의 동시 연결과 총 10000개의 요청을 포함하는 Appeal HTTP 서버의 현재 벤치마크입니다:
This is ApacheBench, Version 2.3 <$Revision: 1796539 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/ Benchmarking localhost (be patient).....done Server Software: nexus.js/0.1.1 Server Hostname: localhost Server Port: 3000 Document Path: / Document Length: 8673 bytes Concurrency Level: 100 Time taken for tests: 9.991 seconds Complete requests: 10000 Failed requests: 0 Total transferred: 87880000 bytes HTML transferred: 86730000 bytes Requests per second: 1000.94 [#/sec] (mean) Time per request: 99.906 [ms] (mean) Time per request: 0.999 [ms] (mean, across all concurrent requests) Transfer rate: 8590.14 [Kbytes/sec] received Connection Times (ms) min mean[+/-sd] median max Connect: 0 0 0.1 0 1 Processing: 6 99 36.6 84 464 Waiting: 5 99 36.4 84 463 Total: 6 100 36.6 84 464 Percentage of the requests served within a certain time (ms) 50% 84 66% 97 75% 105 80% 112 90% 134 95% 188 98% 233 99% 238 100% 464 (longest request)
초당 요청 1000개. 구형 i7에서는 벤치마크 소프트웨어, 5G 메모리를 차지하는 IDE 및 서버 자체를 실행합니다.
voodooattack@voodooattack:~$ cat /proc/cpuinfo processor : 0 vendor_id : GenuineIntel cpu family : 6 model : 60 model name : Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz stepping : 3 microcode : 0x22 cpu MHz : 3392.093 cache size : 8192 KB physical id : 0 siblings : 8 core id : 0 cpu cores : 4 apicid : 0 initial apicid : 0 fpu : yes fpu_exception : yes cpuid level : 13 wp : yes flags : fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clflush dts acpi mmx fxsr sse sse2 ss ht tm pbe syscall nx pdpe1gb rdtscp lm constant_tsc arch_perfmon pebs bts rep_good nopl xtopology nonstop_tsc cpuid aperfmperf pni pclmulqdq dtes64 monitor ds_cpl vmx smx est tm2 ssse3 sdbg fma cx16 xtpr pdcm pcid sse4_1 sse4_2 x2apic movbe popcnt tsc_deadline_timer aes xsave avx f16c rdrand lahf_lm abm cpuid_fault tpr_shadow vnmi flexpriority ept vpid fsgsbase tsc_adjust bmi1 avx2 smep bmi2 erms invpcid xsaveopt dtherm ida arat pln pts bugs : bogomips : 6784.18 clflush size : 64 cache_alignment : 64 address sizes : 39 bits physical, 48 bits virtual power management:
1000개의 동시 요청을 시도했지만 열려 있는 소켓이 많아 APacheBench 시간이 초과되었습니다. httperf를 시도했는데 결과는 다음과 같습니다.
voodooattack@voodooattack:~$ httperf --port=3000 --num-conns=10000 --rate=1000 httperf --client=0/1 --server=localhost --port=3000 --uri=/ --rate=1000 --send-buffer=4096 --recv-buffer=16384 --num-conns=10000 --num-calls=1 httperf: warning: open file limit > FD_SETSIZE; limiting max. # of open files to FD_SETSIZE Maximum connect burst length: 262 Total: connections 9779 requests 9779 replies 9779 test-duration 10.029 s Connection rate: 975.1 conn/s (1.0 ms/conn, <=1022 concurrent connections) Connection time [ms]: min 0.5 avg 337.9 max 7191.8 median 79.5 stddev 848.1 Connection time [ms]: connect 207.3 Connection length [replies/conn]: 1.000 Request rate: 975.1 req/s (1.0 ms/req) Request size [B]: 62.0 Reply rate [replies/s]: min 903.5 avg 974.6 max 1045.7 stddev 100.5 (2 samples) Reply time [ms]: response 129.5 transfer 1.1 Reply size [B]: header 89.0 content 8660.0 footer 2.0 (total 8751.0) Reply status: 1xx=0 2xx=9779 3xx=0 4xx=0 5xx=0 CPU time [s]: user 0.35 system 9.67 (user 3.5% system 96.4% total 99.9%) Net I/O: 8389.9 KB/s (68.7*10^6 bps) Errors: total 221 client-timo 0 socket-timo 0 connrefused 0 connreset 0 Errors: fd-unavail 221 addrunavail 0 ftab-full 0 other 0
보시다시피 여전히 작동합니다. 스트레스로 인해 일부 연결 시간이 초과될 수도 있습니다. 이 문제의 원인을 계속 조사 중입니다.
위 내용은 제가 여러분을 위해 정리한 내용입니다. 앞으로 도움이 되길 바랍니다.
관련 기사:
Angular에서 일기 예보를 쿼리하는 방법nodejs에서 OAuth2.0 인증 서비스 인증 구현하는 방법
JavaScript에서 자동 숫자 증가를 구현하는 방법
React 프로젝트에서 Redux를 사용하는 방법(자세한 튜토리얼)
위 내용은 JavaScript의 다중 스레드 런타임 라이브러리 정보(자세한 튜토리얼)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!