为什么需要rpc框架?一次rpc需要指定调用的方法,参数,接收返回值。如果没有rpc框架,裸写tcp,什么时候知道报文传递完毕的界限。最简单我们可以搞个私有协议,TLV格式指定:T(type)指定类型,L(length)指定长度,V(Value)指定值,但是这个也会带入一些问题,比如规范问题,不同服务提供不同协议,这不乱套了吗;另外还有效率问题,比如我要传递一个数组怎么传?基于以上几个问题,rpc框架出现了,rpc框架采用序列化操作将请求和返回在发送端进行序列化,然后在接收端进行解序列化达到目的,如下图所示,图片来自博客。
服务调用流程如下:
- client调用client stub,这是一次本地过程调用
- client stub将参数打包成一个消息,然后发送这个消息。打包过程(序列化)也叫做 marshalling
- client所在的系统将消息发送给server
- server的的系统将收到的包传给server stub
- server stub解包得到参数。 解包(解序列化)也被称作 unmarshalling
- 最后server stub调用服务过程. 返回结果按照相反的步骤传给client
net/rpc是go自带的rpc框架,采用gob
进行序列化。现在rpc框架有许多,比如跨语言调用的grpc,thrift等,服务治理框架dubbo,RPCX,go-micro等。rpc不同于RESTful API,前者可以基于HTTP,也可以基于TCP,UDP,主要注重方法,而后者为HTTP,主要为资源操作(增删改查)。关于rpc部分,可以查看这篇rpcx作者写的博客。本文的编写也参考了Go官方库RPC开发指南,这篇已经对net/rpc分析有个大概的轮廓了,只不过有些细节没有深究,本文我来扣一扣总结一下。
本文框架:首先给出服务端和客户端调用的例子,然后介绍服务端代码,然后介绍客户端,最后总结一下。
1. 调用的例子
1.1 服务端调用
package server
import "errors"
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
上面给出了服务端的例子,Arith提供了2个函数:Multiply
相乘函数和Divide
相除,格式规范为:第一个参数为传入的参数,第二个参数为返回的参数,返回值是error。这个也是定义rpc的必备约束:
- the method's type is exported.
- the method is exported.
- the method has two arguments, both exported (or builtin) types.
- the method's second argument is a pointer.
- the method has return type error.
也就是说,一个合格的RPC调用接口应该长这样:
func (t *T) MethodName(argType T1, replyType *T2) error
服务端启动服务完整代码,具体调用流程在下面小节分析:
package main
import(
"net"
"net/rpc"
"net/http/httptest"
"errors"
"log"
"sync"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
type Arith int
func (t *Arith) Multiply(args *Args, reply *int) error {
*reply = args.A * args.B
return nil
}
func (t *Arith) Divide(args *Args, quo *Quotient) error {
if args.B == 0 {
return errors.New("divide by zero")
}
quo.Quo = args.A / args.B
quo.Rem = args.A % args.B
return nil
}
func listenTCP() (net.Listener, string) {
l, e := net.Listen("tcp", "127.0.0.1:10011")
if e != nil {
log.Fatalf("net.Listen tcp :0: %v", e)
}
return l, l.Addr().String()
}
func startHttpServer() {
server := httptest.NewServer(nil)
httpServerAddr := server.Listener.Addr().String()
log.Println("Test HTTP RPC server listening on", httpServerAddr)
}
func main() {
rpc.Register(new(Arith)) //注册服务
var l net.Listener
l, serverAddr := listenTCP() //监听TCP连接
log.Println("Test RPC server listening on", serverAddr)
go rpc.Accept(l)
rpc.HandleHTTP() //监听HTTP连接
var httpOnce sync.Once
httpOnce.Do(startHttpServer)
select{}
}
1.2 客户端调用
客户端分别调用异步和同步连接到TCP和HTTP2个接口。
package main
import(
"net/rpc"
"log"
"fmt"
)
type Args struct {
A, B int
}
type Quotient struct {
Quo, Rem int
}
func main() {
client, err := rpc.DialHTTP("tcp", "127.0.0.1:64120") //64120为服务端启动服务的端口
if err != nil {
log.Fatal("dialing:", err)
}
// Synchronous call
args := &Args{7,8}
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)
// Asynchronous call
clientTCP, err := rpc.Dial("tcp", "127.0.0.1:10011")
if err != nil {
log.Fatal("dialing:", err)
}
quotient := new(Quotient)
divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
replyCall := <-divCall.Done // will be equal to divCall
if replyCall.Error != nil {
fmt.Println(replyCall.Error)
} else {
fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
}
}
2.服务端代码分析
我们先来看一下服务端代码中的流程:注册(rpc.Register(new(Arith))
)、启动监听(listenTCP()
)、协程处理TCP连接(rpc.Accept(l)
)、处理HTTP连接(rpc.HandleHTTP()
)。
服务端对于每一个请求都会启动一个goroutine去处理,直接请求返回。
2.1 注册(rpc.Register(new(Arith))
)
首先是服务端代码调用rpc.Register(new(Arith))
,然后是对应的Register代码:
// Register publishes the receiver's methods in the DefaultServer.
func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
其中,DefaultServer
是全局变量var DefaultServer = NewServer()
,也就是说如果注册多次,都是挂在同一个Server下面(除非new一个新的Server,有相应的接口)。
调用Server中的Register:
// Register publishes in the server the set of methods of the
// receiver value that satisfy the following conditions:
// - exported method of exported type
// - two arguments, both of exported type
// - the second argument is a pointer
// - one return value, of type error
// It returns an error if the receiver is not an exported type or has
// no suitable methods. It also logs the error using package log.
// The client accesses each method using a string of the form "Type.Method",
// where Type is the receiver's concrete type.
func (server *Server) Register(rcvr interface{}) error {
return server.register(rcvr, "", false)
}
接下来是具体的register函数:
func (server *Server) register(rcvr interface{}, name string, useName bool) error {
s := new(service)
s.typ = reflect.TypeOf(rcvr)
s.rcvr = reflect.ValueOf(rcvr)
sname := reflect.Indirect(s.rcvr).Type().Name()
if useName {
sname = name
}
if sname == "" {
s := "rpc.Register: no service name for type " + s.typ.String()
log.Print(s)
return errors.New(s)
}
if !isExported(sname) && !useName {
s := "rpc.Register: type " + sname + " is not exported"
log.Print(s)
return errors.New(s)
}
s.name = sname
// Install the methods
s.method = suitableMethods(s.typ, true) //判断是否符合rpc规范
if len(s.method) == 0 {
str := ""
// To help the user, see if a pointer receiver would work.
method := suitableMethods(reflect.PtrTo(s.typ), false)
if len(method) != 0 {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
} else {
str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
}
log.Print(str)
return errors.New(str)
}
if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
return errors.New("rpc: service already defined: " + sname)
}
return nil
}
通过反射获取接口类型和值,并通过suitableMethods
函数判断注册的rpc是否符合规范,最后调用server.serviceMap.LoadOrStore(sname, s)
将对应rpc存放于map中,供之后查找。
2.2 启动监听(listenTCP()
)
注册监听端口。
2.3 协程处理TCP连接(rpc.Accept(l)
)
Accept
函数内对端口进行监听,有新来的连接,启动协程调用server.ServerConn
方法进行处理:
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks until the listener
// returns a non-nil error. The caller typically invokes Accept in a
// go statement.
func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
log.Print("rpc.Serve: accept:", err.Error())
return
}
go server.ServeConn(conn)
}
}
ServeConn
接着调用ServeCodec
,也就是走到了序列化/解序列化的地方:
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
buf := bufio.NewWriter(conn)
srv := &gobServerCodec{
rwc: conn,
dec: gob.NewDecoder(conn),
enc: gob.NewEncoder(buf),
encBuf: buf,
}
server.ServeCodec(srv)
}
ServeCodec
代码,其主要为读取消息然后对request进行解序列化,然后调用相应的RPC方法,处理后发送序列化后的返回参数:
// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
wg := new(sync.WaitGroup)
for {
service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) //读取消息并解序列化,其主要包含两部分:1.调用readRequestHeader读取请求,并查看请求的service是否存在,存在则返回。2.对输入参数进行解序列化,以及对应service的返回参数的类型
if err != nil {
if debugLog && err != io.EOF {
log.Println("rpc:", err)
}
if !keepReading {
break
}
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.Error())
server.freeRequest(req)
}
continue
}
wg.Add(1) //信号量控制,在下面call方法中会进行Done()
go service.call(server, sending, wg, mtype, req, argv, replyv, codec)//调用对应的service处理,然后返回序列化后的返回值。
}
// We've seen that there are no more requests.
// Wait for responses to be sent before closing codec.
wg.Wait()//等待所有service.call完成
codec.Close()
}
readRequest
不具体展开了,来看一下call
函数:
func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
if wg != nil {
defer wg.Done() //信号量控制释放
}
mtype.Lock()
mtype.numCalls++ //访问次数计数
mtype.Unlock()
function := mtype.method.Func
// Invoke the method, providing a new value for the reply.
returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) //调用对应的rpc
// The return value for the method is an error.
errInter := returnValues[0].Interface()
errmsg := ""
if errInter != nil {
errmsg = errInter.(error).Error()
}
server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) //返回加密后的返回值
server.freeRequest(req) //释放request,此处下面有疑问
}
别的流程挺清晰了,此处freeRequest
用于释放Request
,对应的,readRequest
中调用getRequest()
获取头部。先来看一下Request结构体:
// Request is a header written before every RPC call. It is used internally
// but documented here as an aid to debugging, such as when analyzing
// network traffic.
type Request struct {
ServiceMethod string // format: "Service.Method"
Seq uint64 // sequence number chosen by client
next *Request // for free list in Server
}
包括了rpc方法名,序列号,已经链表结构存储下一个结点。看起来像是链表结构存储Request
,request请求来了,拿一个Request
结点,请求处理完毕,释放掉。
func (server *Server) getRequest() *Request {
server.reqLock.Lock()
req := server.freeReq
if req == nil {
req = new(Request)
} else {
server.freeReq = req.next
*req = Request{}
}
server.reqLock.Unlock()
return req
}
func (server *Server) freeRequest(req *Request) {
server.reqLock.Lock()
req.next = server.freeReq
server.freeReq = req
server.reqLock.Unlock()
}
然而看以上2个代码实现,现在的问题是:如果链表内没有结点可拿,则new一个,结束后把结点插入到链表的头部。那么链表的长度表示“最大一次并发访问量”,比如最大一次并发接受了100个请求,则结束后这个链表长度为100个Request,那么这个意义在哪里?为啥要用链表存,直接请求来了new不行吗?反正也是链表中结点也是拿出来复用的。此处的确没看懂。
同理,Response
也是这么做的。
2.4 处理HTTP连接(rpc.HandleHTTP()
)
以上是RPC over TCP的情况,go这个rpc库还提供了RPC over HTTP的接口。HandleHTTP
调用链就不具体讲了,其主要将默认的DefaultRPCPath
传递给http.Handle,当启动http server的时候,上面设置的RPC path将会生效,默认访问到该path。接下来是ServeHTTP
处理方法:
// ServeHTTP implements an http.Handler that answers RPC requests.
func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method != "CONNECT" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
w.WriteHeader(http.StatusMethodNotAllowed)
io.WriteString(w, "405 must CONNECT\n")
return
}
conn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
return
}
io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
server.ServeConn(conn)
}
函数内对连接进行Hijack,然后调用ServeConn处理连接(这里为RPC over HTTP情况,与RPC over TCP为一个处理函数,也就是说下层透明)。
2.5服务端小结
net/rpc
中默认生成了一个server供调用,当然你也可以自己new一个。
3.客户端代码分析
客户端代码提供了2种方式:同步Call和异步Go,其中Call方法的内部还是调用了Go方法,只不过进行了一次channel阻塞。
// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
call := new(Call)
call.ServiceMethod = serviceMethod
call.Args = args
call.Reply = reply
if done == nil {
done = make(chan *Call, 10) // buffered.
} else {
// If caller passes done != nil, it must arrange that
// done has enough buffer for the number of simultaneous
// RPCs that will be using that channel. If the channel
// is totally unbuffered, it's best not to run at all.
if cap(done) == 0 {
log.Panic("rpc: done channel is unbuffered")
}
}
call.Done = done
client.send(call)
return call
}
// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
return call.Error
}
Go
里面是需要传入一个channel的且必须是带buffer的,如果没有,将会new一个,但是为什么这里channel buffer大小是10,我也不知道……send方法用于发送,考虑到一个client可能会调用多次send,所以有个加锁机制,但是为什么会有2个加锁?内层加锁是用于锁序列号,外层加锁锁整个发送过程?问题是既然2层嵌套,好像内层锁没什么用啊。
func (client *Client) send(call *Call) {
client.reqMutex.Lock()
defer client.reqMutex.Unlock()
// Register this call.
client.mutex.Lock()
if client.shutdown || client.closing {
call.Error = ErrShutdown
client.mutex.Unlock()
call.done()
return
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
// Encode and send the request.
client.request.Seq = seq
client.request.ServiceMethod = call.ServiceMethod
err := client.codec.WriteRequest(&client.request, call.Args)
if err != nil {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
if call != nil {
call.Error = err
call.done()
}
}
}
上面是发送过程,接收回复在Dial方法中调用了:
// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return NewClient(conn), nil
}
NewClient调用:
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
func NewClient(conn io.ReadWriteCloser) *Client {
encBuf := bufio.NewWriter(conn)
client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
return NewClientWithCodec(client)
}
// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
client := &Client{
codec: codec,
pending: make(map[uint64]*Call),
}
go client.input()
return client
}
同样也是先过解码器,调用input处理回复,失败或者处理完毕会调用done
方法,这样读channel端就不会一直阻塞住。
func (client *Client) input() {
var err error
var response Response
for err == nil {
response = Response{}
err = client.codec.ReadResponseHeader(&response)
if err != nil {
break
}
seq := response.Seq
client.mutex.Lock()
call := client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
switch {
case call == nil:
// We've got no pending call. That usually means that
// WriteRequest partially failed, and call was already
// removed; response is a server telling us about an
// error reading request body. We should still attempt
// to read error body, but there's no one to give it to.
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
case response.Error != "":
// We've got an error response. Give this to the request;
// any subsequent requests will get the ReadResponseBody
// error if there is one.
call.Error = ServerError(response.Error)
err = client.codec.ReadResponseBody(nil)
if err != nil {
err = errors.New("reading error body: " + err.Error())
}
call.done()
default:
err = client.codec.ReadResponseBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// Terminate pending calls.
client.reqMutex.Lock()
client.mutex.Lock()
client.shutdown = true
closing := client.closing
if err == io.EOF {
if closing {
err = ErrShutdown
} else {
err = io.ErrUnexpectedEOF
}
}
for _, call := range client.pending {
call.Error = err
call.done()
}
client.mutex.Unlock()
client.reqMutex.Unlock()
if debugLog && err != io.EOF && !closing {
log.Println("rpc: client protocol error:", err)
}
}
func (call *Call) done() {
select {
case call.Done <- call:
// ok
default:
// We don't want to block here. It is the caller's responsibility to make
// sure the channel has enough buffer space. See comment in Go().
if debugLog {
log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
}
}
}
总结
net/rpc库默认采用gob进行序列化,当然这个可以更改为protobuf, json等。据说net/rcp的性能很不错。
说明:
转载请注明链接: http://vinllen.com/golang-net-rpcyuan-ma-fen-xi/
参考:
http://colobu.com/2016/09/18/go-net-rpc-guide/
https://www.gitbook.com/book/smallnest/go-rpc-programming-guide/details