final ChannelFuture initAndRegister(){ Channel channel = null; try { // 创建通道 channel = channelFactory.newChannel(); // 初始化通道 init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor returnnew DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor returnnew DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); } // 注册通道 ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }
privatevoidexecute(Runnable task, boolean immediate){ boolean inEventLoop = inEventLoop(); // 添加任务到队列 addTask(task); if (!inEventLoop) { // 启动线程 startThread(); if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
@Override protectedvoidrun(){ int selectCnt = 0; for (;;) { try { int strategy; try { // 根据 taskQueue 是否为空决定 select 的策略 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { // 如果没有任务需要执行,调用 select 方法阻塞并监听就绪事件 strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; }
selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; finalint ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } elseif (strategy > 0) { finallong ioStartTime = System.nanoTime(); try { // 处理 Selector 监听到的就绪事件 processSelectedKeys(); } finally { // Ensure we always run tasks. finallong ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { // 执行 taskQueue 里面所有的 task ,不保证能全部执行完 ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } elseif (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } } }
// increment pending bytes after adding message to the unflushed arrays. // See https://github.com/netty/netty/issues/1619 incrementPendingOutboundBytes(entry.pendingSize, false); }
@Override protectedvoiddoWrite(ChannelOutboundBuffer in)throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { clearOpWrite(); return; }
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); int nioBufferCnt = in.nioBufferCount();
switch (nioBufferCnt) { case0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); finalint localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); finallong localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } } while (writeSpinCount > 0);