NIOServer.java 6.03 KB
   1
   2
   3
   4
   5
   6
   7
   8
   9
  10
  11
  12
  13
  14
  15
  16
  17
  18
  19
  20
  21
  22
  23
  24
  25
  26
  27
  28
  29
  30
  31
  32
  33
  34
  35
  36
  37
  38
  39
  40
  41
  42
  43
  44
  45
  46
  47
  48
  49
  50
  51
  52
  53
  54
  55
  56
  57
  58
  59
  60
  61
  62
  63
  64
  65
  66
  67
  68
  69
  70
  71
  72
  73
  74
  75
  76
  77
  78
  79
  80
  81
  82
  83
  84
  85
  86
  87
  88
  89
  90
  91
  92
  93
  94
  95
  96
  97
  98
  99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
package com.lyms.hospitalapi;

import com.lyms.platform.common.utils.PropertiesUtils;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
* Created by riecard on 2016/11/17.
*/
@Service
public class NIOServer {

public static final String NIO_SERVER_PORT = PropertiesUtils.getPropertyValue("nio_server_port");
public static final String NIO_SERVER_START = PropertiesUtils.getPropertyValue("nio_server_start");

//通道管理器
private Selector selector;

@PostConstruct
public void init() throws IOException {
if ("1".equals(NIO_SERVER_START)) {
NIOServer server = new NIOServer();
server.initServer(Integer.valueOf(NIO_SERVER_PORT));
server.listen();
}
}

/**
* 获得一个ServerSocket通道,并对该通道做一些初始化的工作
* @param port 绑定的端口号
* @throws IOException
*/
public void initServer(int port) throws IOException {
// 获得一个ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置通道为非阻塞
serverChannel.configureBlocking(false);
// 将该通道对应的ServerSocket绑定到port端口
serverChannel.socket().bind(new InetSocketAddress(port));
// 获得一个通道管理器
this.selector = Selector.open();
//将通道管理器和该通道绑定,并为该通道注册SelectionKey.OP_ACCEPT事件,注册该事件后,
//当该事件到达时,selector.select()会返回,如果该事件没到达selector.select()会一直阻塞。
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
new BeatTask(selector).start();
}

/**
* 采用轮询的方式监听selector上是否有需要处理的事件,如果有,则进行处理
* @throws IOException
*/
@SuppressWarnings("unchecked")
public void listen() throws IOException {
System.out.println("服务端启动成功!");
// 轮询访问selector
while (true) {
//当注册的事件到达时,方法返回;否则,该方法会一直阻塞
selector.select();
// 获得selector中选中的项的迭代器,选中的项为注册的事件
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 删除已选的key,以防重复处理
ite.remove();
// 客户端请求连接事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
// 获得和客户端连接的通道
SocketChannel channel = server.accept();
// 设置成非阻塞
channel.configureBlocking(false);

//在这里可以给客户端发送信息哦
channel.write(ByteBuffer.wrap(new String("向客户端发送了一条信息").getBytes()));
//在和客户端连接成功之后,为了可以接收到客户端的信息,需要给通道设置读的权限。
channel.register(this.selector, SelectionKey.OP_READ);

// 获得了可读的事件
} else if (key.isReadable()) {
int length = read(key);
if (length == 0) {
key.channel().close();
}
}

}

}
}
/**
* 处理读取客户端发来的信息 的事件
* @param key
* @throws IOException
*/
public int read(SelectionKey key) {
// 服务器可读取消息:得到事件发生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 创建读取的缓冲区
try {
ByteBuffer buffer = ByteBuffer.allocate(10);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
if (msg.length() == 0) {
return 0;
}
System.out.println("服务端收到信息:" + msg);
if ("exit".equals(msg)) {
channel.close();
} else {
ByteBuffer outBuffer = ByteBuffer.wrap("success".getBytes());
channel.write(outBuffer);// 将消息回送给客户端
}
return msg.length();
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

class BeatTask extends Thread {

private Selector selector;

public BeatTask(Selector selector) {
this.selector = selector;
}

@Override
public void run() {
while (true) {
for (SelectionKey key:selector.keys()) {
System.out.println(key);
Channel channel = key.channel();
if (channel instanceof SocketChannel) {
ByteBuffer outBuffer = ByteBuffer.wrap("heartbeat".getBytes());
try {
((SocketChannel)channel).write(outBuffer);// 将消息回送给客户端
} catch (IOException e) {
e.printStackTrace();
}
}
}
try {
sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

/**
* 启动服务端测试
* @throws IOException
*/
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8000);
server.listen();
}



}