Skip to content

Commit

Permalink
Merge branch 'release/0.19.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
floriankrb committed Jan 9, 2024
2 parents 224179c + bb33e0c commit 92a2a0f
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 69 deletions.
3 changes: 3 additions & 0 deletions climetlab/core/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ def get(self, x):
order[int(key)] = i
except ValueError:
pass
except TypeError:
print('Cannot convert "%s" to int (%s)' % (key, type(key)))
raise
try:
order[float(key)] = i
except ValueError:
Expand Down
215 changes: 150 additions & 65 deletions climetlab/sources/era5_accumulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,74 @@

import climetlab as cml
from climetlab import Source
from climetlab.core.temporary import temp_file
from climetlab.decorators import normalize
from climetlab.readers.grib.output import new_grib_output
from climetlab.utils.availability import Availability

mapping = [
[-1, 18, 6],
[-1, 18, 7],
[-1, 18, 8],
[-1, 18, 9],
[-1, 18, 10],
[-1, 18, 11],
[-1, 18, 12],
[0, 6, 1],
[0, 6, 2],
[0, 6, 3],
[0, 6, 4],
[0, 6, 5],
[0, 6, 6],
[0, 6, 7],
[0, 6, 8],
[0, 6, 9],
[0, 6, 10],
[0, 6, 11],
[0, 6, 12],
[0, 18, 1],
[0, 18, 2],
[0, 18, 3],
[0, 18, 4],
[0, 18, 5],
]

class Accumulation:
def __init__(self, out, param, date, time, step, number, stepping):
self.out = out
self.param = param
self.date = date
self.time = time * 100
self.number = number
self.steps = tuple(step)
self.values = None
self.seen = set()
self.startStep = None
self.endStep = None
self.done = False
self.stepping = stepping

@property
def key(self):
return (self.param, self.date, self.time, self.steps, self.number)

def add(self, field, values):
step = field.metadata("step")
if step not in self.steps:
return

assert not self.done, (self.key, step)

startStep = field.metadata("startStep")
endStep = field.metadata("endStep")

assert endStep == step, (startStep, endStep, step)
assert step not in self.seen, (self.key, step)

assert endStep - startStep == self.stepping, (startStep, endStep)

if self.startStep is None:
self.startStep = startStep
else:
self.startStep = min(self.startStep, startStep)

if self.endStep is None:
self.endStep = endStep
else:
self.endStep = max(self.endStep, endStep)

if self.values is None:
import numpy as np

self.values = np.zeros_like(values)

self.values += values

self.seen.add(step)

if len(self.seen) == len(self.steps):
self.out.write(
self.values,
template=field,
startStep=self.startStep,
endStep=self.endStep,
)
self.values = None
self.done = True


class Era5Accumulations(Source):
Expand All @@ -64,17 +104,39 @@ def __init__(self, *args, **kwargs):
param = request["param"]
if not isinstance(param, (list, tuple)):
param = [param]

for p in param:
assert p in ["cp", "lsp", "tp"], p

number = request.get("number", [0])
assert isinstance(number, (list, tuple))

user_step = 6 # For now, we only support 6h accumulation

user_dates = request["date"]
user_times = request["time"]

requested = set()

dates = set()
times = set()
steps = set()
era_request = dict(**request)

type_ = request.get("type", "an")
if type_ == "an":
type_ = "fc"

stepping = 1
if request.get("stream") == "enda":
stepping = 3
for n in user_times:
assert n % 6 == 0, n

era_request.update({"class": "ea", "type": type_, "levtype": "sfc"})

tmp = temp_file()
path = tmp.path
out = new_grib_output(path)

requests = []

for user_date, user_time in itertools.product(user_dates, user_times):
assert isinstance(user_date, datetime.datetime), (
Expand All @@ -85,51 +147,74 @@ def __init__(self, *args, **kwargs):
assert isinstance(user_time, int), (type(user_time), user_dates, user_times)
assert 0 <= user_time <= 24, user_time

date = user_date + datetime.timedelta(hours=user_time)
delta, time, step = mapping[date.hour]
requested.add(user_date + datetime.timedelta(hours=user_time))

assert 0 <= time <= 23, time
assert 0 <= step <= 24, step
when = (
user_date
+ datetime.timedelta(hours=user_time)
- datetime.timedelta(hours=user_step)
)
add_step = 0

when = date + datetime.timedelta(days=delta)
dates.add(datetime.datetime(when.year, when.month, when.day))
times.add(time)
steps.add(step)
requested.add(date)
while when.hour not in (6, 18):
when -= datetime.timedelta(hours=stepping)
add_step += stepping

valids = defaultdict(list)
for date, time, step in itertools.product(dates, times, steps):
valids[
date + datetime.timedelta(hours=time) + datetime.timedelta(hours=step)
].append((date, time, step))
steps = tuple(
step + add_step
for step in range(stepping, user_step + stepping, stepping)
)

got = set(valids.keys())
assert all(len(x) == 1 for x in valids.values())
missing = requested - got
assert len(missing) == 0
for p in param:
for n in number:
requests.append(
{
"param": p,
"date": int(when.strftime("%Y%m%d")),
"time": when.hour,
"step": sorted(steps),
"number": n,
}
)

compressed = Availability(requests)
ds = cml.load_source("empty")
for r in compressed.iterate():
era_request.update(r)
ds = ds + cml.load_source("mars", **era_request)

accumulations = defaultdict(list)
for a in [Accumulation(out, stepping=stepping, **r) for r in requests]:
for s in a.steps:
accumulations[(a.param, a.date, a.time, s, a.number)].append(a)

for field in ds:
key = (
field.metadata("param"),
field.metadata("date"),
field.metadata("time"),
field.metadata("step"),
field.metadata("number"),
)
values = field.values # optimisation
for a in accumulations[key]:
a.add(field, values)

# extra = got - requested
for acc in accumulations.values():
for a in acc:
assert a.done, (a.key, a.seen, a.steps)

era_request = dict(**request)
out.close()

type_ = request.get("type", "an")
if type_ == "an":
type_ = "fc"
ds = cml.load_source("file", path)

era_request.update(
{
"class": "ea",
"type": type_,
"levtype": "sfc",
"date": [d.strftime("%Y-%m-%d") for d in dates],
"time": sorted(times),
"step": sorted(steps),
}
self.ds = cml.load_source("file", path)
assert len(self.ds) / len(param) / len(number) == len(requested), (
len(self.ds),
len(param),
len(requested),
)

ds = cml.load_source("mars", **era_request)
index = [d.valid_datetime() in requested for d in ds]
self.ds = ds[index]
self.ds._tmp = tmp

def mutate(self):
return self.ds
Expand Down
6 changes: 3 additions & 3 deletions climetlab/sources/oper_accumulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def __init__(self, *args, **kwargs):
"scda": {"dates": set(), "times": set()},
}

step = 6
user_step = 6
requested = set()

for user_date, user_time in itertools.product(user_dates, user_times):
Expand All @@ -54,7 +54,7 @@ def __init__(self, *args, **kwargs):

requested.add(when)

when -= datetime.timedelta(hours=step)
when -= datetime.timedelta(hours=user_step)
date = datetime.datetime(when.year, when.month, when.day)
time = when.hour

Expand Down Expand Up @@ -84,7 +84,7 @@ def __init__(self, *args, **kwargs):
"stream": stream,
"date": [d.strftime("%Y-%m-%d") for d in dates],
"time": sorted(times),
"step": step,
"step": user_step,
}
)

Expand Down
2 changes: 1 addition & 1 deletion climetlab/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.19.1
0.19.2

0 comments on commit 92a2a0f

Please sign in to comment.