C++生产者-消费者无锁缓冲区的简单实现

C++生产者-消费者无锁缓冲区的简单实现本文将介绍如何实现无锁字节缓冲区 LockFreeByte 并通过 GoogleBenchm 对其性能进行测试

大家好,欢迎来到IT知识分享网。

1. 引言

本文将介绍如何实现无锁字节缓冲区(LockFreeBytesBuffer),并通过Google Benchmark对其性能进行测试。该缓冲区设计为单生产者单消费者(SPSC)模型,以简化线程安全问题。

2. SPSC 环形缓冲区设计思想

在环形缓冲区的上下文中,实现零拷贝的方法之一是通过直接操作缓冲区的指针,而不是复制数据。这种方式需要修改接口,使得消费者直接访问缓冲区的数据,而不是通过中间变量进行复制。

在设计LockFreeBytesBuffer时遵循了以下原则:

  1. 线程安全:使用原子操作确保数据一致性。
  2. 内存效率:使用固定大小的缓冲区,避免动态内存分配。

LockFreeBytesBuffer使用一个固定大小的环形缓冲区来存储数据,并通过原子操作管理读写索引。生产者线程将数据追加到缓冲区,消费者线程从缓冲区读取数据。

为了确保单生产者单消费者模型下的线程安全,我们需要确保:

  1. 生产者和消费者线程不会同时写入或读取同一块内存。
  2. 使用原子操作来更新索引,以确保读写操作的正确性和可见性。

3. 缓冲区类定义

下面是LockFreeBytesBuffer类的定义:

class LockFreeBytesBuffer { 
    public: static const std::size_t kBufferSize = 1024U; // 缓冲区大小 LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) { 
    std::memset(buffer_, 0, kBufferSize); } bool Append(const char* data, std::size_t length) noexcept; std::size_t BeginRead(const char** target) noexcept; void EndRead(std::size_t length) noexcept; private: char buffer_[kBufferSize]; std::atomic<std::size_t> reader_index_; std::atomic<std::size_t> writer_index_; }; 

4. 追加数据

生产者线程调用Append方法将数据写入缓冲区。该方法首先检查缓冲区是否有足够的空间存放新数据。如果有足够的空间,它将数据写入缓冲区,并更新写索引。

bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept { 
    const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed); const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire); const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize; if (length > free_space) { 
    return false; // 缓冲区满 } const std::size_t pos = current_write_index % kBufferSize; const std::size_t first_part = std::min(length, kBufferSize - pos); std::memcpy(&buffer_[pos], data, first_part); std::memcpy(&buffer_[0], data + first_part, length - first_part); writer_index_.store(current_write_index + length, std::memory_order_release); return true; } 

5. 读取数据

消费者线程调用BeginRead方法获取缓冲区中可用的数据。该方法返回一个指向缓冲区中数据的指针,并更新读索引。消费者处理完数据后,调用EndRead方法标记数据已读。

std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept { 
    const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire); const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize; if (available_data == 0U) { 
    return 0U; // 缓冲区空 } const std::size_t pos = current_read_index % kBufferSize; *target = &buffer_[pos]; return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小 } void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept { 
    const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); reader_index_.store(current_read_index + length, std::memory_order_release); } 

6. 完整代码

以下是完整的LockFreeBytesBuffer实现:

#ifndef EMBEDDED_BYTE_BUFFER_H_ #define EMBEDDED_BYTE_BUFFER_H_ #include <algorithm> #include <atomic> #include <cstddef> #include <cstring> class LockFreeBytesBuffer { 
    public: static const std::size_t kBufferSize = 1024U; // 缓冲区大小 LockFreeBytesBuffer() noexcept : reader_index_(0U), writer_index_(0U) { 
    std::memset(buffer_, 0, kBufferSize); } // 将数据追加到缓冲区 bool Append(const char* data, std::size_t length) noexcept; // 获取指向缓冲区的读指针 std::size_t BeginRead(const char** target) noexcept; // 标记数据已读 void EndRead(std::size_t length) noexcept; private: char buffer_[kBufferSize]; std::atomic<std::size_t> reader_index_; std::atomic<std::size_t> writer_index_; }; bool LockFreeBytesBuffer::Append(const char* data, std::size_t length) noexcept { 
    const std::size_t current_write_index = writer_index_.load(std::memory_order_relaxed); const std::size_t current_read_index = reader_index_.load(std::memory_order_acquire); const std::size_t free_space = (current_read_index + kBufferSize - current_write_index - 1U) % kBufferSize; if (length > free_space) { 
    return false; // 缓冲区满 } const std::size_t pos = current_write_index % kBufferSize; const std::size_t first_part = std::min(length, kBufferSize - pos); std::memcpy(&buffer_[pos], data, first_part); std::memcpy(&buffer_[0], data + first_part, length - first_part); writer_index_.store(current_write_index + length, std::memory_order_release); return true; } std::size_t LockFreeBytesBuffer::BeginRead(const char** target) noexcept { 
    const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); const std::size_t current_write_index = writer_index_.load(std::memory_order_acquire); const std::size_t available_data = (current_write_index - current_read_index) % kBufferSize; if (available_data == 0U) { 
    return 0U; // 缓冲区空 } const std::size_t pos = current_read_index % kBufferSize; *target = &buffer_[pos]; return std::min(available_data, kBufferSize - pos); // 返回第一段数据的大小 } void LockFreeBytesBuffer::EndRead(std::size_t length) noexcept { 
    const std::size_t current_read_index = reader_index_.load(std::memory_order_relaxed); reader_index_.store(current_read_index + length, std::memory_order_release); } #endif // EMBEDDED_BYTE_BUFFER_H_ 

7. 基准测试

为了评估LockFreeBytesBuffer的性能,我们使用Google Benchmark库编写了基准测试程序。该程序创建生产者和消费者线程,并在基准测试循环中进行数据读写操作。

7.1. 测试代码
#include <benchmark/benchmark.h> #include <cstring> #include <thread> #include "LockFreeBytesBuffer.h" void Producer(LockFreeBytesBuffer* buffer, bool* stop) noexcept { 
    const char data[] = "test data"; while (!*stop) { 
    buffer->Append(data, std::strlen(data)); // 模拟生产数据的延迟 std::this_thread::sleep_for(std::chrono::microseconds(10)); } } void Consumer(LockFreeBytesBuffer* buffer, bool* stop) noexcept { 
    const char* data = nullptr; while (!*stop) { 
    const std::size_t length = buffer->BeginRead(&data); if (length > 0U) { 
    // 处理数据 buffer->EndRead(length); // 模拟处理数据的延迟 std::this_thread::sleep_for(std::chrono::microseconds(10)); } } } // Benchmark函数 static void BM_Buffer(benchmark::State& state) { 
    LockFreeBytesBuffer buffer; bool stop = false; std::thread producer_thread(Producer, &buffer, &stop); std::thread consumer_thread(Consumer, &buffer, &stop); for (auto _ : state) { 
    // 在这里进行基准测试 const char data[] = "benchmark data"; buffer.Append(data, std::strlen(data)); const char* target = nullptr; const std::size_t length = buffer.BeginRead(&target); buffer.EndRead(length); } stop = true; // 停止生产者和消费者线程 producer_thread.join(); consumer_thread.join(); } // 注册benchmark BENCHMARK(BM_Buffer)->Threads(1); int main(int argc, char** argv) { 
    ::benchmark::Initialize(&argc, argv); ::benchmark::RunSpecifiedBenchmarks(); return 0; } 

8. 执行结果

安装Google Benchmark:如果还没有安装Google Benchmark,可以按照Google Benchmark GitHub上的说明进行安装。

编译: g++ -O2 buffer_benchmark.cpp -lbenchmark -lpthread -o buffer_benchmark

运行基准测试程序:

$ ./buffer_benchmark 2024-06-23T14:47:30+00:00 Running ./buffer_benchmark Run on (2 X 2096.07 MHz CPU s) CPU Caches: L1 Data 32 KiB (x2) L1 Instruction 64 KiB (x2) L2 Unified 512 KiB (x2) L3 Unified 4096 KiB (x2) Load Average: 0.00, 0.00, 0.00 *WARNING* Library was built as DEBUG. Timings may be affected. -------------------------------------------------------------- Benchmark Time CPU Iterations -------------------------------------------------------------- BM_Buffer/threads:1 10.8 ns 10.8 ns  

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/140571.html

(0)
上一篇 2025-05-25 20:10
下一篇 2025-05-25 20:15

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信