Skip to content

Commit

Permalink
add option 'symmetric_mode' per port
Browse files Browse the repository at this point in the history
in some cases, the same number of queues is required.
for example: qemu virtio device with tap host interface.
on configure virtio device with 1 rx and 2 tx queues -> auto create 2 rx and 2 tx queues.
  • Loading branch information
Timur Aitov committed Oct 11, 2023
1 parent e9281c2 commit 92146b2
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 36 deletions.
2 changes: 1 addition & 1 deletion common/config.release.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#define CONFIG_YADECAP_MBUFS_COUNT (16 * 1024)
#define CONFIG_YADECAP_MBUF_SIZE (10 * 1024)
#define CONFIG_YADECAP_WORKER_PORTS_SIZE (1)
#define CONFIG_YADECAP_WORKER_PORTS_SIZE (8)
#define CONFIG_YADECAP_MBUFS_BURST_SIZE (32)
#define CONFIG_YADECAP_LPM4_EXTENDED_SIZE (4 * 1024)
#define CONFIG_YADECAP_LPM6_EXTENDED_SIZE (1 * 1024)
Expand Down
15 changes: 15 additions & 0 deletions dataplane/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ class permanently
transportSizes[IPPROTO_GRE] = sizeof(rte_gre_hdr);
}

bool add_worker_port(const tPortId port_id,
tQueueId queue_id)
{
if (workerPortsCount >= CONFIG_YADECAP_WORKER_PORTS_SIZE)
{
return false;
}

workerPorts[workerPortsCount].inPortId = port_id;
workerPorts[workerPortsCount].inQueueId = queue_id;
workerPortsCount++;

return true;
}

dataplane::globalBase::atomic* globalBaseAtomic;
/// Pointers to all globalBaseAtomic for each CPU socket.
///
Expand Down
20 changes: 10 additions & 10 deletions dataplane/controlplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,17 +703,17 @@ common::idp::getConfig::response cControlPlane::getConfig() const
common::idp::getConfig::response response;
auto& [response_ports, response_workers, response_values] = response;

for (const auto& portIter : dataPlane->ports)
for (const auto& [port_id, port] : dataPlane->ports)
{
const tPortId& portId = portIter.first;
const std::string& interfaceName = std::get<0>(portIter.second);
const auto& etherAddress = std::get<2>(portIter.second);
const std::string pci = std::get<3>(portIter.second);

response_ports[portId] = {interfaceName,
rte_eth_dev_socket_id(portId),
etherAddress,
pci};
const auto& [interface_name, rx_queues, tx_queues_count, mac_address, pci, symmetric_mode] = port;
(void)rx_queues;
(void)tx_queues_count;
(void)symmetric_mode;

response_ports[port_id] = {interface_name,
rte_eth_dev_socket_id(port_id),
mac_address,
pci};
}

for (const auto& workerIter : dataPlane->workers)
Expand Down
111 changes: 95 additions & 16 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ eResult cDataPlane::initPorts()
for (const auto& configPortIter : config.ports)
{
const std::string& interfaceName = configPortIter.first;
const auto& [pci] = configPortIter.second;
const auto& [pci, symmetric_mode] = configPortIter.second;

tPortId portId;
if (strncmp(pci.data(), SOCK_DEV_PREFIX, strlen(SOCK_DEV_PREFIX)) == 0)
Expand All @@ -319,12 +319,8 @@ eResult cDataPlane::initPorts()
interfaceName.data(),
pci.data());

std::get<0>(ports[portId]) = interfaceName;
std::get<3>(ports[portId]) = pci;

rte_ether_addr etherAddress;
rte_eth_macaddr_get(portId, &etherAddress);
memcpy(std::get<2>(ports[portId]).data(), etherAddress.addr_bytes, 6);

rte_eth_dev_info devInfo;
rte_eth_dev_info_get(portId, &devInfo);
Expand Down Expand Up @@ -371,6 +367,8 @@ eResult cDataPlane::initPorts()
YADECAP_LOG_INFO("max_lro_pkt_size: %u\n", portConf.rxmode.max_lro_pkt_size);
YADECAP_LOG_INFO("mtu: %u\n", mtu);

std::map<tCoreId, tQueueId> rx_queues;

uint16_t rxQueuesCount = 0;
uint16_t txQueuesCount = config.workers.size() + 1; ///< tx queue '0' for control plane
for (const auto& configWorkerIter : config.workers)
Expand All @@ -381,12 +379,21 @@ eResult cDataPlane::initPorts()
{
if (interfaceName == workerInterfaceName)
{
std::get<1>(ports[portId])[coreId] = rxQueuesCount;
rx_queues[coreId] = rxQueuesCount;
rxQueuesCount++;
}
}
}

if (symmetric_mode &&
rxQueuesCount < txQueuesCount)
{
YADECAP_LOG_INFO("symmetric mode is enabled. configure rx queues size from '%u' to '%u'\n",
rxQueuesCount,
txQueuesCount);
rxQueuesCount = txQueuesCount;
}

YADECAP_LOG_INFO("rxQueuesCount: %u, txQueuesCount: %u\n", rxQueuesCount, txQueuesCount);

int ret = rte_eth_dev_configure(portId,
Expand All @@ -407,6 +414,13 @@ eResult cDataPlane::initPorts()
}

rte_eth_stats_reset(portId);

ports[portId] = {interfaceName,
rx_queues,
txQueuesCount,
etherAddress.addr_bytes,
pci,
symmetric_mode};
}

for (const auto& interface_name : remove_keys)
Expand Down Expand Up @@ -743,18 +757,75 @@ eResult cDataPlane::initWorkers()
basePermanently.nat64stateful_numa_id = rte_cpu_to_be_16(socket_id);
}

for (const auto& portIter : ports)
for (const auto& [port_id, port] : ports)
{
const auto& portId = portIter.first;
const auto& rxQueues = std::get<1>(portIter.second);
const auto& [interface_name, rx_queues, tx_queues_count, mac_address, pci, symmetric_mode] = port;
(void)mac_address;
(void)pci;

if (exist(rxQueues, coreId))
if (exist(rx_queues, coreId))
{
basePermanently.workerPorts[basePermanently.workerPortsCount].inPortId = portId;
basePermanently.workerPorts[basePermanently.workerPortsCount].inQueueId = rxQueues.find(coreId)->second;
basePermanently.workerPortsCount++;
YANET_LOG_DEBUG("worker[%u]: add_worker_port(port_id: %u, queue_id: %u)\n",
coreId,
port_id,
rx_queues.find(coreId)->second);

if (!basePermanently.add_worker_port(port_id, rx_queues.find(coreId)->second))
{
YADECAP_LOG_ERROR("can't add port '%s' to worker '%u'\n",
interface_name.data(),
coreId);
return eResult::invalidCoresCount;
}

if (symmetric_mode)
{
/// symmetric mode. add more rx queues
///
/// before
/// rx_queue_id -> core_id
/// 0 -> 1
/// 1 -> 2
/// 2 -> 3
/// 3 -> n/s
/// 4 -> n/s
/// 5 -> n/s
/// 6 -> n/s
///
/// after
/// rx_queue_id -> core_id
/// 0 -> 1
/// 1 -> 2
/// 2 -> 3
/// 3 -> 1
/// 4 -> 2
/// 5 -> 3
/// 6 -> 1

uint16_t workers_count = rx_queues.size();
uint16_t rx_queue_id = rx_queues.find(coreId)->second + workers_count;

while (rx_queue_id < tx_queues_count)
{
YANET_LOG_DEBUG("worker[%u]: add_worker_port(port_id: %u, queue_id: %u)\n",
coreId,
port_id,
rx_queue_id);

if (!basePermanently.add_worker_port(port_id, rx_queue_id))
{
YADECAP_LOG_ERROR("can't add port '%s' to worker '%u'\n",
interface_name.data(),
coreId);
return eResult::invalidCoresCount;
}

rx_queue_id += workers_count;
}
}
}
}

basePermanently.outQueueId = outQueueId;
basePermanently.ports_count = ports.size();

Expand Down Expand Up @@ -1366,14 +1437,20 @@ eResult cDataPlane::parseJsonPorts(const nlohmann::json& json)
{
std::string interfaceName = portJson["interfaceName"];
std::string pci = portJson["pci"];
bool symmetric_mode = false;

if (exist(config.ports, interfaceName))
{
YADECAP_LOG_ERROR("interfaceName '%s' already exist\n", interfaceName.data());
return eResult::invalidConfigurationFile;
}

config.ports[interfaceName] = {pci};
if (exist(portJson, "symmetric_mode"))
{
symmetric_mode = portJson["symmetric_mode"];
}

config.ports[interfaceName] = {pci, symmetric_mode};

for (tCoreId coreId : portJson["coreIds"])
{
Expand Down Expand Up @@ -1582,7 +1659,8 @@ eResult cDataPlane::checkConfig()
std::set<std::string> pcis;
for (const auto& portIter : config.ports)
{
const auto& [pci] = portIter.second;
const auto& [pci, symmetric_mode] = portIter.second;
(void)symmetric_mode;

if (exist(pcis, pci))
{
Expand Down Expand Up @@ -1674,7 +1752,8 @@ eResult cDataPlane::initEal(const std::string& binaryPath,

for (const auto& port : config.ports)
{
const auto& [pci] = port.second;
const auto& [pci, symmetric_mode] = port.second;
(void)symmetric_mode;

// Do not whitelist sock dev virtual devices
if (strncmp(pci.data(), SOCK_DEV_PREFIX, strlen(SOCK_DEV_PREFIX)) == 0)
Expand Down
14 changes: 9 additions & 5 deletions dataplane/dataplane.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ struct tDataPlaneConfig
DPDK.
*/
std::map<std::string, ///< interfaceName
std::tuple<std::string>> ///< pci
std::tuple<std::string, ///< pci
bool ///< symmetric_mode
>>
ports;

std::set<tCoreId> workerGCs;
Expand Down Expand Up @@ -292,10 +294,12 @@ class cDataPlane
tDataPlaneConfig config;

std::map<tPortId,
std::tuple<std::string, ///< interfaceName
std::map<tCoreId, tQueueId>, ///< rxQueues
std::array<uint8_t, 6>, ///< address
std::string ///< pci
std::tuple<std::string, ///< interface_name
std::map<tCoreId, tQueueId>, ///< rx_queues
unsigned int, ///< tx_queues_count
common::mac_address_t, ///< mac_address
std::string, ///< pci
bool ///< symmetric_mode
>>
ports;
std::map<tCoreId, cWorker*> workers;
Expand Down
8 changes: 4 additions & 4 deletions dataplane/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ eResult cWorker::init(const tCoreId& coreId,
this->bases[currentBaseId] = base;
this->bases[currentBaseId ^ 1] = base;

unsigned int elements_count = 2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
unsigned int elements_count = 2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_highPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_normalPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_lowPriority_size);
Expand Down Expand Up @@ -180,8 +180,8 @@ void cWorker::start()

/// @todo: prepare()

unsigned int mbufs_count_expect = 2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
unsigned int mbufs_count_expect = 2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_highPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_normalPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_lowPriority_size);
Expand Down

0 comments on commit 92146b2

Please sign in to comment.