Skip to content

Commit

Permalink
Merge pull request #157 from sfegan/main
Browse files Browse the repository at this point in the history
Improve thread handling
  • Loading branch information
sfegan authored Oct 7, 2024
2 parents fc92d78 + 8b1afda commit d6267b7
Show file tree
Hide file tree
Showing 16 changed files with 528 additions and 345 deletions.
6 changes: 4 additions & 2 deletions include/diagnostics/clock_regression.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ class ClockRegressionParallelEventVisitor:

struct ClockTest {
~ClockTest() { for(auto& ibin : bins) { delete ibin.second; } }
const calin::ix::diagnostics::clock_regression::SingleClockRegressionConfig* config = nullptr;
calin::ix::diagnostics::clock_regression::SingleClockRegressionConfig config;
std::map<int, RegressionAccumulator*> bins;
};

struct ModuleClockTest {
const calin::ix::diagnostics::clock_regression::SingleClockRegressionConfig* config = nullptr;
calin::ix::diagnostics::clock_regression::SingleClockRegressionConfig config;
std::vector<ClockTest> modules;
};

Expand All @@ -91,6 +91,8 @@ class ClockRegressionParallelEventVisitor:

calin::ix::diagnostics::clock_regression::ClockRegressionConfig config_;
int rebalance_ = 0;
std::string principal_clock_name_ = {};
int principal_clock_id_ = 0;
std::vector<ClockTest> camera_tests_;
std::vector<ModuleClockTest> module_tests_;
};
Expand Down
48 changes: 35 additions & 13 deletions include/iact_data/cta_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,8 @@
namespace calin { namespace iact_data { namespace cta_data_source {

class CTAZFITSDataSource:
public calin::iact_data::telescope_data_source::TelescopeRandomAccessDataSourceWithRunConfig,
public calin::io::data_source::FragmentList,
private calin::pattern::delegation::Delegator<
calin::iact_data::telescope_data_source::TelescopeRandomAccessDataSourceWithRunConfig>
public calin::iact_data::zfits_data_source::BasicZFITSDataSource,
private calin::pattern::delegation::Delegator<calin::iact_data::zfits_data_source::BasicZFITSDataSource>
{
public:
CALIN_TYPEALIAS(decoder_config_type,
Expand All @@ -54,12 +52,6 @@ class CTAZFITSDataSource:
const calin::ix::iact_data::cta_data_source::CTACameraEventDecoderConfig& decoder_config = default_decoder_config(),
const calin::ix::iact_data::zfits_data_source::ZFITSDataSourceConfig& config = default_config());

CTAZFITSDataSource(const std::string& filename, CTAZFITSDataSource* base_data_source,
const calin::ix::iact_data::zfits_data_source::ZFITSDataSourceConfig& config = default_config());
CTAZFITSDataSource(const std::string& filename,
const calin::ix::iact_data::zfits_data_source::ZFITSDataSourceConfig& config,
CTAZFITSDataSource* base_data_source);

virtual ~CTAZFITSDataSource();

calin::ix::iact_data::telescope_event::TelescopeEvent* get_next(
Expand All @@ -74,11 +66,41 @@ class CTAZFITSDataSource:
unsigned num_fragments() const override;
std::string fragment_name(unsigned index) const override;

config_type config() const { return config_; }
decoder_config_type decoder_config() const { return decoder_config_; }

calin::iact_data::zfits_data_source::BasicZFITSDataSource*
new_of_type(const std::string& filename, const config_type& config,
const calin::ix::iact_data::telescope_run_configuration::
TelescopeRunConfiguration* forced_run_configuration = nullptr) override;

private:
static TelescopeRandomAccessDataSourceWithRunConfig* construct_delegate(
config_type config_;
decoder_config_type decoder_config_;
static calin::iact_data::zfits_data_source::BasicZFITSDataSource* construct_delegate(
std::string filename, config_type config, decoder_config_type decoder_config);
static TelescopeRandomAccessDataSourceWithRunConfig* copy_base_data_source(
std::string filename, config_type config, CTAZFITSDataSource* base_data_source);
};

class CTAZFITSDataSourceFactory:
public calin::io::data_source::DataSourceFactory<
calin::ix::iact_data::telescope_event::TelescopeEvent>
{
public:
CTAZFITSDataSourceFactory(CTAZFITSDataSource* base_data_source,
const calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* run_config,
bool adopt_base_data_source = false, bool adopt_run_config = false):
calin::io::data_source::DataSourceFactory<calin::ix::iact_data::telescope_event::TelescopeEvent>(),
base_data_source_(base_data_source), run_config_(run_config),
adopt_base_data_source_(adopt_base_data_source), adopt_run_config_(adopt_run_config) { }
virtual ~CTAZFITSDataSourceFactory();
calin::io::data_source::DataSource<
calin::ix::iact_data::telescope_event::TelescopeEvent>* new_data_source() override;
private:
CTAZFITSDataSource* base_data_source_ = nullptr;
const calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* run_config_ = nullptr;
bool adopt_base_data_source_ = false;
bool adopt_run_config_ = false;
std::atomic<uint_fast32_t> isource_ { 0 };
};

} } } // namespace calin::iact_data::cta_data_source
26 changes: 16 additions & 10 deletions include/iact_data/parallel_event_dispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,30 +59,36 @@ class ParallelEventDispatcher: protected
add_visitor(visitor, std::string{}, adopt_visitor);
}

void process_run(calin::iact_data::telescope_data_source::
void process_src(calin::iact_data::telescope_data_source::
TelescopeRandomAccessDataSourceWithRunConfig* src,
unsigned log_frequency = 0, int nthread = 0);

void process_run(std::vector<calin::iact_data::telescope_data_source::
void process_src(calin::io::data_source::DataSource<
calin::ix::iact_data::telescope_event::TelescopeEvent>* src,
calin::ix::iact_data::
telescope_run_configuration::TelescopeRunConfiguration* run_config,
unsigned log_frequency = 0, int nthread = 0);

void process_src_list(std::vector<calin::iact_data::telescope_data_source::
TelescopeDataSourceWithRunConfig*> src_list,
unsigned log_frequency = 0);

void process_run(std::vector<calin::iact_data::telescope_data_source::
void process_src_list(std::vector<calin::iact_data::telescope_data_source::
TelescopeRandomAccessDataSourceWithRunConfig*> src_list,
unsigned log_frequency = 0);

void process_run(calin::io::data_source::DataSource<
calin::ix::iact_data::telescope_event::TelescopeEvent>* src,
calin::ix::iact_data::
telescope_run_configuration::TelescopeRunConfiguration* run_config,
unsigned log_frequency = 0, int nthread = 0);

void process_run(std::vector<calin::io::data_source::DataSource<
void process_src_list(std::vector<calin::io::data_source::DataSource<
calin::ix::iact_data::telescope_event::TelescopeEvent>*> src_list,
calin::ix::iact_data::
telescope_run_configuration::TelescopeRunConfiguration* merged_run_config,
unsigned log_frequency = 0);

void process_src_factory(calin::io::data_source::DataSourceFactory<
calin::ix::iact_data::telescope_event::TelescopeEvent>* src_factory,
calin::ix::iact_data::
telescope_run_configuration::TelescopeRunConfiguration* run_config,
unsigned log_frequency = 0, int nthread = 0);

void process_cta_zfits_run(const std::string& filename,
const calin::ix::iact_data::event_dispatcher::EventDispatcherConfig& config = default_config());

Expand Down
8 changes: 5 additions & 3 deletions include/iact_data/zfits_acada_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ class ZFITSSingleFileACADACameraEventDataSource:
data_stream_type* get_data_stream() override;

static config_type default_config();
const config_type& config() const { return config_; }
config_type config() const { return config_; }

private:
std::string filename_;
ZFITSSingleFileSingleMessageDataSource<event_type>* zfits_;
ZFITSSingleFileSingleMessageDataSource<event_type>* zfits_ = nullptr;
header_type* run_header_ = nullptr;
data_stream_type* data_stream_ = nullptr;
config_type config_;
Expand Down Expand Up @@ -137,7 +137,7 @@ class ZFITSACADACameraEventDataSource:
void set_next_index(uint64_t next_index) override;

static config_type default_config();
const config_type& config() const { return config_; }
config_type config() const { return config_; }

protected:
using BaseDataSource::source_;
Expand All @@ -147,7 +147,9 @@ class ZFITSACADACameraEventDataSource:
using BaseDataSource::open_file;

private:
void load_run_header();
config_type config_;
bool run_header_loaded_ = false;
header_type* run_header_ = nullptr;
data_stream_type* data_stream_ = nullptr;
};
Expand Down
64 changes: 15 additions & 49 deletions include/iact_data/zfits_data_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,60 +39,23 @@

namespace calin { namespace iact_data { namespace zfits_data_source {

#if 0 // UNUSED
template<typename EventMessage, typename HeaderMessage>
class ZFITSSingleFileDataSource:
public calin::iact_data::telescope_data_source::
TelescopeRandomAccessDataSourceWithRunConfig
class BasicZFITSDataSource:
public calin::iact_data::telescope_data_source::TelescopeRandomAccessDataSourceWithRunConfig,
public calin::io::data_source::FragmentList
{
public:
CALIN_TYPEALIAS(config_type,
calin::ix::iact_data::zfits_data_source::ZFITSDataSourceConfig);

ZFITSSingleFileDataSource(
calin::iact_data::zfits_acada_data_source::
ZFITSSingleFileACADACameraEventDataSource<EventMessage,HeaderMessage>* actl_zfits,
bool dont_decode_run_configuration,
calin::iact_data::acada_event_decoder::
ACADACameraEventDecoder<EventMessage,HeaderMessage>* decoder,
bool adopt_decoder = false,
bool adopt_actl_zfits = false);

ZFITSSingleFileDataSource(const std::string& filename,
calin::iact_data::acada_event_decoder::
ACADACameraEventDecoder<EventMessage,HeaderMessage>* decoder,
bool adopt_decoder = false,
const config_type& config = default_config());

virtual ~ZFITSSingleFileDataSource();

calin::ix::iact_data::telescope_event::TelescopeEvent* get_next(
uint64_t& seq_index_out, google::protobuf::Arena** arena = nullptr) override;
uint64_t size() override;
void set_next_index(uint64_t next_index) override;

calin::ix::iact_data::telescope_run_configuration::
TelescopeRunConfiguration* get_run_configuration() override;

static config_type default_config();

private:
calin::iact_data::acada_event_decoder::
ACADACameraEventDecoder<EventMessage,HeaderMessage>* decoder_;
bool adopt_decoder_ = false;
calin::iact_data::zfits_acada_data_source::
ZFITSSingleFileACADACameraEventDataSource<EventMessage,HeaderMessage>* acada_zfits_ = nullptr;
bool adopt_acada_zfits_ = false;
calin::ix::iact_data::telescope_run_configuration::
TelescopeRunConfiguration* run_config_ = nullptr;
virtual ~BasicZFITSDataSource();
virtual BasicZFITSDataSource* new_of_type(const std::string& filename, const config_type& config,
const calin::ix::iact_data::telescope_run_configuration::
TelescopeRunConfiguration* forced_run_configuration = nullptr) = 0;
};
#endif // UNUSED


template<typename MessageSet>
class ZFITSDataSource:
public calin::iact_data::telescope_data_source::TelescopeRandomAccessDataSourceWithRunConfig,
public calin::io::data_source::FragmentList
public BasicZFITSDataSource
{
public:
CALIN_TYPEALIAS(config_type,
Expand All @@ -111,10 +74,6 @@ class ZFITSDataSource:
bool adopt_decoder = false,
const config_type& config = default_config());

ZFITSDataSource(const std::string& filename,
ZFITSDataSource<MessageSet>* base_datasource,
const config_type& config = default_config());

virtual ~ZFITSDataSource();

calin::ix::iact_data::telescope_event::TelescopeEvent* get_next(
Expand All @@ -132,7 +91,14 @@ class ZFITSDataSource:

static config_type default_config();

ZFITSDataSource<MessageSet>* new_of_type(const std::string& filename, const config_type& config,
const calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* forced_run_config) override;

protected:
ZFITSDataSource(const std::string& filename,
calin::iact_data::acada_event_decoder::ACADACameraEventDecoder<MessageSet>* decoder,
const config_type& config,
const calin::ix::iact_data::telescope_run_configuration::TelescopeRunConfiguration* forced_run_config);

calin::iact_data::acada_event_decoder::
ACADACameraEventDecoder<MessageSet>* decoder_;
Expand Down
47 changes: 32 additions & 15 deletions proto/diagnostics/clock_regression.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ import "calin.proto";

package calin.ix.diagnostics.clock_regression;

enum Regressor {
PRINCIPAL_CLOCK = 0;
EVENT_NUMBER = 1;
};

enum ClockPartitionMode {
SINGLE_PARTITION = 0;
Expand All @@ -36,29 +40,37 @@ enum ClockPartitionMode {
};

message SingleClockRegressionConfig {
int32 clock_id = 1 [
(CFO).desc = "ID of camera or module clock to calculate regression on." ];
ClockPartitionMode partition_mode = 2 [
string clock_name = 1 [
(CFO).desc = "Name of the camera of module clock to calculate regression on." ];
int32 clock_id = 2 [
(CFO).desc = "ID of camera or module clock to calculate regression on. "
"This is overridden by the clock_name parameter if it is present," ];
Regressor regressor = 3 [
(CFO).desc = "Source of (independent) regressor." ];
ClockPartitionMode partition_mode = 4 [
(CFO).desc = "How to partition the clock data into bins." ];
int64 partition_bin_size = 3 [
int64 partition_bin_size = 5 [
(CFO).desc = "Size of clock partition bins, either in events or ticks of "
"principal clock." ];
int64 principal_clock_divisor = 4 [
int64 principal_clock_divisor = 6 [
(CFO).desc = "Divisor to apply to principal clock before regression. Should "
"only be used if there is a risk of overflow of the 64-bit "
"accumulators." ];
bool include_possibly_suspect_time_values = 5 [
bool include_possibly_suspect_time_values = 7 [
(CFO).desc = "Include values of the test clock that are flagged as "
"possibly suspicious in the regression." ];
};

message ClockRegressionConfig {
int32 principal_clock_id = 1 [
(CFO).desc = "Master camera clock id to use in regression." ];
bool include_possibly_suspect_principal_time_values = 2 [
string principal_clock_name = 1 [
(CFO).desc = "Master camera clock name to use in regression." ];
int32 principal_clock_id = 2 [
(CFO).desc = "Master camera clock id to use in regression. "
"This is overridden by the principal_clock_name parameter if it is present." ];
bool include_possibly_suspect_principal_time_values = 3 [
(CFO).desc = "Include values of the principal clock that are flagged as "
"possibly suspicious in the regression." ];
int32 rebalance_nevent = 3 [
int32 rebalance_nevent = 4 [
(CFO).desc = "Number of events between rebalancing, if positive, or "
"do not do rebalancing otherwise." ];

Expand Down Expand Up @@ -94,16 +106,21 @@ message ClockRegressionParameters {
};

message SingleClockRegressionResults {
int32 clock_id = 1;
map<uint32, ClockRegressionParameters> bins = 2;
string clock_name = 1;
int32 clock_id = 2;
ClockPartitionMode partition_mode = 3;
int64 partition_bin_size = 4;
int64 principal_clock_divisor = 5;
map<uint32, ClockRegressionParameters> bins = 6;
};

message ModuleClockRegressionResults {
repeated SingleClockRegressionResults modules = 1;
};

message ClockRegressionResults {
int32 principal_clock_id = 1;
repeated SingleClockRegressionResults camera_clock = 2;
repeated ModuleClockRegressionResults module_clock = 3;
string principal_clock_name = 1;
int32 principal_clock_id = 2;
repeated SingleClockRegressionResults camera_clock = 3;
repeated ModuleClockRegressionResults module_clock = 4;
};
9 changes: 8 additions & 1 deletion proto/diagnostics/run_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ message RunInfo {
(CFO).desc = "Histogram of elapsed time for forced array triggers." ];
calin.ix.math.histogram.Histogram1DData elapsed_time_histogram_trigger_ucts_aux = 17 [
(CFO).desc = "Histogram of elapsed time for UCTS triggers." ];

calin.ix.math.histogram.Histogram1DData elapsed_time_histogram_muon_candidate = 18 [
(CFO).desc = "Histogram of elapsed time for UCTS triggers." ];

uint64 num_events_missing_cdts = 20 [
(CFO).desc = "Number of events without CDTS information.",
(CFO).is_counter = true ];
Expand Down Expand Up @@ -192,6 +194,10 @@ message RunInfo {
(CFO).desc = "Number of events where TIB and UCTS triiger codes do not match.",
(CFO).is_counter = true ];

uint64 num_muon_candidate = 51 [
(CFO).desc = "Number of events with muon candidate flag bit set.",
(CFO).is_counter = true ];

int64 min_event_time = 60 [
(CFO).desc = "Minimum absolute event time from all events.",
(CFO).units = "ns",
Expand Down Expand Up @@ -276,6 +282,7 @@ message PartialRunInfo {
uint64 num_slow_control_trigger = 46 [ (CFO).is_counter = true ];
uint64 num_busy_trigger = 47 [ (CFO).is_counter = true ];
uint64 num_tib_ucts_trigger_code_mismatch = 48 [ (CFO).is_counter = true ];
uint64 num_muon_candidate = 49 [ (CFO).is_counter = true ];

int64 min_event_time = 50 [ (CFO).integration_algorithm = MIN ];
int64 max_event_time = 51 [ (CFO).integration_algorithm = MAX ];
Expand Down
6 changes: 5 additions & 1 deletion proto/iact_data/telescope_event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ message CDTSData {
(CFO).desc = "Pedestal trigger type bit set." ];
bool slow_control_trigger = 36 [
(CFO).desc = "Slow control trigger type bit set." ];
bool busy_trigger = 37 [
bool local_trigger = 37 [
(CFO).desc = "Local trigger type bit set." ];
bool muon_candidate = 38 [
(CFO).desc = "Muon candidate flag (MCF) bit set." ];
bool busy_trigger = 39 [
(CFO).desc = "Busy trigger type bit set." ];
};

Expand Down
Loading

0 comments on commit d6267b7

Please sign in to comment.