Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): meta rpc call handshake #13887

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1bd95c0
call meta.v1
chronolaw Nov 18, 2024
4cef43f
_register_meta_call
chronolaw Nov 18, 2024
9ced349
_meta_call
chronolaw Nov 18, 2024
554d4a8
retry meta call
chronolaw Nov 18, 2024
f112e2d
self:_register_meta_call
chronolaw Nov 18, 2024
c7a7cdd
meta.handshake
chronolaw Nov 18, 2024
f99fb26
log err
chronolaw Nov 18, 2024
b56439b
no handshake
chronolaw Nov 18, 2024
af110ee
ngx.timer
chronolaw Nov 18, 2024
c42e814
kong.rpc:init
chronolaw Nov 19, 2024
cbcb11a
fix _meta_call
chronolaw Nov 19, 2024
3c64207
clean
chronolaw Nov 19, 2024
b25f4a3
fix 14-dp_privileged_agent_spec.lua
chronolaw Nov 19, 2024
53d7f0a
check rpc_protocol
chronolaw Nov 19, 2024
37af7ed
ngx.timer
chronolaw Nov 19, 2024
330b34a
check meta_rpc_call
chronolaw Nov 19, 2024
5d12b8b
more in meta call
chronolaw Nov 19, 2024
dbeaf8b
only kong.meta.v1
chronolaw Nov 19, 2024
8dc1686
_meta_call
chronolaw Nov 19, 2024
c75a0fd
no retry
chronolaw Nov 19, 2024
639a4d8
meta_v1_supported
chronolaw Nov 19, 2024
b1e5f89
no timer for meta call
chronolaw Nov 19, 2024
8b6b2ef
no kong.rpc:init
chronolaw Nov 19, 2024
97ebc2b
more info in meta call
chronolaw Nov 19, 2024
e179ee6
check payload.method
chronolaw Nov 19, 2024
02072db
tostring
chronolaw Nov 19, 2024
b26d2f8
comments
chronolaw Nov 19, 2024
5769164
kong.meta.v1.hello
chronolaw Nov 20, 2024
a1141d3
snappy-framed
chronolaw Nov 20, 2024
22d0bc6
cjson enc/dec
chronolaw Nov 20, 2024
4f88df5
RPC_SNAPPY_FRAMED
chronolaw Nov 20, 2024
1a0efd1
add kong_conf
chronolaw Nov 20, 2024
4e31d9c
clean
chronolaw Nov 20, 2024
630d867
clean
chronolaw Nov 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 120 additions & 43 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local clustering_tls = require("kong.clustering.tls")
local constants = require("kong.constants")
local table_isempty = require("table.isempty")
local pl_tablex = require("pl.tablex")
local cjson = require("cjson.safe")
local string_tools = require("kong.tools.string")


local ngx_var = ngx.var
Expand All @@ -23,12 +23,13 @@ local ngx_exit = ngx.exit
local ngx_time = ngx.time
local exiting = ngx.worker.exiting
local pl_tablex_makeset = pl_tablex.makeset
local cjson_encode = cjson.encode
local cjson_decode = cjson.decode
local validate_client_cert = clustering_tls.validate_client_cert
local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL


-- Name shared by the meta RPC method registered in :init() and by the
-- WebSocket sub-protocol exchanged via Sec-WebSocket-Protocol.
-- NOTE(review): "MATA" looks like a typo for "META"; renaming it means
-- touching every use of this identifier in the file.
local RPC_MATA_V1 = "kong.meta.v1"


local WS_OPTS = {
timeout = constants.CLUSTERING_TIMEOUT,
max_payload_len = kong.configuration.cluster_max_payload,
Expand Down Expand Up @@ -58,7 +59,7 @@ function _M.new(conf, node_id)
end


function _M:_add_socket(socket, capabilities_list)
function _M:_add_socket(socket)
local node_id = socket.node_id

local sockets = self.clients[node_id]
Expand All @@ -71,11 +72,6 @@ function _M:_add_socket(socket, capabilities_list)
self.clients[node_id] = sockets
end

self.client_capabilities[node_id] = {
set = pl_tablex_makeset(capabilities_list),
list = capabilities_list,
}

assert(not sockets[socket])

sockets[socket] = true
Expand Down Expand Up @@ -141,6 +137,59 @@ function _M:_find_node_and_check_capability(node_id, cap)
end


--- Registers the meta handshake handler; does nothing unless this node's
-- configured role is "control_plane".
--
-- The handler registered under RPC_MATA_V1 stores the capability list sent
-- by the calling node in `self.client_capabilities[node_id]` and replies
-- with this node's own capability list.
function _M:init()
  if self.conf.role ~= "control_plane" then
    return
  end

  -- CP => DP
  self.callbacks:register(RPC_MATA_V1, function(node_id, info)
    -- `info` is supplied by the remote peer: reject malformed input instead
    -- of raising while indexing nil or building the capability set.
    -- NOTE(review): assumes the callback dispatcher reports a `nil, err`
    -- return to the caller as an RPC error -- confirm against socket code.
    local capabilities_list
    if type(info) == "table" then
      capabilities_list = info.capabilities
    end

    if type(capabilities_list) ~= "table" then
      return nil, "invalid meta call: capabilities list is missing"
    end

    self.client_capabilities[node_id] = {
      set = pl_tablex_makeset(capabilities_list),
      list = capabilities_list,
    }

    return { capabilities = self.callbacks:get_capabilities_list() }
  end)
end


-- DP => CP
--- Performs the meta handshake call against a peer and records the
-- capability list it returns.
--
-- Sends this node's capability list, version and hostname as the single
-- parameter of `method`, waits on the resulting future with a timeout
-- argument of 5, and stores the capability list found in the reply in
-- `self.client_capabilities[node_id]`.
--
-- @param node_id key under which the peer's capabilities are stored
-- @param s socket object handed to `future.new`
-- @param method name of the RPC method to invoke
-- @return true on success
-- @return nil, err when the call cannot be started, waiting fails, the
--         peer answers with an error, or the reply lacks a capability list
function _M:_meta_call(node_id, s, method)
  local params = {
    { -- info
      capabilities = self.callbacks:get_capabilities_list(),
      version = KONG_VERSION,
      hostname = kong.node.get_hostname(),
      -- conf and others
    },
  }

  local fut = future.new(node_id, s, method, params)

  -- Report a failed start through the nil, err contract instead of raising,
  -- so the caller's own error handling can run.
  local started, start_err = fut:start()
  if not started then
    return nil, start_err or "failed to start meta call"
  end

  local ok, err = fut:wait(5)
  if err then
    return nil, err
  end

  if not ok then
    local call_err = fut.error
    return nil, call_err and call_err.message or "meta call failed"
  end

  -- The reply comes from the remote peer; do not assume its shape.
  local result = fut.result
  local capabilities_list
  if type(result) == "table" then
    capabilities_list = result.capabilities
  end

  if type(capabilities_list) ~= "table" then
    return nil, "peer did not provide a capability list"
  end

  self.client_capabilities[node_id] = {
    set = pl_tablex_makeset(capabilities_list),
    list = capabilities_list,
  }

  return true
end


-- low level helper used internally by :call() and concentrator
-- this one does not consider forwarding using concentrator
-- when node does not exist
Expand Down Expand Up @@ -232,16 +281,9 @@ end

-- handle incoming client connections
function _M:handle_websocket()
local kong_version = ngx_var.http_x_kong_version
local node_id = ngx_var.http_x_kong_node_id
local rpc_protocol = ngx_var.http_sec_websocket_protocol
local content_encoding = ngx_var.http_content_encoding
local rpc_capabilities = ngx_var.http_x_kong_rpc_capabilities

if not kong_version then
ngx_log(ngx_ERR, "[rpc] client did not provide version number")
return ngx_exit(ngx.HTTP_CLOSE)
end

if not node_id then
ngx_log(ngx_ERR, "[rpc] client did not provide node ID")
Expand All @@ -253,21 +295,22 @@ function _M:handle_websocket()
return ngx_exit(ngx.HTTP_CLOSE)
end

if rpc_protocol ~= "kong.rpc.v1" then
ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
tostring(rpc_protocol) ..
", doesn't know how to communicate with client")
return ngx_exit(ngx.HTTP_CLOSE)
end
local rpc_found
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
local protocols = string_tools.split(rpc_protocol, ",")

if not rpc_capabilities then
ngx_log(ngx_ERR, "[rpc] client did not provide capability list")
return ngx_exit(ngx.HTTP_CLOSE)
-- choose a proper protocol
for _, v in ipairs(protocols) do
-- now we only support kong.meta.v1
if RPC_MATA_V1 == string_tools.strip(v) then
rpc_found = true
break
end
end

rpc_capabilities = cjson_decode(rpc_capabilities)
if not rpc_capabilities then
ngx_log(ngx_ERR, "[rpc] failed to decode client capability list")
if not rpc_found then
ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " ..
tostring(rpc_protocol) ..
", doesn't know how to communicate with client")
return ngx_exit(ngx.HTTP_CLOSE)
end

Expand All @@ -277,7 +320,8 @@ function _M:handle_websocket()
return ngx_exit(ngx.HTTP_CLOSE)
end

ngx.header["X-Kong-RPC-Capabilities"] = cjson_encode(self.callbacks:get_capabilities_list())
-- now we only use kong.meta.v1
ngx.header["Sec-WebSocket-Protocol"] = RPC_MATA_V1

local wb, err = server:new(WS_OPTS)
if not wb then
Expand All @@ -286,11 +330,22 @@ function _M:handle_websocket()
end

local s = socket.new(self, wb, node_id)
self:_add_socket(s, rpc_capabilities)
self:_add_socket(s)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr

-- check if client handshake success in 2 seconds
ngx.timer.at(2, function(premature)
if premature then
return
end

if not self.client_capabilities[node_id] then
s:stop()
end
end)

s:start()
local res, err = s:join()
self:_remove_socket(s)
Expand Down Expand Up @@ -339,13 +394,10 @@ function _M:connect(premature, node_id, host, path, cert, key)
ssl_verify = true,
client_cert = cert,
client_priv_key = key,
protocols = "kong.rpc.v1",
protocols = RPC_MATA_V1,
headers = {
"X-Kong-Version: " .. KONG_VERSION,
"X-Kong-Node-Id: " .. self.node_id,
"X-Kong-Hostname: " .. kong.node.get_hostname(),
"X-Kong-RPC-Capabilities: " .. cjson_encode(self.callbacks:get_capabilities_list()),
"Content-Encoding: x-snappy-framed"
"Content-Encoding: x-snappy-framed",
},
}

Expand All @@ -372,24 +424,49 @@ function _M:connect(premature, node_id, host, path, cert, key)
do
local resp_headers = c:get_resp_headers()
-- FIXME: resp_headers should not be case sensitive
if not resp_headers or not resp_headers["x_kong_rpc_capabilities"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide capability list, node_id: ", node_id)

if not resp_headers or not resp_headers["sec_websocket_protocol"] then
ngx_log(ngx_ERR, "[rpc] peer did not provide sec_websocket_protocol, node_id: ", node_id)
c:send_close() -- can't do much if this fails
goto err
end

local capabilities = resp_headers["x_kong_rpc_capabilities"]
capabilities = cjson_decode(capabilities)
if not capabilities then
ngx_log(ngx_ERR, "[rpc] unable to decode peer capability list, node_id: ", node_id,
" list: ", capabilities)
-- should look like "kong.meta.v1"
local meta_rpc_call = resp_headers["sec_websocket_protocol"]

if meta_rpc_call ~= RPC_MATA_V1 then
ngx_log(ngx_ERR, "[rpc] did not support protocol : ", meta_rpc_call)
c:send_close() -- can't do much if this fails
goto err
end

local s = socket.new(self, c, node_id)
s:start()
self:_add_socket(s, capabilities)
self:_add_socket(s)

ngx.timer.at(0, function(premature)
if premature then
return
end

local retry_count = 5

-- retry
for i = 1, retry_count do
local ok, err = self:_meta_call("control_plane", s, meta_rpc_call)
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
if ok then
return
end

ngx_log(ngx_ERR, "[rpc] unable to get peer capability list, node_id: ", node_id,
" err: ", err)

ngx.sleep(0.1 * i)
end

-- retry failed
s:stop()
end)

ok, err = s:join() -- main event loop

Expand Down
1 change: 1 addition & 0 deletions kong/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ function Kong.init()

if config.cluster_rpc then
kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id())
kong.rpc:init()

if config.cluster_incremental_sync then
kong.sync = require("kong.clustering.services.sync").new(db, is_control_plane(config))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ describe("DP diabled Incremental Sync RPC #" .. strategy, function()
nginx_conf = "spec/fixtures/custom_nginx.template",
nginx_worker_processes = 2, -- multiple workers

cluster_rpc = "off", -- DISABLE rpc
cluster_incremental_sync = "off", -- DISABLE incremental sync

dedicated_config_processing = dedicated, -- privileged agent
Expand Down
Loading