在Go中实现gRPC服务端流,需在.proto文件定义返回stream的接口,生成代码后服务端使用Send()发送多条消息,客户端通过Recv()循环接收直至EOF,适用于日志推送等持续数据传输场景。

在Go语言中实现gRPC服务端流(Server Streaming RPC),核心是让服务器在接收到客户端请求后,持续向客户端发送多个消息。这种模式适用于日志推送、实时数据更新等场景。
定义.proto文件
首先,在.proto文件中定义服务接口。服务端流的特征是返回类型为stream:
syntax = "proto3";package example;
message Request { string query = 1; }
message Response { string message = 1; }
service DataService { rpc GetData(Request) returns (stream Response); }
使用protoc和插件生成Go代码:
立即学习“go语言免费学习笔记(深入)”;
protoc --go_out=. --go-grpc_out=. protofile.proto
实现服务端逻辑
在Go中实现服务端流,关键是使用生成的Send()方法逐个发送消息:
type server struct{}
func (s server) GetData(req example.Request, stream example.DataService_GetDataServer) error {
for i := 0; i < 5; i++ {
res := &example.Response{
Message: fmt.Sprintf("Message %d for %s", i+1, req.Query),
}
// 向客户端发送一条消息
if err := stream.Send(res); err != nil {
return err
}
time.Sleep(500 * time.Millisecond) // 模拟延迟
}
return nil
}
注意:服务端函数参数中的stream是生成的接口类型,包含Send()和Context()等方法。
启动gRPC服务器
标准的gRPC服务器启动流程:
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
example.RegisterDataServiceServer(s, &server{})
log.Println("gRPC server running on :50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}}
编写客户端接收流
客户端通过Recv()循环读取服务端发来的每一条消息:
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("connect failed: %v", err)
}
defer conn.Close()
client := example.NewDataServiceClient(conn)
req := &example.Request{Query: "test"}
stream, err := client.GetData(context.Background(), req)
if err != nil {
log.Fatalf("request failed: %v", err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
break // 流结束
}
if err != nil {
log.Fatalf("receive error: %v", err)
}
fmt.Println("Received:", res.Message)
}
客户端通过不断调用Recv()来获取消息,直到收到io.EOF表示流关闭。
基本上就这些。服务端流的关键是服务端控制发送节奏,客户端以流式方式接收。这种方式比单次响应更灵活,适合持续输出场景。










