C/C++高性能服务器网络库框架libhv源码解读

C/C++高性能服务器网络库框架libhv源码解读信号处理 日志 资源池 线程池错误 命令行参数 配置文件 堆 缓存 链表 队列 红黑树 MultiMap JSON tcp udp http WebSocket 事件循环 事件循环线程 事件循环线程

大家好,欢迎来到IT知识分享网。

libhv是国人开发的开源C/C++跨平台网络库,底层功能由C实现(60%),使用C++封装(30%),可用于实现C/C++高性能服务器。

开源地址:https://github.com/ithewei

libhv功能划分

libhv源码框架

按文件夹划分:base(C基础模块),cpputil(C++基础模块),event(C事件循环),evpp(C++事件循环封装),http(http封装),mqtt(MQTT协议客户端),protocol(常用协议),ssl,util(编码和加密)。

base

array.h

#define ARRAY_DECL,预定义宏实现的动态数组。

hatomic.h

原子操作,如果是C++11环境使用std标准库里的原子定义,否则自定义原子数据类型。

hbase.h

基础接口,内存的分配与释放,字符串处理,创建文件夹,文件类型路径判断,可执行文件路径,生成随机数,url 解析等。

hbuf.h

hdef.h

常用宏定义,取绝对值,数组大小,安全释放指针,字节合并与拆分,字母大小写,是否16进制等。

heap.h

hendian.h

大小端,字节序操作。

herr.h

错误码定义,使用宏定义的方式,可根据错误码查询错误描述。

hlog.h

日志记录,C语言风格,提供宏接口,没有使用独立线程,多线程通过锁确保安全;可以设置字体颜色,日志级别,日志文件大小,日志保存天数。

hmain.h

命令行参数解析,后台运行、创建pid文件,信号处理:signal_init,signal_handle,开始、停止、重启进程,master-workers多进程模式、崩溃自动重启等。

hmath.h

数学函数

hmutex.h

线程同步锁,互斥锁、读写锁、条件变量、信号量的宏定义,class MutexLock、class SpinLock、class RWLock、class LockGuard 对上述的封装。

hplatform.h

实现跨平台相关的宏

hproc.h

多进程操作

hsocket.h

socket通用接口封装

hsysinfo.h

获取系统信息

hthread.h

htime.h

日期时间,时间获取,时间格式化,时间转换等。

hversion.h

软件版本获取,版本转换。

list.h

netinet.h

IP、UDP、TCP、ICMP头定义,checksum。

queue.h

队列,宏定义实现,先进先出,#define QUEUE_DECL(type, qtype)。

rbtree.h

红黑树

cpputil

hasync.h

hdir.h

typedef struct hdir_s,接口:listdir,功能类似linux中的ls命令和windows中的dir命令。

hfile.h

class HFile,文件操作,成员:filepath,FILE* fp,接口:open、close、remove、rename、read、write、seek、tell、readall、readline、readrange。 

hmap.h

class MultiMap : public multimap<Key, Value>,把[]操作符封装成insert。

hobjectpool.h

hpath.h

class HV_EXPORT HPath,提供静态接口,常用路径操作,如exists、isdir、isfile、islink,一个完整路径的每段获取,文件名、后缀名、文件夹名等。

hscope.h

hstring.h

std::string操作封装,转换,大小写转换,反转,比较,是否包含,拼接,分隔,修剪,替换等。 class StringCaseLess : public std::less<std::string>,重载operator(),比较字符串大小。 

hthreadpool.h

class HThreadPool,线程池,成员:std::list threads,std::queue tasks;接口:start、stop、pause、resume、commit;线程池共享一个任务队列,commit 提交任务到队列,通过信号量唤醒线程执行;设置最大和最小线程数量,不忙时减少线程数量至最小,忙时增加线程数量至最大。

hurl.h

class HV_EXPORT HUrl,url格式转换。

ifconfig.h

接口:ifconfig,提供类似linux命令ifconfig的功能,返回 ifconfig_t 数组,查询内容:name、ip、mask、broadcast、mac。

iniparser.h

class HV_EXPORT IniParser,配置文件读写。

json.hpp

json操作封装(25000多行代码…)。

singleton.h

单例宏定义。

ThreadLocalStorage.h

class HV_EXPORT ThreadLocalStorage,线程私有变量 pthread_key_t 的封装,接口:set,get,setThreadName等。

event

hkcp.h

kcp封装

ikcp.h

kcp宏定义和相关接口。

hevent.h

hloop.h

iowatcher.h

epoll/poll/select/iocp(win)/kqueue(OS_BSD/OS_MAC)/evport(OS_SOLARIS)相关接口,iowatcher_init,iowatcher_add_event,iowatcher_del_event等。

nlog.h

网络日志,接口:network_logger、nlog_listen,成员:network_logger_t s_logger,创建tcp服务端,接受客户端接入,收到客户端消息写入日志。

overlapio.h

重叠IO

rudp.h

可靠用户数据报协议(RUDP)?

unpack.h

拆包接口,hio_unpack,可按照固定长度、分隔符、头部长度字段拆包。

evpp

Buffer.h

typedef HBuf Buffer; typedef std::shared_ptr BufferPtr;

Channel.h

Event.h

struct Event,struct Timer,分别封装 hevent_t 和 htimer_t。

EventLoop.h

class EventLoop : public Status,基于已有或新建 hloop_t 创建事件循环类,成员:hloop_t*,customEvents自定义任务队列,timers定时器队列,接口:run(调用 hloop_run,循环处理epoll/IO事件、定时器事件、自定义事件),stop,pause,resume,setTimer,killTimer,queueInLoop等,可以启动、停止、暂停、重新开始事件循环,增加/删除定时器,把自定义任务加入队列等。

EventLoopThread.h

class EventLoopThread : public Status,成员:EventLoopPtr loop_,std::shared_ptrstd::thread thread_;接口:start、stop,开始/停止事件循环线程,把 EventLoop 的事件循环放在 thread_ 线程里执行。

EventLoopThreadPool.h

class EventLoopThreadPool : public Status,成员:thread_num_、std::vector loop_threads_;接口:start,stop,loop(获取一个 EventLoop),nextLoop(可选轮询调度、随机调度、最小连接数调度);创建指定数量的 EventLoopThread 事件循环线程池。

Status.h

class Status,封装线程状态。

TcpClient.h

TcpServer.h

TimerThread.h

class TimerThread : public EventLoopThread,接口:setTimer,resetTimer,setInterval,killTimer,创建一个事件循环线程,用来执行定时器。

UdpClient.h

UdpServer.h

http

axios.h

模拟nodejs axios api

requests.h

模拟python requests api

HttpClient.h

class HttpClient,成员:struct http_client_s http_client_t;同步http客户端,也可进行 AsyncHttpClient 异步操作。

WebSocketClient.h

class WebSocketClient : public TcpClientTmpl,WebSocket客户端,成员:HttpParserPtr、HttpRequestPtr、HttpResponsePtr、WebSocketParserPtr。

http_page.h

http页面构造,提供接口:make_http_status_page,make_index_of_page,创建 html 格式字符串。

HttpContext.h

struct HttpContext,管理http内容,提供设置和查询接口,成员:HttpService*,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr。

HttpHandler.h

class HttpHandler,http处理主类,成员:hio_t,HttpService,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr,HttpParserPtr,HttpContextPtr,http_handler,WebSocketService,WebSocketChannelPtr,WebSocketParserPtr 等,以及相关接口。

HttpMiddleware.h

class HttpMiddleware,接口:CORS(HttpRequest* req, HttpResponse* resp)。

HttpResponseWriter.h

class HttpResponseWriter : public SocketChannel,http 响应发送类,成员:HttpResponsePtr,接口:WriteHeader、WriteCookie、WriteBody 等。

HttpServer.h

HttpService.h

http业务类 (包括api service、web service、indexof service),struct http_handler,struct http_method_handler,struct HV_EXPORT HttpService。

WebSocketServer.h

struct WebSocketService,class WebSocketServer : public HttpServer,WebSocket服务类。

grpcdef.h

gRPC封装定义,gRPC是Google公司开发的一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计;RPC是指远程过程调用。

httpdef.h

http定义,http_status(200OK,404notfound等)、http_method(POST/GET等)、http_content_type 标识码/枚举/描述的映射,并提供方法用于标识码与描述的相互转换,如404对应Not Found。

http_parser.h

http1解析实现,错误码 http_errno,结构体 http_parser、http_parser_settings,http解析方法 http_parser_init、http_parser_execute 等。

HttpParser.h

class HV_EXPORT HttpParser,http解析基类,抽象类。

Http1Parser.h

class Http1Parser : public HttpParser,http1解析类。

Http2Parser.h

http2解析类

http_content.h

HttpMessage.h

wsdef.h

websocket 接口定义,enum ws_opcode,ws_build_frame,ws_encode_key 等。

websocket_parser.h

websocket 解析基础功能,struct websocket_parser,struct websocket_parser_settings,接口:websocket_parser_init,websocket_parser_execute 等。

WebSocketChannel.h

class WebSocketChannel : public SocketChannel,WebSocket封装,成员:Buffer sendbuf_,std::mutex mutex_;接口:send、sendPing、sendPong。

WebSocketParser.h

class WebSocketParser,websocket 解析类。

mqtt

mqtt_client.h

typedef struct mqtt_client_s mqtt_client_t;class MqttClient,mqtt 协议客户端。

mqtt_protocol.h

mqtt 协议,接口:mqtt_head_pack,mqtt_head_unpack。

protocol

dns.h

dns 解析接口,nslookup。

ftp.h

ftp 协议封装,struct ftp_handle_t,接口:ftp_command_str,ftp_connect,ftp_login,ftp_quit,ftp_upload,ftp_download,ftp_download_with_cb等。

icmp.h

icmp 协议,接口:ping,模拟ping命令。

smtp.h

smtp 协议,struct mail_t,接口:smtp_command_str,smtp_status_str,smtp_build_command,sendmail。

ssl

hssl.h

SSL协议封装,结构体和接口定义。

util

base64.h

BASE64编解码

md5.h

MD5数字摘要,生成字符串或文件的 md5 值,接口:hv_md5,hv_md5_hex。

sha1.h

SHA1安全散列算法,生成字符串或文件的 sha1 值,接口:hv_sha1,hv_sha1_hex。

libhv源码测试

开源项目中有很多测试代码,例如examples/,unittest/,evpp/,本章只测试几个核心模块。

EventLoopThread 事件循环线程测试

1、事件循环线程是一个网络框架的核心, libhv 把事件循环(EventLoopPtr)和线程(std::thread)分开封装,可以先创建 EventLoopPtr ,再创建 EventLoopThread 启动线程。

2、EventLoopThread 构建时可以传入已有的 EventLoopPtr ,如果不传则自动创建一个 EventLoopPtr ;也可传入Functor pre post ,分别在循环前和循环后执行。

3、EventLoopThread 可以暂停(HLOOP_STATUS_PAUSE 线程还在,只是不处理任务),或无激活事件时退出线程,或只执行一次(HLOOP_FLAG_RUN_ONCE),或一直执行。

libhv 事件循环线程主要处理3种任务: 定时器,网络IO,自定义事件。
1、定时器:支持创建单次或循环定时器,存放在小顶堆(timers)中,堆顶即是最先超时的定时器,每次事件循环检测堆顶,超时的定时器会先出堆再重新入堆, libhv 会优先处理定时器任务。事件循环检测堆顶定时器后,会把下一次定时器超时时长 blocktime_ms 传入 epoll_wait ,避免过渡执行循环,如果没有定时器则 epoll_wait 默认100ms超时。
2、自定义任务:使用 epoll 实现事件监听,事件循环刚启动时创建事件(eventfds)和 epoll,调用 runInLoop 把自定义事件加入队列 customEvents ,并通过 eventfds 唤醒 epoll 执行循环。
3、网络IO:使用和自定义事件同一个 epoll 监听网络IO事件,比如创建tcp客户端时把fd传入epoll监听。


创建epoll调用栈

EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_create_eventfds->hread->hio_read->hio_add->iowatcher_add_event->iowatcher_init->epoll_create.

epoll_wait调用栈

EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events.

详见源码目录:evpp/EventLoopThread_test.cpp

static void onTimer(TimerID timerID, int n) { 
    printf("tid=%ld timerID=%lu time=%lus n=%d\n", hv_gettid(), (unsigned long)timerID, (unsigned long)time(NULL), n); } int EventLoopThread_Test() { 
    HV_MEMCHECK; printf("main tid=%ld\n", hv_gettid()); EventLoopThread loop_thread; const EventLoopPtr& loop = loop_thread.loop(); //创建定时器 // runEvery 1s loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1, 100)); // runAfter 10s loop->setTimeout(10000, [&loop](TimerID timerID){ 
    loop->stop(); }); loop_thread.start(); //创建异步自定义事件 loop->queueInLoop([](){ 
    printf("queueInLoop tid=%ld\n", hv_gettid()); }); loop->runInLoop([](){ 
    printf("runInLoop tid=%ld\n", hv_gettid()); }); // wait loop_thread exit loop_thread.join(); return 0; } 

定时器测试

添加定时器调用栈

任意线程->EventLoop::setInterval->setTimerInLoop->runInLoop->添加自定义事件调用栈->事件循环线程->EventLoop::setTimer->htimer_add(hloop_t::timers)->添加到 EventLoop::timers

定时器超时调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_timers->__hloop_process_timers->EVENT_PENDING(timer) 加入优先级队列->hloop_process_pendings 执行回调

PS: 刚添加定时器时不会立即更新 epoll_wait 超时时长,默认还是100ms超时(建议无事件时 epoll_wait 一直睡眠,添加定时器时立即唤醒一次线程更新 epoll_wait 超时时长).

异步自定义事件

唤醒循环调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events->EVENT_PENDING(io) 加入待处理队列 pendings

待处理队列调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->cur->cb(cur) 执行回调

hloop_s::pendings(待处理优先级队列),是个二维数组,每个优先级一个数组,优先处理高优先级的数组.

tcp客户端测试

任意线程->TcpClient::createsocket->创建fd和 hio_t.

非阻塞连接

任意线程->TcpClient::start->事件循环线程->TcpClientTmpl::startConnect->SocketChannel::startConnect->hio_connect->系统调用 connect,根据返回值判断,epoll监听可写事件,创建超时定时器,超时或连接成功后->onConnection,判断是否重连

接收数据

epoll_wait调用栈->iowatcher_poll_events->EVENT_PENDING(io)添加到优先级队列->(处理优先级队列)hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpClient::onMessage 执行自定义接收回调

数据发送

任意线程 runInLoop->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->EventLoop::onTimer->Channel::write ->hio_write->__nio_write,系统调用 send

使用 SocketChannel::write 发送,数据发送线程安全锁 hio_t::write_mutex ,支持多线程发送。

详见源码目录:evpp/TcpClient_test.cpp

int TcpClient_Test() { 
    const char* remote_host = "192.168.3.91"; int remote_port = 9090; TcpClient cli; //设置tcp组包策略为固定分隔符,只有收到OK时才会调用 onMessage 接收回调函数; //如果不设置 setUnpack ,则收到消息会直接调用 onMessage 回调。 unpack_setting_t OK_unpack_setting; memset(&OK_unpack_setting, 0, sizeof(unpack_setting_t)); OK_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH; OK_unpack_setting.mode = UNPACK_BY_DELIMITER; OK_unpack_setting.delimiter[0] = 'O'; OK_unpack_setting.delimiter[1] = 'K'; OK_unpack_setting.delimiter_bytes = 2; cli.setUnpack(&OK_unpack_setting); int connfd = cli.createsocket(remote_port, remote_host); if (connfd < 0) { 
    return -20; } printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd); //连接成功或断开连接回调 cli.onConnection = [&cli](const SocketChannelPtr& channel) { 
    std::string peeraddr = channel->peeraddr(); if (channel->isConnected()) { 
    printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); // send(time) every 3s //当连接成功时创建定时器,周期性向服务器发送消息 setInterval(3000, [channel](TimerID timerID){ 
    if (channel->isConnected()) { 
    if (channel->isWriteComplete()) { 
    char str[DATETIME_FMT_BUFLEN] = { 
   0}; datetime_t dt = datetime_now(); datetime_fmt(&dt, str); channel->write(str); } } else { 
    killTimer(timerID); } }); } else { 
    printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); } if (cli.isReconnect()) { 
    printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay); } }; //接收消息回调 cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { 
    printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; //连接失败或断开时会重连,重连周期可设置固定、线性、指数等策略 #if TEST_RECONNECT // reconnect: 1,2,4,8,10,10,10... reconn_setting_t reconn; reconn_setting_init(&reconn); reconn.min_delay = 1000; reconn.max_delay = 10000; reconn.delay_policy = 2; cli.setReconnect(&reconn); #endif #if TEST_TLS cli.withTLS(); #endif cli.start(); std::string str; while (std::getline(std::cin, str)) { 
    if (str == "close") { 
    cli.closesocket(); } else if (str == "start") { 
    cli.start(); } else if (str == "stop") { 
    cli.stop(); break; } else { 
    if (!cli.isConnected()) break; cli.send(str); } } return 0; } 

tcp服务端测试

启动监听

任意线程->TcpServer::createsocket->Listen->ListenFD->系统调用 listen

监听tcp接入事件

EventLoop::runInLoop->…->TcpServerEventLoopTmpl::startAccept->haccept->hio_accept->hio_add->iowatcher_add_event,把 listenfd 添加到 epoll 监听.

tcp客户端接入

接收数据

EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpServerEventLoopTmpl::newConnEvent->TcpServerEventLoopTmpl::onMessage,调用自定义接收回调,运行在分配的 EventLoopThreadPool 线程

数据发送

和tcp客户端类似,使用 SocketChannel::write 发送,建议使用分配的 EventLoopThreadPool 线程发送,也可在任意线程发送,线程安全锁 hio_t::write_mutex

详见源码目录:evpp/TcpServer_test.cpp

int TcpServer_Test() { 
    int port = 9090; hlog_set_level(LOG_LEVEL_DEBUG); TcpServer srv; int listenfd = srv.createsocket(port); if (listenfd < 0) { 
    return -20; } printf("server listen on port %d, listenfd=%d ...\n", port, listenfd); //当有客户端接入和断开时执行回调 srv.onConnection = [](const SocketChannelPtr& channel) { 
    std::string peeraddr = channel->peeraddr(); if (channel->isConnected()) { 
    printf("%s connected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid()); } else { 
    printf("%s disconnected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid()); } }; //收到客户端消息回调 srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { 
    // echo printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); channel->write(buf); }; //设置EventLoopThreadPool worker_threads 线程数量 srv.setThreadNum(4); //设置线程分配策略,最小连接数优先 srv.setLoadBalance(LB_LeastConnections); #if TEST_TLS hssl_ctx_opt_t ssl_opt; memset(&ssl_opt, 0, sizeof(hssl_ctx_opt_t)); ssl_opt.crt_file = "cert/server.crt"; ssl_opt.key_file = "cert/server.key"; ssl_opt.verify_peer = 0; srv.withTLS(&ssl_opt); #endif //启动监听线程和任务线程池 srv.start(); std::string str; while (std::getline(std::cin, str)) { 
    if (str == "close") { 
    srv.closesocket(); } else if (str == "start") { 
    srv.start(); } else if (str == "stop") { 
    srv.stop(); break; } else { 
    srv.broadcast(str.data(), str.size()); } } return 0; } 

udp客户端测试

int UdpClient_Test() { 
    const char* remote_host = "192.168.3.91"; int remote_port = 9090; UdpClient cli; //创建udp客户端,指定远端IP和端口号,本地端口号随机分配 int sockfd = cli.createsocket(remote_port, remote_host); if (sockfd < 0) { 
    return -20; } //接收回调 printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd); cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { 
    printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; cli.start(); // sendto(time) every 3s //创建定时器,间隔向对端发送消息 cli.loop()->setInterval(3000, [&cli](TimerID timerID) { 
    char str[DATETIME_FMT_BUFLEN] = { 
   0}; datetime_t dt = datetime_now(); datetime_fmt(&dt, str); cli.sendto(str); }); std::string str; while (std::getline(std::cin, str)) { 
    if (str == "close") { 
    cli.closesocket(); } else if (str == "start") { 
    cli.start(); } else if (str == "stop") { 
    cli.stop(); break; } else { 
    cli.sendto(str); } } return 0; } 

udp服务端测试

int UdpServer_Test() { 
    int port = 9090; UdpServer srv; //创建udp服务端,绑定IP和端口号 int bindfd = srv.createsocket(port); if (bindfd < 0) { 
    return -20; } printf("server bind on port %d, bindfd=%d ...\n", port, bindfd); //接收回调 //UdpServer 不记录接收到消息的历史udp客户端, io->peeraddr 只记录上一次接收到消息的udp客户端 //channel->write 是发消息给 io->peeraddr //UdpServer::sendto 默认发给 io->peeraddr ,指定 peeraddr 时是发给指定的udp客户端。 srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) { 
    // echo printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); channel->write(buf); }; //启动 EventLoopThread 线程 srv.start(); std::string str; while (std::getline(std::cin, str)) { 
    if (str == "close") { 
    srv.closesocket(); } else if (str == "start") { 
    srv.start(); } else if (str == "stop") { 
    srv.stop(); break; } else { 
    srv.sendto(str); } } return 0; } 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/117987.html

(0)
上一篇 2025-11-16 17:26
下一篇 2025-11-16 17:45

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信