Table of Contents
Text
Distributed lock
Run the test
Distributed filter
Distributed rate limiter
运行测试
其他
Home Database Redis How to use Redis in Golang distributed applications

How to use Redis in Golang distributed applications

May 26, 2023 pm 10:07 PM
redis golang

    Text

    Redis is a high-performance in-memory database that is often used in distributed systems. In addition to being a distributed cache or a simple memory The database also has some special application scenarios. This article combines Golang to write the corresponding middleware.

    Distributed lock

    In a stand-alone system, we can use sync.Mutex to protect critical resources. There is also such a need in a distributed system. When multiple hosts To seize the same resource, you need to add the corresponding "distributed lock".

    In Redis we can achieve this through the setnx command

    • If the key does not exist, the corresponding value can be set. If the setting is successful, the lock will be successful. , key does not exist and returns failure

    • Releasing the lock can be achieved through del.

    The main logic is as follows:

    type RedisLock struct {
    	client     *redis.Client
    	key        string
    	expiration time.Duration // 过期时间,防止宕机或者异常
    }
    func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
    	return &RedisLock{
    		client:     client,
    		key:        key,
    		expiration: expiration,
    	}
    }
    // 加锁将成功会将调用者id保存到redis中
    func (l *RedisLock) Lock(id string) (bool, error) {
    	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
    }
    const unLockScript = `
    if (redis.call("get", KEYS[1]) == KEYS[2]) then
    	redis.call("del", KEYS[1])
    	return true
    end
    return false
    `
    // 解锁通过lua脚本来保证原子性,只能解锁当前调用者加的锁
    func (l *RedisLock) UnLock(id string) error {
    	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    Copy after login

    In order to prevent system downtime or deadlock caused by abnormal requests, an additional timeout period needs to be added, which should be set to Twice the maximum estimated run time.

    Lua script is used to ensure atomicity when unlocking. The caller will only unlock the lock added by itself. Avoid confusion caused by timeout. For example: Process A acquired the lock at time t1, but due to slow execution, the lock timed out at time t2. Process B acquired the lock at t3. If process A finishes executing and unlocks the process, the process will be canceled. B's lock.

    Run the test

    func main() {
        client := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	lock := NewLock(client, "counter", 30*time.Second)
        counter := 0
    	worker := func(i int) {
    		for {
    			id := fmt.Sprintf("worker%d", i)
    			ok, err := lock.Lock(id)
    			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
    			if !ok {
    				time.Sleep(100 * time.Millisecond)
    				continue
    			}
    			defer lock.UnLock(id)
    			counter++
    			log.Printf("worker %d, add counter %d", i, counter)
    			break
    		}
    	}
    	wg := sync.WaitGroup{}
    	for i := 1; i <= 5; i++ {
    		wg.Add(1)
    		id := i
    		go func() {
    			defer wg.Done()
    			worker(id)
    		}()
    	}
    	wg.Wait()
    }
    Copy after login

    The running result shows that the effect is similar to sync.Mutex

    2022/07/22 09: 58:09 worker 5 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:09 worker 5, add counter 1
    2022/07/22 09:58 :09 worker 4 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok : false, err:
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58 :10 worker 1 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 4, add counter 2
    2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 1, add counter 3
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false , err:
    2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 2, add counter 4
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err:
    2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err:
    2022/07/22 09:58:10 worker 3, add counter 5

    Special attention is that, in In a distributed Redis cluster, if an exception occurs (the master node is down), the availability of distributed locks may be reduced, which can be achieved through strong consistency components such as etcd and ZooKeeper.

    Distributed filter

    Suppose we want to develop a crawler service to crawl millions of web pages. How to judge whether a certain web page has been crawled? In addition to using the database and HashMap, we can use Bloom filter is used to do it. Relative to other methods, Bloom filters take up very little space and have very fast insertion and query times.

    The Bloom filter is used to determine whether an element is in the set, using BitSet

    • When inserting data, the value is hashed multiple times, and the BitSet corresponding position is 1

    • When querying, Hash is also performed multiple times to compare whether all bits are 1. If so, it exists.

    The Bloom filter has a certain misjudgment rate and is not suitable for precise query scenarios. In addition, deleting elements is not supported. It is usually used in scenarios such as URL deduplication, spam filtering, and cache breakdown prevention.

    In Redis, we can use the built-in BitSet implementation, and also use the atomicity of lua scripts to avoid multiple query data inconsistencies.

    const (
    	// 插入数据,调用setbit设置对应位
    	setScript = `
    for _, offset in ipairs(ARGV) do
    	redis.call("setbit", KEYS[1], offset, 1)
    end
    `
    	// 查询数据,如果所有位都为1返回true
    	getScript = `
    for _, offset in ipairs(ARGV) do
    	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
    		return false
    	end
    end
    return true
    `
    )
    type BloomFilter struct {
    	client *redis.Client
    	key    string // 存在redis中的key
    	bits   uint // BitSet的大小
    	maps   uint // Hash的次数
    }
    func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
    	client.Del(context.TODO(), key)
    	if maps == 0 {
    		maps = 14
    	}
    	return &BloomFilter{
    		key:    key,
    		client: client,
    		bits:   bits,
    		maps:   maps,
    	}
    }
    // 进行多次Hash, 得到位置列表
    func (f *BloomFilter) getLocations(data []byte) []uint {
    	locations := make([]uint, f.maps)
    	for i := 0; i < int(f.maps); i++ {
    		val := murmur3.Sum64(append(data, byte(i)))
    		locations[i] = uint(val) % f.bits
    	}
    	return locations
    }
    func (f *BloomFilter) Add(data []byte) error {
    	args := getArgs(f.getLocations(data))
    	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
    	if err != nil && err != redis.Nil {
    		return err
    	}
    	return nil
    }
    func (f *BloomFilter) Exists(data []byte) (bool, error) {
    	args := getArgs(f.getLocations(data))
    	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
    	if err != nil {
    		if err == redis.Nil {
    			return false, nil
    		}
    		return false, err
    	}
    	exists, ok := resp.(int64)
    	if !ok {
    		return false, nil
    	}
    	return exists == 1, nil
    }
    func getArgs(locations []uint) []string {
    	args := make([]string, 0)
    	for _, l := range locations {
    		args = append(args, strconv.FormatUint(uint64(l), 10))
    	}
    	return args
    }
    Copy after login

    Run the test

    func main() {
    	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
    	exists, err := bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	if err := bf.Add([]byte("test1")); err != nil {
    		log.Printf("add err: %v", err)
    	}
    	exists, err = bf.Exists([]byte("test1"))
    	log.Printf("exist %t, err %v", exists, err)
    	exists, err = bf.Exists([]byte("test2"))
    	log.Printf("exist %t, err %v", exists, err)
    // output
    // 2022/07/22 10:05:58 exist false, err <nil>
    // 2022/07/22 10:05:58 exist true, err <nil>
    // 2022/07/22 10:05:58 exist false, err <nil>
    }
    Copy after login

    Distributed rate limiter

    Token-based is provided in the golang.org/x/time/rate package Bucket current limiter, if you want to implement current limiting in a distributed environment, you can implement it based on the Redis Lua script.

    The main principle of the token bucket is as follows:

    • Assume that the capacity of a token bucket is burst, and tokens are placed in it at the rate of qps per second

    • Initially, the tokens are filled. If the tokens overflow, they will be discarded directly. When requesting tokens, if there are enough tokens in the bucket, it will be allowed, otherwise it will be rejected.

    • When burst==qps, strictly follow the qps flow limit; when burst>qps, a certain burst of traffic can be allowed

    这里主要参考了官方rate包的实现,将核心逻辑改为Lua实现。

    --- 相关Key
    --- limit rate key值,对应value为当前令牌数
    local limit_key = KEYS[1]
    --- 输入参数
    --[[
    qps: 每秒请求数;
    burst: 令牌桶容量;
    now: 当前Timestamp;
    cost: 请求令牌数;
    max_wait: 最大等待时间
    --]]
    local qps = tonumber(ARGV[1])
    local burst = tonumber(ARGV[2])
    local now = ARGV[3]
    local cost = tonumber(ARGV[4])
    local max_wait = tonumber(ARGV[5])
    --- 获取redis中的令牌数
    local tokens = redis.call("hget", limit_key, "token")
    if not tokens then
    	tokens = burst
    end
    --- 上次修改时间
    local last_time = redis.call("hget", limit_key, "last_time")
    if not last_time then
    	last_time = 0
    end
    --- 最新等待时间
    local last_event = redis.call("hget", limit_key, "last_event")
    if not last_event then
    	last_event = 0
    end
    --- 通过当前时间与上次修改时间的差值,qps计算出当前时间得令牌数
    local delta = math.max(0, now-last_time)
    local new_tokens = math.min(burst, delta * qps + tokens)
    new_tokens = new_tokens - cost --- 最新令牌数,减少请求令牌
    --- 如果最新令牌数小于0,计算需要等待的时间
    local wait_period = 0
    if new_tokens < 0 and qps > 0 then
    	wait_period = wait_period - new_tokens / qps
    end
    wait_period = math.ceil(wait_period)
    local time_act = now + wait_period --- 满足等待间隔的时间戳
    --- 允许请求有两种情况
    --- 当请求令牌数小于burst, 等待时间不超过最大等待时间,可以通过补充令牌满足请求
    --- qps为0时,只要最新令牌数不小于0即可
    local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
    --- 设置对应值
    if ok then
    	redis.call("set", limit_key, new_tokens)
    	redis.call("set", last_time_key, now)
    	redis.call("set", last_event_key, time_act)
    end
    --- 返回列表,{是否允许, 等待时间}
    return {ok, wait_period}
    Copy after login

    在Golang中的相关接口Allow、AllowN、Wait等都是通过调用reserveN实现

    // 调用lua脚本
    func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
    	// ...
    	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
    	if err != nil && err != redis.Nil {
    		return nil, err
    	}
    	//...
    	return &Reservation{
    		ok:        allow == 1,
    		lim:       lim,
    		tokens:    n,
    		timeToAct: now.Add(time.Duration(wait) * time.Second),
    	}, nil
    }
    Copy after login

    运行测试

    func main() {
    	rdb := redis.NewClient(&redis.Options{
    		Addr:     "localhost:6379",
    		Password: "123456",
    		DB:       0, // use default DB
    	})
    	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
    	if err != nil {
    		log.Fatal(err)
    	}
    	r.Reset()
    	for i := 0; i < 5; i++ {
    		err := r.Wait(context.TODO())
    		log.Printf("worker %d allowed: %v", i, err)
    	}
    }
    // output
    // 2022/07/22 12:50:31 worker 0 allowed: <nil>
    // 2022/07/22 12:50:31 worker 1 allowed: <nil>
    // 2022/07/22 12:50:32 worker 2 allowed: <nil>
    // 2022/07/22 12:50:33 worker 3 allowed: <nil>
    // 2022/07/22 12:50:34 worker 4 allowed: <nil>
    Copy after login

    前两个请求在burst内,直接可以获得,后面的请求按照qps的速率生成。

    其他

    Redis还可用于全局计数、去重以及发布订阅等不同情境。参考Redis官方提供的模块,可以通过加载这些模块实现过滤、限流等特性。

    The above is the detailed content of How to use Redis in Golang distributed applications. For more information, please follow other related articles on the PHP Chinese website!

    Statement of this Website
    The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

    Hot AI Tools

    Undresser.AI Undress

    Undresser.AI Undress

    AI-powered app for creating realistic nude photos

    AI Clothes Remover

    AI Clothes Remover

    Online AI tool for removing clothes from photos.

    Undress AI Tool

    Undress AI Tool

    Undress images for free

    Clothoff.io

    Clothoff.io

    AI clothes remover

    Video Face Swap

    Video Face Swap

    Swap faces in any video effortlessly with our completely free AI face swap tool!

    Hot Tools

    Notepad++7.3.1

    Notepad++7.3.1

    Easy-to-use and free code editor

    SublimeText3 Chinese version

    SublimeText3 Chinese version

    Chinese version, very easy to use

    Zend Studio 13.0.1

    Zend Studio 13.0.1

    Powerful PHP integrated development environment

    Dreamweaver CS6

    Dreamweaver CS6

    Visual web development tools

    SublimeText3 Mac version

    SublimeText3 Mac version

    God-level code editing software (SublimeText3)

    How to build the redis cluster mode How to build the redis cluster mode Apr 10, 2025 pm 10:15 PM

    Redis cluster mode deploys Redis instances to multiple servers through sharding, improving scalability and availability. The construction steps are as follows: Create odd Redis instances with different ports; Create 3 sentinel instances, monitor Redis instances and failover; configure sentinel configuration files, add monitoring Redis instance information and failover settings; configure Redis instance configuration files, enable cluster mode and specify the cluster information file path; create nodes.conf file, containing information of each Redis instance; start the cluster, execute the create command to create a cluster and specify the number of replicas; log in to the cluster to execute the CLUSTER INFO command to verify the cluster status; make

    How to clear redis data How to clear redis data Apr 10, 2025 pm 10:06 PM

    How to clear Redis data: Use the FLUSHALL command to clear all key values. Use the FLUSHDB command to clear the key value of the currently selected database. Use SELECT to switch databases, and then use FLUSHDB to clear multiple databases. Use the DEL command to delete a specific key. Use the redis-cli tool to clear the data.

    How to read redis queue How to read redis queue Apr 10, 2025 pm 10:12 PM

    To read a queue from Redis, you need to get the queue name, read the elements using the LPOP command, and process the empty queue. The specific steps are as follows: Get the queue name: name it with the prefix of "queue:" such as "queue:my-queue". Use the LPOP command: Eject the element from the head of the queue and return its value, such as LPOP queue:my-queue. Processing empty queues: If the queue is empty, LPOP returns nil, and you can check whether the queue exists before reading the element.

    How to use the redis command line How to use the redis command line Apr 10, 2025 pm 10:18 PM

    Use the Redis command line tool (redis-cli) to manage and operate Redis through the following steps: Connect to the server, specify the address and port. Send commands to the server using the command name and parameters. Use the HELP command to view help information for a specific command. Use the QUIT command to exit the command line tool.

    C   and Golang: When Performance is Crucial C and Golang: When Performance is Crucial Apr 13, 2025 am 12:11 AM

    C is more suitable for scenarios where direct control of hardware resources and high performance optimization is required, while Golang is more suitable for scenarios where rapid development and high concurrency processing are required. 1.C's advantage lies in its close to hardware characteristics and high optimization capabilities, which are suitable for high-performance needs such as game development. 2.Golang's advantage lies in its concise syntax and natural concurrency support, which is suitable for high concurrency service development.

    PostgreSQL performance optimization under Debian PostgreSQL performance optimization under Debian Apr 12, 2025 pm 08:18 PM

    To improve the performance of PostgreSQL database in Debian systems, it is necessary to comprehensively consider hardware, configuration, indexing, query and other aspects. The following strategies can effectively optimize database performance: 1. Hardware resource optimization memory expansion: Adequate memory is crucial to cache data and indexes. High-speed storage: Using SSD SSD drives can significantly improve I/O performance. Multi-core processor: Make full use of multi-core processors to implement parallel query processing. 2. Database parameter tuning shared_buffers: According to the system memory size setting, it is recommended to set it to 25%-40% of system memory. work_mem: Controls the memory of sorting and hashing operations, usually set to 64MB to 256M

    How to set the redis expiration policy How to set the redis expiration policy Apr 10, 2025 pm 10:03 PM

    There are two types of Redis data expiration strategies: periodic deletion: periodic scan to delete the expired key, which can be set through expired-time-cap-remove-count and expired-time-cap-remove-delay parameters. Lazy Deletion: Check for deletion expired keys only when keys are read or written. They can be set through lazyfree-lazy-eviction, lazyfree-lazy-expire, lazyfree-lazy-user-del parameters.

    How to use redis cluster zset How to use redis cluster zset Apr 10, 2025 pm 10:09 PM

    Use of zset in Redis cluster: zset is an ordered collection that associates elements with scores. Sharding strategy: a. Hash sharding: Distribute the hash value according to the zset key. b. Range sharding: divide into ranges according to element scores, and assign each range to different nodes. Read and write operations: a. Read operations: If the zset key belongs to the shard of the current node, it will be processed locally; otherwise, it will be routed to the corresponding shard. b. Write operation: Always routed to shards holding the zset key.

    See all articles