From 517ba9ebef0a617054c0005c5ca32a45b29aa61a Mon Sep 17 00:00:00 2001 From: Ruben Gonzalez Date: Thu, 12 Dec 2024 10:10:56 -0700 Subject: [PATCH 1/4] Simplified the timer (now always the start of the interval) --- src/_TimeSeriesCollector.jl | 27 ++++++++++++--------------- test/runtests.jl | 4 ++-- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/src/_TimeSeriesCollector.jl b/src/_TimeSeriesCollector.jl index efcc5b7..3eb93db 100644 --- a/src/_TimeSeriesCollector.jl +++ b/src/_TimeSeriesCollector.jl @@ -106,13 +106,15 @@ Notes: - 'snapshot' and 'interval' can span more than one 'collector.interval' if many intervals have elapsed between samples """ function Base.take!(collector::TimeSeriesCollector, t::DateTime) - if t > (collector.timer[] + collector.delay) + if t > (collector.timer[] + collector.delay + collector.interval) #Construct the time interval - t0 = collector.timer[] - collector.interval #Start of this interval - tn = starttimer!(collector, t - collector.delay) #End of next interval - t1 = tn - collector.interval #End of this interval + t0 = collector.timer[] #Start of this interval + t1 = next_interval_start(collector, t) interval = TimeInterval(t0, t1) + #Set the start time of the new interval + collector.timer[] = t1 + #Collect data that spans the time interval snapshot = getouter(collector.data, interval) @@ -148,19 +150,14 @@ function Base.push!(collector::TimeSeriesCollector{T}, tagrecord::Pair{<:Abstrac end """ -starttimer!(collector::TimeSeriesCollector, start::DateTime) +calctimer(collector::TimeSeriesCollector, current::DateTime) -Sets the timer on 'collector' to the end of the interval that contains 'start', "zero" interval sets timer to "start" +Calculates the beginning of the next time interval given the current time """ -function starttimer!(collector::TimeSeriesCollector, start::DateTime) - if start > collector.timer[] - if iszero(collector.interval) - collector.timer[] = start - else - collector.timer[] = floor(start, collector.interval) + collector.interval - end - end - return collector.timer[] +function next_interval_start(collector::TimeSeriesCollector, current::DateTime) + rawstart = current - collector.delay - collector.interval + newstart = iszero(collector.interval) ? rawstart : floor(rawstart, collector.interval) + return max(collector.timer[], newstart) end diff --git a/test/runtests.jl b/test/runtests.jl index 169f3ae..6767fa5 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -143,6 +143,7 @@ using Dates for dt in (Millisecond(0), Millisecond(1000), Millisecond(2500)) for λt in (Millisecond(0), Millisecond(1), dt) + display((interval=dt, delay=λt)) t0 = DateTime(2024,1,1,0,0,0) t1 = DateTime(2024,1,1,0,1,0) vt = datetime2unix.(t0:Second(1):t1) @@ -150,7 +151,7 @@ using Dates λt = Millisecond(0) function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T - return getinner(data, TimeInterval(interval[1], interval[2]-0.001)) + return getinner(data, TimeInterval(interval[1], interval[2])) end pert = rand(length(vt)).*0 @@ -171,7 +172,6 @@ using Dates if !isnothing(result) y1 = getouter(result.snapshot, result.interval) y0 = getouter(original, result.interval) - if iszero(λt) #When delay is zero, future values won't be accessible for (k,v) in pairs(y0) keepat!(records(v), 1:length(y1[k])) From 9baf1bc4a054c51a46604e18556e5779489054e2 Mon Sep 17 00:00:00 2001 From: Ruben Gonzalez Date: Thu, 12 Dec 2024 12:06:53 -0700 Subject: [PATCH 2/4] Fixed broken test where delay and interval were in fact not changed --- src/_TimeSeriesCollector.jl | 4 ++-- test/runtests.jl | 31 +++++++++++++++++++------------ 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/src/_TimeSeriesCollector.jl b/src/_TimeSeriesCollector.jl index 3eb93db..223c334 100644 --- a/src/_TimeSeriesCollector.jl +++ b/src/_TimeSeriesCollector.jl @@ -11,7 +11,7 @@ Used to collect tagged time records Pair{String=>TimeRecord{T}} arriving mostly when set to zero, the interval will be the distance between timestamps - 'delay' is the amount of time we wait beyond the interval to collect data. This helps make algorithms robust against slightly out-of-order data - - 'timer' is a DateTime reference that indicates when the end of the next interval is due (will wait for 'delay') before collecting + - 'timer' is a DateTime reference that indicates the beginning of the next collection interval - 'data' is a Dict of TimeSeries that stores collected data. """ @kwdef struct TimeSeriesCollector{T} @@ -156,7 +156,7 @@ Calculates the beginning of the next time interval given the current time """ function next_interval_start(collector::TimeSeriesCollector, current::DateTime) rawstart = current - collector.delay - collector.interval - newstart = iszero(collector.interval) ? rawstart : floor(rawstart, collector.interval) + newstart = iszero(collector.interval) ? rawstart : ceil(rawstart, collector.interval)-collector.interval return max(collector.timer[], newstart) end diff --git a/test/runtests.jl b/test/runtests.jl index 6767fa5..c6f9243 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -141,18 +141,20 @@ using Dates return recordpair.(taggedseries) end + function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T + return getinner(data, TimeInterval(interval[1], interval[2])) + end + for dt in (Millisecond(0), Millisecond(1000), Millisecond(2500)) for λt in (Millisecond(0), Millisecond(1), dt) - display((interval=dt, delay=λt)) + #dt = Millisecond(0) + #λt = Millisecond(1) + #display((interval=dt, delay=λt)) + t0 = DateTime(2024,1,1,0,0,0) t1 = DateTime(2024,1,1,0,1,0) vt = datetime2unix.(t0:Second(1):t1) - dt = Millisecond(0) - λt = Millisecond(0) - - function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T - return getinner(data, TimeInterval(interval[1], interval[2])) - end + pert = rand(length(vt)).*0 original = Dict( @@ -163,7 +165,7 @@ using Dates #Mismatch test - collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt)) + collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt)) mismatches = Tuple{Dict{String,TimeSeries{Float64}}, Dict{String,TimeSeries{Float64}}}[] for tagrecord in dataseries @@ -183,13 +185,16 @@ using Dates end end end + if !isempty(mismatches) + @warn "Test failed at $((interval=dt, delay=λt))" + end @test isempty(mismatches) #Reconstruction test collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt)) reconstructed = Dict{String, TimeSeries{Float64}}() for tagrecord in dataseries - result = apply!(callback, collector, tagrecord) + result = apply!(getinner, collector, tagrecord) if !isnothing(result) data = fetch(result) for (k,v) in pairs(data) @@ -203,13 +208,15 @@ using Dates anymismatches = Ref(false) for (k, ts) in pairs(reconstructed) - mismatched = (ts != original[k][1:length(ts)]) + n = min(length(original[k]), length(ts)) + mismatched = (ts != original[k][1:n]) anymismatches[] == anymismatches[] | mismatched end + if anymismatches[] + @info "Test failed at $((interval=dt, delay=λt))" + end @test !anymismatches[] end end - - end From edf1580183417199a878137522e0648d3675c4d2 Mon Sep 17 00:00:00 2001 From: Ruben Gonzalez Date: Thu, 12 Dec 2024 15:36:12 -0700 Subject: [PATCH 3/4] Fixed more test issues, no more roundoff errors when defaulting timer --- src/_TimeSeriesCollector.jl | 2 +- test/runtests.jl | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/_TimeSeriesCollector.jl b/src/_TimeSeriesCollector.jl index 223c334..7a26761 100644 --- a/src/_TimeSeriesCollector.jl +++ b/src/_TimeSeriesCollector.jl @@ -17,7 +17,7 @@ Used to collect tagged time records Pair{String=>TimeRecord{T}} arriving mostly @kwdef struct TimeSeriesCollector{T} interval :: Millisecond delay :: Millisecond - timer :: Base.RefValue{DateTime} = Ref(floor(now(UTC), interval)) + timer :: Base.RefValue{DateTime} = Ref(iszero(interval) ? now(UTC) : floor(now(UTC), interval)) data :: Dict{String, TimeSeries{T}} = Dict{String, TimeSeries{T}}() function TimeSeriesCollector{T}(interval, delay, timer, data) where T if interval < zero(interval) diff --git a/test/runtests.jl b/test/runtests.jl index c6f9243..962fd46 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -142,14 +142,14 @@ using Dates end function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T - return getinner(data, TimeInterval(interval[1], interval[2])) + return getinner(data, TimeInterval(interval[1], interval[2]-1e-6)) end for dt in (Millisecond(0), Millisecond(1000), Millisecond(2500)) for λt in (Millisecond(0), Millisecond(1), dt) #dt = Millisecond(0) #λt = Millisecond(1) - #display((interval=dt, delay=λt)) + display((interval=dt, delay=λt)) t0 = DateTime(2024,1,1,0,0,0) t1 = DateTime(2024,1,1,0,1,0) @@ -165,7 +165,7 @@ using Dates #Mismatch test - collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt)) + collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0)) mismatches = Tuple{Dict{String,TimeSeries{Float64}}, Dict{String,TimeSeries{Float64}}}[] for tagrecord in dataseries @@ -174,7 +174,7 @@ using Dates if !isnothing(result) y1 = getouter(result.snapshot, result.interval) y0 = getouter(original, result.interval) - if iszero(λt) #When delay is zero, future values won't be accessible + if λt < Second(1) #When delay is less than input sampling rate, future values won't be accessible for (k,v) in pairs(y0) keepat!(records(v), 1:length(y1[k])) end @@ -194,7 +194,7 @@ using Dates collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt)) reconstructed = Dict{String, TimeSeries{Float64}}() for tagrecord in dataseries - result = apply!(getinner, collector, tagrecord) + result = apply!(callback, collector, tagrecord) if !isnothing(result) data = fetch(result) for (k,v) in pairs(data) From da829b590bfe35e1091c38bf5f678a443d027de8 Mon Sep 17 00:00:00 2001 From: Ruben Gonzalez Date: Thu, 12 Dec 2024 15:36:51 -0700 Subject: [PATCH 4/4] Bumped version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 50cff68..1d8e147 100644 --- a/Project.toml +++ b/Project.toml @@ -1,7 +1,7 @@ name = "TimeRecords" uuid = "b543fe20-4c68-4b5f-af79-4641a0d39826" authors = ["Ruben Gonzalez and contributors"] -version = "1.3.4" +version = "1.3.5" [deps] Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"