首页>国内 > 正文

Go BIO/NIO探讨:Net库对Socket、Bind、listen、Accept的封装

2023-03-09 09:18:12来源:今日头条

​​前面一篇文章​​提到,Go内置的 net/http中使用了Blocking IO,主要体现在两层 for 循环。但真的是这样吗?

本文我们看看 Go net库中Server.ListenAndServe的实现细节。

net.Listen("tcp", addr)方法通过系统调用 socket、bind、listen 生成net.Listener对象,在后面的for 循环中,通过系统调用 accept 等待新的tcp conn,将其包装成一个 conn 对象,在新的 goroutine 中对这个conn进行处理。这里是典型的 per goroutine per connection 模型。这个环节看起来是阻塞的,但创建 socket 时设置了syscall.SOCK_NONBLOCK,对后来有什么影响?


(资料图片仅供参考)

// net/http/server.go struct Serverfunc (srv *Server) ListenAndServe() error {  ln, err := net.Listen("tcp", addr)  // ... 省略部分代码  return srv.Serve(ln)}func (srv *Server) Serve(l net.Listener) error {  for {    // ...    rw, err := l.Accept()    // ...    c := srv.newConn(rw)    c.setState(c.rwc, StateNew, runHooks) // before Serve can return    go c.serve(connCtx)}
net.Listener

net.Listen触发一系列的系统调用(主要是 socket、bind、listen),生成一个net.Listener对象。这个函数创建两类Listener: TCP 支持跨机器的网络通信,UNIX支持本机的多进程通信。

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {  // ... 省略部分代码  var l Listener  la := addrs.first(isIPv4)  switch la := la.(type) {  case *TCPAddr:    l, err = sl.listenTCP(ctx, la)  case *UnixAddr:    l, err = sl.listenUnix(ctx, la)  // ... 省略部分代码}

由于两者都是先触发 syscall.Socket,我们从 socket 系统调用的视角来看两者的区别。

// https://man7.org/linux/man-pages/man2/socket.2.html#include int socket(int family, int type, int protocol);

socket() 创建一个用于网络通信的endpoint,并返回对应的套接字,也叫socket file descriptor。它是一个 int 值,Linux C代码里一般用 sockfd 作为变量名,而 Go net库里一般用 fd 作为变量名。

第一个参数 family 参数用来指定通信的协议族(protocol family),常用的enum值有:

AF_UNIX/AF_LOCAL: Unix域协议, 用于本机的进程间通信。AF_INET: IPV4协议。AF_INET6: IPV6协议。AF_ROUTE: 路由套接字。全量Enum定义在Linux 下。

第二个参数 type 参数用来指定通信语义,常用enum值有:

SOCK_STREAM=1: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,不限制消息长度,支持消息的优先级传输。SOCK_DGRAM=2: 基于UDP, 支持数据报,不是基于连接的、不保证可靠性,且消息的最大长度是固定的。SOCK_RAW=3: 支持通过原始的网络协议访问。SOCK_RDM=4:。SOCK_SEQPACKET=5: 基于TCP, 提供有序、可靠、双向、基于连接的字节流,但消息的最大长度是固定的,超出的部分会被丢弃。

除了这几个,还有两个enum值在 Go net/http 被用到了,分别是:

SOCK_NONBLOCK: 设置 accept 和 read/write操作为 O_NONBLOCK, 对应的场景有:接收连接 accept: 同步模式下没有新连接时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。read类操作: 同步模式下socket缓冲区没有数据可读时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。write类操作: 同步模式下socket缓冲区已满无法写入时, 线程会被休眠, 异步模式下会返回EWOULDBLOCK/EAGAIN错误。SOCK_CLOEXEC: 由于fork时,子进程默认拷贝父进程的数据空间、堆、栈等,当然也包含socket, 通过设置这个flag, 可以保证fork出来的子进程不持有父进程创建的socket。

第三个参数 protocol 指定通信协议,对于domain=AF_INET/AF_INET6来说,常见的enum值有 IPPROTO_TCP IPPROTO_UDP,全量。

socket() 返回一个 socket file descriptor,但并没有协议和地址与其关联。对于tcp client端而言,可以由系统随机指定一个端口;对于一个 tcp server 而言,必须设置一个公开可访问的ip地址和端口。bind函数实现了这个功能:

#include int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);// sockaddr 包含struct sockaddr {  sa_family_t sa_family;  char        sa_data[14];}

其中 sockfd 参数是 socket函数的返回值,后面两个参数指定协议类型和地址。

当socket被创建以后,它并不能被动地接收创建连接请求,此时它只能作为一个client使用。要被动地接收请求,转化为server,需要依赖 listen函数。该函数调用以后,sockfd的状态会从 closed 转换为 listen (netstat 命令可以进行查看)。listen函数的声明如下:

#include int listen(int sockfd, int backlog);

第一个参数 sockfd 是 socket函数的返回值,第二个参数指定了处于ESTABLISHED状态的sockets的队列大小(从Linux 2.2起), 而不是处于 SYNC_RCVD状态的sockets队列的大小。这里提到的两个状态在TCP连接的三次握手中有所定义:

tcp三次握手

backlog 默认值是0x80即128,通常可以配置在文件。/proc/sys/net/ipv4/tcp_max_syn_backlog 中,同时受到/proc/sys/net/core/somaxconn 的限制。在 Go 中,这个参数可以通过func maxListenerBacklog() int获取。如果队列满了,Client端会收到 ECONNREFUSED 错误,即 connection refused。

小结一下,Linux操作系统层面创建一个tcp server,走的逻辑是:

socket函数创建一个套接字 sockfd,默认状态是 Closed。bind函数绑定 sockfd 与特定的协议地址,比如 tcp 0.0.0.0:8080。listen函数修改sockfd的状态为 LISTEN,内核开始监听套接字,三次握手建立连接。

回到 Go net 库的处理流程,我们关注的函数是 sysListener.listenTCP:

// net/tcpsock_posix.go struct sysListenerfunc (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {   fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)   if err != nil {      return nil, err   }   return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil}

对于一个 tcp server,实例 sl 可以被这样赋值,系统调用被封装在函数 internetSocket 里:

sl := &sysListener{  ListenConfig: *lc,     // lc 是默认值  network:      network, // string "tcp"  address:      address, // string "0.0.0.0:8080"}

对于一个 tcp server,函数 internetSocket 接收到的参数可以是:

func internetSocket(  ctx context.Context,  // context.Background()  net string,           // "tcp"  laddr sockaddr,       // &TCPAddr{IP:"0.0.0.0",Port:8080,Zone:"不知道是啥"}, DNS服务读取的地址  raddr sockaddr,       // nil, os=aix|windows|openbsd && mode="dail" 才需要  sotype int,           // syscall.SOCK_STREAM  proto int,            // 0  mode string,          // "listen"  ctrlFn func(string, string, syscall.RawConn) error // sl.ListenConfig.Control) (fd *netFD, err error) {

函数 internetSocket 同样只是一层封装,内部调用的是函数 socket。函数socket内部按照顺序调用了socket/bind/listen,返回一个套接字,这个套接字使用network poller,支持异步IO。函数 socket的主要逻辑如下:

// 通过sysSocket执行socket系统调用// 返回一个套接字ss, err := sysSocket(family, sotype, proto)// 封装int类型的套接字为一个结构体if fd, err = newFD(s, family, sotype, net); err != nil {// ...省略部分代码// 对于 SOCK_STREAM, SOCK_SEQPACKET类型,调用bind和listenif laddr != nil && raddr == nil {  switch sotype {  case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:    if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {      fd.Close()      return nil, err    }    return fd, nil

对于linux tcp server 而言,函数 sysSocket 的关键只有一行代码:

s, err := socketFunc(family, sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC, proto)

SOCK_STREAM、SOCK_NONBLOCK 和 SOCK_CLOEXEC 的语义前面已经讲过,不再赘述。

socketFunc 定义存放在 net/hook_unix.go 里,与listenFunc在一块:

// Placeholders for socket system calls.socketFunc        func(int, int, int) (int, error)  = syscall.SocketconnectFunc       func(int, syscall.Sockaddr) error = syscall.ConnectlistenFunc        func(int, int) error              = syscall.ListengetsockoptIntFunc func(int, int, int) (int, error)  = syscall.GetsockoptInt

通过sysSocket拿到套接字以后,通过函数newFD将其封装成一个结构体,类型是 *net.netFD:

func newFD(sysfd, family, sotype int, net string) (*netFD, error) {   ret := &netFD{      pfd: poll.FD{         Sysfd:         sysfd,         IsStream:      sotype == syscall.SOCK_STREAM,         ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,      },      family: family,      sotype: sotype,      net:    net,   }   return ret, nil}

其中,结构体内部poll.FD定义了读写的逻辑,它封装了6个系统调用:

readSyscallName     = "read"readFromSyscallName = "recvfrom"readMsgSyscallName  = "recvmsg"writeSyscallName    = "write"writeToSyscallName  = "sendto"writeMsgSyscallName = "sendmsg"

在创建套接字时,已经设置了 SOCK_NONBLOCK flag,如果没有可用的连接,读写数据时,会收到 EWOULDBLOCK/EAGAIN 错误。Go net库的处理是等待一段时间,我们看其中一个例子:

// ReadMsgInet4 is ReadMsg, but specialized for syscall.SockaddrInet4.func (fd *FD) ReadMsgInet4(p []byte, oob []byte, flags int, sa4 *syscall.SockaddrInet4) (int, int, int, error) {  if err := fd.readLock(); err != nil {    return 0, 0, 0, err  }  defer fd.readUnlock()  if err := fd.pd.prepareRead(fd.isFile); err != nil {    return 0, 0, 0, err  }  for {    n, oobn, sysflags, err := unix.RecvmsgInet4(fd.Sysfd, p, oob, flags, sa4)    if err != nil {        if err == syscall.EINTR {          continue        }        // TODO(dfc) should n and oobn be set to 0        if err == syscall.EAGAIN && fd.pd.pollable() {          if err = fd.pd.waitRead(fd.isFile); err == nil {              continue          }        }    }    err = fd.eofError(n, err)    return n, oobn, sysflags, err  }}

回到 func (sl *sysListener) listenTCP 方法,函数 internetSocket 返回一个套接字结构体的实例,用来构建 TCPListener 对象&TCPListener{fd: fd, lc: sl.ListenConfig}。后面 accept tcp conn 时,会用到 net.netFD 的 accept 方法,后者只是封装了 poll.DF 的 Accept 方法。

回到 net/http 下的 struct Server 的 ListenAndServe 方法,它包含两步:

net.Listen 方法获取 ln *TCPListener。srv.Serve(ln)。

前面详细说明了第一步的细节,后面我们看第二步如何Serve。

TCPListenr.Accept

对于 linux下的 tcp server,系统调用 accept 发生在 socket、bind、listen 之后,它从内核中的 ESTABLISHED 队列中获取一个建立完成的链接。通过函数socket生成的套接字sockfd可以是阻塞或非阻塞(NONBLOCK),它的函数声明如下:

#include int accept(int sockfd, struct sockaddr *restrict addr,          socklen_t *restrict addrlen);#define _GNU_SOURCE             /* See feature_test_macros(7) */#include int accept4(int sockfd, struct sockaddr *restrict addr,          socklen_t *restrict addrlen, int flags);

对于阻塞/非阻塞的套接字, accept 的表现并不相同:

阻塞的sockfd: 调用方会一直被阻塞,直到有一个ESTABLISHED的tcp conn。非阻塞的sockfd: 函数accept会返回 EAGAIN 或 EWOULDBLOCK 的错误。

Go net库使用的是非阻塞的套接字,我们看这部分代码的逻辑:

// net/net.go struct Serverfunc (srv *Server) Serve(l net.Listener) error {  for {    rw, err := l.Accept()    // ... 省略部分代码

这里 ln 的类型是 *TCPListener, 其方法Accept的定义如下:

// Accept implements the Accept method in the Listener interface; it// waits for the next call and returns a generic Conn.func (l *TCPListener) Accept() (Conn, error) {  if !l.ok() {    return nil, syscall.EINVAL  }  c, err := l.accept()  if err != nil {    return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}  }  return c, nil}func (ln *TCPListener) accept() (*TCPConn, error) {  fd, err := ln.fd.accept()  if err != nil {    return nil, err  }  tc := newTCPConn(fd)  if ln.lc.KeepAlive >= 0 {    setKeepAlive(fd, true)    ka := ln.lc.KeepAlive    if ln.lc.KeepAlive == 0 {      ka = defaultTCPKeepAlive    }    setKeepAlivePeriod(fd, ka)  }  return tc, nil}

struct TCPListener 的结构如下, accept 依赖成员变量 fd *net.netFD,它通过 pdf poll.FD 的 Accept 方法获取client端的套接字,并封装成一个 net.netFD 对象:

// TCPListener is a TCP network listener. Clients should typically// use variables of type Listener instead of assuming TCP.type TCPListener struct {  fd *netFD  lc ListenConfig}func (fd *netFD) accept() (netfd *netFD, err error) {  d, rsa, errcall, err := fd.pfd.Accept()  // ... 省略部分代码  if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {    poll.CloseFunc(d)    return nil, err  }  if err = netfd.init(); err != nil {    netfd.Close()    return nil, err  }  lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)  netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))  return netfd, nil}

继续看 poll.FD 的方法 Accept。它内部是一个 for 循环,先尝试通过系统调用accpt4 获取一个套接字,结果会有下面几种情况:

获取成功, err == nil, 函数直接return即可。syscall.EINTR 表示系统调用期间收到操作系统的信号,但并没有实质的错误发生,所以选择重试。syscall.EAGAIN 表示目前并没有establish新的tcp conn,处理是通过 waitRead 将当前goroutine挂起,等待被唤醒。syscall.ECONNABORTED 表示远程连接已经在ESTABLISHED队列,还没有被Accept时,client端放弃连接。其他错误: 返回一个错误。
// Accept wraps the accept network call.func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {  if err := fd.readLock(); err != nil {    return -1, nil, "", err  }  defer fd.readUnlock()  if err := fd.pd.prepareRead(fd.isFile); err != nil {    return -1, nil, "", err  }  for {    s, rsa, errcall, err := accept(fd.Sysfd)    if err == nil {      return s, rsa, "", err    }    switch err {    case syscall.EINTR:      continue    case syscall.EAGAIN:      if fd.pd.pollable() {        if err = fd.pd.waitRead(fd.isFile); err == nil {          continue        }      }    case syscall.ECONNABORTED:      // This means that a socket on the listen      // queue was closed before we Accept()ed it;      // it"s a silly error, so try again.      continue    }    return -1, nil, errcall, err  }}// 代码路径: internal/poll/sock_cloexec.go// Wrapper around the accept system call that marks the returned file// descriptor as nonblocking and close-on-exec.func accept(s int) (int, syscall.Sockaddr, string, error) {  ns, sa, err := Accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)  // ... 省略部分代码

另外可以看到,accept4 系统调用时,传入了 SOCK_NONBLOCK 和 SOCK_CLOEXEC 两个 flag,socket 系统调用也使用了这两个 flag。

通过 ln.Accept 获取到ESTABLISHED连接的套接字以后,就可以对远端的client进行服务了。

在本文中,总共有两类套接字(socket):

server端监听的套接字, 通过socket系统调用创建。它的生命周期和server同样长。已连接的套接字, 通过accept系统调用创建。它的生命周期比较短,尤其是对于应用层是HTTP短链接的情况。

在​​第一篇​​文章"Go BIO/NIO探讨(1):Gin框架中如何处理HTTP请求"中,我们提到了两层 for 循环,本文只是讲了第一层。从阻塞、非阻塞的角度来看,TCPListener.Accept方法看起来是block的实现,但底层的套接字和系统调用设置了 NONBLOCK flag,可以说是基于 NONBLOCK 的方式实现的。单纯从网络的视角看,这称得上是 Non-blocking IO 了。

关键词:

相关新闻

Copyright 2015-2020   三好网  版权所有 联系邮箱:435 22 640@qq.com  备案号: 京ICP备2022022245号-21