thrift 笔记

AcceptThread

AcceptThread 顾名思义就是接受线程,负责接受外来连接,然后把这些连接分配给 SelectorThread 处理。

部分主要代码:

protected class AcceptThread extends Thread {

    ...

    public void run() {
      try {
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }

        while (!stopped_) {
          select();
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        // This will wake up the selector threads
        TThreadedSelectorServer.this.stop();
      }
    }

    private void select() {
      try {
        // wait for connect events.
        acceptSelector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            continue;
          }

          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

    private void handleAccept() {
      final TNonblockingTransport client = doAccept();
      if (client != null) {
        // Pass this connection to a selector thread
        final SelectorThread targetThread = threadChooser.nextThread();

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
          doAddAccept(targetThread, client);
        } else {
          // FAIR_ACCEPT
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
            // close immediately
            client.close();
          }
        }
      }
    }

   ...    

}

大概流程:

1.线程开启,调用eventHandler_.preServe()。

2.调用acceptSelector.select()查询io事件。

3.如果可以 access ,获取一个 SelectorThread(分配 SelectoryThread 的原理是通过轮训来分配的,在内部类 SelectorThreadLoadBalancer 里面可以看到相关代码),access 连接。

4.把连接交给 SelectorThread 处理。

5.重新调用2步骤。

links