Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clustering): report node version in sync #13844

Draft
wants to merge 13 commits into
base: master
Choose a base branch
from
10 changes: 5 additions & 5 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function _M.new(conf, node_id)

if conf.role == "control_plane" then
self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db)
self.client_ips = {} -- store DP node's ip addr
self.client_info = {} -- store DP node's ip addr and version
end

return setmetatable(self, _MT)
Expand Down Expand Up @@ -95,7 +95,7 @@ function _M:_remove_socket(socket)
self.client_capabilities[node_id] = nil

if self.concentrator then
self.client_ips[node_id] = nil
self.client_info[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end
Expand Down Expand Up @@ -289,7 +289,7 @@ function _M:handle_websocket()
self:_add_socket(s, rpc_capabilities)

-- store DP's ip addr
self.client_ips[node_id] = ngx_var.remote_addr
self.client_info[node_id] = { ip = ngx_var.remote_addr, version = kong_version, }
chronolaw marked this conversation as resolved.
Show resolved Hide resolved

s:start()
local res, err = s:join()
Expand Down Expand Up @@ -420,8 +420,8 @@ function _M:get_peers()
end


function _M:get_peer_ip(node_id)
return self.client_ips[node_id]
function _M:get_peer_info(node_id)
return self.client_info[node_id]
end


Expand Down
21 changes: 12 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }


local assert = assert
local ipairs = ipairs
local fmt = string.format
local ngx_null = ngx.null
Expand Down Expand Up @@ -79,13 +80,14 @@ function _M:init_cp(manager)

-- { default = { version = 1000, }, }
local default_namespace_version = default_namespace.version
local node_info = assert(kong.rpc:get_peer_info(node_id))
Copy link
Contributor

@chobits chobits Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For other reviewers, if assertion failed, it will break up a single timer runtime, so it's safe for CP.

  • kong/clustering/rpc//socket.lua: callback is runned in a timer
        -- call dispatch
        local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
                                                           self.node_id, payload.id, payload.method),
                                                           0, _M._dispatch, self, dispatch_cb, payload)


-- XXX TODO: follow update_sync_status() in control_plane.lua
-- follow update_sync_status() in control_plane.lua
local ok, err = kong.db.clustering_data_planes:upsert({ id = node_id }, {
last_seen = ngx.time(),
hostname = node_id,
ip = kong.rpc:get_peer_ip(node_id), -- try to get the correct ip
version = "3.8.0.0", -- XXX TODO: get from rpc call
ip = node_info.ip, -- get the correct ip
version = node_info.version, -- get from rpc call
sync_status = CLUSTERING_SYNC_STATUS.NORMAL,
config_hash = fmt("%032d", default_namespace_version),
rpc_capabilities = rpc_peers and rpc_peers[node_id] or {},
Expand Down Expand Up @@ -182,12 +184,13 @@ end


local function do_sync()
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
}

local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", msg)
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
Expand Down
6 changes: 5 additions & 1 deletion spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
local helpers = require "spec.helpers"
local cjson = require("cjson.safe")
local CLUSTERING_SYNC_STATUS = require("kong.constants").CLUSTERING_SYNC_STATUS

-- we need incremental sync to verify rpc
for _, inc_sync in ipairs { "on" } do
Expand Down Expand Up @@ -56,9 +57,12 @@ for _, strategy in helpers.each_strategy() do
-- TODO: perhaps need a new test method
for _, v in pairs(json.data) do
if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then
table.sort(v.rpc_capabilities)
assert.near(14 * 86400, v.ttl, 3)
assert.matches("^(%d+%.%d+)%.%d+", v.version)
chronolaw marked this conversation as resolved.
Show resolved Hide resolved
assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status)

-- check the available rpc service
table.sort(v.rpc_capabilities)
assert.same("kong.sync.v2", v.rpc_capabilities[1])
return true
end
Expand Down
Loading