From d0b8bf4e6395e07c322a2fa2046ef8cd865e4ff5 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 26 Nov 2024 14:16:50 +0000 Subject: [PATCH 1/5] Add forwarding of scan requests --- src/gribjump/CMakeLists.txt | 3 + src/gribjump/Config.cc | 4 +- src/gribjump/Config.h | 11 +- src/gribjump/Engine.cc | 140 ++++++++------------------ src/gribjump/Engine.h | 6 +- src/gribjump/ExtractionItem.h | 2 + src/gribjump/Forwarder.cc | 106 +++++++++++++++++++ src/gribjump/Forwarder.h | 39 +++++++ src/gribjump/Lister.cc | 51 +++++----- src/gribjump/Lister.h | 4 +- src/gribjump/Task.cc | 29 ++++-- src/gribjump/Task.h | 21 +++- src/gribjump/Types.h | 4 +- src/gribjump/URIHelper.h | 39 +++++++ src/gribjump/info/InfoCache.cc | 23 +++-- src/gribjump/info/InfoCache.h | 4 +- src/gribjump/remote/GribJumpUser.cc | 3 + src/gribjump/remote/RemoteGribJump.cc | 37 ++++++- src/gribjump/remote/RemoteGribJump.h | 10 +- src/gribjump/remote/Request.cc | 49 +++++---- src/gribjump/remote/Request.h | 33 +++++- 21 files changed, 434 insertions(+), 184 deletions(-) create mode 100644 src/gribjump/Forwarder.cc create mode 100644 src/gribjump/Forwarder.h create mode 100644 src/gribjump/URIHelper.h diff --git a/src/gribjump/CMakeLists.txt b/src/gribjump/CMakeLists.txt index dce4cc5..7086f13 100644 --- a/src/gribjump/CMakeLists.txt +++ b/src/gribjump/CMakeLists.txt @@ -65,6 +65,9 @@ if( HAVE_GRIBJUMP_LOCAL_EXTRACT ) Task.h ExtractionItem.cc ExtractionItem.h + Forwarder.cc + Forwarder.h + URIHelper.h tools/EccodesExtract.h tools/EccodesExtract.cc diff --git a/src/gribjump/Config.cc b/src/gribjump/Config.cc index a1286cb..160d0b6 100644 --- a/src/gribjump/Config.cc +++ b/src/gribjump/Config.cc @@ -41,7 +41,7 @@ Config::Config(const eckit::PathName path) : path_{path} { } -std::map Config::loadServerMap() const { +Config::ServerMap Config::loadServerMap() const { // e.g. yaml // servermap: // - fdb: "host1:port1" @@ -50,7 +50,7 @@ std::map Config::loadServerMap() const { // gribjump: "host4:port4" // becomes map: // { "host1:port1": "host2:port2", "host3:port3": "host4:port4" } - std::map map; + Config::ServerMap map; eckit::LocalConfiguration conf = getSubConfiguration("servermap"); std::vector serverList = conf.getSubConfigurations(); diff --git a/src/gribjump/Config.h b/src/gribjump/Config.h index 6528d03..958cfa4 100644 --- a/src/gribjump/Config.h +++ b/src/gribjump/Config.h @@ -12,26 +12,29 @@ #pragma once -#include +#include #include "eckit/config/LocalConfiguration.h" +#include "eckit/net/Endpoint.h" namespace gribjump { class Config : public eckit::LocalConfiguration { +public: // types + using ServerMap = std::unordered_map; public: Config(); Config(const eckit::PathName); - const std::map& serverMap() const { return serverMap_; } + const ServerMap& serverMap() const { return serverMap_; } ///@note : Will be empty if default config is used const std::string& path() const { return path_; } private: - std::map loadServerMap() const; + ServerMap loadServerMap() const; private: - std::map serverMap_; + ServerMap serverMap_; std::string path_; }; diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index d9a6c1c..8e88975 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -18,6 +18,7 @@ #include "gribjump/Engine.h" #include "gribjump/ExtractionItem.h" +#include "gribjump/Forwarder.h" #include "gribjump/remote/WorkQueue.h" #include "gribjump/jumper/JumperFactory.h" @@ -28,15 +29,6 @@ namespace gribjump { //---------------------------------------------------------------------------------------------------------------------- // Stringify requests and keys alphabetically -namespace { -// ---------------------------------------------------------------------------------------------------------------------- - -bool isRemote(eckit::URI uri) { - return uri.scheme() == "fdb"; -} - -} // namespace -//---------------------------------------------------------------------------------------------------------------------- Engine::Engine() {} @@ -112,81 +104,26 @@ filemap_t Engine::buildFileMap(const metkit::mars::MarsRequest& unionrequest, Ex return filemap; } -void Engine::forwardRemoteExtraction(filemap_t& filemap) { - // get servermap from config, which maps fdb remote uri to gribjump server uri - // format: fdbhost:port -> gjhost:port - /// @todo: dont parse servermap every request - const std::map& servermap_str = LibGribJump::instance().config().serverMap(); - ASSERT(!servermap_str.empty()); - - if (LibGribJump::instance().debug()) { - for (auto& [fdb, gj] : servermap_str) { - LOG_DEBUG_LIB(LibGribJump) << "Servermap: " << fdb << " -> " << gj << std::endl; - } - } - std::unordered_map servermap; - for (auto& [fdb, gj] : servermap_str) { - eckit::net::Endpoint fdbEndpoint(fdb); - eckit::net::Endpoint gjEndpoint(gj); - servermap[fdbEndpoint] = gjEndpoint; - } - - // Match servers with files - std::unordered_map> serverfiles; - for (auto& [fname, extractionItems] : filemap) { - eckit::URI uri = extractionItems[0]->URI(); - eckit::net::Endpoint fdbEndpoint; - - if(!isRemote(uri)) { - throw eckit::SeriousBug("URI is not remote: " + fname); - } - - fdbEndpoint = eckit::net::Endpoint(uri.host(), uri.port()); +void Engine::scheduleExtractionTasks(filemap_t& filemap){ - if (servermap.find(fdbEndpoint) == servermap.end()) { - throw eckit::SeriousBug("No gribjump endpoint found for fdb endpoint: " + std::string(fdbEndpoint)); - } - - serverfiles[servermap[fdbEndpoint]].push_back(fname); - } - - // make subfilemaps for each server - std::unordered_map serverfilemaps; - - for (auto& [server, files] : serverfiles) { - filemap_t subfilemap; - for (auto& fname : files) { - subfilemap[fname] = filemap[fname]; - } - serverfilemaps[server] = subfilemap; - } - - // forward to servers - size_t counter = 0; - for (auto& [endpoint, subfilemap] : serverfilemaps) { - taskGroup_.enqueueTask(new RemoteExtractionTask(taskGroup_, counter++, endpoint, subfilemap)); - } - - taskGroup_.waitForTasks(); -} - -void Engine::scheduleTasks(filemap_t& filemap){ - - bool remoteExtraction = LibGribJump::instance().config().getBool("remoteExtraction", false); - if (remoteExtraction) { - forwardRemoteExtraction(filemap); + bool forwardExtraction = LibGribJump::instance().config().getBool("forwardExtraction", false); + if (forwardExtraction) { + Forwarder forwarder; + forwarder.extract(filemap); return; } + bool inefficientExtraction = LibGribJump::instance().config().getBool("inefficientExtraction", false); + size_t counter = 0; for (auto& [fname, extractionItems] : filemap) { - if (isRemote(extractionItems[0]->URI())) { - // Only possible if we are using remoteFDB, which requires remoteExtraction to be enabled. - // We technically do support it via inefficient extraction, but we are disabling this for now. - // taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); - throw eckit::SeriousBug("Got remote URI from FDB, but remoteExtraction enabled in gribjump config."); - } - else { + if (extractionItems[0]->isRemote()) { + if (inefficientExtraction) { + taskGroup_.enqueueTask(new InefficientFileExtractionTask(taskGroup_, counter++, fname, extractionItems)); + } else { + throw eckit::SeriousBug("Got remote URI from FDB, but forwardExtraction enabled in gribjump config."); + } + } else { taskGroup_.enqueueTask(new FileExtractionTask(taskGroup_, counter++, fname, extractionItems)); } } @@ -204,7 +141,7 @@ ResultsMap Engine::extract(ExtractionRequests& requests) { MetricsManager::instance().set("elapsed_build_filemap", timer.elapsed()); timer.reset("Gribjump Engine: Built file map"); - scheduleTasks(filemap); + scheduleExtractionTasks(filemap); MetricsManager::instance().set("elapsed_tasks", timer.elapsed()); timer.reset("Gribjump Engine: All tasks finished"); @@ -230,38 +167,45 @@ ResultsMap Engine::collectResults(ExItemMap& keyToExtractionItem) { size_t Engine::scan(const MarsRequests& requests, bool byfiles) { - const std::map< eckit::PathName, eckit::OffsetList > filemap = FDBLister::instance().filesOffsets(requests); + std::vector uris = FDBLister::instance().URIs(requests); + + // forwarded scan requests + if (LibGribJump::instance().config().getBool("forwardScan", false)) { + Forwarder forwarder; + return forwarder.scan(uris); + } + + std::map< eckit::PathName, eckit::OffsetList > filemap = FDBLister::instance().filesOffsets(uris); if (byfiles) { // ignore offsets and scan entire file - std::vector files; - for (auto& [fname, offsets] : filemap) { - files.push_back(fname); + for (auto& [uri, offsets] : filemap) { + offsets.clear(); } - - return scan(files); - } - - size_t counter = 0; - for (auto& [fname, offsets] : filemap) { - taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, fname, offsets)); - } - taskGroup_.waitForTasks(); - - return filemap.size(); + return scan(filemap); } size_t Engine::scan(std::vector files) { - size_t counter = 0; - + + scanmap_t scanmap; for (auto& fname : files) { - taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, fname, {})); + scanmap[fname] = {}; } + return scan(scanmap); +} + +size_t Engine::scan(const scanmap_t& scanmap) { + + size_t counter = 0; + std::atomic nfields(0); + for (auto& [uri, offsets] : scanmap) { + taskGroup_.enqueueTask(new FileScanTask(taskGroup_, counter++, uri.path(), offsets, nfields)); + } taskGroup_.waitForTasks(); - return files.size(); + return nfields; } std::map > Engine::axes(const std::string& request, int level) { diff --git a/src/gribjump/Engine.h b/src/gribjump/Engine.h index 5302e96..4521795 100644 --- a/src/gribjump/Engine.h +++ b/src/gribjump/Engine.h @@ -33,10 +33,11 @@ class Engine { // byfiles: scan entire file, not just fields matching request size_t scan(const MarsRequests& requests, bool byfiles = false); size_t scan(std::vector files); + size_t scan(const scanmap_t& scanmap); std::map > axes(const std::string& request, int level=3); - void scheduleTasks(filemap_t& filemap); + void scheduleExtractionTasks(filemap_t& filemap); void reportErrors(eckit::Stream& client_); void raiseErrors(); @@ -45,12 +46,11 @@ class Engine { filemap_t buildFileMap(const metkit::mars::MarsRequest& unionrequest, ExItemMap& keyToExtractionItem); ResultsMap collectResults(ExItemMap& keyToExtractionItem); - void forwardRemoteExtraction(filemap_t& filemap); metkit::mars::MarsRequest buildRequestMap(ExtractionRequests& requests, ExItemMap& keyToExtractionItem ); private: - TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. + TaskGroup taskGroup_; /// @todo Maybe we should be returning the taskGroup, rather than storing it here. /// I would really prefer a stateless engine. }; diff --git a/src/gribjump/ExtractionItem.h b/src/gribjump/ExtractionItem.h index f90b892..eb7caee 100644 --- a/src/gribjump/ExtractionItem.h +++ b/src/gribjump/ExtractionItem.h @@ -18,6 +18,7 @@ #include "gribjump/LibGribJump.h" #include "gribjump/Types.h" +#include "gribjump/URIHelper.h" namespace gribjump { @@ -62,6 +63,7 @@ class ExtractionItem : public eckit::NonCopyable { const std::string& gridHash() const { return gridHash_; } void URI(const eckit::URI& uri) { uri_ = uri; } + bool isRemote() const { return URIHelper::isRemote(uri_); } void values(ExValues values) { values_ = std::move(values); } void mask(ExMask mask) { mask_ = std::move(mask); } diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc new file mode 100644 index 0000000..6720889 --- /dev/null +++ b/src/gribjump/Forwarder.cc @@ -0,0 +1,106 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley + +#include +#include +#include "eckit/filesystem/URI.h" +#include "gribjump/Forwarder.h" +#include "gribjump/ExtractionItem.h" +#include "gribjump/LibGribJump.h" +#include "gribjump/Task.h" +namespace gribjump { + +Forwarder::Forwarder() { +} + +Forwarder::~Forwarder() { +} + +size_t Forwarder::scan(const std::vector& uris) { + + // scanmap_t: filename -> offsets + std::unordered_map serverfilemaps; + + // Match servers with files + for (auto& uri : uris) { + eckit::net::Endpoint server = this->serverForURI(uri); + eckit::Offset offset = URIHelper::offset(uri); + std::string fname = uri.path(); + serverfilemaps[server][fname].push_back(offset); + } + + TaskGroup taskGroup; + size_t counter = 0; + std::vector n_fields; + size_t i = 0; + for (auto& [endpoint, scanmap] : serverfilemaps) { + taskGroup.enqueueTask(new ForwardScanTask(taskGroup, i, endpoint, scanmap, n_fields[i])); + i++; + } + taskGroup.waitForTasks(); + + return std::accumulate(n_fields.begin(), n_fields.end(), 0); +} + +void Forwarder::extract(filemap_t& filemap) { + std::unordered_map serverfilemaps = serverFileMap(filemap); + + TaskGroup taskGroup; + size_t counter = 0; + for (auto& [endpoint, subfilemap] : serverfilemaps) { + taskGroup.enqueueTask(new ForwardExtractionTask(taskGroup, counter++, endpoint, subfilemap)); + } + taskGroup.waitForTasks(); +} + + +const eckit::net::Endpoint& Forwarder::serverForURI(const eckit::URI& uri) const { + const Config::ServerMap& servermap = LibGribJump::instance().config().serverMap(); + + eckit::net::Endpoint fdbEndpoint(uri.host(), uri.port()); + + if (servermap.find(fdbEndpoint) == servermap.end()) { + throw eckit::SeriousBug("No gribjump endpoint found for fdb endpoint: " + std::string(fdbEndpoint)); + } + + return servermap.at(fdbEndpoint); +} + +// Splits a filemap into subfilemaps, each to be handled by a seperate remote server +std::unordered_map Forwarder::serverFileMap(filemap_t& filemap) { + + Config::ServerMap servermap = LibGribJump::instance().config().serverMap(); + + // Match servers with files + std::unordered_map> serverfiles; + for (auto& [fname, extractionItems] : filemap) { + eckit::URI uri = extractionItems[0]->URI(); // URIs are already grouped by file + eckit::net::Endpoint server = serverForURI(uri); + serverfiles[server].push_back(fname); + } + + // make subfilemaps for each server + std::unordered_map serverfilemaps; + + for (auto& [server, files] : serverfiles) { + filemap_t subfilemap; + for (auto& fname : files) { + subfilemap[fname] = filemap[fname]; + } + serverfilemaps[server] = subfilemap; + } + + return serverfilemaps; +} + + +} // namespace gribjump \ No newline at end of file diff --git a/src/gribjump/Forwarder.h b/src/gribjump/Forwarder.h new file mode 100644 index 0000000..3cea3f4 --- /dev/null +++ b/src/gribjump/Forwarder.h @@ -0,0 +1,39 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley +#include "gribjump/Types.h" +#include "eckit/net/Endpoint.h" + +#pragma once + +// Class used by Engine to forward requests onto remote servers +// Example: when the remoteFDB is in use, data is distributed across multiple stores and a catalogue +// The Engine will obtain the URIs from the catalogue and use this class to forward the requests to the appropriate gribjump server +namespace gribjump{ + +class Forwarder { +public: + + Forwarder(); + ~Forwarder(); + + size_t scan(const std::vector& uris); + void extract(filemap_t& filemap); + +private: + + std::unordered_map serverFileMap(filemap_t& filemap); + + const eckit::net::Endpoint& serverForURI(const eckit::URI& uri) const; + +}; + +} // namespace gribjump \ No newline at end of file diff --git a/src/gribjump/Lister.cc b/src/gribjump/Lister.cc index 8b644eb..5bd823d 100644 --- a/src/gribjump/Lister.cc +++ b/src/gribjump/Lister.cc @@ -17,6 +17,7 @@ #include "gribjump/GribJump.h" #include "gribjump/Lister.h" #include "gribjump/GribJumpException.h" +#include "gribjump/URIHelper.h" namespace gribjump { @@ -137,40 +138,40 @@ filemap_t FDBLister::fileMap(const metkit::mars::MarsRequest& unionRequest, cons return filemap; } -std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(std::vector requests) { - - eckit::AutoLock lock(this); +std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(const std::vector& requests) { + return filesOffsets(URIs(requests)); +} - std::map< eckit::PathName, eckit::OffsetList > files; +std::map< eckit::PathName, eckit::OffsetList > FDBLister::filesOffsets(const std::vector& uris) { + std::map< eckit::PathName, eckit::OffsetList > files; + for (auto& uri : uris) { + eckit::PathName path = uri.path(); + eckit::Offset offset = URIHelper::offset(uri); + auto it = files.find(path); + if(it == files.end()) { + eckit::OffsetList offsets; + offsets.push_back(offset); + files.emplace(path, offsets); + } + else { + it->second.push_back(offset); + } + } + return files; +} +std::vector FDBLister::URIs(const std::vector& requests) { + eckit::AutoLock lock(this); + std::vector uris; for (auto& request : requests) { - - size_t count = request.count(); // worse case scenario all fields in one file - fdb5::FDBToolRequest fdbreq(request); auto listIter = fdb_.list(fdbreq, true); - fdb5::ListElement elem; while (listIter.next(elem)) { - - const fdb5::FieldLocation& loc = elem.location(); - - eckit::PathName path = loc.uri().path(); - - auto it = files.find(path); - if(it == files.end()) { - eckit::OffsetList offsets; - offsets.reserve(count); - offsets.push_back(loc.offset()); - files.emplace(path, offsets); - } - else { - it->second.push_back(loc.offset()); - } + uris.push_back(elem.location().fullUri()); } } - - return files; + return uris; } std::map > FDBLister::axes(const std::string& request, int level) { diff --git a/src/gribjump/Lister.h b/src/gribjump/Lister.h index e181569..240c6b1 100644 --- a/src/gribjump/Lister.h +++ b/src/gribjump/Lister.h @@ -60,8 +60,10 @@ class FDBLister : public Lister { filemap_t fileMap(const metkit::mars::MarsRequest& unionRequest, const ExItemMap& reqToXRR); // Used during extraction - std::map< eckit::PathName, eckit::OffsetList > filesOffsets(std::vector requests); // Used during scan + std::map< eckit::PathName, eckit::OffsetList > filesOffsets(const std::vector& requests); // Used during scan + std::map< eckit::PathName, eckit::OffsetList > filesOffsets(const std::vector& uris); + std::vector URIs(const std::vector& requests); private: FDBLister(); diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index 2ca85d1..cf6eedd 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -197,20 +197,34 @@ void FileExtractionTask::extract() { //---------------------------------------------------------------------------------------------------------------------- // Forward the work to a remote server, and wait for the results. -RemoteExtractionTask::RemoteExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap) : +ForwardExtractionTask::ForwardExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap) : Task(taskgroup, id), endpoint_(endpoint), filemap_(filemap) {} -void RemoteExtractionTask::execute(){ +void ForwardExtractionTask::execute(){ RemoteGribJump remoteGribJump(endpoint_); - remoteGribJump.extract(filemap_); + remoteGribJump.forwardExtract(filemap_); notify(); } +ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, size_t& count) : + Task(taskgroup, id), + endpoint_(endpoint), + scanmap_(scanmap), + count_(count) { +} + +void ForwardScanTask::execute(){ + + RemoteGribJump remoteGribJump(endpoint_); + count_ = remoteGribJump.forwardScan(scanmap_); + notify(); +} + //---------------------------------------------------------------------------------------------------------------------- InefficientFileExtractionTask::InefficientFileExtractionTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, ExtractionItems& extractionItems): FileExtractionTask(taskgroup, id, fname, extractionItems) { @@ -258,10 +272,11 @@ void InefficientFileExtractionTask::extract() { //---------------------------------------------------------------------------------------------------------------------- -FileScanTask::FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets) : +FileScanTask::FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets, std::atomic& nfields) : Task(taskgroup, id), fname_(fname), - offsets_(offsets) { + offsets_(offsets), + nfields_(nfields){ } void FileScanTask::execute() { @@ -278,11 +293,11 @@ void FileScanTask::execute() { void FileScanTask::scan() { if (offsets_.size() == 0) { - InfoCache::instance().scan(fname_); + nfields_ += InfoCache::instance().scan(fname_); return; } - InfoCache::instance().scan(fname_, offsets_); + nfields_ += InfoCache::instance().scan(fname_, offsets_); } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 05576bb..5e00ddd 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -145,10 +145,10 @@ class InefficientFileExtractionTask : public FileExtractionTask { //---------------------------------------------------------------------------------------------------------------------- // Task that forwards the work to a remote server, based on the URI of the extraction item. -class RemoteExtractionTask : public Task { +class ForwardExtractionTask : public Task { public: - RemoteExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap); + ForwardExtractionTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, filemap_t& filemap); void execute() override; @@ -157,6 +157,20 @@ class RemoteExtractionTask : public Task { filemap_t& filemap_; }; +// Task that forwards the work to a remote server, based on the URI of the extraction item. +class ForwardScanTask : public Task { +public: + + ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, size_t& count); + + void execute() override; + +private: + eckit::net::Endpoint endpoint_; + scanmap_t& scanmap_; + size_t& count_; +}; + //---------------------------------------------------------------------------------------------------------------------- class FileScanTask : public Task { @@ -164,7 +178,7 @@ class FileScanTask : public Task { // Each extraction item is assumed to be for the same file. - FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets); + FileScanTask(TaskGroup& taskgroup, const size_t id, const eckit::PathName& fname, const std::vector& offsets, std::atomic& nfields); void execute() override; @@ -173,6 +187,7 @@ class FileScanTask : public Task { private: eckit::PathName fname_; std::vector offsets_; + std::atomic& nfields_; }; diff --git a/src/gribjump/Types.h b/src/gribjump/Types.h index d0b5501..0517727 100644 --- a/src/gribjump/Types.h +++ b/src/gribjump/Types.h @@ -14,6 +14,7 @@ #include #include #include +#include "metkit/mars/MarsRequest.h" namespace gribjump { @@ -36,6 +37,7 @@ using ExtractionItems = std::vector; // Non-owning pointers using ExItemMap = std::map>; // filemap holds non-owning pointers to ExtractionItems -using filemap_t = std::map; +using filemap_t = std::map; // filename -> ExtractionItems +using scanmap_t = std::map; // filename -> offsets } // namespace gribjump diff --git a/src/gribjump/URIHelper.h b/src/gribjump/URIHelper.h new file mode 100644 index 0000000..a7014f5 --- /dev/null +++ b/src/gribjump/URIHelper.h @@ -0,0 +1,39 @@ +/* + * (C) Copyright 2024- ECMWF. + * + * This software is licensed under the terms of the Apache Licence Version 2.0 + * which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. + * In applying this licence, ECMWF does not waive the privileges and immunities + * granted to it by virtue of its status as an intergovernmental organisation nor + * does it submit to any jurisdiction. + */ + +/// @author Christopher Bradley +#include "eckit/filesystem/URI.h" + +#pragma once + +namespace gribjump{ + +// Helper class to extend eckit::URI +class URIHelper { +public: + + static eckit::Offset offset(const eckit::URI& uri) { + const std::string& fragment = uri.fragment(); + eckit::Offset offset; + try { + offset = std::stoll(fragment); + } catch (std::invalid_argument& e) { + throw eckit::BadValue("Invalid offset: '" + fragment + "' in URI: " + uri.asString(), Here()); + } + return offset; + } + + static bool isRemote(const eckit::URI& uri) { + return uri.scheme() == "fdb"; + } + +}; + +} // namespace gribjump \ No newline at end of file diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 7ed2647..2d9c7d1 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -188,14 +188,13 @@ void InfoCache::clear() { cache_.clear(); } -void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector& offsets) { +size_t InfoCache::scan(const eckit::PathName& fdbpath, const std::vector& offsets) { // this will be executed in parallel so we dont lock main mutex_ here // we will rely on each method to lock mutex when needed LOG_DEBUG_LIB(LibGribJump) << "Scanning " << fdbpath << " at " << eckit::Plural(offsets.size(), "offsets") << std::endl; - // if cache exists load so we can merge with memory cache std::shared_ptr filecache = getFileCache(fdbpath); filecache->load(); @@ -209,30 +208,33 @@ void InfoCache::scan(const eckit::PathName& fdbpath, const std::vector> uinfos = extractor.extract(fdbpath, newOffsets); + std::vector> infos = extractor.extract(fdbpath, newOffsets); // std::vector> infos; // infos.reserve(uinfos.size()); // std::move(uinfos.begin(), uinfos.end(), std::back_inserter(infos)); // filecache->insert(infos); - for (size_t i = 0; i < uinfos.size(); i++) { - filecache->insert(newOffsets[i], std::move(uinfos[i])); + for (size_t i = 0; i < infos.size(); i++) { + filecache->insert(newOffsets[i], std::move(infos[i])); } if (persistentCache_) { filecache->write(); } + return infos.size(); } -void InfoCache::scan(const eckit::PathName& fdbpath) { +size_t InfoCache::scan(const eckit::PathName& fdbpath) { LOG_DEBUG_LIB(LibGribJump) << "Scanning whole file " << fdbpath << std::endl; @@ -241,15 +243,16 @@ void InfoCache::scan(const eckit::PathName& fdbpath) { filecache->load(); InfoExtractor extractor; - std::vector>> uinfos = extractor.extract(fdbpath); /* This needs to give use the offsets too*/ - for (size_t i = 0; i < uinfos.size(); i++) { - filecache->insert(uinfos[i].first, std::move(uinfos[i].second)); + std::vector>> infos = extractor.extract(fdbpath); + for (size_t i = 0; i < infos.size(); i++) { + filecache->insert(infos[i].first, std::move(infos[i].second)); } if (persistentCache_) { filecache->write(); } + return infos.size(); } void InfoCache::print(std::ostream& s) const { diff --git a/src/gribjump/info/InfoCache.h b/src/gribjump/info/InfoCache.h index b695a5e..ae89f39 100644 --- a/src/gribjump/info/InfoCache.h +++ b/src/gribjump/info/InfoCache.h @@ -40,9 +40,9 @@ class InfoCache { /// @brief Scans grib file at provided offsets and populates cache /// @param path full path to grib file /// @param offsets list of offsets to at which GribInfo should be extracted - void scan(const eckit::PathName& path, const std::vector& offsets); + size_t scan(const eckit::PathName& path, const std::vector& offsets); - void scan(const eckit::PathName& path); // < scan all fields in a file + size_t scan(const eckit::PathName& path); // < scan all fields in a file /// Inserts a JumpInfo entry diff --git a/src/gribjump/remote/GribJumpUser.cc b/src/gribjump/remote/GribJumpUser.cc index a8cf187..85fca7e 100644 --- a/src/gribjump/remote/GribJumpUser.cc +++ b/src/gribjump/remote/GribJumpUser.cc @@ -83,6 +83,9 @@ void GribJumpUser::handle_client(eckit::Stream& s, eckit::Timer& timer) { case RequestType::FORWARD_EXTRACT: processRequest(s); break; + case RequestType::FORWARD_SCAN: + processRequest(s); + break; default: throw eckit::SeriousBug("Unknown request type: " + std::to_string(i_requestType)); } diff --git a/src/gribjump/remote/RemoteGribJump.cc b/src/gribjump/remote/RemoteGribJump.cc index 15c036b..c3ca619 100644 --- a/src/gribjump/remote/RemoteGribJump.cc +++ b/src/gribjump/remote/RemoteGribJump.cc @@ -78,6 +78,39 @@ size_t RemoteGribJump::scan(const std::vector& reques return count; } +// Forward scan request to another server +size_t RemoteGribJump::forwardScan(const std::map& map) { + ///@todo we could probably do the connection logic in the ctor + eckit::Timer timer("RemoteGribJump::scan()"); + eckit::net::TCPClient client; + eckit::net::InstantTCPStream stream(client.connect(host_, port_)); + timer.report("Connection established"); + + sendHeader(stream, RequestType::FORWARD_SCAN); + + size_t nFiles = map.size(); + stream << nFiles; + + for (auto& [fname, offsets] : map) { + stream << fname; + stream << offsets; + } + + bool error = receiveErrors(stream); + + size_t count = 0; + for (size_t i = 0; i < nFiles; i++) { + size_t nOffsets; + stream >> nOffsets; + count += nOffsets; + } + + eckit::Log::info() << "Scanned " << eckit::Plural(count, "field") << "on endpoint " << host_ << ":" << port_ << std::endl; + + timer.report("Scans complete"); + return count; +} + std::vector>> RemoteGribJump::extract(std::vector& requests) { eckit::Timer timer("RemoteGribJump::extract()"); std::vector>> result; @@ -121,9 +154,9 @@ std::vector> RemoteGribJump::extract(const eckit } // Forward extraction request to another server -void RemoteGribJump::extract(filemap_t& filemap){ +void RemoteGribJump::forwardExtract(filemap_t& filemap){ - eckit::Timer timer("RemoteGribJump::extract()"); + eckit::Timer timer("RemoteGribJump::forwardExtract()"); ///@todo we could probably do the connection logic in the ctor eckit::net::TCPClient client; diff --git a/src/gribjump/remote/RemoteGribJump.h b/src/gribjump/remote/RemoteGribJump.h index ce0ac04..a46c22c 100644 --- a/src/gribjump/remote/RemoteGribJump.h +++ b/src/gribjump/remote/RemoteGribJump.h @@ -22,9 +22,11 @@ enum class RequestType : uint16_t { EXTRACT = 0, AXES, SCAN, - FORWARD_EXTRACT + FORWARD_EXTRACT, + FORWARD_SCAN }; -constexpr static uint16_t remoteProtocolVersion = 2; + +constexpr static uint16_t remoteProtocolVersion = 3; class RemoteGribJump : public GribJumpBase { @@ -35,12 +37,12 @@ class RemoteGribJump : public GribJumpBase { ~RemoteGribJump(); size_t scan(const std::vector& path) override { NOTIMP; } - size_t scan(const std::vector& requests, bool byfiles) override; + size_t forwardScan(const std::map& map); std::vector>> extract(std::vector& polyRequest) override; std::vector> extract(const eckit::PathName& path, const std::vector& offsets, const std::vector>& ranges) override; - void extract(filemap_t& filemap); + void forwardExtract(filemap_t& filemap); std::map> axes(const std::string& request, int level) override; diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index 93f6f0d..fdf6e7b 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -31,8 +31,6 @@ Request::Request(eckit::Stream& stream) : client_(stream) { MetricsManager::instance().set("request_id", id_); } -Request::~Request() {} - void Request::reportErrors() { engine_.reportErrors(client_); } @@ -58,17 +56,12 @@ ScanRequest::ScanRequest(eckit::Stream& stream) : Request(stream) { MetricsManager::instance().set("count_requests", numRequests); } -ScanRequest::~ScanRequest() { -} - void ScanRequest::execute() { nfiles_ = engine_.scan(requests_, byfiles_); } void ScanRequest::replyToClient() { - client_ << nfiles_; - } //---------------------------------------------------------------------------------------------------------------------- @@ -91,9 +84,6 @@ ExtractRequest::ExtractRequest(eckit::Stream& stream) : Request(stream) { MetricsManager::instance().set("count_requests", nRequests); } -ExtractRequest::~ExtractRequest() { -} - void ExtractRequest::execute() { results_ = engine_.extract(requests_); @@ -167,11 +157,8 @@ ForwardedExtractRequest::ForwardedExtractRequest(eckit::Stream& stream) : Reques ASSERT(count > 0); // We should not be talking to this server if we have no requests. } -ForwardedExtractRequest::~ForwardedExtractRequest() { -} - void ForwardedExtractRequest::execute() { - engine_.scheduleTasks(filemap_); + engine_.scheduleExtractionTasks(filemap_); } void ForwardedExtractRequest::replyToClient() { @@ -187,6 +174,37 @@ void ForwardedExtractRequest::replyToClient() { } } + +//---------------------------------------------------------------------------------------------------------------------- + +ForwardedScanRequest::ForwardedScanRequest(eckit::Stream& stream) : Request(stream) { + MetricsManager::instance().set("action", "forwarded-scan"); + + size_t nFiles; + client_ >> nFiles; + LOG_DEBUG_LIB(LibGribJump) << "ForwardedScanRequest: nFiles=" << nFiles << std::endl; + + + size_t count = 0; + for (size_t i = 0; i < nFiles; i++) { + std::string fname; + eckit::OffsetList offsets; + client_ >> fname; + client_ >> offsets; + scanmap_[fname] = offsets; + count += offsets.size(); + } + + MetricsManager::instance().set("count_received_offsets", count); +} + +void ForwardedScanRequest::execute() { + nfields_ = engine_.scan(scanmap_); +} + +void ForwardedScanRequest::replyToClient() { + client_ << nfields_; +} //---------------------------------------------------------------------------------------------------------------------- AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) { @@ -196,9 +214,6 @@ AxesRequest::AxesRequest(eckit::Stream& stream) : Request(stream) { ASSERT(request_.size() > 0); } -AxesRequest::~AxesRequest() { -} - void AxesRequest::execute() { axes_ = engine_.axes(request_, level_); } diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index 463b972..abb3681 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -32,7 +32,7 @@ class Request { Request(eckit::Stream& stream); - virtual ~Request(); + virtual ~Request() = default; // Have engine execute the request virtual void execute() = 0; @@ -57,7 +57,7 @@ class ScanRequest : public Request { ScanRequest(eckit::Stream& stream); - ~ScanRequest(); + ~ScanRequest() = default; void execute() override; @@ -79,7 +79,7 @@ class ExtractRequest : public Request { ExtractRequest(eckit::Stream& stream); - ~ExtractRequest(); + ~ExtractRequest() = default; void execute() override; @@ -99,7 +99,7 @@ class ForwardedExtractRequest : public Request { ForwardedExtractRequest(eckit::Stream& stream); - ~ForwardedExtractRequest(); + ~ForwardedExtractRequest() = default; void execute() override; @@ -113,6 +113,29 @@ class ForwardedExtractRequest : public Request { ResultsMap results_; }; + +//---------------------------------------------------------------------------------------------------------------------- + +class ForwardedScanRequest : public Request { +public: + + ForwardedScanRequest(eckit::Stream& stream); + + ~ForwardedScanRequest() = default; + + void execute() override; + + void replyToClient() override; + +private: + + std::vector> items_; + scanmap_t scanmap_; + ResultsMap results_; + + size_t nfields_; +}; + //---------------------------------------------------------------------------------------------------------------------- class AxesRequest : public Request { @@ -120,7 +143,7 @@ class AxesRequest : public Request { AxesRequest(eckit::Stream& stream); - ~AxesRequest(); + ~AxesRequest() = default; void execute() override; From a07399b32671ca72aa70aada63f6d806d5ad9b08 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 26 Nov 2024 16:06:45 +0000 Subject: [PATCH 2/5] Fix reporting of number of fields scanned --- src/gribjump/Engine.cc | 7 +++++++ src/gribjump/Forwarder.cc | 11 +++++++---- src/gribjump/Task.cc | 6 +++--- src/gribjump/Task.h | 4 ++-- src/gribjump/info/InfoCache.cc | 14 +++++--------- src/gribjump/remote/RemoteGribJump.cc | 23 +++++++---------------- src/gribjump/remote/Request.cc | 4 ++-- src/gribjump/remote/Request.h | 2 +- src/tools/gribjump-scan.cc | 3 ++- 9 files changed, 36 insertions(+), 38 deletions(-) diff --git a/src/gribjump/Engine.cc b/src/gribjump/Engine.cc index 8e88975..cb3aa02 100644 --- a/src/gribjump/Engine.cc +++ b/src/gribjump/Engine.cc @@ -169,6 +169,11 @@ size_t Engine::scan(const MarsRequests& requests, bool byfiles) { std::vector uris = FDBLister::instance().URIs(requests); + if (uris.empty()) { + MetricsManager::instance().set("count_scanned_fields", 0); + return 0; + } + // forwarded scan requests if (LibGribJump::instance().config().getBool("forwardScan", false)) { Forwarder forwarder; @@ -205,6 +210,8 @@ size_t Engine::scan(const scanmap_t& scanmap) { } taskGroup_.waitForTasks(); + MetricsManager::instance().set("count_scanned_fields", static_cast(nfields)); + return nfields; } diff --git a/src/gribjump/Forwarder.cc b/src/gribjump/Forwarder.cc index 6720889..5b01e37 100644 --- a/src/gribjump/Forwarder.cc +++ b/src/gribjump/Forwarder.cc @@ -12,11 +12,13 @@ #include #include +#include #include "eckit/filesystem/URI.h" #include "gribjump/Forwarder.h" #include "gribjump/ExtractionItem.h" #include "gribjump/LibGribJump.h" #include "gribjump/Task.h" + namespace gribjump { Forwarder::Forwarder() { @@ -27,6 +29,8 @@ Forwarder::~Forwarder() { size_t Forwarder::scan(const std::vector& uris) { + ASSERT(uris.size() > 0); + // scanmap_t: filename -> offsets std::unordered_map serverfilemaps; @@ -40,15 +44,14 @@ size_t Forwarder::scan(const std::vector& uris) { TaskGroup taskGroup; size_t counter = 0; - std::vector n_fields; + std::atomic nFields(0); size_t i = 0; for (auto& [endpoint, scanmap] : serverfilemaps) { - taskGroup.enqueueTask(new ForwardScanTask(taskGroup, i, endpoint, scanmap, n_fields[i])); - i++; + taskGroup.enqueueTask(new ForwardScanTask(taskGroup, counter++, endpoint, scanmap, nFields)); } taskGroup.waitForTasks(); - return std::accumulate(n_fields.begin(), n_fields.end(), 0); + return nFields; } void Forwarder::extract(filemap_t& filemap) { diff --git a/src/gribjump/Task.cc b/src/gribjump/Task.cc index cf6eedd..69fd3ec 100644 --- a/src/gribjump/Task.cc +++ b/src/gribjump/Task.cc @@ -211,17 +211,17 @@ void ForwardExtractionTask::execute(){ notify(); } -ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, size_t& count) : +ForwardScanTask::ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields): Task(taskgroup, id), endpoint_(endpoint), scanmap_(scanmap), - count_(count) { + nfields_(nfields) { } void ForwardScanTask::execute(){ RemoteGribJump remoteGribJump(endpoint_); - count_ = remoteGribJump.forwardScan(scanmap_); + nfields_ += remoteGribJump.forwardScan(scanmap_); notify(); } diff --git a/src/gribjump/Task.h b/src/gribjump/Task.h index 5e00ddd..2824db2 100644 --- a/src/gribjump/Task.h +++ b/src/gribjump/Task.h @@ -161,14 +161,14 @@ class ForwardExtractionTask : public Task { class ForwardScanTask : public Task { public: - ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, size_t& count); + ForwardScanTask(TaskGroup& taskgroup, const size_t id, eckit::net::Endpoint endpoint, scanmap_t& scanmap, std::atomic& nfields_); void execute() override; private: eckit::net::Endpoint endpoint_; scanmap_t& scanmap_; - size_t& count_; + std::atomic& nfields_; }; //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 2d9c7d1..40a215d 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -49,7 +49,6 @@ InfoCache::InfoCache(): bool enabled = config.getBool("cache.enabled", true); if (!enabled) { - persistentCache_ = false; LOG_DEBUG_LIB(LibGribJump) << "Cache disabled" << std::endl; return; } @@ -197,7 +196,7 @@ size_t InfoCache::scan(const eckit::PathName& fdbpath, const std::vector filecache = getFileCache(fdbpath); - filecache->load(); + filecache->reload(); // Find which offsets are not already in file cache std::vector newOffsets; @@ -227,20 +226,19 @@ size_t InfoCache::scan(const eckit::PathName& fdbpath, const std::vectorinsert(newOffsets[i], std::move(infos[i])); } - if (persistentCache_) { - filecache->write(); - } + filecache->write(); return infos.size(); } +/// @todo maybe merge this with the above method size_t InfoCache::scan(const eckit::PathName& fdbpath) { LOG_DEBUG_LIB(LibGribJump) << "Scanning whole file " << fdbpath << std::endl; // if cache exists load so we can merge with memory cache std::shared_ptr filecache = getFileCache(fdbpath); - filecache->load(); + filecache->reload(); InfoExtractor extractor; std::vector>> infos = extractor.extract(fdbpath); @@ -248,9 +246,7 @@ size_t InfoCache::scan(const eckit::PathName& fdbpath) { filecache->insert(infos[i].first, std::move(infos[i].second)); } - if (persistentCache_) { - filecache->write(); - } + filecache->write(); return infos.size(); } diff --git a/src/gribjump/remote/RemoteGribJump.cc b/src/gribjump/remote/RemoteGribJump.cc index c3ca619..6921556 100644 --- a/src/gribjump/remote/RemoteGribJump.cc +++ b/src/gribjump/remote/RemoteGribJump.cc @@ -66,16 +66,11 @@ size_t RemoteGribJump::scan(const std::vector& reques bool error = receiveErrors(stream); - size_t count = 0; - for (size_t i = 0; i < nRequests; i++) { - size_t nfiles; - stream >> nfiles; - eckit::Log::info() << "Scanned " << eckit::Plural(nfiles, "file") << std::endl; - count += nfiles; - } + size_t nFields; + stream >> nFields; timer.report("Scans complete"); - return count; + return nFields; } // Forward scan request to another server @@ -98,17 +93,13 @@ size_t RemoteGribJump::forwardScan(const std::map> nOffsets; - count += nOffsets; - } + size_t nfields = 0; + stream >> nfields; - eckit::Log::info() << "Scanned " << eckit::Plural(count, "field") << "on endpoint " << host_ << ":" << port_ << std::endl; + eckit::Log::info() << "Scanned " << nfields << " field(s) on endpoint " << host_ << ":" << port_ << std::endl; timer.report("Scans complete"); - return count; + return nfields; } std::vector>> RemoteGribJump::extract(std::vector& requests) { diff --git a/src/gribjump/remote/Request.cc b/src/gribjump/remote/Request.cc index fdf6e7b..fa5434c 100644 --- a/src/gribjump/remote/Request.cc +++ b/src/gribjump/remote/Request.cc @@ -57,11 +57,11 @@ ScanRequest::ScanRequest(eckit::Stream& stream) : Request(stream) { } void ScanRequest::execute() { - nfiles_ = engine_.scan(requests_, byfiles_); + nFields_ = engine_.scan(requests_, byfiles_); } void ScanRequest::replyToClient() { - client_ << nfiles_; + client_ << nFields_; } //---------------------------------------------------------------------------------------------------------------------- diff --git a/src/gribjump/remote/Request.h b/src/gribjump/remote/Request.h index abb3681..7483f7d 100644 --- a/src/gribjump/remote/Request.h +++ b/src/gribjump/remote/Request.h @@ -68,7 +68,7 @@ class ScanRequest : public Request { std::vector requests_; bool byfiles_; - size_t nfiles_; + size_t nFields_; }; diff --git a/src/tools/gribjump-scan.cc b/src/tools/gribjump-scan.cc index 6baad57..0d601fe 100644 --- a/src/tools/gribjump-scan.cc +++ b/src/tools/gribjump-scan.cc @@ -76,7 +76,8 @@ void Scan::execute(const eckit::option::CmdArgs &args) { GribJump gj; - gj.scan(requests, files); + size_t nfields = gj.scan(requests, files); + eckit::Log::info() << "Scanned " << nfields << " field(s)" << std::endl; } } // gribjump::tool From 0f29c103d9992ca2ae9336c1fa18da98234e800c Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 26 Nov 2024 17:01:27 +0000 Subject: [PATCH 3/5] Improve scan command line arg support --- src/tools/gribjump-scan.cc | 66 +++++++++++++++++++----------- tests/tools/callback_vs_scan.sh.in | 2 +- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/tools/gribjump-scan.cc b/src/tools/gribjump-scan.cc index 0d601fe..ceb370d 100644 --- a/src/tools/gribjump-scan.cc +++ b/src/tools/gribjump-scan.cc @@ -15,6 +15,7 @@ #include "metkit/mars/MarsExpension.h" #include "fdb5/api/FDB.h" +#include "fdb5/api/helpers/FDBToolRequest.h" #include "gribjump/GribJump.h" #include "gribjump/tools/GribJumpTool.h" @@ -31,21 +32,21 @@ class Scan: public GribJumpTool { virtual void execute(const eckit::option::CmdArgs &args); virtual void usage(const std::string &tool) const; - virtual int numberOfPositionalArguments() const { return 1; } + virtual int numberOfPositionalArguments() const { return -1; } public: Scan(int argc, char **argv): GribJumpTool(argc, argv) { + options_.push_back(new eckit::option::SimpleOption("file", "Reads the mars requests from a file, rather than from the command line")); options_.push_back(new eckit::option::SimpleOption("raw", "Uses the raw request, without expansion")); - options_.push_back(new eckit::option::SimpleOption("files", "Scan entire files matching the request (default: true)")); + options_.push_back(new eckit::option::SimpleOption("byfiles", "Scan entire files matching the request (default: true)")); } }; void Scan::usage(const std::string &tool) const { eckit::Log::info() << std::endl - << "Usage: " << tool << " " << std::endl - << " " << tool << " --raw " << std::endl - << " " << tool << " --files " << std::endl + << "Usage: " << tool << " class=od,stream=oper,expver=xxxx" << std::endl + << " " << tool << " --file=" << std::endl ; GribJumpTool::usage(tool); @@ -54,29 +55,48 @@ void Scan::usage(const std::string &tool) const { void Scan::execute(const eckit::option::CmdArgs &args) { bool raw = args.getBool("raw", false); - bool files = args.getBool("files", false); - - // Build request(s) from input - std::ifstream in(args(0).c_str()); - if (in.bad()) { - throw eckit::ReadError(args(0)); - } + bool byfiles = args.getBool("byfiles", false); + std::string file = args.getString("file", ""); std::vector requests; - metkit::mars::MarsParser parser(in); - auto parsedRequests = parser.parse(); - if (raw) { - for (auto r : parsedRequests) - requests.push_back(r); + + if (args.count() > 0 && !file.empty()) { + throw eckit::UserError("Invalid arguments: Cannot specify both a file (--file) and a request (positional arguments)"); + } + + if (!file.empty()){ + // Build request(s) from + std::ifstream in(file); + if (in.bad()) { + throw eckit::ReadError(args(0)); + } + + metkit::mars::MarsParser parser(in); + auto parsedRequests = parser.parse(); + if (raw) { + for (auto r : parsedRequests) + requests.push_back(r); + } else { + bool inherit = false; + metkit::mars::MarsExpension expand(inherit); + requests = expand.expand(parsedRequests); + } } else { - bool inherit = false; - metkit::mars::MarsExpension expand(inherit); - requests = expand.expand(parsedRequests); + // Build request(s) from positional arguments + std::vector requests_in; + for (size_t i = 0; i < args.count(); ++i) { + requests_in.emplace_back(args(i)); + } + + for (const std::string& request_string : requests_in) { + auto parsed = fdb5::FDBToolRequest::requestsFromString(request_string, {}, raw, "retrieve"); + for (auto r : parsed) { + requests.push_back(r.request()); + } + } } - GribJump gj; - - size_t nfields = gj.scan(requests, files); + size_t nfields = gj.scan(requests, byfiles); eckit::Log::info() << "Scanned " << nfields << " field(s)" << std::endl; } diff --git a/tests/tools/callback_vs_scan.sh.in b/tests/tools/callback_vs_scan.sh.in index 51db402..46ade95 100755 --- a/tests/tools/callback_vs_scan.sh.in +++ b/tests/tools/callback_vs_scan.sh.in @@ -80,7 +80,7 @@ FDB5_CONFIG_FILE=${bindir}/config_root1.yaml $fdbwrite ${testdata} echo "Writing to FDB without gribjump plugin, then scanning" FDB5_CONFIG_FILE=${bindir}/config_root2.yaml $fdbwrite ${testdata} -FDB5_CONFIG_FILE=${bindir}/config_root2.yaml $gjscan $selectedrequests +FDB5_CONFIG_FILE=${bindir}/config_root2.yaml $gjscan --file=$selectedrequests # ------------------------------------- # 3: FDB with gribjump plugin From 6f5a5ab87d18cf97bd0033a8ab08fa6922cf024b Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 26 Nov 2024 17:20:26 +0000 Subject: [PATCH 4/5] Add remote scan test (without forwarding) --- tests/remote/test_remote.cc | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/remote/test_remote.cc b/tests/remote/test_remote.cc index 84915cf..c7add79 100644 --- a/tests/remote/test_remote.cc +++ b/tests/remote/test_remote.cc @@ -94,6 +94,20 @@ CASE( "Remote protocol: axes" ) { } +CASE( "Remote protocol: scan" ) { + + std::vector requests; + + for (auto& r : fdb5::FDBToolRequest::requestsFromString("class=rd,expver=xxxx")) { + requests.push_back(r.request()); + } + + GribJump gribjump; + LogContext ctx("test_scan"); + size_t nfields = gribjump.scan(requests, false, ctx); + EXPECT_EQUAL(nfields, 3); +} + #ifdef GRIBJUMP_HAVE_DHSKIT // metrics target is set by dhskit CASE( "Parse the metrics file" ) { @@ -112,7 +126,7 @@ CASE( "Parse the metrics file" ) { "context" }; - // Expect 2 JSON objects in the file + // Expect one JSON object in the file, per test case above std::vector values; std::ifstream file(metricsFile.asString().c_str()); std::string line; @@ -126,7 +140,7 @@ CASE( "Parse the metrics file" ) { EXPECT(v.contains(key)); } } - EXPECT_EQUAL(values.size(), 2); + EXPECT_EQUAL(values.size(), 3); // Check extract eckit::Value v = values[0]; EXPECT_EQUAL(v["action"], "extract"); @@ -137,6 +151,10 @@ CASE( "Parse the metrics file" ) { EXPECT_EQUAL(v["action"], "axes"); EXPECT_EQUAL(v["context"], "test_axes"); + // Check scan + v = values[2]; + EXPECT_EQUAL(v["action"], "scan"); + EXPECT_EQUAL(v["context"], "test_scan"); } #endif } // namespace test From cb4a1ec218bef6d3cc24989cacc6bb95c9cab16b Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Tue, 26 Nov 2024 18:00:53 +0000 Subject: [PATCH 5/5] Assert correct file extension before writing indexes --- src/gribjump/info/InfoCache.cc | 14 +++++++++----- src/gribjump/info/InfoCache.h | 10 ++++------ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/gribjump/info/InfoCache.cc b/src/gribjump/info/InfoCache.cc index 0e0114f..8b1ca19 100644 --- a/src/gribjump/info/InfoCache.cc +++ b/src/gribjump/info/InfoCache.cc @@ -314,7 +314,7 @@ void IndexFile::reload() { load(); } -void IndexFile::encode(eckit::Stream& s) { +void IndexFile::encode(eckit::Stream& s) const { std::lock_guard lock(mutex_); s << currentVersion_; @@ -342,13 +342,15 @@ void IndexFile::decode(eckit::Stream& s) { LOG_DEBUG_LIB(LibGribJump) << "Loaded " << count << " entries from stream" << std::endl; } -void IndexFile::toNewFile(eckit::PathName path){ +void IndexFile::toNewFile(const eckit::PathName& path) const { + ASSERT(path.extension() == file_ext); eckit::FileStream s(path, "w"); encode(s); s.close(); } -void IndexFile::appendToFile(eckit::PathName path) { +void IndexFile::appendToFile(const eckit::PathName& path) const { + ASSERT(path.extension() == file_ext); // NB: Non-atomic. Gribjump does not support concurrent writes to the same file from different processes. if (!path.exists()) { toNewFile(path); @@ -368,7 +370,7 @@ void IndexFile::appendToFile(eckit::PathName path) { s.close(); } -void IndexFile::fromFile(eckit::PathName path) { +void IndexFile::fromFile(const eckit::PathName& path) { eckit::FileStream s(path, "r"); decode(s); s.close(); @@ -387,11 +389,13 @@ void IndexFile::merge(IndexFile& other) { void IndexFile::write() { // create a unique filename for the cache file before (atomically) moving it into place - eckit::PathName uniqPath = eckit::PathName::unique(path_); + eckit::PathName uniqPath = eckit::PathName::unique(path_) + file_ext; toNewFile(uniqPath); // atomically move the file into place LOG_DEBUG_LIB(LibGribJump) << "IndexFile writing to file " << path_ << std::endl; + + ASSERT(path_.extension() == file_ext); eckit::PathName::rename(uniqPath, path_); } diff --git a/src/gribjump/info/InfoCache.h b/src/gribjump/info/InfoCache.h index 59482a2..a519966 100644 --- a/src/gribjump/info/InfoCache.h +++ b/src/gribjump/info/InfoCache.h @@ -85,8 +85,6 @@ class InfoCache { mutable std::mutex infomutex_;; //< mutex for infocache_ infocache_t infocache_; - bool persistentCache_ = true; - bool lazy_; //< if true, cache.get may construct JumpInfo on the fly bool shadowCache_ = false; //< if true, cache files are persisted next to the original data files (e.g. in FDB) @@ -117,7 +115,7 @@ class IndexFile { private: // Methods are only intended to be called from InfoCache - void encode(eckit::Stream& s); + void encode(eckit::Stream& s) const; void decode(eckit::Stream& s); @@ -129,9 +127,9 @@ class IndexFile { void insert(eckit::Offset offset, std::shared_ptr info); - void toNewFile(eckit::PathName path); - void appendToFile(eckit::PathName path); - void fromFile(eckit::PathName path); + void toNewFile(const eckit::PathName& path) const; + void appendToFile(const eckit::PathName& path) const; + void fromFile(const eckit::PathName& path); // wrapper around map_.find() std::shared_ptr find(eckit::Offset offset);