jasper的技术小窝

关注DevOps、运维监控、Python、Golang、开源、大数据、web开发、互联网

深入Golang之netpoller

作者:jasper | 分类:Golang | 标签:   | 阅读 77 次 | 发布:2017-11-26 9:33 p.m.

Golang是一门主要面向互联网环境的分布式语言,这意味着它需要面对高并发的挑战。当系统出现高并发的IO访问时,如一个网络服务器通常要并发处理成百上千的链接,每个链接可能都是由一个用户任务执行的,那么将会出现大量阻塞的IO操作,如果为每个阻塞操作都单独分配一个OS线程,那么将会增加系统的负载。在Golang中针对网络IO做了特别的优化,而这就是我们这一篇将要探讨的netpoller。

所谓的netpoller,其实是Golang实利用了OS提供的非阻塞IO访问模式,并配合epll/kqueue等IO事件监控机制;为了弥合OS的异步机制与Golang接口的差异,而在runtime上做的一层封装。以此来实现网络IO优化。

实际的实现(epoll/kqueue)必须定义以下函数:

func netpollinit():初始化轮询器
func netpollopen(fd uintptr, pd *pollDesc) int32:为fd和pd启动边缘触发通知

pollDesc包含了2个二进制的信号,分别负责读写goroutine的暂停,该信号的两个状态:

  • pdReady:IO就绪通知,一个goroutine将状态置为nil来消费一个通知;
  • pdWait:一个goroutine准备暂停在信号上,但是还没有完成暂停。
const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

当一个goroutine进行io阻塞时,会去被放到等待队列。这里面就关键的就是建立起文件描述符和goroutine之间的关联。 pollDesc结构体就是完成这个任务的。它的结构体定义如下:

type pollDesc struct {
    link *pollDesc
    lock    mutex 
    fd      uintptr
    closing bool
    seq     uintptr 
    rg      uintptr 
    rt      timer 
    rd      int64   
    wg      uintptr 
    wt      timer  
    wd      int64  
    user    uint32 
}

首先我们来看一下网络poll描述符的数据结构,lock锁对象保护了pollOpen, pollSetDeadline, pollUnblock和deadlineimpl操作。而这些操作又完全包含了对seq, rt, tw变量。fd在PollDesc整个生命过程中都是一个常量。处理pollReset, pollWait, pollWaitCanceled和runtime.netpollready(IO就绪通知)不需要用到锁,所以closing, rg, rd, wg和wd的所有操作都是一个无锁的操作。

当从网络连接的文件描述符读取数据时,调用system call,循环从fd.sysfd读取数据:

func (fd *FD) Read(p []byte) (int, error) {
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return 0, err
    }
    if fd.IsStream && len(p) > maxRW {
        p = p[:maxRW]
    }
    for {
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

读取的时候只处理EAGAIN类型的错误,其他错误一律返回给调用者,因为对于非阻塞的网络连接的文件描述符,如果错误是EAGAIN,说明Socket的缓冲区为空,未读取到任何数据,则调用fd.pd.WaitRead:

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

res是runtime_pollWait函数返回的结果,由conevertErr函数包装后返回:

func convertErr(res int, isFile bool) error {
    switch res {
    case 0:
        return nil
    case 1:
        return errClosing(isFile)
    case 2:
        return ErrTimeout
    }
    println("unreachable: ", res)
    panic("unreachable")
}

其中0表示io已经准备好了,1表示链接意见关闭,2表示io超时。再来看看pollWait的实现:

func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

调用netpollblock来判断IO是否准备好了:

func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    for {
        old := *gpp
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }
        if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }
        old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

返回true说明IO已经准备好,返回false说明IO操作已经超时或者已经关闭。否则当waitio为false, 且io不出现错误或者超时才会挂起当前goroutine。最后的gopark函数,就是将当前的goroutine(调用者)设置为waiting状态。

到这里,当前goroutine对socket文件描述符的等待IO继续的行为已经完成。以上说了这么多,那么在何时将goroutine唤醒呢,其实和之前说的goroutine的调度一样,在sysmon中会不断地调用epoll函数:

lastpoll := int64(atomic.Load64(&sched.lastpoll))
now := nanotime()
if lastpoll != 0 && lastpoll+10*1000*1000 < now {
        atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
        gp := netpoll(false)
                if gp != nil {
                   injectglist(gp)
         }
}

而这里的netpoll会根据操作系统的不同而调用epll/kqueue,寻找到IO就绪的socket文件描述符,并找到这些socket文件描述符对应的轮询器中附带的信息,根据这些信息将之前等待这些socket文件描述符就绪的goroutine状态修改为Grunnable。执行完netpoll之后,会找到一个就绪的goroutine列表,接下来将就绪的goroutine加入到调度队列中,等待调度运行。也就是injectglist(gp)的作用,把g放到sched中去执行,底层仍然是调用的之前在goroutine里面提到的startm函数。

总结

总的来说,netpoller的最终的效果就是用户层阻塞,底层非阻塞。当goroutine读或写阻塞时会被放到等待队列,这个goroutine失去了运行权,但并不是真正的整个系统“阻塞”于系统调用。而通过后台的poller不停地poll,所有的文件描述符都被添加到了这个poller中的,当某个时刻一个文件描述符准备好了,poller就会唤醒之前因它而阻塞的goroutine,于是goroutine重新运行起来。

和使用Unix系统中的select或是poll方法不同地是,Golang的netpoller查询的是能被调度的goroutine而不是那些函数指针、包含了各种状态变量的struct等,这样你就不用管理这些状态,也不用重新检查函数指针等,这些都是你在传统Unix网络I/O需要操心的问题。


转载请注明出处:http://www.opscoder.info/golang_netpoller.html

【上一篇】 深入Golang之sync.pool
【下一篇】 使用InfluxDB监控kafka
其他分类: