diff --git a/dali/base/daclient.cpp b/dali/base/daclient.cpp index 7a55e473dc1..a36d418a851 100644 --- a/dali/base/daclient.cpp +++ b/dali/base/daclient.cpp @@ -105,7 +105,7 @@ void installEnvConfigMonitor() // ISDSSubscription impl. virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen=0, const void *valueData=nullptr) override { - executeConfigUpdaterCallbacks(); + refreshConfiguration(); } }; @@ -147,7 +147,6 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, // causes any config update hooks (installed by installConfigUpdateHook() to trigger on an env. change) switch (role) { - case DCR_ThorMaster: case DCR_EclServer: case DCR_EclAgent: case DCR_SashaServer: @@ -158,6 +157,9 @@ bool initClientProcess(IGroup *servergrp, DaliClientRole role, unsigned mpport, case DCR_EclScheduler: case DCR_EclCCServer: installEnvConfigMonitor(); + break; + // Thor does not monitor because a fixed configuration is serialized to the slaves + case DCR_ThorMaster: default: break; } diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index 5c4da62b457..c56dac4db02 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -10756,10 +10756,11 @@ class CInitGroups void constructStorageGroups(bool force, StringBuffer &messages) { - Owned storage = getGlobalConfigSP()->getPropTree("storage"); + Owned globalConfig = getGlobalConfig(); + IPropertyTree * storage = globalConfig->queryPropTree("storage"); if (storage) { - normalizeHostGroups(); + normalizeHostGroups(globalConfig); Owned planes = storage->getElements("planes"); ForEach(*planes) diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp index df1c816f8a4..25d19f44ff7 100644 --- a/dali/base/dafdesc.cpp +++ b/dali/base/dafdesc.cpp @@ -3722,16 +3722,14 @@ static void generateHosts(IPropertyTree * storage, GroupInfoArray & groups) static CConfigUpdateHook configUpdateHook; static std::atomic normalizeHostGroupUpdateCBId{(unsigned)-1}; static 
CriticalSection storageCS; -static void doInitializeStorageGroups(bool createPlanesFromGroups) +static void doInitializeStorageGroups(bool createPlanesFromGroups, IPropertyTree * newGlobalConfiguration) { CriticalBlock block(storageCS); - Owned globalConfig = getGlobalConfig(); - Owned storage = globalConfig->getPropTree("storage"); + IPropertyTree * storage = newGlobalConfiguration->queryPropTree("storage"); if (!storage) - storage.set(globalConfig->addPropTree("storage")); + storage = newGlobalConfiguration->addPropTree("storage"); -#ifndef _CONTAINERIZED - if (createPlanesFromGroups) + if (!isContainerized() && createPlanesFromGroups) { // Remove old planes created from groups while (storage->removeProp("planes[@fromGroup='1']")); @@ -3786,25 +3784,63 @@ static void doInitializeStorageGroups(bool createPlanesFromGroups) //Uncomment the following to trace the values that been generated //printYAML(storage); } -#endif //Ensure that host groups that are defined in terms of other host groups are expanded out so they have an explicit list of hosts - normalizeHostGroups(); + normalizeHostGroups(newGlobalConfiguration); //The following can be removed once the storage planes have better integration setupContainerizedStorageLocations(); } -void initializeStorageGroups(bool createPlanesFromGroups) +/* + * This function is used to: + * + * (a) create storage planes from bare-metal groups + * (b) to expand out host groups that are defined in terms of other host groups + * (c) to setup the base directory for the storage planes. (GH->JCS is this threadsafe?) + * + * For most components this init function is called only once - the exception is roxie (called when it connects and disconnects from dali). 
+ * In thor it is important that the update of the global config does not clone the property trees + * In containerized mode, the update function can be called whenever the config file changes (but it will not be creating planes from groups) + * In bare-metal mode the update function may be called whenever the environment changes in dali. (see initClientProcess) + * + * Because of the requirement that thor does not clone the property trees, thor cannot support any background update - via the environment in + * bare-metal, or config files in containerized. If it were to support it, the code in the master would need to change, and updates would + * need to be synchronized to the workers. To support this requirement a threadSafe parameter is provided to avoid the normal clone. + * + * For roxie, which dynamically connects and disconnects from dali, there are different problems. The code needs to retain whether or not roxie + * is connected to dali - and only create planes from groups if it is connected. There is another potential problem, which I don't think will + * actually be hit: + * Say roxie connects, updates planes from groups and disconnects. If the update functions were called again those storage planes would be lost, + * but in bare-metal only a change to the environment will trigger an update, and that update will not happen if roxie is not connected to dali. 
+ */ + +static bool savedConnectedToDali = false; // Store in a global so it can be used by the callback without having to reregister +static bool lastUpdateConnectedToDali = false; +void initializeStoragePlanes(bool connectedToDali, bool threadSafe) { - auto updateFunc = [createPlanesFromGroups](const IPropertyTree *oldComponentConfiguration, const IPropertyTree *oldGlobalConfiguration) { - PROGLOG("initializeStorageGroups update"); - doInitializeStorageGroups(createPlanesFromGroups); + //If the createPlanesFromGroups parameter is now true, and was previously false, force an update + CriticalBlock block(storageCS); + if (!lastUpdateConnectedToDali && connectedToDali) + configUpdateHook.clear(); + savedConnectedToDali = connectedToDali; + } + + auto updateFunc = [](IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + { + bool connectedToDali = savedConnectedToDali; + lastUpdateConnectedToDali = connectedToDali; + PROGLOG("initializeStoragePlanes update"); + doInitializeStorageGroups(connectedToDali, newGlobalConfiguration); }; - doInitializeStorageGroups(createPlanesFromGroups); - configUpdateHook.installOnce(updateFunc, false); + configUpdateHook.installModifierOnce(updateFunc, threadSafe); +} + +void disableStoragePlanesDaliUpdates() +{ + savedConnectedToDali = false; } bool getDefaultStoragePlane(StringBuffer &ret) diff --git a/dali/base/dafdesc.hpp b/dali/base/dafdesc.hpp index 377bfd1ab67..a9a4e1dee51 100644 --- a/dali/base/dafdesc.hpp +++ b/dali/base/dafdesc.hpp @@ -406,7 +406,9 @@ extern da_decl StringBuffer &getPartMask(StringBuffer &ret,const char *lname=NUL extern da_decl void setPartMask(const char * mask); extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes directory of name passed to backup directory -extern da_decl void initializeStorageGroups(bool createPlanesFromGroups); +extern da_decl void initializeStoragePlanes(bool 
createPlanesFromGroups, bool threadSafe); // threadSafe should be true if no other threads will be accessing the global config +extern da_decl void disableStoragePlanesDaliUpdates(); + extern da_decl bool getDefaultStoragePlane(StringBuffer &ret); extern da_decl bool getDefaultSpillPlane(StringBuffer &ret); extern da_decl bool getDefaultIndexBuildStoragePlane(StringBuffer &ret); diff --git a/dali/base/dameta.cpp b/dali/base/dameta.cpp index 44dcbe481e8..f35dfab9f0c 100644 --- a/dali/base/dameta.cpp +++ b/dali/base/dameta.cpp @@ -21,10 +21,12 @@ #include "dautils.hpp" -// Expand indirect hostGroups so each hostGroups has an expanded list of host names -void normalizeHostGroups() +//Expand indirect hostGroups so each hostGroups has an expanded list of host names +//This function cannot use (directly or indirectly) getGlobalConfig(), because it may be called when creating +//a new config. +void normalizeHostGroups(IPropertyTree * globalConfig) { - Owned hostGroupIter = getGlobalConfigSP()->getElements("storage/hostGroups"); + Owned hostGroupIter = globalConfig->getElements("storage/hostGroups"); //Process the groups in order - so that multiple levels of indirection are supported ForEach (*hostGroupIter) { @@ -33,7 +35,15 @@ void normalizeHostGroups() { const char * name = cur.queryProp("@name"); const char * baseGroup = cur.queryProp("@hostGroup"); - Owned match = getHostGroup(baseGroup, true); + if (!baseGroup) + throw makeStringExceptionV(-1, "HostGroup %s with no hosts does not have a base hostgroup", name ? 
name : ""); + + //Cannot call getHostGroup() because that uses getGlobalConfig() + VStringBuffer xpath("storage/hostGroups[@name='%s']", baseGroup); + IPropertyTree * match = globalConfig->queryPropTree(xpath); + if (!match) + throw makeStringExceptionV(-1, "No entry found for hostGroup: '%s'", baseGroup); + StringArray hosts; Owned hostIter = match->getElements("hosts"); ForEach (*hostIter) diff --git a/dali/base/dameta.hpp b/dali/base/dameta.hpp index c791db82960..f392f56e5f8 100644 --- a/dali/base/dameta.hpp +++ b/dali/base/dameta.hpp @@ -58,6 +58,6 @@ constexpr ResolveOptions operator &(ResolveOptions l, ResolveOptions r) { return */ extern da_decl IPropertyTree * resolveLogicalFilenameFromDali(const char * filename, IUserDescriptor * user, ResolveOptions options); -extern da_decl void normalizeHostGroups(); // Expand indirect hostGroups so each hostGroups has an expanded list of host names +extern da_decl void normalizeHostGroups(IPropertyTree * globalConfig); // Expand indirect hostGroups so each hostGroups has an expanded list of host names #endif diff --git a/dali/base/dautils.cpp b/dali/base/dautils.cpp index 7ad40dfd672..d25e0a7ef9a 100644 --- a/dali/base/dautils.cpp +++ b/dali/base/dautils.cpp @@ -88,8 +88,9 @@ bool getPlaneHost(StringBuffer &host, IPropertyTree *plane, unsigned which) if (!hostGroup) return false; - if (which >= hostGroup->getCount("hosts")) - throw makeStringException(0, "getPlaneHost: index out of range"); + unsigned maxHosts = hostGroup->getCount("hosts"); + if (which >= maxHosts) + throw makeStringExceptionV(0, "getPlaneHost: index %u out of range 1..%u", which, maxHosts); VStringBuffer xpath("hosts[%u]", which+1); // which is 0 based host.append(hostGroup->queryProp(xpath)); return true; diff --git a/dali/daliadmin/daadmin.cpp b/dali/daliadmin/daadmin.cpp index 4274d44db10..87988ead887 100644 --- a/dali/daliadmin/daadmin.cpp +++ b/dali/daliadmin/daadmin.cpp @@ -507,7 +507,7 @@ bool dfspart(const char *lname, IUserDescriptor 
*userDesc, unsigned partnum, Str void dfsmeta(const char *filename,IUserDescriptor *userDesc, bool includeStorage) { //This function isn't going to work on a container system because it won't have access to the storage planes - initializeStorageGroups(true); + initializeStoragePlanes(true, true); ResolveOptions options = ROpartinfo|ROdiskinfo|ROsizes; if (includeStorage) options = options | ROincludeLocation; diff --git a/dali/dfu/dfuserver.cpp b/dali/dfu/dfuserver.cpp index 514bcd3655a..47417d272a3 100644 --- a/dali/dfu/dfuserver.cpp +++ b/dali/dfu/dfuserver.cpp @@ -198,7 +198,7 @@ int main(int argc, const char *argv[]) IPropertyTree * config = nullptr; installDefaultFileHooks(config); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); } StringBuffer queue, monitorQueue; #ifndef _CONTAINERIZED diff --git a/dali/dfu/dfuutil.cpp b/dali/dfu/dfuutil.cpp index ead2a9478c8..91cb09981bc 100644 --- a/dali/dfu/dfuutil.cpp +++ b/dali/dfu/dfuutil.cpp @@ -513,7 +513,7 @@ class CFileCloner dstfdesc->setDefaultDir(dstdir.str()); Owned plane = getDataStoragePlane(cluster1, false); - if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStorageGroups has been called) + if (plane) // I think it should always exist, even in bare-metal.., but guard against it not for now (assumes initializeStoragePlanes has been called) { DBGLOG("cloneSubFile: destfilename='%s', plane='%s', dirPerPart=%s", destfilename, cluster1.get(), boolToStr(plane->queryDirPerPart())); diff --git a/dali/dfuxref/dfuxrefmain.cpp b/dali/dfuxref/dfuxrefmain.cpp index f9d772a69ea..650d3184679 100644 --- a/dali/dfuxref/dfuxrefmain.cpp +++ b/dali/dfuxref/dfuxrefmain.cpp @@ -76,7 +76,7 @@ int main(int argc, char* argv[]) try { initClientProcess(group, DCR_XRef); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); StringArray args, clusters; bool backupcheck = false; unsigned mode = 
PMtextoutput|PMcsvoutput|PMtreeoutput; diff --git a/dali/sasha/saserver.cpp b/dali/sasha/saserver.cpp index 16f808542cd..acdf1775772 100644 --- a/dali/sasha/saserver.cpp +++ b/dali/sasha/saserver.cpp @@ -409,7 +409,7 @@ int main(int argc, const char* argv[]) else { addAbortHandler(actionOnAbort); - initializeStorageGroups(true); + initializeStoragePlanes(true, true); #ifdef _CONTAINERIZED service = serverConfig->queryProp("@service"); if (isEmptyString(service)) diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 0af88caa6e2..0acb00ee2ce 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -3785,7 +3785,7 @@ extern int HTHOR_API eclagent_main(int argc, const char *argv[], OwnedgetPropInt("@dafilesrvConnectTimeout", 10)*1000; const unsigned dafilesrvReadTimeout = m_cfg->getPropInt("@dafilesrvReadTimeout", 10)*1000; diff --git a/esp/platform/espp.cpp b/esp/platform/espp.cpp index 20d3898d1f3..230033abe45 100644 --- a/esp/platform/espp.cpp +++ b/esp/platform/espp.cpp @@ -578,7 +578,7 @@ int init_main(int argc, const char* argv[]) config->checkESPCache(*server.get()); initializeMetrics(config); - initializeStorageGroups(daliClientActive()); + initializeStoragePlanes(daliClientActive(), true); } catch(IException* e) { diff --git a/roxie/ccd/ccddali.cpp b/roxie/ccd/ccddali.cpp index bdab2a2c650..f5f39215204 100644 --- a/roxie/ccd/ccddali.cpp +++ b/roxie/ccd/ccddali.cpp @@ -246,6 +246,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface virtual void beforeDispose() { CriticalBlock b(daliHelperCrit); + disconnectSem.interrupt(); connectWatcher.stop(); if (daliHelper==this) // there is a tiny window where new dalihelper created immediately after final release @@ -805,7 +806,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface waitToConnect -= delay; } } - initializeStorageGroups(daliHelper->connected()); + initializeStoragePlanes(daliHelper->connected(), false); // This can be called 
while queries are running - so is not thread safe return daliHelper; } @@ -928,6 +929,7 @@ class CRoxieDaliHelper : implements IRoxieDaliHelper, public CInterface CriticalBlock b(daliConnectionCrit); if (isConnected) { + disableStoragePlanesDaliUpdates(); isConnected = false; delete serverStatus; serverStatus = NULL; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index fa36b53862e..b466d8e2d7d 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -1463,7 +1463,7 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) #endif //MORE: I'm not sure where this should go, or how it fits in. Possibly the function needs to be split in two. - initializeStorageGroups(false); + initializeStoragePlanes(false, true); EnableSEHtoExceptionMapping(); setSEHtoExceptionHandler(&abortHandler); diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp index 6f4b6d3017e..2432e6c3a8b 100644 --- a/system/jlib/jptree.cpp +++ b/system/jlib/jptree.cpp @@ -8707,11 +8707,11 @@ class CConfigUpdater : public CInterface StringAttr componentTag, envPrefix, legacyFilename; IPropertyTree * (*mapper)(IPropertyTree *); StringAttr altNameAttribute; - Owned fileWatcher; + Owned fileWatcher; // null if updates to the config file are not allowed CriticalSection notifyFuncCS; unsigned notifyFuncId = 0; std::unordered_map notifyConfigUpdates; - std::vector pendingInitializeFuncIds; + std::unordered_map modifyConfigUpdates; public: CConfigUpdater() @@ -8721,10 +8721,9 @@ class CConfigUpdater : public CInterface { return args.ordinality(); // NB: null terminated, so always >=1 if initialized } - void init(const char *_absoluteConfigFilename, IPropertyTree *_componentDefault, IPropertyTree *_globalDefault, const char * * argv, const char * _componentTag, const char * _envPrefix, const char *_legacyFilename, IPropertyTree * (_mapper)(IPropertyTree *), const char *_altNameAttribute) + void init(IPropertyTree *_componentDefault, IPropertyTree *_globalDefault, 
const char * * argv, const char * _componentTag, const char * _envPrefix, const char *_legacyFilename, IPropertyTree * (_mapper)(IPropertyTree *), const char *_altNameAttribute) { dbgassertex(!isInitialized()); - absoluteConfigFilename.set(_absoluteConfigFilename); componentDefault.set(_componentDefault); globalDefault.set(_globalDefault); componentTag.set(_componentTag); @@ -8736,54 +8735,28 @@ class CConfigUpdater : public CInterface args.append(arg); args.append(nullptr); - Owned config = getComponentConfig(); - Owned global = getGlobalConfig(); - while (pendingInitializeFuncIds.size()) - { - unsigned notifyFuncId = pendingInitializeFuncIds.back(); - pendingInitializeFuncIds.pop_back(); - ConfigUpdateFunc notifyFunc = notifyConfigUpdates[notifyFuncId]; - notifyFunc(config, global); - } + refreshConfiguration(true, true); } bool startMonitoring() { #if !defined(__linux__) // file moinitoring only supported in Linux (because createFileEventWatcher only implemented in Linux at the moment) return false; #endif - if (0 == absoluteConfigFilename.length() || (nullptr != fileWatcher.get())) + if (absoluteConfigFilename.isEmpty() || (nullptr != fileWatcher.get())) return false; + auto updateFunc = [&](const char *filename, FileWatchEvents events) { bool changed = containsFileWatchEvents(events, FileWatchEvents::closedWrite) && streq(filename, configFilename); -#ifdef _CONTAINERIZED - // NB: in k8s, it's a little strange, the config file is in a linked dir, a new dir is created and swapped in. 
- changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); -#endif - if (changed) - { - auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - // NB: block calls to get*Config*() until callbacks notified and new swapped in - CriticalBlock b(configCS); - Owned oldComponentConfiguration = componentConfiguration.getClear(); - Owned oldGlobalConfiguration = globalConfiguration.getClear(); + // NB: in k8s, it's a little strange, the config file is in a linked dir, a new dir is created and swapped in. + if (isContainerized()) + changed = changed | containsFileWatchEvents(events, FileWatchEvents::movedTo) && streq(filename, "..data"); - /* swapin before callbacks called, but before releasing crit. - * That way the CB can see the old/diffs and act on them, but any - * code calling e.g. getComponentConfig() will see new. - */ - componentConfiguration.setown(std::get<1>(result)); - globalConfiguration.setown(std::get<2>(result)); - if (!componentName) - componentConfiguration->getProp("@name", componentName); - - /* NB: we are still holding 'configCS' at this point, blocking all other thread access. - However code in callbacks may call e.g. 
getComponentConfig() and re-enter the crit */ - executeCallbacks(oldComponentConfiguration, oldGlobalConfiguration); - absoluteConfigFilename.set(std::get<0>(result).c_str()); - } + if (changed) + refreshConfiguration(false, false); }; + try { fileWatcher.setown(createFileEventWatcher(updateFunc)); @@ -8802,7 +8775,75 @@ class CConfigUpdater : public CInterface } return true; } - void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) + + void refreshConfiguration(IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + { + CriticalBlock b(notifyFuncCS); + //Ensure all modifications to the config take place before the config is updated, and the monitoring/caching functions are called. + executeModifyCallbacks(newComponentConfiguration, newGlobalConfiguration); + + // NB: block calls to get*Config*() from other threads until callbacks notified and new swapped in, destroy old config outside the critical section + Owned oldComponentConfiguration; + Owned oldGlobalConfiguration; + { + CriticalBlock b(configCS); + oldComponentConfiguration.setown(componentConfiguration.getClear()); + oldGlobalConfiguration.setown(globalConfiguration.getClear()); + + /* swapin before callbacks called, but before releasing crit. + * That way the CB can see the old/diffs and act on them, but any + * code calling e.g. getComponentConfig() will see new. + */ + componentConfiguration.set(newComponentConfiguration); + globalConfiguration.set(newGlobalConfiguration); + + /* NB: we are still holding 'configCS' at this point, blocking all other thread access. + However code in callbacks may call e.g. 
getComponentConfig() and re-enter the crit */ + executeNotifyCallbacks(oldComponentConfiguration, oldGlobalConfiguration); + } + } + + void refreshConfiguration(bool firstTime, bool avoidClone) + { + if (firstTime || fileWatcher) + { + auto result = doLoadConfiguration(componentDefault, globalDefault, args.getArray(), componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + IPropertyTree * newComponentConfiguration = std::get<1>(result); + IPropertyTree * newGlobalConfiguration = std::get<2>(result); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + + if (firstTime) + { + absoluteConfigFilename.set(std::get<0>(result).c_str()); + newGlobalConfiguration->getProp("@name", componentName); + } + } + else if (avoidClone) + { + //This is added during the initialiation phase, so no danger of other threads accesing the global information + //while it is updated. Thor currently relies on the pointer not changing. + Owned newComponentConfiguration = getComponentConfigSP(); + Owned newGlobalConfiguration = getGlobalConfigSP(); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + } + else + { + // File monitor is disabled - no updates to the configuration files are supported. + //So clone the existing configuration and use that to refresh the config - update fucntions may perform differently. 
+ Owned newComponentConfiguration = createPTreeFromIPT(getComponentConfigSP()); + Owned newGlobalConfiguration = createPTreeFromIPT(getGlobalConfigSP()); + refreshConfiguration(newComponentConfiguration, newGlobalConfiguration); + } + } + + void executeNotifyCallbacks() + { + CriticalBlock notifyBlock(notifyFuncCS); + CriticalBlock configBlock(configCS); + executeNotifyCallbacks(componentConfiguration, globalConfiguration); + } + + void executeNotifyCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration) { for (const auto &item: notifyConfigUpdates) { @@ -8817,6 +8858,23 @@ class CConfigUpdater : public CInterface } } } + + void executeModifyCallbacks(IPropertyTree * newComponentConfiguration, IPropertyTree * newGlobalConfiguration) + { + for (auto & modifyFunc : modifyConfigUpdates) + { + try + { + modifyFunc.second(newComponentConfiguration, newGlobalConfiguration); + } + catch (IException *e) + { + EXCLOG(e, "CConfigUpdater callback"); + e->Release(); + } + } + } + unsigned addNotifyFunc(ConfigUpdateFunc notifyFunc, bool callWhenInstalled) { CriticalBlock b(notifyFuncCS); @@ -8826,29 +8884,36 @@ class CConfigUpdater : public CInterface { if (isInitialized()) notifyFunc(getComponentConfigSP(), getGlobalConfigSP()); - else - { - // If the configuration is not yet be loaded, track notify callbacks that - // want to be initialized on install, and call during CConfigUpdater::init. - pendingInitializeFuncIds.push_back(notifyFuncId); - } + } + return notifyFuncId; + } + unsigned addModifyFunc(ConfigModifyFunc notifyFunc, bool threadSafe) + { + CriticalBlock b(notifyFuncCS); + notifyFuncId++; + modifyConfigUpdates[notifyFuncId] = notifyFunc; + if (isInitialized()) + { + //Force all cached values to be recalculated, do not reload the config + //This is only legal if no other threads are accessing the config yet - otherwise the reading thread + //could crash when the global configuration is updated. 
+ refreshConfiguration(false, threadSafe); + DBGLOG("Modify functions should be registered before the configuration is loaded"); } return notifyFuncId; } bool removeNotifyFunc(unsigned funcId) { CriticalBlock b(notifyFuncCS); + if (modifyConfigUpdates.erase(funcId) != 0) + return true; + auto it = notifyConfigUpdates.find(funcId); if (it == notifyConfigUpdates.end()) return false; ConfigUpdateFunc notifyFunc = it->second; notifyConfigUpdates.erase(it); - if (!isInitialized()) - { - auto it = std::remove(pendingInitializeFuncIds.begin(), pendingInitializeFuncIds.end(), funcId); - pendingInitializeFuncIds.erase(it, pendingInitializeFuncIds.end()); - } return true; } }; @@ -8873,6 +8938,13 @@ unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInsta return configFileUpdater->addNotifyFunc(notifyFunc, callWhenInstalled); } +jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc, bool threadSafe) // This function must be called before the configuration is loaded. 
+{ + if (!configFileUpdater) // NB: installConfigUpdateHook should always be called after configFileUpdater is initialized + return 0; + return configFileUpdater->addModifyFunc(notifyFunc, threadSafe); +} + void removeConfigUpdateHook(unsigned notifyFuncId) { if (0 == notifyFuncId) @@ -8883,11 +8955,11 @@ void removeConfigUpdateHook(unsigned notifyFuncId) WARNLOG("removeConfigUpdateHook(): notifyFuncId %u not installed", notifyFuncId); } -void executeConfigUpdaterCallbacks() +void refreshConfiguration() { - if (!configFileUpdater) // NB: executeConfigUpdaterCallbacks should always be called after configFileUpdater is initialized + if (!configFileUpdater) // NB: refreshConfiguration() should always be called after configFileUpdater is initialized return; - configFileUpdater->executeCallbacks(componentConfiguration, globalConfiguration); + configFileUpdater->refreshConfiguration(false, false); } void CConfigUpdateHook::clear() @@ -8914,6 +8986,22 @@ void CConfigUpdateHook::installOnce(ConfigUpdateFunc callbackFunc, bool callWhen } +void CConfigUpdateHook::installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe) +{ + unsigned id = configCBId.load(std::memory_order_acquire); + if ((unsigned)-1 == id) // avoid CS in common case + { + CriticalBlock b(crit); + // check again now in CS + id = configCBId.load(std::memory_order_acquire); + if ((unsigned)-1 == id) + { + id = installConfigUpdateHook(callbackFunc, threadSafe); + configCBId.store(id, std::memory_order_release); + } + } +} + static std::tuple doLoadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute) { Owned newComponentConfig = createPTreeFromIPT(componentDefault); @@ -9047,7 +9135,10 @@ static std::tuple doLoadConfigura #ifdef _DEBUG // NB: don't re-hold, if CLI --hold already held. 
if (!held && newComponentConfig->getPropBool("@hold")) + { holdLoop(); + held = true; + } #endif unsigned ptreeMappingThreshold = newGlobalConfig->getPropInt("@ptreeMappingThreshold", defaultSiblingMapThreshold); @@ -9056,17 +9147,12 @@ static std::tuple doLoadConfigura return std::make_tuple(std::string(absConfigFilename.str()), newComponentConfig.getClear(), newGlobalConfig.getClear()); } -jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) +IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPropertyTree *globalDefault, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) { assertex(configFileUpdater); // NB: loadConfiguration should always be called after configFileUpdater is initialized if (configFileUpdater->isInitialized()) throw makeStringExceptionV(99, "Configuration for component %s has already been initialised", componentTag); - auto result = doLoadConfiguration(componentDefault, globalDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); - - componentConfiguration.setown(std::get<1>(result)); - globalConfiguration.setown(std::get<2>(result)); - /* In k8s, pods auto-restart by default on monitored ConfigMap settings/areas * ConfigMap settings/areas deliberately not monitored will rely on this config updater mechanism, * and if necessary, installed hooks that are called on an update, to perform updates of cached state. 
@@ -9082,7 +9168,7 @@ jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPr * installed config hooks to be called when an environment change is detected e.g when pushed to Dali) */ - configFileUpdater->init(std::get<0>(result).c_str(), componentDefault, globalDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); + configFileUpdater->init(componentDefault, globalDefault, argv, componentTag, envPrefix, legacyFilename, mapper, altNameAttribute); if (monitor) configFileUpdater->startMonitoring(); @@ -9090,7 +9176,7 @@ jlib_decl IPropertyTree * loadConfiguration(IPropertyTree *componentDefault, IPr return componentConfiguration.getLink(); } -jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) +IPropertyTree * loadConfiguration(const char * defaultYaml, const char * * argv, const char * componentTag, const char * envPrefix, const char * legacyFilename, IPropertyTree * (mapper)(IPropertyTree *), const char *altNameAttribute, bool monitor) { if (componentConfiguration) throw makeStringExceptionV(99, "Configuration for component %s has already been initialised", componentTag); @@ -9113,20 +9199,19 @@ jlib_decl IPropertyTree * loadConfiguration(const char * defaultYaml, const char void replaceComponentConfig(IPropertyTree *newComponentConfig, IPropertyTree *newGlobalConfig) { - { - CriticalBlock b(configCS); - componentConfiguration.set(newComponentConfig); - globalConfiguration.set(newGlobalConfig); - } - executeConfigUpdaterCallbacks(); + assertex (configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized + configFileUpdater->refreshConfiguration(newComponentConfig, newGlobalConfig); } void initNullConfiguration() { if (componentConfiguration || 
globalConfiguration) throw makeStringException(99, "Configuration has already been initialised"); - componentConfiguration.setown(createPTree()); - globalConfiguration.setown(createPTree()); + + assertex(configFileUpdater); // NB: replaceComponentConfig should always be called after configFileUpdater is initialized + Owned newComponentConfig(createPTree()); + Owned newGlobalConfig(createPTree()); + configFileUpdater->refreshConfiguration(newComponentConfig, newGlobalConfig); } class CYAMLBufferReader : public CInterfaceOf diff --git a/system/jlib/jptree.hpp b/system/jlib/jptree.hpp index 40366556d9f..cf985f29ac6 100644 --- a/system/jlib/jptree.hpp +++ b/system/jlib/jptree.hpp @@ -332,9 +332,11 @@ jlib_decl const char * queryComponentName(); // ConfigUpdateFunc calls are made in a mutex, but after new confis are swapped in typedef std::function ConfigUpdateFunc; +typedef std::function ConfigModifyFunc; jlib_decl unsigned installConfigUpdateHook(ConfigUpdateFunc notifyFunc, bool callWhenInstalled); +jlib_decl unsigned installConfigUpdateHook(ConfigModifyFunc notifyFunc); // This function must be called before the configuration is loaded. 
jlib_decl void removeConfigUpdateHook(unsigned notifyFuncId); -jlib_decl void executeConfigUpdaterCallbacks(); +jlib_decl void refreshConfiguration(); // (Optionally) reload the configuration file, reapply changes, and derive cached information class jlib_decl CConfigUpdateHook { @@ -344,6 +346,7 @@ class jlib_decl CConfigUpdateHook ~CConfigUpdateHook() { clear(); } void clear(); void installOnce(ConfigUpdateFunc callbackFunc, bool callWhenInstalled); + void installModifierOnce(ConfigModifyFunc callbackFunc, bool threadSafe); }; /* diff --git a/thorlcr/master/thmastermain.cpp b/thorlcr/master/thmastermain.cpp index 8e5b1a4e517..16f2df64d02 100644 --- a/thorlcr/master/thmastermain.cpp +++ b/thorlcr/master/thmastermain.cpp @@ -389,6 +389,9 @@ class CRegistryServer : public CSimpleInterface publishPodNames(workunit, graphName, &connectedWorkers); } + //Check that nothing has caused the global configuration to be refreshed - otherwise inconsistent values may be used by the slave + assertex(globals == getComponentConfigSP()); + PROGLOG("Workers connected, initializing.."); msg.clear(); msg.append(THOR_VERSION_MAJOR).append(THOR_VERSION_MINOR); @@ -636,7 +639,9 @@ int main( int argc, const char *argv[] ) InitModuleObjects(); NoQuickEditSection xxx; { - globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, false)); + bool monitorConfig = false; // Do not allow updates to the config file, otherwise the slave may not be in sync. 
+ //MORE: What about updates to storage planes - they will not be passed through to the slaves + globals.setown(loadConfiguration(thorDefaultConfigYaml, argv, "thor", "THOR", "thor.xml", nullptr, nullptr, monitorConfig)); } #ifdef _DEBUG unsigned holdWorker = globals->getPropInt("@holdSlave", NotFound); @@ -754,7 +759,8 @@ int main( int argc, const char *argv[] ) } } - initializeStorageGroups(true); + //This can only be called once dali is initialised + initializeStoragePlanes(true, true); if (globals->getPropBool("@MPChannelReconnect")) getMPServer()->setOpt(mpsopt_channelreopen, "true");