Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Flow::InputNode to allow passing in samples without reading an audio file #75

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions src/Flow/InputNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#include "InputNode.hh"
#include "Timestamp.hh"
#include "Vector.hh"

using namespace Flow;

namespace {
template<typename T>
Flow::Timestamp* createTimestamp(T const* data, unsigned num_samples) {
Flow::Vector<T>* out = new Flow::Vector<T>(num_samples);
std::copy(data, data + num_samples, out->data());
return out;
}
} // namespace

const Core::ParameterInt InputNode::paramSampleRate("sample-rate", "sample rate of input data", 1, 1);
const Core::Choice InputNode::choiceSampleType("s8", static_cast<unsigned>(Flow::SampleType::SampleTypeS8),
"u8", static_cast<unsigned>(Flow::SampleType::SampleTypeU8),
"s16", static_cast<unsigned>(Flow::SampleType::SampleTypeS16),
"u16", static_cast<unsigned>(Flow::SampleType::SampleTypeU16),
"f32", static_cast<unsigned>(Flow::SampleType::SampleTypeF32),
Core::Choice::endMark());
const Core::ParameterChoice InputNode::paramSampleType("sample-type", &choiceSampleType, "data type of the samples", static_cast<unsigned>(Flow::SampleType::SampleTypeU16));
const Core::ParameterInt InputNode::paramTrackCount("track-count", "number of tracks in the stream", 1, 1);
const Core::ParameterInt InputNode::paramBlockSize("block-size", "number of samples per flow vector", 4096, 1);

InputNode::InputNode(const Core::Configuration& c)
: Core::Component(c),
Precursor(c),
sampleRate_(paramSampleRate(c)),
sampleType_(static_cast<Flow::SampleType>(paramSampleType(c))),
trackCount_(paramTrackCount(c)),
blockSize_(paramBlockSize(c)),
byteStreamAppender_(),
queue_(),
sampleCount_(0u),
eos_(true),
eosReceived_(false) {
}

bool InputNode::setParameter(const std::string& name, const std::string& value) {
if (paramSampleRate.match(name)) {
sampleRate_ = paramSampleRate(value);
}
else if (paramSampleType.match(name)) {
sampleType_ = static_cast<Flow::SampleType>(paramSampleType(value));
}
else if (paramTrackCount.match(name)) {
trackCount_ = paramTrackCount(value);
}
else if (paramBlockSize.match(name)) {
blockSize_ = paramBlockSize(value);
}
else {
return Precursor::setParameter(name, value);
}
return true;
}

bool InputNode::configure() {
Core::Ref<Flow::Attributes> a(new Flow::Attributes());
a->set("sample-rate", sampleRate_);
a->set("track-count", trackCount_);
switch (sampleType_) {
case Flow::SampleType::SampleTypeS8:
a->set("datatype", Flow::Vector<s8>::type()->name());
break;
case Flow::SampleType::SampleTypeU8:
a->set("datatype", Flow::Vector<u8>::type()->name());
break;
case Flow::SampleType::SampleTypeS16:
a->set("datatype", Flow::Vector<s16>::type()->name());
break;
case Flow::SampleType::SampleTypeU16:
a->set("datatype", Flow::Vector<u16>::type()->name());
break;
case Flow::SampleType::SampleTypeF32:
a->set("datatype", Flow::Vector<f32>::type()->name());
break;
default:
error("unsupported sample type: %d", static_cast<unsigned>(sampleType_));
return false;
}
unsigned sample_size = static_cast<unsigned>(sampleType_) & 0xFF;
a->set("sample-size", sample_size);
return putOutputAttributes(0, a);
}

bool InputNode::work(Flow::PortId out) {
unsigned sample_size = static_cast<unsigned>(sampleType_) & 0xFF;
if ((not(eos_ and not eosReceived_)) and (queue_.size() < blockSize_ * sample_size)) {
do { // at least once call byteStreamAppender because it might remove the eos status
byteStreamAppender_(queue_);
} while (queue_.size() < blockSize_ * sample_size and not eos_);
}
if (queue_.empty()) {
if (resetSampleCount_) {
sampleCount_ = 0ul;
}
return putEos(out);
}
// remove possible partial samples at EOS
unsigned full_samples = queue_.size() / sample_size;
if (eos_ and queue_.size() % sample_size != 0ul) {
queue_.resize(full_samples * sample_size);
}
// remove possible partial samples in case of multi-channel audio
if (full_samples % trackCount_ != 0ul) {
full_samples -= full_samples % trackCount_;
if (eos_) {
queue_.resize(full_samples * sample_size);
}
}
unsigned num_samples = std::min<unsigned>(blockSize_, full_samples);
std::vector<char> buffer(num_samples * sample_size);
std::copy(queue_.begin(), queue_.begin() + num_samples * sample_size, buffer.begin());
for (size_t i = 0ul; i < num_samples * sample_size; i++) {
queue_.front() = 0; // erase data
queue_.pop_front();
}
Flow::Timestamp* v = nullptr;
switch (sampleType_) {
case Flow::SampleType::SampleTypeS8:
v = createTimestamp<s8>(reinterpret_cast<s8*>(buffer.data()), num_samples);
break;
case Flow::SampleType::SampleTypeU8:
v = createTimestamp<u8>(reinterpret_cast<u8*>(buffer.data()), num_samples);
break;
case Flow::SampleType::SampleTypeS16:
v = createTimestamp<s16>(reinterpret_cast<s16*>(buffer.data()), num_samples);
break;
case Flow::SampleType::SampleTypeU16:
v = createTimestamp<u16>(reinterpret_cast<u16*>(buffer.data()), num_samples);
break;
case Flow::SampleType::SampleTypeF32:
v = createTimestamp<f32>(reinterpret_cast<f32*>(buffer.data()), num_samples);
break;
default:
error("unsupported sample type: %d", static_cast<unsigned>(sampleType_));
return false;
}
for (unsigned i = 0ul; i < num_samples; i++) {
std::fill(buffer.begin(), buffer.end(), 0); // erase data
}
v->setStartTime(Flow::Time(sampleCount_) / Flow::Time(sampleRate_) / Flow::Time(trackCount_));
sampleCount_ += num_samples;
v->setEndTime(Flow::Time(sampleCount_) / Flow::Time(sampleRate_) / Flow::Time(trackCount_));
return putData(out, v);
}
74 changes: 74 additions & 0 deletions src/Flow/InputNode.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#ifndef STREAMING_INPUT_NODE_HH
#define STREAMING_INPUT_NODE_HH
#include <functional>
#include <queue>
#include "Node.hh"

using ByteStreamAppender = std::function<void(std::deque<char>&)>;

namespace Flow {

class InputNode : public Flow::SourceNode {
public:
using Precursor = Flow::SourceNode;
static const Core::ParameterInt paramSampleRate;
static const Core::Choice choiceSampleType;
static const Core::ParameterChoice paramSampleType;
static const Core::ParameterInt paramTrackCount;
static const Core::ParameterInt paramBlockSize;
static std::string filterName();
InputNode(const Core::Configuration& c);
virtual ~InputNode() = default;
virtual bool setParameter(const std::string& name, const std::string& value);
virtual bool configure();
virtual bool work(Flow::PortId out);
void setByteStreamAppender(ByteStreamAppender const& bsa);
bool getEOS() const;
void setEOS(bool eos);
bool getEOSReceived() const;
void setEOSReceived(bool eosReceived);
bool getResetSampleCount() const;
void setResetSampleCount(bool resetSampleCount);

private:
unsigned sampleRate_;
Flow::SampleType sampleType_;
unsigned trackCount_;
unsigned blockSize_;
ByteStreamAppender byteStreamAppender_;
std::deque<char> queue_;
unsigned sampleCount_;
bool eos_;
bool eosReceived_;
bool resetSampleCount_;
};

// ---------- inline implementations ----------
inline std::string InputNode::filterName() {
return "stream-input";
}
inline void InputNode::setByteStreamAppender(ByteStreamAppender const& bsa) {
byteStreamAppender_ = bsa;
}
inline bool InputNode::getEOS() const {
return eos_;
}
inline void InputNode::setEOS(bool eos) {
eos_ = eos;
}
inline bool InputNode::getEOSReceived() const {
return eosReceived_;
}
inline void InputNode::setEOSReceived(bool eosReceived) {
eosReceived_ = eosReceived;
}
inline bool InputNode::getResetSampleCount() const {
return resetSampleCount_;
}
inline void InputNode::setResetSampleCount(bool resetSampleCount) {
resetSampleCount_ = resetSampleCount;
}

} // namespace Flow

#endif // INPUT_NODE_HH
3 changes: 2 additions & 1 deletion src/Flow/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ LIBSPRINTFLOW_O = $(OBJDIR)/AbstractNode.o \
$(OBJDIR)/Synchronization.o \
$(OBJDIR)/Timestamp.o \
$(OBJDIR)/Vector.o \
$(OBJDIR)/VectorTextInput.o
$(OBJDIR)/VectorTextInput.o \
$(OBJDIR)/InputNode.o

CHECK_O = $(OBJDIR)/check.o \
libSprintFlow.$(a) \
Expand Down
3 changes: 3 additions & 0 deletions src/Flow/Module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "VectorSum.hh"
#include "VectorTextInput.hh"
#include "WarpTimeFilter.hh"
#include "InputNode.hh"

// predefined datatypes
#include "DataAdaptor.hh"
Expand Down Expand Up @@ -159,4 +160,6 @@ Module_::Module_() {
registry.registerDatatype<TypedAggregate<Vector<f32>>>();
registry.registerDatatype<Flow::DataAdaptor<Math::Matrix<f32>>>();
registry.registerDatatype<Flow::DataAdaptor<Math::Matrix<f64>>>();

registry.registerFilter<Flow::InputNode>();
}
11 changes: 11 additions & 0 deletions src/Flow/Network.hh
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,17 @@ public:
const std::string& filename() const {
return filename_;
}

void configureAll() {
for (auto n : nodes_) {
auto* network = dynamic_cast<Flow::Network*>(n);
if (network)
network->configureAll();
else
n->configure();
}
}

}; // class Network

/*****************************************************************************/
Expand Down
11 changes: 11 additions & 0 deletions src/Flow/Types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ typedef f64 Time;

const Time timeTolerance = (Time)1e7;
const s32 timeToleranceUlp = 100000;

// possible output types of the node
// the lower 8 bit store the size of one sample
enum class SampleType : unsigned {
SampleTypeS8 = 0x0101,
SampleTypeU8 = 0x0201,
SampleTypeS16 = 0x0302,
SampleTypeU16 = 0x0402,
SampleTypeF32 = 0x0504
};

} // namespace Flow

#endif // _FLOW_TYPES_HH