diff --git a/src/Flow/InputNode.cc b/src/Flow/InputNode.cc new file mode 100644 index 00000000..882b10f3 --- /dev/null +++ b/src/Flow/InputNode.cc @@ -0,0 +1,149 @@ +#include "InputNode.hh" +#include "Timestamp.hh" +#include "Vector.hh" + +using namespace Flow; + +namespace { +template +Flow::Timestamp* createTimestamp(T const* data, unsigned num_samples) { + Flow::Vector* out = new Flow::Vector(num_samples); + std::copy(data, data + num_samples, out->data()); + return out; +} +} // namespace + +const Core::ParameterInt InputNode::paramSampleRate("sample-rate", "sample rate of input data", 1, 1); +const Core::Choice InputNode::choiceSampleType("s8", static_cast(Flow::SampleType::SampleTypeS8), + "u8", static_cast(Flow::SampleType::SampleTypeU8), + "s16", static_cast(Flow::SampleType::SampleTypeS16), + "u16", static_cast(Flow::SampleType::SampleTypeU16), + "f32", static_cast(Flow::SampleType::SampleTypeF32), + Core::Choice::endMark()); +const Core::ParameterChoice InputNode::paramSampleType("sample-type", &choiceSampleType, "data type of the samples", static_cast(Flow::SampleType::SampleTypeU16)); +const Core::ParameterInt InputNode::paramTrackCount("track-count", "number of tracks in the stream", 1, 1); +const Core::ParameterInt InputNode::paramBlockSize("block-size", "number of samples per flow vector", 4096, 1); + +InputNode::InputNode(const Core::Configuration& c) + : Core::Component(c), + Precursor(c), + sampleRate_(paramSampleRate(c)), + sampleType_(static_cast(paramSampleType(c))), + trackCount_(paramTrackCount(c)), + blockSize_(paramBlockSize(c)), + byteStreamAppender_(), + queue_(), + sampleCount_(0u), + eos_(true), + eosReceived_(false) { +} + +bool InputNode::setParameter(const std::string& name, const std::string& value) { + if (paramSampleRate.match(name)) { + sampleRate_ = paramSampleRate(value); + } + else if (paramSampleType.match(name)) { + sampleType_ = static_cast(paramSampleType(value)); + } + else if (paramTrackCount.match(name)) { + trackCount_ = paramTrackCount(value); + } + else if (paramBlockSize.match(name)) { + blockSize_ = paramBlockSize(value); + } + else { + return Precursor::setParameter(name, value); + } + return true; +} + +bool InputNode::configure() { + Core::Ref a(new Flow::Attributes()); + a->set("sample-rate", sampleRate_); + a->set("track-count", trackCount_); + switch (sampleType_) { + case Flow::SampleType::SampleTypeS8: + a->set("datatype", Flow::Vector::type()->name()); + break; + case Flow::SampleType::SampleTypeU8: + a->set("datatype", Flow::Vector::type()->name()); + break; + case Flow::SampleType::SampleTypeS16: + a->set("datatype", Flow::Vector::type()->name()); + break; + case Flow::SampleType::SampleTypeU16: + a->set("datatype", Flow::Vector::type()->name()); + break; + case Flow::SampleType::SampleTypeF32: + a->set("datatype", Flow::Vector::type()->name()); + break; + default: + error("unsupported sample type: %d", static_cast(sampleType_)); + return false; + } + unsigned sample_size = static_cast(sampleType_) & 0xFF; + a->set("sample-size", sample_size); + return putOutputAttributes(0, a); +} + +bool InputNode::work(Flow::PortId out) { + unsigned sample_size = static_cast(sampleType_) & 0xFF; + if ((not(eos_ and not eosReceived_)) and (queue_.size() < blockSize_ * sample_size)) { + do { // at least once call byteStreamAppender because it might remove the eos status + byteStreamAppender_(queue_); + } while (queue_.size() < blockSize_ * sample_size and not eos_); + } + if (queue_.empty()) { + if (resetSampleCount_) { + sampleCount_ = 0ul; + } + return putEos(out); + } + // remove possible partial samples at EOS + unsigned full_samples = queue_.size() / sample_size; + if (eos_ and queue_.size() % sample_size != 0ul) { + queue_.resize(full_samples * sample_size); + } + // remove possible partial samples in case of multi-channel audio + if (full_samples % trackCount_ != 0ul) { + full_samples -= full_samples % trackCount_; + if (eos_) { + queue_.resize(full_samples * sample_size); + } + } + unsigned num_samples = std::min(blockSize_, full_samples); + std::vector buffer(num_samples * sample_size); + std::copy(queue_.begin(), queue_.begin() + num_samples * sample_size, buffer.begin()); + for (size_t i = 0ul; i < num_samples * sample_size; i++) { + queue_.front() = 0; // erase data + queue_.pop_front(); + } + Flow::Timestamp* v = nullptr; + switch (sampleType_) { + case Flow::SampleType::SampleTypeS8: + v = createTimestamp(reinterpret_cast(buffer.data()), num_samples); + break; + case Flow::SampleType::SampleTypeU8: + v = createTimestamp(reinterpret_cast(buffer.data()), num_samples); + break; + case Flow::SampleType::SampleTypeS16: + v = createTimestamp(reinterpret_cast(buffer.data()), num_samples); + break; + case Flow::SampleType::SampleTypeU16: + v = createTimestamp(reinterpret_cast(buffer.data()), num_samples); + break; + case Flow::SampleType::SampleTypeF32: + v = createTimestamp(reinterpret_cast(buffer.data()), num_samples); + break; + default: + error("unsupported sample type: %d", static_cast(sampleType_)); + return false; + } + for (unsigned i = 0ul; i < num_samples; i++) { + std::fill(buffer.begin(), buffer.end(), 0); // erase data + } + v->setStartTime(Flow::Time(sampleCount_) / Flow::Time(sampleRate_) / Flow::Time(trackCount_)); + sampleCount_ += num_samples; + v->setEndTime(Flow::Time(sampleCount_) / Flow::Time(sampleRate_) / Flow::Time(trackCount_)); + return putData(out, v); +} diff --git a/src/Flow/InputNode.hh b/src/Flow/InputNode.hh new file mode 100644 index 00000000..a723db9a --- /dev/null +++ b/src/Flow/InputNode.hh @@ -0,0 +1,74 @@ +#ifndef STREAMING_INPUT_NODE_HH +#define STREAMING_INPUT_NODE_HH +#include +#include +#include "Node.hh" + +using ByteStreamAppender = std::function&)>; + +namespace Flow { + +class InputNode : public Flow::SourceNode { +public: + using Precursor = Flow::SourceNode; + static const Core::ParameterInt paramSampleRate; + static const Core::Choice choiceSampleType; + static const Core::ParameterChoice paramSampleType; + static const Core::ParameterInt paramTrackCount; + static const Core::ParameterInt paramBlockSize; + static std::string filterName(); + InputNode(const Core::Configuration& c); + virtual ~InputNode() = default; + virtual bool setParameter(const std::string& name, const std::string& value); + virtual bool configure(); + virtual bool work(Flow::PortId out); + void setByteStreamAppender(ByteStreamAppender const& bsa); + bool getEOS() const; + void setEOS(bool eos); + bool getEOSReceived() const; + void setEOSReceived(bool eosReceived); + bool getResetSampleCount() const; + void setResetSampleCount(bool resetSampleCount); + +private: + unsigned sampleRate_; + Flow::SampleType sampleType_; + unsigned trackCount_; + unsigned blockSize_; + ByteStreamAppender byteStreamAppender_; + std::deque queue_; + unsigned sampleCount_; + bool eos_; + bool eosReceived_; + bool resetSampleCount_; +}; + +// ---------- inline implementations ---------- +inline std::string InputNode::filterName() { + return "stream-input"; +} +inline void InputNode::setByteStreamAppender(ByteStreamAppender const& bsa) { + byteStreamAppender_ = bsa; +} +inline bool InputNode::getEOS() const { + return eos_; +} +inline void InputNode::setEOS(bool eos) { + eos_ = eos; +} +inline bool InputNode::getEOSReceived() const { + return eosReceived_; +} +inline void InputNode::setEOSReceived(bool eosReceived) { + eosReceived_ = eosReceived; +} +inline bool InputNode::getResetSampleCount() const { + return resetSampleCount_; +} +inline void InputNode::setResetSampleCount(bool resetSampleCount) { + resetSampleCount_ = resetSampleCount; +} + +} // namespace Flow + +#endif // INPUT_NODE_HH diff --git a/src/Flow/Makefile b/src/Flow/Makefile index 42c3faed..3362c304 100644 --- a/src/Flow/Makefile +++ b/src/Flow/Makefile @@ -33,7 +33,8 @@ LIBSPRINTFLOW_O = $(OBJDIR)/AbstractNode.o \ $(OBJDIR)/Synchronization.o \ $(OBJDIR)/Timestamp.o \ $(OBJDIR)/Vector.o \ - $(OBJDIR)/VectorTextInput.o + $(OBJDIR)/VectorTextInput.o \ + $(OBJDIR)/InputNode.o CHECK_O = $(OBJDIR)/check.o \ libSprintFlow.$(a) \ diff --git a/src/Flow/Module.cc b/src/Flow/Module.cc index 01c1c785..392ba948 100644 --- a/src/Flow/Module.cc +++ b/src/Flow/Module.cc @@ -43,6 +43,7 @@ #include "VectorSum.hh" #include "VectorTextInput.hh" #include "WarpTimeFilter.hh" +#include "InputNode.hh" // predefined datatypes #include "DataAdaptor.hh" @@ -159,4 +160,6 @@ Module_::Module_() { registry.registerDatatype>>(); registry.registerDatatype>>(); registry.registerDatatype>>(); + + registry.registerFilter(); } diff --git a/src/Flow/Network.hh b/src/Flow/Network.hh index caee1ee1..34c59c2d 100644 --- a/src/Flow/Network.hh +++ b/src/Flow/Network.hh @@ -418,6 +418,17 @@ public: const std::string& filename() const { return filename_; } + + void configureAll() { + for (auto n : nodes_) { + auto* network = dynamic_cast(n); + if (network) + network->configureAll(); + else + n->configure(); + } + } + }; // class Network /*****************************************************************************/ diff --git a/src/Flow/Types.hh b/src/Flow/Types.hh index c0d892ba..37bd211c 100644 --- a/src/Flow/Types.hh +++ b/src/Flow/Types.hh @@ -33,6 +33,17 @@ typedef f64 Time; const Time timeTolerance = (Time)1e7; const s32 timeToleranceUlp = 100000; + +// possible output types of the node +// the lower 8 bit store the size of one sample +enum class SampleType : unsigned { + SampleTypeS8 = 0x0101, + SampleTypeU8 = 0x0201, + SampleTypeS16 = 0x0302, + SampleTypeU16 = 0x0402, + SampleTypeF32 = 0x0504 +}; + } // namespace Flow #endif // _FLOW_TYPES_HH