导航
1前言
7后记
1 前言
在上一篇文章《gRPC基础:C++服务端与客户端代码示例》中以从0到1的入门视角,详细介绍了gRPC的编译安装、编写proto文件、以及根据proto文件生成服务端与客户端的代码示例。但还遗留了一个非常重要的场景未讨论,它就是观察者模式,因此本文专门分析和演示如何通过gRPC实现很多高级编程语言都具备的观察者模式。来源:https://www.wubayue.com
2 软件开发中的观察者模式
观察者模式是软件开发中的一种消息通信范式,它定义了主题与观察者之间一对多的依赖关系,当主题发生变化时,所有依赖它的观察者均收到通知并更新状态。比如QT中的信号槽,C#中的事件机制,都是基于观察者模式。来源:https://www.wubayue.com
3 gRPC中的流(stream)模式
流是一个常见词汇,在日常生活中,能连续不断移动的物质通常称之为流,比如水流、电流、气流。我们在网上听的音乐、刷的短视频叫做流媒体,因为它是媒体数据在连续不断流动。
gRPC基于HTTP/2协议进行数据传输,HTTP/2相对于HTTP/1.1,一个很大的变化就是通过引入流模式实现了多路复用:在HTTP/1.1时代,一次请求对应一次响应,在上一次请求未处理完成之前,下一次请求就无法处理;而在HTTP/2之后,所有操作基于流模式,多次请求之间不再阻塞。了解了这个背景,就大致明白了gRPC中的流其实是完全基于的HTTP/2协议,有如下四种方式:
普通模式是传统的一问一答,它并不体现流的特征,但它是gRPC中使用最多的模式。
服务端流模式即客户端发起一次请求,服务端多次响应。本文中所描述的观察者模式以及代码示例,均对应服务端流模式:客户端对应观察者,服务端对应主题,客户端发起的一次请求即观察者向主题进行的订阅,服务端的多次响应即主题发生变更时向观察者的多次推送。
客户端流模式是客户端发起多次请求,服务端一次响应。应用场景如客户端需要向服务端传输一份大数据,分多次请求传输,服务端在处理完最后一份数据后响应完成。
双向流模式是客户端发起多次请求,服务端多次响应。它可应用于更复杂的业务场景。来源:https://www.wubayue.com
4 编写proto文件
// 指定版本
syntax = "proto3";
// 引用公共的数据类型
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
// 命名空间
package GrpcDemo;
service Computer_Grpc {
// 获取电脑开/关状态
rpc getComputerStatus(google.protobuf.Empty) returns (google.protobuf.BoolValue) {}
// 设置电脑开/关状态
rpc setComputerStatus(google.protobuf.BoolValue) returns (google.protobuf.Empty) {}
// 电脑开关状态变更事件
rpc computerStatusChanged(google.protobuf.Empty) returns (stream google.protobuf.BoolValue) {}
}
在proto中启用流模式比较简单,只需要将对应的参数类型定义为stream即可。来源:https://www.wubayue.com
5 C++服务端
业务逻辑
ComputerGrpcService.h
#pragma once
#include <string>
#include <map>
#include "Computer_Grpc.grpc.pb.h"
using grpc::Server;
using namespace google::protobuf;
using namespace GrpcDemo;
namespace GrpcServer
{
// 流模式示例
class ComputerGrpcService final : public Computer_Grpc::Service
{
private:
bool _computerStatus;
std::map<std::string, bool> _clients;
public:
ComputerGrpcService();
~ComputerGrpcService();
// 获取电脑开/关状态
::grpc::Status getComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::google::protobuf::BoolValue* response) override;
// 设置电脑开/关状态
::grpc::Status setComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::BoolValue* request, ::google::protobuf::Empty* response) override;
// 电脑开关状态变更事件
::grpc::Status computerStatusChanged(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::google::protobuf::BoolValue>* writer) override;
};
}
ComputerGrpcService.cpp
#include "ComputerGrpcService.h"
namespace GrpcServer
{
ComputerGrpcService::ComputerGrpcService()
{
_computerStatus = false;
_clients.clear();
}
ComputerGrpcService::~ComputerGrpcService()
{
}
::grpc::Status ComputerGrpcService::getComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::google::protobuf::BoolValue* response)
{
response->set_value(_computerStatus);
return ::grpc::Status::OK;
}
::grpc::Status ComputerGrpcService::setComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::BoolValue* request, ::google::protobuf::Empty* response)
{
_computerStatus = request->value();
std::cout << "setComputerStatus is called, parameter is : " << _computerStatus << std::endl;
return ::grpc::Status::OK;
}
::grpc::Status ComputerGrpcService::computerStatusChanged(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::google::protobuf::BoolValue>* writer)
{
// 从请求上下文中获取客户端ID(需要客户端配合传递)
std::string clientID = "";
auto clientMetaData = context->client_metadata();
for (auto it = clientMetaData.begin(); it != clientMetaData.end(); ++it)
{
char k[64] = {};
memcpy(&k[0], it->first.data(), it->first.length());
char v[128] = {};
memcpy(&v[0], it->second.data(), it->second.length());
if (std::string(k) == "client_id")
clientID = std::string(v);
}
if (clientID.length() == 0)
return grpc::Status(grpc::StatusCode::ABORTED, "Failed to retrieve client_id from the request context.");
if (_clients.count(clientID) == 0)
_clients.insert(std::pair<std::string, bool>(clientID, _computerStatus));
// 循环检测并向客户端发送状态变更事件
while (true)
{
// 获取向该用户最后一次发送的状态
bool lastComputerStatus = false;
std::map<std::string, bool>::iterator iter = _clients.find(clientID);
if (iter != _clients.end())
lastComputerStatus = iter->second;
// 如果最后一次向客户端发送的状态与当前最新的状态不一致
if (lastComputerStatus != _computerStatus)
{
// 发送
google::protobuf::BoolValue response;
response.set_value(_computerStatus);
writer->Write(response);
// 更新最后一次发送的状态
_clients[clientID] = _computerStatus;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
return ::grpc::Status::OK;
}
}
开启gRPC服务
GrpcService.h
#pragma once
#include <iostream>
#include <grpcpp/grpcpp.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/health_check_service_interface.h>
#include "ComputerGrpcService.h"
namespace GrpcServer
{
class GrpcService
{
public:
GrpcService();
~GrpcService();
// 启动Grpc服务
bool start();
private:
ComputerGrpcService* _pComputerGrpcService;
// gRPC服务线程相关
bool _isGrpcServiceStarted;
std::shared_ptr<std::thread> _pGrpcServiceThread;
void grpcServiceThread();
};
}
GrpcService.cpp 来源:https://www.wubayue.com
#include "GrpcService.h";
namespace GrpcServer
{
GrpcService::GrpcService()
{
_pComputerGrpcService = new ComputerGrpcService();
_isGrpcServiceStarted = false;
}
GrpcService::~GrpcService()
{
if (_pComputerGrpcService != nullptr)
{
delete _pComputerGrpcService;
_pComputerGrpcService = nullptr;
}
_isGrpcServiceStarted = false;
}
bool GrpcService::start()
{
if (_isGrpcServiceStarted)
return false;
// 在单独的线程中运行gRPC服务
_pGrpcServiceThread = std::make_shared<std::thread>(&GrpcService::grpcServiceThread, this);
_isGrpcServiceStarted = true;
return true;
}
void GrpcService::grpcServiceThread()
{
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
grpc::ServerBuilder builder;
// 在所有IP段监听8888端口
builder.AddListeningPort("0.0.0.0:8888", grpc::InsecureServerCredentials());
builder.RegisterService(_pComputerGrpcService);
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
server->Wait();
}
}
int main()
{
GrpcServer::GrpcService grpcService;
std::cout << "Grpc server start " << (grpcService.start() ? "succeed." : "failed.") << std::endl;
std::string k;
std::cin >> k;
return 0;
}
6 C++客户端
ComputerGrpcClient.h
#pragma once
#include <grpcpp/grpcpp.h>
#include "Computer_Grpc.grpc.pb.h"
namespace GrpcClient
{
class ComputerGrpcClient
{
private:
std::unique_ptr<GrpcDemo::Computer_Grpc::Stub> _stub;
public:
ComputerGrpcClient(std::shared_ptr<grpc::Channel> channel);
GrpcDemo::Computer_Grpc::Stub* getStub();
};
}
ComputerGrpcClient.cpp 来源:https://www.wubayue.com
#include "ComputerGrpcClient.h"
namespace GrpcClient
{
// 创建Stub对象(Stub可理解为服务端的代理)
ComputerGrpcClient::ComputerGrpcClient(std::shared_ptr<grpc::Channel> channel)
: _stub(GrpcDemo::Computer_Grpc::NewStub(channel)) {
}
// 对外提供Stub对象
GrpcDemo::Computer_Grpc::Stub* ComputerGrpcClient::getStub()
{
return _stub.get();
}
}
int main()
{
auto channel = grpc::CreateChannel("127.0.0.1:8888", grpc::InsecureChannelCredentials());
// 进阶示例(Server stream)
GrpcClient::ComputerGrpcClient computerGrpcClient(channel);
// 开始在线程中轮询读取 Server stream
std::thread t1([](GrpcClient::ComputerGrpcClient* computerClient) {
grpc::ClientContext context;
// 使用当前时间模拟client_id
std::time_t now = std::time(nullptr);
char buffer[64];
std::strftime(buffer, sizeof(buffer), "%Y_%m_%d_%H_%M_%S", std::localtime(&now));
std::string clientID = buffer;
context.AddMetadata("client_id", clientID);
google::protobuf::Empty request;
google::protobuf::BoolValue response;
auto reader = (*computerClient).getStub()->computerStatusChanged(&context, request);
while (reader->Read(&response))
{
std::cout << "computerStatusChanged : " << response.value() << std::endl;
}
}, &computerGrpcClient);
// 在线程中循环设置状态
std::thread t2([](GrpcClient::ComputerGrpcClient* computerClient) {
bool val = false;
while (true)
{
grpc::ClientContext context;
google::protobuf::BoolValue request;
google::protobuf::Empty response;
if (val)
request.set_value(false);
else
request.set_value(true);
val = !val;
(*computerClient).getStub()->setComputerStatus(&context, request, &response);
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
}
}, &computerGrpcClient);
t1.join();
t2.join();
}
7 后记
大概在2007年的时候项目中使用微软的.Net Remoting,后来被整合进WCF,它们都只能在微软的.Net框架中使用。再后来微软传奇CEO Satya Nadella上台,开始大力推动跨平台,WCF逐渐成为历史长河中的沧海一粟,随之而去的还有WebService、XML、jQuery......那些当年的小甜甜如今都变成了牛夫人。
gRPC现在是小甜甜,什么时候会变成牛夫人?通过两篇文章记录了gRPC的基本应用,备忘。来源:https://www.wubayue.com
<全文完>