From a0541721bc48ec7a3561c2d6e80f0db3fc39df86 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Thu, 12 Dec 2024 11:40:27 +0000 Subject: [PATCH 1/9] HPCC-33122 Add the ability to modify configurations Signed-off-by: Gavin Halliday --- system/jlib/jptree.cpp | 87 +++++++++++++++++++++++++++++++----------- system/jlib/jptree.hpp | 2 + 2 files changed, 67 insertions(+), 22 deletions(-) diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index 6f4b6d3017e..a21d1e0f140 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8711,6 +8711,7 @@ class CConfigUpdater : public CInterface CriticalSection notifyFuncCS; unsigned notifyFuncId = 0; std::unordered_map notifyConfigUpdates; + std::unordered_map modifyConfigUpdates; std::vector pendingInitializeFuncIds; public: @@ -8738,6 +8739,9 @@ class CConfigUpdater : public CInterface Owned config = getComponentConfig(); Owned global = getGlobalConfig(); + for (auto & modifyFunc : modifyConfigUpdates) + modifyFunc.second(config, global); + while (pendingInitializeFuncIds.size()) { unsigned notifyFuncId = pendingInitializeFuncIds.back(); @@ -8753,6 +8757,7 @@ class CConfigUpdater : public CInterface #endif if (0 == absoluteConfigFilename.length() || (nullptr != fileWatcher.get())) return false; + auto updateFunc = [&](const char *filename, FileWatchEvents events) { bool changed = containsFileWatchEvents(events, FileWatchEvents::closedWrite) && streq(filename, configFilename); @@ -8761,29 +8766,9 @@ class CConfigUpdater : public CInterface changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); #endif if (changed) - { - auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - - // NB: block calls to get*Config*() until callbacks notified and new swapped in - CriticalBlock b(configCS); - Owned oldComponentConfiguration = componentConfiguration.getClear(); - Owned 
oldGlobalConfiguration = globalConfiguration.getClear(); - - /* swapin before callbacks called, but before releasing crit. - * That way the CB can see the old/diffs and act on them, but any - * code calling e.g. getComponentConfig() will see new. - */ - componentConfiguration.setown(std::get<1>(result)); - globalConfiguration.setown(std::get<2>(result)); - if (!componentName) - componentConfiguration->getProp("@name", componentName); - - /* NB: we are still holding 'configCS' at this point, blocking all other thread access. - However code in callbacks may call e.g. getComponentConfig() and re-enter the crit */ - executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); - absoluteConfigFilename.set(std::get<0>(result).c_str()); - } + refreshConfiguration(); }; + try { fileWatcher.setown(createFileEventWatcher(updateFunc)); @@ -8802,6 +8787,42 @@ class CConfigUpdater : public CInterface } return true; } + + void refreshConfiguration() + { + auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + + IPropertyTree * newComponentConfiguration = std::get<1>(result); + IPropertyTree * newGlobalConfiguration = std::get<2>(result); + + //Ensure all modifications to the config take place before the config is update, and the monitoring functions are called. + for (auto & modifyFunc : modifyConfigUpdates) + modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); + + // NB: block calls to get*Config*() from other threads until callbacks notified and new swapped in, destroy old config outside the critical section + Owned oldComponentConfiguration; + Owned oldGlobalConfiguration; + { + CriticalBlock b(configCS); + oldComponentConfiguration.setown(componentConfiguration.getClear()); + oldGlobalConfiguration.setown(globalConfiguration.getClear()); + + /* swapin before callbacks called, but before releasing crit. 
+ * That way the CB can see the old/diffs and act on them, but any + * code calling e.g. getComponentConfig() will see new. + */ + componentConfiguration.setown(newComponentConfiguration); + globalConfiguration.setown(newGlobalConfiguration); + if (!componentName) + componentConfiguration->getProp("@name", componentName); + + /* NB: we are still holding 'configCS' at this point, blocking all other thread access. + However code in callbacks may call e.g. getComponentConfig() and re-enter the crit */ + executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); + absoluteConfigFilename.set(std::get<0>(result).c_str()); + } + } + void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) { for (const auto &item: notifyConfigUpdates) @@ -8835,9 +8856,24 @@ class CConfigUpdater : public CInterface } return notifyFuncId; } + unsigned addModifyFunc(ConfigModifyFunc notifyFunc) + { + CriticalBlock b(notifyFuncCS); + notifyFuncId++; + modifyConfigUpdates[notifyFuncId] = notifyFunc; + if (isInitialized()) + { + refreshConfiguiration(); // force all cached values to be recalculated + DBGLOG("Modify functions should be registered before the configuration is loaded"); + } + return notifyFuncId; + } bool removeNotifyFunc(unsigned funcId) { CriticalBlock b(notifyFuncCS); + if (modifyConfigUpdates.erase(funcId) != 0) + return true; + auto it = notifyConfigUpdates.find(funcId); if (it == notifyConfigUpdates.end()) return false; @@ -8873,6 +8909,13 @@ unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInsta return configFileUpdater->addNotifyFunc(notifyFunc, callWhenInstalled); } +jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc) // This function must be called before the configuration is loaded. 
+{ + if (!configFileUpdater) // NB: installConfigUpdateHook should always be called after configFileUpdater is initialized + return 0; + return configFileUpdater->addModifyFunc(notifyFunc); +} + void removeConfigUpdateHook(unsigned notifyFuncId) { if (0 == notifyFuncId) diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 40366556d9f..822027ac90a 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -332,7 +332,9 @@ jlib_decl const char * queryComponentName(); // ConfigUpdateFunc calls are made in a mutex, but after new confis are swapped in typedef std::function ConfigUpdateFunc; +typedef std::function ConfigModifyFunc; jlib_decl unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInstalled); +jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc); // This function must be called before the configuration is loaded. jlib_decl void removeConfigUpdateHook(unsigned notifyFuncId); jlib_decl void executeConfigUpdaterCallbacks(); From 5ef4ed392506c53014986d8ee391e55eb65e6d82 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Thu, 12 Dec 2024 13:00:15 +0000 Subject: [PATCH 2/9] Various minor refactoring Signed-off-by: Gavin Halliday --- dali/base/daclient.cpp | 3 +- dali/base/dafdesc.cpp | 15 +++++----- system/jlib/jptree.cpp | 66 ++++++++++++++++++++++++++++-------------- system/jlib/jptree.hpp | 3 +- 4 files changed, 56 insertions(+), 31 deletions(-) diff --git a/dali/base/daclient.cpp b/dali/base/daclient.cpp index 7a55e473dc1..c63ca9c411b 100644 --- a/dali/base/daclient.cpp +++ b/dali/base/daclient.cpp @@ -105,7 +105,7 @@ void installEnvConfigMonitor() // ISDSSubscription impl. 
virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=nullptr) override { - executeConfigUpdaterCallbacks(); + forceConfigRefresh(); } }; @@ -158,6 +158,7 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, case DCR_EclScheduler: case DCR_EclCCServer: installEnvConfigMonitor(); + break; default: break; } diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index df1c816f8a4..8940ef61093 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3722,13 +3722,12 @@ static void generateHosts(IPropertyTree * storage, GroupInfoArray & groups) static CConfigUpdateHook configUpdateHook; static std::atomic normalizeHostGroupUpdateCBId{(unsigned)-1}; static CriticalSection storageCS; -static void doInitializeStorageGroups(bool createPlanesFromGroups) +static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree * newGlobalConfiguration) { CriticalBlock block(storageCS); - Owned globalConfig = getGlobalConfig(); - Owned storage = globalConfig->getPropTree("storage"); + Owned storage = newGlobalConfiguration->getPropTree("storage"); if (!storage) - storage.set(globalConfig->addPropTree("storage")); + storage.set(newGlobalConfiguration->addPropTree("storage")); #ifndef _CONTAINERIZED if (createPlanesFromGroups) @@ -3797,14 +3796,14 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups) void initializeStorageGroups(bool createPlanesFromGroups) { - auto updateFunc = [createPlanesFromGroups](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration) + auto updateFunc = [createPlanesFromGroups](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) { + //MORE: createPlanesFromGroup will never be updated.... 
PROGLOG("initializeStorageGroups update"); - doInitializeStorageGroups(createPlanesFromGroups); + doInitializeStorageGroups(createPlanesFromGroups, newGlobalConfiguration); }; - doInitializeStorageGroups(createPlanesFromGroups); - configUpdateHook.installOnce(updateFunc, false); + configUpdateHook.installOnce(updateFunc); } bool getDefaultStoragePlane(StringBuffer &ret) diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index a21d1e0f140..8383b7c1db1 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8755,7 +8755,7 @@ class CConfigUpdater : public CInterface #if !defined(__linux__) // file moinitoring only supported in Linux (because createFileEventWatcher only implemented in Linux at the moment) return false; #endif - if (0 == absoluteConfigFilename.length() || (nullptr != fileWatcher.get())) + if (absoluteConfigFilename.isEmpty() || (nullptr != fileWatcher.get())) return false; auto updateFunc = [&](const char *filename, FileWatchEvents events) @@ -8788,13 +8788,9 @@ class CConfigUpdater : public CInterface return true; } - void refreshConfiguration() + void refreshConfiguration(const char * configFilename, IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) { - auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - - IPropertyTree * newComponentConfiguration = std::get<1>(result); - IPropertyTree * newGlobalConfiguration = std::get<2>(result); - + CriticalBlock b(notifyFuncCS); //Ensure all modifications to the config take place before the config is update, and the monitoring functions are called. for (auto & modifyFunc : modifyConfigUpdates) modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); @@ -8819,10 +8815,20 @@ class CConfigUpdater : public CInterface /* NB: we are still holding 'configCS' at this point, blocking all other thread access. However code in callbacks may call e.g. 
getComponentConfig() and re-enter the crit */ executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); - absoluteConfigFilename.set(std::get<0>(result).c_str()); + + //GH: MORE: Not sure this should be updated - the watcher isn't updated to match a change in filname + absoluteConfigFilename.set(configFilename); } } + void refreshConfiguration() + { + auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + IPropertyTree * newComponentConfiguration = std::get<1>(result); + IPropertyTree * newGlobalConfiguration = std::get<2>(result); + refreshConfiguration(std::get<0>(result).c_str(), newComponentConfiguration, newGlobalConfiguration); + } + void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) { for (const auto &item: notifyConfigUpdates) @@ -8863,7 +8869,7 @@ class CConfigUpdater : public CInterface modifyConfigUpdates[notifyFuncId] = notifyFunc; if (isInitialized()) { - refreshConfiguiration(); // force all cached values to be recalculated + refreshConfiguration(); // force all cached values to be recalculated DBGLOG("Modify functions should be registered before the configuration is loaded"); } return notifyFuncId; @@ -8926,11 +8932,11 @@ void removeConfigUpdateHook(unsigned notifyFuncId) WARNLOG("removeConfigUpdateHook(): notifyFuncId %u not installed", notifyFuncId); } -void executeConfigUpdaterCallbacks() +void forceConfigRefresh() { if (!configFileUpdater) // NB: executeConfigUpdaterCallbacks should always be called after configFileUpdater is initialized return; - configFileUpdater->executeCallbacks(componentConfiguration, globalConfiguration); + configFileUpdater->refreshConfiguration(); } void CConfigUpdateHook::clear() @@ -8957,6 +8963,22 @@ void CConfigUpdateHook::installOnce(ConfigUpdateFunc callbackFunc, bool callWhen } +void CConfigUpdateHook::installOnce(ConfigModifyFunc callbackFunc) +{ + 
unsigned id = configCBId.load(std::memory_order_acquire); + if ((unsigned)-1 == id) // avoid CS in common case + { + CriticalBlock b(crit); + // check again now in CS + id = configCBId.load(std::memory_order_acquire); + if ((unsigned)-1 == id) + { + id = installConfigUpdateHook(callbackFunc); + configCBId.store(id, std::memory_order_release); + } + } +} + static std::tuple doLoadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute) { Owned newComponentConfig = createPTreeFromIPT(componentDefault); @@ -9090,7 +9112,10 @@ static std::tuple doLoadConfigura #ifdef _DEBUG // NB: don't re-hold, if CLI --hold already held. if (!held && newComponentConfig->getPropBool("@hold")) + { holdLoop(); + held = true; + } #endif unsigned ptreeMappingThreshold = newGlobalConfig->getPropInt("@ptreeMappingThreshold", defaultSiblingMapThreshold); @@ -9099,7 +9124,7 @@ static std::tuple doLoadConfigura return std::make_tuple(std::string(absConfigFilename.str()), newComponentConfig.getClear(), newGlobalConfig.getClear()); } -jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) +IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) { assertex(configFileUpdater); // NB: loadConfiguration should always be called after configFileUpdater is initialized if (configFileUpdater->isInitialized()) @@ -9133,7 +9158,7 @@ jlib_decl IPropertyTree 
* loadConfiguration(IPropertyTree *componentDefault, IPr return componentConfiguration.getLink(); } -jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) +IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) { if (componentConfiguration) throw makeStringExceptionV(99, "Configuration for component %s has already been initialised", componentTag); @@ -9156,20 +9181,19 @@ jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig) { - { - CriticalBlock b(configCS); - componentConfiguration.set(newComponentConfig); - globalConfiguration.set(newGlobalConfig); - } - executeConfigUpdaterCallbacks(); + assertex (configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized + configFileUpdater->refreshConfiguration(nullptr, newComponentConfig, newGlobalConfig); } void initNullConfiguration() { if (componentConfiguration || globalConfiguration) throw makeStringException(99, "Configuration has already been initialised"); - componentConfiguration.setown(createPTree()); - globalConfiguration.setown(createPTree()); + + assertex(configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized + Owned newComponentConfig(createPTree()); + Owned newGlobalConfig(createPTree()); + configFileUpdater->refreshConfiguration(nullptr, newComponentConfig, newGlobalConfig); } class CYAMLBufferReader : public CInterfaceOf diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 
822027ac90a..2616fc87fab 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -336,7 +336,7 @@ typedef std::function Date: Fri, 13 Dec 2024 11:53:11 +0000 Subject: [PATCH 3/9] More refactoring Signed-off-by: Gavin Halliday --- dali/base/daclient.cpp | 2 +- system/jlib/jptree.cpp | 79 ++++++++++++++---------------------------- system/jlib/jptree.hpp | 2 +- 3 files changed, 28 insertions(+), 55 deletions(-) diff --git a/dali/base/daclient.cpp b/dali/base/daclient.cpp index c63ca9c411b..5ec74279146 100644 --- a/dali/base/daclient.cpp +++ b/dali/base/daclient.cpp @@ -105,7 +105,7 @@ void installEnvConfigMonitor() // ISDSSubscription impl. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=nullptr) override { - forceConfigRefresh(); + refreshConfiguration(); } }; diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index 8383b7c1db1..f14c0f1fe82 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8712,7 +8712,6 @@ class CConfigUpdater : public CInterface unsigned notifyFuncId = 0; std::unordered_map notifyConfigUpdates; std::unordered_map modifyConfigUpdates; - std::vector pendingInitializeFuncIds; public: CConfigUpdater() @@ -8722,10 +8721,9 @@ class CConfigUpdater : public CInterface { return args.ordinality(); // NB: null terminated, so always >=1 if initialized } - void init(const char *_absoluteConfigFilename, IPropertyTree *_componentDefault, IPropertyTree *_globalDefault, const char * * argv, const char * _componentTag, const char * _envPrefix, const char *_legacyFilename, IPropertyTree * (_mapper)(IPropertyTree *), const char *_altNameAttribute) + void init(IPropertyTree *_componentDefault, IPropertyTree *_globalDefault, const char * * argv, const char * _componentTag, const char * _envPrefix, const char *_legacyFilename, IPropertyTree * (_mapper)(IPropertyTree *), const char *_altNameAttribute) { dbgassertex(!isInitialized()); - 
absoluteConfigFilename.set(_absoluteConfigFilename); componentDefault.set(_componentDefault); globalDefault.set(_globalDefault); componentTag.set(_componentTag); @@ -8737,18 +8735,7 @@ class CConfigUpdater : public CInterface args.append(arg); args.append(nullptr); - Owned config = getComponentConfig(); - Owned global = getGlobalConfig(); - for (auto & modifyFunc : modifyConfigUpdates) - modifyFunc.second(config, global); - - while (pendingInitializeFuncIds.size()) - { - unsigned notifyFuncId = pendingInitializeFuncIds.back(); - pendingInitializeFuncIds.pop_back(); - ConfigUpdateFunc notifyFunc = notifyConfigUpdates[notifyFuncId]; - notifyFunc(config, global); - } + refreshConfiguration(true); } bool startMonitoring() { @@ -8761,12 +8748,13 @@ class CConfigUpdater : public CInterface auto updateFunc = [&](const char *filename, FileWatchEvents events) { bool changed = containsFileWatchEvents(events, FileWatchEvents::closedWrite) && streq(filename, configFilename); -#ifdef _CONTAINERIZED + // NB: in k8s, it's a little strange, the config file is in a linked dir, a new dir is created and swapped in. - changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); -#endif + if (isContainerized()) + changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); + if (changed) - refreshConfiguration(); + refreshConfiguration(false); }; try @@ -8788,10 +8776,10 @@ class CConfigUpdater : public CInterface return true; } - void refreshConfiguration(const char * configFilename, IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + void refreshConfiguration(IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) { CriticalBlock b(notifyFuncCS); - //Ensure all modifications to the config take place before the config is update, and the monitoring functions are called. 
+ //Ensure all modifications to the config take place before the config is updated, and the monitoring/caching functions are called. for (auto & modifyFunc : modifyConfigUpdates) modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); @@ -8807,26 +8795,27 @@ class CConfigUpdater : public CInterface * That way the CB can see the old/diffs and act on them, but any * code calling e.g. getComponentConfig() will see new. */ - componentConfiguration.setown(newComponentConfiguration); - globalConfiguration.setown(newGlobalConfiguration); - if (!componentName) - componentConfiguration->getProp("@name", componentName); + componentConfiguration.set(newComponentConfiguration); + globalConfiguration.set(newGlobalConfiguration); /* NB: we are still holding 'configCS' at this point, blocking all other thread access. However code in callbacks may call e.g. getComponentConfig() and re-enter the crit */ executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); - - //GH: MORE: Not sure this should be updated - the watcher isn't updated to match a change in filname - absoluteConfigFilename.set(configFilename); } } - void refreshConfiguration() + void refreshConfiguration(bool firstTime) { auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); IPropertyTree * newComponentConfiguration = std::get<1>(result); IPropertyTree * newGlobalConfiguration = std::get<2>(result); - refreshConfiguration(std::get<0>(result).c_str(), newComponentConfiguration, newGlobalConfiguration); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + + if (firstTime) + { + absoluteConfigFilename.set(std::get<0>(result).c_str()); + newGlobalConfiguration->getProp("@name", componentName); + } } void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) @@ -8853,12 +8842,6 @@ class CConfigUpdater : public CInterface { if 
(isInitialized()) notifyFunc(getComponentConfigSP(), getGlobalConfigSP()); - else - { - // If the configuration is not yet be loaded, track notify callbacks that - // want to be initialized on install, and call during CConfigUpdater::init. - pendingInitializeFuncIds.push_back(notifyFuncId); - } } return notifyFuncId; } @@ -8869,7 +8852,7 @@ class CConfigUpdater : public CInterface modifyConfigUpdates[notifyFuncId] = notifyFunc; if (isInitialized()) { - refreshConfiguration(); // force all cached values to be recalculated + refreshConfiguration(false); // force all cached values to be recalculated DBGLOG("Modify functions should be registered before the configuration is loaded"); } return notifyFuncId; @@ -8886,11 +8869,6 @@ class CConfigUpdater : public CInterface ConfigUpdateFunc notifyFunc = it->second; notifyConfigUpdates.erase(it); - if (!isInitialized()) - { - auto it = std::remove(pendingInitializeFuncIds.begin(), pendingInitializeFuncIds.end(), funcId); - pendingInitializeFuncIds.erase(it, pendingInitializeFuncIds.end()); - } return true; } }; @@ -8932,11 +8910,11 @@ void removeConfigUpdateHook(unsigned notifyFuncId) WARNLOG("removeConfigUpdateHook(): notifyFuncId %u not installed", notifyFuncId); } -void forceConfigRefresh() +void refreshConfiguration() { - if (!configFileUpdater) // NB: executeConfigUpdaterCallbacks should always be called after configFileUpdater is initialized + if (!configFileUpdater) // NB: refreshConfiguration() should always be called after configFileUpdater is initialized return; - configFileUpdater->refreshConfiguration(); + configFileUpdater->refreshConfiguration(false); } void CConfigUpdateHook::clear() @@ -9130,11 +9108,6 @@ IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree if (configFileUpdater->isInitialized()) throw makeStringExceptionV(99, "Configuration for component %s has already been initialised", componentTag); - auto result = doLoadConfiguration(componentDefault, globalDefault, argv, 
componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - - componentConfiguration.setown(std::get<1>(result)); - globalConfiguration.setown(std::get<2>(result)); - /* In k8s, pods auto-restart by default on monitored ConfigMap settings/areas * ConfigMap settings/areas deliberately not monitored will rely on this config updater mechanism, * and if necessary, installed hooks that are called on an update, to perform updates of cached state. @@ -9150,7 +9123,7 @@ IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree * installed config hooks to be called when an environment change is detected e.g when pushed to Dali) */ - configFileUpdater->init(std::get<0>(result).c_str(), componentDefault, globalDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + configFileUpdater->init(componentDefault, globalDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); if (monitor) configFileUpdater->startMonitoring(); @@ -9182,7 +9155,7 @@ IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig) { assertex (configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized - configFileUpdater->refreshConfiguration(nullptr, newComponentConfig, newGlobalConfig); + configFileUpdater->refreshConfiguration(newComponentConfig, newGlobalConfig); } void initNullConfiguration() @@ -9193,7 +9166,7 @@ void initNullConfiguration() assertex(configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized Owned newComponentConfig(createPTree()); Owned newGlobalConfig(createPTree()); - configFileUpdater->refreshConfiguration(nullptr, newComponentConfig, newGlobalConfig); + configFileUpdater->refreshConfiguration(newComponentConfig, newGlobalConfig); } class CYAMLBufferReader : public 
CInterfaceOf diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 2616fc87fab..e34617a0e58 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -336,7 +336,7 @@ typedef std::function Date: Fri, 13 Dec 2024 17:01:49 +0000 Subject: [PATCH 4/9] Working with thor master/slave Signed-off-by: Gavin Halliday --- dali/base/daclient.cpp | 3 +- system/jlib/jptree.cpp | 60 ++++++++++++++++++++++++++------- system/jlib/jptree.hpp | 2 +- thorlcr/master/thmastermain.cpp | 10 +++++- 4 files changed, 59 insertions(+), 16 deletions(-) diff --git a/dali/base/daclient.cpp b/dali/base/daclient.cpp index 5ec74279146..a36d418a851 100644 --- a/dali/base/daclient.cpp +++ b/dali/base/daclient.cpp @@ -147,7 +147,6 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, // causes any config update hooks (installed by installConfigUpdateHook() to trigger on an env. change) switch (role) { - case DCR_ThorMaster: case DCR_EclServer: case DCR_EclAgent: case DCR_SashaServer: @@ -159,6 +158,8 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, case DCR_EclCCServer: installEnvConfigMonitor(); break; + // Thor does not monitor because a fixed configuration is serialized to the slaves + case DCR_ThorMaster: default: break; } diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index f14c0f1fe82..3eea4441e56 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8707,7 +8707,7 @@ class CConfigUpdater : public CInterface StringAttr componentTag, envPrefix, legacyFilename; IPropertyTree * (*mapper)(IPropertyTree *); StringAttr altNameAttribute; - Owned fileWatcher; + Owned fileWatcher; // null if updates to the config file are not allowed CriticalSection notifyFuncCS; unsigned notifyFuncId = 0; std::unordered_map notifyConfigUpdates; @@ -8735,7 +8735,7 @@ class CConfigUpdater : public CInterface args.append(arg); args.append(nullptr); - refreshConfiguration(true); + refreshConfiguration(true, 
false); } bool startMonitoring() { @@ -8754,7 +8754,7 @@ class CConfigUpdater : public CInterface changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); if (changed) - refreshConfiguration(false); + refreshConfiguration(false, false); }; try @@ -8804,18 +8804,46 @@ class CConfigUpdater : public CInterface } } - void refreshConfiguration(bool firstTime) + void refreshConfiguration(bool firstTime, bool avoidClone) { - auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - IPropertyTree * newComponentConfiguration = std::get<1>(result); - IPropertyTree * newGlobalConfiguration = std::get<2>(result); - refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + if (firstTime || fileWatcher) + { + auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + IPropertyTree * newComponentConfiguration = std::get<1>(result); + IPropertyTree * newGlobalConfiguration = std::get<2>(result); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); - if (firstTime) + if (firstTime) + { + absoluteConfigFilename.set(std::get<0>(result).c_str()); + newGlobalConfiguration->getProp("@name", componentName); + } + } + else if (avoidClone) + { + Owned newComponentConfiguration = getComponentConfigSP(); + Owned newGlobalConfiguration = getGlobalConfigSP(); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + } + else { - absoluteConfigFilename.set(std::get<0>(result).c_str()); - newGlobalConfiguration->getProp("@name", componentName); + DBGLOG("Refresh %p,", getComponentConfigSP().get()); + dbglogXML(getComponentConfigSP()); + // File monitor is disabled - no updates to the configuration files are supported. 
+ //So clone the existing configuration and use that to refresh the config - update fucntions may perform differently. + Owned newComponentConfiguration = createPTreeFromIPT(getComponentConfigSP()); + Owned newGlobalConfiguration = createPTreeFromIPT(getGlobalConfigSP()); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); } + DBGLOG("Refreshed %p,", getComponentConfigSP().get()); + dbglogXML(getComponentConfigSP()); + } + + void executeCallbacks() + { + CriticalBlock notifyBlock(notifyFuncCS); + CriticalBlock configBlock(configCS); + executeCallbacks(componentConfiguration, globalConfiguration); } void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) @@ -8852,7 +8880,11 @@ class CConfigUpdater : public CInterface modifyConfigUpdates[notifyFuncId] = notifyFunc; if (isInitialized()) { - refreshConfiguration(false); // force all cached values to be recalculated + //Force all cached values to be recalculated, do not reload the config + //This is only legal if no other threads are accessing the config yet - otherwise the reading thread + //could crash when the global configuration is updated. 
+ //MORE: This function needs an extra parameter to indicate whether or not threads have already started/avoid clone + refreshConfiguration(false, true); DBGLOG("Modify functions should be registered before the configuration is loaded"); } return notifyFuncId; @@ -8914,7 +8946,7 @@ void refreshConfiguration() { if (!configFileUpdater) // NB: refreshConfiguration() should always be called after configFileUpdater is initialized return; - configFileUpdater->refreshConfiguration(false); + configFileUpdater->refreshConfiguration(false, false); } void CConfigUpdateHook::clear() @@ -10199,6 +10231,8 @@ void setExpertOpt(const char *opt, const char *value) config->setPropTree(xpath); getExpertOptPath(opt, xpath.clear()); config->setProp(xpath, value); + DBGLOG("*** SET *** %p", config.get()); + dbglogXML(config); } //--------------------------------------------------------------------------------------------------------------------- diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index e34617a0e58..96b4ea1fd85 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -336,7 +336,7 @@ typedef std::functionserialize(msg); globals->serialize(msg); getGlobalConfigSP()->serialize(msg); + DBGLOG("**** CONFIG ****"); + dbglogXML(globals); msg.append(managerWorkerMpTag); msg.append(kjServiceMpTag); if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) @@ -636,7 +641,9 @@ int main( int argc, const char *argv[] ) InitModuleObjects(); NoQuickEditSection xxx; { - globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, false)); + bool monitorConfig = false; // Do not allow updates to the config file, otherwise the slave may not be in sync. 
+ //MORE: What about updates to storage planes - they will not be passed through to the slaves + globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, monitorConfig)); } #ifdef _DEBUG unsigned holdWorker = globals->getPropInt("@holdSlave", NotFound); @@ -754,6 +761,7 @@ int main( int argc, const char *argv[] ) } } + //This can only be called once dali is initialised initializeStorageGroups(true); if (globals->getPropBool("@MPChannelReconnect")) From f384cc671f6b6445ac380a8f9f6081f1d4add773 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Fri, 13 Dec 2024 17:12:18 +0000 Subject: [PATCH 5/9] Resolve the conflict between thor and roxie semantics Signed-off-by: Gavin Halliday --- dali/base/dafdesc.cpp | 6 +++--- dali/base/dafdesc.hpp | 2 +- dali/daliadmin/daadmin.cpp | 2 +- dali/dfu/dfuserver.cpp | 2 +- dali/dfu/dfuutil.cpp | 2 +- dali/dfuxref/dfuxrefmain.cpp | 2 +- dali/sasha/saserver.cpp | 2 +- ecl/eclagent/eclagent.cpp | 2 +- esp/platform/espcfg.cpp | 2 +- esp/platform/espp.cpp | 2 +- roxie/ccd/ccddali.cpp | 2 +- roxie/ccd/ccdmain.cpp | 2 +- system/jlib/jptree.cpp | 21 ++++++++------------- system/jlib/jptree.hpp | 2 +- thorlcr/master/thmastermain.cpp | 4 +--- 15 files changed, 24 insertions(+), 31 deletions(-) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index 8940ef61093..a03866bcce3 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3794,16 +3794,16 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree setupContainerizedStorageLocations(); } -void initializeStorageGroups(bool createPlanesFromGroups) +void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe) { auto updateFunc = [createPlanesFromGroups](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) { //MORE: createPlanesFromGroup will never be updated.... 
- PROGLOG("initializeStorageGroups update"); + PROGLOG("initializeStoragePlanes update"); doInitializeStorageGroups(createPlanesFromGroups, newGlobalConfiguration); }; - configUpdateHook.installOnce(updateFunc); + configUpdateHook.installModifierOnce(updateFunc, threadSafe); } bool getDefaultStoragePlane(StringBuffer &ret) diff --git a/dali/base/dafdesc.hpp b/dali/base/dafdesc.hpp index 377bfd1ab67..41640c910f4 100644 --- a/dali/base/dafdesc.hpp +++ b/dali/base/dafdesc.hpp @@ -406,7 +406,7 @@ extern da_decl StringBuffer &getPartMask(StringBuffer &ret,const char *lname=NUL extern da_decl void setPartMask(const char * mask); extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes directory of name passed to backup directory -extern da_decl void initializeStorageGroups(bool createPlanesFromGroups); +extern da_decl void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe); // threadSafe should be true if no other threads will be accessing the global config extern da_decl bool getDefaultStoragePlane(StringBuffer &ret); extern da_decl bool getDefaultSpillPlane(StringBuffer &ret); extern da_decl bool getDefaultIndexBuildStoragePlane(StringBuffer &ret); diff --git a/dali/daliadmin/daadmin.cpp b/dali/daliadmin/daadmin.cpp index 4274d44db10..87988ead887 100644 --- a/dali/daliadmin/daadmin.cpp +++ b/dali/daliadmin/daadmin.cpp @@ -507,7 +507,7 @@ bool dfspart(const char *lname, IUserDescriptor *userDesc, unsigned partnum, Str void dfsmeta(const char *filename,IUserDescriptor *userDesc, bool includeStorage) { //This function isn't going to work on a container system because it won't have access to the storage planes - initializeStorageGroups(true); + initializeStoragePlanes(true, true); ResolveOptions options = ROpartinfo|ROdiskinfo|ROsizes; if (includeStorage) options = options | ROincludeLocation; diff --git a/dali/dfu/dfuserver.cpp b/dali/dfu/dfuserver.cpp index 
514bcd3655a..47417d272a3 100644 --- a/dali/dfu/dfuserver.cpp +++ b/dali/dfu/dfuserver.cpp @@ -198,7 +198,7 @@ int main(int argc, const char *argv[]) IPropertyTree * config = nullptr; installDefaultFileHooks(config); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); } StringBuffer queue, monitorQueue; #ifndef _CONTAINERIZED diff --git a/dali/dfu/dfuutil.cpp b/dali/dfu/dfuutil.cpp index ead2a9478c8..91cb09981bc 100644 --- a/dali/dfu/dfuutil.cpp +++ b/dali/dfu/dfuutil.cpp @@ -513,7 +513,7 @@ class CFileCloner dstfdesc->setDefaultDir(dstdir.str()); Owned plane = getDataStoragePlane(cluster1, false); - if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStorageGroups has been called) + if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStoragePlanes has been called) { DBGLOG("cloneSubFile: destfilename='%s', plane='%s', dirPerPart=%s", destfilename, cluster1.get(), boolToStr(plane->queryDirPerPart())); diff --git a/dali/dfuxref/dfuxrefmain.cpp b/dali/dfuxref/dfuxrefmain.cpp index f9d772a69ea..650d3184679 100644 --- a/dali/dfuxref/dfuxrefmain.cpp +++ b/dali/dfuxref/dfuxrefmain.cpp @@ -76,7 +76,7 @@ int main(int argc, char* argv[]) try { initClientProcess(group, DCR_XRef); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); StringArray args, clusters; bool backupcheck = false; unsigned mode = PMtextoutput|PMcsvoutput|PMtreeoutput; diff --git a/dali/sasha/saserver.cpp b/dali/sasha/saserver.cpp index 16f808542cd..acdf1775772 100644 --- a/dali/sasha/saserver.cpp +++ b/dali/sasha/saserver.cpp @@ -409,7 +409,7 @@ int main(int argc, const char* argv[]) else { addAbortHandler(actionOnAbort); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); #ifdef _CONTAINERIZED service = serverConfig->queryProp("@service"); if (isEmptyString(service)) diff --git a/ecl/eclagent/eclagent.cpp 
b/ecl/eclagent/eclagent.cpp index 0af88caa6e2..0acb00ee2ce 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -3785,7 +3785,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], OwnedgetPropInt("@dafilesrvConnectTimeout", 10)*1000; const unsigned dafilesrvReadTimeout = m_cfg->getPropInt("@dafilesrvReadTimeout", 10)*1000; diff --git a/esp/platform/espp.cpp b/esp/platform/espp.cpp index 20d3898d1f3..230033abe45 100644 --- a/esp/platform/espp.cpp +++ b/esp/platform/espp.cpp @@ -578,7 +578,7 @@ int init_main(int argc, const char* argv[]) config->checkESPCache(*server.get()); initializeMetrics(config); - initializeStorageGroups(daliClientActive()); + initializeStoragePlanes(daliClientActive(), true); } catch(IException* e) { diff --git a/roxie/ccd/ccddali.cpp b/roxie/ccd/ccddali.cpp index bdab2a2c650..4d9da90891e 100644 --- a/roxie/ccd/ccddali.cpp +++ b/roxie/ccd/ccddali.cpp @@ -805,7 +805,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface waitToConnect -= delay; } } - initializeStorageGroups(daliHelper->connected()); + initializeStoragePlanes(daliHelper->connected(), false); // This can be called while queries are running - so is not thread safe return daliHelper; } diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index fa36b53862e..b466d8e2d7d 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -1463,7 +1463,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) #endif //MORE: I'm not sure where this should go, or how it fits in. Possibly the function needs to be split in two. 
- initializeStorageGroups(false); + initializeStoragePlanes(false, true); EnableSEHtoExceptionMapping(); setSEHtoExceptionHandler(&abortHandler); diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index 3eea4441e56..c6fedca40be 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8821,22 +8821,20 @@ class CConfigUpdater : public CInterface } else if (avoidClone) { + //This is added during the initialization phase, so no danger of other threads accessing the global information + //while it is updated. Thor currently relies on the pointer not changing. Owned newComponentConfiguration = getComponentConfigSP(); Owned newGlobalConfiguration = getGlobalConfigSP(); refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); } else { - DBGLOG("Refresh %p,", getComponentConfigSP().get()); - dbglogXML(getComponentConfigSP()); // File monitor is disabled - no updates to the configuration files are supported. //So clone the existing configuration and use that to refresh the config - update fucntions may perform differently. Owned newComponentConfiguration = createPTreeFromIPT(getComponentConfigSP()); Owned newGlobalConfiguration = createPTreeFromIPT(getGlobalConfigSP()); refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); } - DBGLOG("Refreshed %p,", getComponentConfigSP().get()); - dbglogXML(getComponentConfigSP()); } void executeCallbacks() @@ -8873,7 +8871,7 @@ class CConfigUpdater : public CInterface } return notifyFuncId; } - unsigned addModifyFunc(ConfigModifyFunc notifyFunc) + unsigned addModifyFunc(ConfigModifyFunc notifyFunc, bool threadSafe) { CriticalBlock b(notifyFuncCS); notifyFuncId++; @@ -8883,8 +8881,7 @@ class CConfigUpdater : public CInterface //Force all cached values to be recalculated, do not reload the config //This is only legal if no other threads are accessing the config yet - otherwise the reading thread //could crash when the global configuration is updated. 
- //MORE: This function needs an extra parameter to indicate whether or not threads have already started/avoid clone - refreshConfiguration(false, true); + refreshConfiguration(false, threadSafe); DBGLOG("Modify functions should be registered before the configuration is loaded"); } return notifyFuncId; @@ -8925,11 +8922,11 @@ unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInsta return configFileUpdater->addNotifyFunc(notifyFunc, callWhenInstalled); } -jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc) // This function must be called before the configuration is loaded. +jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc, bool threadSafe) // This function must be called before the configuration is loaded. { if (!configFileUpdater) // NB: installConfigUpdateHook should always be called after configFileUpdater is initialized return 0; - return configFileUpdater->addModifyFunc(notifyFunc); + return configFileUpdater->addModifyFunc(notifyFunc, threadSafe); } void removeConfigUpdateHook(unsigned notifyFuncId) @@ -8973,7 +8970,7 @@ void CConfigUpdateHook::installOnce(ConfigUpdateFunc callbackFunc, bool callWhen } -void CConfigUpdateHook::installOnce(ConfigModifyFunc callbackFunc) +void CConfigUpdateHook::installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe) { unsigned id = configCBId.load(std::memory_order_acquire); if ((unsigned)-1 == id) // avoid CS in common case @@ -8983,7 +8980,7 @@ void CConfigUpdateHook::installOnce(ConfigModifyFunc callbackFunc) id = configCBId.load(std::memory_order_acquire); if ((unsigned)-1 == id) { - id = installConfigUpdateHook(callbackFunc); + id = installConfigUpdateHook(callbackFunc, threadSafe); configCBId.store(id, std::memory_order_release); } } @@ -10231,8 +10228,6 @@ void setExpertOpt(const char *opt, const char *value) config->setPropTree(xpath); getExpertOptPath(opt, xpath.clear()); config->setProp(xpath, value); - DBGLOG("*** SET *** %p", 
config.get()); - dbglogXML(config); } //--------------------------------------------------------------------------------------------------------------------- diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 96b4ea1fd85..cf985f29ac6 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -346,7 +346,7 @@ class jlib_decl CConfigUpdateHook ~CConfigUpdateHook() { clear(); } void clear(); void installOnce(ConfigUpdateFunc callbackFunc, bool callWhenInstalled); - void installOnce(ConfigModifyFunc callbackFunc); + void installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe); }; /* diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index f74a34cfdfa..16f2df64d02 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -398,8 +398,6 @@ class CRegistryServer : public CSimpleInterface processGroup->serialize(msg); globals->serialize(msg); getGlobalConfigSP()->serialize(msg); - DBGLOG("**** CONFIG ****"); - dbglogXML(globals); msg.append(managerWorkerMpTag); msg.append(kjServiceMpTag); if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND)) @@ -762,7 +760,7 @@ int main( int argc, const char *argv[] ) } //This can only be called once dali is initialised - initializeStorageGroups(true); + initializeStoragePlanes(true, true); if (globals->getPropBool("@MPChannelReconnect")) getMPServer()->setOpt(mpsopt_channelreopen, "true"); From d706cc488940db67b8f91562859798699fddd297 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Fri, 13 Dec 2024 17:40:27 +0000 Subject: [PATCH 6/9] Clean roxie solution Signed-off-by: Gavin Halliday --- dali/base/dafdesc.cpp | 23 +++++++++++++++++++++-- dali/base/dafdesc.hpp | 2 ++ roxie/ccd/ccddali.cpp | 2 ++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index a03866bcce3..6a777288ef3 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3794,11 
+3794,25 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree setupContainerizedStorageLocations(); } +//Store whether we are currently creating planes from groups, and what the last update was +static bool globalCreatePlanesFromGroups = false; +static bool lastCreatedPlanesFromGroups = false; void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe) { - auto updateFunc = [createPlanesFromGroups](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + Rename parameter to updatePlanesFromDali? { - //MORE: createPlanesFromGroup will never be updated.... + //If the createPlanesFromGroups parameter is now true, and was previously false, force an update + CriticalBlock block(storageCS); + if (!lastCreatedPlanesFromGroups && createPlanesFromGroups) + configUpdateHook.clear(); + globalCreatePlanesFromGroups = createPlanesFromGroups; + } + MORE: Check this logic + + auto updateFunc = [](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + { + bool createPlanesFromGroups = globalCreatePlanesFromGroups; + lastCreatedPlanesFromGroups = createPlanesFromGroups; PROGLOG("initializeStoragePlanes update"); doInitializeStorageGroups(createPlanesFromGroups, newGlobalConfiguration); }; @@ -3806,6 +3820,11 @@ void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe) configUpdateHook.installModifierOnce(updateFunc, threadSafe); } +void disableStoragePlanesDaliUpdates() +{ + globalCreatePlanesFromGroups = false; +} + bool getDefaultStoragePlane(StringBuffer &ret) { #ifdef _CONTAINERIZED diff --git a/dali/base/dafdesc.hpp b/dali/base/dafdesc.hpp index 41640c910f4..0f0cdfa968f 100644 --- a/dali/base/dafdesc.hpp +++ b/dali/base/dafdesc.hpp @@ -407,6 +407,8 @@ extern da_decl void setPartMask(const char * mask); extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes 
directory of name passed to backup directory extern da_decl void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe); // threadSafe should be true if no other threads will be accessing the global config +extern da_decl void disableStoragePlanesUpdater(); + extern da_decl bool getDefaultStoragePlane(StringBuffer &ret); extern da_decl bool getDefaultSpillPlane(StringBuffer &ret); extern da_decl bool getDefaultIndexBuildStoragePlane(StringBuffer &ret); diff --git a/roxie/ccd/ccddali.cpp b/roxie/ccd/ccddali.cpp index 4d9da90891e..f5f39215204 100644 --- a/roxie/ccd/ccddali.cpp +++ b/roxie/ccd/ccddali.cpp @@ -246,6 +246,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface virtual void beforeDispose() { CriticalBlock b(daliHelperCrit); + disconnectSem.interrupt(); connectWatcher.stop(); if (daliHelper==this) // there is a tiny window where new dalihelper created immediately after final release @@ -928,6 +929,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface CriticalBlock b(daliConnectionCrit); if (isConnected) { + disableStoragePlanesDaliUpdates(); isConnected = false; delete serverStatus; serverStatus = NULL; From 849d41595d95f7f3e14492ccab0d2361c4c2adea Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Mon, 16 Dec 2024 10:02:44 +0000 Subject: [PATCH 7/9] Clean roxie solution v2 Signed-off-by: Gavin Halliday --- dali/base/dafdesc.cpp | 48 +++++++++++++++++++++++++++++-------------- dali/base/dafdesc.hpp | 2 +- 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index 6a777288ef3..b3def355529 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3729,8 +3729,7 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree if (!storage) storage.set(newGlobalConfiguration->addPropTree("storage")); -#ifndef _CONTAINERIZED - if (createPlanesFromGroups) + if (!isContainerized() && createPlanesFromGroups) { // Remove 
old planes created from groups while (storage->removeProp("planes[@fromGroup='1']")); @@ -3785,7 +3784,6 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree //Uncomment the following to trace the values that been generated //printYAML(storage); } -#endif //Ensure that host groups that are defined in terms of other host groups are expanded out so they have an explicit list of hosts normalizeHostGroups(); @@ -3794,27 +3792,47 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree setupContainerizedStorageLocations(); } -//Store whether we are currently creating planes from groups, and what the last update was -static bool globalCreatePlanesFromGroups = false; -static bool lastCreatedPlanesFromGroups = false; -void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe) +/* + * This function is used to: + * + * (a) create storage planes from bare-metal groups + * (b) to expand out host groups that are defined in terms of other host groups + * (c) to setup the base directory for the storage planes. (GH->JCS is this threadsafe?) + * + * For most components this init function is called only once - the exception is roxie (called when it connects and disconnects from dali). + * In thor it is important that the update of the global config does not clone the property trees + * In containerized mode, the update function can be called whenever the config file changes (but it will not be creating planes from groups) + * In bare-metal mode the update function may be called whenever the environment changes in dali. (see initClientProcess) + * + * Because of the requirement that thor does not clone the property trees, thor cannot support any background update - via the environment in + * bare-metal, or config files in containerized. If it was to support it the code in the master would need to change, and updates would + * need to be synchronized to the workers. 
To support this requirement a threadSafe parameter is provided to avoid the normal clone. + * + * For roxie, which dynamically connects and disconnects from dali, there are different problems. The code needs to retain whether or not roxie + * is connected to dali - and only create planes from groups if it is connected. There is another potential problem, which I don't think will + * actually be hit: + * Say roxie connects, updates planes from groups and disconnects. If the update functions were called again those storage planes would be lost, + * but in bare-metal only a change to the environment will trigger an update, and that update will not happen if roxie is not connected to dali. + */ + +static bool savedConnectedToDali = false; // Store in a global so it can be used by the callback without having to reregister +static bool lastUpdateConnectedToDali = false; +void initializeStoragePlanes(bool connectedToDali, bool threadSafe) { - Rename parameter to updatePlanesFromDali? { //If the createPlanesFromGroups parameter is now true, and was previously false, force an update CriticalBlock block(storageCS); - if (!lastCreatedPlanesFromGroups && createPlanesFromGroups) + if (!lastUpdateConnectedToDali && connectedToDali) configUpdateHook.clear(); - globalCreatePlanesFromGroups = createPlanesFromGroups; + savedConnectedToDali = connectedToDali; } - MORE: Check this logic auto updateFunc = [](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) { - bool createPlanesFromGroups = globalCreatePlanesFromGroups; - lastCreatedPlanesFromGroups = createPlanesFromGroups; + bool connectedToDali = savedConnectedToDali; + lastUpdateConnectedToDali = connectedToDali; PROGLOG("initializeStoragePlanes update"); - doInitializeStorageGroups(createPlanesFromGroups, newGlobalConfiguration); + doInitializeStorageGroups(connectedToDali, newGlobalConfiguration); }; configUpdateHook.installModifierOnce(updateFunc, threadSafe); @@ -3822,7 +3840,7 @@ void 
initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe) void disableStoragePlanesDaliUpdates() { - globalCreatePlanesFromGroups = false; + savedConnectedToDali = false; } bool getDefaultStoragePlane(StringBuffer &ret) diff --git a/dali/base/dafdesc.hpp b/dali/base/dafdesc.hpp index 0f0cdfa968f..a9a4e1dee51 100644 --- a/dali/base/dafdesc.hpp +++ b/dali/base/dafdesc.hpp @@ -407,7 +407,7 @@ extern da_decl void setPartMask(const char * mask); extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes directory of name passed to backup directory extern da_decl void initializeStoragePlanes(bool createPlanesFromGroups, bool threadSafe); // threadSafe should be true if no other threads will be accessing the global config -extern da_decl void disableStoragePlanesUpdater(); +extern da_decl void disableStoragePlanesDaliUpdates(); extern da_decl bool getDefaultStoragePlane(StringBuffer &ret); extern da_decl bool getDefaultSpillPlane(StringBuffer &ret); From 5240339243c8d82a49216c963efc1598224113c7 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Tue, 17 Dec 2024 10:30:29 +0000 Subject: [PATCH 8/9] Fix problems with hostgroups modifying global config Signed-off-by: Gavin Halliday --- dali/base/dadfs.cpp | 5 +++-- dali/base/dafdesc.cpp | 2 +- dali/base/dameta.cpp | 18 ++++++++++++++---- dali/base/dameta.hpp | 2 +- dali/base/dautils.cpp | 5 +++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 5c4da62b457..8eeac19b9bc 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -10756,10 +10756,11 @@ class CInitGroups void constructStorageGroups(bool force, StringBuffer &messages) { - Owned storage = getGlobalConfigSP()->getPropTree("storage"); + Owned globalConfig = getGlobalConfig(); + Owned storage = globalConfig->getPropTree("storage"); if (storage) { - normalizeHostGroups(); + 
normalizeHostGroups(globalConfig); Owned planes = storage->getElements("planes"); ForEach(*planes) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index b3def355529..5b47e2707eb 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3786,7 +3786,7 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree } //Ensure that host groups that are defined in terms of other host groups are expanded out so they have an explicit list of hosts - normalizeHostGroups(); + normalizeHostGroups(newGlobalConfiguration); //The following can be removed once the storage planes have better integration setupContainerizedStorageLocations(); diff --git a/dali/base/dameta.cpp b/dali/base/dameta.cpp index 44dcbe481e8..f35dfab9f0c 100644 --- a/dali/base/dameta.cpp +++ b/dali/base/dameta.cpp @@ -21,10 +21,12 @@ #include "dautils.hpp" -// Expand indirect hostGroups so each hostGroups has an expanded list of host names -void normalizeHostGroups() +//Expand indirect hostGroups so each hostGroups has an expanded list of host names +//This function cannot use (directly or indirectly) getGlobalConfig(), because it may be called when creating +//a new config. +void normalizeHostGroups(IPropertyTree * globalConfig) { - Owned hostGroupIter = getGlobalConfigSP()->getElements("storage/hostGroups"); + Owned hostGroupIter = globalConfig->getElements("storage/hostGroups"); //Process the groups in order - so that multiple levels of indirection are supported ForEach (*hostGroupIter) { @@ -33,7 +35,15 @@ void normalizeHostGroups() { const char * name = cur.queryProp("@name"); const char * baseGroup = cur.queryProp("@hostGroup"); - Owned match = getHostGroup(baseGroup, true); + if (!baseGroup) + throw makeStringExceptionV(-1, "HostGroup %s with no hosts does not have a base hostgroup", name ? 
name : ""); + + //Cannot call getHostGroup() because that uses getGlobalConfig() + VStringBuffer xpath("storage/hostGroups[@name='%s']", baseGroup); + IPropertyTree * match = globalConfig->queryPropTree(xpath); + if (!match) + throw makeStringExceptionV(-1, "No entry found for hostGroup: '%s'", baseGroup); + StringArray hosts; Owned hostIter = match->getElements("hosts"); ForEach (*hostIter) diff --git a/dali/base/dameta.hpp b/dali/base/dameta.hpp index c791db82960..f392f56e5f8 100644 --- a/dali/base/dameta.hpp +++ b/dali/base/dameta.hpp @@ -58,6 +58,6 @@ constexpr ResolveOptions operator &(ResolveOptions l, ResolveOptions r) { return */ extern da_decl IPropertyTree * resolveLogicalFilenameFromDali(const char * filename, IUserDescriptor * user, ResolveOptions options); -extern da_decl void normalizeHostGroups(); // Expand indirect hostGroups so each hostGroups has an expanded list of host names +extern da_decl void normalizeHostGroups(IPropertyTree * globalConfig); // Expand indirect hostGroups so each hostGroups has an expanded list of host names #endif diff --git a/dali/base/dautils.cpp b/dali/base/dautils.cpp index 7ad40dfd672..d25e0a7ef9a 100644 --- a/dali/base/dautils.cpp +++ b/dali/base/dautils.cpp @@ -88,8 +88,9 @@ bool getPlaneHost(StringBuffer &host, IPropertyTree *plane, unsigned which) if (!hostGroup) return false; - if (which >= hostGroup->getCount("hosts")) - throw makeStringException(0, "getPlaneHost: index out of range"); + unsigned maxHosts = hostGroup->getCount("hosts"); + if (which >= maxHosts) + throw makeStringExceptionV(0, "getPlaneHost: index %u out of range 1..%u", which, maxHosts); VStringBuffer xpath("hosts[%u]", which+1); // which is 0 based host.append(hostGroup->queryProp(xpath)); return true; From 6aa210eb41a26e80f7b61978c592583964813821 Mon Sep 17 00:00:00 2001 From: Gavin Halliday Date: Fri, 10 Jan 2025 12:45:01 +0000 Subject: [PATCH 9/9] Changes following review Signed-off-by: Gavin Halliday --- dali/base/dadfs.cpp | 2 +- 
dali/base/dafdesc.cpp | 4 ++-- system/jlib/jptree.cpp | 30 +++++++++++++++++++++++------- 3 files changed, 26 insertions(+), 10 deletions(-) diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 8eeac19b9bc..c56dac4db02 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -10757,7 +10757,7 @@ class CInitGroups void constructStorageGroups(bool force, StringBuffer &messages) { Owned globalConfig = getGlobalConfig(); - Owned storage = globalConfig->getPropTree("storage"); + IPropertyTree * storage = globalConfig->queryPropTree("storage"); if (storage) { normalizeHostGroups(globalConfig); diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index 5b47e2707eb..25d19f44ff7 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3725,9 +3725,9 @@ static CriticalSection storageCS; static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree * newGlobalConfiguration) { CriticalBlock block(storageCS); - Owned storage = newGlobalConfiguration->getPropTree("storage"); + IPropertyTree * storage = newGlobalConfiguration->queryPropTree("storage"); if (!storage) - storage.set(newGlobalConfiguration->addPropTree("storage")); + storage = newGlobalConfiguration->addPropTree("storage"); if (!isContainerized() && createPlanesFromGroups) { diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index c6fedca40be..2432e6c3a8b 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8735,7 +8735,7 @@ class CConfigUpdater : public CInterface args.append(arg); args.append(nullptr); - refreshConfiguration(true, false); + refreshConfiguration(true, true); } bool startMonitoring() { @@ -8780,8 +8780,7 @@ class CConfigUpdater : public CInterface { CriticalBlock b(notifyFuncCS); //Ensure all modifications to the config take place before the config is updated, and the monitoring/caching functions are called. 
- for (auto & modifyFunc : modifyConfigUpdates) - modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); + executeModifyCallbacks(newComponentConfiguration, newGlobalConfiguration); // NB: block calls to get*Config*() from other threads until callbacks notified and new swapped in, destroy old config outside the critical section Owned oldComponentConfiguration; @@ -8800,7 +8799,7 @@ class CConfigUpdater : public CInterface /* NB: we are still holding 'configCS' at this point, blocking all other thread access. However code in callbacks may call e.g. getComponentConfig() and re-enter the crit */ - executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); + executeNotifyCallbacks(oldComponentConfiguration, oldGlobalConfiguration); } } @@ -8837,14 +8836,14 @@ class CConfigUpdater : public CInterface } } - void executeCallbacks() + void executeNotifyCallbacks() { CriticalBlock notifyBlock(notifyFuncCS); CriticalBlock configBlock(configCS); - executeCallbacks(componentConfiguration, globalConfiguration); + executeNotifyCallbacks(componentConfiguration, globalConfiguration); } - void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) + void executeNotifyCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) { for (const auto &item: notifyConfigUpdates) { @@ -8859,6 +8858,23 @@ class CConfigUpdater : public CInterface } } } + + void executeModifyCallbacks(IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + { + for (auto & modifyFunc : modifyConfigUpdates) + { + try + { + modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); + } + catch (IException *e) + { + EXCLOG(e, "CConfigUpdater callback"); + e->Release(); + } + } + } + unsigned addNotifyFunc(ConfigUpdateFunc notifyFunc, bool callWhenInstalled) { CriticalBlock b(notifyFuncCS);