SocketChannel 例子(转)

Socket通讯比较常见的问题有如下几种:
1、设置收发超时;
2、正确的每一个bit的收发;
3、物理线路故障的保护;
4、始终能正常工作;
5、尽量少占系统资源;
n、……
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通讯。
为此,通过学习dnsjava的通讯代码,加上自己在一些项目中的实践,现在给出TCP通讯的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的测试环境,所以如果使用这些代码发生任何风险的话……
(TcpChannel代码为Brian
Wellington所做,原名为TCPClient,经本人稍作改动)

Java代码

  1. // Copyright (c) 2005 Brian Wellington (bwelling@xbill.org)  
  2.   
  3. package asynchronizedchannel;  
  4.   
  5. import java.io.*;  
  6. import java.net.*;  
  7. import java.nio.*;  
  8. import java.nio.channels.*;  
  9.   
  10. final class TcpChannel  
  11. {  
  12.     private long endTime;  
  13.     private SelectionKey key;  
  14.   
  15.     public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException  
  16.     {  
  17.         boolean done = false;  
  18.         Selector selector = null;  
  19.         this.endTime = endTime;  
  20.         try {  
  21.             selector = Selector.open();  
  22.             channel.configureBlocking(false);  
  23.             key = channel.register(selector, op);  
  24.             done = true;  
  25.         } finally {  
  26.             if (!done && selector != null) {  
  27.                 selector.close();  
  28.             }  
  29.             if (!done) {  
  30.                 channel.close();  
  31.             }  
  32.         }  
  33.     }  
  34.   
  35.     static void blockUntil(SelectionKey key, long endTime) throws IOException  
  36.     {  
  37.         long timeout = endTime – System.currentTimeMillis();  
  38.         int nkeys = 0;  
  39.         if (timeout > 0) {  
  40.             nkeys = key.selector().select(timeout);  
  41.         } else if (timeout == 0) {  
  42.             nkeys = key.selector().selectNow();  
  43.         }  
  44.         if (nkeys == 0) {  
  45.             throw new SocketTimeoutException();  
  46.         }  
  47.     }  
  48.   
  49.     void cleanup()  
  50.     {  
  51.         try {  
  52.             key.selector().close();  
  53.             key.channel().close();  
  54.         } catch (IOException ex) {  
  55.             ex.printStackTrace();  
  56.         }  
  57.     }  
  58.   
  59.     void bind(SocketAddress addr) throws IOException  
  60.     {  
  61.         SocketChannel channel = (SocketChannel) key.channel();  
  62.         channel.socket().bind(addr);  
  63.     }  
  64.   
  65.     void connect(SocketAddress addr) throws IOException  
  66.     {  
  67.         SocketChannel channel = (SocketChannel) key.channel();  
  68.         if (channel.connect(addr))  
  69.             return;  
  70.         key.interestOps(SelectionKey.OP_CONNECT);  
  71.         try {  
  72.             while (!channel.finishConnect()) {  
  73.                 if (!key.isConnectable()) {  
  74.                     blockUntil(key, endTime);  
  75.                 }  
  76.             }  
  77.         } finally {  
  78.             if (key.isValid()) {  
  79.                 key.interestOps(0);  
  80.             }  
  81.         }  
  82.     }  
  83.   
  84.     void send(ByteBuffer buffer) throws IOException  
  85.     {  
  86.         Send.operate(key, buffer, endTime);  
  87.     }  
  88.   
  89.     void recv(ByteBuffer buffer) throws IOException  
  90.     {  
  91.         Recv.operate(key, buffer, endTime);  
  92.     }  
  93. }  
  94.   
  95. interface Operator  
  96. {  
  97.     class Operation  
  98.     {  
  99.         static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException  
  100.         {  
  101.             final SocketChannel channel = (SocketChannel) key.channel();  
  102.             final int total = buffer.capacity();  
  103.             key.interestOps(op);  
  104.             try {  
  105.                 while (buffer.position() < total) {  
  106.                     if (System.currentTimeMillis() > endTime) {  
  107.                         throw new SocketTimeoutException();  
  108.                     }  
  109.                     if ((key.readyOps() & op) != 0) {  
  110.                         if (optr.io(channel, buffer) < 0) {  
  111.                             throw new EOFException();  
  112.                         }  
  113.                     } else {  
  114.                         TcpChannel.blockUntil(key, endTime);  
  115.                     }  
  116.                 }  
  117.             } finally {  
  118.                 if (key.isValid()) {  
  119.                     key.interestOps(0);  
  120.                 }  
  121.             }  
  122.         }  
  123.     }  
  124.   
  125.     int io(SocketChannel channel, ByteBuffer buffer) throws IOException;  
  126. }  
  127. class Send implements Operator  
  128. {  
  129.     public int io(SocketChannel channel, ByteBuffer buffer) throws IOException  
  130.     {  
  131.         return channel.write(buffer);  
  132.     }  
  133.     public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException  
  134.     {  
  135.         Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);  
  136.     }  
  137.     public static final Send operator = new Send();  
  138. }  
  139.   
  140. class Recv implements Operator  
  141. {  
  142.     public int io(SocketChannel channel, ByteBuffer buffer) throws IOException  
  143.     {  
  144.         return channel.read(buffer);  
  145.     }  
  146.       
  147.     public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException  
  148.     {  
  149.         Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);  
  150.     }  
  151.     public static final Recv operator = new Recv();  
  152. }  

    // Copyright (c) 2005 Brian Wellington (bwelling@xbill.org) package asynchronizedchannel; import java.io.; import java.net.; import java.nio.; import java.nio.channels.; final class TcpChannel { private long endTime; private SelectionKey key; public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException { boolean done = false; Selector selector = null; this.endTime = endTime; try { selector = Selector.open(); channel.configureBlocking(false); key = channel.register(selector, op); done = true; } finally { if (!done && selector != null) { selector.close(); } if (!done) { channel.close(); } } } static void blockUntil(SelectionKey key, long endTime) throws IOException { long timeout = endTime – System.currentTimeMillis(); int nkeys = 0; if (timeout > 0) { nkeys = key.selector().select(timeout); } else if (timeout == 0) { nkeys = key.selector().selectNow(); } if (nkeys == 0) { throw new SocketTimeoutException(); } } void cleanup() { try { key.selector().close(); key.channel().close(); } catch (IOException ex) { ex.printStackTrace(); } } void bind(SocketAddress addr) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); channel.socket().bind(addr); } void connect(SocketAddress addr) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); if (channel.connect(addr)) return; key.interestOps(SelectionKey.OP_CONNECT); try { while (!channel.finishConnect()) { if (!key.isConnectable()) { blockUntil(key, endTime); } } } finally { if (key.isValid()) { key.interestOps(0); } } } void send(ByteBuffer buffer) throws IOException { Send.operate(key, buffer, endTime); } void recv(ByteBuffer buffer) throws IOException { Recv.operate(key, buffer, endTime); } } interface Operator { class Operation { static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException { final SocketChannel channel = (SocketChannel) key.channel(); final int total = 
buffer.capacity(); key.interestOps(op); try { while (buffer.position() < total) { if (System.currentTimeMillis() > endTime) { throw new SocketTimeoutException(); } if ((key.readyOps() & op) != 0) { if (optr.io(channel, buffer) < 0) { throw new EOFException(); } } else { TcpChannel.blockUntil(key, endTime); } } } finally { if (key.isValid()) { key.interestOps(0); } } } } int io(SocketChannel channel, ByteBuffer buffer) throws IOException; } class Send implements Operator { public int io(SocketChannel channel, ByteBuffer buffer) throws IOException { return channel.write(buffer); } public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException { Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator); } public static final Send operator = new Send(); } class Recv implements Operator { public int io(SocketChannel channel, ByteBuffer buffer) throws IOException { return channel.read(buffer); } public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException { Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator); } public static final Recv operator = new Recv(); }

使用演示见以下代码。
大致说明一下,Server端开5656端口侦听,Client端开若干线程测试Socket通讯。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示确认,否则发送“?xxxx”表示故障。
正式使用中可以再设置tryout尝试n次。
Server端,代码演示:

Java代码

  1. package asynchronizedchannel;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.Selector;  
  8. import java.nio.channels.ServerSocketChannel;  
  9. import java.nio.channels.SocketChannel;  
  10. import java.security.MessageDigest;  
  11. import java.util.Iterator;  
  12.   
  13. public class Server  
  14. {  
  15.   
  16.     /** 
  17.      * 服务端通讯范例程序主函数 
  18.      *  
  19.      * @param args 
  20.      * @throws IOException 
  21.      */  
  22.     public static void main(String[] args) throws IOException  
  23.     {  
  24.         // Create the selector  
  25.         final Selector selector = Selector.open();  
  26.         final ServerSocketChannel server = ServerSocketChannel.open();  
  27.         server.configureBlocking(false);  
  28.         server.socket().bind(new InetSocketAddress(“xx.xx.xx.xx”, 5656), 5);  
  29.         // Register both channels with selector  
  30.         server.register(selector, SelectionKey.OP_ACCEPT);  
  31.         new Thread(new Daemon(selector)).start();  
  32.     }  
  33. }  
  34.   
  35. class Daemon implements Runnable  
  36. {  
  37.     private final Selector selector;  
  38.   
  39.     Daemon(Selector selector)  
  40.     {  
  41.         this.selector = selector;  
  42.     }  
  43.   
  44.     public void run()  
  45.     {  
  46.         while (true) {  
  47.             try {  
  48.                 // Wait for an event  
  49.                 selector.select();  
  50.   
  51.                 // Get list of selection keys with pending events  
  52.                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();  
  53.   
  54.                 // Process each key  
  55.                 while (it.hasNext()) {  
  56.                     // Get the selection key  
  57.                     SelectionKey selKey = it.next();  
  58.   
  59.                     // Remove it from the list to indicate that it is being processed  
  60.                     it.remove();  
  61.   
  62.                     // Check if it’s a connection request  
  63.                     if (selKey.isAcceptable()) {  
  64.                         // Get channel with connection request  
  65.                         ServerSocketChannel server = (ServerSocketChannel) selKey.channel();  
  66.                         // Accept the connection request.  
  67.                         // If serverSocketChannel is blocking, this method blocks.  
  68.                         // The returned channel is in blocking mode.  
  69.                         SocketChannel channel = server.accept();  
  70.   
  71.                         // If serverSocketChannel is non-blocking, sChannel may be null  
  72.                         if (channel != null) {  
  73.                             // Use the socket channel to communicate with the client  
  74.                             new Thread(new ServerHandler(channel)).start();  
  75.                         } else {  
  76.                             System.out.println(“—No Connection—“);  
  77.                             // There were no pending connection requests; try again later.  
  78.                             // To be notified of connection requests,  
  79.                         }  
  80.                     }  
  81.                 }  
  82.             } catch (Exception ex) {  
  83.                 ex.printStackTrace();  
  84.             }  
  85.         }  
  86.     }  
  87. }  
  88.   
  89. class ServerHandler implements Runnable  
  90. {  
  91.     private static final long timeout = 30 * 一千; // 设置超时时间为30秒  
  92.     private static int counter = 0;  
  93.     private final TcpChannel channel;  
  94.     private final MessageDigest md;  
  95.   
  96.     ServerHandler(SocketChannel channel) throws Exception  
  97.     {  
  98.         this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ);  
  99.         md = MessageDigest.getInstance(“md5”);  
  100.     }  
  101.   
  102.     public void run()  
  103.     {  
  104.         try {  
  105.             while (true) {  
  106.                 work();  
  107.                 synchronized (ServerHandler.class) {  
  108.                     if ((++counter & 65535) == 0) {  
  109.                         System.out.println(counter);  
  110.                     }  
  111.                 }  
  112.             }  
  113.         } catch (Exception e) {  
  114.             e.printStackTrace();  
  115.         } finally {  
  116.             channel.cleanup();  
  117.         }  
  118.     }  
  119.   
  120.     private void work() throws IOException  
  121.     { // 模拟工作流程  
  122.         byte[] cache = new byte[256], reply = new byte[5];  
  123.         read(cache, reply);  
  124.     }  
  125.   
  126.     private void read(byte[] cache, byte[] reply) throws IOException  
  127.     { // 从套接字读入数据  
  128.         channel.recv(ByteBuffer.wrap(cache));  
  129.         md.reset();  
  130.         md.update(cache, 0, 240);  
  131.         byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码  
  132.         if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节可比  
  133.             reply[0] = ‘?’;  
  134.             System.out.println(“MISMATCH!”);  
  135.         } else {  
  136.             reply[0] = ‘.’;  
  137.         }  
  138.         channel.send(ByteBuffer.wrap(reply)); // 再次回到接收结果  
  139.     }  
  140. }  
  141.   
  142. final class ExtArrays  
  143. {  
  144.     private ExtArrays()  
  145.     {  
  146.     }  
  147.   
  148.     public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)  
  149.     { // 字节数组的部分比较  
  150.         if (a == null || b == null) {  
  151.             return false;  
  152.         }  
  153.         if (offset_a + len > a.length || offset_b + len > b.length) {  
  154.             return false;  
  155.         }  
  156.         for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k–) {  
  157.             if (a[i] != b[j]) {  
  158.                 return false;  
  159.             }  
  160.         }  
  161.         return true;  
  162.     }  
  163. }  

    package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.security.MessageDigest; import java.util.Iterator; public class Server { /* 服务端通讯范例程序主函数 @param args @throws IOException / public static void main(String[] args) throws IOException { // Create the selector final Selector selector = Selector.open(); final ServerSocketChannel server = ServerSocketChannel.open(); server.configureBlocking(false); server.socket().bind(new InetSocketAddress(“xx.xx.xx.xx”, 5656), 5); // Register both channels with selector server.register(selector, SelectionKey.OP_ACCEPT); new Thread(new Daemon(selector)).start(); } } class Daemon implements Runnable { private final Selector selector; Daemon(Selector selector) { this.selector = selector; } public void run() { while (true) { try { // Wait for an event selector.select(); // Get list of selection keys with pending events Iterator it = selector.selectedKeys().iterator(); // Process each key while (it.hasNext()) { // Get the selection key SelectionKey selKey = it.next(); // Remove it from the list to indicate that it is being processed it.remove(); // Check if it’s a connection request if (selKey.isAcceptable()) { // Get channel with connection request ServerSocketChannel server = (ServerSocketChannel) selKey.channel(); // Accept the connection request. // If serverSocketChannel is blocking, this method blocks. // The returned channel is in blocking mode. SocketChannel channel = server.accept(); // If serverSocketChannel is non-blocking, sChannel may be null if (channel != null) { // Use the socket channel to communicate with the client new Thread(new ServerHandler(channel)).start(); } else { System.out.println(“—No Connection—“); // There were no pending connection requests; try again later. 
// To be notified of connection requests, } } } } catch (Exception ex) { ex.printStackTrace(); } } } } class ServerHandler implements Runnable { private static final long timeout = 30 * 一千; // 设置超时时间为30秒 private static int counter = 0; private final TcpChannel channel; private final MessageDigest md; ServerHandler(SocketChannel channel) throws Exception { this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ); md = MessageDigest.getInstance(“md5”); } public void run() { try { while (true) { work(); synchronized (ServerHandler.class) { if ((++counter & 65535) == 0) { System.out.println(counter); } } } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException { // 模拟工作流程 byte[] cache = new byte[256], reply = new byte[5]; read(cache, reply); } private void read(byte[] cache, byte[] reply) throws IOException { // 从套接字读入数据 channel.recv(ByteBuffer.wrap(cache)); md.reset(); md.update(cache, 0, 240); byte[] md5 = md.digest(); // 使用前240字节发生MD5校验码 if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节可比 reply[0] = ‘?’; System.out.println(“MISMATCH!”); } else { reply[0] = ‘.’; } channel.send(ByteBuffer.wrap(reply)); // 重返接收结果 } } final class ExtArrays { private ExtArrays() { } public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len) { // 字节数组的一部分可比 if (a == null || b == null) { return false; } if (offset_a + len > a.length || offset_b + len > b.length) { return false; } for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k–) { if (a[i] != b[j]) { return false; } } return true; } }

Client端,代码演示:

Java代码

  1. package asynchronizedchannel;  
  2.   
  3. import java.io.IOException;  
  4. import java.net.InetSocketAddress;  
  5. import java.nio.ByteBuffer;  
  6. import java.nio.channels.SelectionKey;  
  7. import java.nio.channels.SocketChannel;  
  8. import java.security.DigestException;  
  9. import java.security.MessageDigest;  
  10. import java.util.Random;  
  11.   
  12. public class Client  
  13. {  
  14.     private static int id = 0;  
  15.     /** 
  16.      * 客户端通讯范例程序主函数 
  17.      *  
  18.      * @param args 
  19.      * @throws Exception 
  20.      */  
  21.     public static void main(String[] args) throws Exception  
  22.     {  
  23.         new Thread(new ClientHandler(id++)).start();  
  24.         new Thread(new ClientHandler(id++)).start();  
  25.         new Thread(new ClientHandler(id++)).start();  
  26.         new Thread(new ClientHandler(id++)).start();  
  27.         new Thread(new ClientHandler(id++)).start();  
  28.     }  
  29.   
  30. }  
  31.   
  32. class ClientHandler implements Runnable  
  33. {  
  34.     private static final long timeout = 30 * 1000; // 设置超时时间为30秒  
  35.     private final TcpChannel channel;  
  36.       
  37.     private final int id;  
  38.   
  39.     private final MessageDigest md;  
  40.     private final Random rand;  
  41.   
  42.     ClientHandler(int id) throws Exception  
  43.     {  
  44.         this.id = id;  
  45.         channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE);  
  46.         md = MessageDigest.getInstance(“md5”);  
  47.         rand = new Random();  
  48.     }  
  49.   
  50.     @Override  
  51.     public void run()  
  52.     {  
  53.         try {  
  54.             channel.connect(new InetSocketAddress(“xx.xx.xx.xx”, 5656));  
  55.             int i = 0;  
  56.             while (true) {  
  57.                 work();  
  58.                 if ((++i & 16383) == 0) {  
  59.                     System.out.println(String.format(“client(%1$d): %2$d”, id, i));  
  60.                 }  
  61.                 Thread.yield();  
  62.             }  
  63.         } catch (Exception e) {  
  64.             e.printStackTrace();  
  65.         } finally {  
  66.             channel.cleanup();  
  67.         }  
  68.     }  
  69.   
  70.     private void work() throws IOException, DigestException  
  71.     {  
  72.         byte[] cache = new byte[256], reply = new byte[5];  
  73.         write(cache, reply);  
  74.     }  
  75.   
  76.     private void write(byte[] cache, byte[] reply) throws DigestException, IOException  
  77.     {  
  78.         rand.nextBytes(cache); // 只用后面的240字节  
  79.         md.reset();  
  80.         md.update(cache, 0, 240);  
  81.         md.digest(cache, 240, 16); // MD5校验码占后边16字节  
  82.         ByteBuffer buffer = ByteBuffer.wrap(cache);  
  83.         channel.send(buffer);  
  84.         buffer = ByteBuffer.wrap(reply);  
  85.         channel.recv(buffer);  
  86.         if (reply[0] != ‘.’) { // 若接收的结果不科学,能够考虑尝试再次发送  
  87.             System.out.println(“MISMATCH!”);  
  88.         }  
  89.     }  
  90. }  

    package asynchronizedchannel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.security.DigestException; import java.security.MessageDigest; import java.util.Random; public class Client { private static int id = 0; /* 客户端通讯范例程序主函数 @param args @throws Exception / public static void main(String[] args) throws Exception { new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); new Thread(new ClientHandler(id++)).start(); } } class ClientHandler implements Runnable { private static final long timeout = 30 * 一千; // 设置超时时间为30秒 private final TcpChannel channel; private final int id; private final MessageDigest md; private final Random rand; ClientHandler(int id) throws Exception { this.id = id; channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE); md = MessageDigest.getInstance(“md5”); rand = new Random(); } @Override public void run() { try { channel.connect(new InetSocketAddress(“xx.xx.xx.xx”, 5656)); int i = 0; while (true) { work(); if ((++i & 16383) == 0) { System.out.println(String.format(“client(%1$d): %2$d”, id, i)); } Thread.yield(); } } catch (Exception e) { e.printStackTrace(); } finally { channel.cleanup(); } } private void work() throws IOException, DigestException { byte[] cache = new byte[256], reply = new byte[5]; write(cache, reply); } private void write(byte[] cache, byte[] reply) throws DigestException, IOException { rand.nextBytes(cache); // 只用后面包车型大巴240字节 md.reset(); md.update(cache, 0, 240); md.digest(cache, 240, 16); // MD5校验码占后边16字节 ByteBuffer buffer = ByteBuffer.wrap(cache); channel.send(buffer); buffer = ByteBuffer.wrap(reply); channel.recv(buffer); if (reply[0] != ‘.’) { // 若接收的结果不科学,能够设想尝试再次发送 System.out.println(“MISMATCH!”); } } }

重要说明:

发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。

 转http://regular.javaeye.com/blog/653936

相关文章