Netty源码初探-NioEventLoopGroup

总结

把总结放前面今后好复习:

这样看下来netty源码确实设计的挺复杂的,我阅读起来有点吃力,因为之前我阅读源码都是过程驱动的,这次直接从某一个类开始读,也不知道这个类在哪个地方用到了,下次就还是按netty运行的某块流程去跟踪源码。(这篇文章是失败的)

  • NioEventLoopGroup中含一个children数组,类型为NioEventLoop,如果调用默认构造方法构造NioEventLoopGroup,其大小默认为可用CPU核数*2(或与配置属性之间的最大值)
  • NioEventLoop继承自SingleThreadEventExecutor,他是一个主要的父类,具体有什么用下次看到相关源码再研究
  • NioEventLoop中有个SelectorProvider,它在不同操作系统中的实现是不同的,windows的SelectorProvider提供的selector是使用select实现的,linux使用epoll,salaris使用poll,macosx使用kqueue

源码分析

Netty源码初探-NioEventLoopGroup

实现了ExecutorService,说明它具有类似线程池的性质,ExecutorService和ScheduledExecuterService分别对应submit和schedule方法(任务和定时任务)

然后就要看这个核心接口:EventExecutorGroup,这个接口算是netty提供的最顶层的接口了,再往上就是jdk的地盘了,它的javadoc描述是

The EventExecutorGroup is responsible for providing the EventExecutor's to use via its next() method. Besides this, it is also responsible for handling their life-cycle and allows shutting them down in a global fashion.

EventExecutorGroup负责通过其next()方法提供要使用的EventExecutor。除此之外,它还负责处理它们的生命周期,并允许以全局方式关闭它们。(shutdownGracefully)

而EventExecutor实际上是一个实现了EventExecutorGroup的子接口,它提供了parent()方法获取所属的EventExecutorGroup。

通过AbstractEventExecutorGroup发现,他调用submit、schedule等方法实际上是在next()返回的EventExecutor上调用。

然后看MultithreadEventExecutorGroup类,这个类已经提供了绝大部分实现,

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    //维护了一个EventExecutor数组
    private final EventExecutor[] children;
    //只读的EventExecutor
    private final Set<EventExecutor> readonlyChildren;
    //终止的EventExecutor计数
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    //Promise是一个可写的Future
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    //Chooses the next EventExecutor to use.选择下一个EventExecutor
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    //...
}

然后从最底下NioEventLoopGroup的构造方法看起,这个类没有什么新增的属性和方法,基本是提供了一些构造方法,默认构造

public NioEventLoopGroup() {
    this(0);
}

传递链:

//nThreads = 0
public NioEventLoopGroup(int nThreads) {
    this(nThreads, (Executor) null);
}

//nThreads = 0, executor = null
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

//nThreads = 0, executor = null,selectorProvider = SelectorProvider.provider()
//selectorProvider使用来提供NIO核心selector的一个类
public NioEventLoopGroup(
        int nThreads, Executor executor, final SelectorProvider selectorProvider) {
    this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
//nThreads = 0, executor = null,selectorProvider = SelectorProvider.provider()
//selectStrategyFactory = DefaultSelectStrategyFactory()
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                            final SelectStrategyFactory selectStrategyFactory) {
    super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}

接下来调用父类MultithreadEventLoopGroup的构造方法:

//nThreads = 0, executor = null
//args = [SelectorProvider.provider(),DefaultSelectStrategyFactory(),RejectedExecutionHandlers.reject()]
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

这里如果nThreads是0,传了一个DEFAULT_EVENT_LOOP_THREADS给父类的构造方法,而前面传过来的nThreads就是0,那么这个DEFAULT_EVENT_LOOP_THREADS是多少呢?

private static final int DEFAULT_EVENT_LOOP_THREADS;

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

可见它是从1、环境配置中的io.netty.eventLoopThreadsNettyRuntime.availableProcessors() * 2中取最大值,而NettyRuntime.availableProcessors() * 2是netty可用主机CPU的核数*2。

最后,它调用MultithreadEventExecutorGroup的构造方法:

/**
* nThreads:这个MultithreadEventExecutorGroup会用几个线程
* executor:使用的线程池,如果是null使用默认的
* chooserFactory,给chooserFactory赋值
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    //前面默认传过来的就是空,使用new ThreadPerTaskExecutor(newDefaultThreadFactory());
    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }
    //children的大小为nThreads,即默认是netty可用主机CPU的核数*2(如果是最大值)
    children = new EventExecutor[nThreads];
    //给children赋值
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        // Let the caller handle the interruption.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

newChild,对于NioEventLoopGroup的实现为:

@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
        ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

它返回一个EventLoop,可以看到对于NioEventLoopGroup,它的children数组中存放的实际是一个NioEventLoop对象。

Netty源码初探-NioEventLoopGroup
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                EventLoopTaskQueueFactory queueFactory) {
    super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
            rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    selector = selectorTuple.selector;
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}

NioEventLoop有以下属性:

private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

而它继承的SingleThreadEventExecutor有以下属性:

private final Queue<Runnable> taskQueue;

private volatile Thread thread;
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;

private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;

private long lastExecutionTime;

@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;

private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

不同操作系统的I/O多路复用选择器各自内核实现不同,目前有select、poll、epoll、kqueue四种实现,JDK里与之对应的实现也随着操作系统的不同而不同,

JDK里对于Selector的实现都交由SelectorProvider的方法openSelector()来提供,而SelectorProvider的实现根据操作系统而变化:

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        //根据不同操作系统的实现来创建SelectorProvider
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

Windows实现:

public static SelectorProvider create() {
    return new sun.nio.ch.WindowsSelectorProvider();
}
public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}

通过阅读WindowsSelectorImpl.c源码发现它是通过select实现的

Unix系实现:

public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}
public class EPollSelectorProvider
    extends SelectorProviderImpl
{
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }

    public Channel inheritedChannel() throws IOException {
        return InheritedChannel.getChannel();
    }
}

Linux是通过EPollSelectorImpl中调用EPollArrayWrapper类来通过系统函数epoll实现的
SunOS(solaris)则是调用DevPollArrayWrapper通过系统函数poll实现的

MacosX实现:

public static SelectorProvider create() {
    return new sun.nio.ch.KQueueSelectorProvider();
}

KQueue是另一种多路复用的方法,这里就不去看了。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e6%ba%90%e7%a0%81%e5%88%9d%e6%8e%a2-nioeventloopgroup/