首页>国内 > 正文

自己动手用Go语言写三种实用队列

2023-03-28 09:18:09来源:今日头条

背景

我们在使用kubernetes的客户端k8s.io/client-go 进行开发的时候,比如写个CRD的operator, 经常会用到队列这种数据结构。并且很多时候,我们在做服务器端后台开发的时候,需要用到任务队列,进行任务的异步处理与任务管理。k8s.io/client-go中的workqueue包里面提供了三种常用的队列。今天给大家演示下三种队列的使用方法与相应的使用场景,大家在工作中可以直接copy这些代码,加速自己项目的开发。这三个队列的关系如下图所示:

k8s队列关系

队列

type (基础队列)


(资料图)

下面给出了数据结构,其中dirty,processing两个集合分别存储的是需要处理的任务和正在处理的任务,queue[]t按序存放的是所有添加的任务。这三个属性的关系很有意思,dirty用于快速判断queue中是否存在相应的任务,这样有以下两个用处:

1. 在Add的时候,可以防止重复添加。(代码查看:​https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120​)。

2.由于在任务完成后要调用Done方法,把任务从processing集合中删除掉,那么如果在完成前(即调用Done方法之前),把任务再次添加进dirty集合,那么在完成调用Done方法的时候,会再次把任务重新添加进queue队列,进行处理(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L180)。

而processing集合存放的是当前正在执行的任务,它的作用有以下几点。

1.在Add的时候,如果任务正在处理,就直接返回。这样在任务调用Done的时候,由于dirty集合中有,会把这个任务再次放在队列的尾部。(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L120)。

2.用于判断队列中是否还有任务正在执行,这样在shutdown的时候,可以有的放矢。(代码查看:https://github.com/kubernetes/client-go/blob/master/util/workqueue/queue.go#L221)。

整个queue队列工作模式是,你的工作线程通过Get方法从队列中获取任务(如果队列长度为0,需要q.cond.Wait()),然后处理任务(你自己的业务逻辑),处理完后调用Done方法,表明任务完成了,同时调用q.cond.Signal(),唤醒等待的工作线程​。

type Type struct {// queue defines the order in which we will work on items. Every// element of queue should be in the dirty set and not in the// processing set.queue []t// dirty defines all of the items that need to be processed.dirty set// Things that are currently being processed are in the processing set.// These things may be simultaneously in the dirty set. When we finish// processing something and remove it from this set, we"ll check if// it"s in the dirty set, and if so, add it to the queue.processing setcond *sync.CondshuttingDown booldrain boolmetrics queueMetricsunfinishedWorkUpdatePeriod time.Durationclock clock.WithTicker}
delaying_queue(延迟队列)

这个延迟队列继承了上面的基础队列,同时提供了addAfter函数,实现根据延迟时间把元素增加进延迟队列。其中的waitForPriorityQueue实现了一个用于waitFor元素的优先级队列,其实就是一个最小堆。

func (q *delayingType) AddAfter(item interface{}, duration time.Duration)这个函数(代码https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L162)。

当duration为0,就直接通过q.add放到它继承的基础执行队列里面,如果有延迟值,就放在q.waitingForAddCh通道里面,等待readyAt时机成熟,再放到队列中。那这个通道里面的元素当readyAt后,如何加入到基础执行队列?下面的截图给出了答案,便是启动的ret.waitingLoop协程。这个方法的具体代码(https://github.com/kubernetes/client-go/blob/master/util/workqueue/delaying_queue.go#L189),具体思路就是利用了上面的waitForPriorityQueue最小堆,还有等待加入队列通道q.waitingForAddCh,大家可以看看给出的具体代码,大致的思想就会了解。

创建延迟队列

// delayingType wraps an Interface and provides delayed re-enquingtype delayingType struct {Interface// clock tracks time for delayed firingclock clock.Clock// stopCh lets us signal a shutdown to the waiting loopstopCh chan struct{}// stopOnce guarantees we only signal shutdown a single timestopOnce sync.Once// heartbeat ensures we wait no more than maxWait before firingheartbeat clock.Ticker// waitingForAddCh is a buffered channel that feeds waitingForAddwaitingForAddCh chan *waitFor// metrics counts the number of retriesmetrics retryMetrics}// waitFor holds the data to add and the time it should be addedtype waitFor struct {data treadyAt time.Time// index in the priority queue (heap)index int}type waitForPriorityQueue []*waitFor

元素添加逻辑

下面是测试代码,大家可以看看如何创建延迟队列,还有添加任务。

下面的代码,在延迟队列里面增加了一个字符串"foo",延迟执行的时间是50毫秒。然后差不多50毫秒后,延迟队列长度为0fakeClock := testingclock.NewFakeClock(time.Now())q := NewDelayingQueueWithCustomClock(fakeClock, "")first := "foo"q.AddAfter(first, 50*time.Millisecond)if err := waitForWaitingQueueToFill(q); err != nil {t.Fatalf("unexpected err: %v", err)}if q.Len() != 0 {t.Errorf("should not have added")}fakeClock.Step(60 * time.Millisecond)if err := waitForAdded(q, 1); err != nil {t.Errorf("should have added")}item, _ := q.Get()q.Done(item)// step past the next heartbeatfakeClock.Step(10 * time.Second)err := wait.Poll(1*time.Millisecond, 30*time.Millisecond, func() (done bool, err error) {if q.Len() > 0 {return false, fmt.Errorf("added to queue")}return false, nil})if err != wait.ErrWaitTimeout {t.Errorf("expected timeout, got: %v", err)}if q.Len() != 0 {t.Errorf("should not have added")}func waitForAdded(q DelayingInterface, depth int) error {return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {if q.Len() == depth {return true, nil}return false, nil})}func waitForWaitingQueueToFill(q DelayingInterface) error {return wait.Poll(1*time.Millisecond, 10*time.Second, func() (done bool, err error) {if len(q.(*delayingType).waitingForAddCh) == 0 {return true, nil}return false, nil})}
rate_limiting_queue(限速队列)​

限速队列是利用延迟队列的延迟特性,延迟某个元素的插入FIFO队列的时间,达到限速的目的

workqueue包下面的rateLimiter有多种,下面的代码显示的是ItemExponentialFailureRateLimiter(排队指数算法)。

type ItemExponentialFailureRateLimiter struct {failuresLock sync.Mutexfailures map[interface{}]intbaseDelay time.DurationmaxDelay time.Duration}

它有个基础延迟时间,加入到延迟队列后,被执行的延迟时间的计算公式是如下所示。另外它还有个最大延迟时间的参数。

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {r.failuresLock.Lock()defer r.failuresLock.Unlock()exp := r.failures[item]r.failures[item] = r.failures[item] + 1// The backoff is capped such that "calculated" value never overflows.backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))if backoff > math.MaxInt64 {return r.maxDelay}calculated := time.Duration(backoff)if calculated > r.maxDelay {return r.maxDelay}return calculated}
下面的测试代码,显示的是创建了一个1毫秒基础延迟,最大1秒的延迟队列。它在延迟队列中增加了一个"one"字符串,由于是第一次添加,所以基于上面的公式它的延迟时间是1毫秒,再次增加"one"后,它的延迟时间是2*1毫秒,即2毫秒,对于增加的字符串"two"也是一样,当我们调用forget方法后ItemExponentialFailureRateLimiter中的计数器会重置,再次增加"one"字符串后,它的延迟时间又变成了1毫秒limiter := NewItemExponentialFailureRateLimiter(1*time.Millisecond, 1*time.Second)queue := NewRateLimitingQueue(limiter).(*rateLimitingType)fakeClock := testingclock.NewFakeClock(time.Now())delayingQueue := &delayingType{Interface: New(),clock: fakeClock,heartbeat: fakeClock.NewTicker(maxWait),stopCh: make(chan struct{}),waitingForAddCh: make(chan *waitFor, 1000),metrics: newRetryMetrics(""),}queue.DelayingInterface = delayingQueuequeue.AddRateLimited("one")waitEntry := <-delayingQueue.waitingForAddChif e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {t.Errorf("expected %v, got %v", e, a)}queue.AddRateLimited("one")waitEntry = <-delayingQueue.waitingForAddChif e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {t.Errorf("expected %v, got %v", e, a)}if e, a := 2, queue.NumRequeues("one"); e != a {t.Errorf("expected %v, got %v", e, a)}queue.AddRateLimited("two")waitEntry = <-delayingQueue.waitingForAddChif e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {t.Errorf("expected %v, got %v", e, a)}queue.AddRateLimited("two")waitEntry = <-delayingQueue.waitingForAddChif e, a := 2*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {t.Errorf("expected %v, got %v", e, a)}queue.Forget("one")if e, a := 0, queue.NumRequeues("one"); e != a {t.Errorf("expected %v, got %v", e, a)}queue.AddRateLimited("one")waitEntry = <-delayingQueue.waitingForAddChif e, a := 1*time.Millisecond, waitEntry.readyAt.Sub(fakeClock.Now()); e != a {t.Errorf("expected %v, got %v", e, a)}

此外这个包下面还有ItemFastSlowRateLimiter,BucketRateLimiter等。具体的大家可以查看default_rate_limiters.go(代码:https://github.com/kubernetes/client-go/blob/master/util/workqueue/default_rate_limiters.go)。

应用场景

延迟队列场景:

1、订单延迟支付关闭

常见的打车软件都会有匹配司机,这个可以用延迟队列来实现;处理已提交订单超过30分钟未付款失效的订单,延迟队列可以很好的解决;又或者注册了超过30天的用户,发短信撩动等。

2、定时任务调度

比如使用DelayQueue保存当天将会执行的任务和执行时间,或是需要设置一个倒计时,倒计时结束后更新数据库中某个表状态

限速队列场景:

比如限制数据队列的写入速度。

关键词:

相关新闻

Copyright 2015-2020   三好网  版权所有 联系邮箱:435 22 640@qq.com  备案号: 京ICP备2022022245号-21