golang API 请求队列

概要

在调用第三方 API 的时候, 基本都有访问限速的限制条件. 第三方的 API 有多个的时候, 就不太好控制访问速度, 常常会导致 HTTP 429(Too Many Requests) 然后就会有一段时间的禁止访问.

为了应对这种限速的情况, 通过一个简单的请求队列来控制访问的速度, 之后基本没遇到过 HTTP 429 了.

实现思路

首先, 每个请求包装成一个 RequestParam 的 struct, 其中包含请求的地址,类型,参数以及 response 的 channel.

发送请求的时候, 只要将 RequestParam 放入请求队列中即可, 请求完成后, 将 response 放入对应的 channel 中.

整个代码实现很简单:

 1 package util 2 3 import ( 4 "fmt" 5 6 apiclient "gitee.com/wangyubin/gutils/api_client" 7 "gitee.com/wangyubin/gutils/logger" 8 ) 9 10 // request 包含的内容11 type RequestParam struct {12 Api string13 Method string14 JsonReq interface{}15 Resp chan []byte16 }17 18 // 请求队列, 本质是一个channel19 type RequestQueue struct {20 Queue chan RequestParam21 }22 23 var queue *RequestQueue24 25 // 获取队列26 func GetQueue() *RequestQueue {27 return queue28 }29 30 // 初始化队列31 func InitRequestQueue(size int) {32 queue = &RequestQueue{33 Queue: make(chan RequestParam, size),34 }35 }36 37 // 将请求放入队列38 func (rq *RequestQueue) Enqueue(p RequestParam) {39 rq.Queue <- p40 }41 42 // 请求队列服务, 一直等待接受和处理请求43 func (rq *RequestQueue) Run() {44 lg := logger.GetLogger()45 for p := range rq.Queue {46 var resp []byte47 var err error48 switch p.Method {49 case "GET":50 resp, err = apiclient.GetJson(p.Api, p.JsonReq)51 case "POST":52 resp, err = apiclient.PostJson(p.Api, p.JsonReq)53 default:54 err = fmt.Errorf("Wrong type of METHOD(%s)\n", p.Method)55 }56 57 if err != nil {58 lg.Err(err).Msg("access api error: " + p.Api)59 continue60 }61 if p.Resp != nil {62 p.Resp <- resp63 close(p.Resp)64 }65 }66 67 lg.Info().Msg("request queue finished!")68 }

这里的请求是用了我自己封装的 apiclient, 可以根据实际情况替换.

在我的应用场景里, 只要 api 顺序访问就不会出现 HTTP 429 了, 如果这样觉得速度太快的的话, 可以尝试在 Run() 函数中加入一些时间间隔.

1 func (rq *RequestQueue) Run() {2 lg := logger.GetLogger()3 for p := range rq.Queue {4 time.Sleep(1 * time.Second)5 // ... 省略的代码 ...6 }7 8 lg.Info().Msg("request queue finished!")9 }

使用方法

使用很简单, 首先启动, 然后每个调用的地方将 RequestParam 放入队列并等待 response 即可.

启动队列服务

1 func main() {2 // init request queue and start queue service3 util.InitRequestQueue(100)4 queue := util.GetQueue()5 defer close(queue.Queue)6 go queue.Run()7 8 // 其他启动代码9 }

使用队列服务

 1 func Request(param1 string, param2 int) error { 2 api := "http://xxxx.com" 3 api = fmt.Sprintf("%s?period=%s&size=%d", api, param1, param2) 4 5 queue := util.GetQueue() 6 param := util.RequestParam{ 7 Api: api, 8 Method: "GET", 9 Resp: make(chan []byte, 1),10 }11 queue.Enqueue(param)12 13 var respData struct {14 Status string `json:"status"`15 Data []model.Data `json:"data"`16 }17 var err error18 for resp := range param.Resp {19 err = json.Unmarshal(resp, &respData)20 if err != nil {21 lg.Err(err).Msg("unmarshal json error")22 return err23 }24 }25 26 fmt.Println(respData) 27 return err28 }

相关文章