`
xsh5324
  • 浏览: 70723 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Netty源码分析 之 NioServerSocketChannelFactory

    博客分类:
  • java
阅读更多

NioServerSocketChannelFactory 是ChannelFactory的实现接口之一,负责创建并管理服务端Channel。

先来看下它的周边类图是怎样


 

 

 

下面针对核心的类做下讲解,然后就开始跟踪源码一步一步分析

NioServerSocketPipelineSink

接受和处理终端的下游ChannelEvent事件.

 

AbstractNioBossPool 

    处理socket accept、connect的工作线程池抽象类,提供了创建工作线程的模板方法newBoss 

 

NioServerBossPool 

    继续自AbstractNioBossPool重写了newBoss方法创建Boss工作线程

 

AbstractNioSelector 

    它间接的实现了Runnable接口,负责执行内部的工作队列、处理Selector事件(抽象方法process)、关闭相关的Channel(抽象方法close)、关闭内部了Selector;值得注意的是这个类的register抽象方法,它的作用是将一个新的Channel注册到自身这个工作线程中完成Channel的绑定或连接工作,这个方法由sink调用。

 

NioServerBoss 

    继承自AbstractNioSelector,完成如下工作

1、把ServerSocketChannel绑定到SocketAddress并将Selector实例注册到ServerSocketChannel监听其SelectionKey.OP_ACCEPT事件(其内部类干的事RegisterTask)

2、处理OP_ACCEPT事件,将新Accpet的Channel注册到一个工作线程中去(workPool.nextWorker().register());(网络模型图中acceptor到subRecator的过程)

 

NioWorker

与NioServerBoss类似,继承自AbstractNioSelector,完成如下工作

1、处理OP_READ、OP_WRITE事件

2、将Selector实例注册到SocketChannel,并监听OP_READ、OP_WRITE事件(其内部类干的事RegisterTask) 

 

下面是一段是简单的服务端代码,我们顺着代码中的关键类分析 

public class ServerTest {
	private final ChannelFactory channelFactory = new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool());

	private final ServerBootstrap bootstrap;

	public ServerTest() {
		this.bootstrap = new ServerBootstrap(channelFactory);
		this.bootstrap.setOption("child.tcpNoDelay", true);
		this.bootstrap.setOption("child.keepAlive", true);
		
		this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				ChannelPipeline pipeline=Channels.pipeline();
//				pipeline.addFirst("encoder", new Encoder());
//				pipeline.addFirst("decoder", new encoder());
//				...
				return pipeline;
			}
		});
	}
	public void service(){
		this.bootstrap.bind(new InetSocketAddress(9008));
	}
	
	public static void main(String[] args){
		new ServerTest().service();
	}
}
 NioServerSocketChannelFactory

  这个类的代码非常简洁就4个实例变量和几个方法  

private final WorkerPool<NioWorker> workerPool;
private final NioServerSocketPipelineSink sink;
private final BossPool<NioServerBoss> bossPool;
private boolean releasePools;//标记当前ChannelFactory是否已还没有关闭或者说工作线程还没有被shutdown
  这个类有个BUG,在默认的构造方法中将releasePools置为true了,但是在其它构造方法中却没有,这意味着在它被关闭的时候可能无法释放某些扩展资源

  

public NioServerSocketChannelFactory() {
        this(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
        releasePools = true;
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor) {
        this(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));//调用此构造方法时,getMaxThreads控制着工作线程数小于等于CPU数*2
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, 1, workerExecutor, workerCount);//默认情况下boss线程的数量为1
    }
public NioServerSocketChannelFactory(
            Executor bossExecutor, int bossCount, Executor workerExecutor,
            int workerCount) {
        this(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, WorkerPool<NioWorker> workerPool) {
        this(bossExecutor, 1 , workerPool);
}

public NioServerSocketChannelFactory(
            Executor bossExecutor, int bossCount, WorkerPool<NioWorker> workerPool) {
        this(new NioServerBossPool(bossExecutor, bossCount, null), workerPool);
}

public NioServerSocketChannelFactory(BossPool<NioServerBoss> bossPool, WorkerPool<NioWorker> workerPool) {
        if (bossPool == null) {
            throw new NullPointerException("bossExecutor");
        }
        if (workerPool == null) {
            throw new NullPointerException("workerPool");
        }
        this.bossPool = bossPool;
        this.workerPool = workerPool;
        sink = new NioServerSocketPipelineSink();
 }
public ServerSocketChannel newChannel(ChannelPipeline pipeline) {
        return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);//注意nextBoss方法,它选择一个工作线程来处理ServerSocketChannel的读写事件
}
 这个类主要依赖于这4个类的实例NioWorkerPool、NioServerBossPool、NioServerSocketPipelineSink、NioServerSocketChannel,关于它们的作用前面已经有说明了,在此不讲了。先来看下NioWorkerPool吧。

 

NioWorkerPool这个类也很简单,其内部主要维护了一个线程数组,它继承自AbstractNioWorkerPool并重写了createWorker方法 

protected NioWorker createWorker(Executor executor) {
return new NioWorker(executor, determiner);
}
 再来看下它的父类AbstractNioWorkerPool

 这个类也非常简单,它只做2件事——负载均衡和工作线程的创建

public abstract class AbstractNioWorkerPool<E extends AbstractNioWorker>
        implements WorkerPool<E>, ExternalResourceReleasable {

    private final AbstractNioWorker[] workers;
    private final AtomicInteger workerIndex = new AtomicInteger();
    private final Executor workerExecutor;
private volatile boolean initDone;
//此处省略了部分代码
	AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {
        if (workerExecutor == null) {
            throw new NullPointerException("workerExecutor");
        }
        if (workerCount <= 0) {
            throw new IllegalArgumentException(
                    "workerCount (" + workerCount + ") " + "must be a positive integer.");
        }
        workers = new AbstractNioWorker[workerCount];
        this.workerExecutor = workerExecutor;
        if (autoInit) {
            init();
        }
}
protected void init() {
        if (initDone) {
            throw new IllegalStateException("Init was done before");
        }
        initDone = true;

        for (int i = 0; i < workers.length; i++) {
            workers[i] = newWorker(workerExecutor);
    }
protected E newWorker(Executor executor) {
        return createWorker(executor);//调用抽象方法,把具体的工作线程交由子类完成
    }
public E nextWorker() {
        return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];//负载均衡
    }
 下面来具体的工作线程NioWorker

 

NioWorker继承自AbstractNioWorker,AbstractNioWorker继承自AbstractNioSelector,还是先从最顶层父类AbstractNioSelector看起吧

AbstractNioSelector实现了NioSelector接口,而NioSelector接口继承自Runnable接口AbstractNioSelector实现run方法,Netty把这个线程叫做IO线程

AbstractNioSelector类的构造方法会调用openSelector方法来创建一个Selector,并执行当前线程(这段代码很简单,我们就跳过直接看run方法了)

 

public void run() {
        thread = Thread.currentThread();//将当前线程保存起来,用来区分调用该类其它某些方法的线程是否为IO线程,请参看它的isIoThread方法。其它线程会调用此方法来区分自己是不是IO线程,如果是,则直接调用此类的方法完成,否则把相关的操作添加到IO线程的工作队列中。这样做的好处是,避免IO线程与其它线程的同步操作,带来了效率的提升。

        int selectReturnsImmediately = 0;
        Selector selector = this.selector; 
        if (selector == null) {
            return;
        }
        // use 80% of the timeout for measure
        final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100;
        boolean wakenupFromLoop = false;
        for (;;) {
            wakenUp.set(false);

            try {
                long beforeSelect = System.nanoTime();
                int selected = select(selector);
                if (SelectorUtil.EPOLL_BUG_WORKAROUND && selected == 0 && !wakenupFromLoop && !wakenUp.get()) {//看代码中的解释说jdk的epoll可能存在BUG,这个if用来处理这类错误的.看下面的代码冒似是说Channel被关闭后没有从Selector中移除相关的键而导致过多的垃圾键,当这类键超过1024个时,Selector将被重键.
                    long timeBlocked = System.nanoTime() - beforeSelect;

                    if (timeBlocked < minSelectTimeout) {
                        boolean notConnected = false;
                        for (SelectionKey key: selector.keys()) {
                            SelectableChannel ch = key.channel();
                            try {
                                if (ch instanceof DatagramChannel && !((DatagramChannel) ch).isConnected() ||
                                        ch instanceof SocketChannel && !((SocketChannel) ch).isConnected()) {
                                    notConnected = true;
                                    key.cancel();
                                }
                            } catch (CancelledKeyException e) {
                            }
                        }
                        if (notConnected) {
                            selectReturnsImmediately = 0;
                        } else {
                            selectReturnsImmediately ++;
                        }
                    } else {
                        selectReturnsImmediately = 0;
                    }

                    if (selectReturnsImmediately == 1024) {
                        rebuildSelector();
                        selector = this.selector;
                        selectReturnsImmediately = 0;
                        wakenupFromLoop = false;
                        continue;
                    }
                } else {
                    selectReturnsImmediately = 0;
                }
		//代码中这里的解释很清楚,请看代码
                if (wakenUp.get()) {
                    wakenupFromLoop = true;
                    selector.wakeup();
                } else {
                    wakenupFromLoop = false;
                }

                cancelledKeys = 0;
                processTaskQueue();//执行队列中的任务
                selector = this.selector; //因为rebuildSelector()方法是一个公共的方法,也许在上一步的processTaskQueue方法中这个方法被调了

                if (shutdown) {//如果当前IO线程标记为关闭,则执行关闭逻辑
                    this.selector = null;
                    processTaskQueue();//处理任务队列

                    for (SelectionKey k: selector.keys()) {
                        close(k);//关闭相关的Channel,这里会执行Pipeline的Upstream直到sink中才关闭Channel
                    }
                    try {
                        selector.close();
                    } catch (IOException e) {
                        logger.warn(
                                "Failed to close a selector.", e);
                    }
                    shutdownLatch.countDown();//shutdown方法必需等待相关的释放操作完成才能返回,此处唤醒调用shutdown方法的线程
                    break;//工作线程退出
                } else {
                    process(selector);//执行process抽象方法,SelectionKey的处理行为交给子类去完成
                }
            } catch (Throwable t) {
                logger.warn(
                        "Unexpected exception in the selector loop.", t);
                // Prevent possible consecutive immediate failures that lead to
                // excessive CPU consumption.
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }
}
 这个类还有两个方法值得解释一下,看完这两个方法再看process方法 

 

//这个方法在Boss线程中被调用在Channel被bind到本地端口(Server)或成功连接到服务器(Client端)的时候被调用
public void register(Channel channel, ChannelFuture future) {
        Runnable task = createRegisterTask(channel, future);//具体的注册操作交由子类去完成,透露一下 这个task面会有socketChannel.register方法的调用
        registerTask(task);
    }

    protected final void registerTask(Runnable task) {
        taskQueue.add(task);

        Selector selector = this.selector;

        if (selector != null) {
            if (wakenUp.compareAndSet(false, true)) {//唤醒IO线程,这个CAS操作是防止Selector被过早的唤醒的情况下导致信号丢失.请参看AbstractNioSelector类源码282行上面的解释
                selector.wakeup();
            }
        } else {
            if (taskQueue.remove(task)) {
                // the selector was null this means the Worker has already been shutdown.
                throw new RejectedExecutionException("Worker has already been shutdown");
            }
        }
    }
 子类AbstractNioWorker重写了process方法来处理SelectionKey 

 

protected void process(Selector selector) throws IOException {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                    if (!read(k)) {//调用抽象方法读取数据,如果此方法返回false则表示连接已经被关闭,无需再做处理
                        continue;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    writeFromSelectorLoop(k);//如果该Selector中触发了写事件则写出数据。用户代码调用channel.write(obj)方法后数据并不是马上被写出,而是被提交Channel的writeBufferQueue队列中(sink放的)等待IO线程去完成写出,写操作实际上是由write0这个方法完成的.
                }
            } catch (CancelledKeyException e) {
                close(k);//出形异常关闭Channel并释放相关资源
            }

            if (cleanUpCancelledKeys()) {
                break; // break the loop to avoid ConcurrentModificationException
            }
        }
    }
 先看该类的其它几个重要的方法再看NioWorker类的方法 

 

/**
 * 这个方法是由sink调用的,sink将要写出的数据offer到writeBufferQueue中然后调用该方法(上面解释process方法中有提到)
*/
void writeFromUserCode(final AbstractNioChannel<?> channel) {
        if (!channel.isConnected()) {
	//如果Channel已经关闭则没必要继续写了
            cleanUpWriteBuffer(channel);
            return;
        }

        if (scheduleWriteIfNecessary(channel)) {
	//这是个抽象方法,实现在NioWorker类中,这个方法中判断当前线程是否为IO线程,如果是则返回false,否则向IO线程的 任务队列添加一个写出任务并返回true
            return;
        }

	//到这里,就能确定当前线程是否是IO线程
        if (channel.writeSuspended) {//如果写出通道被挂起,则返回
            return;
        }

        if (channel.inWriteNowLoop) {//当前正在写数据,则返回
            return;
        }

        write0(channel);//写出数据
    }
protected void write0(AbstractNioChannel<?> channel) {
        boolean open = true;//标记channel是否处理打开状态
        boolean addOpWrite = false;//是否向此Channel关联的SelectionKey的interestOps添加SelectionKey.OP_WRITE,这会导致前面的process方法中这句代码“if ((readyOps & SelectionKey.OP_WRITE) != 0) ”返回true,然后再次触发写操作
        boolean removeOpWrite = false;//与上面对应
        boolean iothread = isIoThread(channel);//当前线程是否是该Channel的工作线程

        long writtenBytes = 0;

        final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
        final WritableByteChannel ch = channel.channel;
        final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
        final int writeSpinCount = channel.getConfig().getWriteSpinCount();//最多尝试写的次数
        List<Throwable> causes = null;

        synchronized (channel.writeLock) {
            channel.inWriteNowLoop = true;//上面提到过这个writeFromUserCode方法
            for (;;) {

                MessageEvent evt = channel.currentWriteEvent;
                SendBuffer buf = null;
                ChannelFuture future = null;
                try {
                    if (evt == null) {
                        if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
//如果没有要写的东西,则打个标记,在后面向此Channel关联的SelectionKey的interestOps移除SelectionKey.OP_WRITE
                            removeOpWrite = true;
                            channel.writeSuspended = false;
                            break;
                        }
                        future = evt.getFuture();

                        channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                    } else {
                        future = evt.getFuture();
                        buf = channel.currentWriteBuffer;
                    }

                    long localWrittenBytes = 0;
			//下面的代码尝试发送数据,直到有数据被写出,否则尽可能的尝试
                    for (int i = writeSpinCount; i > 0; i --) {
                        localWrittenBytes = buf.transferTo(ch);
                        if (localWrittenBytes != 0) {
                            writtenBytes += localWrittenBytes;
                            break;
                        }
                        if (buf.finished()) {
                            break;
                        }
                    }

                    if (buf.finished()) {
                        //如果当前消息已经写完了释放资源,并准备下一条消息的写出
                        buf.release();
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        evt = null;
                        buf = null;
                        future.setSuccess();
                    } else {
                        //没有全部写完,则打上addOpWrite标记,并设置写出进度
                        addOpWrite = true;
                        channel.writeSuspended = true;

                        if (localWrittenBytes > 0) {
                            // Notify progress listeners if necessary.
                            future.setProgress(
                                    localWrittenBytes,
                                    buf.writtenBytes(), buf.totalBytes());
                        }
                        break;
                    }
                } catch (AsynchronousCloseException e) {
                    // Doesn't need a user attention - ignore.
                } catch (Throwable t) {
                    if (buf != null) {
                        buf.release();
                    }
                    channel.currentWriteEvent = null;
                    channel.currentWriteBuffer = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    buf = null;
                    //noinspection UnusedAssignment
                    evt = null;
                    if (future != null) {
                        future.setFailure(t);//通知消息写出失败
                    }
                    if (iothread) {
                        // An exception was thrown from within a write in the iothread. We store a reference to it
                        // in a list for now and notify the handlers in the chain after the writeLock was released
                        // to prevent possible deadlock.
                        // See #1310
			//如果异常在IO线程中被抛出,则存储它的一个引用,待锁释放后触发pipeline的UpStream以防止死锁
                        if (causes == null) {
                            causes = new ArrayList<Throwable>(1);
                        }
                        causes.add(t);
                    } else {
                        fireExceptionCaughtLater(channel, t);
                    }
                    if (t instanceof IOException) {
                        // close must be handled from outside the write lock to fix a possible deadlock
                        // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
                        // and a close is triggered while the lock is hold. This is because the close(..)
                        // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
                        // See #1310
                        open = false;
                    }
                }
            }
            channel.inWriteNowLoop = false;
			// Initially, the following block was executed after releasing
            // the writeLock, but there was a race condition, and it has to be
            // executed before releasing the writeLock:
            //
            //     https://issues.jboss.org/browse/NETTY-410
            //
            //这个issues是说 之前下面这段代码是被放到同步段之外的,导致未完全写完的数据丢失的问题,所以移到同步块里面来了。
            if (open) {
                if (addOpWrite) {
                    setOpWrite(channel);
                } else if (removeOpWrite) {
                    clearOpWrite(channel);
                }
            }
        }
        if (causes != null) {
            for (Throwable cause: causes) {
                // notify about cause now as it was triggered in the write loop
                fireExceptionCaught(channel, cause);
            }
        }
        if (!open) {
            // close the channel now
            close(channel, succeededFuture(channel));
        }
        if (iothread) {
            fireWriteComplete(channel, writtenBytes);
        } else {
            fireWriteCompleteLater(channel, writtenBytes);
        }
    }
 AbstractNioSelector类把IO线程的构架搭起来, AbstractNioWorker类实现了数据的写出逻辑,只留下变化的部分交给子类去完成。现在来看来NioWorker类的几个比较重要的方法

 

protected boolean read(SelectionKey k) {
        final SocketChannel ch = (SocketChannel) k.channel();
        final NioSocketChannel channel = (NioSocketChannel) k.attachment();

//获取缓存大小分配策略,Netty中缓存分配策略有2种,一种是自动扩展的缓存分配策略,另一种是固定大小的缓存分配策略。自动扩展的缓存分配策略内部维护了一个缓存大小的int数组,数组中的值分段按一定数量递增
        final ReceiveBufferSizePredictor predictor =
            channel.getConfig().getReceiveBufferSizePredictor();
        final int predictedRecvBufSize = predictor.nextReceiveBufferSize();//获取下一个接收缓冲区大小
        final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();

        int ret = 0;
        int readBytes = 0;
        boolean failure = true;

        ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());//recvBufferPool目的是为了缓存重用
        try {
			//读取Channel中的数据,直到缓冲区满或没有可读的字节
            while ((ret = ch.read(bb)) > 0) {
                readBytes += ret;
                if (!bb.hasRemaining()) {
                    break;
                }
            }
            failure = false;
        } catch (ClosedChannelException e) {
            // Can happen, and does not need a user attention.
        } catch (Throwable t) {
            fireExceptionCaught(channel, t);
        }

        if (readBytes > 0) {
            bb.flip();

            final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
            buffer.setBytes(0, bb);
            buffer.writerIndex(readBytes);

            //向缓存分配策略报告当前读到的缓存大小,以预计算下一次的缓存大小
            predictor.previousReceiveBufferSize(readBytes);

            // 触发MessageReceive UpStream
            fireMessageReceived(channel, buffer);
        }

        if (ret < 0 || failure) {//如果代码执行到这,则表示此Channel不可用
            k.cancel(); // Some JDK implementations run into an infinite loop without this.
            close(channel, succeededFuture(channel));
            return false;
        }

        return true;
    }
//这个方法实现父类的抽象方法,返回Channel注册到IO线程的Task.这个方法一定是在Boss线程中被调用的,当boss线程接收到新的连接或一条新的连接与服务端建立,则调用这个方法,这个方法创建的Task完成将Channel注册到工作线程的Selector上
protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
        boolean server = !(channel instanceof NioClientSocketChannel);
        return new RegisterTask((NioSocketChannel) channel, future, server);
    }

    private final class RegisterTask implements Runnable {
        private final NioSocketChannel channel;
        private final ChannelFuture future;
        private final boolean server;

        RegisterTask(
                NioSocketChannel channel, ChannelFuture future, boolean server) {

            this.channel = channel;
            this.future = future;
            this.server = server;
        }

        public void run() {
            SocketAddress localAddress = channel.getLocalAddress();
            SocketAddress remoteAddress = channel.getRemoteAddress();

            if (localAddress == null || remoteAddress == null) {
                if (future != null) {
                    future.setFailure(new ClosedChannelException());
                }
                close(channel, succeededFuture(channel));
                return;
            }

            try {
                if (server) {
                    channel.channel.configureBlocking(false);
                }

                channel.channel.register(
                        selector, channel.getRawInterestOps(), channel);//注册感兴趣的事件(OP_READ),在write0方法中会添加或移除OP_WRITE(请参看上面的write0方法)

                if (future != null) {
                    channel.setConnected();
                    future.setSuccess();
                }

                if (server || !((NioClientSocketChannel) channel).boundManually) {//触发BOUND事件
                    fireChannelBound(channel, localAddress);
                }
//触发Connected事件
                fireChannelConnected(channel, remoteAddress);
            } catch (IOException e) {
                if (future != null) {
                    future.setFailure(e);
                }
                close(channel, succeededFuture(channel));
                if (!(e instanceof ClosedChannelException)) {
                    throw new ChannelException(
                            "Failed to register a socket to the selector.", e);
                }
            }
        }
    } 
 到这里为止, NioWorker工作线程(IO线程)核心路径已经基本分析完毕,现在来看NioWorker它主要是做下面这两件事。

 

1、执行内部工作队列中的任务

2、处理SelectionKey的读写事件

 

 

我们知道如果Selector不被注册到java.nio.channels.SelectableChannel上,它是没有任何作用的。那什么时候将被注册到SelectableChannel上呢(上面提到过RegisterTask的职责)?Boss线程,我们来看下NioServerBoss这个类

NioServerBoss

该类继承自AbstractNioSelector类并且实现了process抽象方法处理SelectionKey关于AbstractNioSelector类前面已经提过了,process方法

 

protected void process(Selector selector) {
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        if (selectedKeys.isEmpty()) {
            return;
        }
        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
            SelectionKey k = i.next();
            i.remove();
            NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();

            try {
                for (;;) {
//获取新连接到服务端的的SocketChannel
                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket == null) {
                        break;
                    }
//取出Channel,并注册到IO线程
                    registerAcceptedChannel(channel, acceptedSocket, thread);
                }
            } catch (CancelledKeyException e) {
                // Raised by accept() when the server socket was closed.
                k.cancel();
                channel.close();
            } catch (SocketTimeoutException e) {
                // Thrown every second to get ClosedChannelException
                // raised.
            } catch (ClosedChannelException e) {
                // Closed as requested.
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to accept a connection.", t);
                }

                try {
//避免在出现异常的情况下,无线循环导致cpu资源浪费
                    Thread.sleep(1000);
                } catch (InterruptedException e1) {
                    // Ignore
                }
            }
        }
}
private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,
                                         Thread currentThread) {
        try {
            ChannelSink sink = parent.getPipeline().getSink();
            ChannelPipeline pipeline =
                    parent.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = parent.workerPool.nextWorker();//获取一个工作线程
			//NioWorker类的register方法完成注册,之前说过的
            worker.register(new NioAcceptedSocketChannel(
                    parent.getFactory(), pipeline, parent, sink
                    , acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to initialize an accepted socket.", e);
            }

            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially accepted socket.",
                            e2);
                }
            }
        }
    }
 该类同样也有实现了父类的createRegisterTask方法来创建并返回一个Task,这个Task负责监听客户端连接Channel,并把Channel注册到IO线程上.在后面讲的NioServerSocketPipelineSink类会调用执行到这个方法

 

 

void bind(final NioServerSocketChannel channel, final ChannelFuture future,
              final SocketAddress localAddress) {
        registerTask(new RegisterTask(channel, future, localAddress));//这里会调父类的registerTask方法,父类的这个方法是个模板方法,具体bind任务的创建会调用该类的createRegisterTask方法
}

protected Runnable createRegisterTask(Channel channel, ChannelFuture future) {
        return new RegisterTask((NioServerSocketChannel) channel, future, null);
    }

    private final class RegisterTask implements Runnable {
        private final NioServerSocketChannel channel;
        private final ChannelFuture future;
        private final SocketAddress localAddress;

        public RegisterTask(final NioServerSocketChannel channel, final ChannelFuture future,
                            final SocketAddress localAddress) {
            this.channel = channel;
            this.future = future;
            this.localAddress = localAddress;
        }

        public void run() {
            boolean bound = false;
            boolean registered = false;
            try {
                channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());//绑定到本地
                bound = true;

                future.setSuccess();
                fireChannelBound(channel, channel.getLocalAddress());
                channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);//将此IO线程的Selector注册到该ServerSocketChannel上

                registered = true;
            } catch (Throwable t) {
                future.setFailure(t);
                fireExceptionCaught(channel, t);
            } finally {
                if (!registered && bound) {
                    close(channel, future);
                }
            }
        }
    }
 现在知道IO线程(NioWorker)的Selector在什么时候被注册到SelectableChannel上了,那Boss线程的Selector是什么时候被注册到SelectableChannel呢?ServerBootstrap的bind方法。

 

public Channel bind(final SocketAddress localAddress) {
        ChannelFuture future = bindAsync(localAddress);

        // 等待完成
        future.awaitUninterruptibly();
        if (!future.isSuccess()) {
            future.getChannel().close().awaitUninterruptibly();
            throw new ChannelException("Failed to bind to: " + localAddress, future.getCause());
        }

        return future.getChannel();
    }

public ChannelFuture bindAsync(final SocketAddress localAddress) {
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        Binder binder = new Binder(localAddress);
        ChannelHandler parentHandler = getParentHandler();

        ChannelPipeline bossPipeline = pipeline();
        bossPipeline.addLast("binder", binder);//注意这个
        if (parentHandler != null) {
            bossPipeline.addLast("userHandler", parentHandler);
        }
		//newChannel实际上是调用的NioServerSocketChannelFactory.newChannel方法new NioServerSocketChannel,NioServerSocketChannel构造方法创建完ServerSocketChannel后会触发UpStream(ChannelState.OPEN事件)这个事件被ServerBootstrap类的内部类Binder处理并完成Selector与Channel的注册操作
        Channel channel = getFactory().newChannel(bossPipeline);
        final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
        binder.bindFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    bfuture.setSuccess();
                } else {
                    // Call close on bind failure
                    bfuture.getChannel().close();
                    bfuture.setFailure(future.getCause());
                }
            }
        });
        return bfuture;
    }


private final class Binder extends SimpleChannelUpstreamHandler {

        private final SocketAddress localAddress;
        private final Map<String, Object> childOptions =
            new HashMap<String, Object>();
        private final DefaultChannelFuture bindFuture = new DefaultChannelFuture(null, false);
        Binder(SocketAddress localAddress) {
            this.localAddress = localAddress;
        }

        @Override
        public void channelOpen(
                ChannelHandlerContext ctx,
                ChannelStateEvent evt) {

            try {
                evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());

                // Split options into two categories: parent and child.
                Map<String, Object> allOptions = getOptions();
                Map<String, Object> parentOptions = new HashMap<String, Object>();
                for (Entry<String, Object> e: allOptions.entrySet()) {
                    if (e.getKey().startsWith("child.")) {
                        childOptions.put(
                                e.getKey().substring(6),
                                e.getValue());
                    } else if (!"pipelineFactory".equals(e.getKey())) {
                        parentOptions.put(e.getKey(), e.getValue());
                    }
                }

                // Apply parent options.
                evt.getChannel().getConfig().setOptions(parentOptions);
            } finally {
                ctx.sendUpstream(evt);
            }
			//这里将NioServerSocketChannel绑定到本地端口,并触发Downstream(ChannelState.BOUND)事件,最终被传递sin处理(NioServerSocketPipelineSink)
            evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        bindFuture.setSuccess();
                    } else {
                        bindFuture.setFailure(future.getCause());
                    }
                }
            });
        }

  NioServerSocketPipelineSink类用来处理最底层的事件。

 

class NioServerSocketPipelineSink extends AbstractNioChannelSink {

    public void eventSunk(
            ChannelPipeline pipeline, ChannelEvent e) throws Exception {
        Channel channel = e.getChannel();
        if (channel instanceof NioServerSocketChannel) {
            handleServerSocket(e);//处理服务端Channel
        } else if (channel instanceof NioSocketChannel) {
            handleAcceptedSocket(e);//处理客户端Channel
        }
    }

    private static void handleServerSocket(ChannelEvent e) {
        if (!(e instanceof ChannelStateEvent)) {
            return;
        }

        ChannelStateEvent event = (ChannelStateEvent) e;
        NioServerSocketChannel channel =
            (NioServerSocketChannel) event.getChannel();
        ChannelFuture future = event.getFuture();
        ChannelState state = event.getState();
        Object value = event.getValue();

        switch (state) {
        case OPEN://根据一个值标记是Open还是Close,其它case也是这样
            if (Boolean.FALSE.equals(value)) {
                ((NioServerBoss) channel.boss).close(channel, future);
            }
            break;
        case BOUND:
            if (value != null) {//在ServerBootStraup内部类Binder中触发BOUND事件,传递此value的值为localAddress,所以进入这个if调用boss的bind方法(NioServerBoss类的)完成地址绑定与OP_ACCEPT的注册,请参见上面讲NioServerBoss中的bind方法
                ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
            } else {
                ((NioServerBoss) channel.boss).close(channel, future);
            }
            break;
        default:
            break;
        }
    }

    private static void handleAcceptedSocket(ChannelEvent e) {
        if (e instanceof ChannelStateEvent) {
            ChannelStateEvent event = (ChannelStateEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            ChannelFuture future = event.getFuture();
            ChannelState state = event.getState();
            Object value = event.getValue();

            switch (state) {
            case OPEN:
                if (Boolean.FALSE.equals(value)) {
                    channel.worker.close(channel, future);
                }
                break;
            case BOUND:
            case CONNECTED:
                if (value == null) {
                    channel.worker.close(channel, future);
                }
                break;
            case INTEREST_OPS:
                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                break;
            }
        } else if (e instanceof MessageEvent) {
            MessageEvent event = (MessageEvent) e;
            NioSocketChannel channel = (NioSocketChannel) event.getChannel();
            boolean offered = channel.writeBufferQueue.offer(event);//将消息提交到该Channel的writeBuffer中
            assert offered;
            channel.worker.writeFromUserCode(channel);//写(不一定马上进行写入,它会把写入工作放到IO线程中去做)
        }
    }
}

   Netty的代码可以说是即漂亮又简单,了解实现原理后,再去看代码感觉很容易,发现没什么可讲的了...

 

  • 大小: 76.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics