Skip to content

Commit

Permalink
add limit-conn redis and redis-cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
theweakgod committed Jan 24, 2024
1 parent 8fee37a commit ec18565
Show file tree
Hide file tree
Showing 4 changed files with 443 additions and 5 deletions.
94 changes: 92 additions & 2 deletions apisix/plugins/limit-conn.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,72 @@ local limit_conn = require("apisix.plugins.limit-conn.init")


local plugin_name = "limit-conn"

local redis_type_to_additional_properties = {
redis = {
properties = {
redis_host = {
type = "string", minLength = 2
},
redis_port = {
type = "integer", minimum = 1, default = 6379,
},
redis_username = {
type = "string", minLength = 1,
},
redis_password = {
type = "string", minLength = 0,
},
redis_database = {
type = "integer", minimum = 0, default = 0,
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
redis_ssl = {
type = "boolean", default = false,
},
redis_ssl_verify = {
type = "boolean", default = false,
},
},
required = {"redis_host"},
},
["redis-cluster"] = {
properties = {
redis_cluster_nodes = {
type = "array",
minItems = 2,
items = {
type = "string", minLength = 2, maxLength = 100
},
},
redis_password = {
type = "string", minLength = 0,
},
redis_timeout = {
type = "integer", minimum = 1, default = 1000,
},
redis_cluster_name = {
type = "string",
},
redis_cluster_ssl = {
type = "boolean", default = false,
},
redis_cluster_ssl_verify = {
type = "boolean", default = false,
},
dict_name = {
type = "string", minLength = 1,
},
},
required = {"redis_cluster_nodes", "redis_cluster_name", "dict_name"},
},
}
local schema = {
type = "object",
properties = {
conn = {type = "integer", exclusiveMinimum = 0},
conn = {type = "integer", exclusiveMinimum = 0}, -- limit.conn max
burst = {type = "integer", minimum = 0},
default_conn_delay = {type = "number", exclusiveMinimum = 0},
only_use_default_delay = {type = "boolean", default = false},
Expand All @@ -31,6 +93,16 @@ local schema = {
enum = {"var", "var_combination"},
default = "var",
},
redis_type = {
type = "string",
enum = {"redis", "redis-cluster"},
default = "redis",
},
counter_type = {
type = "string",
enum = {"redis", "shared-dict"},
default = "shared-dict",
},
rejected_code = {
type = "integer", minimum = 200, maximum = 599, default = 503
},
Expand All @@ -39,7 +111,25 @@ local schema = {
},
allow_degradation = {type = "boolean", default = false}
},
required = {"conn", "burst", "default_conn_delay", "key"}
required = {"conn", "burst", "default_conn_delay", "key"},
["if"] = {
properties = {
redis_type = {
enum = {"redis"},
},
},
},
["then"] = redis_type_to_additional_properties.redis,
["else"] = {
["if"] = {
properties = {
redis_type = {
enum = {"redis-cluster"},
},
},
},
["then"] = redis_type_to_additional_properties["redis-cluster"],
}
}

local _M = {
Expand Down
35 changes: 32 additions & 3 deletions apisix/plugins/limit-conn/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ if ngx.config.subsystem == "stream" then
shdict_name = shdict_name .. "-stream"
end

local redis_single_new
local redis_cluster_new
do
local redis_src = "apisix.plugins.limit-conn.limit-conn-redis-single"
redis_single_new = require(redis_src).new

local cluster_src = "apisix.plugins.limit-conn.limit-conn-redis-cluster"
redis_cluster_new = require(cluster_src).new
end


local lrucache = core.lrucache.new({
type = "plugin",
Expand All @@ -31,9 +41,28 @@ local _M = {}


local function create_limit_obj(conf)
core.log.info("create new limit-conn plugin instance")
return limit_conn_new(shdict_name, conf.conn, conf.burst,
conf.default_conn_delay)
if conf.counter_type == "shared-dict" then
core.log.info("create new limit-conn plugin instance")
return limit_conn_new(shdict_name, conf.conn, conf.burst,
conf.default_conn_delay)
elseif conf.counter_type == "redis" then

core.log.info("create new limit-conn redis plugin instance")

if conf.redis_type == "redis" then
return redis_single_new("plugin-limit-conn", conf, conf.conn, conf.burst,
conf.default_conn_delay)
end

if conf.redis_type == "redis-cluster" then
return redis_cluster_new("plugin-limit-conn", conf, conf.conn, conf.burst,
conf.default_conn_delay)
end

return nil, "redis_type enum not match"
else
return nil, "counter_type enum not match"
end
end


Expand Down
151 changes: 151 additions & 0 deletions apisix/plugins/limit-conn/limit-conn-redis-cluster.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
local rediscluster = require("resty.rediscluster")
local core = require("apisix.core")
local assert = assert
local setmetatable = setmetatable
local math = require "math"
local floor = math.floor
local ipairs = ipairs

local _M = {version = 0.1}


local mt = {
__index = _M
}


local function new_redis_cluster(conf)
local config = {
name = conf.redis_cluster_name,
serv_list = {},
read_timeout = conf.redis_timeout,
auth = conf.redis_password,
dict_name = conf.dict_name,
connect_opts = {
ssl = conf.redis_cluster_ssl,
ssl_verify = conf.redis_cluster_ssl_verify,
}
}

for i, conf_item in ipairs(conf.redis_cluster_nodes) do
local host, port, err = core.utils.parse_addr(conf_item)
if err then
return nil, "failed to parse address: " .. conf_item
.. " err: " .. err
end

config.serv_list[i] = {ip = host, port = port}
end

local red_cli, err = rediscluster:new(config)
if not red_cli then
return nil, "failed to new redis cluster: " .. err
end

return red_cli
end


function _M.new(plugin_name, conf, max, burst, default_conn_delay)
local self = {
conf = conf,
plugin_name = plugin_name,
burst = burst,
max = max + 0, -- just to ensure the param is good
unit_delay = default_conn_delay,
}
return setmetatable(self, mt)
end


function _M.incoming(self, key, commit)
local max = self.max

-- init redis
local conf = self.conf
local red, err = new_redis_cluster(conf)
if not red then
return red, err
end

self.committed = false

local conn, err
if commit then
conn, err = red:incrby(key, 1)
if not conn then
return nil, err
end

if conn > max + self.burst then
conn, err = red:incrby(key, -1)
if not conn then
return nil, err
end
return nil, "rejected"
end
self.committed = true

else
conn_from_red, err = red:get(key)
if err then
return nil, err
end
conn = (conn_from_red or 0) + 1
end

if conn > max then
-- make the excessive connections wait
return self.unit_delay * floor((conn - 1) / max), conn
end

-- we return a 0 delay by default
return 0, conn
end


function _M.is_committed(self)
return self.committed
end


local function leaving_thread(premature, self, key, req_latency)

-- init redis
local conf = self.conf
local red, err = new_redis_cluster(conf)
if not red then
return red, err
end

local conn, err = red:incrby(key, -1)
if not conn then
return nil, err
end

if req_latency then
local unit_delay = self.unit_delay
self.unit_delay = (req_latency + unit_delay) / 2
end

return conn
end


function _M.leaving(self, key, req_latency)
assert(key)

-- log_by_lua can't use cosocket
local ok, err = ngx.timer.at(0, leaving_thread, self, key, req_latency)
if not ok then
core.log.error("failed to create timer: ", err)
return nil, err
end

return ok

end



return _M
Loading

0 comments on commit ec18565

Please sign in to comment.