|
|
51CTO旗下网站
|
|
移步端
  • Tomcat 中的 NIO 源码分析

    虽然本文的源码篇幅也不短,但是 Tomcat 的源码毕竟不像 Doug Lea 的并发源码那么“变态”,对于绝大多数读者来说,翻阅难度比之前介绍的任何并发源码要简单一些,故此读者不要以为有什么压力。

    笔者:JAVA架构 来源:当日第一| 2019-12-31 14:59

    本文将介绍 Tomcat 中的 NIO 采用,使大家对 Java NIO 的生产使用有更加直观的认识。

    虽然本文的源码篇幅也不短,但是 Tomcat 的源码毕竟不像 Doug Lea 的并发源码那么“变态”,对于绝大多数读者来说,翻阅难度比之前介绍的任何并发源码要简单一些,故此读者不要以为有什么压力。

    本文基于 Tomcat 眼前(2018-03-20)新型版本 9.0.6。

    先简单画一张图示意一下本文的首要内容:

    Tomcat 中的 NIO 源码分析

    目录

    源码环境准备

    出于上面下载的 tomcat 的源码并没有使用 maven 拓展组织,艰苦我们看源码,也不方便我们进行调整。此地我们将使用 maven 仓库中的 tomcat-embed-core,和谐编排代码进行启动的措施来开展调整。

    第一,创造一个空的 maven 水利,下一场添加以下依赖。

           
    1. <dependency> 
    2. <groupId>org.apache.tomcat.embed</groupId> 
    3. <artifactId>tomcat-embed-core</artifactId> 
    4. <version>9.0.6</version> 
    5. </dependency> 

    地方的依赖,只会将 tomcat-embed-core-9.0.6.jar 和 tomcat-annotations-api-9.0.6.jar 两个包引进来,对于本文来说,已经足够了,如果你需要其他职能,要求额外引用其他的依赖,如 Jasper。

    下一场,采用以下启动方法:

           
    1. public static void main(String[] args) throws LifecycleException { 
    2.  
    3. Tomcat tomcat = new Tomcat; 
    4.  
    5. Connector connector = new Connector("HTTP/1.1"); 
    6. connector.setPort(8080); 
    7. tomcat.setConnector(connector); 
    8.  
    9. tomcat.start; 
    10. tomcat.getServer.await; 

    历经以上的编码,咱们的 Tomcat 就启动起来了。

    Tomcat 中的其他接口感兴趣的读者群请自行探索,如设置 webapp 目录,安装 resources 等

    此地,介绍第一个重要的定义:Connector。在 Tomcat 官方,采用 Connector 来处理连接,一度 Tomcat 可以配置多个 Connector,离别用于监听不同端口,或处理不同协议。

    在 Connector 的结构方法中,咱们可以传 HTTP/1.1或AJP/1.3用于指定协议,也得以传相应的商谈处理类,毕竟协议不是重点,名将不同端口进来的过渡对应不同处理类才是正道。突出地,咱们可以指定以下几个协议处理类:

  • org.apache.coyote.http11.Http11NioProtocol:对应非阻塞 IO
  • org.apache.coyote.http11.Http11Nio2Protocol:对应异步 IO
  • org.apache.coyote.http2.Http2Protocol:对应 http2 协和,对 http2 感兴趣的读者群,赶紧看起来吧。
  • 本文的基本点当然是非阻塞 IO 了,先前已经介绍过异步 IO的根基知识了,读者看完本文后,如果对异步 IO 的拍卖流程感兴趣,可以自动去分析一遍。

    如果你利用 9.0 此前的本子,Tomcat 在起步的时节是会自动配置一个 connector 的,咱们可以不用显示配置。

    9.0 本子的 Tomcat#start 办法:

           
    1. public void start throws LifecycleException { 
    2. getServer; 
    3. server.start; 

    8.5 及之前版本的 Tomcat#start 办法:

           
    1. public void start throws LifecycleException { 
    2. getServer; 
    3. // 机动配置一个用到非阻塞 IO 的 connector 
    4. getConnector; 
    5. server.start; 

    endpoint

    眼前我们说过一个 Connector 对应一个协议,当然这描述也不太对,NIO 和 NIO2 就都是处理 HTTP/1.1 的,只不过一个用到非阻塞,一度用到异步。趟到指定 protocol 代码,咱们就会发现,它们的编码及他简单,只不过是指定了一定的 endpoint。

    开拓 Http11NioProtocol和Http11Nio2Protocol源码,咱们可以看出,在结构方法中,它们分别指定了 NioEndpoint 和 Nio2Endpoint。

           
    1. // 非阻塞模式 
    2. public classHttp11NioProtocolextendsAbstractHttp11JsseProtocol<NioChannel> { 
    3. publicHttp11NioProtocol { 
    4. // NioEndpoint 
    5. super(new NioEndpoint); 
    6. ... 
    7. // 异步模式 
    8. public classHttp11Nio2ProtocolextendsAbstractHttp11JsseProtocol<Nio2Channel> { 
    9.  
    10. publicHttp11Nio2Protocol { 
    11. // Nio2Endpoint 
    12. super(new Nio2Endpoint); 
    13. ... 

    此地介绍第二个基本点的定义:endpoint。Tomcat 采用不同之 endpoint 来处理不同之商谈请求,当日我们的基本点是 NioEndpoint,他使用非阻塞 IO 来开展拍卖 HTTP/1.1 协和的呼吁。

    NioEndpoint 连续 => AbstractJsseEndpoint 连续 => AbstractEndpoint。中间的 AbstractJsseEndpoint 重点是提供了部分关于 HTTPS的主意,这块我们暂时忽略它,后面所有关于 HTTPS 的我们都直接忽略,感兴趣的读者群请自行分析。

    init 经过分析

    下,咱们看看从 tomcat.start 一直到 NioEndpoint 的经过。

    1. AbstractProtocol # init

           
    1. @Override 
    2. public voidinit throws Exception { 
    3. ... 
    4. String endpointName = getName; 
    5. endpoint.setName(endpointName.substring(, endpointName.length-1 1)); 
    6. endpoint.setDomain(domain); 
    7. // endpoint 的 name=http-nio-8089,domain=Tomcat 
    8. endpoint.init; 

    2. AbstractEndpoint # init

           
    1. public final void init throws Exception { 
    2. if (bindOnInit) { 
    3. bind; // 此地对应的当然是子类 NioEndpoint 的 bind 办法 
    4. bindState = BindState.BOUND_ON_INIT; 
    5. ... 

    3. NioEndpoint # bind

    此地就到我们的 NioEndpoint 了,要使用到我们之前学习之 NIO 的所见所闻了。

           
    1. @Override 
    2. public voidbind throws Exception { 
    3. // initServerSocket; 原代码是这趟,咱们 “内联” 来到一起说 
    4.  
    5. // 起来 ServerSocketChannel 
    6. serverSock = ServerSocketChannel.open
    7. socketProperties.setProperties(serverSock.socket); 
    8.  
    9. // getPort 会回来我们最开始设置的 8080,得到我们的 address 是 0.0.0.0:8080 
    10. InetSocketAddress addr = (getAddress!=?new InetSocketAddress(getAddress,getPort):new InetSocketAddress(getPort)); 
    11.  
    12. // ServerSocketChannel 绑定地址、端口, 
    13. // 其次个参数 backlog 默认为 100,超过 100 的时节,新连接会把拒绝(不过源码注释也说了,其一值的真正语义取决于具体实现) 
    14. serverSock.socket.bind(addr,getAcceptCount); 
    15.  
    16. // ※※※ 安装 ServerSocketChannel 为阻塞模式 ※※※ 
    17. serverSock.configureBlocking(true); 
    18.  
    19. // 安装 acceptor 和 poller 的多寡,至于它们是什么角色,待会说 
    20. // acceptorThreadCount 默认为 1 
    21. if (acceptorThreadCount == 0) { 
    22. // FIXME: Doesn't seem to work that well with multiple accept threads 
    23. // 笔者想发挥的味道应该是:采用多个 acceptor 点程并不见得性能会更好 
    24. acceptorThreadCount = 1; 
    25.  
    26. // poller 点程数,默认值定义如下,故此在多核模式下,默认为 2 
    27. // pollerThreadCount = Math.min(2,Runtime.getRuntime.availableProcessors); 
    28. if (pollerThreadCount <= 0) { 
    29. pollerThreadCount = 1; 
    30.  
    31. // 
    32. setStopLatch(new CountDownLatch(pollerThreadCount)); 
    33.  
    34. // 初始化 ssl,咱们忽略 ssl 
    35. initialiseSsl; 
    36.  
    37. // 开拓 NioSelectorPool,先忽略它 
    38. selectorPool.open
    1. ServerSocketChannel 已经打开,并且绑定要了之前指定的 8080 端口,安装成了绿灯模式。
    2. 安装了 acceptor 的点程数为 1
    3. 安装了 poller 的点程数,单核 CPU 为 1,多核为 2
    4. 开拓了一番 SelectorPool,咱们先忽略这个

    到此处,咱们还不知晓 Acceptor 和 Poller 是什么东西,咱们只是设置了它们的多寡,咱们先来看望最后面提到的 SelectorPool。

    start 经过分析

    刚刚我们分析完了 init 经过,下是启动过程 start 剖析。

    AbstractProtocol # start

           
    1. @Override 
    2. public voidstart throws Exception { 
    3. ... 
    4. // 租用 endpoint 的 start 办法 
    5. endpoint.start; 
    6.  
    7. // Start async timeout thread 
    8. asyncTimeout = new AsyncTimeout; 
    9. Thread timeoutThread = new Thread(asyncTimeout, getNameInternal + "-AsyncTimeout"); 
    10. int priority = endpoint.getThreadPriority; 
    11. if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { 
    12. priority = Thread.NORM_PRIORITY; 
    13. timeoutThread.setPriority(priority); 
    14. timeoutThread.setDaemon(true); 
    15. timeoutThread.start; 

    AbstractEndpoint # start

           
    1. public final void start throws Exception { 
    2. // 按照我们的流程,刚刚 init 的时节,已经把 bindState 成为 BindState.BOUND_ON_INIT 了, 
    3. // 故此下面的 if 分支我们就不进来了 
    4. if (bindState == BindState.UNBOUND) { 
    5. bind; 
    6. bindState = BindState.BOUND_ON_START; 
    7. // 往里看 NioEndpoint 的贯彻 
    8. startInternal; 

    下这个艺术还是比较重要的,此地会创建前面说过的 acceptor 和 poller。

    NioEndpoint # startInternal

           
    1. @Override 
    2. public voidstartInternal throws Exception { 
    3.  
    4. if (!running) { 
    5. running = true
    6. paused = false
    7.  
    8. // 以下几个是缓存用之,后我们也会见到许多这样的编码,为了削减 new 有的是对象出来 
    9. processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, 
    10. socketProperties.getProcessorCache); 
    11. eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, 
    12. socketProperties.getEventCache); 
    13. nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, 
    14. socketProperties.getBufferPool); 
    15.  
    16. // 创造【上班线程池】,Tomcat 和谐包装了一下 ThreadPoolExecutor, 
    17. // 1. 为了在创造线程池下,先启动 corePoolSize 个线程(其一属于线程池之所见所闻了,不习的读者群可以看看我之前的篇章) 
    18. // 2. 和谐管理线程池之增强方式(默认 corePoolSize 10, maxPoolSize 200),不是本文重点,不分析 
    19. if ( getExecutor == ) { 
    20. createExecutor; 
    21.  
    22. // 安装一个栅栏(tomcat 自定义了类 LimitLatch),控制最大的过渡数,默认是 10000 
    23. initializeConnectionLatch; 
    24.  
    25. // 起来 poller 点程 
    26. // 还记得之前 init 的时节,默认地设置了 poller 的多寡为 2,故此这里启动 2 个 poller 点程 
    27. pollers = new Poller[getPollerThreadCount()]; 
    28. for (int i=0; i<pollers.length; i++) { 
    29. pollers[i] = new Poller; 
    30. Thread pollerThread = new Thread(pollers[i], getName + "-ClientPoller-"+i); 
    31. pollerThread.setPriority(threadPriority); 
    32. pollerThread.setDaemon(true); 
    33. pollerThread.start; 
    34.  
    35. // 起来 acceptor 点程,和开始 poller 点程组差不多。 
    36. // init 的时节,默认地,acceptor 的点程数是 1 
    37. startAcceptorThreads; 

    到此处,咱们启动了办事线程池、 poller 点程组、acceptor 点程组。同时,上班线程池初步就已经启动了 10 个线程。咱们用 jconsole 来看望此时的点程,请看下图:

    Tomcat 中的 NIO 源码分析

    副 jconsole 官方,咱们可以看出,此刻启动了 BlockPoller、worker、poller、acceptor、AsyncTimeout,大家应该都已经知道了每个线程是手里启动的吧。

    Tomcat 官方并没有 Worker 其一类,此名字是我瞎编。

    此刻,咱们还是不知晓 acceptor、poller 甚至 worker 到底是干嘛的,下,咱们从 acceptor 点程开始看起。

    Acceptor

    他的组织非常简单,在结构函数中,已经把 endpoint 传播进来了,另外就只有 threadName 和 state 两个简单的习性。

           
    1. private final AbstractEndpoint<?,U> endpoint; 
    2. private String threadName; 
    3. protected volatile AcceptorState state = AcceptorState.NEW; 
    4.  
    5. publicAcceptor(AbstractEndpoint<?,U> endpoint) { 
    6. this.endpoint = endpoint; 

    threadName 就是一番点程名字而已,Acceptor 的状态 state 重点是随着 endpoint 来之。

           
    1. public enum AcceptorState { 
    2. NEW, RUNNING, PAUSED, ENDED 

    咱们直接来看 acceptor 的 run 办法吧:

    Acceptor # run

           
    1. @Override 
    2. public voidrun { 
    3.  
    4. int errorDelay = 0; 
    5.  
    6. // 只要 endpoint 处于 running,此地就一直循环 
    7. while (endpoint.isRunning) { 
    8.  
    9. // 如果 endpoint 处于 pause 状态,此处 Acceptor 用一个 while 循环将团结也挂起 
    10. while (endpoint.isPaused && endpoint.isRunning) { 
    11. state = AcceptorState.PAUSED; 
    12. try { 
    13. Thread.sleep(50); 
    14. } catch (InterruptedException e) { 
    15. // Ignore 
    16. // endpoint 结束了,Acceptor 潇洒也要结束嘛 
    17. if (!endpoint.isRunning) { 
    18. break; 
    19. state = AcceptorState.RUNNING; 
    20.  
    21. try { 
    22. // 如果此时达到了最大连接数(先前我们说过,默认是10000),就等待 
    23. endpoint.countUpOrAwaitConnection; 
    24.  
    25. // Endpoint might have been paused while waiting for latch 
    26. // If that is the case, don't accept new connections 
    27. if (endpoint.isPaused) { 
    28. continue
    29.  
    30. U socket = ; 
    31. try { 
    32. // 此地就是吸收下一个进来的 SocketChannel 
    33. // 先前我们设置了 ServerSocketChannel 为阻塞模式,故此这边的 accept 是阻塞的 
    34. socket = endpoint.serverSocketAccept; 
    35. } catch (Exception ioe) { 
    36. // We didn't get a socket 
    37. endpoint.countDownConnection; 
    38. if (endpoint.isRunning) { 
    39. // Introduce delay if necessary 
    40. errorDelay = handleExceptionWithDelay(errorDelay); 
    41. // re-throw 
    42. throw ioe; 
    43. else { 
    44. break; 
    45. // accept 成功,名将 errorDelay 安装为 0 
    46. errorDelay = 0; 
    47.  
    48. if (endpoint.isRunning && !endpoint.isPaused) { 
    49. // setSocketOptions 是此处的严重性方法,具体地说前面千辛万苦都是为了能到此处进行拍卖 
    50. if (!endpoint.setSocketOptions(socket)) { 
    51. // 如果上面的主意返回 false,关闭 SocketChannel 
    52. endpoint.closeSocket(socket); 
    53. else { 
    54. // 出于 endpoint 不 running 了,或者处于 pause 了,名将此 SocketChannel 关闭 
    55. endpoint.destroySocket(socket); 
    56. } catch (Throwable t) { 
    57. ExceptionUtils.handleThrowable(t); 
    58. String msg = sm.getString("endpoint.accept.fail"); 
    59. // APR specific. 
    60. // Could push this down but not sure it is worth the trouble. 
    61. if (t instanceof Error) { 
    62. Error e = (Error) t; 
    63. if (e.getError == 233) { 
    64. // Not an error on HP-UX so log as a warning 
    65. // so it can be filtered out on that platform 
    66. // See bug 50273 
    67. log.warn(msg, t); 
    68. else { 
    69. log.error(msg, t); 
    70. else { 
    71. log.error(msg, t); 
    72. state = AcceptorState.ENDED; 

    大家应该发现了,Acceptor 绕来绕去,都是在调用 NioEndpoint 的主意,咱们简单分析一下这个。

    在 NioEndpoint init 的时节,咱们开始了一番 ServerSocketChannel,新兴 start 的时节,咱们开始多个 acceptor(实际上,默认是 1 个),每个 acceptor 起先后就开始循环调用 ServerSocketChannel 的 accept 办法获取新的连接,下一场调用 endpoint.setSocketOptions(socket) 拍卖新的连接,后再进来循环 accept 从一个连接。

    到此处,大家应该也就掌握了,为什么这个叫 acceptor 了吧?然后,咱们来看望 setSocketOptions 办法到底做了什么。

    NioEndpoint # setSocketOptions

           
    1. @Override 
    2. protected booleansetSocketOptions(SocketChannel socket) { 
    3. try { 
    4. // 安装该 SocketChannel 为非阻塞模式 
    5. socket.configureBlocking(false); 
    6. Socket sock = socket.socket; 
    7. // 安装 socket 的组成部分属性 
    8. socketProperties.setProperties(sock); 
    9.  
    10. // 还记得 startInternal 的时节,说过了 nioChannels 是缓存用之。 
    11. // 限于篇幅,此地的 NioChannel 就不进行了,他包括了 socket 和 buffer 
    12. NioChannel channel = nioChannels.pop; 
    13. if (channel == ) { 
    14. // 重点是创造读和写的两个 buffer,默认地,读和写 buffer 都是 8192 字节,8k 
    15. SocketBufferHandler bufhandler = new SocketBufferHandler( 
    16. socketProperties.getAppReadBufSize, 
    17. socketProperties.getAppWriteBufSize, 
    18. socketProperties.getDirectBuffer); 
    19. if (isSSLEnabled) { 
    20. channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); 
    21. else { 
    22. channel = new NioChannel(socket, bufhandler); 
    23. else { 
    24. channel.setIOChannel(socket); 
    25. channel.reset; 
    26.  
    27. // getPoller0 会选取所有 poller 中的一个 poller 
    28. getPoller0.register(channel); 
    29. } catch (Throwable t) { 
    30. ExceptionUtils.handleThrowable(t); 
    31. try { 
    32. log.error("",t); 
    33. } catch (Throwable tt) { 
    34. ExceptionUtils.handleThrowable(tt); 
    35. // Tell to close the socket 
    36. return false
    37. return true

    咱们看来,此地又没有进行具体的拍卖,而是将以此 SocketChannel 登记到了其中一个 poller 上。因为我们掌握,acceptor 有道是尽可能的简短,只做 accept 的上班,大概处理下就往后面扔。acceptor 还得回到之前的循环去 accept 新的连接呢。

    咱们只要求了解,此刻,往 poller 官方注册了一番 NioChannel 老,此实例包含客户端过来的 SocketChannel 和一个 SocketBufferHandler 老。

    Poller

    先前我们看来 acceptor 名将一个 NioChannel 老 register 到了一番 poller 官方。在看 register 办法之前,咱们需要先对 poller 要有个简单的认识。

           
    1. public class PollerimplementsRunnable{ 
    2.  
    3. publicPoller throws IOException { 
    4. // 每个 poller 起来一个 Selector 
    5. this.selector = Selector.open
    6. private Selector selector; 
    7. // events 列,该系的骨干 
    8. private final SynchronizedQueue<PollerEvent> events = 
    9. new SynchronizedQueue<>; 
    10.  
    11. private volatile boolean close = false
    12. private long nextExpiration = 0;//optimize expiration handling 
    13.  
    14. // 其一值后面有用,牢记它的初始值为 0 
    15. private AtomicLong wakeupCounter = new AtomicLong(0); 
    16.  
    17. private volatile int keyCount = 0; 
    18.  
    19. ... 

    敲重点:每个 poller 沟通了一番 Selector。

    Poller 其间围着一个 events 列转,观看看人家 events 办法:

           
    1. public boolean events { 
    2. boolean result = false
    3.  
    4. PollerEvent pe = ; 
    5. for (int i = 0, size = events.size; i < size && (pe = events.poll) != ; i++ ) { 
    6. result = true
    7. try { 
    8. // 逐个执行 event.run 
    9. pe.run; 
    10. // 该 PollerEvent 还得给以后用,此地 reset 瞬间(还是之前说过的内存) 
    11. pe.reset; 
    12. if (running && !paused) { 
    13. eventCache.push(pe); 
    14. } catch ( Throwable x ) { 
    15. log.error("",x); 
    16. return result; 

    events 办法比较简便,就是取出当前队列中的 PollerEvent 目标,逐个执行 event.run 办法。

    下一场,如今来看 Poller 的 run 办法,该方法会一直循环,直到 poller.destroy 把调用。

    Poller # run

           
    1. public void run { 
    2. while (true) { 
    3.  
    4. boolean hasEvents = false
    5.  
    6. try { 
    7. if (!close) { 
    8. // 推行 events 列中每篇 event 的 run 办法 
    9. hasEvents = events; 
    10. // wakeupCounter 的初始值为 0,此地设置为 -1 
    11. if (wakeupCounter.getAndSet(-) > 1 0) { 
    12. //if we are here, means we have other stuff to do 
    13. //do a non blocking select 
    14. keyCount = selector.selectNow; 
    15. else { 
    16. // timeout 默认值 1 秒 
    17. keyCount = selector.select(selectorTimeout); 
    18. wakeupCounter.set(0); 
    19. // 篇幅所限,咱们就不说 close 的状况了 
    20. if (close) { 
    21. events; 
    22. timeout(, 0 false); 
    23. try { 
    24. selector.close
    25. } catch (IOException ioe) { 
    26. log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); 
    27. break; 
    28. } catch (Throwable x) { 
    29. ExceptionUtils.handleThrowable(x); 
    30. log.error("",x); 
    31. continue
    32. //either we timed out or we woke up, process events first 
    33. // 此地没什么好说的,顶多就再实施一次 events 办法 
    34. if ( keyCount == 0 ) hasEvents = (hasEvents | events); 
    35.  
    36. // 如果刚刚 select 有返回 ready keys,拓展拍卖 
    37. Iterator<SelectionKey> iterator = 
    38. keyCount > 0 ? selector.selectedKeys.iterator : ; 
    39. // Walk through the collection of ready keys and dispatch 
    40. // any active event. 
    41. while (iterator != && iterator.hasNext) { 
    42. SelectionKey sk = iterator.next
    43. NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment; 
    44. // Attachment may be if another thread has called 
    45. // cancelledKey 
    46. if (attachment == ) { 
    47. iterator.remove; 
    48. else { 
    49. iterator.remove; 
    50. // ※※※※※ 拍卖 ready key ※※※※※ 
    51. processKey(sk, attachment); 
    52. }//while 
    53.  
    54. //process timeouts 
    55. timeout(keyCount,hasEvents); 
    56. }//while 
    57.  
    58. getStopLatch.countDown; 

    poller 的 run 办法主要做了调用 events 办法和处理注册到 Selector 上的 ready key,此地我们暂时不进行 processKey 办法,因为此方法必定是及其复杂的。

    咱们回过头来看之前从 acceptor 点程中租用的 register 办法。

    Poller # register

           
    1. public void register(final NioChannel socket) { 
    2. socket.setPoller(this); 
    3. NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); 
    4. socket.setSocketWrapper(ka); 
    5. ka.setPoller(this); 
    6. ka.setReadTimeout(getConnectionTimeout); 
    7. ka.setWriteTimeout(getConnectionTimeout); 
    8. ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests); 
    9. ka.setSecure(isSSLEnabled); 
    10.  
    11. PollerEvent r = eventCache.pop; 
    12. ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into
    13.  
    14. // 瞩目第三个参数值 OP_REGISTER 
    15. if ( r==) r = new PollerEvent(socket,ka,OP_REGISTER); 
    16. else r.reset(socket,ka,OP_REGISTER); 
    17.  
    18. // 补充 event 到 poller 官方 
    19. addEvent(r); 

    此地将以此 socket(包含 socket 和 buffer 的 NioChannel 老) 包装为一个 PollerEvent,下一场添加到 events 官方,此刻调用此方法的 acceptor 结束返回,扮演处理新的 accepted 联网了。

    然后,咱们已经清楚了,poller 点程在循环过程中会不断调用 events 办法,这就是说 PollerEvent 的 run 办法迅速就会把实践,咱们就来看望刚刚这个新的连接被注册到这个 poller 此后,会发生什么。

    PollerEvent # run

           
    1. @Override 
    2. public voidrun { 
    3. // 对于新来之过渡,眼前我们说过,interestOps == OP_REGISTER 
    4. if (interestOps == OP_REGISTER) { 
    5. try { 
    6. // 这步很重要!!! 
    7. // 名将以此新连接 SocketChannel 登记到该 poller 的 Selector 官方, 
    8. // 安装监听 OP_READ 事件, 
    9. // 名将 socketWrapper 安装为 attachment 拓展传递(其一目标可是什么鬼都有,往上看就掌握了) 
    10. socket.getIOChannel.register( 
    11. socket.getPoller.getSelector, SelectionKey.OP_READ, socketWrapper); 
    12. } catch (Exception x) { 
    13. log.error(sm.getString("endpoint.nio.registerFail"), x); 
    14. else { 
    15. /* else 这块不介绍,省得大家端大 */ 
    16.  
    17. final SelectionKey key = socket.getIOChannel.keyFor(socket.getPoller.getSelector); 
    18. try { 
    19. if (key == ) { 
    20. // The key was cancelled (e.g. due to socket closure) 
    21. // and removed from the selector while it was being 
    22. // processed. Count down the connections at this point 
    23. // since it won't have been counted down when the socket 
    24. // closed. 
    25. socket.socketWrapper.getEndpoint.countDownConnection; 
    26. else { 
    27. final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment; 
    28. if (socketWrapper != ) { 
    29. //we are registering the key to start with, reset the fairness counter. 
    30. int ops = key.interestOps | interestOps; 
    31. socketWrapper.interestOps(ops); 
    32. key.interestOps(ops); 
    33. else { 
    34. socket.getPoller.cancelledKey(key); 
    35. } catch (CancelledKeyException ckx) { 
    36. try { 
    37. socket.getPoller.cancelledKey(key); 
    38. } catch (Exception ignore) {} 

    到此处,咱们再回首一下:刚刚在 PollerEvent 的 run 办法中,咱们看来,新的 SocketChannel 登记到了 Poller 其间的 Selector 官方,监听 OP_READ 事件,下一场我们再回去 Poller 的 run 瞧下,一旦该 SocketChannel 是 readable 的状态,这就是说就会进入到 poller 的 processKey 办法。

    processKey

    Poller # processKey

           
    1. protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { 
    2. try { 
    3. if ( close ) { 
    4. cancelledKey(sk); 
    5. else if ( sk.isValid && attachment != ) { 
    6. if (sk.isReadable || sk.isWritable ) { 
    7. // 忽视 sendfile 
    8. if ( attachment.getSendfileData != ) { 
    9. processSendfile(sk,attachment, false); 
    10. else { 
    11. // unregister 应当的 interest set, 
    12. // 如接下来是处理 SocketChannel 进去的多寡,这就是说就不再监听该 channel 的 OP_READ 事件 
    13. unreg(sk, attachment, sk.readyOps); 
    14. boolean closeSocket = false
    15. // Read goes before write 
    16. if (sk.isReadable) { 
    17. // 拍卖读 
    18. if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { 
    19. closeSocket = true
    20. if (!closeSocket && sk.isWritable) { 
    21. // 拍卖写 
    22. if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { 
    23. closeSocket = true
    24. if (closeSocket) { 
    25. cancelledKey(sk); 
    26. else { 
    27. //invalid key 
    28. cancelledKey(sk); 
    29. } catch ( CancelledKeyException ckx ) { 
    30. cancelledKey(sk); 
    31. } catch (Throwable t) { 
    32. ExceptionUtils.handleThrowable(t); 
    33. log.error("",t); 

    然后是 processSocket 办法,瞩目第三个参数,地方进来的时节是 true。

    AbstractEndpoint # processSocket

           
    1. public boolean processSocket(SocketWrapperBase<S> socketWrapper, 
    2. SocketEvent event, boolean dispatch) { 
    3. try { 
    4. if (socketWrapper == ) { 
    5. return false
    6. SocketProcessorBase<S> sc = processorCache.pop; 
    7. if (sc == ) { 
    8. // 创造一个 SocketProcessor 的范例 
    9. sc = createSocketProcessor(socketWrapper, event); 
    10. else { 
    11. sc.reset(socketWrapper, event); 
    12. Executor executor = getExecutor; 
    13. if (dispatch && executor != ) { 
    14. // 名将任务放到之前建立之 worker 点程池中实施 
    15. executor.execute(sc); 
    16. else { 
    17. sc.run; // ps: 如果 dispatch 为 false,这就是说就现阶段线程自己执行 
    18. } catch (RejectedExecutionException ree) { 
    19. getLog.warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); 
    20. return false
    21. } catch (Throwable t) { 
    22. ExceptionUtils.handleThrowable(t); 
    23. // This means we got an OOM or similar creating a thread, or that 
    24. // the pool and its queue are full 
    25. getLog.error(sm.getString("endpoint.process.fail"), t); 
    26. return false
    27. return true

    NioEndpoint # createSocketProcessor

           
    1. @Override 
    2. protected SocketProcessorBase<NioChannel>createSocketProcessor( 
    3. SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) { 
    4. return new SocketProcessor(socketWrapper, event); 

    咱们看来,付出到 worker 点程池中的是 NioEndpoint.SocketProcessor 的范例,至于它的 run 办法之后的逻辑,咱们就不再继续往里分析了。

    总结

    说到底,再祭出文章开始的那张图来总结一下:

    此地简单梳理下前面我们说的流程,起大家回忆一下:

    1. 指定 Protocol,初始化相应的 Endpoint,咱们分析的是 NioEndpoint;
    2. init 经过:在 NioEndpoint 官方做 bind 借鉴;
    3. start 经过:起先 worker 点程池,起先 1 个 Acceptor 和 2 个 Poller,当然它们都是公认值,可配;
    4. Acceptor 获取到新的连接后,getPoller0 获取其中一个 Poller,下一场 register 到 Poller 官方;
    5. Poller 循环 selector.select(xxx),如果有通道 readable,这就是说在 processKey 官方将他放到 worker 点程池中。

    【编纂推荐】

    1. 讨论 Tomcat 呼吁处理流程
    2. Tomcat是一番Servlet容器?
    3. Java 类在 Tomcat 官方是如何加载的?
    4. Web传感器Tomcat的那些架构模块,您都晓得吧?
    5. 高可用 负载均衡 集群部署方案:Keepalived + Nginx + Tomcat
    【义务编辑: 武晓燕 TEL:(010)68476606】


    点赞 0
  • Tomcat  NIO  Java
  • 分享:
    大家都在看
    猜你喜欢
  • 订阅专栏+更多

    云架构师修炼手册

    云架构师修炼手册

    云架构师之必不可少技能
    共3章 | Allen在路上

    21人口订阅学习

    Devops的监控神器Prometheus

    Devops的监控神器Prometheus

    监督主流
    共22章 | 小罗ge11

    172人口订阅学习

    手把手玩转Elasticsearch

    手把手玩转Elasticsearch

    Chandler_珏瑜
    共20章 | Chandler_珏瑜

    80人口订阅学习

    读 书 +更多

    迅速Acegi、CAS:构建安全的Java系统

    该书是关于Acegi、CAS的独尊教程,是Java/Java EE竞争性开发者的必不可少参考书。不论Java EE竞争性编程模型的全景和基础知识,还是Acegi、CA...

    订阅51CTO邮刊

    点击这里查看样刊

    订阅51CTO邮刊

    51CTO劳务号

    51CTO官微


  •