GRPC教程 3- 流式GRPC與錯(cuò)誤處理
解決vscode中,一個(gè)目錄下多個(gè)mod文件的問題,怎么解決呢? 如何 在一個(gè)目錄下正常的有多個(gè).mod 文件
使用 go work init 命令
本篇文章開始,我們將要開始學(xué)習(xí)流式GRPC與GRPC的錯(cuò)誤處理
流式GRPC
什么是流式GRPC呢?
和之前我們寫的普通的RPC服務(wù)寫入直接返回不同,流式GRPC允許我們在一個(gè)RPC請求中建立一個(gè)Stream(流),客戶端和服務(wù)器端都可以向這個(gè)流中寫入數(shù)據(jù),當(dāng)客戶端寫入數(shù)據(jù)時(shí),服務(wù)器端只需要不斷監(jiān)聽這個(gè)流就可以不斷獲取客戶端發(fā)送的消息,直到關(guān)閉。
首先我們先說說HTTP/2,GRPC的底層就是HTTP/2協(xié)議,HTTP2支持服務(wù)器端主動(dòng)向客戶端去發(fā)送流數(shù)據(jù)。
舉例:
// proto文件,在Greetering服務(wù)中加一行
?rpc StreamHello (HelloReq) returns (stream HelloResp);
然后proto文件就變成了
// syntax = "proto3";option go_package = "server/proto";package proto;// Hello Requestmessage HelloReq { ?string name = 1;
}// Hello Responsemessage HelloResp { ?string msg = 1;
}service Greetering { ?rpc Hello (HelloReq) returns (HelloResp); ?rpc StreamHello (HelloReq) returns (stream HelloResp);
}
接下來用我們上一篇文章的方式,借助插件,自動(dòng)生成go文件。
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative --grpc-gateway_out=. --grpc-gateway_opt=paths=source_relative
然后我們在server/server.go中實(shí)現(xiàn)StreamHello函數(shù)
func (s *server) StreamHello(in *proto.HelloReq, stream proto.Greetering_StreamHelloServer) error { ? ?for i := 10; i > 0; i-- {
? ? ? ?data := &proto.HelloResp{
? ? ? ? ? ?Msg: fmt.Sprintf("This is %d Msg", i),
? ? ? ?} ? ? ? ?if err := stream.Send(data); err != nil { ? ? ? ? ? ?return err
? ? ? ?}
? ?} ? ?return nil}
在client/client.go中加入
stream, err := client.StreamHello(context.Background(), &proto.HelloReq{Name: "Lixin"}) ? ?if err != nil { ? ? ? ?log.Fatal(err)
? ?} ? ?for {
? ? ? ?res, err := stream.Recv() ? ? ? ?if err == io.EOF { ? ? ? ? ? ?break
? ? ? ?} ? ? ? ?if err != nil { ? ? ? ? ? ?log.Fatal(err)
? ? ? ?}
? ? ? ?fmt.Println(res.GetMsg())
? ?}
運(yùn)行server.go代碼,client.go代碼就可以得到以下結(jié)果,我們的返回結(jié)果就是這樣
os getpid: ?330496This is 10 Msg
This is 9 Msg
This is 8 Msg
This is 7 Msg
This is 6 Msg
This is 5 Msg
This is 4 Msg
This is 3 Msg
This is 2 Msg
This is 1 Msg
接下來我們來改一改代碼,嘗試去使用客戶端的流式GRPC
//proto文件syntax = "proto3";option go_package = "server/proto";package proto;// Hello Requestmessage HelloReq { ?string name = 1;
}// Hello Responsemessage HelloResp { ?string msg = 1;
}service Greetering { ?rpc Hello (HelloReq) returns (HelloResp); ?rpc StreamHello (stream HelloReq) returns (HelloResp);
}
go的代碼
// server.gofunc (s *server) StreamHello(stream proto.Greetering_StreamHelloServer) error { ? ?for {
? ? ? ?res, err := stream.Recv() ? ? ? ?if err == io.EOF {
? ? ? ? ? ?stream.SendAndClose(&proto.HelloResp{
? ? ? ? ? ? ? ?Msg: "server end.",
? ? ? ? ? ?})
? ? ? ? ? ?fmt.Println("Message end.") ? ? ? ? ? ?break
? ? ? ?} ? ? ? ?if err != nil {
? ? ? ? ? ?log.Fatal(err)
? ? ? ?}
? ? ? ?fmt.Println(res.GetName())
? ?} ? ?return nil}
client.go
stream, err := client.StreamHello(context.Background()) ? ?if err != nil { ? ? ? ?log.Fatal(err)
? ?} ? ?for i := 0; i < 10; i++ {
? ? ? ?data := &proto.HelloReq{
? ? ? ? ? ?Name: fmt.Sprintf("This is %d msg from client.", i),
? ? ? ?} ? ? ? ?err = stream.Send(data) ? ? ? ?if err != nil { ? ? ? ? ? ?log.Fatal(err)
? ? ? ?}
? ?}
? ?res, err := stream.CloseAndRecv() ? ?if err != nil { ? ? ? ?log.Fatalf("c failed: %v", err)
? ?} ? ?log.Printf("got reply: %v", res.GetMsg())
雙向流式GRPC
修改proto文件加入rpc StreamHello (stream HelloReq) returns (stream HelloResp);
// server.gofunc (s *server) StreamHello(stream proto.Greetering_StreamHelloServer) error {
? ?signalch := make(chan os.Signal, 1)
? ?signal.Notify(signalch, os.Interrupt, syscall.SIGTERM)
? ?msg := ""
? ?go func() { ? ? ? ?for {
? ? ? ? ? ?fmt.Scanln(&msg)
? ? ? ? ? ?stream.Send(&proto.HelloResp{
? ? ? ? ? ? ? ?Msg: fmt.Sprint(msg),
? ? ? ? ? ?})
? ? ? ? ? ?msg = ""
? ? ? ?}
? ?}() ? ?go func() { ? ? ? ?for { ? ? ? ? ? ?// 接收流式請求
? ? ? ? ? ?in, err := stream.Recv() ? ? ? ? ? ?if err == io.EOF { ? ? ? ? ? ? ? ?return
? ? ? ? ? ?} ? ? ? ? ? ?if err != nil { ? ? ? ? ? ? ? ?return
? ? ? ? ? ?}
? ? ? ? ? ?fmt.Println("client: ", in.GetName())
? ? ? ?}
? ?}()
? ?signalType := <-signalch
? ?signal.Stop(signalch)
? ?fmt.Printf("Os Signal: <%s>", signalType)
? ?fmt.Println("Exit....") ? ?return nil}
// client.go
? ?msg := ""
? ?go func() { ? ? ? ?for {
? ? ? ? ? ?fmt.Scanln(&msg)
? ? ? ? ? ?stream.Send(&proto.HelloReq{
? ? ? ? ? ? ? ?Name: fmt.Sprint(msg),
? ? ? ? ? ?})
? ? ? ? ? ?msg = ""
? ? ? ?}
? ?}()
? ?go func() { ? ? ? ?for { ? ? ? ? ? ?// 接收流式請求 ? ? ? ? ? ?in, err := stream.Recv() ? ? ? ? ? ?if err == io.EOF {
? ? ? ? ? ? ? ?return
? ? ? ? ? ?} ? ? ? ? ? ?if err != nil {
? ? ? ? ? ? ? ?return
? ? ? ? ? ?}
? ? ? ? ? ?fmt.Println("server: ", in.GetMsg())
? ? ? ?}
? ?}()
? ?signalType := <-signalch
? ?signal.Stop(signalch) ? ?//cleanup before exit
? ?log.Printf("On Signal <%s>", signalType)
? ?log.Println("Exit command received. Exiting...")
GRPC對比WebSocket
WebSocket是HTML5新增的協(xié)議,它的目的是在瀏覽器和服務(wù)器之間建立一個(gè)不受限的雙向通信的通道,比如說,服務(wù)器可以在任意時(shí)刻發(fā)送消息給瀏覽器。
為什么傳統(tǒng)的HTTP協(xié)議不能做到WebSocket實(shí)現(xiàn)的功能?這是因?yàn)镠TTP協(xié)議是一個(gè)請求-響應(yīng)協(xié)議,請求必須先由瀏覽器發(fā)給服務(wù)器,服務(wù)器才能響應(yīng)這個(gè)請求,再把數(shù)據(jù)發(fā)送給瀏覽器。換句話說,瀏覽器不主動(dòng)請求,服務(wù)器是沒法主動(dòng)發(fā)數(shù)據(jù)給瀏覽器的。
這樣一來,要在瀏覽器中搞一個(gè)實(shí)時(shí)聊天,在線炒股(不鼓勵(lì)),或者在線多人游戲的話就沒法實(shí)現(xiàn)了,只能借助Flash這些插件。
也有人說,HTTP協(xié)議其實(shí)也能實(shí)現(xiàn)啊,比如用輪詢或者Comet。輪詢是指瀏覽器通過JavaScript啟動(dòng)一個(gè)定時(shí)器,然后以固定的間隔給服務(wù)器發(fā)請求,詢問服務(wù)器有沒有新消息。這個(gè)機(jī)制的缺點(diǎn)一是實(shí)時(shí)性不夠,二是頻繁的請求會(huì)給服務(wù)器帶來極大的壓力。
Comet本質(zhì)上也是輪詢,但是在沒有消息的情況下,服務(wù)器先拖一段時(shí)間,等到有消息了再回復(fù)。這個(gè)機(jī)制暫時(shí)地解決了實(shí)時(shí)性問題,但是它帶來了新的問題:以多線程模式運(yùn)行的服務(wù)器會(huì)讓大部分線程大部分時(shí)間都處于掛起狀態(tài),極大地浪費(fèi)服務(wù)器資源。另外,一個(gè)HTTP連接在長時(shí)間沒有數(shù)據(jù)傳輸?shù)那闆r下,鏈路上的任何一個(gè)網(wǎng)關(guān)都可能關(guān)閉這個(gè)連接,而網(wǎng)關(guān)是我們不可控的,這就要求Comet連接必須定期發(fā)一些ping數(shù)據(jù)表示連接“正常工作”。
以上兩種機(jī)制都治標(biāo)不治本,所以,HTML5推出了WebSocket標(biāo)準(zhǔn),讓瀏覽器和服務(wù)器之間可以建立無限制的全雙工通信,任何一方都可以主動(dòng)發(fā)消息給對方。
package mainimport ( ? ?"fmt"
? ?"log"
? ?"net/http"
? ?"github.com/gorilla/websocket")// We'll need to define an Upgrader// this will require a Read and Write buffer sizevar upgrader = websocket.Upgrader{
? ?ReadBufferSize: ?1024,
? ?WriteBufferSize: 1024,
}func wsEndpoint(w http.ResponseWriter, r *http.Request) {
? ?upgrader.CheckOrigin = func(r *http.Request) bool { return true } ? ?// upgrade this connection to a WebSocket
? ?// connection
? ?ws, err := upgrader.Upgrade(w, r, nil) ? ?if err != nil {
? ? ? ?log.Println(err)
? ?} ? ?// helpful log statement to show connections
? ?log.Println("Client Connected")
? ?reader(ws)
}func homePage(w http.ResponseWriter, r *http.Request) {
? ?fmt.Fprintf(w, "Home Page")
}// define a reader which will listen for// new messages being sent to our WebSocket// endpointfunc reader(conn *websocket.Conn) { ? ?for { ? ? ? ?// read in a message
? ? ? ?messageType, p, err := conn.ReadMessage() ? ? ? ?if err != nil {
? ? ? ? ? ?log.Println(err) ? ? ? ? ? ?return
? ? ? ?} ? ? ? ?// print out that message for clarity
? ? ? ?reply := fmt.Sprintf("server reply: %s", p)
? ? ? ?fmt.Println(reply) ? ? ? ?if err := conn.WriteMessage(messageType, []byte(reply)); err != nil {
? ? ? ? ? ?log.Println(err) ? ? ? ? ? ?return
? ? ? ?}
? ?}
}func setupRoutes() {
? ?http.HandleFunc("/", homePage)
? ?http.HandleFunc("/ws", wsEndpoint)
}func main() {
? ?fmt.Println("Hello World")
? ?setupRoutes()
? ?log.Fatal(http.ListenAndServe(":8080", nil))
}
流式 gRPC 和 WebSockets 都是用于實(shí)現(xiàn)客戶端和服務(wù)器之間的雙向通信,但它們有以下幾個(gè)區(qū)別:
協(xié)議:gRPC 是基于 HTTP/2 協(xié)議的,而 WebSocket 是一種獨(dú)立的協(xié)議。HTTP/2 是一個(gè)二進(jìn)制協(xié)議,可提供更好的性能和安全性。
語言支持:gRPC 支持多種語言,包括 Java、Python、Go 等,而 WebSocket 主要支持 Web 技術(shù)棧,如 JavaScript。
應(yīng)用場景:gRPC 通常用于在微服務(wù)架構(gòu)中進(jìn)行服務(wù)間通信,而 WebSocket 更多地用于實(shí)時(shí)通信應(yīng)用程序,如在線游戲或聊天應(yīng)用程序。
通信方式:在流式 gRPC 中,客戶端和服務(wù)器之間的通信是通過流來完成的,客戶端可以發(fā)送多個(gè)請求,服務(wù)器也可以發(fā)送多個(gè)響應(yīng)。而在 WebSocket 中,客戶端和服務(wù)器之間的通信是通過消息來完成的,消息可以是文本或二進(jìn)制數(shù)據(jù)。
總之,gRPC 和 WebSocket 都有其各自的優(yōu)勢和適用場景。選擇哪種技術(shù)應(yīng)該根據(jù)應(yīng)用程序的需求和設(shè)計(jì)來決定。
我們再來學(xué)一下GRPC的錯(cuò)誤處理
GRPC自己定義了一些常見的錯(cuò)誤碼,和我們可以在codes找到。 需要使用時(shí),需要引入codes包
https://pkg.go.dev/google.golang.org/grpc/codes
使用codes時(shí),需要配合status使用
import "google.golang.org/grpc/status"
GRPC的方法,一般是返回err或者status類型的錯(cuò)誤,然后調(diào)用GRPC的一方若err!=nil
我們可以通過status.Convert方法讀取對應(yīng)的錯(cuò)誤。
if err != nil {
s := status.Convert(err) ? ? ? ?// 將err轉(zhuǎn)為status
? ?for _, d := range s.Details() { // 獲取details
? ?switch info := d.(type) { ? ?case *errdetails.QuotaFailure:
? ? ? ?fmt.Printf("Quota failure: %s\n", info) ? ?default:
? ? ? ?fmt.Printf("Unexpected type: %s\n", info)
? ?}
}
// ?代碼中的例子// server.gofunc (s *server) StreamHello(stream proto.Greetering_StreamHelloServer) error { ? ?st := status.New(codes.Aborted, "error!!!!!!") ? ?ds, err := st.WithDetails(
? ? ? ?&errdetails.BadRequest{
? ? ? ? ? ?FieldViolations: []*errdetails.BadRequest_FieldViolation{
? ? ? ? ? ? ? ?{
? ? ? ? ? ? ? ? ? ?Description: "Bad Request",
? ? ? ? ? ? ? ? ? ?Field: "bad",
? ? ? ? ? ? ? ?},
? ? ? ? ? ?},
? ? ? ?},
? ?) ? ?if err != nil { ? ? ? ?return st.Err()
? ?} ? ?return ds.Err()
}
// client.go
? ?for { ? ? ? ?// 接收流式請求
? ? ? ?in, err := stream.Recv() ? ? ? ?if err == io.EOF {
? ? ? ? ? ?fmt.Println("braeak") ? ? ? ? ? ?return
? ? ? ?} ? ? ? ?if err != nil {
? ? ? ? ? ?s := status.Convert(err) ? ? ? ?// 將err轉(zhuǎn)為status
? ? ? ? ? ?for _, d := range s.Details() { // 獲取details
? ? ? ? ? ? ? ?fmt.Println(d) ? ? ? ? ? ? ? ?switch info := d.(type) { ? ? ? ? ? ? ? ?case *errdetails.BadRequest:
? ? ? ? ? ? ? ? ? ?fmt.Printf("BadRequest failure: %s\n", info) ? ? ? ? ? ? ? ?default:
? ? ? ? ? ? ? ? ? ?fmt.Println(info)
? ? ? ? ? ? ? ?}
? ? ? ? ? ?} ? ? ? ? ? ?return
? ? ? ?}
? ? ? ?fmt.Println("server: ", in.GetMsg())
? ?}
總結(jié)
在vscode下同一個(gè)目錄下多個(gè)go.mod文件解決 服務(wù)端流式GRPC 客戶端流式GRPC 雙端流式GRPC GRPC對比WebSocket GRPC的錯(cuò)誤處理
GRPC教程
Add tag…