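Introduction
brpc supports multiple protocols mainly through the Protocol struct. It holds function pointers for parsing, serializing, and processing requests and responses, and these function pointers give a polymorphic effect without virtual classes.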
Protocol
The struct is defined as follows:
struct Protocol {
    typedef ParseResult (*Parse)(butil::IOBuf* source, Socket* socket,
                                 bool read_eof, const void* arg);
    Parse parse;

    typedef void (*SerializeRequest)(
        butil::IOBuf* request_buf,
        Controller* cntl,
        const google::protobuf::Message* request);
    SerializeRequest serialize_request;

    typedef void (*PackRequest)(
        butil::IOBuf* iobuf_out,
        SocketMessage** user_message_out,
        uint64_t correlation_id,
        const google::protobuf::MethodDescriptor* method,
        Controller* controller,
        const butil::IOBuf& request_buf,
        const Authenticator* auth);
    PackRequest pack_request;

    typedef void (*ProcessRequest)(InputMessageBase* msg);
    ProcessRequest process_request;

    typedef void (*ProcessResponse)(InputMessageBase* msg);
    ProcessResponse process_response;

    typedef bool (*Verify)(const InputMessageBase* msg);
    Verify verify;

    typedef bool (*ParseServerAddress)(butil::EndPoint* out,
                                       const char* server_addr_and_port);
    ParseServerAddress parse_server_address;

    typedef const std::string& (*GetMethodName)(
        const google::protobuf::MethodDescriptor* method,
        const Controller*);
    GetMethodName get_method_name;

    ConnectionType supported_connection_type;
    const char* name;

    bool support_client() const {
        return serialize_request && pack_request && process_response;
    }
    bool support_server() const {
        return process_request;
    }
};
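parse: needed by both server and client; parses incoming data so that requests and responses can be processed afterwards
serialize_request: needed by the client; serializes the Message into binary for the later PackRequest step
pack_request: needed by the client; packs the request so that it can be sent over the socket
process_request: needed by the server; handles a client request after it has been parsed
process_response: needed by the client; handles the server's reply after it has been parsed
verify: used on the server side for authentication
parse_server_address: optional; converts a server address and port string into an EndPoint
get_method_name: optional; customizes the method name
support_client: tells whether the protocol can be used on the client side (true only when serialize_request, pack_request and process_response are all set)
support_server: tells whether the protocol can be used on the server side (true when process_request is set)
supported_connection_type: the connection types the protocol supports
name: the protocol name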
Protocol management
Protocols are managed through ProtocolMap, which is defined as:
// ProtocolEntry is declared first because ProtocolMap stores it by value.
struct ProtocolEntry {
    butil::atomic<bool> valid;
    Protocol protocol;

    ProtocolEntry() : valid(false) {}
};

struct ProtocolMap {
    ProtocolEntry entries[MAX_PROTOCOL_SIZE];
};
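ProtocolMap manages the Protocols through a fixed-size array of ProtocolEntry; the valid flag marks which slots hold a registered protocol.
In brpc, the function GlobalInitializeOrDieImpl registers all Protocols.
GlobalInitializeOrDie calls GlobalInitializeOrDieImpl through pthread_once, so registration runs only once: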
void GlobalInitializeOrDie() {
    if (pthread_once(&register_extensions_once, GlobalInitializeOrDieImpl) != 0) {
        LOG(FATAL) << "Fail to pthread_once";
        exit(1);
    }
}
Channel::Init calls GlobalInitializeOrDie.
Registration
Registration is done by RegisterProtocol:
int RegisterProtocol(ProtocolType type, const Protocol& protocol) {
    const size_t index = type;
    if (index >= MAX_PROTOCOL_SIZE) {
        LOG(ERROR) << "ProtocolType=" << type << " is out of range";
        return -1;
    }
    if (!protocol.support_client() && !protocol.support_server()) {
        LOG(ERROR) << "ProtocolType=" << type
                   << " neither supports client nor server";
        return -1;
    }
    ProtocolEntry* const protocol_map = get_protocol_map();
    BAIDU_SCOPED_LOCK(s_protocol_map_mutex);
    if (protocol_map[index].valid.load(butil::memory_order_relaxed)) {
        LOG(ERROR) << "ProtocolType=" << type << " was registered";
        return -1;
    }
    protocol_map[index].protocol = protocol;
    protocol_map[index].valid.store(true, butil::memory_order_release);
    return 0;
}
get_protocol_map returns the entries array of the singleton ProtocolMap, creating the singleton on first use:
inline ProtocolEntry* get_protocol_map() {
    return butil::get_leaky_singleton<ProtocolMap>()->entries;
}
A lock is held while a protocol is added to the map:
static pthread_mutex_t s_protocol_map_mutex = PTHREAD_MUTEX_INITIALIZER;
Finding the protocol for a type
This is done by FindProtocol:
const Protocol* FindProtocol(ProtocolType type) {
    const size_t index = type;
    if (index >= MAX_PROTOCOL_SIZE) {
        LOG(ERROR) << "ProtocolType=" << type << " is out of range";
        return NULL;
    }
    ProtocolEntry* const protocol_map = get_protocol_map();
    if (protocol_map[index].valid.load(butil::memory_order_acquire)) {
        return &protocol_map[index].protocol;
    }
    return NULL;
}
Listing all protocols
There are two ways. The first returns the list of all registered protocols:
void ListProtocols(std::vector<Protocol>* vec) {
    vec->clear();
    ProtocolEntry* const protocol_map = get_protocol_map();
    for (size_t i = 0; i < MAX_PROTOCOL_SIZE; ++i) {
        if (protocol_map[i].valid.load(butil::memory_order_acquire)) {
            vec->push_back(protocol_map[i].protocol);
        }
    }
}
The second returns a list that pairs each type with its protocol:
void ListProtocols(std::vector<std::pair<ProtocolType, Protocol> >* vec) {
    vec->clear();
    ProtocolEntry* const protocol_map = get_protocol_map();
    for (size_t i = 0; i < MAX_PROTOCOL_SIZE; ++i) {
        if (protocol_map[i].valid.load(butil::memory_order_acquire)) {
            vec->emplace_back((ProtocolType)i, protocol_map[i].protocol);
        }
    }
}
Supported protocols
- Baidu protocol
Protocol baidu_protocol = {
    ParseRpcMessage,
    SerializeRequestDefault, PackRpcRequest,
    ProcessRpcRequest, ProcessRpcResponse,
    VerifyRpcRequest, NULL, NULL,
    CONNECTION_TYPE_ALL, "baidu_std" };
if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) {
    exit(1);
}
- Streaming protocol
Protocol streaming_protocol = {
    ParseStreamingMessage,
    NULL, NULL,
    ProcessStreamingMessage, ProcessStreamingMessage,
    NULL, NULL, NULL,
    CONNECTION_TYPE_SINGLE, "streaming_rpc" };
if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) {
    exit(1);
}
- HTTP protocol
Protocol http_protocol = {
    ParseHttpMessage,
    SerializeHttpRequest, PackHttpRequest,
    ProcessHttpRequest, ProcessHttpResponse,
    VerifyHttpRequest, ParseHttpServerAddress, GetHttpMethodName,
    CONNECTION_TYPE_POOLED_AND_SHORT, "http" };
if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) {
    exit(1);
}
- HTTP/2 protocol
Protocol http2_protocol = {
    ParseH2Message,
    SerializeHttpRequest, PackH2Request,
    ProcessHttpRequest, ProcessHttpResponse,
    VerifyHttpRequest, ParseHttpServerAddress, GetHttpMethodName,
    CONNECTION_TYPE_SINGLE, "h2" };
if (RegisterProtocol(PROTOCOL_H2, http2_protocol) != 0) {
    exit(1);
}
- hulu protocol
Protocol hulu_protocol = {
    ParseHuluMessage,
    SerializeRequestDefault, PackHuluRequest,
    ProcessHuluRequest, ProcessHuluResponse,
    VerifyHuluRequest, NULL, NULL,
    CONNECTION_TYPE_ALL, "hulu_pbrpc" };
if (RegisterProtocol(PROTOCOL_HULU_PBRPC, hulu_protocol) != 0) {
    exit(1);
}
- nova protocol
Protocol nova_protocol = {
    ParseNsheadMessage,
    SerializeNovaRequest, PackNovaRequest,
    NULL, ProcessNovaResponse,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "nova_pbrpc" };
if (RegisterProtocol(PROTOCOL_NOVA_PBRPC, nova_protocol) != 0) {
    exit(1);
}
- public_pbrpc protocol
Protocol public_pbrpc_protocol = {
    ParseNsheadMessage,
    SerializePublicPbrpcRequest, PackPublicPbrpcRequest,
    NULL, ProcessPublicPbrpcResponse,
    NULL, NULL, NULL,
    // public_pbrpc server implementation
    // doesn't support full duplex
    CONNECTION_TYPE_POOLED_AND_SHORT,
    "public_pbrpc" };
if (RegisterProtocol(PROTOCOL_PUBLIC_PBRPC, public_pbrpc_protocol) != 0) {
    exit(1);
}
- sofa protocol
Protocol sofa_protocol = {
    ParseSofaMessage,
    SerializeRequestDefault, PackSofaRequest,
    ProcessSofaRequest, ProcessSofaResponse,
    VerifySofaRequest, NULL, NULL,
    CONNECTION_TYPE_ALL, "sofa_pbrpc" };
if (RegisterProtocol(PROTOCOL_SOFA_PBRPC, sofa_protocol) != 0) {
    exit(1);
}
- nshead protocol
Protocol nshead_protocol = {
    ParseNsheadMessage,
    SerializeNsheadRequest, PackNsheadRequest,
    ProcessNsheadRequest, ProcessNsheadResponse,
    VerifyNsheadRequest, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "nshead" };
if (RegisterProtocol(PROTOCOL_NSHEAD, nshead_protocol) != 0) {
    exit(1);
}
- memcache binary protocol
Protocol mc_binary_protocol = {
    ParseMemcacheMessage,
    SerializeMemcacheRequest, PackMemcacheRequest,
    NULL, ProcessMemcacheResponse,
    NULL, NULL, GetMemcacheMethodName,
    CONNECTION_TYPE_ALL, "memcache" };
if (RegisterProtocol(PROTOCOL_MEMCACHE, mc_binary_protocol) != 0) {
    exit(1);
}
- mongo protocol
Protocol mongo_protocol = {
    ParseMongoMessage,
    NULL, NULL,
    ProcessMongoRequest, NULL,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED, "mongo" };
if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) {
    exit(1);
}
- redis protocol
Protocol redis_protocol = {
    ParseRedisMessage,
    SerializeRedisRequest, PackRedisRequest,
    ProcessRedisRequest, ProcessRedisResponse,
    NULL, NULL, GetRedisMethodName,
    CONNECTION_TYPE_ALL, "redis" };
if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) {
    exit(1);
}
- thrift binary protocol
Protocol thrift_binary_protocol = {
    policy::ParseThriftMessage,
    policy::SerializeThriftRequest, policy::PackThriftRequest,
    policy::ProcessThriftRequest, policy::ProcessThriftResponse,
    policy::VerifyThriftRequest, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" };
if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) {
    exit(1);
}
- ubrpc_compack protocol
Protocol ubrpc_compack_protocol = {
    ParseNsheadMessage,
    SerializeUbrpcCompackRequest, PackUbrpcRequest,
    NULL, ProcessUbrpcResponse,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "ubrpc_compack" };
if (RegisterProtocol(PROTOCOL_UBRPC_COMPACK, ubrpc_compack_protocol) != 0) {
    exit(1);
}
- ubrpc_mcpack2 protocol
Protocol ubrpc_mcpack2_protocol = {
    ParseNsheadMessage,
    SerializeUbrpcMcpack2Request, PackUbrpcRequest,
    NULL, ProcessUbrpcResponse,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "ubrpc_mcpack2" };
if (RegisterProtocol(PROTOCOL_UBRPC_MCPACK2, ubrpc_mcpack2_protocol) != 0) {
    exit(1);
}
- nshead_mcpack protocol
Protocol nshead_mcpack_protocol = {
    ParseNsheadMessage,
    SerializeNsheadMcpackRequest, PackNsheadMcpackRequest,
    NULL, ProcessNsheadMcpackResponse,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "nshead_mcpack" };
if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) {
    exit(1);
}
- rtmp protocol
Protocol rtmp_protocol = {
    ParseRtmpMessage,
    SerializeRtmpRequest, PackRtmpRequest,
    ProcessRtmpMessage, ProcessRtmpMessage,
    NULL, NULL, NULL,
    (ConnectionType)(CONNECTION_TYPE_SINGLE|CONNECTION_TYPE_SHORT),
    "rtmp" };
if (RegisterProtocol(PROTOCOL_RTMP, rtmp_protocol) != 0) {
    exit(1);
}
- esp protocol
Protocol esp_protocol = {
    ParseEspMessage,
    SerializeEspRequest, PackEspRequest,
    NULL, ProcessEspResponse,
    NULL, NULL, NULL,
    CONNECTION_TYPE_POOLED_AND_SHORT, "esp" };
if (RegisterProtocol(PROTOCOL_ESP, esp_protocol) != 0) {
    exit(1);
}
InputMessageBase
The message base class: it is the type of the parsed message that a Protocol's parse function produces.
InputResponse: the Redis message