Skip to content

Commit

Permalink
stfb: implement readout interface v2
Browse files Browse the repository at this point in the history
requires Readout v1.5
  • Loading branch information
ironMann committed Nov 13, 2020
1 parent 1bde5ca commit a277b3c
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 232 deletions.
2 changes: 1 addition & 1 deletion script/datadist_start_standalone.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ STF_BUILDER="StfBuilder"
STF_BUILDER+=" --transport shmem"
STF_BUILDER+=" --mq-config $chainConfig"
STF_BUILDER+=" --detector TPC"
STF_BUILDER+=" --detector 4"
STF_BUILDER+=" --detector-rdh 6"
STF_BUILDER+=" --io-threads $IO_THREADS"
STF_BUILDER+=" --max-buffered-stfs 2"

Expand Down
36 changes: 18 additions & 18 deletions script/discovery-flp-epn-chain.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/readout-pipe-0"
}
Expand All @@ -25,7 +25,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/readout-pipe-0",
"rateLogging": "1"
Expand All @@ -37,7 +37,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/stf-builder-pipe-0",
"rateLogging": "1"
Expand All @@ -49,7 +49,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-dpl-pipe-0",
"rateLogging": "1"
Expand All @@ -66,7 +66,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-pipe-0"
}
Expand All @@ -83,7 +83,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/readout-pipe-1"
}
Expand All @@ -99,7 +99,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/readout-pipe-1"
}
Expand All @@ -110,7 +110,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/stf-builder-pipe-1"
}
Expand All @@ -121,7 +121,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-dpl-pipe-1"
}
Expand All @@ -137,7 +137,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-pipe-1"
}
Expand All @@ -154,7 +154,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/readout-pipe-2"
}
Expand All @@ -170,7 +170,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/readout-pipe-2"
}
Expand All @@ -181,7 +181,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "push",
"method": "bind",
"address": "ipc:///tmp/stf-builder-pipe-2"
}
Expand All @@ -192,7 +192,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-dpl-pipe-2"
}
Expand All @@ -208,7 +208,7 @@
"transport": "shmem",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/stf-builder-pipe-2"
}
Expand All @@ -226,7 +226,7 @@
"transport": "zeromq",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/tf-builder-dpl-pipe-0",
"rateLogging": "1"
Expand All @@ -243,7 +243,7 @@
"transport": "zeromq",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/tf-builder-dpl-pipe-1",
"rateLogging": "1"
Expand All @@ -260,7 +260,7 @@
"transport": "zeromq",
"sockets": [
{
"type": "pair",
"type": "pull",
"method": "connect",
"address": "ipc:///tmp/tf-builder-dpl-pipe-2",
"rateLogging": "1"
Expand Down
54 changes: 21 additions & 33 deletions script/readout_cfg/readout_emu.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,9 @@ rate=-1
exitTimeout=-1


disableAggregatorSlicing=1


###################################
# data sampling
###################################

[sampling]

# enable/disable data sampling (1/0)
enabled=0

# which class of datasampling to use (FairInjector, MockInjector)
class=MockInjector

aggregatorSliceTimeout=1
aggregatorStfTimeout=2
disableAggregatorSlicing=0


###################################
Expand All @@ -55,7 +43,7 @@ equipmentType=dummy
enabled=0
eventMaxSize=20000
eventMinSize=10000
memPoolElementSize=64000
memPoolElementSize=6400

[equipment-dummy-2]
name=dummy-2
Expand Down Expand Up @@ -101,12 +89,12 @@ cardId=86:00.0

# collect data statistics
[consumer-stats]
enabled=0
monitoringUpdatePeriod=5
monitoringEnabled=0
consoleUpdate=1
consumerType=stats
monitoringConfig=file:/etc/monitoring.cfg
enabled=1
monitoringUpdatePeriod=1
monitoringEnabled=1
consoleUpdate=1



# recording to file
Expand Down Expand Up @@ -170,23 +158,23 @@ disableSending=0

# equipment that emulates CRU data format (RDH in 8k blocks, LHC clocks, etc)
[equipment-emulator-1]
equipmentType=cruEmulator
linkId=100
name=emulator-1
numberOfLinks=1
equipmentType=cruEmulator
enabled=0
linkId=1
numberOfLinks=2
memoryPoolNumberOfPages=10000
memoryPoolPageSize=128k
enabled=1

[equipment-emulator-2]
name=emulator-2
equipmentType=cruEmulator
enabled=0
eventMaxSize=8000
eventMinSize=8000
idleSleepTime=100
memoryPoolNumberOfPages=100
memoryPoolPageSize=3200k
numberOfLinks=16
linkId=200
enabled=1
linkId=2
numberOfLinks=4
PayloadSize=4096
EmptyHbRatio=0.01
#idleSleepTime=100
memoryPoolNumberOfPages=10000
memoryPoolPageSize=128k

15 changes: 8 additions & 7 deletions src/ReadoutEmulator/CruEmulator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <unistd.h>

#include <Headers/DAQID.h>

namespace o2
{
namespace DataDistribution
Expand Down Expand Up @@ -86,10 +88,11 @@ void CruLinkEmulator::linkReadoutThread()
// Each channel is reported separately to the O2
ReadoutLinkO2Data linkO2Data;

linkO2Data.mLinkDataHeader.dataOrigin = (rand() % 100 < 70) ? o2::header::gDataOriginTPC : o2::header::gDataOriginITS;
linkO2Data.mLinkDataHeader.dataDescription = o2::header::gDataDescriptionRawData;
linkO2Data.mLinkDataHeader.payloadSerializationMethod = o2::header::gSerializationMethodNone;
linkO2Data.mLinkDataHeader.subSpecification = mLinkID;
linkO2Data.mLinkHeader.mSystemId = (rand() % 100 < 70) ? o2::header::DAQID::TPC : o2::header::DAQID::ITS;
linkO2Data.mLinkHeader.mFeeId = 0xFEE0;
linkO2Data.mLinkHeader.mEquipmentId = 0xE1D0; // ?
linkO2Data.mLinkHeader.mLinkId = mLinkID;
linkO2Data.mLinkHeader.mFlags.mIsRdhFormat = 1;

for (unsigned d = 0; d < cNumDmaChunkPerSuperpage; d++, lHbfToSend--) {

Expand Down Expand Up @@ -121,15 +124,13 @@ void CruLinkEmulator::linkReadoutThread()
});
}

// record how many chunks are there in a superpage
linkO2Data.mLinkDataHeader.payloadSize = linkO2Data.mLinkRawData.size();
// Put the link info data into the send queue
mMemHandler->putLinkData(std::move(linkO2Data));

} else {
// signal lost data (no free superpages)
ReadoutLinkO2Data linkO2Data;
linkO2Data.mLinkDataHeader.subSpecification = -1;
linkO2Data.mLinkHeader.mFlags.mIsRdhFormat = 0;

mMemHandler->putLinkData(std::move(linkO2Data));
break;
Expand Down
2 changes: 1 addition & 1 deletion src/ReadoutEmulator/CruMemoryHandler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ void CruMemoryHandler::put_data_buffer(const char* dataBufferAddr, const std::si
std::lock_guard<std::mutex> lock(lBucket.mLock);

if (lBucket.mUsedSuperPages.count(lSpStartAddr) == 0) {
DDLOGF(fair::Severity::ERROR, "Returned data buffer is not in the list of used superpages!");
DDLOGF_RL(200, fair::Severity::ERROR, "Returned data buffer is not in the list of used superpages!");
return;
}

Expand Down
3 changes: 1 addition & 2 deletions src/ReadoutEmulator/CruMemoryHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ struct CruDmaPacket {
};

struct ReadoutLinkO2Data {

o2::header::DataHeader mLinkDataHeader;
ReadoutSubTimeframeHeader mLinkHeader;
std::vector<CruDmaPacket> mLinkRawData;
};

Expand Down
34 changes: 19 additions & 15 deletions src/ReadoutEmulator/ReadoutDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,12 @@ void ReadoutDevice::SendingThread()
// finish an STF every ~1/45 seconds
static const auto cDataTakingStart = std::chrono::high_resolution_clock::now();
static constexpr auto cStfInterval = std::chrono::microseconds(22810);
static uint64_t lNumberSentStfs = 0;
static uint64_t lCurrentTfId = 0;
uint64_t lNumberSentStfs = 0;
uint64_t lCurrentTfId = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count() / cStfInterval.count();;

while (IsRunningState()) {

auto isStfFinished =
(std::chrono::high_resolution_clock::now() - cDataTakingStart) - (lNumberSentStfs * cStfInterval) > cStfInterval;

if (isStfFinished) {
lNumberSentStfs += 1;
lCurrentTfId = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now().time_since_epoch()).count() / cStfInterval.count();
}

ReadoutLinkO2Data lCruLinkData;
if (!mCruMemoryHandler->getLinkData(lCruLinkData)) {
DDLOG(fair::Severity::INFO) << "GetLinkData failed. Stopping interface thread.";
Expand All @@ -145,15 +138,26 @@ void ReadoutDevice::SendingThread()
mFreeSuperpagesSamples.Fill(mCruMemoryHandler->free_superpages());

// check no data signal
if (lCruLinkData.mLinkDataHeader.subSpecification ==
o2::header::DataHeader::SubSpecificationType(-1)) {
if (lCruLinkData.mLinkHeader.mFlags.mIsRdhFormat == 0) {
// DDLOG(fair::Severity::WARN) << "No Superpages left! Losing data...";
}

ReadoutSubTimeframeHeader lHBFHeader;
ReadoutSubTimeframeHeader lHBFHeader = lCruLinkData.mLinkHeader;
lHBFHeader.mTimeFrameId = lCurrentTfId;
lHBFHeader.mNumberHbf = lCruLinkData.mLinkRawData.size();
lHBFHeader.mLinkId = lCruLinkData.mLinkDataHeader.subSpecification;
lHBFHeader.mTimeframeOrbitFirst = lCurrentTfId * 256;
lHBFHeader.mTimeframeOrbitLast = (lCurrentTfId + 1) * 256 - 1;
lHBFHeader.mFlags.mLastTFMessage = 0;

// last one?
auto isStfFinished =
(std::chrono::high_resolution_clock::now() - cDataTakingStart) - (lNumberSentStfs * cStfInterval) > cStfInterval;

if (isStfFinished) {
lNumberSentStfs += 1;
lCurrentTfId = std::chrono::duration_cast<std::chrono::microseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch()).count() / cStfInterval.count();
lHBFHeader.mFlags.mLastTFMessage = 1;
}

assert(mDataBlockMsgs.empty());
mDataBlockMsgs.reserve(lCruLinkData.mLinkRawData.size());
Expand Down
2 changes: 1 addition & 1 deletion src/StfBuilder/StfBuilderDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class StfBuilderDevice : public DataDistDevice,
// DROP policy in StfBuilder is to keep most current STFs. This will ensure that all
// StfBuilders have the same set of STFs ready for distribution

DDLOGF(fair::Severity::WARNING, "Dropping oldest STF due to reaching the maximum number of buffered "
DDLOGF_RL(500, fair::Severity::WARNING, "Dropping oldest STF due to reaching the maximum number of buffered "
"STFs in the process ({}). Consider increasing the limit, or reducing the input data rate.",
I().mMaxStfsInPipeline);

Expand Down
Loading

0 comments on commit a277b3c

Please sign in to comment.