大家好,欢迎来到IT知识分享网。
1. 引言
本文将介绍如何实现无锁字节缓冲区(LockFreeBytesBuffer),并通过Google Benchmark对其性能进行测试。该缓冲区设计为单生产者单消费者(SPSC)模型,以简化线程安全问题。
2. SPSC 环形缓冲区设计思想
在环形缓冲区的上下文中,实现零拷贝的方法之一是通过直接操作缓冲区的指针,而不是复制数据。这种方式需要修改接口,使得消费者直接访问缓冲区的数据,而不是通过中间变量进行复制。
在设计LockFreeBytesBuffer时遵循了以下原则:
- 线程安全:使用原子操作确保数据一致性。
- 内存效率:使用固定大小的缓冲区,避免动态内存分配。
LockFreeBytesBuffer使用一个固定大小的环形缓冲区来存储数据,并通过原子操作管理读写索引。生产者线程将数据追加到缓冲区,消费者线程从缓冲区读取数据。
为了确保单生产者单消费者模型下的线程安全,我们需要确保:
- 生产者和消费者线程不会同时写入或读取同一块内存。
- 使用原子操作来更新索引,以确保读写操作的正确性和可见性。
3. 缓冲区类定义
下面是LockFreeBytesBuffer类的定义:
class LockFreeBytesBuffer {
public: static const std::size_t kBufferSize = 1024U; // 缓冲区大小 LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {
std::memset(buffer_, 0, kBufferSize); } bool Append(const char* data, std::size_t length) noexcept; std::size_t BeginRead(const char** target) noexcept; void EndRead(std::size_t length) noexcept; private: char buffer_[kBufferSize]; std::atomic<std::size_t> reader_index_; std::atomic<std::size_t> writer_index_; };
4. 追加数据
生产者线程调用Append
方法将数据写入缓冲区。该方法首先检查缓冲区是否有足够的空间存放新数据。如果有足够的空间,它将数据写入缓冲区,并更新写索引。
bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept {
const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed); const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire); const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize; if (length > free_space) {
return false; // 缓冲区满 } const std::size_t pos = current_write_index % kBufferSize; const std::size_t first_part = std::min(length, kBufferSize - pos); std::memcpy(&buffer_[pos], data, first_part); std::memcpy(&buffer_[0], data + first_part, length - first_part); writer_index_.store(current_write_index + length, std::memory_order_release); return true; }
5. 读取数据
消费者线程调用BeginRead
方法获取缓冲区中可用的数据。该方法返回一个指向缓冲区中数据的指针,并更新读索引。消费者处理完数据后,调用EndRead
方法标记数据已读。
std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept {
const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire); const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize; if (available_data == 0U) {
return 0U; // 缓冲区空 } const std::size_t pos = current_read_index % kBufferSize; *target = &buffer_[pos]; return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小 } void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept {
const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); reader_index_.store(current_read_index + length, std::memory_order_release); }
6. 完整代码
以下是完整的LockFreeBytesBuffer
实现:
#ifndef EMBEDDED_BYTE_BUFFER_H_ #define EMBEDDED_BYTE_BUFFER_H_ #include <algorithm> #include <atomic> #include <cstddef> #include <cstring> class LockFreeBytesBuffer {
public: static const std::size_t kBufferSize = 1024U; // 缓冲区大小 LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) {
std::memset(buffer_, 0, kBufferSize); } // 将数据追加到缓冲区 bool Append(const char* data, std::size_t length) noexcept; // 获取指向缓冲区的读指针 std::size_t BeginRead(const char** target) noexcept; // 标记数据已读 void EndRead(std::size_t length) noexcept; private: char buffer_[kBufferSize]; std::atomic<std::size_t> reader_index_; std::atomic<std::size_t> writer_index_; }; bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept {
const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed); const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire); const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize; if (length > free_space) {
return false; // 缓冲区满 } const std::size_t pos = current_write_index % kBufferSize; const std::size_t first_part = std::min(length, kBufferSize - pos); std::memcpy(&buffer_[pos], data, first_part); std::memcpy(&buffer_[0], data + first_part, length - first_part); writer_index_.store(current_write_index + length, std::memory_order_release); return true; } std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept {
const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire); const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize; if (available_data == 0U) {
return 0U; // 缓冲区空 } const std::size_t pos = current_read_index % kBufferSize; *target = &buffer_[pos]; return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小 } void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept {
const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); reader_index_.store(current_read_index + length, std::memory_order_release); } #endif // EMBEDDED_BYTE_BUFFER_H_
7. 基准测试
为了评估LockFreeBytesBuffer的性能,我们使用Google Benchmark库编写了基准测试程序。该程序创建生产者和消费者线程,并在基准测试循环中进行数据读写操作。
7.1. 测试代码
#include <benchmark/benchmark.h> #include <cstring> #include <thread> #include "LockFreeBytesBuffer.h" void Producer(LockFreeBytesBuffer* buffer, bool* stop) noexcept {
const char data[] = "test data"; while (!*stop) {
buffer->Append(data, std::strlen(data)); // 模拟生产数据的延迟 std::this_thread::sleep_for(std::chrono::microseconds(10)); } } void Consumer(LockFreeBytesBuffer* buffer, bool* stop) noexcept {
const char* data = nullptr; while (!*stop) {
const std::size_t length = buffer->BeginRead(&data); if (length > 0U) {
// 处理数据 buffer->EndRead(length); // 模拟处理数据的延迟 std::this_thread::sleep_for(std::chrono::microseconds(10)); } } } // Benchmark函数 static void BM_Buffer(benchmark::State& state) {
LockFreeBytesBuffer buffer; bool stop = false; std::thread producer_thread(Producer, &buffer, &stop); std::thread consumer_thread(Consumer, &buffer, &stop); for (auto _ : state) {
// 在这里进行基准测试 const char data[] = "benchmark data"; buffer.Append(data, std::strlen(data)); const char* target = nullptr; const std::size_t length = buffer.BeginRead(&target); buffer.EndRead(length); } stop = true; // 停止生产者和消费者线程 producer_thread.join(); consumer_thread.join(); } // 注册benchmark BENCHMARK(BM_Buffer)->Threads(1); int main(int argc, char** argv) {
::benchmark::Initialize(&argc, argv); ::benchmark::RunSpecifiedBenchmarks(); return 0; }
8. 执行结果
安装Google Benchmark:如果还没有安装Google Benchmark,可以按照Google Benchmark GitHub上的说明进行安装。
编译: g++ -O2 buffer_benchmark.cpp -lbenchmark -lpthread -o buffer_benchmark
运行基准测试程序:
$ ./buffer_benchmark 2024-06-23T14:47:30+00:00 Running ./buffer_benchmark Run on (2 X 2096.07 MHz CPU s) CPU Caches: L1 Data 32 KiB (x2) L1 Instruction 64 KiB (x2) L2 Unified 512 KiB (x2) L3 Unified 4096 KiB (x2) Load Average: 0.00, 0.00, 0.00 *WARNING* Library was built as DEBUG. Timings may be affected. -------------------------------------------------------------- Benchmark Time CPU Iterations -------------------------------------------------------------- BM_Buffer/threads:1 10.8 ns 10.8 ns
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/140571.html