WEB应用

APISIX插件开发之精细化限速插件

Jager · 11月3日 · 2021年 · · 145次已读

一、需求背景

APISIX 当前版本(2.10.1)请求频率限制相关插件有 limit-count、limit-req、limit-conn 三种。都只能支持给固定的 key 来设置一个统一的限速,比如在 service 或 route 如下添加 limit-count 插件:

"limit-count": {
      "count": 10,
      "key": "consumer_name",
      "policy": "local",
      "rejected_code": 503,
      "time_window": 1
}

那所有 consumer 都是一样的限速阈值,每秒 10 次。

再比如,在 Consumer 侧给指定的 Consumer 如下添加 limit-count 插件:

"limit-count": {
      "count": 10,
      "key": "service_id",
      "policy": "local",
      "rejected_code": 503,
      "time_window": 1
}

那这个 Consumer 访问任意 service 都是一样的限速阈值。

在实际业务场景中,这个限速还太笼统,达不到业务要求。比如,业务 A 要求给 ConsumerA 限制 1 分钟能访问 100 次,给 ConsumerB 1 分钟限制能访问 1000 次,这个限速可以在 Consumer 侧给 A、B 分别设置限速,但是当 ConsumerA 和 ConsumerB 还需要访问业务 B,且业务 B 又有不同限速需求的时候,当前插件就玩不转了,除非客户端针对每个业务都要使用不同的 Consumer,那这个就太复杂了。

为了满足这个需求,我们一开始也与 Apache APISIX 社区技术负责人做了探讨,不过社区目前可能正在进行其他功能的排期,所以暂时没进行相关操作,感兴趣的朋友可以先看下issue

APISIX 官方最新回应:所以会在 limit-count 插件基础上进行改造,重新设计了一个限速机制,能够针对不同的服务给不同的 Consumer 设置差异化的访问限速,满足生产环境更精细化的限速需求【查看原文】。

二、解决方案

最后,我们自己在 limit-count 插件基础上进行改造,重新设计一个限速机制,能够针对不同的服务给不同的 Consumer 设置差异化的访问限速,满足生产环境更精细化的限速需求。

方案原理:将 limit-count 配置插入一个 table,在 table 里面支持定义更复杂的 key 和阈值,具体插件的配置 schema 如下:

{
    "scope": "route_id",              # 标明插件添加位置,支持 route_id 和 service_id
    "default_count": 1000,            # 设置默认的限速阈值
    "default_time_window": 60,        # 设置默认的时间窗口
    "key": "consumer_name",           # 设置要限速的客户端对象,支持 ["remote_addr", "server_addr", "http_x_real_ip", "http_x_forwarded_for", "consumer_name"]
    "map": {                          # 给每个限速对象分别设置不同的限速阈值和时间窗口
        "ConsumerA": {
            "count": 300,
            "time_window": 60
        },
        "ConsumerB": {
            "count": 300,
            "time_window": 60
        },
        "ConsumerC": {
            "count": 300,
            "time_window": 60
        }
    },
    "policy": "redis",
    "error_interrupt": false,        # 新增特性:在连接 redis 超时或其他异常错误时,是否中断用户请求,false 不中断,true 中断,默认不中断
    "redis_database": 0,
    "redis_host": "127.0.0.1",
    "redis_password": "",
    "redis_port": 6379,
    "redis_timeout": 1000,
    "rejected_code": 429              # 官方的限速插件超过限制访问返回的 503,并不友好,这里改成更加直白的 429 Too Many Requests
}

新增参数说明

参数类型可选项有效值默认值功能
scopeString必选route_id 或 service_idroute_id表明插件加载的位置,将和<key>配置拼接为 redis 的限速 key
default_countInteger必选>0全局默认的限速阈值
default_time_windowInteger必选>0全局默认的时间窗口
mapTable可选{"ConsumerA": {"count": 300, "time_window": 60}}指定具体用户的限速阈值和时间窗口
error_interruptBoolean可选false 或 truefalse在连接 redis 超时或其他异常错误时,是否中断用户请求
新增参数说明,其他参数可参考官方 limit-count 插件

在原版 limit-count 插件中,若出现 redis 连接超时等异常,请求将会返回 500,这个其实不太合理。从上面参数可以看到,我们这边额外设计了一个是否中断服务的参数:error_interrupt,因为大部分场景下不能因为网关自身问题影响了用户请求,就算 redis 超时也就卡顿 1S 左右,不至于返回 500。当然,我们也人性化的设计了这个开关,随业务自己选择,体现了最大的可配置性。

三、插件代码

--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements.  See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License.  You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local limit_local_new = require("resty.limit.count").new
local core = require("apisix.core")
local plugin_name = "limit-count-by-client"
local limit_redis_cluster_new
local limit_redis_new
do
    local redis_src = "apisix.plugins.limit-count.limit-count-redis"
    limit_redis_new = require(redis_src).new

    local cluster_src = "apisix.plugins.limit-count.limit-count-redis-cluster"
    limit_redis_cluster_new = require(cluster_src).new
end
local lrucache = core.lrucache.new({
    type = 'plugin', serial_creating = true,
})


local schema = {
    type = "object",
    properties = {
        key = {
            type = "string",
            enum = {"remote_addr", "server_addr", "http_x_real_ip",
                    "http_x_forwarded_for", "consumer_name"},
            default = "remote_addr",
        },
        default_count = {type = "integer", exclusiveMinimum = 0},
        default_time_window = {type = "integer",  exclusiveMinimum = 0},
        scope = {
            type = "string",
            enum = {"route_id", "service_id"},
            default = "route_id",
        },
        map = {
            type = "object",
            items = {
                type = "object",
                count = {type = "integer", exclusiveMinimum = 0},
                time_window = {type = "integer",  exclusiveMinimum = 0},
            }
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 429
        },
        error_interrupt = {type = "boolean", default = false},
        policy = {
            type = "string",
            enum = {"local", "redis", "redis-cluster"},
            default = "local",
        }
    },
    dependencies = {
        policy = {
            oneOf = {
                {
                    properties = {
                        policy = {
                            enum = {"local"},
                        },
                    },
                },
                {
                    properties = {
                        policy = {
                            enum = {"redis"},
                        },
                        redis_host = {
                            type = "string", minLength = 2
                        },
                        redis_port = {
                            type = "integer", minimum = 1, default = 6379,
                        },
                        redis_password = {
                            type = "string", minLength = 0,
                        },
                        redis_database = {
                            type = "integer", minimum = 0, default = 0,
                        },
                        redis_timeout = {
                            type = "integer", minimum = 1, default = 2000,
                        },
                    },
                    required = {"redis_host"},
                },
                {
                    properties = {
                        policy = {
                            enum = {"redis-cluster"},
                        },
                        redis_cluster_nodes = {
                            type = "array",
                            minItems = 2,
                            items = {
                                type = "string", minLength = 2, maxLength = 100
                            },
                        },
                        redis_password = {
                            type = "string", minLength = 0,
                        },
                        redis_timeout = {
                            type = "integer", minimum = 1, default = 1000,
                        },
                        redis_cluster_name = {
                            type = "string",
                        },
                    },
                    required = {"redis_cluster_nodes", "redis_cluster_name"},
                }
            }
        }
    }
}


local _M = {
    version = 0.4,
    priority = 1002,
    name = plugin_name,
    schema = schema,
}


function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    if not ok then
        return false, err
    end

    return true
end


local function create_limit_obj(conf, ctx)
    core.log.info("create new limit-count plugin instance")
    
    local req_key = ctx.var[conf.key]
    local item_count = 0
    local item_time_window = 0
    if conf.map[req_key] ~= nil then
        item_count = conf.map[req_key].count
        item_time_window = conf.map[req_key].time_window
    else
        item_count = conf.default_count
        item_time_window = conf.default_time_window

    end

    if not conf.policy or conf.policy == "local" then
        return limit_local_new("plugin-" .. plugin_name, item_count,
                               item_time_window)
    end

    if conf.policy == "redis" then
        return limit_redis_new("plugin-" .. plugin_name,
                               item_count, item_time_window, conf)
    end

    if conf.policy == "redis-cluster" then
        return limit_redis_cluster_new("plugin-" .. plugin_name, item_count,
                                       item_time_window, conf)
    end

    return nil
end


function _M.access(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = core.lrucache.plugin_ctx(lrucache, ctx, conf.policy, create_limit_obj, conf, ctx)

    if lim then
        local req_key = ctx.var[conf.key]
        local limit_key = req_key .. conf.scope
        local key = (limit_key or "") .. ctx.conf_type .. ctx.conf_version
        core.log.info("limit key: ", key)

        local delay, remaining = lim:incoming(key, true)
        if not delay then
            local err = remaining
            if err == "rejected" then
                return conf.rejected_code
            end

            core.log.error("failed to limit count: ", err)
            if conf.error_interrupt then
                return 500, {error_msg = "failed to limit count, please contact the administrator: " .. err}
            end
        end
        local item_count = 0
        local item_time_window = 0
        if conf.map[req_key] ~= nil then
            item_count = conf.map[req_key].count
        else
            item_count = conf.default_count

        end
        core.response.set_header("X-RateLimit-Limit", item_count,
                                "X-RateLimit-Remaining", remaining)
    else
        core.log.error("failed to fetch limit.count object: ", err)
        if conf.error_interrupt then
            return 500, {error_msg = "failed to limit count, please contact the administrator: " .. err}
        end
    end
end


return _M

四、启用方法

需要注意的是,这个插件改造后只能加到 Service 或 Router 中,而不能加到 Consumer 位置,所以取名叫 limit-count-by-client。大家在使用时一定要注意应用位置。

将插件代码保存为 limit-count-by-client.lua,拷贝到apisix/plugins,然后在 config.yaml 插件位置启用,如下,包括 2 个配置:

# 前面略...
nginx_config:
  http:
    lua_shared_dicts:
      plugin-limit-count-by-client: 10m # 插件的 policy 使用 local 模式的时候,需要用到共享内存

# 内容略...

plugins:
  - # 内容略..
  - limit-count-by-client

## 后面内容略...

具体配置,这里贴一个结合 HMAC 认证插件,实现对具体用户进行限频的路由配置,仅供参考:

{
    "uris": [
        "/hello"
    ],
    "plugins": {
        "hmac-auth": {
            "disable": false
        },
        "limit-count-by-client": {
            "default_count": 1000,        
            "default_time_window": 60,
            "key": "consumer_name", 
            "map": {
                "consumer_A": {
                    "count": 1000,
                    "time_window": 60
                },
                "consumer_B": { 
                    "count": 500,
                    "time_window": 60
                }
            },
            "policy": "local",
            "rejected_code": 429, 
            "scope": "route_id" 
        }
    },
    "service_id": "service_foo",
    "status": 1
}

至于需要在官方 dashboard 启用,则需要更新一下 dashboard 的 schema.json,这里就不细说了。

0 条回应
  1. 韩涛博客 2021-11-5 · 15:28

    大佬好久没更新了,过来看看