大家好,欢迎来到IT知识分享网。
libhv是国人开发的开源C/C++跨平台网络库,底层功能由C实现(60%),使用C++封装(30%),可用于实现C/C++高性能服务器。
开源地址:https://github.com/ithewei
目录
libhv功能划分
libhv源码框架
按文件夹划分:base(C基础模块),cpputil(C++基础模块),event(C事件循环),evpp(C++事件循环封装),http(http封装),mqtt(MQTT协议客户端),protocol(常用协议),ssl,util(编码和加密)。
base
array.h
#define ARRAY_DECL,预定义宏实现的动态数组。
hatomic.h
原子操作,如果是C++11环境使用std标准库里的原子定义,否则自定义原子数据类型。
hbase.h
基础接口,内存的分配与释放,字符串处理,创建文件夹,文件类型路径判断,可执行文件路径,生成随机数,url 解析等。
hbuf.h
hdef.h
常用宏定义,取绝对值,数组大小,安全释放指针,字节合并与拆分,字母大小写,是否16进制等。
heap.h
hendian.h
大小端,字节序操作。
herr.h
错误码定义,使用宏定义的方式,可根据错误码查询错误描述。
hlog.h
日志记录,C语言风格,提供宏接口,没有使用独立线程,多线程通过锁确保安全;可以设置字体颜色,日志级别,日志文件大小,日志保存天数。
hmain.h
命令行参数解析,后台运行、创建pid文件,信号处理:signal_init,signal_handle,开始、停止、重启进程,master-workers多进程模式、崩溃自动重启等。
hmath.h
数学函数
hmutex.h
线程同步锁,互斥锁、读写锁、条件变量、信号量的宏定义,class MutexLock、class SpinLock、class RWLock、class LockGuard 对上述的封装。
hplatform.h
实现跨平台相关的宏
hproc.h
多进程操作
hsocket.h
socket通用接口封装
hsysinfo.h
获取系统信息
hthread.h
htime.h
日期时间,时间获取,时间格式化,时间转换等。
hversion.h
软件版本获取,版本转换。
list.h
netinet.h
IP、UDP、TCP、ICMP头定义,checksum。
queue.h
队列,宏定义实现,先进先出,#define QUEUE_DECL(type, qtype)。
rbtree.h
红黑树
cpputil
hasync.h
hdir.h
typedef struct hdir_s,接口:listdir,功能类似linux中的ls命令和windows中的dir命令。
hfile.h
class HFile,文件操作,成员:filepath,FILE* fp,接口:open、close、remove、rename、read、write、seek、tell、readall、readline、readrange。
hmap.h
class MultiMap : public multimap<Key, Value>,把[]操作符封装成insert。
hobjectpool.h
hpath.h
class HV_EXPORT HPath,提供静态接口,常用路径操作,如exists、isdir、isfile、islink,一个完整路径的每段获取,文件名、后缀名、文件夹名等。
hscope.h
hstring.h
std::string操作封装,转换,大小写转换,反转,比较,是否包含,拼接,分隔,修剪,替换等。 class StringCaseLess : public std::less<std::string>,重载operator(),比较字符串大小。
hthreadpool.h
class HThreadPool,线程池,成员:std::list threads,std::queue tasks;接口:start、stop、pause、resume、commit;线程池共享一个任务队列,commit 提交任务到队列,通过信号量唤醒线程执行;设置最大和最小线程数量,不忙时减少线程数量至最小,忙时增加线程数量至最大。
hurl.h
class HV_EXPORT HUrl,url格式转换。
ifconfig.h
接口:ifconfig,提供类似linux命令ifconfig的功能,返回 ifconfig_t 数组,查询内容:name、ip、mask、broadcast、mac。
iniparser.h
class HV_EXPORT IniParser,配置文件读写。
json.hpp
json操作封装(25000多行代码…)。
singleton.h
单例宏定义。
ThreadLocalStorage.h
class HV_EXPORT ThreadLocalStorage,线程私有变量 pthread_key_t 的封装,接口:set,get,setThreadName等。
event
hkcp.h
kcp封装
ikcp.h
kcp宏定义和相关接口。
hevent.h
hloop.h
iowatcher.h
epoll/poll/select/iocp(win)/kqueue(OS_BSD/OS_MAC)/evport(OS_SOLARIS)相关接口,iowatcher_init,iowatcher_add_event,iowatcher_del_event等。
nlog.h
网络日志,接口:network_logger、nlog_listen,成员:network_logger_t s_logger,创建tcp服务端,接受客户端接入,收到客户端消息写入日志。
overlapio.h
重叠IO
rudp.h
可靠用户数据报协议(RUDP)?
unpack.h
拆包接口,hio_unpack,可按照固定长度、分隔符、头部长度字段拆包。
evpp
Buffer.h
typedef HBuf Buffer; typedef std::shared_ptr BufferPtr;
Channel.h
Event.h
struct Event,struct Timer,分别封装 hevent_t 和 htimer_t。
EventLoop.h
class EventLoop : public Status,基于已有或新建 hloop_t 创建事件循环类,成员:hloop_t*,customEvents自定义任务队列,timers定时器队列,接口:run(调用 hloop_run,循环处理epoll/IO事件、定时器事件、自定义事件),stop,pause,resume,setTimer,killTimer,queueInLoop等,可以启动、停止、暂停、重新开始事件循环,增加/删除定时器,把自定义任务加入队列等。
EventLoopThread.h
class EventLoopThread : public Status,成员:EventLoopPtr loop_,std::shared_ptrstd::thread thread_;接口:start、stop,开始/停止事件循环线程,把 EventLoop 的事件循环放在 thread_ 线程里执行。
EventLoopThreadPool.h
class EventLoopThreadPool : public Status,成员:thread_num_、std::vector loop_threads_;接口:start,stop,loop(获取一个 EventLoop),nextLoop(可选轮询调度、随机调度、最小连接数调度);创建指定数量的 EventLoopThread 事件循环线程池。
Status.h
class Status,封装线程状态。
TcpClient.h
TcpServer.h
TimerThread.h
class TimerThread : public EventLoopThread,接口:setTimer,resetTimer,setInterval,killTimer,创建一个事件循环线程,用来执行定时器。
UdpClient.h
UdpServer.h
http
axios.h
模拟nodejs axios api
requests.h
模拟python requests api
HttpClient.h
class HttpClient,成员:struct http_client_s http_client_t;同步http客户端,也可进行 AsyncHttpClient 异步操作。
WebSocketClient.h
class WebSocketClient : public TcpClientTmpl,WebSocket客户端,成员:HttpParserPtr、HttpRequestPtr、HttpResponsePtr、WebSocketParserPtr。
http_page.h
http页面构造,提供接口:make_http_status_page,make_index_of_page,创建 html 格式字符串。
HttpContext.h
struct HttpContext,管理http内容,提供设置和查询接口,成员:HttpService*,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr。
HttpHandler.h
class HttpHandler,http处理主类,成员:hio_t,HttpService,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr,HttpParserPtr,HttpContextPtr,http_handler,WebSocketService,WebSocketChannelPtr,WebSocketParserPtr 等,以及相关接口。
HttpMiddleware.h
class HttpMiddleware,接口:CORS(HttpRequest* req, HttpResponse* resp)。
HttpResponseWriter.h
class HttpResponseWriter : public SocketChannel,http 响应发送类,成员:HttpResponsePtr,接口:WriteHeader、WriteCookie、WriteBody 等。
HttpServer.h
HttpService.h
http业务类 (包括api service、web service、indexof service),struct http_handler,struct http_method_handler,struct HV_EXPORT HttpService。
WebSocketServer.h
struct WebSocketService,class WebSocketServer : public HttpServer,WebSocket服务类。
grpcdef.h
gRPC封装定义,gRPC是Google公司开发的一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计;RPC是指远程过程调用。
httpdef.h
http定义,http_status(200OK,404notfound等)、http_method(POST/GET等)、http_content_type 标识码/枚举/描述的映射,并提供方法用于标识码与描述的相互转换,如404对应Not Found。
http_parser.h
http1解析实现,错误码 http_errno,结构体 http_parser、http_parser_settings,http解析方法 http_parser_init、http_parser_execute 等。
HttpParser.h
class HV_EXPORT HttpParser,http解析基类,抽象类。
Http1Parser.h
class Http1Parser : public HttpParser,http1解析类。
Http2Parser.h
http2解析类
http_content.h
HttpMessage.h
wsdef.h
websocket 接口定义,enum ws_opcode,ws_build_frame,ws_encode_key 等。
websocket_parser.h
websocket 解析基础功能,struct websocket_parser,struct websocket_parser_settings,接口:websocket_parser_init,websocket_parser_execute 等。
WebSocketChannel.h
class WebSocketChannel : public SocketChannel,WebSocket封装,成员:Buffer sendbuf_,std::mutex mutex_;接口:send、sendPing、sendPong。
WebSocketParser.h
class WebSocketParser,websocket 解析类。
mqtt
mqtt_client.h
typedef struct mqtt_client_s mqtt_client_t;class MqttClient,mqtt 协议客户端。
mqtt_protocol.h
mqtt 协议,接口:mqtt_head_pack,mqtt_head_unpack。
protocol
dns.h
dns 解析接口,nslookup。
ftp.h
ftp 协议封装,struct ftp_handle_t,接口:ftp_command_str,ftp_connect,ftp_login,ftp_quit,ftp_upload,ftp_download,ftp_download_with_cb等。
icmp.h
icmp 协议,接口:ping,模拟ping命令。
smtp.h
smtp 协议,struct mail_t,接口:smtp_command_str,smtp_status_str,smtp_build_command,sendmail。
ssl
hssl.h
SSL协议封装,结构体和接口定义。
util
base64.h
BASE64编解码
md5.h
MD5数字摘要,生成字符串或文件的 md5 值,接口:hv_md5,hv_md5_hex。
sha1.h
SHA1安全散列算法,生成字符串或文件的 sha1 值,接口:hv_sha1,hv_sha1_hex。
libhv源码测试
开源项目中有很多测试代码,例如examples/,unittest/,evpp/,本章只测试几个核心模块。
EventLoopThread 事件循环线程测试
1、事件循环线程是一个网络框架的核心, libhv 把事件循环(EventLoopPtr)和线程(std::thread)分开封装,可以先创建 EventLoopPtr ,再创建 EventLoopThread 启动线程。
2、EventLoopThread 构建时可以传入已有的 EventLoopPtr ,如果不传则自动创建一个 EventLoopPtr ;也可传入Functor pre post ,分别在循环前和循环后执行。
3、EventLoopThread 可以暂停(HLOOP_STATUS_PAUSE 线程还在,只是不处理任务),或无激活事件时退出线程,或只执行一次(HLOOP_FLAG_RUN_ONCE),或一直执行。
libhv 事件循环线程主要处理3种任务: 定时器,网络IO,自定义事件。
1、定时器:支持创建单次或循环定时器,存放在小顶堆(timers)中,堆顶即是最先超时的定时器,每次事件循环检测堆顶,超时的定时器会先出堆再重新入堆, libhv 会优先处理定时器任务。事件循环检测堆顶定时器后,会把下一次定时器超时时长 blocktime_ms 传入 epoll_wait ,避免过渡执行循环,如果没有定时器则 epoll_wait 默认100ms超时。
2、自定义任务:使用 epoll 实现事件监听,事件循环刚启动时创建事件(eventfds)和 epoll,调用 runInLoop 把自定义事件加入队列 customEvents ,并通过 eventfds 唤醒 epoll 执行循环。
3、网络IO:使用和自定义事件同一个 epoll 监听网络IO事件,比如创建tcp客户端时把fd传入epoll监听。
创建epoll调用栈
EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_create_eventfds->hread->hio_read->hio_add->iowatcher_add_event->iowatcher_init->epoll_create.
epoll_wait调用栈
EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events.
详见源码目录:evpp/EventLoopThread_test.cpp
static void onTimer(TimerID timerID, int n) {
printf("tid=%ld timerID=%lu time=%lus n=%d\n", hv_gettid(), (unsigned long)timerID, (unsigned long)time(NULL), n); } int EventLoopThread_Test() {
HV_MEMCHECK; printf("main tid=%ld\n", hv_gettid()); EventLoopThread loop_thread; const EventLoopPtr& loop = loop_thread.loop(); //创建定时器 // runEvery 1s loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1, 100)); // runAfter 10s loop->setTimeout(10000, [&loop](TimerID timerID){
loop->stop(); }); loop_thread.start(); //创建异步自定义事件 loop->queueInLoop([](){
printf("queueInLoop tid=%ld\n", hv_gettid()); }); loop->runInLoop([](){
printf("runInLoop tid=%ld\n", hv_gettid()); }); // wait loop_thread exit loop_thread.join(); return 0; }
定时器测试
添加定时器调用栈
任意线程->EventLoop::setInterval->setTimerInLoop->runInLoop->添加自定义事件调用栈->事件循环线程->EventLoop::setTimer->htimer_add(hloop_t::timers)->添加到 EventLoop::timers
定时器超时调用栈
EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_timers->__hloop_process_timers->EVENT_PENDING(timer) 加入优先级队列->hloop_process_pendings 执行回调
PS: 刚添加定时器时不会立即更新 epoll_wait 超时时长,默认还是100ms超时(建议无事件时 epoll_wait 一直睡眠,添加定时器时立即唤醒一次线程更新 epoll_wait 超时时长).
异步自定义事件
唤醒循环调用栈
EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events->EVENT_PENDING(io) 加入待处理队列 pendings
待处理队列调用栈
EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->cur->cb(cur) 执行回调
hloop_s::pendings(待处理优先级队列),是个二维数组,每个优先级一个数组,优先处理高优先级的数组.
tcp客户端测试
任意线程->TcpClient::createsocket->创建fd和 hio_t.
非阻塞连接
任意线程->TcpClient::start->事件循环线程->TcpClientTmpl::startConnect->SocketChannel::startConnect->hio_connect->系统调用 connect,根据返回值判断,epoll监听可写事件,创建超时定时器,超时或连接成功后->onConnection,判断是否重连
接收数据
epoll_wait调用栈->iowatcher_poll_events->EVENT_PENDING(io)添加到优先级队列->(处理优先级队列)hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpClient::onMessage 执行自定义接收回调
数据发送
任意线程 runInLoop->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->EventLoop::onTimer->Channel::write ->hio_write->__nio_write,系统调用 send
使用 SocketChannel::write 发送,数据发送线程安全锁 hio_t::write_mutex ,支持多线程发送。
详见源码目录:evpp/TcpClient_test.cpp
int TcpClient_Test() {
const char* remote_host = "192.168.3.91"; int remote_port = 9090; TcpClient cli; //设置tcp组包策略为固定分隔符,只有收到OK时才会调用 onMessage 接收回调函数; //如果不设置 setUnpack ,则收到消息会直接调用 onMessage 回调。 unpack_setting_t OK_unpack_setting; memset(&OK_unpack_setting, 0, sizeof(unpack_setting_t)); OK_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH; OK_unpack_setting.mode = UNPACK_BY_DELIMITER; OK_unpack_setting.delimiter[0] = 'O'; OK_unpack_setting.delimiter[1] = 'K'; OK_unpack_setting.delimiter_bytes = 2; cli.setUnpack(&OK_unpack_setting); int connfd = cli.createsocket(remote_port, remote_host); if (connfd < 0) {
return -20; } printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd); //连接成功或断开连接回调 cli.onConnection = [&cli](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr(); if (channel->isConnected()) {
printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); // send(time) every 3s //当连接成功时创建定时器,周期性向服务器发送消息 setInterval(3000, [channel](TimerID timerID){
if (channel->isConnected()) {
if (channel->isWriteComplete()) {
char str[DATETIME_FMT_BUFLEN] = {
0}; datetime_t dt = datetime_now(); datetime_fmt(&dt, str); channel->write(str); } } else {
killTimer(timerID); } }); } else {
printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd()); } if (cli.isReconnect()) {
printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay); } }; //接收消息回调 cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; //连接失败或断开时会重连,重连周期可设置固定、线性、指数等策略 #if TEST_RECONNECT // reconnect: 1,2,4,8,10,10,10... reconn_setting_t reconn; reconn_setting_init(&reconn); reconn.min_delay = 1000; reconn.max_delay = 10000; reconn.delay_policy = 2; cli.setReconnect(&reconn); #endif #if TEST_TLS cli.withTLS(); #endif cli.start(); std::string str; while (std::getline(std::cin, str)) {
if (str == "close") {
cli.closesocket(); } else if (str == "start") {
cli.start(); } else if (str == "stop") {
cli.stop(); break; } else {
if (!cli.isConnected()) break; cli.send(str); } } return 0; }
tcp服务端测试
启动监听
任意线程->TcpServer::createsocket->Listen->ListenFD->系统调用 listen
监听tcp接入事件
EventLoop::runInLoop->…->TcpServerEventLoopTmpl::startAccept->haccept->hio_accept->hio_add->iowatcher_add_event,把 listenfd 添加到 epoll 监听.
tcp客户端接入
接收数据
EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpServerEventLoopTmpl::newConnEvent->TcpServerEventLoopTmpl::onMessage,调用自定义接收回调,运行在分配的 EventLoopThreadPool 线程
数据发送
和tcp客户端类似,使用 SocketChannel::write 发送,建议使用分配的 EventLoopThreadPool 线程发送,也可在任意线程发送,线程安全锁 hio_t::write_mutex
详见源码目录:evpp/TcpServer_test.cpp
int TcpServer_Test() {
int port = 9090; hlog_set_level(LOG_LEVEL_DEBUG); TcpServer srv; int listenfd = srv.createsocket(port); if (listenfd < 0) {
return -20; } printf("server listen on port %d, listenfd=%d ...\n", port, listenfd); //当有客户端接入和断开时执行回调 srv.onConnection = [](const SocketChannelPtr& channel) {
std::string peeraddr = channel->peeraddr(); if (channel->isConnected()) {
printf("%s connected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid()); } else {
printf("%s disconnected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid()); } }; //收到客户端消息回调 srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); channel->write(buf); }; //设置EventLoopThreadPool worker_threads 线程数量 srv.setThreadNum(4); //设置线程分配策略,最小连接数优先 srv.setLoadBalance(LB_LeastConnections); #if TEST_TLS hssl_ctx_opt_t ssl_opt; memset(&ssl_opt, 0, sizeof(hssl_ctx_opt_t)); ssl_opt.crt_file = "cert/server.crt"; ssl_opt.key_file = "cert/server.key"; ssl_opt.verify_peer = 0; srv.withTLS(&ssl_opt); #endif //启动监听线程和任务线程池 srv.start(); std::string str; while (std::getline(std::cin, str)) {
if (str == "close") {
srv.closesocket(); } else if (str == "start") {
srv.start(); } else if (str == "stop") {
srv.stop(); break; } else {
srv.broadcast(str.data(), str.size()); } } return 0; }
udp客户端测试
int UdpClient_Test() {
const char* remote_host = "192.168.3.91"; int remote_port = 9090; UdpClient cli; //创建udp客户端,指定远端IP和端口号,本地端口号随机分配 int sockfd = cli.createsocket(remote_port, remote_host); if (sockfd < 0) {
return -20; } //接收回调 printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd); cli.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); }; cli.start(); // sendto(time) every 3s //创建定时器,间隔向对端发送消息 cli.loop()->setInterval(3000, [&cli](TimerID timerID) {
char str[DATETIME_FMT_BUFLEN] = {
0}; datetime_t dt = datetime_now(); datetime_fmt(&dt, str); cli.sendto(str); }); std::string str; while (std::getline(std::cin, str)) {
if (str == "close") {
cli.closesocket(); } else if (str == "start") {
cli.start(); } else if (str == "stop") {
cli.stop(); break; } else {
cli.sendto(str); } } return 0; }
udp服务端测试
int UdpServer_Test() {
int port = 9090; UdpServer srv; //创建udp服务端,绑定IP和端口号 int bindfd = srv.createsocket(port); if (bindfd < 0) {
return -20; } printf("server bind on port %d, bindfd=%d ...\n", port, bindfd); //接收回调 //UdpServer 不记录接收到消息的历史udp客户端, io->peeraddr 只记录上一次接收到消息的udp客户端 //channel->write 是发消息给 io->peeraddr //UdpServer::sendto 默认发给 io->peeraddr ,指定 peeraddr 时是发给指定的udp客户端。 srv.onMessage = [](const SocketChannelPtr& channel, Buffer* buf) {
// echo printf("< %.*s\n", (int)buf->size(), (char*)buf->data()); channel->write(buf); }; //启动 EventLoopThread 线程 srv.start(); std::string str; while (std::getline(std::cin, str)) {
if (str == "close") {
srv.closesocket(); } else if (str == "start") {
srv.start(); } else if (str == "stop") {
srv.stop(); break; } else {
srv.sendto(str); } } return 0; }
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/117987.html