满纸荒唐言,一把心酸泪,都云作者痴,谁解其中味。 技术博客 心情随笔
gRPC进阶:通过stream实现观察者模式
2025/2/12 344

导航

1前言

2软件开发中的观察者模式

3gRPC中的流(stream)模式

4编写proto文件

5C++服务端

6C++客户端

7后记

1 前言

在上一篇文章《gRPC基础:C++服务端与客户端代码示例》中以从0到1的入门视角,详细介绍了gRPC的编译安装、编写proto文件、以及根据proto文件生成服务端与客户端的代码示例。但还遗留了一个非常重要的场景未讨论,它就是观察者模式,因此本文专门分析和演示如何通过gRPC实现很多高级编程语言都具备的观察者模式。来源:https://www.wubayue.com

2 软件开发中的观察者模式

观察者模式是软件开发中的一种消息通信范式,它定义了主题与观察者之间一对多的依赖关系,当主题发生变化时,所有依赖它的观察者均收到通知并更新状态。比如QT中的信号槽,C#中的事件机制,都是基于观察者模式。来源:https://www.wubayue.com

观察者模式

3 gRPC中的流(stream)模式

流是一个常见词汇,在日常生活中,能连续不断移动的物质通常称之为流,比如水流、电流、气流。我们在网上听的音乐、刷的短视频叫做流媒体,因为它是媒体数据在连续不断流动。

gRPC基于HTTP/2协议进行数据传输,HTTP/2相对于HTTP/1.1,一个很大的变化就是通过引入流模式实现了多路复用:在HTTP/1.1时代,一次请求对应一次响应,在上一次请求未处理完成之前,下一次请求就无法处理;而在HTTP/2之后,所有操作基于流模式,多次请求之间不再阻塞。了解了这个背景,就大致明白了gRPC中的流其实是完全基于的HTTP/2协议,有如下四种方式:

gRPC普通模式

普通模式是传统的一问一答,它并不体现流的特征,但它是gRPC中使用最多的模式。

gRPC服务端流模式

服务端流模式即客户端发起一次请求,服务端多次响应。本文中所描述的观察者模式以及代码示例,均对应服务端流模式:客户端对应观察者,服务端对应主题,客户端发起的一次请求即观察者向主题进行的订阅,服务端的多次响应即主题发生变更时向观察者的多次推送。

gRPC客户端流模式

客户端流模式是客户端发起多次请求,服务端一次响应。应用场景如客户端需要向服务端传输一份大数据,分多次请求传输,服务端在处理完最后一份数据后响应完成。

gRPC双向流模式

双向流模式是客户端发起多次请求,服务端多次响应。它可应用于更复杂的业务场景。来源:https://www.wubayue.com

4 编写proto文件

// 指定版本
syntax = "proto3";
// 引用公共的数据类型
import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
// 命名空间
package GrpcDemo;

service Computer_Grpc {
    // 获取电脑开/关状态
    rpc getComputerStatus(google.protobuf.Empty) returns (google.protobuf.BoolValue) {}
    // 设置电脑开/关状态
    rpc setComputerStatus(google.protobuf.BoolValue) returns (google.protobuf.Empty) {}
    // 电脑开关状态变更事件
    rpc computerStatusChanged(google.protobuf.Empty) returns (stream google.protobuf.BoolValue) {}
} 

在proto中启用流模式比较简单,只需要将对应的参数类型定义为stream即可。来源:https://www.wubayue.com

5 C++服务端

业务逻辑

ComputerGrpcService.h

#pragma once
#include <string>
#include <map>
#include "Computer_Grpc.grpc.pb.h"

using grpc::Server;
using namespace google::protobuf;
using namespace GrpcDemo;

namespace GrpcServer
{
    // 流模式示例
    class ComputerGrpcService final : public Computer_Grpc::Service
    {
    private:
        bool _computerStatus;
        std::map<std::string, bool> _clients;

    public:
        ComputerGrpcService();
        ~ComputerGrpcService();

        // 获取电脑开/关状态
        ::grpc::Status getComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::google::protobuf::BoolValue* response) override;
        // 设置电脑开/关状态
        ::grpc::Status setComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::BoolValue* request, ::google::protobuf::Empty* response) override;
        // 电脑开关状态变更事件
        ::grpc::Status computerStatusChanged(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::google::protobuf::BoolValue>* writer) override;

    };
} 

ComputerGrpcService.cpp

#include "ComputerGrpcService.h"

namespace GrpcServer
{
    ComputerGrpcService::ComputerGrpcService()
    {
        _computerStatus = false;
        _clients.clear();
    }
    ComputerGrpcService::~ComputerGrpcService()
    {
    }

    ::grpc::Status ComputerGrpcService::getComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::google::protobuf::BoolValue* response)
    {
        response->set_value(_computerStatus);
        return ::grpc::Status::OK;
    }
    ::grpc::Status ComputerGrpcService::setComputerStatus(::grpc::ServerContext* context, const ::google::protobuf::BoolValue* request, ::google::protobuf::Empty* response)
    {
        _computerStatus = request->value();
        std::cout << "setComputerStatus is called, parameter is : " << _computerStatus << std::endl;
        return ::grpc::Status::OK;
    }
    ::grpc::Status ComputerGrpcService::computerStatusChanged(::grpc::ServerContext* context, const ::google::protobuf::Empty* request, ::grpc::ServerWriter<::google::protobuf::BoolValue>* writer)
    {
        // 从请求上下文中获取客户端ID(需要客户端配合传递)
        std::string clientID = "";
        auto clientMetaData = context->client_metadata();
        for (auto it = clientMetaData.begin(); it != clientMetaData.end(); ++it)
        {
            char k[64] = {};
            memcpy(&k[0], it->first.data(), it->first.length());
            char v[128] = {};
            memcpy(&v[0], it->second.data(), it->second.length());
            if (std::string(k) == "client_id")
                clientID = std::string(v);
        }
        if (clientID.length() == 0)
            return grpc::Status(grpc::StatusCode::ABORTED, "Failed to retrieve client_id from the request context.");
        if (_clients.count(clientID) == 0)
            _clients.insert(std::pair<std::string, bool>(clientID, _computerStatus));

        // 循环检测并向客户端发送状态变更事件
        while (true)
        {
            // 获取向该用户最后一次发送的状态
            bool lastComputerStatus = false;
            std::map<std::string, bool>::iterator iter = _clients.find(clientID);
            if (iter != _clients.end())
                lastComputerStatus = iter->second;
            // 如果最后一次向客户端发送的状态与当前最新的状态不一致
            if (lastComputerStatus != _computerStatus)
            {
                // 发送
                google::protobuf::BoolValue response;
                response.set_value(_computerStatus);
                writer->Write(response);
                // 更新最后一次发送的状态
                _clients[clientID] = _computerStatus;
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        }
        return ::grpc::Status::OK;
    }
} 

开启gRPC服务

GrpcService.h

#pragma once
#include <iostream>
#include <grpcpp/grpcpp.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/health_check_service_interface.h>
#include "ComputerGrpcService.h"

namespace GrpcServer
{
    class GrpcService
    {
    public:
        GrpcService();
        ~GrpcService();

        // 启动Grpc服务
        bool start();

    private:
        ComputerGrpcService* _pComputerGrpcService;

        // gRPC服务线程相关
        bool _isGrpcServiceStarted;
        std::shared_ptr<std::thread> _pGrpcServiceThread;
        void grpcServiceThread();
    };
} 

GrpcService.cpp 来源:https://www.wubayue.com

#include "GrpcService.h";

namespace GrpcServer
{
    GrpcService::GrpcService()
    {
        _pComputerGrpcService = new ComputerGrpcService();
        _isGrpcServiceStarted = false;
    }
    GrpcService::~GrpcService()
    {
        if (_pComputerGrpcService != nullptr)
        {
            delete _pComputerGrpcService;
            _pComputerGrpcService = nullptr;
        }
        _isGrpcServiceStarted = false;
    }

    bool GrpcService::start()
    {
        if (_isGrpcServiceStarted)
            return false;
        // 在单独的线程中运行gRPC服务
        _pGrpcServiceThread = std::make_shared<std::thread>(&GrpcService::grpcServiceThread, this);
        _isGrpcServiceStarted = true;
        return true;
    }

    void GrpcService::grpcServiceThread()
    {
        grpc::EnableDefaultHealthCheckService(true);
        grpc::reflection::InitProtoReflectionServerBuilderPlugin();
        grpc::ServerBuilder builder;
        // 在所有IP段监听8888端口
        builder.AddListeningPort("0.0.0.0:8888", grpc::InsecureServerCredentials());
        builder.RegisterService(_pComputerGrpcService);
        std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
        server->Wait();
    }
}

int main()
{
    GrpcServer::GrpcService grpcService;
    std::cout << "Grpc server start " << (grpcService.start() ? "succeed." : "failed.") << std::endl;
    std::string k;
    std::cin >> k;
    return 0;
} 

6 C++客户端

ComputerGrpcClient.h

#pragma once
#include <grpcpp/grpcpp.h>
#include "Computer_Grpc.grpc.pb.h"

namespace GrpcClient
{
    class ComputerGrpcClient
    {
    private:
        std::unique_ptr<GrpcDemo::Computer_Grpc::Stub> _stub;

    public:
        ComputerGrpcClient(std::shared_ptr<grpc::Channel> channel);
        GrpcDemo::Computer_Grpc::Stub* getStub();
    };
} 

ComputerGrpcClient.cpp 来源:https://www.wubayue.com

#include "ComputerGrpcClient.h"

namespace GrpcClient
{
    // 创建Stub对象(Stub可理解为服务端的代理)
    ComputerGrpcClient::ComputerGrpcClient(std::shared_ptr<grpc::Channel> channel)
        : _stub(GrpcDemo::Computer_Grpc::NewStub(channel)) {
    }

    // 对外提供Stub对象
    GrpcDemo::Computer_Grpc::Stub* ComputerGrpcClient::getStub()
    {
        return _stub.get();
    }
}
int main()
{   
    auto channel = grpc::CreateChannel("127.0.0.1:8888", grpc::InsecureChannelCredentials());
    

    // 进阶示例(Server stream)
    GrpcClient::ComputerGrpcClient computerGrpcClient(channel);
    // 开始在线程中轮询读取 Server stream
    std::thread t1([](GrpcClient::ComputerGrpcClient* computerClient) {
        grpc::ClientContext context;
        // 使用当前时间模拟client_id
        std::time_t now = std::time(nullptr);
        char buffer[64];
        std::strftime(buffer, sizeof(buffer), "%Y_%m_%d_%H_%M_%S", std::localtime(&now));
        std::string clientID = buffer;
        context.AddMetadata("client_id", clientID);

        google::protobuf::Empty request;
        google::protobuf::BoolValue response;
        auto reader = (*computerClient).getStub()->computerStatusChanged(&context, request);
        while (reader->Read(&response))
        {
            std::cout << "computerStatusChanged : " << response.value() << std::endl;
        }
    }, &computerGrpcClient);
    // 在线程中循环设置状态
    std::thread t2([](GrpcClient::ComputerGrpcClient* computerClient) {
        bool val = false;
        while (true)
        {
            grpc::ClientContext context;
            google::protobuf::BoolValue request;
            google::protobuf::Empty response;
            if (val)
                request.set_value(false);
            else
                request.set_value(true);
            val = !val;
            (*computerClient).getStub()->setComputerStatus(&context, request, &response);
            std::this_thread::sleep_for(std::chrono::milliseconds(3000));
        }
    }, &computerGrpcClient);
    t1.join();
    t2.join();
} 

7 后记

大概在2007年的时候项目中使用微软的.Net Remoting,后来被整合进WCF,它们都只能在微软的.Net框架中使用。再后来微软传奇CEO Satya Nadella上台,开始大力推动跨平台,WCF逐渐成为历史长河中的沧海一粟,随之而去的还有WebService、XML、jQuery......那些当年的小甜甜如今都变成了牛夫人。

gRPC现在是小甜甜,什么时候会变成牛夫人?通过两篇文章记录了gRPC的基本应用,备忘。来源:https://www.wubayue.com

<全文完>