thrift 笔记

SelectorThread

与负责接入连接的 AcceptThread 不同,SelectorThread 负责已接入连接的读写。一个 Server 中通常会有多个 SelectorThread。

主要代码:

  protected class SelectorThread extends AbstractSelectThread {
  
  ...
  
  /**
   * Queues an accepted connection for this selector thread and wakes the
   * selector so the new entry can be picked up.
   *
   * @param accepted the accepted non-blocking transport to hand over
   * @return {@code true} if the transport was placed on {@code acceptedQueue};
   *         {@code false} if the calling thread was interrupted during the put
   */
  public boolean addAcceptedConnection(TNonblockingTransport accepted) {
      boolean queued;
      try {
        acceptedQueue.put(accepted);
        queued = true;
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is not restored here; confirm
        // that callers do not rely on it.
        LOGGER.warn("Interrupted while adding accepted connection!", e);
        queued = false;
      }
      // Only wake the selector when something was actually enqueued.
      if (queued) {
        selector.wakeup();
      }
      return queued;
    }
    
  /**
   * Main loop of the selector thread.
   *
   * Until {@code stopped_} is set, each iteration calls {@code select()},
   * {@code processAcceptedConnections()} and {@code processInterestChanges()}
   * in that order. When the loop exits normally, every key still registered
   * with the selector is cleaned up. Any {@code Throwable} is logged rather
   * than propagated, and the enclosing server is always stopped on exit.
   */
  public void run() {
      try {
        while (!stopped_) {
          select();
          processAcceptedConnections();
          processInterestChanges();
        }
        // Reached only on a normal loop exit; skipped if something was thrown.
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() exiting due to uncaught error", t);
      } finally {
        // This will wake up the accept thread and the other selector threads
        TThreadedSelectorServer.this.stop();
      }
    }

    /**
     * Performs one selection pass: waits on the selector, then dispatches each
     * selected key to {@code handleRead} or {@code handleWrite}.
     *
     * Invalid keys are cleaned up and skipped. An {@code IOException} raised
     * during the pass is logged and not propagated to the caller.
     */
    private void select() {
      try {
        // wait for io events.
        selector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          // Take the key out of the selected set before handling it.
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          // Readable is checked first; because of the else-if, a key is
          // handled for at most one of read/write in a single pass.
          if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

    /**
     * Drains {@code acceptedQueue}, registering each pending transport with
     * this thread's selector. Stops early once {@code stopped_} is set or the
     * queue has no more entries.
     */
    private void processAcceptedConnections() {
      TNonblockingTransport pending;
      // The stop flag is checked before each poll, so nothing is dequeued
      // after the thread has been asked to stop.
      while (!stopped_ && (pending = acceptedQueue.poll()) != null) {
        registerAccepted(pending);
      }
    }

    /**
     * Registers an accepted transport with the selector for read events and
     * attaches a frame buffer to the resulting selection key.
     *
     * An {@code AsyncFrameBuffer} is used when the processor factory reports
     * an async processor, a plain {@code FrameBuffer} otherwise. On an
     * {@code IOException} the failure is logged, the key (if one was obtained)
     * is cleaned up, and the transport is closed.
     *
     * @param accepted the transport to register
     */
    private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey key = null;
      try {
        key = accepted.registerSelector(selector, SelectionKey.OP_READ);

        final FrameBuffer buffer;
        if (processorFactory_.isAsyncProcessor()) {
          buffer = new AsyncFrameBuffer(accepted, key, SelectorThread.this);
        } else {
          buffer = new FrameBuffer(accepted, key, SelectorThread.this);
        }

        key.attach(buffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        // The key is still null when the registration itself failed.
        if (key != null) {
          cleanupSelectionKey(key);
        }
        accepted.close();
      }
    }
  
  ...

  }

主要方法:

  • addAcceptedConnection(TNonblockingTransport accepted):接收 AcceptThread 提交过来的连接,放入队列并唤醒 Selector。
  • run():线程的 run 函数,在线程停止之前循环依次执行 select()、processAcceptedConnections()、processInterestChanges() 三个方法。
  • processAcceptedConnections() 和 registerAccepted(TNonblockingTransport accepted):从队列中取出连接,注册到 Selector,并生成 FrameBuffer 附加到对应的 SelectionKey 上。
  • processInterestChanges():AbstractNonblockingServer 中的方法,修改 FrameBuffer 状态。

links