总结
把总结放前面今后好复习:
这样看下来netty源码确实设计的挺复杂的,我阅读起来有点吃力,因为之前我阅读源码都是过程驱动的,这次直接从某一个类开始读,也不知道这个类在哪个地方用到了,下次就还是按netty运行的某块流程去跟踪源码。(这篇文章是失败的)
- NioEventLoopGroup中含一个children数组,类型为NioEventLoop,如果调用默认构造方法构造NioEventLoopGroup,其大小默认为可用CPU核数*2(或与配置属性之间的最大值)
- NioEventLoop继承自SingleThreadEventExecutor,他是一个主要的父类,具体有什么用下次看到相关源码再研究
- NioEventLoop中有个SelectorProvider,它在不同操作系统中的实现是不同的,windows的SelectorProvider提供的selector是使用
select
实现的,linux使用epoll
,salaris使用poll
,macosx使用kqueue
源码分析

实现了ExecutorService,说明它具有类似线程池的性质,ExecutorService和ScheduledExecuterService分别对应submit和schedule方法(任务和定时任务)
然后就要看这个核心接口:EventExecutorGroup,这个接口算是netty提供的最顶层的接口了,再往上就是jdk的地盘了,它的javadoc描述是
The EventExecutorGroup is responsible for providing the EventExecutor's to use via its next() method. Besides this, it is also responsible for handling their life-cycle and allows shutting them down in a global fashion.
EventExecutorGroup
负责通过其next()
方法提供要使用的EventExecutor
。除此之外,它还负责处理它们的生命周期,并允许以全局方式关闭它们。(shutdownGracefully)
而EventExecutor实际上是一个实现了EventExecutorGroup的子接口,它提供了parent()
方法获取所属的EventExecutorGroup。
通过AbstractEventExecutorGroup
发现,他调用submit、schedule等方法实际上是在next()
返回的EventExecutor上调用。
然后看MultithreadEventExecutorGroup
类,这个类已经提供了绝大部分实现,
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
//维护了一个EventExecutor数组
private final EventExecutor[] children;
//只读的EventExecutor
private final Set<EventExecutor> readonlyChildren;
//终止的EventExecutor计数
private final AtomicInteger terminatedChildren = new AtomicInteger();
//Promise是一个可写的Future
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
//Chooses the next EventExecutor to use.选择下一个EventExecutor
private final EventExecutorChooserFactory.EventExecutorChooser chooser;
//...
}
然后从最底下NioEventLoopGroup的构造方法看起,这个类没有什么新增的属性和方法,基本是提供了一些构造方法,默认构造
public NioEventLoopGroup() {
this(0);
}
传递链:
//nThreads = 0
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//nThreads = 0, executor = null
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
//nThreads = 0, executor = null,selectorProvider = SelectorProvider.provider()
//selectorProvider使用来提供NIO核心selector的一个类
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
//nThreads = 0, executor = null,selectorProvider = SelectorProvider.provider()
//selectStrategyFactory = DefaultSelectStrategyFactory()
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
接下来调用父类MultithreadEventLoopGroup的构造方法:
//nThreads = 0, executor = null
//args = [SelectorProvider.provider(),DefaultSelectStrategyFactory(),RejectedExecutionHandlers.reject()]
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里如果nThreads是0,传了一个DEFAULT_EVENT_LOOP_THREADS给父类的构造方法,而前面传过来的nThreads就是0,那么这个DEFAULT_EVENT_LOOP_THREADS是多少呢?
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
可见它是从1、环境配置中的io.netty.eventLoopThreads
、NettyRuntime.availableProcessors() * 2
中取最大值,而NettyRuntime.availableProcessors() * 2
是netty可用主机CPU的核数*2。
最后,它调用MultithreadEventExecutorGroup的构造方法:
/**
* nThreads:这个MultithreadEventExecutorGroup会用几个线程
* executor:使用的线程池,如果是null使用默认的
* chooserFactory,给chooserFactory赋值
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
//前面默认传过来的就是空,使用new ThreadPerTaskExecutor(newDefaultThreadFactory());
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//children的大小为nThreads,即默认是netty可用主机CPU的核数*2(如果是最大值)
children = new EventExecutor[nThreads];
//给children赋值
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newChild,对于NioEventLoopGroup的实现为:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
它返回一个EventLoop,可以看到对于NioEventLoopGroup,它的children数组中存放的实际是一个NioEventLoop对象。

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
NioEventLoop有以下属性:
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
而它继承的SingleThreadEventExecutor有以下属性:
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;
private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;
private long lastExecutionTime;
@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;
private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;
private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);
不同操作系统的I/O多路复用选择器各自内核实现不同,目前有select、poll、epoll、kqueue四种实现,JDK里与之对应的实现也随着操作系统的不同而不同,
JDK里对于Selector的实现都交由SelectorProvider的方法openSelector()
来提供,而SelectorProvider的实现根据操作系统而变化:
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
//根据不同操作系统的实现来创建SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
Windows实现:
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
通过阅读WindowsSelectorImpl.c
源码发现它是通过select
实现的
Unix系实现:
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
return new sun.nio.ch.PollSelectorProvider();
}
public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
Linux是通过EPollSelectorImpl中调用EPollArrayWrapper类来通过系统函数epoll
实现的
SunOS(solaris)则是调用DevPollArrayWrapper通过系统函数poll
实现的
MacosX实现:
public static SelectorProvider create() {
return new sun.nio.ch.KQueueSelectorProvider();
}
KQueue是另一种多路复用的方法,这里就不去看了。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/netty%e6%ba%90%e7%a0%81%e5%88%9d%e6%8e%a2-nioeventloopgroup/