Skip to content

Commit

Permalink
add option 'symmetric_mode' per port
Browse files Browse the repository at this point in the history
In the current architecture, each RX queue binds to exactly one worker (one CPU core), and the number of TX queues is equal to the number of all workers (including slowworker). But in some cases, the same number of RX and TX queues is required. For example, virtual device or hardware without RSS.

Now you can specify the option symmetric_mode: true on the port, which will create the same number of RX queues as on TX.

1 core per port:

symmetric_mode: false (default)

port0_rx0 ------> worker0 ---+--> port0_tx0
                  (core0)    \--> port1_tx0

port1_rx0 ------> worker1 ---+--> port0_tx1
                  (core1)    \--> port1_tx1

kernel ------> slowworker ---+--> port0_tx2
               (core2)       \--> port1_tx2

total: RX 1, TX 3 per port

symmetric_mode: true

port0_rx0 ---+--> worker0 ---+--> port0_tx0
port0_rx1 ---|    (core0)    \--> port1_tx0
port0_rx2 ---/

port1_rx0 ---+--> worker1 ---+--> port0_tx1
port1_rx1 ---|    (core1)    \--> port1_tx1
port1_rx2 ---/

kernel ------> slowworker ---+--> port0_tx2
               (core2)       \--> port1_tx2

total: RX 3, TX 3 per port


2 cores per port:

symmetric_mode: false (default)

port0_rx0 ------> worker0 ---+--> port0_tx0
                  (core0)    \--> port1_tx0

port0_rx1 ------> worker1 ---+--> port0_tx1
                  (core1)    \--> port1_tx1

port1_rx0 ------> worker2 ---+--> port0_tx2
                  (core2)    \--> port1_tx2

port1_rx1 ------> worker3 ---+--> port0_tx3
                  (core3)    \--> port1_tx3

kernel ------> slowworker ---+--> port0_tx4
               (core4)       \--> port1_tx4

total: RX 2, TX 5 per port

symmetric_mode: true

port0_rx0 ---+--> worker0 ---+--> port0_tx0
port0_rx2 ---|    (core0)    \--> port1_tx0
port0_rx4 ---/

port0_rx1 ---+--> worker1 ---+--> port0_tx1
port0_rx3 ---/    (core1)    \--> port1_tx1

port1_rx0 ---+--> worker2 ---+--> port0_tx2
port1_rx2 ---|    (core2)    \--> port1_tx2
port1_rx4 ---/

port1_rx1 ---+--> worker3 ---+--> port0_tx3
port1_rx3 ---/    (core3)    \--> port1_tx3

kernel ------> slowworker ---+--> port0_tx4
               (core4)       \--> port1_tx4

total: RX 5, TX 5 per port
  • Loading branch information
Timur Aitov committed Oct 12, 2023
1 parent e9281c2 commit 99be505
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 36 deletions.
2 changes: 1 addition & 1 deletion common/config.release.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#define CONFIG_YADECAP_MBUFS_COUNT (16 * 1024)
#define CONFIG_YADECAP_MBUF_SIZE (10 * 1024)
#define CONFIG_YADECAP_WORKER_PORTS_SIZE (1)
#define CONFIG_YADECAP_WORKER_PORTS_SIZE (8)
#define CONFIG_YADECAP_MBUFS_BURST_SIZE (32)
#define CONFIG_YADECAP_LPM4_EXTENDED_SIZE (4 * 1024)
#define CONFIG_YADECAP_LPM6_EXTENDED_SIZE (1 * 1024)
Expand Down
15 changes: 15 additions & 0 deletions dataplane/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,21 @@ class permanently
transportSizes[IPPROTO_GRE] = sizeof(rte_gre_hdr);
}

bool add_worker_port(const tPortId port_id,
tQueueId queue_id)
{
if (workerPortsCount >= CONFIG_YADECAP_WORKER_PORTS_SIZE)
{
return false;
}

workerPorts[workerPortsCount].inPortId = port_id;
workerPorts[workerPortsCount].inQueueId = queue_id;
workerPortsCount++;

return true;
}

dataplane::globalBase::atomic* globalBaseAtomic;
/// Pointers to all globalBaseAtomic for each CPU socket.
///
Expand Down
20 changes: 10 additions & 10 deletions dataplane/controlplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,17 +703,17 @@ common::idp::getConfig::response cControlPlane::getConfig() const
common::idp::getConfig::response response;
auto& [response_ports, response_workers, response_values] = response;

for (const auto& portIter : dataPlane->ports)
for (const auto& [port_id, port] : dataPlane->ports)
{
const tPortId& portId = portIter.first;
const std::string& interfaceName = std::get<0>(portIter.second);
const auto& etherAddress = std::get<2>(portIter.second);
const std::string pci = std::get<3>(portIter.second);

response_ports[portId] = {interfaceName,
rte_eth_dev_socket_id(portId),
etherAddress,
pci};
const auto& [interface_name, rx_queues, tx_queues_count, mac_address, pci, symmetric_mode] = port;
(void)rx_queues;
(void)tx_queues_count;
(void)symmetric_mode;

response_ports[port_id] = {interface_name,
rte_eth_dev_socket_id(port_id),
mac_address,
pci};
}

for (const auto& workerIter : dataPlane->workers)
Expand Down
111 changes: 95 additions & 16 deletions dataplane/dataplane.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ eResult cDataPlane::initPorts()
for (const auto& configPortIter : config.ports)
{
const std::string& interfaceName = configPortIter.first;
const auto& [pci] = configPortIter.second;
const auto& [pci, symmetric_mode] = configPortIter.second;

tPortId portId;
if (strncmp(pci.data(), SOCK_DEV_PREFIX, strlen(SOCK_DEV_PREFIX)) == 0)
Expand All @@ -319,12 +319,8 @@ eResult cDataPlane::initPorts()
interfaceName.data(),
pci.data());

std::get<0>(ports[portId]) = interfaceName;
std::get<3>(ports[portId]) = pci;

rte_ether_addr etherAddress;
rte_eth_macaddr_get(portId, &etherAddress);
memcpy(std::get<2>(ports[portId]).data(), etherAddress.addr_bytes, 6);

rte_eth_dev_info devInfo;
rte_eth_dev_info_get(portId, &devInfo);
Expand Down Expand Up @@ -371,6 +367,8 @@ eResult cDataPlane::initPorts()
YADECAP_LOG_INFO("max_lro_pkt_size: %u\n", portConf.rxmode.max_lro_pkt_size);
YADECAP_LOG_INFO("mtu: %u\n", mtu);

std::map<tCoreId, tQueueId> rx_queues;

uint16_t rxQueuesCount = 0;
uint16_t txQueuesCount = config.workers.size() + 1; ///< tx queue '0' for control plane
for (const auto& configWorkerIter : config.workers)
Expand All @@ -381,12 +379,21 @@ eResult cDataPlane::initPorts()
{
if (interfaceName == workerInterfaceName)
{
std::get<1>(ports[portId])[coreId] = rxQueuesCount;
rx_queues[coreId] = rxQueuesCount;
rxQueuesCount++;
}
}
}

if (symmetric_mode &&
rxQueuesCount < txQueuesCount)
{
YADECAP_LOG_INFO("symmetric mode is enabled. configure rx queues size from '%u' to '%u'\n",
rxQueuesCount,
txQueuesCount);
rxQueuesCount = txQueuesCount;
}

YADECAP_LOG_INFO("rxQueuesCount: %u, txQueuesCount: %u\n", rxQueuesCount, txQueuesCount);

int ret = rte_eth_dev_configure(portId,
Expand All @@ -407,6 +414,13 @@ eResult cDataPlane::initPorts()
}

rte_eth_stats_reset(portId);

ports[portId] = {interfaceName,
rx_queues,
txQueuesCount,
etherAddress.addr_bytes,
pci,
symmetric_mode};
}

for (const auto& interface_name : remove_keys)
Expand Down Expand Up @@ -743,18 +757,75 @@ eResult cDataPlane::initWorkers()
basePermanently.nat64stateful_numa_id = rte_cpu_to_be_16(socket_id);
}

for (const auto& portIter : ports)
for (const auto& [port_id, port] : ports)
{
const auto& portId = portIter.first;
const auto& rxQueues = std::get<1>(portIter.second);
const auto& [interface_name, rx_queues, tx_queues_count, mac_address, pci, symmetric_mode] = port;
(void)mac_address;
(void)pci;

if (exist(rxQueues, coreId))
if (exist(rx_queues, coreId))
{
basePermanently.workerPorts[basePermanently.workerPortsCount].inPortId = portId;
basePermanently.workerPorts[basePermanently.workerPortsCount].inQueueId = rxQueues.find(coreId)->second;
basePermanently.workerPortsCount++;
YANET_LOG_DEBUG("worker[%u]: add_worker_port(port_id: %u, queue_id: %u)\n",
coreId,
port_id,
rx_queues.find(coreId)->second);

if (!basePermanently.add_worker_port(port_id, rx_queues.find(coreId)->second))
{
YADECAP_LOG_ERROR("can't add port '%s' to worker '%u'\n",
interface_name.data(),
coreId);
return eResult::invalidCoresCount;
}

if (symmetric_mode)
{
/// symmetric mode. add more rx queues
///
/// before
/// rx_queue_id -> core_id
/// 0 -> 1
/// 1 -> 2
/// 2 -> 3
/// 3 -> n/s
/// 4 -> n/s
/// 5 -> n/s
/// 6 -> n/s
///
/// after
/// rx_queue_id -> core_id
/// 0 -> 1
/// 1 -> 2
/// 2 -> 3
/// 3 -> 1
/// 4 -> 2
/// 5 -> 3
/// 6 -> 1

uint16_t workers_count = rx_queues.size();
uint16_t rx_queue_id = rx_queues.find(coreId)->second + workers_count;

while (rx_queue_id < tx_queues_count)
{
YANET_LOG_DEBUG("worker[%u]: add_worker_port(port_id: %u, queue_id: %u)\n",
coreId,
port_id,
rx_queue_id);

if (!basePermanently.add_worker_port(port_id, rx_queue_id))
{
YADECAP_LOG_ERROR("can't add port '%s' to worker '%u'\n",
interface_name.data(),
coreId);
return eResult::invalidCoresCount;
}

rx_queue_id += workers_count;
}
}
}
}

basePermanently.outQueueId = outQueueId;
basePermanently.ports_count = ports.size();

Expand Down Expand Up @@ -1366,14 +1437,20 @@ eResult cDataPlane::parseJsonPorts(const nlohmann::json& json)
{
std::string interfaceName = portJson["interfaceName"];
std::string pci = portJson["pci"];
bool symmetric_mode = false;

if (exist(config.ports, interfaceName))
{
YADECAP_LOG_ERROR("interfaceName '%s' already exist\n", interfaceName.data());
return eResult::invalidConfigurationFile;
}

config.ports[interfaceName] = {pci};
if (exist(portJson, "symmetric_mode"))
{
symmetric_mode = portJson["symmetric_mode"];
}

config.ports[interfaceName] = {pci, symmetric_mode};

for (tCoreId coreId : portJson["coreIds"])
{
Expand Down Expand Up @@ -1582,7 +1659,8 @@ eResult cDataPlane::checkConfig()
std::set<std::string> pcis;
for (const auto& portIter : config.ports)
{
const auto& [pci] = portIter.second;
const auto& [pci, symmetric_mode] = portIter.second;
(void)symmetric_mode;

if (exist(pcis, pci))
{
Expand Down Expand Up @@ -1674,7 +1752,8 @@ eResult cDataPlane::initEal(const std::string& binaryPath,

for (const auto& port : config.ports)
{
const auto& [pci] = port.second;
const auto& [pci, symmetric_mode] = port.second;
(void)symmetric_mode;

// Do not whitelist sock dev virtual devices
if (strncmp(pci.data(), SOCK_DEV_PREFIX, strlen(SOCK_DEV_PREFIX)) == 0)
Expand Down
14 changes: 9 additions & 5 deletions dataplane/dataplane.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ struct tDataPlaneConfig
DPDK.
*/
std::map<std::string, ///< interfaceName
std::tuple<std::string>> ///< pci
std::tuple<std::string, ///< pci
bool ///< symmetric_mode
>>
ports;

std::set<tCoreId> workerGCs;
Expand Down Expand Up @@ -292,10 +294,12 @@ class cDataPlane
tDataPlaneConfig config;

std::map<tPortId,
std::tuple<std::string, ///< interfaceName
std::map<tCoreId, tQueueId>, ///< rxQueues
std::array<uint8_t, 6>, ///< address
std::string ///< pci
std::tuple<std::string, ///< interface_name
std::map<tCoreId, tQueueId>, ///< rx_queues
unsigned int, ///< tx_queues_count
common::mac_address_t, ///< mac_address
std::string, ///< pci
bool ///< symmetric_mode
>>
ports;
std::map<tCoreId, cWorker*> workers;
Expand Down
8 changes: 4 additions & 4 deletions dataplane/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ eResult cWorker::init(const tCoreId& coreId,
this->bases[currentBaseId] = base;
this->bases[currentBaseId ^ 1] = base;

unsigned int elements_count = 2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
unsigned int elements_count = 2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_highPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_normalPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_lowPriority_size);
Expand Down Expand Up @@ -180,8 +180,8 @@ void cWorker::start()

/// @todo: prepare()

unsigned int mbufs_count_expect = 2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * CONFIG_YADECAP_WORKER_PORTS_SIZE * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
unsigned int mbufs_count_expect = 2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_rx_queue_size) +
2 * basePermanently.workerPortsCount * dataPlane->getConfigValue(eConfigType::port_tx_queue_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_highPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_normalPriority_size) +
2 * dataPlane->getConfigValue(eConfigType::ring_lowPriority_size);
Expand Down

0 comments on commit 99be505

Please sign in to comment.