Skip to content

Commit

Permalink
play: add timestamp flag to filter output
Browse files Browse the repository at this point in the history
After this commit user can filter output of the `play` command by record
timestamp.

Closes #227
  • Loading branch information
patapenka-alexey committed Nov 18, 2024
1 parent 87acaf9 commit 3c08af5
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 11 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- New `tt cat` flag `--timestamp` is added to specify operations ending
with the given timestamp.
- New `tt cat` and `tt play` flag `--timestamp` is added to specify operations ending
with the given timestamp. It value can be specified as a number or using
[RFC3339/RFC3339Nano](https://go.dev/src/time/format.go) time format.

### Changed

Expand Down
27 changes: 18 additions & 9 deletions cli/checkpoint/lua/play.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
-- The --space flags passes through 'TT_CLI_PLAY_SPACES'.
-- The --from flag passes through 'TT_CLI_PLAY_FROM'.
-- The --to flag passes through 'TT_CLI_PLAY_TO'.
-- The --timestamp flag passes through 'TT_CLI_PLAY_TIMESTAMP'.
-- The --replica flags passes through 'TT_CLI_PLAY_REPLICAS'.

local log = require('log')
Expand All @@ -24,20 +25,21 @@ local function find_in_list(id, list)
end

local function filter_xlog(gen, param, state, opts, cb)
local from, to, spaces = opts.from, opts.to, opts.space
local from, to, timestamp, spaces = opts.from, opts.to, opts.timestamp, opts.space
local show_system, replicas = opts['show-system'], opts.replica

for lsn, record in gen, param, state do
local sid = record.BODY and record.BODY.space_id
local rid = record.HEADER.replica_id
if replicas and #replicas == 1 and replicas[1] == rid and lsn >= to then
local ts = record.HEADER.timestamp
if replicas and #replicas == 1 and replicas[1] == rid and (lsn >= to or ts >= timestamp) then
-- Stop, as we've finished reading tuple with lsn == to
-- and the next lsn's will be bigger.
break
elseif (lsn < from) or (lsn >= to) or
(not spaces and sid and sid < 512 and not show_system) or
(spaces and (sid == nil or not find_in_list(sid, spaces))) or
(replicas and not find_in_list(rid, replicas)) then
elseif (lsn < from) or (lsn >= to) or (ts >= timestamp) or
(not spaces and sid and sid < 512 and not show_system) or
(spaces and (sid == nil or not find_in_list(sid, spaces))) or
(replicas and not find_in_list(rid, replicas)) then
-- Pass this tuple, luacheck: ignore.
else
cb(record)
Expand Down Expand Up @@ -97,7 +99,7 @@ local function main()

local files_and_uri = os.getenv('TT_CLI_PLAY_FILES_AND_URI')
if files_and_uri == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_FILES_AND_URI')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_FILES_AND_URI')
os.exit(1)
end
positional_arguments = json.decode(files_and_uri)
Expand All @@ -117,18 +119,25 @@ local function main()

local from = os.getenv('TT_CLI_PLAY_FROM')
if from == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_FROM')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_FROM')
os.exit(1)
end
keyword_arguments['from'] = tonumber(from)

local to = os.getenv('TT_CLI_PLAY_TO')
if to == nil then
log.error('Internal error: failed to get cat params from TT_CLI_PLAY_TO')
log.error('Internal error: failed to get play params from TT_CLI_PLAY_TO')
os.exit(1)
end
keyword_arguments['to'] = tonumber(to)

local timestamp = os.getenv('TT_CLI_PLAY_TIMESTAMP')
if timestamp == nil then
log.error('Internal error: failed to get play params from TT_CLI_PLAY_TIMESTAMP')
os.exit(1)
end
keyword_arguments['timestamp'] = tonumber(timestamp)

local replicas = os.getenv('TT_CLI_PLAY_REPLICAS')
if replicas ~= nil then
keyword_arguments['replica'] = {}
Expand Down
31 changes: 31 additions & 0 deletions cli/cmd/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math"
"os"
"strconv"
"time"

"github.com/apex/log"
"github.com/spf13/cobra"
Expand All @@ -23,6 +24,7 @@ import (
var playFlags = checkpoint.Opts{
From: 0,
To: math.MaxUint64,
Timestamp: "",
Space: nil,
Replica: nil,
ShowSystem: false,
Expand All @@ -46,12 +48,16 @@ func NewPlayCmd() *cobra.Command {
internalPlayModule, args)
util.HandleCmdErr(cmd, err)
},
Example: "tt play uri /path/to/xlog --timestamp 2024-11-13T14:02:36.818700000+00:00\n" +
" tt play uri /path/to/xlog --timestamp=1731592956.818",
}

playCmd.Flags().StringVarP(&playUsername, "username", "u", "", "username")
playCmd.Flags().StringVarP(&playPassword, "password", "p", "", "password")
playCmd.Flags().Uint64Var(&playFlags.To, "to", playFlags.To,
"Show operations ending with the given lsn")
playCmd.Flags().StringVar(&playFlags.Timestamp, "timestamp", playFlags.Timestamp,
"Show operations ending with the given timestamp")
playCmd.Flags().Uint64Var(&playFlags.From, "from", playFlags.From,
"Show operations starting from the given lsn")
playCmd.Flags().IntSliceVar(&playFlags.Space, "space", playFlags.Space,
Expand Down Expand Up @@ -118,6 +124,31 @@ func internalPlayModule(cmdCtx *cmdcontext.CmdCtx, args []string) error {
os.Setenv("TT_CLI_PLAY_FROM", strconv.FormatUint(playFlags.From, 10))
os.Setenv("TT_CLI_PLAY_TO", strconv.FormatUint(playFlags.To, 10))

// float
tsParsed := false
floatTimestamp, err := strconv.ParseFloat(playFlags.Timestamp, 64)
if err == nil {
os.Setenv("TT_CLI_PLAY_TIMESTAMP", strconv.FormatFloat(floatTimestamp, 'f', -1, 64))
tsParsed = true
}
// RFC3339/RFC3339Nano = "2006-01-02T15:04:05.999999999Z07:00"
rfc3339NanoTs, err := time.Parse(time.RFC3339Nano, string(playFlags.Timestamp))
if err == nil && !tsParsed {
tsSec := rfc3339NanoTs.Unix()
tsNanoSec := rfc3339NanoTs.Nanosecond()
ts := fmt.Sprintf("%s.%s", strconv.FormatInt(tsSec, 10),
strconv.FormatInt(int64(tsNanoSec), 10))
os.Setenv("TT_CLI_PLAY_TIMESTAMP", ts)
tsParsed = true
}
if !tsParsed {
os.Setenv("TT_CLI_PLAY_TIMESTAMP", strconv.FormatUint(math.MaxUint64, 10))
}
if playFlags.Timestamp != "" && !tsParsed {
util.InternalError("Problem with timestamp parsing: %s",
version.GetVersion, err)
}

// List of replicas is passed to lua play script via environment variable in json format.
replicasJson, err := json.Marshal(playFlags.Replica)
if err != nil {
Expand Down
22 changes: 22 additions & 0 deletions test/integration/play/test_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@ def test_play_test_remote_instance(tt_cmd, test_instance):
assert re.search(r"[3, 'Ace of Base', 1993]", output)


def test_play_test_remote_instance_timestamp(tt_cmd, test_instance):
# Play .xlog file to the remote instance.
cmd = [tt_cmd, "cat", "test.xlog", "--space=999"]
rc, output = run_command_and_get_output(cmd, cwd=test_instance._tmpdir)
assert rc == 0

cmd = [tt_cmd, "play", "127.0.0.1:" + test_instance.port, "test.xlog",
"--timestamp=1651130533.1534", "--space=999"]
rc, output = run_command_and_get_output(cmd, cwd=test_instance._tmpdir)
assert rc == 0
assert re.search(r"Play result: completed successfully", output)

# Testing played .xlog file from the remote instance.
cmd = [tt_cmd, "cat", "00000000000000000000.xlog", "--space=999"]
rc, output = run_command_and_get_output(cmd, cwd=test_instance._tmpdir)
assert rc == 0
assert re.search(r"space_id: 999", output)
assert re.search(r"[1, 'Roxette', 1986]", output)
assert re.search(r"[2, 'Scorpions', 2015]", output)
assert not re.search(r"Ace of Base", output)


@pytest.mark.parametrize("opts", [
pytest.param({"flags": ["--username=test_user", "--password=4"]}),
pytest.param({"flags": ["--username=fry"]}),
Expand Down

0 comments on commit 3c08af5

Please sign in to comment.