Go BIO/NIO探讨:Net库对Socket、Bind、listen、Accept的封装
2023-03-09 09:18:12来源:今日头条
前面一篇文章提到,Go内置的 net/http中使用了Blocking IO,主要体现在两层 for 循环。但真的是这样吗?
本文我们看看 Go net库中Server.ListenAndServe的实现细节。
net.Listen("tcp", addr)方法通过系统调用 socket、bind、listen 生成net.Listener对象,在后面的for 循环中,通过系统调用 accept 等待新的tcp conn,将其包装成一个 conn 对象,在新的 goroutine 中对这个conn进行处理。这里是典型的 per goroutine per connection 模型。这个环节看起来是阻塞的,但创建 socket 时设置了syscall.SOCK_NONBLOCK,对后来有什么影响?
(资料图片仅供参考)
// net/http/server.go struct Serverfunc (srv *Server) ListenAndServe() error { ln, err := net.Listen("tcp", addr) // ... 省略部分代码 return srv.Serve(ln)}func (srv *Server) Serve(l net.Listener) error { for { // ... rw, err := l.Accept() // ... c := srv.newConn(rw) c.setState(c.rwc, StateNew, runHooks) // before Serve can return go c.serve(connCtx)}net.Listener
net.Listen触发一系列的系统调用(主要是 socket、bind、listen),生成一个net.Listener对象。这个函数创建两类Listener: TCP 支持跨机器的网络通信,UNIX支持本机的多进程通信。
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) { // ... 省略部分代码 var l Listener la := addrs.first(isIPv4) switch la := la.(type) { case *TCPAddr: l, err = sl.listenTCP(ctx, la) case *UnixAddr: l, err = sl.listenUnix(ctx, la) // ... 省略部分代码}
由于两者都是先触发 syscall.Socket,我们从 socket 系统调用的视角来看两者的区别。
// https://man7.org/linux/man-pages/man2/socket.2.html#includeint socket(int family, int type, int protocol);
socket() 创建一个用于网络通信的endpoint,并返回对应的套接字,也叫socket file descriptor。它是一个 int 值,Linux C代码里一般用 sockfd 作为变量名,而 Go net库里一般用 fd 作为变量名。
第一个参数 family 参数用来指定通信的协议族(protocol family),常用的enum值有:
AF_UNIX/AF_LOCAL: Unix域协议, 用于本机的进程间通信。AF_INET: IPV4协议。AF_INET6: IPV6协议。AF_ROUTE: 路由套接字。全量Enum定义在Linux第二个参数 type 参数用来指定通信语义,常用enum值有:
SOCK_STREAM=1: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,不限制消息长度,支持消息的优先级传输。SOCK_DGRAM=2: 基于UDP, 支持数据报,不是基于连接的、不保证可靠性,且消息的最大长度是固定的。SOCK_RAW=3: 支持通过原始的网络协议访问。SOCK_RDM=4:。SOCK_SEQPACKET=5: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,但消息的最大长度是固定的,超出的部分会被丢弃。除了这几个,还有两个enum值在 Go net/http 被用到了,分别是:
SOCK_NONBLOCK: 设置 accept 和 read/write操作为 O_NONBLOCK, 对应的场景有:接收连接 accept: 同步模式下没有新连接时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。read类操作: 同步模式下socket缓冲区没有数据可读时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。write类操作: 同步模式下socket缓冲区已满无法写入时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。SOCK_CLOEXEC: 由于fork时,子进程默认拷贝父进程的数据空间、堆、栈等,当然也包含socket, 通过设置这个flag, 可以保证fork出来的子进程不持有父进程创建的socket。第三个参数 protocol 指定通信协议,对于domain=AF_INET/AF_INET6来说,常见的enum值有 IPPROTO_TCP IPPROTO_UDP,全量。
socket() 返回一个 socket file descriptor,但并没有协议和地址与其关联。对于tcp client端而言,可以由系统随机指定一个端口;对于一个 tcp server 而言,必须设置一个公开可访问的ip地址和端口。bind函数实现了这个功能:
#includeint bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);// sockaddr 包含struct sockaddr { sa_family_t sa_family; char sa_data[14];}
其中 sockfd 参数是 socket函数的返回值,后面两个参数指定协议类型和地址。
当socket被创建以后,它并不能被动地接收创建连接请求,此时它只能作为一个client使用。要被动地接收请求,转化为server,需要依赖 listen函数。该函数调用以后,sockfd的状态会从 closed 转换为 listen (netstat 命令可以进行查看)。listen函数的声明如下:
#includeint listen(int sockfd, int backlog);
第一个参数 sockfd 是 socket函数的返回值,第二个参数指定了处于ESTABLISHED状态的sockets的队列大小(从Linux 2.2起), 而不是处于 SYNC_RCVD状态的sockets队列的大小。这里提到的两个状态在TCP连接的三次握手中有所定义:
tcp三次握手
backlog 默认值是0x80即128,通常可以配置在文件。/proc/sys/net/ipv4/tcp_max_syn_backlog 中,同时受到/proc/sys/net/core/somaxconn 的限制。在 Go 中,这个参数可以通过func maxListenerBacklog() int获取。如果队列满了,Client端会收到 ECONNREFUSED 错误,即 connection refused。
小结一下,Linux操作系统层面创建一个tcp server,走的逻辑是:
socket函数创建一个套接字 sockfd,默认状态是 Closed。bind函数绑定 sockfd 与特定的协议地址,比如 tcp 0.0.0.0:8080。listen函数修改sockfd的状态为 LISTEN,内核开始监听套接字,三次握手建立连接。回到 Go net 库的处理流程,我们关注的函数是 sysListener.listenTCP:
// net/tcpsock_posix.go struct sysListenerfunc (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) { fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) if err != nil { return nil, err } return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil}
对于一个 tcp server,实例 sl 可以被这样赋值,系统调用被封装在函数 internetSocket 里:
sl := &sysListener{ ListenConfig: *lc, // lc 是默认值 network: network, // string "tcp" address: address, // string "0.0.0.0:8080"}
对于一个 tcp server,函数 internetSocket 接收到的参数可以是:
func internetSocket( ctx context.Context, // context.Background() net string, // "tcp" laddr sockaddr, // &TCPAddr{IP:"0.0.0.0",Port:8080,Zone:"不知道是啥"}, DNS服务读取的地址 raddr sockaddr, // nil, os=aix|windows|openbsd && mode="dail" 才需要 sotype int, // syscall.SOCK_STREAM proto int, // 0 mode string, // "listen" ctrlFn func(string, string, syscall.RawConn) error // sl.ListenConfig.Control) (fd *netFD, err error) {
函数 internetSocket 同样只是一层封装,内部调用的是函数 socket。函数socket内部按照顺序调用了socket/bind/listen,返回一个套接字,这个套接字使用network poller,支持异步IO。函数 socket的主要逻辑如下:
// 通过sysSocket执行socket系统调用// 返回一个套接字ss, err := sysSocket(family, sotype, proto)// 封装int类型的套接字为一个结构体if fd, err = newFD(s, family, sotype, net); err != nil {// ...省略部分代码// 对于 SOCK_STREAM, SOCK_SEQPACKET类型,调用bind和listenif laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil
对于linux tcp server 而言,函数 sysSocket 的关键只有一行代码:
s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)
SOCK_STREAM、SOCK_NONBLOCK 和 SOCK_CLOEXEC 的语义前面已经讲过,不再赘述。
socketFunc 定义存放在 net/hook_unix.go 里,与listenFunc在一块:
// Placeholders for socket system calls.socketFunc func(int, int, int) (int, error) = syscall.SocketconnectFunc func(int, syscall.Sockaddr) error = syscall.ConnectlistenFunc func(int, int) error = syscall.ListengetsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
通过sysSocket拿到套接字以后,通过函数newFD将其封装成一个结构体,类型是 *net.netFD:
func newFD(sysfd, family, sotype int, net string) (*netFD, error) { ret := &netFD{ pfd: poll.FD{ Sysfd: sysfd, IsStream: sotype == syscall.SOCK_STREAM, ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW, }, family: family, sotype: sotype, net: net, } return ret, nil}
其中,结构体内部poll.FD定义了读写的逻辑,它封装了6个系统调用:
readSyscallName = "read"readFromSyscallName = "recvfrom"readMsgSyscallName = "recvmsg"writeSyscallName = "write"writeToSyscallName = "sendto"writeMsgSyscallName = "sendmsg"
在创建套接字时,已经设置了 SOCK_NONBLOCK flag,如果没有可用的连接,读写数据时,会收到 EWOULDBLOCK/EAGAIN 错误。Go net库的处理是等待一段时间,我们看其中一个例子:
// ReadMsgInet4 is ReadMsg, but specialized for syscall.SockaddrInet4.func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error) { if err := fd.readLock(); err != nil { return 0, 0, 0, err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return 0, 0, 0, err } for { n, oobn, sysflags, err := unix.RecvmsgInet4(fd.Sysfd, p, oob, flags, sa4) if err != nil { if err == syscall.EINTR { continue } // TODO(dfc) should n and oobn be set to 0 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } } err = fd.eofError(n, err) return n, oobn, sysflags, err }}
回到 func (sl *sysListener) listenTCP 方法,函数 internetSocket 返回一个套接字结构体的实例,用来构建 TCPListener 对象&TCPListener{fd: fd, lc: sl.ListenConfig}。后面 accept tcp conn 时,会用到 net.netFD 的 accept 方法,后者只是封装了 poll.DF 的 Accept 方法。
回到 net/http 下的 struct Server 的 ListenAndServe 方法,它包含两步:
net.Listen 方法获取 ln *TCPListener。srv.Serve(ln)。前面详细说明了第一步的细节,后面我们看第二步如何Serve。
TCPListenr.Accept对于 linux下的 tcp server,系统调用 accept 发生在 socket、bind、listen 之后,它从内核中的 ESTABLISHED 队列中获取一个建立完成的链接。通过函数socket生成的套接字sockfd可以是阻塞或非阻塞(NONBLOCK),它的函数声明如下:
#includeint accept(int sockfd, struct sockaddr *restrict addr, socklen_t *restrict addrlen);#define _GNU_SOURCE /* See feature_test_macros(7) */#include int accept4(int sockfd, struct sockaddr *restrict addr, socklen_t *restrict addrlen, int flags);
对于阻塞/非阻塞的套接字, accept 的表现并不相同:
阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp conn。非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误。Go net库使用的是非阻塞的套接字,我们看这部分代码的逻辑:
// net/net.go struct Serverfunc (srv *Server) Serve(l net.Listener) error { for { rw, err := l.Accept() // ... 省略部分代码
这里 ln 的类型是 *TCPListener, 其方法Accept的定义如下:
// Accept implements the Accept method in the Listener interface; it// waits for the next call and returns a generic Conn.func (l *TCPListener) Accept() (Conn, error) { if !l.ok() { return nil, syscall.EINVAL } c, err := l.accept() if err != nil { return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err} } return c, nil}func (ln *TCPListener) accept() (*TCPConn, error) { fd, err := ln.fd.accept() if err != nil { return nil, err } tc := newTCPConn(fd) if ln.lc.KeepAlive >= 0 { setKeepAlive(fd, true) ka := ln.lc.KeepAlive if ln.lc.KeepAlive == 0 { ka = defaultTCPKeepAlive } setKeepAlivePeriod(fd, ka) } return tc, nil}
struct TCPListener 的结构如下, accept 依赖成员变量 fd *net.netFD,它通过 pdf poll.FD 的 Accept 方法获取client端的套接字,并封装成一个 net.netFD 对象:
// TCPListener is a TCP network listener. Clients should typically// use variables of type Listener instead of assuming TCP.type TCPListener struct { fd *netFD lc ListenConfig}func (fd *netFD) accept() (netfd *netFD, err error) { d, rsa, errcall, err := fd.pfd.Accept() // ... 省略部分代码 if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil { poll.CloseFunc(d) return nil, err } if err = netfd.init(); err != nil { netfd.Close() return nil, err } lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd) netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) return netfd, nil}
继续看 poll.FD 的方法 Accept。它内部是一个 for 循环,先尝试通过系统调用accpt4 获取一个套接字,结果会有下面几种情况:
获取成功, err == nil, 函数直接return即可。syscall.EINTR 表示系统调用期间收到操作系统的信号,但并没有实质的错误发生,所以选择重试。syscall.EAGAIN 表示目前并没有establish新的tcp conn,处理是通过 waitRead 将当前goroutine挂起,等待被唤醒。syscall.ECONNABORTED 表示远程连接已经在ESTABLISHED队列,还没有被Accept时,client端放弃连接。其他错误: 返回一个错误。// Accept wraps the accept network call.func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { if err := fd.readLock(); err != nil { return -1, nil, "", err } defer fd.readUnlock() if err := fd.pd.prepareRead(fd.isFile); err != nil { return -1, nil, "", err } for { s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EINTR: continue case syscall.EAGAIN: if fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it"s a silly error, so try again. continue } return -1, nil, errcall, err }}// 代码路径: internal/poll/sock_cloexec.go// Wrapper around the accept system call that marks the returned file// descriptor as nonblocking and close-on-exec.func accept(s int) (int, syscall.Sockaddr, string, error) { ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) // ... 省略部分代码
另外可以看到,accept4 系统调用时,传入了 SOCK_NONBLOCK 和 SOCK_CLOEXEC 两个 flag,socket 系统调用也使用了这两个 flag。
通过 ln.Accept 获取到ESTABLISHED连接的套接字以后,就可以对远端的client进行服务了。
在本文中,总共有两类套接字(socket):
server端监听的套接字, 通过socket系统调用创建。它的生命周期和server同样长。已连接的套接字, 通过accept系统调用创建。它的生命周期比较短,尤其是对于应用层是HTTP短链接的情况。在第一篇文章"Go BIO/NIO探讨(1):Gin框架中如何处理HTTP请求"中,我们提到了两层 for 循环,本文只是讲了第一层。从阻塞、非阻塞的角度来看,TCPListener.Accept方法看起来是block的实现,但底层的套接字和系统调用设置了 NONBLOCK flag,可以说是基于 NONBLOCK 的方式实现的。单纯从网络的视角看,这称得上是 Non-blocking IO 了。
关键词: