diff --git a/CHANGELOG.md b/CHANGELOG.md
index e2d28a82e..d29d207a2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
 
 ### Added
 
+- `tt replicaset upgrade`: command to upgrade the schema on a Tarantool cluster.
+  * `-r (--replicaset)`: specify the replicaset name(s) to upgrade.
+  * `-t (--timeout)`: timeout for waiting for LSN synchronization (in seconds) (default 5).
+
 ### Changed
 
 ### Fixed
diff --git a/cli/cmd/replicaset.go b/cli/cmd/replicaset.go
index 3820a309a..d5c620cfb 100644
--- a/cli/cmd/replicaset.go
+++ b/cli/cmd/replicaset.go
@@ -46,12 +46,42 @@ var (
     replicasetIsGlobal      bool
     rebootstrapConfirmed    bool
 
+    chosenReplicasetAliases []string
+    lsnTimeout              int
+
     replicasetUriHelp = "  The URI can be specified in the following formats:\n" +
         "  * [tcp://][username:password@][host:port]\n" +
         "  * [unix://][username:password@]socketpath\n" +
         "  To specify relative path without `unix://` use `./`."
 )
 
+// newUpgradeCmd creates a "replicaset upgrade" command.
+func newUpgradeCmd() *cobra.Command {
+    cmd := &cobra.Command{
+        Use:                   "upgrade (<APP_NAME> | <URI>) [flags]",
+        DisableFlagsInUseLine: true,
+        Short:                 "Upgrade tarantool cluster",
+        Long: "Upgrade tarantool cluster.\n\n" +
+            libconnect.EnvCredentialsHelp + "\n\n",
+        Run: func(cmd *cobra.Command, args []string) {
+            cmdCtx.CommandName = cmd.Name()
+            err := modules.RunCmd(&cmdCtx, cmd.CommandPath(), &modulesInfo,
+                internalReplicasetUpgradeModule, args)
+            util.HandleCmdErr(cmd, err)
+        },
+        Args: cobra.ExactArgs(1),
+    }
+
+    cmd.Flags().StringArrayVarP(&chosenReplicasetAliases, "replicaset", "r",
+        []string{}, "specify the replicaset name(s) to upgrade")
+
+    cmd.Flags().IntVarP(&lsnTimeout, "timeout", "t", 5,
+        "timeout for waiting for LSN synchronization (in seconds)")
+
+    addOrchestratorFlags(cmd)
+    return cmd
+}
+
 // newStatusCmd creates a "replicaset status" command.
 func newStatusCmd() *cobra.Command {
     cmd := &cobra.Command{
@@ -341,6 +371,7 @@ func NewReplicasetCmd() *cobra.Command {
         Aliases: []string{"rs"},
     }
 
+    cmd.AddCommand(newUpgradeCmd())
     cmd.AddCommand(newStatusCmd())
     cmd.AddCommand(newPromoteCmd())
     cmd.AddCommand(newDemoteCmd())
@@ -490,6 +521,26 @@ func replicasetFillCtx(cmdCtx *cmdcontext.CmdCtx, ctx *replicasetCtx, args []str
     return nil
 }
 
+// internalReplicasetUpgradeModule is an "upgrade" command for the replicaset module.
+func internalReplicasetUpgradeModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
+    var ctx replicasetCtx
+    if err := replicasetFillCtx(cmdCtx, &ctx, args, false); err != nil {
+        return err
+    }
+    if ctx.IsInstanceConnect {
+        defer ctx.Conn.Close()
+    }
+    return replicasetcmd.Upgrade(replicasetcmd.DiscoveryCtx{
+        IsApplication: ctx.IsApplication,
+        RunningCtx:    ctx.RunningCtx,
+        Conn:          ctx.Conn,
+        Orchestrator:  ctx.Orchestrator,
+    }, replicasetcmd.UpgradeOpts{
+        ChosenReplicasetAliases: chosenReplicasetAliases,
+        LsnTimeout:              lsnTimeout,
+    })
+}
+
 // internalReplicasetPromoteModule is a "promote" command for the replicaset module.
 func internalReplicasetPromoteModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
     var ctx replicasetCtx
@@ -561,7 +612,7 @@ func internalReplicasetStatusModule(cmdCtx *cmdcontext.CmdCtx, args []string) er
     if ctx.IsInstanceConnect {
         defer ctx.Conn.Close()
     }
-    return replicasetcmd.Status(replicasetcmd.StatusCtx{
+    return replicasetcmd.Status(replicasetcmd.DiscoveryCtx{
         IsApplication: ctx.IsApplication,
         RunningCtx:    ctx.RunningCtx,
         Conn:          ctx.Conn,
diff --git a/cli/replicaset/cmd/lua/upgrade.lua b/cli/replicaset/cmd/lua/upgrade.lua
new file mode 100644
index 000000000..0a7afff8a
--- /dev/null
+++ b/cli/replicaset/cmd/lua/upgrade.lua
@@ -0,0 +1,10 @@
+local ok, err = pcall(box.schema.upgrade)
+if ok then
+    ok, err = pcall(box.snapshot)
+end
+
+return {
+    lsn = box.info.lsn,
+    iid = box.info.id,
+    err = (not ok) and tostring(err) or nil,
+}
diff --git a/cli/replicaset/cmd/status.go b/cli/replicaset/cmd/status.go
index 99a8c41c1..278ccf3e6 100644
--- a/cli/replicaset/cmd/status.go
+++ b/cli/replicaset/cmd/status.go
@@ -10,41 +10,43 @@ import (
     "github.com/tarantool/tt/cli/running"
 )
 
-// StatusCtx contains information about replicaset status command execution
-// context.
-type StatusCtx struct {
+// DiscoveryCtx contains information about replicaset discovery.
+type DiscoveryCtx struct {
     // IsApplication true if an application passed.
     IsApplication bool
     // RunningCtx is an application running context.
     RunningCtx running.RunningCtx
     // Conn is an active connection to a passed instance.
     Conn connector.Connector
-    // Orchestrator is a forced orchestator choice.
+    // Orchestrator is a forced orchestrator choice.
     Orchestrator replicaset.Orchestrator
 }
 
-// Status shows a replicaset status.
-func Status(statusCtx StatusCtx) error {
-    orchestratorType, err := getOrchestratorType(statusCtx.Orchestrator,
-        statusCtx.Conn, statusCtx.RunningCtx)
+// getReplicasets discovers and returns the list of replicasets.
+func getReplicasets(ctx DiscoveryCtx) (replicaset.Replicasets, error) {
+    orchestratorType, err := getOrchestratorType(ctx.Orchestrator, ctx.Conn, ctx.RunningCtx)
     if err != nil {
-        return err
+        return replicaset.Replicasets{}, err
     }
 
     var orchestrator replicasetOrchestrator
-    if statusCtx.IsApplication {
-        if orchestrator, err = makeApplicationOrchestrator(
-            orchestratorType, statusCtx.RunningCtx, nil, nil); err != nil {
-            return err
-        }
+    if ctx.IsApplication {
+        orchestrator, err = makeApplicationOrchestrator(orchestratorType,
+            ctx.RunningCtx, nil, nil)
     } else {
-        if orchestrator, err = makeInstanceOrchestrator(
-            orchestratorType, statusCtx.Conn); err != nil {
-            return err
-        }
+        orchestrator, err = makeInstanceOrchestrator(orchestratorType, ctx.Conn)
+    }
+
+    if err != nil {
+        return replicaset.Replicasets{}, err
     }
 
-    replicasets, err := orchestrator.Discovery(replicaset.SkipCache)
+    return orchestrator.Discovery(replicaset.SkipCache)
+}
+
+// Status shows a replicaset status.
+func Status(discoveryCtx DiscoveryCtx) error {
+    replicasets, err := getReplicasets(discoveryCtx)
     if err != nil {
         return err
     }
diff --git a/cli/replicaset/cmd/upgrade.go b/cli/replicaset/cmd/upgrade.go
new file mode 100644
index 000000000..285f0513a
--- /dev/null
+++ b/cli/replicaset/cmd/upgrade.go
@@ -0,0 +1,268 @@
+package replicasetcmd
+
+import (
+    _ "embed"
+    "errors"
+    "fmt"
+    "time"
+
+    "github.com/mitchellh/mapstructure"
+    "github.com/tarantool/tt/cli/connector"
+    "github.com/tarantool/tt/cli/replicaset"
+    "github.com/tarantool/tt/cli/running"
+)
+
+// UpgradeOpts contains options used for the upgrade process.
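+// The fields mirror the -r/--replicaset and -t/--timeout flags of
+// `tt replicaset upgrade` defined in cli/cmd/replicaset.go.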
+type UpgradeOpts struct {
+    // List of replicaset names specified by the user for the upgrade.
+    ChosenReplicasetAliases []string
+    // Timeout period (in seconds) for waiting on LSN synchronization.
+    LsnTimeout int
+}
+
+type instanceMeta struct {
+    run  running.InstanceCtx
+    conn connector.Connector
+}
+
+//go:embed lua/upgrade.lua
+var upgradeMasterLua string
+
+type syncInfo struct {
+    LSN uint64  `mapstructure:"lsn"`
+    IID uint32  `mapstructure:"iid"`
+    Err *string `mapstructure:"err"`
+}
+
+// filterReplicasetsByAliases filters the given replicaset list by chosen aliases and
+// returns chosen replicasets. If a non-existent alias is found, it returns an error.
+func filterReplicasetsByAliases(replicasets replicaset.Replicasets,
+    chosenReplicasetAliases []string) ([]replicaset.Replicaset, error) {
+    // If no aliases are provided, return all replicasets.
+    if len(chosenReplicasetAliases) == 0 {
+        return replicasets.Replicasets, nil
+    }
+
+    // Create a map for fast lookup of replicasets by alias.
+    replicasetMap := make(map[string]replicaset.Replicaset)
+    for _, rs := range replicasets.Replicasets {
+        replicasetMap[rs.Alias] = rs
+    }
+
+    var chosenReplicasets []replicaset.Replicaset
+    for _, alias := range chosenReplicasetAliases {
+        rs, exists := replicasetMap[alias]
+        if !exists {
+            return nil, fmt.Errorf("replicaset with alias %q doesn't exist", alias)
+        }
+        chosenReplicasets = append(chosenReplicasets, rs)
+    }
+
+    return chosenReplicasets, nil
+}
+
+// Upgrade upgrades tarantool schema.
+func Upgrade(discoveryCtx DiscoveryCtx, opts UpgradeOpts) error {
+    replicasets, err := getReplicasets(discoveryCtx)
+    if err != nil {
+        return err
+    }
+
+    replicasets = fillAliases(replicasets)
+    replicasetsToUpgrade, err := filterReplicasetsByAliases(replicasets,
+        opts.ChosenReplicasetAliases)
+    if err != nil {
+        return err
+    }
+
+    return internalUpgrade(replicasetsToUpgrade, opts.LsnTimeout)
+}
+
+func internalUpgrade(replicasets []replicaset.Replicaset, lsnTimeout int) error {
+    for _, replicaset := range replicasets {
+        err := upgradeReplicaset(replicaset, lsnTimeout)
+        if err != nil {
+            fmt.Printf("• %s: error\n", replicaset.Alias)
+            return fmt.Errorf("replicaset %s: %w", replicaset.Alias, err)
+        }
+        fmt.Printf("• %s: ok\n", replicaset.Alias)
+    }
+    return nil
+}
+
+func closeConnectors(master *instanceMeta, replicas []instanceMeta) {
+    if master != nil {
+        master.conn.Close()
+    }
+    for _, replica := range replicas {
+        replica.conn.Close()
+    }
+}
+
+func getInstanceConnector(instance replicaset.Instance) (connector.Connector, error) {
+    run := instance.InstanceCtx
+    fullInstanceName := running.GetAppInstanceName(run)
+    if fullInstanceName == "" {
+        fullInstanceName = instance.Alias
+    }
+    if fullInstanceName == "" {
+        fullInstanceName = "unknown"
+    }
+
+    // Try to connect via unix socket.
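+    // (run.ConsoleSocket is the console socket path taken from the instance
+    // context; if the connection fails, an error is returned and no other
+    // transport is tried.)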
+    conn, err := connector.Connect(connector.ConnectOpts{
+        Network: "unix",
+        Address: run.ConsoleSocket,
+    })
+
+    if err != nil {
+        return nil, fmt.Errorf("instance %s failed to connect via UNIX socket: %w",
+            fullInstanceName, err)
+    }
+    return conn, nil
+}
+
+func collectRWROInfo(replset replicaset.Replicaset) (*instanceMeta, []instanceMeta,
+    error) {
+    var master *instanceMeta = nil
+    var replicas []instanceMeta
+    for _, instance := range replset.Instances {
+        run := instance.InstanceCtx
+        fullInstanceName := running.GetAppInstanceName(run)
+        conn, err := getInstanceConnector(instance)
+
+        if err != nil {
+            return nil, nil, err
+        }
+
+        if instance.Mode == replicaset.ModeUnknown {
+            closeConnectors(master, replicas)
+            return nil, nil, fmt.Errorf(
+                "can't determine RO/RW mode on instance: %s", fullInstanceName)
+        }
+
+        isRW := instance.Mode.String() == "rw"
+
+        if isRW && master != nil {
+            closeConnectors(master, replicas)
+            return nil, nil, fmt.Errorf("%s and %s are both masters",
+                running.GetAppInstanceName((*master).run), fullInstanceName)
+        } else if isRW {
+            master = &instanceMeta{run, conn}
+        } else {
+            replicas = append(replicas, instanceMeta{run, conn})
+        }
+    }
+    return master, replicas, nil
+}
+
+func waitLSN(conn connector.Connector, masterIID uint32, masterLSN uint64, lsnTimeout int) error {
+    var lastError error
+    query := fmt.Sprintf("return box.info.vclock[%d]", masterIID)
+
+    deadline := time.Now().Add(time.Duration(lsnTimeout) * time.Second)
+    for {
+        res, err := conn.Eval(query, []any{}, connector.RequestOpts{})
+        if err != nil {
+            lastError = fmt.Errorf("failed to evaluate LSN query: %w", err)
+        } else if len(res) == 0 {
+            lastError = errors.New("empty result from LSN query")
+        } else {
+            var lsn uint64
+            if err := mapstructure.Decode(res[0], &lsn); err != nil {
+                lastError = fmt.Errorf("failed to decode LSN: %w", err)
+            } else if lsn >= masterLSN {
+                return nil
+            } else {
+                lastError = fmt.Errorf("current LSN %d is behind required "+
+                    "master LSN %d", lsn, masterLSN)
+            }
+        }
+
+        if time.Now().After(deadline) {
+            break
+        }
+
+        time.Sleep(1 * time.Second)
+    }
+
+    return lastError
+}
+
+func upgradeMaster(master *instanceMeta) (syncInfo, error) {
+    var upgradeInfo syncInfo
+    fullMasterName := running.GetAppInstanceName(master.run)
+    res, err := master.conn.Eval(upgradeMasterLua, []any{}, connector.RequestOpts{})
+    if err != nil {
+        return upgradeInfo, fmt.Errorf(
+            "failed to execute upgrade script on master instance - %s: %w",
+            fullMasterName, err)
+    }
+
+    if err := mapstructure.Decode(res[0], &upgradeInfo); err != nil {
+        return upgradeInfo, fmt.Errorf(
+            "failed to decode response from master instance - %s: %w",
+            fullMasterName, err)
+    }
+
+    if upgradeInfo.Err != nil {
+        return upgradeInfo, fmt.Errorf(
+            "master instance upgrade failed - %s: %s",
+            fullMasterName, *upgradeInfo.Err)
+    }
+    return upgradeInfo, nil
+}
+
+func snapshot(instance *instanceMeta) error {
+    res, err := instance.conn.Eval("return box.snapshot()", []any{},
+        connector.RequestOpts{})
+    if err != nil {
+        return fmt.Errorf("failed to execute snapshot on replica: %w", err)
+    }
+    if len(res) == 0 {
+        return fmt.Errorf("snapshot command on %s returned an empty result, "+
+            "'ok' expected", running.GetAppInstanceName(instance.run))
+    }
+
+    if result, ok := res[0].(string); !ok || result != "ok" {
+        return fmt.Errorf("snapshot command on %s returned unexpected result: '%v', "+
+            "'ok' expected", running.GetAppInstanceName(instance.run), res[0])
+    }
+    return nil
+}
+
+func upgradeReplicaset(replicaset replicaset.Replicaset, lsnTimeout int) error {
+    master, replicas, err := collectRWROInfo(replicaset)
+    if err != nil {
+        return err
+    }
+
+    defer closeConnectors(master, replicas)
+
+    // Upgrade master instance, collect LSN and IID from master instance.
+    upgradeInfo, err := upgradeMaster(master)
+    if err != nil {
+        return err
+    }
+
+    // Upgrade replica instances.
+    masterLSN := upgradeInfo.LSN
+    masterIID := upgradeInfo.IID
+
+    for _, replica := range replicas {
+        fullReplicaName := running.GetAppInstanceName(replica.run)
+        err := waitLSN(replica.conn, masterIID, masterLSN, lsnTimeout)
+        if err != nil {
+            return fmt.Errorf("can't ensure that upgrade operations performed on %s "+
+                "are replicated to %s to perform snapshotting on it: error "+
+                "waiting LSN %d in vclock component %d: %w",
+                running.GetAppInstanceName(master.run), fullReplicaName,
+                masterLSN, masterIID, err)
+        }
+        err = snapshot(&replica)
+        if err != nil {
+            return err
+        }
+    }
+    return nil
+}
diff --git a/test/integration/replicaset/single-t2-app/init.lua b/test/integration/replicaset/single-t2-app/init.lua
new file mode 100644
index 000000000..c7159f3ae
--- /dev/null
+++ b/test/integration/replicaset/single-t2-app/init.lua
@@ -0,0 +1,11 @@
+local fiber = require('fiber')
+local fio = require('fio')
+
+box.cfg({})
+
+fh = fio.open('ready', {'O_WRONLY', 'O_CREAT'}, tonumber('644',8))
+fh:close()
+
+while true do
+    fiber.sleep(5)
+end
diff --git a/test/integration/replicaset/single-t2-app/tt.yaml b/test/integration/replicaset/single-t2-app/tt.yaml
new file mode 100644
index 000000000..ee312214a
--- /dev/null
+++ b/test/integration/replicaset/single-t2-app/tt.yaml
@@ -0,0 +1,9 @@
+env:
+  instances_enabled: .
+
+default:
+  app:
+    dir: .
+    file: init.lua
+    memtx_dir: ./
+    wal_dir: ./
diff --git a/test/integration/replicaset/test_replicaset_upgrade.py b/test/integration/replicaset/test_replicaset_upgrade.py
new file mode 100644
index 000000000..f611aec8d
--- /dev/null
+++ b/test/integration/replicaset/test_replicaset_upgrade.py
@@ -0,0 +1,197 @@
+import os
+import re
+import shutil
+import subprocess
+import tempfile
+
+import pytest
+from replicaset_helpers import stop_application
+from vshard_cluster import VshardCluster
+
+from utils import get_tarantool_version, run_command_and_get_output, wait_file
+
+tarantool_major_version, _ = get_tarantool_version()
+
+
+def run_command_on_instance(tt_cmd, tmpdir, full_inst_name, cmd):
+    con_cmd = [tt_cmd, "connect", full_inst_name, "-f", "-"]
+    instance_process = subprocess.Popen(
+        con_cmd,
+        cwd=tmpdir,
+        stderr=subprocess.STDOUT,
+        stdout=subprocess.PIPE,
+        stdin=subprocess.PIPE,
+        text=True,
+    )
+    instance_process.stdin.writelines([cmd])
+    instance_process.stdin.close()
+    output = instance_process.stdout.read()
+    return output
+
+
+@pytest.mark.skipif(
+    tarantool_major_version < 3, reason="skip centralized config test for Tarantool < 3"
+)
+def test_upgrade_multi_master(tt_cmd, tmpdir_with_cfg):
+    tmpdir = tmpdir_with_cfg
+    app_name = "test_ccluster_app"
+    app_path = os.path.join(tmpdir, app_name)
+    shutil.copytree(os.path.join(os.path.dirname(__file__), app_name), app_path)
+    try:
+        # Start a cluster.
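+        # The test_ccluster_app fixture is expected to bring up five instances,
+        # with replicaset-002 containing two RW instances, so the upgrade below
+        # should fail for that replicaset.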
+        start_cmd = [tt_cmd, "start", app_name]
+        rc, out = run_command_and_get_output(start_cmd, cwd=tmpdir)
+        assert rc == 0
+
+        for i in range(1, 6):
+            file = wait_file(
+                os.path.join(tmpdir, app_name), f"ready-instance-00{i}", []
+            )
+            assert file != ""
+
+        status_cmd = [tt_cmd, "replicaset", "upgrade", app_name]
+
+        rc, out = run_command_and_get_output(status_cmd, cwd=tmpdir)
+        assert rc == 1
+        assert "replicaset-002: error" in out and "are both masters" in out
+
+    finally:
+        stop_application(tt_cmd, app_name, tmpdir, [])
+
+
+def test_upgrade_t2_app_dummy_replicaset(tt_cmd):
+    app_name = "single-t2-app"
+    test_app_path_src = os.path.join(os.path.dirname(__file__), app_name)
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        test_app_path = os.path.join(tmpdir, app_name)
+        shutil.copytree(test_app_path_src, test_app_path)
+        memtx_dir = os.path.join(test_app_path, "var", "lib", app_name)
+        os.makedirs(memtx_dir, exist_ok=True)
+
+        try:
+            start_cmd = [tt_cmd, "start", app_name]
+            rc, out = run_command_and_get_output(start_cmd, cwd=test_app_path)
+            assert rc == 0
+
+            file = wait_file(test_app_path, "ready", [])
+            assert file != ""
+
+            # Downgrade schema.
+            out = run_command_on_instance(
+                tt_cmd,
+                test_app_path,
+                app_name,
+                "box.schema.downgrade('2.8.2') box.snapshot()",
+            )
+
+            upgrade_cmd = [tt_cmd, "replicaset", "upgrade", app_name, "--custom"]
+            rc, out = run_command_and_get_output(upgrade_cmd, cwd=test_app_path)
+            assert rc == 0
+            # Out is `• : ok` because the instance has no name.
+            assert "ok" in out
+        finally:
+            stop_application(tt_cmd, app_name, test_app_path, [])
+
+
+@pytest.mark.skipif(tarantool_major_version < 3,
+                    reason="skip test with cluster config for Tarantool < 3")
+def test_upgrade_downgraded_cluster_replicasets(tt_cmd, tmp_path):
+    app_name = "vshard_app"
+    replicasets = {
+        "router-001": ["router-001-a"],
+        "storage-001": ["storage-001-a", "storage-001-b"],
+        "storage-002": ["storage-002-a", "storage-002-b"],
+    }
+    app = VshardCluster(tt_cmd, tmp_path, app_name)
+    try:
+        app.build()
+        app.start()
+        cmd_master = '''box.space._schema:run_triggers(false)
+box.space._schema:delete('replicaset_name')
+box.space._schema:run_triggers(true)
+
+box.space._cluster:run_triggers(false)
+box.atomic(function()
+    for _, tuple in box.space._cluster:pairs() do
+        pcall(box.space._cluster.update, box.space._cluster, {tuple.id}, {{'#', 'name', 1}})
+    end
+end)
+box.space._cluster:run_triggers(true)
+box.schema.downgrade('2.11.1')
+box.snapshot()
+        '''
+
+        # Downgrade cluster.
+        for _, replicaset in replicasets.items():
+            for replica in replicaset:
+                out = run_command_on_instance(
+                    tt_cmd,
+                    tmp_path,
+                    f"{app_name}:{replica}",
+                    "box.cfg{force_recovery=true} return box.cfg.force_recovery"
+                )
+                assert "true" in out
+
+        for _, replicaset in replicasets.items():
+            _ = run_command_on_instance(
+                tt_cmd,
+                tmp_path,
+                f"{app_name}:{replicaset[0]}",
+                cmd_master
+            )
+
+        for _, replicaset in replicasets.items():
+            if len(replicaset) == 2:
+                _ = run_command_on_instance(
+                    tt_cmd,
+                    tmp_path,
+                    f"{app_name}:{replicaset[1]}",
+                    "box.snapshot()"
+                )
+
+        for _, replicaset in replicasets.items():
+            for replica in replicaset:
+                out = run_command_on_instance(
+                    tt_cmd,
+                    tmp_path,
+                    f"{app_name}:{replica}",
+                    "box.cfg{force_recovery=false} return box.cfg.force_recovery"
+                )
+                assert "false" in out
+
+        # Can't create data (old schema).
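+        # (While the schema stays at 2.11.1, DDL such as creating a space is
+        # rejected until box.schema.upgrade() is run.)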
+        out = run_command_on_instance(
+            tt_cmd,
+            tmp_path,
+            f"{app_name}:storage-001-a",
+            "box.schema.space.create('example_space')"
+        )
+        assert "error: Your schema version is 2.11.1" in out
+
+        # For some reason, the storage-002 replica set is having problems with
+        # replication after downgrade. For now check only replicaset storage-001.
+        upgrade_cmd = [tt_cmd, "replicaset", "upgrade", app_name, "-t=15"]
+        rc, out = run_command_and_get_output(upgrade_cmd, cwd=tmp_path)
+
+        assert rc == 0
+
+        upgrade_out = out.strip().split("\n")
+        assert len(upgrade_out) == len(replicasets)
+
+        for i in range(len(replicasets)):
+            match = re.search(r"•\s*(.*?):\s*(.*)", upgrade_out[i])
+            assert match.group(1) in replicasets
+            assert match.group(2) == "ok"
+
+        # Create data (new schema).
+        out = run_command_on_instance(
+            tt_cmd,
+            tmp_path,
+            f"{app_name}:storage-001-a",
+            "box.schema.space.create('example_space')"
+        )
+        assert "name: example_space" in out
+
+    finally:
+        app.stop()
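Usage illustration (not part of the diff): with this change applied, a cluster can be upgraded with a command such as `tt replicaset upgrade <app> -r <replicaset> -t 15`; the command prints a `• <replicaset>: ok` line per upgraded replicaset and a `• <replicaset>: error` line with a non-zero exit code on failure, as the integration tests above check.

Below is a minimal, hypothetical unit-test sketch for the alias filtering introduced in cli/replicaset/cmd/upgrade.go. It assumes that `replicaset.Replicasets` and `replicaset.Replicaset` literals can be built with only the fields used above (`Replicasets`, `Alias`); it is an illustration, not code from the change set.

    package replicasetcmd

    import (
        "testing"

        "github.com/tarantool/tt/cli/replicaset"
    )

    // Two behaviours of filterReplicasetsByAliases are checked: with no aliases
    // every replicaset is returned; an unknown alias yields an error.
    func TestFilterReplicasetsByAliases(t *testing.T) {
        rs := replicaset.Replicasets{
            Replicasets: []replicaset.Replicaset{
                {Alias: "storage-001"},
                {Alias: "storage-002"},
            },
        }

        all, err := filterReplicasetsByAliases(rs, nil)
        if err != nil || len(all) != 2 {
            t.Fatalf("expected both replicasets, got %v (err: %v)", all, err)
        }

        if _, err := filterReplicasetsByAliases(rs, []string{"storage-003"}); err == nil {
            t.Fatal("expected an error for a non-existent alias")
        }
    }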