The content of this article is about the method of deno communication implementation (with code). It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you.
Communication method
deno execution code is similar to node, including synchronous and asynchronous methods. The asynchronous method is implemented through Promise.then.
Typescript/Javascript calls rust
In the previous section, it was mentioned that the v8 isolate instance will be initialized when deno starts. During the initialization process, the c Functions are bound to instances of v8 isolate. When v8 executes Javascript code, these bound functions can be called like Javascript functions. The specific binding implementation is as follows:
void InitializeContext(v8::Isolate* isolate, v8::Local<v8::Context> context) { v8::HandleScope handle_scope(isolate); v8::Context::Scope context_scope(context); auto global = context->Global(); auto deno_val = v8::Object::New(isolate); CHECK(global->Set(context, deno::v8_str("libdeno"), deno_val).FromJust()); auto print_tmpl = v8::FunctionTemplate::New(isolate, Print); auto print_val = print_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("print"), print_val).FromJust()); auto recv_tmpl = v8::FunctionTemplate::New(isolate, Recv); auto recv_val = recv_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("recv"), recv_val).FromJust()); auto send_tmpl = v8::FunctionTemplate::New(isolate, Send); auto send_val = send_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("send"), send_val).FromJust()); auto eval_context_tmpl = v8::FunctionTemplate::New(isolate, EvalContext); auto eval_context_val = eval_context_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("evalContext"), eval_context_val) .FromJust()); auto error_to_json_tmpl = v8::FunctionTemplate::New(isolate, ErrorToJSON); auto error_to_json_val = error_to_json_tmpl->GetFunction(context).ToLocalChecked(); CHECK(deno_val->Set(context, deno::v8_str("errorToJSON"), error_to_json_val) .FromJust()); CHECK(deno_val->SetAccessor(context, deno::v8_str("shared"), Shared) .FromJust()); }
After completing the binding, the mapping between c method and Typescript method can be implemented in Typescript through the following code
libdeno.ts
interface Libdeno { recv(cb: MessageCallback): void; send(control: ArrayBufferView, data?: ArrayBufferView): null | Uint8Array; print(x: string, isErr?: boolean): void; shared: ArrayBuffer; /** Evaluate provided code in the current context. * It differs from eval(...) in that it does not create a new context. * Returns an array: [output, errInfo]. * If an error occurs, `output` becomes null and `errInfo` is non-null. */ // eslint-disable-next-line @typescript-eslint/no-explicit-any evalContext(code: string): [any, EvalErrorInfo | null]; errorToJSON: (e: Error) => string; } export const libdeno = window.libdeno as Libdeno;
When executing Typescript code, you only need to introduce libdeno and directly call the c method, for example:
import { libdeno } from "./libdeno"; function sendInternal( builder: flatbuffers.Builder, innerType: msg.Any, inner: flatbuffers.Offset, data: undefined | ArrayBufferView, sync = true ): [number, null | Uint8Array] { const cmdId = nextCmdId++; msg.Base.startBase(builder); msg.Base.addInner(builder, inner); msg.Base.addInnerType(builder, innerType); msg.Base.addSync(builder, sync); msg.Base.addCmdId(builder, cmdId); builder.finish(msg.Base.endBase(builder)); const res = libdeno.send(builder.asUint8Array(), data); builder.inUse = false; return [cmdId, res]; }
Call the libdeno.send method to pass the data to c, and then use c to call the rust code to implement the specific project operate.
Synchronization
In Typescript, you only need to set the sync parameter of the sendInternal method to true. In rust, it will be judged based on the sync parameter. Execute synchronous or asynchronous operations. If sync is true, the libdeono.send method will return the execution result. To transfer data between rust and typescript, the data needs to be serialized. The flatbuffer library is used for the serialization operation here.
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, true);
Asynchronous implementation
Similarly, to implement the asynchronous method, you only need to set the sync parameter to false. However, compared with synchronization, asynchronous operations have more fallback methods when performing asynchronous communication. When the operation is called, the libdeno.send method will return a unique cmdId identifying the operation. At the same time, after the asynchronous communication is completed, a promise object will be created, using cmdId as key and promise as value, and added to the map. The code is as follows:
const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false); util.assert(resBuf == null); const promise = util.createResolvable<msg.Base>(); promiseTable.set(cmdId, promise); return promise;
When the libdeno.send method is called in Typescript, the Send method in the C file binding.cc is called, which is initialized in deno It is bound to v8 isolate. In the Send method, the dispatch method in the ops.rs file is called, which implements the mapping of messages to functions. Each type of message corresponds to a function. For example, a file reading message corresponds to a file reading function.
pub fn dispatch( isolate: &Isolate, control: libdeno::deno_buf, data: libdeno::deno_buf, ) -> (bool, Box<Op>) { let base = msg::get_root_as_base(&control); let is_sync = base.sync(); let inner_type = base.inner_type(); let cmd_id = base.cmd_id(); let op: Box<Op> = if inner_type == msg::Any::SetTimeout { // SetTimeout is an exceptional op: the global timeout field is part of the // Isolate state (not the IsolateState state) and it must be updated on the // main thread. assert_eq!(is_sync, true); op_set_timeout(isolate, &base, data) } else { // Handle regular ops. let op_creator: OpCreator = match inner_type { msg::Any::Accept => op_accept, msg::Any::Chdir => op_chdir, msg::Any::Chmod => op_chmod, msg::Any::Close => op_close, msg::Any::FetchModuleMetaData => op_fetch_module_meta_data, msg::Any::CopyFile => op_copy_file, msg::Any::Cwd => op_cwd, msg::Any::Dial => op_dial, msg::Any::Environ => op_env, msg::Any::Exit => op_exit, msg::Any::Fetch => op_fetch, msg::Any::FormatError => op_format_error, msg::Any::Listen => op_listen, msg::Any::MakeTempDir => op_make_temp_dir, msg::Any::Metrics => op_metrics, msg::Any::Mkdir => op_mkdir, msg::Any::Open => op_open, msg::Any::ReadDir => op_read_dir, msg::Any::ReadFile => op_read_file, msg::Any::Readlink => op_read_link, msg::Any::Read => op_read, msg::Any::Remove => op_remove, msg::Any::Rename => op_rename, msg::Any::ReplReadline => op_repl_readline, msg::Any::ReplStart => op_repl_start, msg::Any::Resources => op_resources, msg::Any::Run => op_run, msg::Any::RunStatus => op_run_status, msg::Any::SetEnv => op_set_env, msg::Any::Shutdown => op_shutdown, msg::Any::Start => op_start, msg::Any::Stat => op_stat, msg::Any::Symlink => op_symlink, msg::Any::Truncate => op_truncate, msg::Any::WorkerGetMessage => op_worker_get_message, msg::Any::WorkerPostMessage => op_worker_post_message, msg::Any::Write => op_write, msg::Any::WriteFile => op_write_file, msg::Any::Now => op_now, msg::Any::IsTTY => op_is_tty, msg::Any::Seek => op_seek, msg::Any::Permissions => op_permissions, msg::Any::PermissionRevoke => op_revoke_permission, _ => panic!(format!( "Unhandled message {}", msg::enum_name_any(inner_type) )), }; op_creator(&isolate, &base, data) }; // ...省略多余的代码 }
In each type of function, synchronous execution or asynchronous execution will be determined based on the sync parameter value passed in when calling the libdeo.send method in Typescript.
let (is_sync, op) = dispatch(isolate, control_buf, zero_copy_buf);
Synchronous execution
After executing the dispatch method, the is_sync variable will be returned. If is_sync is true, it means that the method is executed synchronously, and op means the returned result. The rust code will call the deno_respond method in the c file api.cc to synchronize the execution results back. The deno_respond method determines whether it is a synchronization message based on the value of current_args_. If current_args_ has a value, the result is returned directly.
Asynchronous execution
In deno, the execution of asynchronous operations is implemented through the Tokio module of rust. After calling the dispatch method, if it is an asynchronous operation, the value of is_sync is false, and the op does not Then there is the execution result, but an execution function. Use the tokio module to derive a thread to execute the function asynchronously.
let task = op .and_then(move |buf| { let sender = tx; // tx is moved to new thread sender.send((zero_copy_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task);
When deno is initialized, a pipeline will be created. The code is as follows:
let (tx, rx) = mpsc::channel::<(usize, Buf)>();
Pipelines can realize communication between different threads, because asynchronous operations create a new thread for execution. , so the sub-thread cannot directly communicate with the main thread and needs to be implemented through the pipeline mechanism. After the execution of the asynchronous code is completed, call the tx.send method to add the execution results to the pipeline. The event loop will read the results from the pipeline and return them every time.
Since asynchronous operations rely on event loops, let’s first explain the event loop in deno. In fact, the event loop is very simple. It is a piece of code that is executed in a loop. When the conditions are met, the event The loop will end execution. The main event loop code in deno is implemented as follows:
pub fn event_loop(&self) -> Result<(), JSError> { // Main thread event loop. while !self.is_idle() { match recv_deadline(&self.rx, self.get_timeout_due()) { Ok((zero_copy_id, buf)) => self.complete_op(zero_copy_id, buf), Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("recv_deadline() failed: {:?}", e), } self.check_promise_errors(); if let Some(err) = self.last_exception() { return Err(err); } } // Check on done self.check_promise_errors(); if let Some(err) = self.last_exception() { return Err(err); } Ok(()) }
self.is_idle method is used to determine whether all asynchronous operations have been completed. When all asynchronous operations have been completed, stop the event Loop, the is_idle method code is as follows:
fn is_idle(&self) -> bool { self.ntasks.get() == 0 && self.get_timeout_due().is_none() }
When an asynchronous method call is generated, the following method will be called to increase the internal value of ntasks by 1.
fn ntasks_increment(&self) { assert!(self.ntasks.get() >= 0); self.ntasks.set(self.ntasks.get() + 1); }
In the event loop, each First, the value is retrieved from the pipeline. Here, the event loop acts as the consumer, and the sub-thread executing the asynchronous method acts as the producer. If an execution result is obtained in an event loop, the ntasks_decrement method will be called to decrement the internal value of ntasks by 1. When the value of ntasks is 0, the event loop will exit execution. In each loop, the value obtained in the pipeline is used as a parameter, the complete_op method is called, and the result is returned.
在初始化v8实例时,绑定的c++方法中有一个Recv方法,该方法的作用时暴露一个Typescript的函数给rust,在deno的io.ts文件的start方法中执行libdeno.recv(handleAsyncMsgFromRust),将handleAsyncMsgFromRust函数通过c++方法暴露给rust。具体实现如下:
export function start(source?: string): msg.StartRes { libdeno.recv(handleAsyncMsgFromRust); // First we send an empty `Start` message to let the privileged side know we // are ready. The response should be a `StartRes` message containing the CLI // args and other info. const startResMsg = sendStart(); util.setLogDebug(startResMsg.debugFlag(), source); setGlobals(startResMsg.pid(), startResMsg.noColor(), startResMsg.execPath()!); return startResMsg; }
当异步操作执行完成后,可以在rust中直接调用handleAsyncMsgFromRust方法,将结果返回给Typescript。先看一下handleAsyncMsgFromRust方法的实现细节:
export function handleAsyncMsgFromRust(ui8: Uint8Array): void { // If a the buffer is empty, recv() on the native side timed out and we // did not receive a message. if (ui8 && ui8.length) { const bb = new flatbuffers.ByteBuffer(ui8); const base = msg.Base.getRootAsBase(bb); const cmdId = base.cmdId(); const promise = promiseTable.get(cmdId); util.assert(promise != null, `Expecting promise in table. ${cmdId}`); promiseTable.delete(cmdId); const err = errors.maybeError(base); if (err != null) { promise!.reject(err); } else { promise!.resolve(base); } } // Fire timers that have become runnable. fireTimers(); }
从代码handleAsyncMsgFromRust方法的实现中可以知道,首先通过flatbuffer反序列化返回的结果,然后获取返回结果的cmdId,根据cmdId获取之前创建的promise对象,然后调用promise.resolve方法触发promise.then中的代码执行。
The above is the detailed content of How to implement deno communication (with code). For more information, please follow other related articles on the PHP Chinese website!