gRPC 客戶端調(diào)用服務端需要連接池嗎?
發(fā)現(xiàn)的問題
在微服務開發(fā)中,gRPC 的應用絕對少不了,一般情況下,內(nèi)部微服務交互,通常是使用 RPC 進行通信,如果是外部通信的話,會提供 https 接口文檔
對于 gRPC 的基本使用可以查看文章 gRPC介紹
對于 gRPC ,我們需要基本知道如下的一些知識點:
gRPC 的基本四種模式的應用場景
請求響應模式
客戶端數(shù)據(jù)流模式
服務端數(shù)據(jù)流模式
雙向流模式
Proto 文件的定義和使用
gRPC 攔截器的應用 , 基本的可以查看這篇 gRPC 攔截器
實際上有客戶端攔截器 和 服務端攔截器,具體詳細的可以自行學習
gRPC 的設計原理細節(jié)
Go-Kit 的使用
當然今天并不是要聊 gRPC 的應用或者原理,而是想聊我們在開發(fā)過程中很容易遇到的問題:
未復用 gRPC 客戶端連接,影響性能
最近審查各個服務代碼中,發(fā)現(xiàn)整個部門使用 gRPC 客戶端請求服務端接口的時候,都是會新建一個連接,然后調(diào)用服務端接口,使用完畢之后就 close 掉, 例如這樣

這會有什么問題呢?
正常簡單的使用不會有啥問題,但如果是面臨高并發(fā)的情況,性能問題很容易就會出現(xiàn),例如我們在做性能測試的時候,就會發(fā)現(xiàn),打一會性能測試,客戶端請求服務端的時候就會報錯:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
實際去查看問題的時候,很明顯,這是 gRPC 的連接數(shù)被打滿了,很多連接都還未完全釋放
那這個時候,簡單思考一下,我們是沒有必要對于每一次客戶端請求服務端接口的時候,都新建立一次連接,并且調(diào)用完畢之后就馬上關(guān)閉連接
我們知道,gRPC 的通信本質(zhì)上也是 TCP 的連接,那么一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這么一個過程,如果我們頻繁的建立和釋放連接,這對于資源和性能其實都是一個大大的浪費
我們還知道 gRPC 是一個高性能、開源和擁有統(tǒng)一規(guī)定的 RPC框架,面向?qū)ο蟮?http/2 通信協(xié)議,能夠能節(jié)省空間和 IO 密集度的開銷 ,但是我們并沒有很好的將他運用起來,gRPC 服務端的連接管理不用我們操心,但是我們對于 gRPC 客戶端的連續(xù)非常有必要關(guān)心,咱們要想辦法復用客戶端的連接
gRPC 連接池
復用連接,我們可以使用連接池的方式
對于這種復用資源,我們其實也接觸了不少,例如復用線程 worker 的線程池,go 中的協(xié)程池 ..
簡單來說,連接池 ,就是提前創(chuàng)建好一定數(shù)量的 tcp 連接句柄放在池子中,咱們需要和外部通信的時候,就去池子中取一個連接來用,用完了之后,咱們就放回去
連接池解決了什么問題
很明顯,連接池解決了上述咱們頻繁創(chuàng)建連接和釋放連接帶來的資源和性能上的損耗,咱們節(jié)省了這部分開銷后,自然就提高了咱們的性能
可是我們再次思考一下,如果這個連接池子就是只能存放固定的連接,那么我們業(yè)務擴張的時候,豈不是光等待池子里面有空閑連接就會耗費大量的時間呢?
或者是池子過大,咱們需要的連接數(shù)較少,那么開辟那么多連接豈不是一種浪費?
那么我們在設計或者是應用連接池的時候,就需要考慮如下幾個方面了:
連接池是否支持擴縮容
空閑的連接是否支持超時自行關(guān)閉,是否支持?;?/strong>
池子滿的時候,處理的策略是什么樣的
其實關(guān)于連接池的設計和庫網(wǎng)上都很多,我們可以找一個案例來看看如何來使用連接池,以及它是如何來進行上述幾個方面的編碼落地的
如何去使用連接池
先來看看客戶端如何使用連接池

客戶端使用 pool
client/main.go
package?main
import?(
????????"context"
????????"flag"
????????"fmt"
????????"log"
????????"time"
????????"mypoolclient/pool"
????????"mypoolclient/pb"
)
var?addr?=?flag.String("addr",?"127.0.0.1:8888",?"the?address?to?connect?to")
func?main()?{
????????flag.Parse()
????????p,?err?:=?pool.New(*addr,?pool.DefaultOptions)
????????if?err?!=?nil?{
????????????????log.Fatalf("failed?to?new?pool:?%v",?err)
????????}
????????defer?p.Close()
????????conn,?err?:=?p.Get()
????????if?err?!=?nil?{
????????????????log.Fatalf("failed?to?get?conn:?%v",?err)
????????}
????????defer?conn.Close()
????????client?:=?pb.NewTestsvrClient(conn.Value())
????????ctx,?cancel?:=?context.WithTimeout(context.Background(),?5*time.Second)
????????defer?cancel()
????????res,?err?:=?client.Say(ctx,?&pb.TestReq{Message:?[]byte("hi")})
????????if?err?!=?nil?{
????????????????log.Fatalf("unexpected?error?from?Say:?%v",?err)
????????}
????????fmt.Println("rpc?response:",?res)
}
此處的客戶端,我們很明顯可以看出來,以前咱們使用客戶端去調(diào)用服務端接口的時候,總會不自覺的 Dial 一下建立連接
咱們使用連接池的話,就可以直接從池子里面拿一個連接出來直接使用即可
服務端
server/client.go
package?main
import?(
????????"context"
????????"flag"
????????"fmt"
????????"log"
????????"net"
????????"google.golang.org/grpc"
????????"mypoolserver/pb"
)
var?port?=?flag.Int("port",?8888,?"port?number")
//?server?implements?EchoServer.
type?server?struct{}
func?(s?*server)?Say(context.Context,?*pb.TestReq)?(*pb.TestRsp,?error)?{
????????fmt.Println("call??Say?...?")
????????return?&pb.TestRsp{Message:?[]byte("hello?world")},?nil
}
func?main()?{
????????flag.Parse()
????????listen,?err?:=?net.Listen("tcp",?fmt.Sprintf("127.0.0.1:%v",?*port))
????????if?err?!=?nil?{
????????????????log.Fatalf("failed?to?listen:?%v",?err)
????????}
????????s?:=?grpc.NewServer()
????????pb.RegisterTestsvrServer(s,?&server{})
????????fmt.Println("start?server?...")
????????if?err?:=?s.Serve(listen);?err?!=?nil?{
????????????????log.Fatalf("failed?to?serve:?%v",?err)
????????}
????????fmt.Println("over?server?...")
}
連接池的具體實現(xiàn)方式
連接池的具體實現(xiàn)方式,參考了 github https://github.com/shimingyah/pool
具體的實現(xiàn),都放在上述目錄的 pool 下面了 , 也可以訪問地址 : https://github.com/qingconglaixueit/mypoolapp
pool 包中包含了 3 個文件,作用如下:
.
├── conn.go
-- 關(guān)于 grpc 連接的結(jié)構(gòu)定義和方法實現(xiàn)
├── options.go
-- 攔截器的常量定義,以及 Dial 建立連接的簡單封裝, 這個文件可要可不要,看自己的需求
└── pool.go
-- 具體 pool 的接口定義和實現(xiàn)
直接來看 pool.go 中的接口定義
type?Pool?interface?{
???Get()?(Conn,?error)
???Close()?error
???Status()?string
}
Get()
獲取一個新的連接 , 當關(guān)閉連接的時候,會將該連接放入到池子中
Close()
關(guān)閉連接池,自然連接池子中的連接也不再可用
關(guān)于 pool 結(jié)構(gòu)的定義 ,conn 結(jié)構(gòu)的定義建議,將上述 github 地址上的源碼下載下來進行閱讀,下面主要是分享關(guān)于
連接池子的創(chuàng)建,擴縮容,釋放
具體 TCP 連接的創(chuàng)建和釋放
創(chuàng)建連接池
func?New(address?string,?option?Options)?(Pool,?error)?{
???if?address?==?""?{
??????return?nil,?errors.New("invalid?address?settings")
???}
???if?option.Dial?==?nil?{
??????return?nil,?errors.New("invalid?dial?settings")
???}
???if?option.MaxIdle?<=?0?||?option.MaxActive?<=?0?||?option.MaxIdle?>?option.MaxActive?{
??????return?nil,?errors.New("invalid?maximum?settings")
???}
???if?option.MaxConcurrentStreams?<=?0?{
??????return?nil,?errors.New("invalid?maximun?settings")
???}
???p?:=?&pool{
??????index:???0,
??????current:?int32(option.MaxIdle),
??????ref:?????0,
??????opt:?????option,
??????conns:???make([]*conn,?option.MaxActive),
??????address:?address,
??????closed:??0,
???}
???for?i?:=?0;?i?<?p.opt.MaxIdle;?i++?{
??????c,?err?:=?p.opt.Dial(address)
??????if?err?!=?nil?{
?????????p.Close()
?????????return?nil,?fmt.Errorf("dial?is?not?able?to?fill?the?pool:?%s",?err)
??????}
??????p.conns[i]?=?p.wrapConn(c,?false)
???}
???log.Printf("new?pool?success:?%v\n",?p.Status())
???return?p,?nil
}
關(guān)于 pool 的接口,可以看成是這樣的

對于創(chuàng)建連接池,除了校驗基本的參數(shù)以外,我們知道池子其實是一個 TCP 連接的切片,長度為 option.MaxActive 即最大的活躍連接數(shù)
p.conns[i] = p.wrapConn(c, false)
表示咱們初始化一個連接,并放到連接池中,且初始化的 once 參數(shù)置為 false,表示該連接默認保存在池子中,不被銷毀
換句話說,當我們需要真實銷毀連接池中的連接的時候,就將該鏈接的 once 參數(shù)置為 false 即可,實際上也無需我們使用這去做這一步
實際上 關(guān)于每一個連接的建立也是在 New 里面完成的,只要有 1 個連接未建立成功,那么咱們的連接池就算是建立失敗,咱們會調(diào)用 p.Close() 將之前建立好的連接全部釋放掉
//?關(guān)閉連接池
func?(p?*pool)?Close()?error?{
???atomic.StoreInt32(&p.closed,?1)
???atomic.StoreUint32(&p.index,?0)
???atomic.StoreInt32(&p.current,?0)
???atomic.StoreInt32(&p.ref,?0)
???p.deleteFrom(0)
???log.Printf("close?pool?success:?%v\n",?p.Status())
???return?nil
}
//?清除從?指定位置開始到?MaxActive?之間的連接
func?(p?*pool)?deleteFrom(begin?int)?{
???for?i?:=?begin;?i?<?p.opt.MaxActive;?i++?{
??????p.reset(i)
???}
}
//?清除具體的連接
func?(p?*pool)?reset(index?int)?{
???conn?:=?p.conns[index]
???if?conn?==?nil?{
??????return
???}
???conn.reset()
???p.conns[index]?=?nil
}
這里我們可以看到,當需要從池子中清除具體的連接的時候,最終從連接池子中取出對應位置上的連接 ,conn := p.conns[index], conn.reset()
,實際上是給當前這個連接進行參數(shù)賦值
func?(c?*conn)?reset()?error?{
???cc?:=?c.cc
???c.cc?=?nil
???c.once?=?false
???if?cc?!=?nil?{
??????return?cc.Close()
???}
???return?nil
}
func?(c?*conn)?Close()?error?{
???c.pool.decrRef()
???if?c.once?{
??????return?c.reset()
???}
???return?nil
}
最終調(diào)用 Close() 將指定的連接清除掉,這些動作都是連接池自動給我們做了,無需我們使用者去擔心
我們使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之后,我們使用 conn.Close() 關(guān)閉連接,實際上也是會走到上述的 Close() 實現(xiàn)的位置,但是我們并未指定當然也沒有權(quán)限顯示的指定將 once 置位為 false ,因此對于調(diào)用者來說,是關(guān)閉了連接,對于連接池來說,實際上是將連接歸還到連接池中
關(guān)于連接池子的縮容和擴容是在 pool.Get() 中實現(xiàn)的
func?(p?*pool)?Get()?(Conn,?error)?{
???//?the?first?selected?from?the?created?connections
???nextRef?:=?p.incrRef()
???p.RLock()
???current?:=?atomic.LoadInt32(&p.current)
???p.RUnlock()
???if?current?==?0?{
??????return?nil,?ErrClosed
???}
???if?nextRef?<=?current*int32(p.opt.MaxConcurrentStreams)?{
??????next?:=?atomic.AddUint32(&p.index,?1)?%?uint32(current)
??????return?p.conns[next],?nil
???}
???//?the?number?connection?of?pool?is?reach?to?max?active
???if?current?==?int32(p.opt.MaxActive)?{
??????//?the?second?if?reuse?is?true,?select?from?pool's?connections
??????if?p.opt.Reuse?{
?????????next?:=?atomic.AddUint32(&p.index,?1)?%?uint32(current)
?????????return?p.conns[next],?nil
??????}
??????//?the?third?create?one-time?connection
??????c,?err?:=?p.opt.Dial(p.address)
??????return?p.wrapConn(c,?true),?err
???}
???//?the?fourth?create?new?connections?given?back?to?pool
???p.Lock()
???current?=?atomic.LoadInt32(&p.current)
???if?current?<?int32(p.opt.MaxActive)?&&?nextRef?>?current*int32(p.opt.MaxConcurrentStreams)?{
??????//?2?times?the?incremental?or?the?remain?incremental
??????increment?:=?current
??????if?current+increment?>?int32(p.opt.MaxActive)?{
?????????increment?=?int32(p.opt.MaxActive)?-?current
??????}
??????var?i?int32
??????var?err?error
??????for?i?=?0;?i?<?increment;?i++?{
?????????c,?er?:=?p.opt.Dial(p.address)
?????????if?er?!=?nil?{
????????????err?=?er
????????????break
?????????}
?????????p.reset(int(current?+?i))
?????????p.conns[current+i]?=?p.wrapConn(c,?false)
??????}
??????current?+=?i
??????log.Printf("grow?pool:?%d?--->?%d,?increment:?%d,?maxActive:?%d\n",
?????????p.current,?current,?increment,?p.opt.MaxActive)
??????atomic.StoreInt32(&p.current,?current)
??????if?err?!=?nil?{
?????????p.Unlock()
?????????return?nil,?err
??????}
???}
???p.Unlock()
???next?:=?atomic.AddUint32(&p.index,?1)?%?uint32(current)
???return?p.conns[next],?nil
}
從 Get 的實現(xiàn)中,我們可以知道 Get 的邏輯如下
先增加連接的引用計數(shù),如果在設定 current*int32(p.opt.MaxConcurrentStreams) 范圍內(nèi),那么直接取連接進行使用即可
若當前的連接數(shù)達到了最大活躍的連接數(shù),那么就看我們新建池子的時候傳遞的 option 中的 reuse 參數(shù)是否是 true,若是復用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接
其余的情況,就需要我們進行 2 倍或者 1 倍的數(shù)量對連接池進行擴容了
實際上,上述的庫中,并沒有提供咱們縮容的算法,如果真的有這方面的需求的話
也可以在 Get 的實現(xiàn)上進行縮容,具體的縮容策略可以根據(jù)實際情況來定,例如當引用計數(shù) nextRef 只有當前活躍連接數(shù)的 20% 的時候(這只是一個例子),就可以考慮縮容了
感謝閱讀,歡迎交流,點個贊,關(guān)注一波 再走吧