CategoryResourceRepost/极客时间专栏/深入浅出gRPC/02 | 客户端创建和调用原理.md
louzefeng d3828a7aee mod
2024-07-11 05:50:32 +00:00

30 KiB
Raw Permalink Blame History

1. gRPC客户端创建流程

1.1 背景

gRPC 是在 HTTP/2 之上实现的 RPC 框架HTTP/2 是第7层应用层协议它运行在TCP第4层 - 传输层协议之上相比于传统的REST/JSON机制有诸多的优点

  1. 基于HTTP/2之上的二进制协议Protobuf序列化机制
  2. 一个连接上可以多路复用,并发处理多个请求和响应;
  3. 多种语言的类库实现;
  4. 服务定义文件和自动代码生成(.proto文件和Protobuf编译工具

此外gRPC还提供了很多扩展点用于对框架进行功能定制和扩展例如通过开放负载均衡接口可以无缝的与第三方组件进行集成对接Zookeeper、域名解析服务、SLB服务等

一个完整的 RPC 调用流程示例如下:

gRPC的RPC调用与上述流程相似下面我们一起学习下gRPC的客户端创建和服务调用流程。

1.2 业务代码示例

以gRPC入门级的helloworld Demo为例客户端发起RPC调用的代码主要包括如下几部分

  1. 根据hostname和port创建ManagedChannelImpl
  2. 根据helloworld.proto文件生成的GreeterGrpc创建客户端Stub用来发起RPC调用
  3. 使用客户端StubGreeterBlockingStub发起RPC调用获取响应。

相关示例代码如下所示HelloWorldClient类

HelloWorldClient(ManagedChannelBuilder<?> channelBuilder) {
    channel = channelBuilder.build();
    blockingStub = GreeterGrpc.newBlockingStub(channel);
    futureStub = GreeterGrpc.newFutureStub(channel);
    stub = GreeterGrpc.newStub(channel);
  }
  public void blockingGreet(String name) {
    logger.info("Will try to greet " + name + " ...");
    HelloRequest request = HelloRequest.newBuilder().setName(name).build();
    try {
      HelloReply response = blockingStub
              .sayHello(request);
...

1.3 RPC调用流程

gRPC的客户端调用主要包括基于Netty的HTTP/2客户端创建、客户端负载均衡、请求消息的发送和响应接收处理四个流程。

1.3.1 客户端调用总体流程

gRPC的客户端调用总体流程如下图所示

gRPC的客户端调用流程如下

  1. 客户端Stub(GreeterBlockingStub)调用sayHello(request)发起RPC调用
  2. 通过DnsNameResolver进行域名解析获取服务端的地址信息列表随后使用默认的LoadBalancer策略选择一个具体的gRPC服务端实例
  3. 如果与路由选中的服务端之间没有可用的连接则创建NettyClientTransport和NettyClientHandler发起HTTP/2连接
  4. 对请求消息使用PBProtobuf做序列化通过HTTP/2 Stream发送给gRPC服务端
  5. 接收到服务端响应之后使用PBProtobuf做反序列化
  6. 回调GrpcFuture的set(Response)方法唤醒阻塞的客户端调用线程获取RPC响应。

需要指出的是客户端同步阻塞RPC调用阻塞的是调用方线程通常是业务线程底层Transport的I/O线程Netty的NioEventLoop仍然是非阻塞的。

1.3.2 ManagedChannel创建流程

ManagedChannel是对Transport层SocketChannel的抽象Transport层负责协议消息的序列化和反序列化以及协议消息的发送和读取。

ManagedChannel将处理后的请求和响应传递给与之相关联的ClientCall进行上层处理同时ManagedChannel提供了对Channel的生命周期管理链路创建、空闲、关闭等

ManagedChannel提供了接口式的切面ClientInterceptor它可以拦截RPC客户端调用注入扩展点以及功能定制方便框架的使用者对gRPC进行功能扩展。

ManagedChannel的主要实现类ManagedChannelImpl创建流程如下

流程关键技术点解读:

  1. 使用builder模式创建ManagedChannelBuilder实现类NettyChannelBuilderNettyChannelBuilder提供了buildTransportFactory工厂方法创建NettyTransportFactory最终用于创建NettyClientTransport
  2. 初始化HTTP/2连接方式采用plaintext协商模式还是默认的TLS模式HTTP/2的连接有两种模式h2基于TLS之上构建的HTTP/2和h2c直接在TCP之上构建的HTTP/2
  3. 创建NameResolver.Factory工厂类用于服务端URI的解析gRPC默认采用DNS域名解析方式。

ManagedChannel实例构造完成之后即可创建ClientCall发起RPC调用。

1.3.3 ClientCall创建流程

完成ManagedChannelImpl创建之后由ManagedChannelImpl发起创建一个新的ClientCall实例。ClientCall的用途是业务应用层的消息调度和处理它的典型用法如下

 call = channel.newCall(unaryMethod, callOptions);
 call.start(listener, headers);
 call.sendMessage(message);
 call.halfClose();
 call.request(1);
 // wait for listener.onMessage()

ClientCall实例的创建流程如下所示

流程关键技术点解读:

  • ClientCallImpl的主要构造参数是MethodDescriptor和CallOptions其中MethodDescriptor存放了需要调用RPC服务的接口名、方法名、服务调用的方式例如UNARY类型以及请求和响应的序列化和反序列化实现类。 CallOptions则存放了RPC调用的其它附加信息例如超时时间、鉴权信息、消息长度限制和执行客户端调用的线程池等。
  • 设置压缩和解压缩的注册类CompressorRegistry和DecompressorRegistry以便可以按照指定的压缩算法对HTTP/2消息做压缩和解压缩。
  • ClientCallImpl实例创建完成之后就可以调用ClientTransport创建HTTP/2 Client向gRPC服务端发起远程服务调用。

    1.3.4 基于Netty的HTTP/2 Client创建流程

    gRPC客户端底层基于Netty4.1的HTTP/2协议栈框架构建以便可以使用HTTP/2协议来承载RPC消息在满足标准化规范的前提下提升通信性能。

    gRPC HTTP/2协议栈客户端的关键实现是NettyClientTransport和NettyClientHandler客户端初始化流程如下所示

    流程关键技术点解读:

  • **NettyClientHandler的创建**级联创建Netty的Http2FrameReader、Http2FrameWriter和Http2Connection用于构建基于Netty的gRPC HTTP/2客户端协议栈。
  • **HTTP/2 Client启动**仍然基于Netty的Bootstrap来初始化并启动客户端但是有两个细节需要注意
    • NettyClientHandler实际被包装成ProtocolNegotiator.Handler用于HTTP/2的握手协商创建之后不是由传统的ChannelInitializer在初始化Channel时将NettyClientHandler加入到pipeline中而是直接通过Bootstrap的handler方法直接加入到pipeline中以便可以立即接收发送任务。
    • 客户端使用的work线程组并非通常意义的EventLoopGroup而是一个EventLoop即HTTP/2客户端使用的work线程并非一组线程默认线程数为CPU内核 * 2而是一个EventLoop线程。这个其实也很容易理解一个NioEventLoop线程可以同时处理多个HTTP/2客户端连接它是多路复用的对于单个HTTP/2客户端如果默认独占一个work线程组将造成极大的资源浪费同时也可能会导致句柄溢出并发启动大量HTTP/2客户端
  • **WriteQueue创建**Netty的NioSocketChannel初始化并向Selector注册之后发起HTTP连接之前立即由NettyClientHandler创建WriteQueue用于接收并处理gRPC内部的各种Command例如链路关闭指令、发送Frame指令、发送Ping指令等。
  • HTTP/2 Client创建完成之后即可由客户端根据协商策略发起HTTP/2连接。如果连接创建成功后续即可复用该HTTP/2连接进行RPC调用。

    1.3.5 HTTP/2连接创建流程

    HTTP/2在TCP连接之初通过协商的方式进行通信只有协商成功才能进行后续的业务层数据发送和接收。

    HTTP/2的版本标识分为两类

    1. 基于TLS之上构架的HTTP/2, 即HTTPS使用h2表示ALPN0x68与0x32
    2. 直接在TCP之上构建的HTTP/2,即HTTP使用h2c表示。

    HTTP/2连接创建分为两种通过协商升级协议方式和直接连接方式。

    假如不知道服务端是否支持HTTP/2可以先使用HTTP/1.1进行协商,客户端发送协商请求消息(只含消息头),报文示例如下:

    GET / HTTP/1.1
    Host: 127.0.0.1
    Connection: Upgrade, HTTP2-Settings
    Upgrade: h2c
    HTTP2-Settings: <base64url encoding of HTTP/2 SETTINGS payload>
    
    

    服务端接收到协商请求之后如果不支持HTTP/2则直接按照HTTP/1.1响应返回双方通过HTTP/1.1进行通信,报文示例如下:

    HTTP/1.1 200 OK
    Content-Length: 28
    Content-Type: text/css
    
    body...
    
    

    如果服务端支持HTTP/2,则协商成功返回101结果码通知客户端一起升级到HTTP/2进行通信示例报文如下

    HTTP/1.1 101 Switching Protocols
    Connection: Upgrade
    Upgrade: h2c
    
    [ HTTP/2 connection...
    
    

    101响应之后服务需要发送SETTINGS帧作为连接序言客户端接收到101响应之后也必须发送一个序言作为回应示例如下

    PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n
    SETTINGS帧
    
    

    客户端序言发送完成之后可以不需要等待服务端的SETTINGS帧而直接发送业务请求Frame。

    假如客户端和服务端已经约定使用HTTP/2,则可以免去101协商和切换流程直接发起HTTP/2连接具体流程如下所示

    几个关键点:

    1. 如果已经明确知道服务端支持HTTP/2则可免去通过HTTP/1.1 101协议切换方式进行升级TCP连接建立之后即可发送序言否则只能在接收到服务端101响应之后发送序言
    2. 针对一个连接服务端第一个要发送的帧必须是SETTINGS帧连接序言所包含的SETTINGS帧可以为空
    3. 客户端可以在发送完序言之后发送应用帧数据不用等待来自服务器端的序言SETTINGS帧。

    gRPC支持三种Protocol Negotiator策略

    1. **PlaintextNegotiator**明确服务端支持HTTP/2采用HTTP直接连接的方式与服务端建立HTTP/2连接省去101协议切换过程
    2. **PlaintextUpgradeNegotiator**不清楚服务端是否支持HTTP/2采用HTTP/1.1协商模式切换升级到HTTP/2
    3. **TlsNegotiator**在TLS之上构建HTTP/2协商采用ALPN扩展协议以"h2"作为协议标识符。

    下面我们以PlaintextNegotiator为例了解下基于Netty的HTTP/2连接创建流程

    1.3.6 负载均衡策略

    总体上看RPC的负载均衡策略有两大类

    1. 服务端负载均衡(例如代理模式、外部负载均衡服务)
    2. 客户端负载均衡(内置负载均衡策略和算法,客户端实现)

    外部负载均衡模式如下所示:

    以代理LB模式为例RPC客户端向负载均衡代理发送请求负载均衡代理按照指定的路由策略将请求消息转发到后端可用的服务实例上。负载均衡代理负责维护后端可用的服务列表如果发现某个服务不可用则将其剔除出路由表。

    代理LB模式的优点是客户端不需要实现负载均衡策略算法也不需要维护后端的服务列表信息不直接跟后端的服务进行通信在做网络安全边界隔离时非常实用。例如通过Nginx做L7层负载均衡将互联网前端的流量安全的接入到后端服务中。

    代理LB模式通常支持L4Transport和L7Application)层负载均衡两者各有优缺点可以根据RPC的协议特点灵活选择。L4/L7层负载均衡对应场景如下

    • **L4层**对时延要求苛刻、资源损耗少、RPC本身采用私有TCP协议
    • **L7层**有会话状态的连接、HTTP协议簇例如Restful

    客户端负载均衡策略由客户端内置负载均衡能力通过静态配置、域名解析服务例如DNS服务、订阅发布例如Zookeeper服务注册中心等方式获取RPC服务端地址列表并将地址列表缓存到客户端内存中。

    每次RPC调用时根据客户端配置的负载均衡策略由负载均衡算法从缓存的服务地址列表中选择一个服务实例发起RPC调用。

    客户端负载均衡策略工作原理示例如下:

    gRPC默认采用客户端负载均衡策略同时提供了扩展机制使用者通过自定义实现NameResolver和LoadBalancer即可覆盖gRPC默认的负载均衡策略实现自定义路由策略的扩展。

    gRPC提供的负载均衡策略实现类如下所示

    • **PickFirstBalancer**无负载均衡能力,即使有多个服务端地址可用,也只选择第一个地址;
    • RoundRobinLoadBalancer"RoundRobin"负载均衡策略。

    gRPC负载均衡流程如下所示

    流程关键技术点解读:

  • 负载均衡功能模块的输入是客户端指定的hostName、需要调用的接口名和方法名等参数输出是执行负载均衡算法后获得的NettyClientTransport通过NettyClientTransport可以创建基于Netty HTTP/2的gRPC客户端发起RPC调用
  • gRPC系统默认提供的是DnsNameResolver它通过InetAddress.getAllByName(host)获取指定host的IP地址列表本地DNS服务对于扩展者而言可以继承NameResolver实现自定义的地址解析服务例如使用Zookeeper替换DnsNameResolver把Zookeeper作为动态的服务地址配置中心它的伪代码示例如下 **第一步:**继承NameResolver实现start(Listener listener)方法:
  • void start(Listener listener)
    {
     //获取ZooKeeper地址并连接
     //创建Watcher并实现process(WatchedEvent event),监听地址变更
     //根据接口名和方法名调用getChildren方法获取发布该服务的地址列表
    //将地址列表加到List中
    // 调用NameResolver.Listener.onAddresses(),通知地址解析完成
    
    

    **第二步:**创建ManagedChannelBuilder时指定Target的地址为Zookeeper服务端地址同时设置nameResolver为Zookeeper NameResolver,示例代码如下所示:

    this(ManagedChannelBuilder.forTarget(zookeeperAddr)
            .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
            .nameResolverFactory(new ZookeeperNameResolverProvider())
            .usePlaintext(false));
    
    
    1. LoadBalancer负责从nameResolver中解析获得的服务端URL中按照指定路由策略选择一个目标服务端地址并创建ClientTransport。同样可以通过覆盖handleResolvedAddressGroups实现自定义负载均衡策略。

    通过LoadBalancer + NameResolver可以实现灵活的负载均衡策略扩展。例如基于Zookeeper、etcd的分布式配置服务中心方案。

    1.3.7 RPC请求消息发送流程

    gRPC默认基于Netty HTTP/2 + PB进行RPC调用请求消息发送流程如下所示

    流程关键技术点解读:

    1. ClientCallImpl的sendMessage调用主要完成了请求对象的序列化基于PB、HTTP/2 Frame的初始化
    2. ClientCallImpl的halfClose调用将客户端准备就绪的请求Frame封装成自定义的SendGrpcFrameCommand写入到WriteQueue中
    3. WriteQueue执行flush()将SendGrpcFrameCommand写入到Netty的Channel中调用Channel的write方法被NettyClientHandler拦截到由NettyClientHandler负责具体的发送操作
    4. NettyClientHandler调用Http2ConnectionEncoder的writeData方法将Frame写入到HTTP/2 Stream中完成请求消息的发送。

    1.3.8 RPC响应接收和处理流程

    gRPC客户端响应消息的接收入口是NettyClientHandler它的处理流程如下所示

    流程关键技术点解读:

    1. NettyClientHandler的onHeadersRead(int streamId, Http2Headers headers, boolean endStream)方法会被调用两次根据endStream判断是否是Stream结尾
    2. 请求和响应的关联根据streamId可以关联同一个HTTP/2 Stream将NettyClientStream缓存到Stream中客户端就可以在接收到响应消息头或消息体时还原出NettyClientStream进行后续处理
    3. RPC客户端调用线程的阻塞和唤醒使用到了GrpcFuture的wait和notify机制来实现客户端调用线程的同步阻塞和唤醒
    4. 客户端和服务端的HTTP/2 Header和Data Frame解析共用同一个方法即MessageDeframer的deliver()。

    客户端源码分析

    gRPC客户端调用原理并不复杂但是代码却相对比较繁杂。下面围绕关键的类库对主要功能点进行源码分析。

    NettyClientTransport功能和源码分析

    NettyClientTransport的主要功能如下

    • 通过start(Listener transportListener) 创建HTTP/2 Client并连接gRPC服务端
    • 通过newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions) 创建ClientStream
    • 通过shutdown() 关闭底层的HTTP/2连接。

    以启动HTTP/2客户端为例进行讲解NettyClientTransport类

    EventLoop eventLoop = group.next();
        if (keepAliveTimeNanos != KEEPALIVE_TIME_NANOS_DISABLED) {
          keepAliveManager = new KeepAliveManager(
              new ClientKeepAlivePinger(this), eventLoop, keepAliveTimeNanos, keepAliveTimeoutNanos,
              keepAliveWithoutCalls);
        }
        handler = NettyClientHandler.newHandler(lifecycleManager, keepAliveManager, flowControlWindow,
            maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
        HandlerSettings.setAutoWindow(handler);
        negotiationHandler = negotiator.newHandler(handler);
    
    

    根据启动时配置的HTTP/2协商策略以NettyClientHandler为参数创建ProtocolNegotiator.Handler。

    创建Bootstrap并设置EventLoopGroup需要指出的是此处并没有使用EventLoopGroup而是它的一种实现类EventLoop原因在前文中已经说明相关代码示例如下NettyClientTransport类

    Bootstrap b = new Bootstrap();
        b.group(eventLoop);
        b.channel(channelType);
        if (NioSocketChannel.class.isAssignableFrom(channelType)) {
          b.option(SO_KEEPALIVE, true);
        }
    
    

    创建WriteQueue并设置到NettyClientHandler中用于接收内部的各种QueuedCommand初始化完成之后发起HTTP/2连接代码如下NettyClientTransport类

    handler.startWriteQueue(channel);
        channel.connect(address).addListener(new ChannelFutureListener() {
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
              ChannelHandlerContext ctx = future.channel().pipeline().context(handler);
              if (ctx != null) {
                ctx.fireExceptionCaught(future.cause());
              }
              future.channel().pipeline().fireExceptionCaught(future.cause());
            }
    
    

    2.2 NettyClientHandler功能和源码分析

    NettyClientHandler继承自Netty的Http2ConnectionHandler是gRPC接收和发送HTTP/2消息的关键实现类也是gRPC和Netty的交互桥梁它的主要功能如下所示

    • 发送各种协议消息给gRPC服务端
    • 接收gRPC服务端返回的应答消息头、消息体和其它协议消息
    • 处理HTTP/2协议相关的指令例如StreamError、ConnectionError等。

    协议消息的发送无论是业务请求消息还是协议指令消息都统一封装成QueuedCommand由NettyClientHandler拦截并处理相关代码如下所示NettyClientHandler类

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
              throws Exception {
        if (msg instanceof CreateStreamCommand) {
          createStream((CreateStreamCommand) msg, promise);
        } else if (msg instanceof SendGrpcFrameCommand) {
          sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg, promise);
        } else if (msg instanceof CancelClientStreamCommand) {
          cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
        } else if (msg instanceof SendPingCommand) {
          sendPingFrame(ctx, (SendPingCommand) msg, promise);
        } else if (msg instanceof GracefulCloseCommand) {
          gracefulClose(ctx, (GracefulCloseCommand) msg, promise);
        } else if (msg instanceof ForcefulCloseCommand) {
          forcefulClose(ctx, (ForcefulCloseCommand) msg, promise);
        } else if (msg == NOOP_MESSAGE) {
          ctx.write(Unpooled.EMPTY_BUFFER, promise);
        } else {
          throw new AssertionError("Write called for unexpected type: " + msg.getClass().getName());
        }
    
    

    协议消息的接收NettyClientHandler通过向Http2ConnectionDecoder注册FrameListener来监听RPC响应消息和协议指令消息相关接口如下

    FrameListener回调NettyClientHandler的相关方法实现协议消息的接收和处理

    需要指出的是NettyClientHandler 并没有实现所有的回调接口对于需要特殊处理的几个方法进行了重载例如onDataRead和onHeadersRead。

    2.3 ProtocolNegotiator功能和源码分析

    ProtocolNegotiator用于HTTP/2连接创建的协商gRPC支持三种策略并有三个实现子类

    gRPC的ProtocolNegotiator实现类完全遵循HTTP/2相关规范以PlaintextUpgradeNegotiator为例通过设置Http2ClientUpgradeCodec用于101协商和协议升级相关代码如下所示PlaintextUpgradeNegotiator类

    public Handler newHandler(GrpcHttp2ConnectionHandler handler) {
          Http2ClientUpgradeCodec upgradeCodec = new Http2ClientUpgradeCodec(handler);
          HttpClientCodec httpClientCodec = new HttpClientCodec();
          final HttpClientUpgradeHandler upgrader =
              new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
          return new BufferingHttp2UpgradeHandler(upgrader);
        }
    
    

    2.4 LoadBalancer功能和源码分析

    LoadBalancer负责客户端负载均衡它是个抽象类gRPC框架的使用者可以通过继承的方式进行扩展。

    gRPC当前已经支持PickFirstBalancer和RoundRobinLoadBalancer两种负载均衡策略未来不排除会提供更多的策略。

    以RoundRobinLoadBalancer为例它的工作原理如下根据PickSubchannelArgs来选择一个SubchannelRoundRobinLoadBalancerFactory类

    public PickResult pickSubchannel(PickSubchannelArgs args) {
          if (size > 0) {
            return PickResult.withSubchannel(nextSubchannel());
          }
          if (status != null) {
            return PickResult.withError(status);
          }
          return PickResult.withNoResult();
        }
    
    

    再看下Subchannel的选择算法Picker类

    private Subchannel nextSubchannel() {
          if (size == 0) {
            throw new NoSuchElementException();
          }
          synchronized (this) {
            Subchannel val = list.get(index);
            index++;
            if (index >= size) {
              index = 0;
            }
            return val;
          }
        }
    
    

    即通过顺序的方式从服务端列表中获取一个Subchannel。
    如果用户需要定制负载均衡策略则可以在RPC调用时使用如下代码HelloWorldClient类

    this(ManagedChannelBuilder.forAddress(host, port).loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance()).nameResolverFactory(new ZkNameResolverProvider()) .usePlaintext(true));
    
    

    2.5 ClientCalls功能和源码分析

    ClientCalls提供了各种RPC调用方式包括同步、异步、Streaming和Unary方式等相关方法如下所示

    下面一起看下RPC请求消息的发送和应答接收相关代码。

    2.5.1 RPC请求调用源码分析

    请求调用主要有两步请求Frame构造和Frame发送请求Frame构造代码如下所示ClientCallImpl类

    public void sendMessage(ReqT message) {
        Preconditions.checkState(stream != null, "Not started");
        Preconditions.checkState(!cancelCalled, "call was cancelled");
        Preconditions.checkState(!halfCloseCalled, "call was half-closed");
        try {
          InputStream messageIs = method.streamRequest(message);
          stream.writeMessage(messageIs);
    ...
    
    

    使用PB对请求消息做序列化生成InputStream构造请求Frame:

    private int writeUncompressed(InputStream message, int messageLength) throws IOException {
        if (messageLength != -1) {
          statsTraceCtx.outboundWireSize(messageLength);
          return writeKnownLengthUncompressed(message, messageLength);
        }
        BufferChainOutputStream bufferChain = new BufferChainOutputStream();
        int written = writeToOutputStream(message, bufferChain);
        if (maxOutboundMessageSize >= 0 && written > maxOutboundMessageSize) {
          throw Status.INTERNAL
              .withDescription(
                  String.format("message too large %d > %d", written , maxOutboundMessageSize))
              .asRuntimeException();
        }
        writeBufferChain(bufferChain, false);
        return written;
    }
    
    

    Frame发送代码如下所示

    public void writeFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
          ByteBuf bytebuf = frame == null ? EMPTY_BUFFER : ((NettyWritableBuffer) frame).bytebuf();
          final int numBytes = bytebuf.readableBytes();
          if (numBytes > 0) {
            onSendingBytes(numBytes);
            writeQueue.enqueue(
                new SendGrpcFrameCommand(transportState(), bytebuf, endOfStream),
                channel.newPromise().addListener(new ChannelFutureListener() {
                  @Override
                  public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                      transportState().onSentBytes(numBytes);
                    }
                  }
                }), flush);
    
    

    NettyClientHandler接收到发送事件之后调用Http2ConnectionEncoder将Frame写入Netty HTTP/2协议栈NettyClientHandler类

    private void sendGrpcFrame(ChannelHandlerContext ctx, SendGrpcFrameCommand cmd,
          ChannelPromise promise) {
        encoder().writeData(ctx, cmd.streamId(), cmd.content(), 0, cmd.endStream(), promise);
      }
    
    

    2.5.2 RPC响应接收和处理源码分析

    响应消息的接收入口是NettyClientHandler包括HTTP/2 Header和HTTP/2 DATA Frame两部分代码如下NettyClientHandler类

    private void onHeadersRead(int streamId, Http2Headers headers, boolean endStream) {
        NettyClientStream.TransportState stream = clientStream(requireHttp2Stream(streamId));
        stream.transportHeadersReceived(headers, endStream);
        if (keepAliveManager != null) {
          keepAliveManager.onDataReceived();
        }
      }
    
    

    如果参数endStream为True说明Stream已经结束调用transportTrailersReceived通知Listener close代码如下所示AbstractClientStream2类

    if (stopDelivery || isDeframerStalled()) {
            deliveryStalledTask = null;
            closeListener(status, trailers);
          } else {
            deliveryStalledTask = new Runnable() {
              @Override
              public void run() {
                closeListener(status, trailers);
              }
            };
          }
    
    

    读取到HTTP/2 DATA Frame之后调用MessageDeframer的deliver对Frame进行解析代码如下MessageDeframer类

    private void deliver() {
        if (inDelivery) {
          return;
        }
        inDelivery = true;
        try {
              while (pendingDeliveries > 0 && readRequiredBytes()) {
            switch (state) {
              case HEADER:
                processHeader();
                break;
              case BODY:
                processBody();
    ...
    
    

    将Frame 转换成InputStream之后通知ClientStreamListenerImpl调用messageRead(final InputStream message)将InputStream反序列化为响应对象相关代码如下所示ClientStreamListenerImpl类

    public void messageRead(final InputStream message) {
          class MessageRead extends ContextRunnable {
            MessageRead() {
              super(context);
            }
            @Override
            public final void runInContext() {
              try {
                if (closed) {
                  return;
                }
                try {
                  observer.onMessage(method.parseResponse(message));
                } finally {
                  message.close();
                }
    
    

    当接收到endOfStream之后通知ClientStreamListenerImpl调用它的close方法如下所示ClientStreamListenerImpl类

    private void close(Status status, Metadata trailers) {
          closed = true;
          cancelListenersShouldBeRemoved = true;
          try {
            closeObserver(observer, status, trailers);
          } finally {
            removeContextListenerAndCancelDeadlineFuture();
          }
        }
    
    

    最终调用UnaryStreamToFuture的onClose方法set响应对象唤醒阻塞的调用方线程完成RPC调用代码如下UnaryStreamToFuture类

    public void onClose(Status status, Metadata trailers) {
          if (status.isOk()) {
            if (value == null) {
              responseFuture.setException(
                  Status.INTERNAL.withDescription("No value received for unary call")
                      .asRuntimeException(trailers));
            }
            responseFuture.set(value);
          } else {
            responseFuture.setException(status.asRuntimeException(trailers));
          }
    
    

    源代码下载地址:

    链接: https://github.com/geektime-geekbang/gRPC_LLF/tree/master