NIO是Java提供的非阻塞I/O API。
非阻塞的意義在于可以使用一個(gè)線程對(duì)大量的數(shù)據(jù)連接進(jìn)行處理,非常適用于"短數(shù)據(jù)長連接"的應(yīng)用場(chǎng)景,例如即時(shí)通訊軟件。
在一個(gè)阻塞C/S系統(tǒng)中,服務(wù)器要為每一個(gè)客戶連接開啟一個(gè)線程阻塞等待客戶端發(fā)送的消息。若使用非阻塞技術(shù),服務(wù)器可以使用一個(gè)線程對(duì)連接進(jìn) 行輪詢,無須阻塞等待。這大大減少了內(nèi)存資源的浪費(fèi),也避免了服務(wù)器在客戶線程中不斷切換帶來的CPU消耗,服務(wù)器對(duì)CPU的有效使用率大大提高。
其核心概念包括Channel,Selector,SelectionKey,Buffer。
Channel是I/O通道,可以向其注冊(cè)Selector,應(yīng)用成功可以通過select操作獲取當(dāng)前通道已經(jīng)準(zhǔn)備好的可以無阻塞執(zhí)行的操作。這由SelectionKey表示。
SelectionKey的常量字段SelectionKey.OP_***分別對(duì)應(yīng)Channel的幾種操作例如connect(),accept(),read(),write()。
select操作后得到SelectionKey.OP_WRITE或者READ即可在Channel上面無阻塞調(diào)用read和write方 法,Channel的讀寫操作均需要通過Buffer進(jìn)行。即讀是講數(shù)據(jù)從通道中讀入Buffer然后做進(jìn)一步處理。寫需要先將數(shù)據(jù)寫入Buffer然后 通道接收Buffer。
下面是一個(gè)使用NIO的基本C/S示例。該示例只為顯示如何使用基本的API而存在,其代碼的健壯性,合理性都不具參考價(jià)值。
這個(gè)示例,實(shí)現(xiàn)一個(gè)簡(jiǎn)單的C/S,客戶端想服務(wù)器端發(fā)送消息,服務(wù)器將收到的消息打印到控制臺(tái)?,F(xiàn)實(shí)的應(yīng)用中需要定義發(fā)送數(shù)據(jù)使用的協(xié)議,以幫 助服務(wù)器解析消息。本示例只是無差別的使用默認(rèn)編碼將收到的字節(jié)轉(zhuǎn)換字符并打印。通過改變初始分配的ByteBuffer的容量,可以看到打印消息的變 化。容量越小,對(duì)一條消息的處理次數(shù)就越多,容量大就可以在更少的循環(huán)次數(shù)內(nèi)讀完整個(gè)消息。所以真是的應(yīng)用場(chǎng)景,要考慮適當(dāng)?shù)木彺娲笮∫蕴岣咝省?/p>
首先是Server:
1. package hadix.demo.nio;
2.
3. import java.io.IOException;
4. import java.net.InetSocketAddress;
5. import java.nio.ByteBuffer;
6. import java.nio.channels.SelectionKey;
7. import java.nio.channels.Selector;
8. import java.nio.channels.ServerSocketChannel;
9. import java.nio.channels.SocketChannel;
10. import java.util.*;
11. import java.util.concurrent.ConcurrentHashMap;
12.
13. /**
14. * User: hAdIx
15. * Date: 11-11-2
16. * Time: 上午11:26
17. */
18. public class Server {
19. private Selector selector;
20. private ByteBuffer readBuffer = ByteBuffer.allocate(8);//調(diào)整緩存的大小可以看到打印輸出的變化
21. private Map clientMessage = new ConcurrentHashMap<>();
22.
23. public void start() throws IOException {
24. ServerSocketChannel ssc = ServerSocketChannel.open();
25. ssc.configureBlocking(false);
26. ssc.bind(new InetSocketAddress("localhost", 8001));
27. selector = Selector.open();
28. ssc.register(selector, SelectionKey.OP_ACCEPT);
29. while (!Thread.currentThread()。isInterrupted()) {
30. selector.select();
31. Setkeys = selector.selectedKeys();
32. IteratorkeyIterator = keys.iterator();
33. while (keyIterator.hasNext()) {
34. SelectionKey key = keyIterator.next();
35. if (!key.isValid()) {
36. continue;
37. }
38. if (key.isAcceptable()) {
39. accept(key);
40. } else if (key.isReadable()) {
41. read(key);
42. }
43. keyIterator.remove();
44. }
45. }
46. }
47.
48. private void read(SelectionKey key) throws IOException {
49. SocketChannel socketChannel = (SocketChannel) key.channel();
50.
51. // Clear out our read buffer so it's ready for new data
52. this.readBuffer.clear();
53.
54. // Attempt to read off the channel
55. int numRead;
56. try {
57. numRead = socketChannel.read(this.readBuffer);
58. } catch (IOException e) {
59. // The remote forcibly closed the connection, cancel
60. // the selection key and close the channel.
61. key.cancel();
62. socketChannel.close();
63. clientMessage.remove(socketChannel);
64. return;
65. }
66.
67. byte[] bytes = clientMessage.get(socketChannel);
68. if (bytes == null) {
69. bytes = new byte[0];
70. }
71. if (numRead > 0) {
72. byte[] newBytes = new byte[bytes.length + numRead];
73. System.arraycopy(bytes, 0, newBytes, 0, bytes.length);
74. System.arraycopy(readBuffer.array(), 0, newBytes, bytes.length, numRead);
75. clientMessage.put(socketChannel, newBytes);
76. System.out.println(new String(newBytes));
77. } else {
78. String message = new String(bytes);
79. System.out.println(message);
80. }
81. }
82.
83. private void accept(SelectionKey key) throws IOException {
84. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
85. SocketChannel clientChannel = ssc.accept();
86. clientChannel.configureBlocking(false);
87. clientChannel.register(selector, SelectionKey.OP_READ);
88. System.out.println("a new client connected");
89. }
90.
91.
92. public static void main(String[] args) throws IOException {
93. System.out.println("server started…");
94. new Server()。start();
95. }
96. }
然后是Client:
1. package hadix.demo.nio;
2.
3. import java.io.IOException;
4. import java.net.InetSocketAddress;
5. import java.nio.ByteBuffer;
6. import java.nio.channels.SelectionKey;
7. import java.nio.channels.Selector;
8. import java.nio.channels.SocketChannel;
9. import java.util.Iterator;
10. import java.util.Scanner;
11. import java.util.Set;
12.
13. /**
14. * User: hAdIx
15. * Date: 11-11-2
16. * Time: 上午11:26
17. */
18. public class Client {
19.
20. public void start() throws IOException {
21. SocketChannel sc = SocketChannel.open();
22. sc.configureBlocking(false);
23. sc.connect(new InetSocketAddress("localhost", 8001));
24. Selector selector = Selector.open();
25. sc.register(selector, SelectionKey.OP_CONNECT);
26. Scanner scanner = new Scanner(System.in);
27. while (true) {
28. selector.select();
29. Setkeys = selector.selectedKeys();
30. System.out.println("keys=" + keys.size());
31. IteratorkeyIterator = keys.iterator();
32. while (keyIterator.hasNext()) {
33. SelectionKey key = keyIterator.next();
34. keyIterator.remove();
35. if (key.isConnectable()) {
36. sc.finishConnect();
37. sc.register(selector, SelectionKey.OP_WRITE);
38. System.out.println("server connected…");
39. break;
40. } else if (key.isWritable()) {
41.
42. System.out.println("please input message");
43. String message = scanner.nextLine();
44. ByteBuffer writeBuffer = ByteBuffer.wrap(message.getBytes());
45. sc.write(writeBuffer);
46. }
47. }
48. }
49. }
50.
51. public static void main(String[] args) throws IOException {
52. new Client()。start();
53. }
54. }