From ff956bef599f5e30b64f491de7d0125f9c486dae Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 22 Nov 2023 09:32:04 +0100 Subject: [PATCH 1/8] Fix selection of correct VariationsFor function --- analyses/cms-open-data-ttbar/analysis.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index f525527..1b1d8ba 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -379,10 +379,10 @@ def main() -> None: ml_results += ml_hist_list # Select the right VariationsFor function depending on RDF or DistRDF - if type(df).__module__ == "DistRDF.Proxy": - variationsfor_func = ROOT.RDF.Experimental.Distributed.VariationsFor - else: + if args.scheduler == "mt": variationsfor_func = ROOT.RDF.Experimental.VariationsFor + else: + variationsfor_func = ROOT.RDF.Experimental.Distributed.VariationsFor for r in results + ml_results: if r.should_vary: r.histo = variationsfor_func(r.histo) From bd2ba72844678f15436836a79e8917ccd7ba9e65 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 22 Nov 2023 12:46:15 +0100 Subject: [PATCH 2/8] Remove some unnecessary branching --- analyses/cms-open-data-ttbar/analysis.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index 1b1d8ba..242431d 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -354,13 +354,10 @@ def main() -> None: else: # Setup for distributed RDataFrame client = create_dask_client(args.scheduler, args.ncores, args.hosts) + ROOT.RDF.Experimental.Distributed.initialize(load_cpp) if args.inference: - ROOT.RDF.Experimental.Distributed.initialize(load_cpp) - if args.inference: - # TODO: make ml.load_cpp working on distributed - ROOT.RDF.Experimental.Distributed.initialize(ml.load_cpp, "./fastforest") - else: - 
ROOT.RDF.Experimental.Distributed.initialize(load_cpp) + # TODO: make ml.load_cpp working on distributed + ROOT.RDF.Experimental.Distributed.initialize(ml.load_cpp, "./fastforest") run_graphs = ROOT.RDF.Experimental.Distributed.RunGraphs # Book RDataFrame results From 479b010f21eab50c37690e2435795536accd390a Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 22 Nov 2023 15:41:26 +0100 Subject: [PATCH 3/8] Use Vincenzo's safer compile_macro_wrapper --- analyses/cms-open-data-ttbar/analysis.py | 38 ++++++++++++++++++------ 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index 242431d..7734edb 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -313,18 +313,38 @@ def book_histos( return (results, ml_results) +def compile_macro_wrapper(library_path: str): + ROOT.gInterpreter.Declare( + ''' + #ifndef R__COMPILE_MACRO_WRAPPER + #define R__COMPILE_MACRO_WRAPPER + int CompileMacroWrapper(const std::string &library_path) + { + R__LOCKGUARD(gInterpreterMutex); + return gSystem->CompileMacro(library_path.c_str(), "kO"); + } + #endif // R__COMPILE_MACRO_WRAPPER + ''') + + if ROOT.CompileMacroWrapper(library_path) != 1: + raise RuntimeError("Failure in TSystem::CompileMacro!") + def load_cpp(): - """Load C++ helper functions. 
Works for both local and distributed execution.""" try: - # when using distributed RDataFrame 'helpers.cpp' is copied to the local_directory - # of every worker (via `distribute_unique_paths`) - localdir = get_worker().local_directory - cpp_source = Path(localdir) / "helpers.h" + this_worker = get_worker() except ValueError: - # must be local execution - cpp_source = "helpers.h" - - ROOT.gSystem.CompileMacro(str(cpp_source), "kO") + print("Not on a worker") + return + + if not hasattr(this_worker, "is_library_loaded"): + print("Compiling the macro.") + library_source = "helpers.h" + local_dir = get_worker().local_directory + library_path = os.path.join(local_dir, library_source) + compile_macro_wrapper(library_path) + this_worker.is_library_loaded = True + else: + print("Didn't try to compile the macro.") def main() -> None: From 2fc21fcd9fa4ad566f2e1d04a92c075bbf3c9dfc Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 22 Nov 2023 15:42:37 +0100 Subject: [PATCH 4/8] Do not call Distributed.initialize multiple times It doesn't work. 
--- analyses/cms-open-data-ttbar/analysis.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index 7734edb..49b492a 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -374,10 +374,15 @@ def main() -> None: else: # Setup for distributed RDataFrame client = create_dask_client(args.scheduler, args.ncores, args.hosts) - ROOT.RDF.Experimental.Distributed.initialize(load_cpp) if args.inference: + def load_all(fastforest_path): + load_cpp() + ml.load_cpp(fastforest_path) + # TODO: make ml.load_cpp working on distributed - ROOT.RDF.Experimental.Distributed.initialize(ml.load_cpp, "./fastforest") + ROOT.RDF.Experimental.Distributed.initialize(load_all, "./fastforest") + else: + ROOT.RDF.Experimental.Distributed.initialize(load_cpp) run_graphs = ROOT.RDF.Experimental.Distributed.RunGraphs # Book RDataFrame results From be98ba6858a0bf48c5be7f994d630ec8ece42387 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Wed, 22 Nov 2023 15:45:28 +0100 Subject: [PATCH 5/8] Add missing DynamicPath invocation in ml.load_cpp I'm not 100% sure it's needed, but it should not hurt. 
--- analyses/cms-open-data-ttbar/ml.py | 1 + 1 file changed, 1 insertion(+) diff --git a/analyses/cms-open-data-ttbar/ml.py b/analyses/cms-open-data-ttbar/ml.py index a134c36..4826d58 100644 --- a/analyses/cms-open-data-ttbar/ml.py +++ b/analyses/cms-open-data-ttbar/ml.py @@ -93,6 +93,7 @@ def load_cpp(fastforest_path, max_n_jets=6): lib = os.path.join(fastforest_path, "lib") # path for libraries ROOT.gSystem.AddIncludePath(f"-I{include}") ROOT.gSystem.AddLinkedLibs(f"-L{lib} -lfastforest") + ROOT.gSystem.AddDynamicPath(f"{lib}") ROOT.gSystem.Load(f"{lib}/libfastforest.so.1") ROOT.gSystem.CompileMacro("ml_helpers.cpp", "kO") From 36cfbffae3f7cabeb721d6d1bc547382106adf8b Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Mon, 27 Nov 2023 10:32:16 +0100 Subject: [PATCH 6/8] Misc improvements to ml.py for distRDF - wrap CompileMacro call to make it thread-safe and check its return value - add several sanity checks --- analyses/cms-open-data-ttbar/ml.py | 48 ++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 6 deletions(-) diff --git a/analyses/cms-open-data-ttbar/ml.py b/analyses/cms-open-data-ttbar/ml.py index 4826d58..7fc3bf2 100644 --- a/analyses/cms-open-data-ttbar/ml.py +++ b/analyses/cms-open-data-ttbar/ml.py @@ -1,8 +1,10 @@ import os +import sys from dataclasses import dataclass from typing import Tuple import ROOT +from distributed import get_worker # histogram bin lower limit to use for each ML input feature bin_low = [0, 0, 0, 0, 50, 50, 50, 50, 25, 25, 25, 25, 0, 0, 0, 0, -1, -1, -1, -1] @@ -74,6 +76,24 @@ class MLHistoConf: ] +def compile_macro_wrapper(library_path: str): + ROOT.gInterpreter.Declare( + """ + #ifndef R__COMPILE_MACRO_WRAPPER + #define R__COMPILE_MACRO_WRAPPER + int CompileMacroWrapper(const std::string &library_path) + { + R__LOCKGUARD(gInterpreterMutex); + return gSystem->CompileMacro(library_path.c_str(), "kO"); + } + #endif // R__COMPILE_MACRO_WRAPPER + """ + ) + + if ROOT.CompileMacroWrapper(library_path) != 1: + raise 
RuntimeError("Failure in TSystem::CompileMacro!") + + def load_cpp(fastforest_path, max_n_jets=6): # the default value of max_n_jets is the same as in the refererence implementation # https://github.com/iris-hep/analysis-grand-challenge @@ -91,17 +111,30 @@ def load_cpp(fastforest_path, max_n_jets=6): include = os.path.join(fastforest_path, "include") # path for headers lib = os.path.join(fastforest_path, "lib") # path for libraries + if not os.path.exists(include) or not os.path.exists(lib): + raise RuntimeError("Cannot find fastforest include/library paths.") ROOT.gSystem.AddIncludePath(f"-I{include}") + ROOT.gInterpreter.AddIncludePath(include) ROOT.gSystem.AddLinkedLibs(f"-L{lib} -lfastforest") ROOT.gSystem.AddDynamicPath(f"{lib}") ROOT.gSystem.Load(f"{lib}/libfastforest.so.1") - ROOT.gSystem.CompileMacro("ml_helpers.cpp", "kO") + + try: + this_worker = get_worker() + except ValueError: + print("Not on a worker", file=sys.stderr) + return + + library_source = "ml_helpers.cpp" + local_dir = this_worker.local_directory + library_path = os.path.join(local_dir, library_source) + compile_macro_wrapper(library_path) # Initialize FastForest models. 
# Our BDT models have 20 input features according to the AGC documentation # https://agc.readthedocs.io/en/latest/taskbackground.html#machine-learning-component - ROOT.gInterpreter.Declare( + res = ROOT.gInterpreter.Declare( # **Conditional derectives used to avoid redefinition error during distributed computing** # Note: # * moving all stuff in `Declare` to `ml_helpers.cpp` cancels the necessity of using `ifndef` @@ -110,19 +143,22 @@ def load_cpp(fastforest_path, max_n_jets=6): """ #ifndef AGC_MODELS #define AGC_MODELS + #include "fastforest.h" - const std::map fastforest_models = get_fastforests("models/"); + const std::map fastforest_models = get_fastforests("{fastforest_path}/../models/"); const fastforest::FastForest& feven = fastforest_models.at("even"); const fastforest::FastForest& fodd = fastforest_models.at("odd"); - """.__add__( - f""" + size_t max_n_jets = {max_n_jets}; std::map> permutations = get_permutations_dict(max_n_jets); #endif - """ + """.format( + fastforest_path=fastforest_path, max_n_jets=max_n_jets ) ) + if not res: + raise RuntimeError("There was a problem invoking Declare.") def define_features(df: ROOT.RDataFrame) -> ROOT.RDataFrame: From be98ba6858a0bf48c5be7f994d630ec8ece42387 Mon Sep 17 00:00:00 2001 From: Enrico Guiraud Date: Mon, 27 Nov 2023 10:35:38 +0100 Subject: [PATCH 7/8] Add notebook that runs the analysis with DistRDF on SWAN+HTCondor --- .../analysis_distrdf_swan.ipynb | 515 ++++++++++++++++++ 1 file changed, 515 insertions(+) create mode 100644 analyses/cms-open-data-ttbar/analysis_distrdf_swan.ipynb diff --git a/analyses/cms-open-data-ttbar/analysis_distrdf_swan.ipynb b/analyses/cms-open-data-ttbar/analysis_distrdf_swan.ipynb new file mode 100644 index 0000000..ac94bba --- /dev/null +++ b/analyses/cms-open-data-ttbar/analysis_distrdf_swan.ipynb @@ -0,0 +1,515 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "dd7bd568-d02c-40bd-884f-9a115968ec40", + "metadata": { + "tags": [] + }, + 
"outputs": [], + "source": [ + "import os\n", + "from pathlib import Path\n", + "from time import time\n", + "from typing import Optional, Tuple\n", + "from dataclasses import dataclass\n", + "\n", + "import ml\n", + "import ROOT\n", + "from distributed import Client, LocalCluster, SSHCluster, get_worker\n", + "from ml import (\n", + " define_features,\n", + " infer_output_ml_features,\n", + " ml_features_config,\n", + ")\n", + "from plotting import save_ml_plots, save_plots\n", + "from utils import (\n", + " AGCInput,\n", + " AGCResult,\n", + " postprocess_results,\n", + " retrieve_inputs,\n", + " save_histos,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6619bc6-d5fa-4253-88cb-f9a20890f7f3", + "metadata": {}, + "outputs": [], + "source": [ + "# NOTE: the client's URL need to be adapted to the actual configuration\n", + "from dask.distributed import Client\n", + "\n", + "client = Client(\"tls://10.100.218.100:30448\")\n", + "client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d237b0d4-74f5-4f3d-be2e-72a7cc045b8a", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "# Using https://atlas-groupdata.web.cern.ch/atlas-groupdata/dev/AnalysisTop/TopDataPreparation/XSection-MC15-13TeV.data\n", + "# as a reference. 
Values are in pb.\n", + "XSEC_INFO = {\n", + " \"ttbar\": 396.87 + 332.97, # nonallhad + allhad, keep same x-sec for all\n", + " \"single_top_s_chan\": 2.0268 + 1.2676,\n", + " \"single_top_t_chan\": (36.993 + 22.175) / 0.252, # scale from lepton filter to inclusive\n", + " \"single_top_tW\": 37.936 + 37.906,\n", + " \"wjets\": 61457 * 0.252, # e/mu+nu final states\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4dc262ea-647e-483b-98a8-797efd242758", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "@dataclass\n", + "class Args:\n", + " n_max_files_per_sample = 1\n", + " data_cache = None # shouldn't be used\n", + " remote_data_prefix = 'root://eospublic.cern.ch//eos/root-eos/AGC'\n", + " output = \"histograms.root\"\n", + " inference = True\n", + " scheduler = \"dask-htcondor-swan\"\n", + " ncores = None # shouldn't be used\n", + " npartitions = 2\n", + " hosts = None # shouldn't be used\n", + " verbose = False" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4b52886b-c8e1-4a1d-ae62-9009d149a391", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def create_dask_client(scheduler: str, ncores: int, hosts: str) -> Client:\n", + " return client" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "36860eca-f98b-49d7-ad96-9476e1a81d53", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def make_rdf(\n", + " files: list[str], client: Optional[Client], npartitions: Optional[int]\n", + ") -> ROOT.RDataFrame:\n", + " \"\"\"Construct and return a dataframe or, if a dask client is present, a distributed dataframe.\"\"\"\n", + " if client is not None:\n", + " d = ROOT.RDF.Experimental.Distributed.Dask.RDataFrame(\n", + " \"Events\", files, daskclient=client, npartitions=npartitions\n", + " )\n", + " d._headnode.backend.distribute_unique_paths(\n", + " [\n", + " \"helpers.h\",\n", + " \"ml_helpers.cpp\",\n", + " \"ml.py\",\n", + " ]\n", + " )\n", 
+ " return d\n", + "\n", + " return ROOT.RDataFrame(\"Events\", files)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "62ba01d2-48f6-4963-a529-37bd036cb3be", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def define_trijet_mass(df: ROOT.RDataFrame) -> ROOT.RDataFrame:\n", + " \"\"\"Add the trijet_mass observable to the dataframe after applying the appropriate selections.\"\"\"\n", + "\n", + " # First, select events with at least 2 b-tagged jets\n", + " df = df.Filter(\"Sum(Jet_btagCSVV2_cut > 0.5) > 1\")\n", + "\n", + " # Build four-momentum vectors for each jet\n", + " df = df.Define(\"Jet_p4\", \"ConstructP4(Jet_pt_cut, Jet_eta_cut, Jet_phi_cut, Jet_mass_cut)\")\n", + "\n", + " # Build trijet combinations\n", + " df = df.Define(\"Trijet_idx\", \"Combinations(Jet_pt_cut, 3)\")\n", + "\n", + " # Trijet_btag is a helpful array mask indicating whether or not the maximum btag value in Trijet is larger than the 0.5 threshold\n", + " df = df.Define(\n", + " \"Trijet_btag\",\n", + " \"\"\"\n", + " auto J1_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[0]);\n", + " auto J2_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[1]);\n", + " auto J3_btagCSVV2 = Take(Jet_btagCSVV2_cut, Trijet_idx[2]);\n", + " return J1_btagCSVV2 > 0.5 || J2_btagCSVV2 > 0.5 || J3_btagCSVV2 > 0.5;\n", + " \"\"\",\n", + " )\n", + "\n", + " # Assign four-momentums to each trijet combination\n", + " df = df.Define(\n", + " \"Trijet_p4\",\n", + " \"\"\"\n", + " auto J1 = Take(Jet_p4, Trijet_idx[0]);\n", + " auto J2 = Take(Jet_p4, Trijet_idx[1]);\n", + " auto J3 = Take(Jet_p4, Trijet_idx[2]);\n", + " return (J1+J2+J3)[Trijet_btag];\n", + " \"\"\",\n", + " )\n", + "\n", + " # Get trijet transverse momentum values from four-momentum vectors\n", + " df = df.Define(\n", + " \"Trijet_pt\",\n", + " \"return Map(Trijet_p4, [](const ROOT::Math::PxPyPzMVector &v) { return v.Pt(); })\",\n", + " )\n", + "\n", + " # Evaluate mass of trijet with maximum pt and btag higher 
than threshold\n", + " df = df.Define(\"Trijet_mass\", \"Trijet_p4[ArgMax(Trijet_pt)].M()\")\n", + "\n", + " return df" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b6725ff1-18f0-457d-b5c1-ca8e41407fa9", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def book_histos(\n", + " df: ROOT.RDataFrame, process: str, variation: str, nevents: int, inference=False\n", + ") -> Tuple[list[AGCResult], list[AGCResult]]:\n", + " \"\"\"Return the pair of lists of RDataFrame results pertaining to the given process and variation.\n", + " The first list contains histograms of reconstructed HT and trijet masses.\n", + " The second contains ML inference outputs\"\"\"\n", + " # Calculate normalization for MC\n", + " x_sec = XSEC_INFO[process]\n", + " lumi = 3378 # /pb\n", + " xsec_weight = x_sec * lumi / nevents\n", + " df = df.Define(\"Weights\", str(xsec_weight)) # default weights\n", + "\n", + " if variation == \"nominal\":\n", + " # Jet_pt variations definition\n", + " # pt_scale_up() and pt_res_up(jet_pt) return scaling factors applying to jet_pt\n", + " # pt_scale_up() - jet energy scaly systematic\n", + " # pt_res_up(jet_pt) - jet resolution systematic\n", + " df = df.Vary(\n", + " \"Jet_pt\",\n", + " \"ROOT::RVec{Jet_pt*pt_scale_up(), Jet_pt*jet_pt_resolution(Jet_pt.size())}\",\n", + " [\"pt_scale_up\", \"pt_res_up\"],\n", + " )\n", + "\n", + " if process == \"wjets\":\n", + " # Flat weight variation definition\n", + " df = df.Vary(\n", + " \"Weights\",\n", + " \"Weights*flat_variation()\",\n", + " [f\"scale_var_{direction}\" for direction in [\"up\", \"down\"]],\n", + " )\n", + "\n", + " # Event selection - the core part of the algorithm applied for both regions\n", + " # Selecting events containing at least one lepton and four jets with pT > 25 GeV\n", + " # Applying requirement at least one of them must be b-tagged jet (see details in the specification)\n", + " df = (\n", + " df.Define(\n", + " \"Electron_mask\",\n", + " 
\"Electron_pt > 30 && abs(Electron_eta) < 2.1 && Electron_sip3d < 4 && Electron_cutBased == 4\",\n", + " )\n", + " .Define(\n", + " \"Muon_mask\",\n", + " \"Muon_pt > 30 && abs(Muon_eta) < 2.1 && Muon_sip3d < 4 && Muon_tightId && Muon_pfRelIso04_all < 0.15\",\n", + " )\n", + " .Filter(\"Sum(Electron_mask) + Sum(Muon_mask) == 1\")\n", + " .Define(\"Jet_mask\", \"Jet_pt > 30 && abs(Jet_eta) < 2.4 && Jet_jetId == 6\")\n", + " .Filter(\"Sum(Jet_mask) >= 4\")\n", + " )\n", + "\n", + " # create columns for \"good\" jets\n", + " df = (\n", + " df.Define(\"Jet_pt_cut\", \"Jet_pt[Jet_mask]\")\n", + " .Define(\"Jet_btagCSVV2_cut\", \"Jet_btagCSVV2[Jet_mask]\")\n", + " .Define(\"Jet_eta_cut\", \"Jet_eta[Jet_mask]\")\n", + " .Define(\"Jet_phi_cut\", \"Jet_phi[Jet_mask]\")\n", + " .Define(\"Jet_mass_cut\", \"Jet_mass[Jet_mask]\")\n", + " )\n", + "\n", + " # b-tagging variations for nominal samples\n", + " if variation == \"nominal\":\n", + " df = df.Vary(\n", + " \"Weights\",\n", + " \"ROOT::RVecD{Weights*btag_weight_variation(Jet_pt_cut)}\",\n", + " [\n", + " f\"{weight_name}_{direction}\"\n", + " for weight_name in [f\"btag_var_{i}\" for i in range(4)]\n", + " for direction in [\"up\", \"down\"]\n", + " ],\n", + " )\n", + "\n", + " # Define HT observable for the 4j1b region\n", + " # Only one b-tagged region required\n", + " # The observable is the total transvesre momentum\n", + " # fmt: off\n", + " df4j1b = df.Filter(\"Sum(Jet_btagCSVV2_cut > 0.5) == 1\")\\\n", + " .Define(\"HT\", \"Sum(Jet_pt_cut)\")\n", + " # fmt: on\n", + "\n", + " # Define trijet_mass observable for the 4j2b region (this one is more complicated)\n", + " df4j2b = define_trijet_mass(df)\n", + "\n", + " # Book histograms and, if needed, their systematic variations\n", + " results = []\n", + " for df, observable, region in zip([df4j1b, df4j2b], [\"HT\", \"Trijet_mass\"], [\"4j1b\", \"4j2b\"]):\n", + " histo_model = ROOT.RDF.TH1DModel(\n", + " name=f\"{region}_{process}_{variation}\", title=process, 
nbinsx=25, xlow=50, xup=550\n", + " )\n", + " nominal_histo = df.Histo1D(histo_model, observable, \"Weights\")\n", + "\n", + " if variation == \"nominal\":\n", + " results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=True))\n", + " else:\n", + " results.append(AGCResult(nominal_histo, region, process, variation, nominal_histo, should_vary=False))\n", + " print(f\"Booked histogram {histo_model.fName}\")\n", + "\n", + " ml_results: list[AGCResult] = []\n", + "\n", + " if not inference:\n", + " return (results, ml_results)\n", + "\n", + " df4j2b = define_features(df4j2b)\n", + " df4j2b = infer_output_ml_features(df4j2b)\n", + "\n", + " # Book histograms and, if needed, their systematic variations\n", + " for i, feature in enumerate(ml_features_config):\n", + " histo_model = ROOT.RDF.TH1DModel(\n", + " name=f\"{feature.name}_{process}_{variation}\",\n", + " title=feature.title,\n", + " nbinsx=feature.binning[0],\n", + " xlow=feature.binning[1],\n", + " xup=feature.binning[2],\n", + " )\n", + "\n", + " nominal_histo = df4j2b.Histo1D(histo_model, f\"results{i}\", \"Weights\")\n", + "\n", + " if variation == \"nominal\":\n", + " ml_results.append(\n", + " AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=True)\n", + " )\n", + " else:\n", + " ml_results.append(\n", + " AGCResult(nominal_histo, feature.name, process, variation, nominal_histo, should_vary=False)\n", + " )\n", + " print(f\"Booked histogram {histo_model.fName}\")\n", + "\n", + " # Return the booked results\n", + " # Note that no event loop has run yet at this point (RDataFrame is lazy)\n", + " return (results, ml_results)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "aaa61a1d-74ed-4322-80ef-602e433c705e", + "metadata": {}, + "outputs": [], + "source": [ + "def compile_macro_wrapper(library_path: str):\n", + " ROOT.gInterpreter.Declare(\n", + " '''\n", + " #ifndef R__COMPILE_MACRO_WRAPPER\n", + " #define 
R__COMPILE_MACRO_WRAPPER\n", + " int CompileMacroWrapper(const std::string &library_path)\n", + " {\n", + " R__LOCKGUARD(gInterpreterMutex);\n", + " return gSystem->CompileMacro(library_path.c_str(), \"kO\");\n", + " }\n", + " #endif // R__COMPILE_MACRO_WRAPPER\n", + " ''')\n", + "\n", + " if ROOT.CompileMacroWrapper(library_path) != 1:\n", + " raise RuntimeError(\"Failure in TSystem::CompileMacro!\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b398a6d1-9f28-4299-ab22-16269c295115", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def load_cpp():\n", + " try:\n", + " this_worker = get_worker()\n", + " except ValueError:\n", + " print(\"Not on a worker\")\n", + " return\n", + "\n", + " if not hasattr(this_worker, \"is_library_loaded\"):\n", + " print(\"Compiling the macro.\")\n", + " library_source = \"helpers.h\"\n", + " local_dir = get_worker().local_directory\n", + " library_path = os.path.join(local_dir, library_source)\n", + " compile_macro_wrapper(library_path)\n", + " this_worker.is_library_loaded = True\n", + " else:\n", + " print(\"Didn't try to compile the macro.\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d580595f-dba8-4112-a57f-29805465cb88", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "def main() -> None:\n", + " program_start = time()\n", + " args = Args()\n", + "\n", + " # Do not add histograms to TDirectories automatically: we'll do it ourselves as needed.\n", + " ROOT.TH1.AddDirectory(False)\n", + " # Disable interactive graphics: avoids canvases flashing on screen before we save them to file\n", + " ROOT.gROOT.SetBatch(True)\n", + "\n", + " if args.verbose:\n", + " # Set higher RDF verbosity for the rest of the program.\n", + " # To only change the verbosity in a given scope, use ROOT.Experimental.RLogScopedVerbosity.\n", + " ROOT.Detail.RDF.RDFLogChannel.SetVerbosity(ROOT.Experimental.ELogLevel.kInfo)\n", + "\n", + " if args.scheduler == 
\"mt\":\n", + " # Setup for local, multi-thread RDataFrame\n", + " ROOT.EnableImplicitMT(args.ncores)\n", + " print(f\"Number of threads: {ROOT.GetThreadPoolSize()}\")\n", + " client = None\n", + " load_cpp()\n", + " if args.inference:\n", + " ml.load_cpp(\"./fastforest\")\n", + "\n", + " run_graphs = ROOT.RDF.RunGraphs\n", + " else:\n", + " # Setup for distributed RDataFrame\n", + " client = create_dask_client(args.scheduler, args.ncores, args.hosts)\n", + " if args.inference:\n", + " def load_all(fastforest_path):\n", + " load_cpp()\n", + " ml.load_cpp(fastforest_path)\n", + "\n", + " fastforest_path=\"/eos/user/e/eguiraud/SWAN_projects/analysis-grand-challenge-root/analyses/cms-open-data-ttbar/fastforest\"\n", + " ROOT.RDF.Experimental.Distributed.initialize(load_all, fastforest_path)\n", + " else:\n", + " ROOT.RDF.Experimental.Distributed.initialize(load_cpp)\n", + " run_graphs = ROOT.RDF.Experimental.Distributed.RunGraphs\n", + "\n", + " # Book RDataFrame results\n", + " inputs: list[AGCInput] = retrieve_inputs(\n", + " args.n_max_files_per_sample, args.remote_data_prefix, args.data_cache\n", + " )\n", + " results: list[AGCResult] = []\n", + " ml_results: list[AGCResult] = []\n", + "\n", + " for input in inputs:\n", + " df = make_rdf(input.paths, client, args.npartitions)\n", + " hist_list, ml_hist_list = book_histos(\n", + " df, input.process, input.variation, input.nevents, inference=args.inference\n", + " )\n", + " results += hist_list\n", + " ml_results += ml_hist_list\n", + "\n", + " # Select the right VariationsFor function depending on RDF or DistRDF\n", + " if args.scheduler == \"mt\":\n", + " variationsfor_func = ROOT.RDF.Experimental.VariationsFor\n", + " else:\n", + " variationsfor_func = ROOT.RDF.Experimental.Distributed.VariationsFor\n", + " for r in results + ml_results:\n", + " if r.should_vary:\n", + " r.histo = variationsfor_func(r.histo)\n", + "\n", + " print(f\"Building the computation graphs took {time() - program_start:.2f} seconds\")\n", 
+ "\n", + " # FIXME remove this debug workaround\n", + " print(\"TEST RUN START\")\n", + " print(results[0].nominal_histo.GetEntries())\n", + " print(\"TEST RUN END\")\n", + "\n", + " # Run the event loops for all processes and variations here\n", + " run_graphs_start = time()\n", + " run_graphs([r.nominal_histo for r in results + ml_results])\n", + "\n", + " print(f\"Executing the computation graphs took {time() - run_graphs_start:.2f} seconds\")\n", + "\n", + " results = postprocess_results(results)\n", + " save_plots(results)\n", + " save_histos([r.histo for r in results], output_fname=args.output)\n", + " print(f\"Result histograms saved in file {args.output}\")\n", + "\n", + " if args.inference:\n", + " ml_results = postprocess_results(ml_results)\n", + " save_ml_plots(ml_results)\n", + " output_fname = args.output.split(\".root\")[0] + \"_ml_inference.root\"\n", + " save_histos([r.histo for r in ml_results], output_fname=output_fname)\n", + " print(f\"Result histograms from ML inference step saved in file {output_fname}\")\n", + "\n", + " # FIXME this was moved down here because it looks like postprocess_results still needs the client,\n", + " # but it might be a side-effect of errors happening in the event loop\n", + " if client is not None:\n", + " client.close()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5e6467d4-81ad-4862-9355-fd97b8a3cdc8", + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "main()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} From 6c6cc545e307a3a6d30206769045f45976783e0c Mon Sep 17 00:00:00 2001 From: 
Enrico Guiraud Date: Mon, 27 Nov 2023 11:33:31 +0100 Subject: [PATCH 8/8] Remove unused import --- analyses/cms-open-data-ttbar/analysis.py | 1 - 1 file changed, 1 deletion(-) diff --git a/analyses/cms-open-data-ttbar/analysis.py b/analyses/cms-open-data-ttbar/analysis.py index 49b492a..900bdb7 100644 --- a/analyses/cms-open-data-ttbar/analysis.py +++ b/analyses/cms-open-data-ttbar/analysis.py @@ -1,6 +1,5 @@ import argparse import os -from pathlib import Path from time import time from typing import Optional, Tuple