for { // break-02 if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { wp.Stop() if err == io.EOF { returnnil } return err }
// break-03 if !wp.Serve(c) { s.writeFastError(c, StatusServiceUnavailable, "The connection cannot be served because Server.Concurrency limit exceeded") c.Close() if time.Since(lastOverflowErrorTime) > time.Minute { s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+ "Try increasing Server.Concurrency", maxWorkersCount) lastOverflowErrorTime = CoarseTimeNow() }
// The current server reached concurrency limit, // so give other concurrently running servers a chance // accepting incoming connections on the same address. // // There is a hope other servers didn't reach their // concurrency limits yet :) time.Sleep(100 * time.Millisecond) } c = nil } }
// 有必要了解一下 workerPool 的结构 type workerPool struct { // Function for serving server connections. // It must leave c unclosed. WorkerFunc func(c net.Conn)error
// Clean least recently used workers if they didn't serve connections // for more than maxIdleWorkerDuration. currentTime := time.Now()
wp.lock.Lock() ready := wp.ready n := len(ready) i := 0 // 这里从队列头部取出超过 最大空闲时间 的workerChan。 // 可以看出,最后使用的workerChan 一定是放回队列尾部的。 for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration { i++ } // 把空闲的放入 scratch, 剩余的放回 ready *scratch = append((*scratch)[:0], ready[:i]...) if i > 0 { m := copy(ready, ready[i:]) for i = m; i < n; i++ { ready[i] = nil } wp.ready = ready[:m] } wp.lock.Unlock()
// Notify obsolete workers to stop. // This notification must be outside the wp.lock, since ch.ch // may be blocking and may consume a lot of time if many workers // are located on non-local CPUs. tmp := *scratch // 销毁的操作就是向 chan net.Conn 中塞入一个 nil, 后面会看到解释 for i, ch := range tmp { ch.ch <- nil tmp[i] = nil } }