RPC 框架

RPC 框架学习记录 rpc 框架

大家好,欢迎来到IT知识分享网。

RPC 全称 Remote Procedure Call——远程过程调用。

  • RPC技术简单说就是为了解决远程调用服务的一种技术,使得调用者像调用本地服务一样方便透明。
  • RPC是一种通过网络从远程计算机程序上请求服务,不需要了解底层网络技术的协议。

集群和分布式

集群:集群(cluster)是指在多台不同的服务器中部署相同应用或服务模块,构成一个集群,通过负载均衡设备对外提供服务。在不同的服务器中部署相同的功能。

分布式:指在多台不同的服务器中部署不同的服务模块,通过远程调用协同工作,对外提供服务。不同服务器中部署不同的功能,通过网络连接起来,组成一个完整的系统。

分布式是以缩短单个任务的执行时间来提升效率的,而集群则是通过提高单位时间内执行的任务数来提升效率。

为什么要有RPC?

服务化:微服务化,跨平台的服务之间远程调用;
分布式系统架构:分布式服务跨机器进行远程调用;
服务可重用:开发一个公共能力服务,供多个服务远程调用。
系统间交互调用:两台服务器A、B,服务器 A 上的应用 a 需要调用服务器 B 上的应用 b 提供的方法,而应用 a 和应用 b 不在一个内存空间,不能直接调用,此时,需要通过网络传输来表达需要调用的语义及传输调用的数据。


使用场景

  • 大型网站:内部涉及多个子系统,服务、接口较多。
  • 注册发现机制:如Nacos、Dubbo等,一般都有注册中心,服务有多个实例,调用方调用的哪个实例无感知。
  • 安全性:不暴露资源。
  • 服务化治理:微服务架构、分布式架构。

常用RPC技术或框架

  • 应用级的服务框架:阿里的 Dubbo/Dubbox、Google gRPC、Spring Boot/Spring Cloud。
  • 远程通信协议:RMI、Socket、SOAP(HTTP XML)、REST(HTTP JSON)。
  • 通信框架:MINA 和 Netty

RPC 原理

在这里插入图片描述
RPC 是指计算机 A 上的进程,调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

远程过程调用采用客户机/服务器(C/S)模式。请求程序就是一个客户机,而服务提供程序就是一台服务器。和常规或本地过程调用一样,远程过程调用是同步操作,在远程过程结果返回之前,需要暂时中止请求程序。使用相同地址空间的低权进程或低权线程允许同时运行多个远程过程调用。

在这里插入图片描述

RPC五大模块及交互关系

在这里插入图片描述

  • user(客户端)
  • user-stub(客户端存根)
  • RPCRuntime(RPC通信包)
  • server-stub(服务端存根)
  • server(服务端)

用户端:当用户希望进行远程调用时,实际上是调用的本地 user-stub 中相应的代码。user-stub 负责将调用的规范和参数打包成一个或多个包,通过 RPCRuntime(RPC通信包)传输到被调用机器。
服务端:服务端接收到这些数据包后,对应的 RPCRuntime(RPC通信包)将它们传递给 server-stub。然后 server-stub 将它们解包,并调用对应的本地实现。同时用户端的调用进程挂起,等待服务端返回结果包。当服务端调用完成时,返回到 server-stub,并通过服务端的RPCRuntime 将结果传回用户端对应的 RPCRuntime(RPC通信包)挂起的进程中。然后通过 user-stub 解包,最后将它们返回给用户。

如果把用户端和服务端代码放在一台机器上,直接绑定在一起,不使用 user-stub 和 server-stub,程序仍然可以工作。RPCRuntime(RPC通信包)是Cedar系统的一个标准部分,因此不用程序员编写通信相关代码,但是 user-stub 和 server-stub 是由一个叫做 Lupine 的程序自动生成的,也不需要程序员编写对应包处理层面的代码。

RPC 业务实现

Callee 对外提供远端可调用方法 LoginRegister,要在 user.proto 中进行注册(service UserServiceRpc)。在Callee中的Login方法接受 LoginRequest message,执行完逻辑后返回LoginResponse message 给 Caller。

Caller 可以调用 UserServiceRpc_Stub::Login发起远端调用,而 Callee 则继承UserServiceRpc类并重写UserServiceRpc::Login函数,实现Login函数的处理逻辑。这是 protobuf 提供的接口,需要服务方法提供者重写这个 Login 函数。

class UserService : public fixbug::UserServiceRpc{ 
    //使用在rpc服务发布端(rpc服务提供者) public: bool Login(std::string name, std::string pwd){ 
    std::cout << "doing local service : LOGIN " << std::endl; std::cout << "name: " << name << " pwd: "<< pwd << std::endl; return true; } //新增的测试方法 bool Register(uint32_t id,std::string name,std::string pwd) { 
    std::cout << "doing local service: Register" << std::endl; std::cout << "id:" << id <<" name:" << name << " pwd:" << pwd << std::endl; return true; } // 重写基类UserServiceRpc的虚函数 下面这些方法都是框架直接调用的 // 1. caller --> Login(LoginRequest) --> muduo --> callee // 2. callee --> Login(LoginRequest) --> 交到下面重写的这个Login方法上了 void Login(::google::protobuf::RpcController* controller, const ::fixbug::LoginRequest* request, ::fixbug::LoginResponse* response, ::google::protobuf::Closure* done) { 
    //框架给业务.上报了请求参数LoginRequest,应用获取相应数据做本地业务 std::string name = request->name(); std::string pwd = request->pwd(); // 做本地业务 bool login_result = Login(name, pwd); // 把响应写入包括错误码、 错误消息、返回值 fixbug::ResultCode *code = response->mutable_result(); code->set_errcode(0); code->set_errmsg(""); response->set_success(login_result); //执行回调操作执行, 响应对象数据的序列化和网络发送 (都是由框架来完成的) done->Run(); } void Register(::google::protobuf::RpcController* controller, const ::fixbug::RegisterRequest* request, ::fixbug::RegisterResponse* response, ::google::protobuf::Closure* done) { 
    uint32_t id = request->id(); std::string name = request->name(); std::string pwd = request->pwd(); //开始做本地业务 bool ret = Register(id, name, pwd); //填充回调结果 response->mutable_result()->set_errcode(0); response->mutable_result()->set_errmsg(""); response->set_success(ret); done->Run(); } }; 

RPC 服务提供

  1. RpcProvider 是一个服务器,接收来自 rpc 客户端的请求,且能在一定程度上承载高并发的需求(考虑多个 rpcClient 给当前 rpcProvider 发送 rpc 调用请求)。
  2. 一个 rpcclient 发送请求过来调用一个远程方法,那么 rpcProvider 收到这个请求之后,能根据请求所携带的数据自动调用发布的 rpc 方法,那么请求必须包含服务名、方法名、以及参数,这样 rpcProvider 才知道怎么调用。即 buffer = service_name + method_name + args。
//框架提供的专门负责发布rpc服务的网络对象类 class RpcProvider{ 
    public: //这里是框架提供给外部使用的,可以发布rpc方法的函数接口 //此处应该使用Service类,而不是指定某个方法 void NotifyService(google::protobuf::Service *service); //启动rpc服务节点,开始提供rpc远程网络调用服务 void Run(); private: //组合 EventLoop muduo::net::EventLoop m_eventLoop; //service服务类型信息 struct ServiceInfo { 
    google::protobuf::Service *m_service;//保存服务对象 std::unordered_map<std::string,const google::protobuf::MethodDescriptor*> m_methodMap;//保存服务方法 }; //存储注册成功的服务对象和其服务方法的所有信息 std::unordered_map<std::string,ServiceInfo> m_serviceMap; // 新的 socket 连接时的回调 void OnConnection(const muduo::net::TcpConnectionPtr &conn); // 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应 void OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp); //Closure的回调操作,用于序列化RPC的响应和网络发送 void SendRpcResponse(const muduo::net::TcpConnectionPtr&,google::protobuf::Message* ); }; 
int main(int argc, char *argv[]) { 
    //先调用框架的初始化操作 provider -i config.conf,从init方法读取配置服务,比如IP地址和端口号 MprpcApplication::Init(argc,argv); //项目提供者,让我们可以发布该服务 RpcProvider provider; //把UserService对象发布到rpc节点上 provider.NotifyService(new UserService()); //启动一个rpc服务发布节点,run以后,进程进入阻塞状态,等待远程的rpc请求 provider.Run(); return 0; } 

NotifyService 函数可以将UserService服务对象及其提供的方法进行预备发布。发布完服务对象后再调用Run()就将预备发布的服务对象及方法注册到ZooKeeper上并开启了对远端调用的网络监听。

Muduo提供的网络模块监听到连接事件并处理完连接逻辑后会调用OnConnection函数,监听到已建立的连接发生可读事件后会调用OnMessage函数

RpcProvider::NotifyService() 实现

Service_Info结构体内定义了一个服务对象,以及这个服务对象内提供的方法们(以std::unordered_map形式存储)

将传入进来的服务对象 service 进行预备发布。其实说直白点就是将这个 service 服务对象及其提供的方法的 Descriptor 描述类,存储在RpcProvider::m_serviceMap中。

/* service_name <=> service 描述 => service* 记录服务对象 => method_name => method 方法对象 json protobuf */ //这里是框架提供给外部使用的,可以发布rpc方法的函数接口 //此处应该使用Service类,而不是指定某个方法 void RpcProvider::NotifyService(google::protobuf::Service *service){ 
    //服务表 ServiceInfo service_info;//服务表 //获取了服务对象的描述信息 const google::protobuf::ServiceDescriptor *pserviceDesc = service->GetDescriptor(); //获取服务的名字 std::string service_name = pserviceDesc->name(); //获取服务对象service的方法数量 int methodCnt= pserviceDesc->method_count(); std::cout<<"service name:"<<service_name<<std::endl; // 添加日志信息后更改 for(int i=0; i<methodCnt; ++i){ 
    //获取了服务对象指定下标的服务方法的描述(抽象描述) const google::protobuf::MethodDescriptor* pmethodDesc = pserviceDesc->method(i); std::string method_name = pmethodDesc->name(); //插入服务 service_info.m_methodMap.insert({ 
   method_name, pmethodDesc}); printf("method_name:%s \n",method_name.c_str()); } //可以使用该表来调用方法 service_info.m_service = service; m_serviceMap.insert({ 
   service_name, service_info}); } 

RpcProvider::Run() 实现

将待发布的服务对象及其方法发布到ZooKeeper上,同时利用Muduo库提供的网络模块开启对RpcServer的(IP, Port)的监听。

// 启动rpc服务节点,开始提供rpc远程网络调用服务 void RpcProvider::Run(){ 
    // 获取配置文件中的 ip 和端口号初始化结构体 std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip"); uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str()); muduo::net::InetAddress address(ip, port); // 为了方便用户使用框架,在 Run 方法中封装 muduo // 创建 TcpServer 对象 muduo::net::TcpServer tcpServer_(&m_eventLoop, address, "MprpcProvider"); // 绑定连接回调和消息读写回调方法 tcpServer_.setConnectionCallback(std::bind(&RpcProvider::OnConnection, this, std::placeholders::_1)); tcpServer_.setMessageCallback(std::bind(&RpcProvider::OnMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); // 设置 muduo 库的线程数 tcpServer_.setThreadNum(4); //把当前rpc节点上要发布的服务全部注册在zk上,让rpc client可以从zk上发现服务 //session的timeout默认为30s,zkclient的网络I/O线程1/3的timeout内不发送心跳则丢弃此节点 ZkClient zkCli; zkCli.Start();//链接zkserver for(auto &sp:m_serviceMap){ 
    //service_name std::string service_path ="/"+sp.first;//拼接路径 zkCli.Create(service_path.c_str(),nullptr,0);//创建临时性节点 for(auto &mp:sp.second.m_methodMap){ 
    //service_name/method_name std::string method_path=service_path+"/"+mp.first;//拼接服务器路径和方法路径 char method_path_data[128]={ 
   0}; sprintf(method_path_data,"%s:%d",ip.c_str(),port);//向data中写入路径 //创建节点,ZOO_EPHEMERAL表示临时节点 zkCli.Create(method_path.c_str(),method_path_data,strlen(method_path_data),ZOO_EPHEMERAL); } } std::cout << "MprpcProvider start service at: " << ip << ':' << port << '\n'; // 启动网络服务 tcpServer_.start(); m_eventLoop.loop(); } 

RpcProvider::OnConnection() 实现

// 新的 socket 连接时的回调 void RpcProvider::OnConnection(const muduo::net::TcpConnectionPtr &conn){ 
    if(!conn->connected()){ 
    //和rpcclient的链接断开了 conn->shutdown(); } } 

RpcProvider::OnMessage() 实现

Caller 端发起远程调用的时候, 会对callee的rpcserver发起tcp连接,rpcserver接受连接后,开启对客户端连接描述符的可读事件监听。caller将请求的服务方法及参数发给callee的rpcserver,此时rpcserver上的muduo网络模块监听到该连接的可读事件,然后就会执行OnMessage(…)函数逻辑。

该方法表示已建立连接用户的读写事件操作,如果有一个远程 RPC 服务的调用请求,那么OnMessage方法就会响应。

  1. 首先要从网络上接收的远程rpc调用请求的字符流;
  2. 从字符流中读取前4个字节的内容,将头部的大小转换成二进制存到这四字节里,不可能会超出范围;
  3. 根据header_size读取数据头的原始字符流,反序列化数据,得到rpc请求的详细信息;
  4. 获取rpc方法参数的字符流数据,略过recv_buf的前面的头部信息(header_size和header_str),4字节加header_size即为开始的位置;
  5. 获取service对象和method对象;
  6. 生成rpc方法调用的请求request和响应response参数;
  7. 在框架上根据远端rpc请求,调用当前rpc节点上发布的方法。
/* 在框架内部需要提前协商好通信使用的protobuf数据类型:比如发送过来的数据类型为:service_name,method_name,args 需要定义proto的message类型,进行数据头的序列化和反序列化,为防止TCP的粘包,需要对各个参数进行参数的长度明确 定义header_size(4字节) + header_str + args_str 已建立连接的用户的读写事件回调,网络上如果有一个远程的rpc服务请求,则onmessge方法就会响应 */ // 已建立连接用户的读写事件回调;当远程有调用 rpc 服务的请求时,OnMessage 方法就会响应 void RpcProvider::OnMessage(const muduo::net::TcpConnectionPtr &conn, muduo::net::Buffer *buffer, muduo::Timestamp){ 
    //获取到数据,即网络上接受的远程rpc调用请求的字符流, Login和args std::string recv_buf= buffer->retrieveAllAsString(); //读取header_size,此时的整数若按照字符串格式发送,读取时会出现问题,所以需要直接按二进制发送 //从字符流中读取前四个字节的内容 uint32_t header_size=0; recv_buf.copy((char*)&header_size,4,0); //根据header_size读取数据头的原始字符流,反序列化数据,得到rpc的详细请求数据 std::string rpc_header_str=recv_buf.substr(4,header_size);//substr从4开始读读取header_size个字节的数据 mprpc::RpcHeader rpcHeader; std::string service_name;//用于存储反序列化成功的服务名字 std::string method_name;//用于存储反序列化成功的服务方法 uint32_t args_size;//用于存储反序列化成功的参数个数 //开始反序列化,参数接受类型为引用,返回值为bool型 if(rpcHeader.ParseFromString(rpc_header_str)){ 
    //数据头反序列化成功 service_name=rpcHeader.service_name(); method_name=rpcHeader.method_name(); args_size=rpcHeader.args_size(); }else { 
    //数据头反序列化失败 std::cout<<"rpc_header_str: "<<rpc_header_str<<"parse error!"<<std::endl; return; } //获取rpc参数方法的字符流数据 std::string args_str=recv_buf.substr(4+header_size,args_size); //打印调试信息 std::cout << "======================================" << std::endl; std::cout << "header_size: " << header_size << std::endl; std::cout << "rpc_header_str" << rpc_header_str << std::endl; std::cout << "service_name: " << service_name << std::endl; std::cout << "method_name: " << method_name << std::endl; std::cout << "args_str: " << args_str << std::endl; std::cout << "======================================" << std::endl; //获取service对象和method对象 auto it = m_serviceMap.find(service_name); if(it == m_serviceMap.end()){ 
    //如果方法不存在 std::cout << service_name << "is not exist!" << std::endl; return; } auto mit = it->second.m_methodMap.find(method_name); if(mit == it->second.m_methodMap.end()){ 
    //如果服务提供的方法不存在 std::cout << service_name << ":" << method_name << "is not exists!" << std::endl; return; } google::protobuf::Service *service=it->second.m_service; // 获取service对象,对应Userservice const google::protobuf::MethodDescriptor *method=mit->second; // 获取method对象,对应Login方法 //生成rpc方法调用的请求request和相应response参数 google::protobuf::Message *request = service->GetRequestPrototype(method).New();//生成一个新对象 if(!request->ParseFromString(args_str)){ 
    std::cout << "request parse error, content:" << args_str << std::endl; return; } google::protobuf::Message *response = service->GetResponsePrototype(method).New();//生成一个新对象 //给下面的method方法的调用,绑定一个Closure的回调函数,因为模板的实参推演失败,所以需要指定类型 google::protobuf::Closure *done = google::protobuf::NewCallback <RpcProvider,const muduo::net::TcpConnectionPtr&,google::protobuf::Message*> (this, &RpcProvider::SendRpcResponse, conn,response); //在框架上根据远端rpc请求,调用当前rpc节点上发布的方法 //相当于UserService调用了Login方法 service->CallMethod(method, nullptr, request, response, done); } 

NewCallback函数会返回一个google::protobuf::Closure类的对象,该Closure类其实相当于一个闭包。这个闭包捕获了一个成员对象的成员函数,以及这个成员函数需要的参数。然后闭包类提供了一个方法Run(),当执行这个闭包对象的Run()函数时,他就会执行捕获到的成员对象的成员函数,也就是相当于执行void RpcProvider::SendRpcResponse(conn, response);,这个函数可以将reponse消息体发送给Tcp连接的另一端,即caller

CallMethod 将服务名方法名进行组装,并用protobuf提供的序列化方法序列化,然后通过服务名方法名查找ZooKeeper服务器上提供该服务方法的RpcServer的地址信息,然后返回。接着再将请求的服务方法及其参数组装并序列化,向RpcServer发起tcp连接请求,连接建立后将序列化的数据发送给RpcServer,然后再等待接收来自RpcServer的返回消息体。

RpcProvider::SendRpcResponse() 实现

//Closure的回调操作,用于序列化RPC的响应和网络发送 void RpcProvider::SendRpcResponse(const muduo::net::TcpConnectionPtr& conn,google::protobuf::Message* response){ 
    std::string response_str; //response进行序列化 if(response->SerializeToString(&response_str)){ 
    //序列化成功后,通过网络把rpc方法执行的结果发送回rpc的调用方 conn->send(response_str); } else { 
    std::cout<<"Serialize response error!"<<std::endl; } //模拟http的短链接服务,由rpcprovider主动断开连接 conn->shutdown(); } 

RPC 服务调用

调用方需要利用到的是 Stub 类。Stub 类需要提供一个带参数的构造函数,需要重写这个实参 RpcChannel。

class MprpcChannel:public google::protobuf::RpcChannel { 
    public: //所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送 void CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf::Closure* done); }; 

提供方调用函数的方法:MprpcChannel::CallMethod,调用方的框架逻辑就是将访问的对象,函数,参数序列化,socket连接到zookeeper,获取对应的 response。

//所有通过stub代理对象调用的rpc方法都从这里处理,统一做方法调用的数据序列化和网络发送 void MprpcChannel::CallMethod(const google::protobuf::MethodDescriptor* method, google::protobuf::RpcController* controller, const google::protobuf::Message* request, google::protobuf::Message* response, google::protobuf::Closure* done) { 
    const google::protobuf::ServiceDescriptor* sd=method->service(); std::string service_name=sd->name(); //service name std::string method_name=method->name(); //method name //获取参数的序列化字符串长度 args_size uint32_t args_size = 0; std::string args_str; if(request->SerializeToString(&args_str)){ 
    //序列化成功 args_size=args_str.size(); } else { 
    controller->SetFailed("serialize request error!");//保存错误信息 // std::cout <<"serialize request error!"<< std::endl; return; } //定义rpc的请求header mprpc::RpcHeader rpcHeader; rpcHeader.set_service_name(service_name); rpcHeader.set_method_name(method_name); rpcHeader.set_args_size(args_size); uint32_t header_size = 0; std::string rpc_header_str; if(rpcHeader.SerializeToString(&rpc_header_str)){ 
    // response进行序列化  header_size = rpc_header_str.size(); } else { 
    // std::cout <<"serialize rpc header error!"<< std::endl; // 优化 controller->SetFailed("serialize rpc header error!"); return; } //组织待发送的rpc请求的字符串 std::string send_rpc_str; send_rpc_str.insert(0, std::string((char *)&header_size, 4)); // header_size send_rpc_str += rpc_header_str; // rpcheader send_rpc_str += args_str; // args std::cout<<"======================================"<<std::endl; std::cout<<"header_size: "<<header_size<<std::endl; std::cout<<"rpc_header_str"<<rpc_header_str<<std::endl; std::cout<<"service_name: "<<service_name<<std::endl; std::cout<<"method_name: "<<method_name<<std::endl; std::cout<<"args_str: "<<args_str<<std::endl; std::cout<<"======================================"<<std::endl; //使用TCP编程,完成rpc方法的远程调用 int clientfd = socket(AF_INET, SOCK_STREAM, 0); if(-1 == clientfd) { 
    // std::cout << "create socket error! errno: "<< errno << std::endl; //改用 controller 记录错误信息 // exit(EXIT_FAILURE); char errtxt[512]={ 
   0}; sprintf(errtxt,"create socket error! errno: %d",errno); controller->SetFailed(errtxt); return; } // std::string ip = MprpcApplication::GetInstance().GetConfig().Load("rpcserverip"); // uint16_t port = atoi(MprpcApplication::GetInstance().GetConfig().Load("rpcserverport").c_str()); /* rpc调用方向调用service_name服务,需要查询zk上该服务所在的host信息 */ ZkClient zkCli; zkCli.Start(); std::string method_path="/"+service_name+"/"+method_name; //获取ip地址和端口号 std::string host_data=zkCli.GetData(method_path.c_str()); if(host_data=="") { 
    controller->SetFailed(method_path+" is not exist!"); return; } int idx=host_data.find(":");//分割符 if(idx==-1) { 
    controller->SetFailed(method_path+" address is invalid!"); return; } std::string ip=host_data.substr(0,idx); //从字符串中返回一个指定的子串 uint32_t port=atoi(host_data.substr(idx+1,host_data.size()-idx).c_str()); //把参数 str 所指向的字符串转换为一个整数 struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(port); server_addr.sin_addr.s_addr = inet_addr(ip.c_str()); //链接rpc服务节点 if(-1 == connect(clientfd,(struct sockaddr*)&server_addr,sizeof(server_addr))) { 
    // std::cout<<"connect error!errno: "<<errno<<std::endl; // close(clientfd); // exit(EXIT_FAILURE); close(clientfd); char errtxt[512]={ 
   0}; sprintf(errtxt,"connect error! errno: %d",errno); controller->SetFailed(errtxt); return; } //发送rpc请求 if(-1 == send(clientfd, send_rpc_str.c_str(), send_rpc_str.size(), 0)) { 
    // std::cout<<"send error!errno: "<<errno<<std::endl; // close(clientfd); // return; close(clientfd); char errtxt[512]={ 
   0}; sprintf(errtxt,"send error! errno: %d",errno); controller->SetFailed(errtxt); return; } //接受rpc请求的响应值 char recv_buf[1024]={ 
   0}; int recv_size = 0; if(-1 == (recv_size = recv(clientfd, recv_buf, 1024, 0))) { 
    close(clientfd); char errtxt[512]={ 
   0}; sprintf(errtxt,"recv error! errno: %d",errno); controller->SetFailed(errtxt); return; } //反序列化rpc调用的响应数据 std::string response_str(recv_buf, 0, recv_size); //bug点:recv_buf遇到\0后的数据不再读取,导致反序列化失败 //解决方案:使用string转换时会遇到\0,由于字符串特性导致不再读取,因为protobuf支持从数组转换,所以换方法直接从Array反序列化 // if(!response->ParseFromString(response_str)){ 
    if(!response->ParsePartialFromArray(recv_buf,recv_size)){ 
    // std::cout<<"parse error! response_str:"<<response_str<<std::endl; // close(clientfd); // return; close(clientfd); char errtxt[512]={ 
   0}; sprintf(errtxt,"arse error!! response_str: %s",response_str.c_str()); controller->SetFailed(errtxt); return; } close(clientfd); } 

zookeeper

安装java环境

在这里插入图片描述

1.sudo apt-get install openjdk-8-jdk
2. 配置环境变量,编辑如下文件:vim ~/.bashrc
在最后一行加:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 

3.测试jdk是否安装成功:java -version
在这里插入图片描述

Ubuntu安装JDK

Zookeeper分布式协调服务

在这里插入图片描述
在这里插入图片描述

zk的原生开发API(c/c++接口)

1.sudo ./configure
2.sudo make
在这里插入图片描述
zookeeper 源码编译生成C函数接口,在 ./configure 后生成的 Makefile 文件中,默认是将警告当成错误的,因此导致上图中的警告,总是以错误形式展现,编译失败


进入到生成的 Makefile 中,修改第548行,将AM_CFLAGS -Wall -Werror 改为 AM_CFLAGS -Wall上述问题

Linux安装zookeeper原生C API接口出现的make编译错误

3.make install

zookeeper 项目应用

ZooKeeper相当于是一个特殊的文件系统,不过和普通文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据,目录节点不行。ZooKeeper内部为了保持高吞吐和低延迟,再内存中维护了一个树状的目录结构,这种特性使ZooKeeper不能存放大量数据,每个节点存放数据的上线为1M。

服务对象名在ZooKeeper中以永久性节点的形式存在,当RpcServer与ZooKeeper断开连接后,整个节点还是会存在。方法对象名则以临时性节点存在,RpcServer与ZooKeeper断开后临时节点被删除。临时节点上带着节点数据,在本项目中,节点数据就是提供该服务方法的RpcServer的通信地址(IP+Port)

//封装的zk客户端类 class ZkClient { 
    public: ZkClient(); ~ZkClient(); //zkclinet启动链接zkserver void Start(); //在zkserver上根据指定的path创建znode节点 void Create(const char *path,const char *data,int datalen,int state=0); //传入参数指定的znode节点路径,或者znode节点的值 std::string GetData(const char *path); private: //zk的客户端句柄 zhandle_t *m_zhandle; }; 
#include"zookeeperutil.h" #include"mprpcapplication.h" #include<semaphore.h> #include<iostream> // 全局的 watcher 观察器 zkserver 给 zkclient 的通知 // 参数 type 和 state 分别是 ZooKeeper 服务端返回的事件类型和连接状态 void global_watcher(zhandle_t *zh,int type,int state,const char *path,void *watcherCtx) { 
    if(type==ZOO_SESSION_EVENT) //回调的消息类型是和会话相关的消息类型 { 
    if(state==ZOO_CONNECTED_STATE) //zkclient和zkserver链接成功 { 
    sem_t *sem=(sem_t*)zoo_get_context(zh); sem_post(sem); //信号量资源加一 } } } ZkClient::ZkClient():m_zhandle(nullptr) { 
   } ZkClient::~ZkClient() { 
    if(m_zhandle!=nullptr) { 
    zookeeper_close(m_zhandle);//关闭句柄释放资源 } } //zkclinet启动链接zkserver void ZkClient::Start() { 
    //加载zk的IP和端口号,默认为2181 std::string host=MprpcApplication::GetInstance().GetConfig().Load("zookeeperip"); std::string port=MprpcApplication::GetInstance().GetConfig().Load("zookeeperport"); std::string connstr=host+":"+port; //调用原生API,端口与IP,回调函数,会话超时时间 /* zookeeper_mt:多线程版本 zookeeper的API客户端程序提供了三个线程 API调用线程 网络I/O线程:专门在一个线程里处理网络I/O watcher回调线程 */ m_zhandle=zookeeper_init(connstr.c_str(), global_watcher, 30000, nullptr, nullptr, 0); // 仅仅通过判断接口返回的句柄是否为NULL,并不能表示句柄是可用的。 // 因为,会话的建立过程是异步的,必须等到会话状态变成ZOO_CONNECTED_STATE才表示句柄可用。 if(nullptr==m_zhandle) { 
    std::cout<<"zookeeper_init error!"<<std::endl; exit(EXIT_FAILURE); } sem_t sem; sem_init(&sem,0,0); //初始化资源为0,用于多线程间的同步 // 将刚才定义的同步信号量sem通过这个 zoo_set_context 函数可以传递给 m_zhandle 进行保存。 // 在global_watcher中可以将这个sem从m_zhandle取出来使用。 zoo_set_context(m_zhandle,&sem); //设置上下文,添加额外信息 sem_wait(&sem); // 阻塞结束后才连接成功!!! std::cout<<"zookeeper_init success!"<<std::endl; } //在zkserver上根据指定的path创建znode节点 void ZkClient::Create(const char *path,const char *data,int datalen,int state) { 
    char path_buffer[128]; int bufferlen=sizeof(path_buffer); int flag; //检查该节点是否存在 flag=zoo_exists(m_zhandle,path,0,nullptr); if(ZNONODE==flag)//该节点并不存在 { 
    //创建指定path的znode节点 flag=zoo_create(m_zhandle,path,data,datalen,&ZOO_OPEN_ACL_UNSAFE,state,path_buffer,bufferlen); if(flag==ZOK) { 
    std::cout<<"znode create success... path:"<<path<<std::endl; } else { 
    std::cout<<"flag:"<<flag<<std::endl; std::cout<<"znode create error... path:"<<path<<std::endl; exit(EXIT_FAILURE); } } } //传入参数指定的znode节点路径,获取znode节点的值 std::string ZkClient::GetData(const char *path) { 
    char buffer[64]; int bufferlen=sizeof(buffer); int flag=zoo_get(m_zhandle,path,0,buffer,&bufferlen,nullptr);//获取信息与返回值 if(flag!=ZOK)//如果获取失败 { 
    std::cout<<"get znode error... path:"<<path<<std::endl; return ""; } else { 
    //获取成功 return buffer; } } 

watcher 机制就是ZooKeeper客户端对某个 znode 建立一个watcher事件,当该znode发生变化时,这些ZK客户端会收到ZK服务端的通知,然后ZK客户端根据znode的变化来做出业务上的改变。

ZooKeeper服务端收到来自客户端 callee 的连接请求后,服务端为节点创建会话(此时这个节点状态发生改变),服务端会返回给客户端callee一个事件通知,然后触发watcher回调(执行global_watcher函数).

总结

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/119770.html

(0)
上一篇 2025-11-03 09:33
下一篇 2025-11-03 10:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信