
Selector selector = Selector.open();
將 Channel 注冊到選擇器中
channel.configureBlocking(false);
SelectionKey?key?=?channel.register(selector,?SelectionKey.OP_READ);
注意,如果一個 Channel 要注冊到 Selector 中,那么這個 Channel 必須是非阻塞的,即channel.configureBlocking(false);
因為 Channel 必須要是非阻塞的,因此 FileChannel 是不能夠使用選擇器的,因為 FileChannel 都是阻塞的.
Connect,即連接事件(TCP 連接),?對應于SelectionKey.OP_CONNECT。
Accept,即確認事件,對應于SelectionKey.OP_ACCEPT。
Read,即讀事件,對應于SelectionKey.OP_READ, 表示 buffer 可讀。
Write,即寫事件,對應于SelectionKey.OP_WRITE, 表示 buffer 可寫。
一個 Channel發(fā)出一個事件也可以稱為對于某個事件, Channel 準備好了。因此一個 Channel 成功連接到了另一個服務器也可以被稱為 connect ready。
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
int?readySet?=?selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和 Selector
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attaching Object
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
通過 Selector 選擇 Channel
注意:select()方法返回的值表示有多少個 Channel 可操作.
獲取可操作的 Channel
Set
selectedKeys = selector.selectedKeys();
Iterator
keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
-
通過 Selector.open() 打開一個 Selector. -
將 Channel 注冊到 Selector 中, 并設置需要監(jiān)聽的事件(interest set) -
不斷重復: *從 selected key 中獲取 對應的 Channel 和附加信息(如果有的話)
關閉 Selector
完整的 Selector 例子
public class NioEchoServer {
private static final int BUF_SIZE = 256;
private static final int TIMEOUT = 3000;
public static void main(String args[]) throws Exception {
// 打開服務端 Socket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 打開 Selector
Selector selector = Selector.open();
// 服務端 Socket 監(jiān)聽8080端口, 并配置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false);
// 將 channel 注冊到 selector 中.
// 通常我們都是先注冊一個 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ
// 注冊到 Selector 中.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通過調(diào)用 select 方法, 阻塞地等待 channel I/O 可操作
if (selector.select(TIMEOUT) == 0) {
System.out.print(".");
continue;
}
// 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經(jīng)就緒.
Iterator
keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
// 當獲取一個 SelectionKey 后, 就要將它刪除, 表示我們已經(jīng)對這個 IO 事件進行了處理.
keyIterator.remove();
if (key.isAcceptable()) {
// 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
// 代表客戶端的連接
// 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
// 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
//在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 注冊到 Selector 中.
// 注意, 這里我們?nèi)绻麤]有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那么 select 方法會一直直接返回.
clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
}
if (key.isReadable()) {
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buf = (ByteBuffer) key.attachment();
long bytesRead = clientChannel.read(buf);
if (bytesRead == -1) {
clientChannel.close();
} else if (bytesRead > 0) {
key.interestOps(OP_READ | SelectionKey.OP_WRITE);
System.out.println("Get data length: " + bytesRead);
}
}
if (key.isValid() && key.isWritable()) {
ByteBuffer buf = (ByteBuffer) key.attachment();
buf.flip();
SocketChannel clientChannel = (SocketChannel) key.channel();
clientChannel.write(buf);
if (!buf.hasRemaining()) {
key.interestOps(OP_READ);
}
buf.compact();
}
}
}
}
}
轉載源:SegmentFault
特別推薦一個分享架構+算法的優(yōu)質(zhì)內(nèi)容,還沒關注的小伙伴,可以長按關注一下:
長按訂閱更多精彩▼
如有收獲,點個在看,誠摯感謝
免責聲明:本文內(nèi)容由21ic獲得授權后發(fā)布,版權歸原作者所有,本平臺僅提供信息存儲服務。文章僅代表作者個人觀點,不代表本平臺立場,如有問題,請聯(lián)系我們,謝謝!