所有传入的 socketio 事件将按照它们到达的顺序开始处理。那些具有同步处理函数的函数保证以相同的顺序完成处理。但是,对于具有异步处理程序的事件来说,情况可能并非如此,异步处理程序可以按任何顺序完成处理。这种行为使我们的代码更快,但在某些情况下,可能不是我们想要的。
在这篇短文中,您将了解如何使异步任务事件按照事件到达服务器的顺序开始和完成处理。为了实现这一目标,我们将创建一个简单的自定义排队系统。
让我们开始吧。
假设您有两个客户端连接到您的 socketio 服务器的情况。您希望第一个发送事件的人在数据库表中创建一个新行,第二个发送事件的人更新同一行。您的代码可能如下所示:
io.on("connection", (socket) => { console.log("A user connected"); socket.on("SOME_EVENT", async(param) => { //check whether the column already exists: const column = await db.select... //if column exists, update it: if(column){ await db.update... } //else, create one await db.insert... }) }
现在的问题是,如果两个客户端同时发出“SOME_EVENT”,它们有可能都会在数据库中创建一个新行,这不是我们想要的。
我们不会允许 socketio 执行处理函数,而是拦截它们并决定它们何时执行。当我们拦截处理程序时,我们会将它们发送到队列系统,负责按顺序实现它们。
系统将有两个主要组件 - 队列和事件循环。
在计算机科学中,队列是一种数据结构,使我们能够顺序存储和管理数据。为了实现这一点,该结构只允许数据在一端(尾部)添加并在另一端(头部)退出。这种特性通常被称为 FIFO,意思是先进先出。
队列是一种抽象数据类型(ADT)。与其他 ADT 一样,许多编程语言(包括 Javascript)默认情况下都没有它。在这篇文章中,我们将使用 Javascript 数组的 unshift 和 pop 方法对队列进行建模。
从广义上讲,事件循环是一种每隔一段时间运行并有条件执行任务的构造。在我们的例子中,我们将使用 setInterval,它将不断检查队列是否包含待处理的函数,并且仅当前一个函数完成时才会调用下一个函数。
class QueuingSystem { //We're making the queue private so that it can only be //modified only within the class #queue = []; constructor(interval) { this.interval = interval; } //returns last item of an array: lastItem(arr) { return arr[arr.length - 1]; } //returns true if array is empty: isEmpty(arr) { return arr.length == 0; } //responsible for executing the function at the head of the queue: async run(arr) { //removing the function at the head of the queue: const func = arr.pop(); //adding "false" placeholder at the head to indicate that //a function is being executed: arr.push(false); //executing the function: await func(); //removing the false placeholder at the head to indicate that //the run function is ready to execute the next function: arr.pop(); } //function to add to the tail end of the queue: addToQueue(func) { this.#queue.unshift(func); } //function to start the event loop: start() { return setInterval(() => { //checking if the run method is free by checking if the item at the head is false. //and checking if the array isn't empty if (this.lastItem(this.#queue) !== false && !this.isEmpty(this.#queue)) { this.run(this.#queue); } }, this.interval); } //stopping the event loop if no longer needed: stop() { clearInterval(this.start()); } }
我们的消息队列类现在已准备好按顺序接收和执行函数。
我们的排队系统就位后,让我们在代码中使用它:
//Create a socketQueue that loops after every half of a second: const socketQueue = new QueuingSystem(500) io.on("connection", (socket) => { console.log("A user connected"); const handler = async(param) => { //check whether the column already exists: const column = await db.select... //if column exists, update it: if(column){ await db.update... } //else, create one await db.insert... } socket.on("SOME_EVENT", (param) => { //Add the handler function to socketQueue queue socketQueue.addToQueue(hanlder.bind(null, param)) }) } server.listen(PORT, () => { //start the queuing system: socketQueue.start(); console.log("App listening on port", PORT); });
Javascript中的bind方法用于将函数及其参数附加到对象,但不调用它们。在我们的例子中,我们没有将函数附加到任何对象,这就是第一个参数为 null 的原因。
我们创建的消息队列类可以帮助我们通过异步处理程序顺序执行事件。如果您需要更复杂的排队系统,请查看 BullMQ 或其他强大的解决方案。快乐编码!
以上是如何简单地按顺序处理 Socket.io 事件:简短指南的详细内容。更多信息请关注PHP中文网其他相关文章!