Skip to content

Commit

Permalink
Merge pull request #17 from Deduction42/DEV
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
Deduction42 authored Dec 12, 2024
2 parents f130333 + da829b5 commit 8cf7a4c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "TimeRecords"
uuid = "b543fe20-4c68-4b5f-af79-4641a0d39826"
authors = ["Ruben Gonzalez <[email protected]> and contributors"]
version = "1.3.4"
version = "1.3.5"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
31 changes: 14 additions & 17 deletions src/_TimeSeriesCollector.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ Used to collect tagged time records Pair{String=>TimeRecord{T}} arriving mostly
when set to zero, the interval will be the distance between timestamps
- 'delay' is the amount of time we wait beyond the interval to collect data.
This helps make algorithms robust against slightly out-of-order data
- 'timer' is a DateTime reference that indicates when the end of the next interval is due (will wait for 'delay') before collecting
- 'timer' is a DateTime reference that indicates the beginning of the next collection interval
- 'data' is a Dict of TimeSeries that stores collected data.
"""
@kwdef struct TimeSeriesCollector{T}
interval :: Millisecond
delay :: Millisecond
timer :: Base.RefValue{DateTime} = Ref(floor(now(UTC), interval))
timer :: Base.RefValue{DateTime} = Ref(iszero(interval) ? now(UTC) : floor(now(UTC), interval))
data :: Dict{String, TimeSeries{T}} = Dict{String, TimeSeries{T}}()
function TimeSeriesCollector{T}(interval, delay, timer, data) where T
if interval < zero(interval)
Expand Down Expand Up @@ -106,13 +106,15 @@ Notes:
- 'snapshot' and 'interval' can span more than one 'collector.interval' if many intervals have elapsed between samples
"""
function Base.take!(collector::TimeSeriesCollector, t::DateTime)
if t > (collector.timer[] + collector.delay)
if t > (collector.timer[] + collector.delay + collector.interval)
#Construct the time interval
t0 = collector.timer[] - collector.interval #Start of this interval
tn = starttimer!(collector, t - collector.delay) #End of next interval
t1 = tn - collector.interval #End of this interval
t0 = collector.timer[] #Start of this interval
t1 = next_interval_start(collector, t)
interval = TimeInterval(t0, t1)

#Set the start time of the new interval
collector.timer[] = t1

#Collect data that spans the time interval
snapshot = getouter(collector.data, interval)

Expand Down Expand Up @@ -148,19 +150,14 @@ function Base.push!(collector::TimeSeriesCollector{T}, tagrecord::Pair{<:Abstrac
end

"""
starttimer!(collector::TimeSeriesCollector, start::DateTime)
calctimer(collector::TimeSeriesCollector, current::DateTime)
Sets the timer on 'collector' to the end of the interval that contains 'start', "zero" interval sets timer to "start"
Calculates the beginning of the next time interval given the current time
"""
function starttimer!(collector::TimeSeriesCollector, start::DateTime)
if start > collector.timer[]
if iszero(collector.interval)
collector.timer[] = start
else
collector.timer[] = floor(start, collector.interval) + collector.interval
end
end
return collector.timer[]
function next_interval_start(collector::TimeSeriesCollector, current::DateTime)
rawstart = current - collector.delay - collector.interval
newstart = iszero(collector.interval) ? rawstart : ceil(rawstart, collector.interval)-collector.interval
return max(collector.timer[], newstart)
end


Expand Down
31 changes: 19 additions & 12 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,20 @@ using Dates
return recordpair.(taggedseries)
end

function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T
return getinner(data, TimeInterval(interval[1], interval[2]-1e-6))
end

for dt in (Millisecond(0), Millisecond(1000), Millisecond(2500))
for λt in (Millisecond(0), Millisecond(1), dt)
#dt = Millisecond(0)
#λt = Millisecond(1)
display((interval=dt, delay=λt))

t0 = DateTime(2024,1,1,0,0,0)
t1 = DateTime(2024,1,1,0,1,0)
vt = datetime2unix.(t0:Second(1):t1)
dt = Millisecond(0)
λt = Millisecond(0)

function callback(data::Dict{String, TimeSeries{T}}, interval::TimeInterval) where T
return getinner(data, TimeInterval(interval[1], interval[2]-0.001))
end


pert = rand(length(vt)).*0
original = Dict(
Expand All @@ -162,7 +165,7 @@ using Dates


#Mismatch test
collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0+dt))
collector = TimeSeriesCollector{Float64}(interval=dt, delay=λt, timer=Ref(t0))
mismatches = Tuple{Dict{String,TimeSeries{Float64}}, Dict{String,TimeSeries{Float64}}}[]

for tagrecord in dataseries
Expand All @@ -171,8 +174,7 @@ using Dates
if !isnothing(result)
y1 = getouter(result.snapshot, result.interval)
y0 = getouter(original, result.interval)

if iszero(λt) #When delay is zero, future values won't be accessible
if λt < Second(1) #When delay is less than input sampling rate, future values won't be accessible
for (k,v) in pairs(y0)
keepat!(records(v), 1:length(y1[k]))
end
Expand All @@ -183,6 +185,9 @@ using Dates
end
end
end
if !isempty(mismatches)
@warn "Test failed at $((interval=dt, delay=λt))"
end
@test isempty(mismatches)

#Reconstruction test
Expand All @@ -203,13 +208,15 @@ using Dates

anymismatches = Ref(false)
for (k, ts) in pairs(reconstructed)
mismatched = (ts != original[k][1:length(ts)])
n = min(length(original[k]), length(ts))
mismatched = (ts != original[k][1:n])
anymismatches[] == anymismatches[] | mismatched
end
if anymismatches[]
@info "Test failed at $((interval=dt, delay=λt))"
end
@test !anymismatches[]
end
end


end

2 comments on commit 8cf7a4c

@Deduction42
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator register

Release notes:

  1. collector.timer[] now marks the beginning of the next interval
  2. TimeSeriesCollector constructor now allows default timer when interval was set to zero (previously produced a domain error)
  3. Fixed an issue where extra data was taken if t in take!(collector, t) fell on the edge of an interval

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/121307

Tagging

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.3.5 -m "<description of version>" 8cf7a4cda83313686e5db73b53a966e91627bddd
git push origin v1.3.5

Please sign in to comment.