ASP.NET Core 3.0 gRPC 双向流

目录

一.前言

本文来源:http://www.sss088.com/www_shcaoan_com/

太阳城申博官网登入,它并不能代表中国共产党一贯的文化政策和主张,而是一次重大的错误。2015年国家主席习近平在亚太经合组织工商领导人峰会上表示,“中国是亚太大家庭的一员,中国的发展起步于亚太,得益于亚太,也将继续立足亚太、造福亚太。“能够从几百上千人中脱颖而出,这说明孙涌非常优秀。事情正好相反。

如果您在本网站上发表评论,您即授权爱丽时尚网和他的关联企业在世界范围内的任何媒体上去使用、重新二手求购、修改、采用、出版、翻译、派生、分发及播出该评论的权利。上轮1比0拿下水晶宫之后,切尔西已提前三轮夺冠,本场比赛可谓无欲无求,而利物浦仍存在理论上争四的可能,此役唯有全力争胜。  沪市成交10793亿,平量----是稳健的标识,自1区出现近百亿净流出之后,买盘逐步堆积,收盘主力资金净流入8亿。若医护人员不及时举报,或者与病人密切接触者拒绝接受检查,可被处以200万韩元(约合人民币1.12万元)罚款。

此外,相比于欧洲近几年不够景气的经济,中国市场则显得活力十足,外国模特在这里能够接触到各种类型的工作机会,工作起来更有活力,也更有利于职业生涯的发展。  在此前的5月,教育部还曾发布《2013年具有普通高等学历教育招生资格的高等学校名单》。  株洲是老工业基地,10年前,这里的企业粗放生产、污染严重。  因事业发展需要,现面向海内外诚聘具有强烈事业心、较强创新能力和良好团队意识,热爱青少年事业的贤士加盟。

在前一文 《ASP.NET Core 3.0 使用gRPC》中有提到 gRPC 支持双向流调用,支持实时推送消息,这也是 gRPC的一大特点,且 gRPC 在对双向流的控制支持上也是非常强大的。

二. 什么是 gRPC 流

gRPC 有四种服务类型,分别是:简单 RPC(Unary RPC)、服务端流式 RPC (Server streaming RPC)、客户端流式 RPC (Client streaming RPC)、双向流式 RPC(Bi-directional streaming RPC)。它们主要有以下特点:

服务类型 特点
简单 RPC 一般的rpc调用,传入一个请求对象,返回一个返回对象
服务端流式 RPC 传入一个请求对象,服务端可以返回多个结果对象
客户端流式 RPC 客户端传入多个请求对象,服务端返回一个结果对象
双向流式 RPC 结合客户端流式RPC和服务端流式RPC,可以传入多个请求对象,返回多个结果对象

三.为什么 gRPC 支持流

gRPC 通信是基于 HTTP/2 实现的,它的双向流映射到 HTTP/2 流。HTTP/2 具有流的概念,流是为了实现HTTP/2的多路复用。流是服务器和客户端在HTTP/2连接内用于交换帧数据的独立双向序列,逻辑上可看做一个较为完整的交互处理单元,即表达一次完整的资源请求、响应数据交换流程;一个业务处理单元,在一个流内进行处理完毕,这个流生命周期完结。

特点如下:

  • 一个HTTP/2连接可同时保持多个打开的流,任一端点交换帧
  • 流可被客户端或服务器单独或共享创建和使用
  • 流可被任一端关闭
  • 在流内发送和接收数据都要按照顺序
  • 流的标识符自然数表示,1~2^31-1区间,有创建流的终端分配
  • 流与流之间逻辑上是并行、独立存在

摘自 HTTP/2笔记之流和多路复用 by 聂永

四.gRPC中使用双向流调用

我们在前文中编写的RPC属于简单RPC,没有包含流调用,下面直接讲一下双向流,根据第二小节列举的四种服务类型,如果我们掌握了简单RPC和双向流RPC,那么服务端流式 RPC和客户端流式 RPC自然也就没有问题了。

这里我们继续使用前文的代码,要实现的目标是一次给多个猫洗澡。

① 首先在 LuCat.proto 定义两个rpc,一个 Count 用于统计猫的数量,一个 双向流 RPC BathTheCat 用于给猫洗澡

syntax = "proto3";

option csharp_namespace = "AspNetCoregRpcService";

import "google/protobuf/empty.proto";
package LuCat; /定义包名

/定义服务
service LuCat{
    /定义给猫洗澡双向流rpc
    rpc BathTheCat(stream BathTheCatReq) returns ( stream BathTheCatResp);
    /定义统计猫数量简单rpc
    rpc Count(google.protobuf.Empty) returns (CountCatResult);
}

message SuckingCatResult{
    string message=1;
}
message BathTheCatReq{
    int32 id=1;
}
message BathTheCatResp{
    string message=1;
}
message CountCatResult{
    int32 Count=1;
}

② 添加服务的实现

这里安利下Resharper,非常方便

private readonly ILogger<LuCatService> _logger;
private static readonly List<string> Cats=new List<string>(){"英短银渐层","英短金渐层","美短","蓝猫","狸花猫","橘猫"};
private static readonly Random Rand=new Random(DateTime.Now.Millisecond);

public LuCatService(ILogger<LuCatService> logger)
{
    _logger = logger;
}

public override async Task BathTheCat(IAsyncStreamReader<BathTheCatReq> requestStream, IServerStreamWriter<BathTheCatResp> responseStream, ServerCallContext context)
{
    var bathQueue=new Queue<int>();
    while (await requestStream.MoveNext())
    {
        /将要洗澡的猫加入队列
        bathQueue.Enqueue(requestStream.Current.Id);

        _logger.LogInformation($"Cat {requestStream.Current.Id} Enqueue.");
    }

    /遍历队列开始洗澡
    while (bathQueue.TryDequeue(out var catId))
    {
        await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });

        await Task.Delay(500);/此处主要是为了方便客户端能看出流调用的效果
    }
}

public override Task<CountCatResult> Count(Empty request, ServerCallContext context)
{
    return Task.FromResult(new CountCatResult()
    {
        Count = Cats.Count
    });
}

BathTheCat 方法会接收多个客户端发来的CatId,然后将他们加入队列中,等客户端发送完成后开始依次洗澡并返回给客户端。

③ 客户端实现

随机发送10个猫Id给服务端,然后接收10个洗澡结果。

var channel = GrpcChannel.ForAddress("/localhost:5001");
var catClient = new LuCat.LuCatClient(channel);
/获取猫总数
var catCount = await catClient.CountAsync(new Empty());
Console.WriteLine($"一共{catCount.Count}只猫。");
var rand = new Random(DateTime.Now.Millisecond);

var bathCat = catClient.BathTheCat();
/定义接收吸猫响应逻辑
var bathCatRespTask = Task.Run(async() =>
{
    await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
    {
        Console.WriteLine(resp.Message);
    }
});
/随机给10个猫洗澡
for (int i = 0; i < 10; i++)
{
    await bathCat.RequestStream.WriteAsync(new BathTheCatReq() {Id = rand.Next(0, catCount.Count)});
}
/发送完毕
await bathCat.RequestStream.CompleteAsync();
Console.WriteLine("客户端已发送完10个需要洗澡的猫id");
Console.WriteLine("接收洗澡结果:");
/开始接收响应
await bathCatRespTask;

Console.WriteLine("洗澡完毕");

④ 运行

可以看到双向流调用成功,客户端发送了10个猫洗澡请求对象,服务端返回了10个猫洗澡结果对象。且是实时推送的,这就是 gRPC 的双向流调用。

这里大家可以自行改进来演变成客户端流式或者服务端流式调用。客户端发送一个猫Id列表,然后服务端返回每个猫洗澡结果,这就是服务端流式调用。客户端依次发送猫Id,然后服务端一次性返回所有猫的洗澡结果(给所有猫洗澡看做是一个业务,返回这个业务的结果),就是客户端流式调用。这里我就不再演示了。

五.流控制

gRPC 的流式调用支持对流进行主动取消的控制,进而可以衍生出流超时限制等控制。

在流式调用是,可以传一个 CancellationToken 参数,它就是我们用来对流进行取消控制的:

1569467990882

改造一下我们在第四小节的代码:

① 客户端

var cts = new CancellationTokenSource();
/指定在2.5s后进行取消操作
cts.CancelAfter(TimeSpan.FromSeconds(2.5));
var bathCat = catClient.BathTheCat(cancellationToken: cts.Token);
/定义接收吸猫响应逻辑
var bathCatRespTask = Task.Run(async() =>
{
    try
    {
        await foreach (var resp in bathCat.ResponseStream.ReadAllAsync())
        {
            Console.WriteLine(resp.Message);
        }
    }
    catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
    {
        Console.WriteLine("Stream cancelled.");
    }
});

② 服务端

/遍历队列开始洗澡
while (!context.CancellationToken.IsCancellationRequested && bathQueue.TryDequeue(out var catId))
{
    _logger.LogInformation($"Cat {catId} Dequeue.");
    await responseStream.WriteAsync(new BathTheCatResp() { Message = $"铲屎的成功给一只{Cats[catId]}洗了澡!" });

    await Task.Delay(500);/此处主要是为了方便客户端能看出流调用的效果
}

③ 运行

设置的是双向流式调用2.5s后取消流,从客户端调用结果看到,并没有收到全部10个猫的洗澡返回结果,流就已经被取消了,这就是 gRPC 的流控制。

六.结束

这里流式调用可以实现实时推送,服务端到客户端或者客户端到服务端短实时推送消息,但是这个和传统意义上的长连接主动推送、广播消息不一样,不管你是服务端流式、客户端流式还是双向流式,必须要由客户端进行发起,通过客户端请求来建立流通信。

七.参考资料

posted @ 2019-09-26 12:28 晓晨Master 阅读(...) 评论(...) 编辑 收藏

旧版申博会员注册 www.tyc123.com 菲律宾申博代理开户合作 www.88msc.com 澳门美高梅游戏登入 申博真人游戏登入
申博开户 电子游戏支付宝充值 申博138官网登录登入 www.81138.com 申博app手机直营网 菲律宾申博官方直营网
网上百家乐登入 申博娱乐网址 www.msc66.com 申博现金网直营网 www.285msc.com 申博真人游戏直营网