前言
RPC(Remote Procedure Call),翻譯過來為「遠程過程調用」,是一種分布式系統中服務或節點之間的有效通信機制。通過 RPC,某個節點(或客戶端)可以很輕鬆的調用遠端(或服務端)的方法或服務,就像在本地調用一樣簡單。現有的很多 RPC 框架都要求暴露服務端地址,也就是需要知道伺服器的 IP 和 RPC 埠。而本篇文章將介紹一種不需要暴露 IP 地址和埠的 RPC 通信方式。這種方式是基於 Redis BRPOP/BLPOP 操作實現的延遲隊列,以及 Golang 中的 goroutine 協程異步機制,整個框架非常簡單和易於理解,同時也很高效、穩定和安全。這種方式已經應用到了 Crawlab 中的節點通信當中,成為了各節點即時傳輸信息的主要方式。下面我們將從 Crawlab 早期節點通信方案 PubSub 開始,介紹當時遇到的問題和解決方案,然後如何過渡到現在的 RPC 解決方案,以及它是如何在 Crawlab 中發揮作用的。
早期的 Crawlab 是基於 Redis 的 PubSub,也就是發布訂閱模式。這是 Redis 中主要用於一對多的單向通信的方案。其用法非常簡單:
SUBSCRIBE channel1 channel2 ...
PUBLISH channelx message
Redis的 PubSub 可以用作廣播模式,即一個發布者對應多個訂閱者。而在Crawlab中,我們只有一個訂閱者對應一個發布者的情況(主節點->工作節點: nodes:
以下為節點通信原理示意圖。
各個節點會通過Redis的 PubSub 功能來做相互通信。
所謂 PubSub ,簡單來說是一個發布訂閱模式。訂閱者(Subscriber)會在Redis上訂閱(Subscribe)一個通道,其他任何一個節點都可以作為發布者(Publisher)在該通道上發布(Publish)消息。
在 Crawlab 中,主節點會訂閱 nodes:master 通道,其他節點如果需要向主節點發送消息,只需要向 nodes:master 發布消息就可以了。同理,各工作節點會各自訂閱一個屬於自己的通道 nodes:
一個網絡請求的簡單過程如下:
不是所有節點通信都是雙向的,也就是說,主節點只會單方面對工作節點通信,工作節點並不會返迴響應給主節點,所謂的單向通信。以下是Crawlab的通信類型。
如果您在閱讀 Crawlab 源碼,會發現節點通信中有大量的 chan 語法,這是 Golang 的一個並發特性。
chan 表示為一個通道,在 Golang 中分為無緩衝和有緩衝的通道,我們用了無緩衝通道來阻塞協程,只有當 chan 接收到信號( chan <- "some signal" ),該阻塞才會釋放,協程進行下一步操作)。在請求響應模式中,如果為雙向通信,主節點收到請求後會起生成一個無緩衝通道來阻塞該請求,當收到來自工作節點的消息後,向該無緩衝通道賦值,阻塞釋放,返迴響應給客戶端。
go 命令會起一個 goroutine (協程)來完成並發,配合 chan ,該協程可以利用無緩衝通道掛起,等待信號執行接下來的操作。
PubSub 這種消息訂閱-發布設計模式是一種有效的實現節點通信的方式,但是它有兩個問題:
其中,第二個問題是比較棘手的。如果我們希望加入更多的功能,需要寫大量的異步代碼,這會加大系統模塊間的耦合度,造成擴展性很差,而且代碼閱讀起來很痛苦。
因此,為了解決這個問題,我們採用了基於 Redis 延遲消息隊列的 RPC 服務。
下圖是基於延遲隊列架構的 RPC 實現示意圖。
每一個節點都有一個客戶端(Client)和服務端(Server)。客戶端用於發送消息到目標節點(Target Node)並接收其返回的消息,服務端用於接收、處理源節點(Source Node)的消息並返回消息給源節點的客戶端。
整個 RPC 通信的流程如下:
這樣,整個節點的通信流程就通過 Redis 完成了。這樣做的好處在於不用暴露 HTTP 的 IP 地址和埠,只需要知道節點 ID 即可完成 RPC 通信。
這樣設計後的 RPC 代碼比較容易理解和維護。每次需要擴展新的通信類別時,只需要繼承 rpc.Service 類,實現 ClientHandle (客戶端處理方法)和 ServerHandle (服務端處理方法)方法就可以了。
這裡多說一下 BRPOP 。它將移出並獲取消息隊列的最後一個元素, 如果消息隊列沒有元素會阻塞隊列直到等待超時或發現可彈出元素為止。因此,使用 BRPOP 命令相對於輪訓或其他方式,可以避免不間斷的請求 Redis,避免浪費網絡和計算資源。
如果對 Redis 的操作命令不熟悉的,可以參考一下掘金小冊 《Redis 深度歷險:核心原理與應用實踐》 ,這本小冊深入介紹了 Redis 的原理以及工程實踐,對於應用 Redis 到實際開發中非常實用。
講了這麼多理論知識,我們還是需要看看代碼的。老師常教育我們:「Talk is cheap. Show me the code.」
由於 Crawlab 後端是 Golang 開發的,要理解以下代碼需要一些 Golang 的基礎知識。
首先我們需要定一個傳輸消息的數據結構。代碼如下。
package entity
type RpcMessage struct {
Id string `json:"id"` // 消息ID
Method string `json:"method"` // 消息方法
NodeId string `json:"node_id"` // 節點ID
Params map[string]string `json:"params"` // 參數
Timeout int `json:"timeout"` // 超時
Result string `json:"result"` // 結果
Error string `json:"error"` // 錯誤
}
這裡,我們定義了消息 ID、方法、節點 ID、參數等欄位。消息 ID 是 UUID,保證了消息 ID 的唯一性。
首先,我們定義一個抽象基礎接口,方便讓實際業務邏輯模塊繼承。服務端的處理邏輯在 ServerHandle 中,返回 entity 里的 RpcMessage ,而客戶端的邏輯在 ClientHandle 中。
// RPC服務基礎類
type Service interface {
ServerHandle() (entity.RpcMessage, error)
ClientHandle() (interface{}, error)
}
當我們調用客戶端的通用方法的時候,需要實現兩個邏輯:
以下是實現的代碼。
// 客戶端處理消息函數
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
return func() (replyMsg entity.RpcMessage, err error) {
// 請求ID
msg.Id = uuid.NewV4().String()
// 發送RPC消息
msgStr := utils.ObjectToString(msg)
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 獲取RPC回復消息
dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
if err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 反序列化消息
if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
log.Errorf("RpcClientFunc error: " + err.Error())
debug.PrintStack()
return replyMsg, err
}
// 如果返回消息有錯誤,返回錯誤
if replyMsg.Error != "" {
return replyMsg, errors.New(replyMsg.Error)
}
return
}
}
服務端處理的邏輯如下,大致的邏輯是:
您可以在 InitRpcService 這個方法中看到上述邏輯。私有方法 handleMsg 實現了序列化、調用服務端 RPC 服務方法、發送返回消息的邏輯。如果需要拓展 RPC 方法類型,在工廠類方法 GetService 里添加就可以了。
// 獲取RPC服務
func GetService(msg entity.RpcMessage) Service {
switch msg.Method {
case constants.RpcInstallLang:
return &InstallLangService{msg: msg}
case constants.RpcInstallDep:
return &InstallDepService{msg: msg}
case constants.RpcUninstallDep:
return &UninstallDepService{msg: msg}
case constants.RpcGetLang:
return &GetLangService{msg: msg}
case constants.RpcGetInstalledDepList:
return &GetInstalledDepsService{msg: msg}
}
return nil
}
// 處理RPC消息
func handleMsg(msgStr string, node model.Node) {
// 反序列化消息
var msg entity.RpcMessage
if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 獲取service
service := GetService(msg)
// 根據Method調用本地方法
replyMsg, err := service.ServerHandle()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
// 發送返回消息
if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
log.Errorf(err.Error())
debug.PrintStack()
}
}
// 初始化服務端RPC服務
func InitRpcService() error {
go func() {
for {
// 獲取當前節點
node, err := model.GetCurrentNode()
if err != nil {
log.Errorf(err.Error())
debug.PrintStack()
continue
}
// 獲取獲取消息隊列信息
msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
if err != nil {
if err != redis.ErrNil {
log.Errorf(err.Error())
debug.PrintStack()
}
continue
}
// 處理消息
go handleMsg(msgStr, node)
}
}()
return nil
}
Crawlab 的節點上經常需要為爬蟲安裝一些第三方依賴,例如 pymongo、requests 等。而其中,我們也需要直到某個節點上是否已經安裝了某個依賴,這需要跨伺服器通信,也就是需要在分布式網絡中進行雙向通信。而這個邏輯是通過 RPC 實現的。主節點向目標節點發起 RPC 調用,目標節點運行被調用方法,將運行結果也就是安裝的依賴列表返回給客戶端,客戶端再返回給調用者。
下面的代碼實現了獲取目標節點上已安裝的依賴的 RPC 方法。
// 獲取已安裝依賴服務
// 繼承Service基礎類
type GetInstalledDepsService struct {
msg entity.RpcMessage
}
// 服務端處理方法
// 重載ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
lang := utils.GetRpcParam("lang", s.msg.Params)
deps, err := GetInstalledDepsLocal(lang)
if err != nil {
s.msg.Error = err.Error()
return s.msg, err
}
resultStr, _ := json.Marshal(deps)
s.msg.Result = string(resultStr)
return s.msg, nil
}
// 客戶端處理方法
// 重載ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
// 發起 RPC 請求,獲取服務端數據
s.msg, err = ClientFunc(s.msg)()
if err != nil {
return o, err
}
// 反序列化
var output []entity.Dependency
if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
return o, err
}
o = output
return
}
寫好了 RPC 服務端和客戶端處理方法,就可以輕鬆編寫調用邏輯了。以下是調用獲取遠端已安裝依賴列表的方法。首先由 GetService 工廠類獲取之前定義好的 GetInstalledDepsService ,再調用其客戶端處理方法 ClientHandle ,然後返回結果。這就像在本地調用方法一樣。是不是很簡單?
// 獲取遠端已安裝依賴
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
params := make(map[string]string)
params["lang"] = lang
s := GetService(entity.RpcMessage{
NodeId: nodeId,
Method: constants.RpcGetInstalledDepList,
Params: params,
Timeout: 60,
})
o, err := s.ClientHandle()
if err != nil {
return
}
deps = o.([]entity.Dependency)
return
}
本篇文章主要介紹了一種基於 Redis 延遲隊列的 RPC 通信方式,這種方式不用暴露各個節點或服務的 IP 地址或埠,是一種非常安全的方式。而且,這種方式已經用 Golang 在 Crawlab 中實現了雙向通信,特別是 Golang 中的天生支持異步的 goroutine,讓這種方式的實現變得簡單。實際上,這種方式理論上是非常高效的,能夠支持高並發數據傳輸。
但是,在 Crawlab 的實現中還存在一些隱患,也就是它並沒有限制服務端的處理並發數量。因此如果傳輸消息過多時,服務端資源會被占滿,導致處理速度變慢甚至宕機的風險。修復方式是在服務端限制並發數量。另外,限於時間的原因,作者還沒有來得及測試這種 RPC 通信方式的實際傳輸效率,容錯機制也沒有加入。因此總的來說還有很大的提升和優化空間。
雖然如此,這種方式對於 Crawlab 的低並發遠程通信來說是足夠的了,在實際使用中也沒有出現問題,非常穩定。對於隱秘性有要求、希望不暴露地址信息的開發者,我們也推薦將該種方式在實際應用中嘗試。