package ns.jdk14;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
/**
 * Single-threaded chat server built on a {@link Selector}.
 *
 * <p>One thread accepts connections and reads from every client channel; whatever
 * bytes one client sends are written to all connected clients (the sender included).
 * All mutable state is touched only by the server thread started in the constructor.
 */
public class MultiplexingChatServer implements Runnable {
    /** Size in bytes of the single read buffer shared by all clients. */
    private static final int BUFFER_SIZE = 4096;

    /** TCP port the server listens on. */
    private final int port;
    /** Sockets of the currently connected clients. */
    private final List<Socket> sockets = new ArrayList<>();
    /** Sockets scheduled to be closed and dropped at the end of the current selector pass. */
    private final Set<Socket> closedSockets = new HashSet<>();

    /**
     * Creates the server and immediately starts its thread.
     *
     * @param port TCP port to listen on
     */
    public MultiplexingChatServer(int port) {
        this.port = port;
        Thread t = new Thread(this, "MultiplexingChatServer");
        t.start();
    }

    /**
     * Server loop: binds the listening channel, then waits on the selector and
     * dispatches accept/read events until a server-level I/O error occurs.
     */
    @Override
    public void run() {
        try (ServerSocketChannel ssc = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(port));
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Listening on port : " + port);

            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            while (true) {
                // Block until at least one registered channel is ready.
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Remove through the iterator: removing through the set itself while
                    // iterating makes the next it.next() throw
                    // ConcurrentModificationException when several keys are ready.
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        // New incoming connection.
                        acceptClient(ssc, selector);
                    } else if (key.isReadable()) {
                        // Incoming data (or end-of-stream) from a client.
                        readFromClient(key, buffer);
                    }
                }
                removeClosedSockets();
            }
        } catch (IOException ie) {
            ie.printStackTrace();
        } finally {
            // The server is going down: release every remaining client connection.
            closedSockets.addAll(sockets);
            removeClosedSockets();
        }
    }

    /**
     * Accepts one pending connection and registers it with the selector for reads.
     *
     * @param ssc      the listening channel
     * @param selector the selector to register the new client channel with
     * @throws IOException if accepting on the listening channel fails
     */
    private void acceptClient(ServerSocketChannel ssc, Selector selector) throws IOException {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept with nothing pending.
            return;
        }
        Socket socket = sc.socket();
        System.out.println("Connection from : " + socket);
        try {
            sc.configureBlocking(false);
            sc.register(selector, SelectionKey.OP_READ);
            sockets.add(socket);
        } catch (IOException ie) {
            // A failure setting up one client must not take the server down.
            ie.printStackTrace();
            try {
                sc.close();
            } catch (IOException ignored) {
                // Nothing more can be done for this connection.
            }
        }
    }

    /**
     * Reads available bytes from the client behind {@code key} and broadcasts them.
     * A read error or end-of-stream schedules the client for removal.
     *
     * @param key    selection key whose channel is ready for reading
     * @param buffer shared read buffer; its previous content is discarded
     */
    private void readFromClient(SelectionKey key, ByteBuffer buffer) {
        SocketChannel ch = (SocketChannel) key.channel();
        Socket socket = ch.socket();
        buffer.clear();
        int bytesRead;
        try {
            bytesRead = ch.read(buffer);
        } catch (IOException ie) {
            // e.g. connection reset by this peer: drop only this client.
            System.out.println("closing on read error : " + ie);
            close(socket);
            return;
        }
        buffer.flip();
        System.out.println("Read : " + buffer.limit() + " bytes from " + socket);
        if (bytesRead < 0) {
            // A disconnect is also delivered as a read event; read() then returns -1.
            System.out.println("closing on 0 read");
            close(socket);
        } else if (buffer.hasRemaining()) {
            sendTOAll(buffer);
        }
    }

    /**
     * Writes the content of {@code bb} to every connected client.
     * Clients whose write fails are scheduled for removal.
     *
     * @param bb buffer holding the data to send, from index 0 up to its limit
     */
    private void sendTOAll(ByteBuffer bb) {
        for (Socket socket : sockets) {
            if (closedSockets.contains(socket)) {
                // Already scheduled for removal in this pass.
                continue;
            }
            SocketChannel sc = socket.getChannel();
            // Move the position back to the start so each client gets the full message.
            bb.rewind();
            try {
                // NOTE(review): the channels are non-blocking, so write() may return 0
                // and this loop spins until the peer drains; a slow client stalls
                // everyone. A complete fix needs per-client queues and OP_WRITE.
                while (bb.hasRemaining()) {
                    sc.write(bb);
                }
            } catch (IOException ie) {
                closedSockets.add(socket);
            }
        }
    }

    /**
     * Schedules a socket to be closed and removed at the end of the current pass.
     *
     * @param socket the client socket to drop
     */
    private void close(Socket socket) {
        closedSockets.add(socket);
    }

    /**
     * Closes every scheduled socket and removes it from the client list.
     * Closing the socket also closes its channel, which cancels its selector key.
     */
    private void removeClosedSockets() {
        for (Socket socket : closedSockets) {
            sockets.remove(socket);
            // Log before closing so the message still shows the peer address.
            System.out.println("Removed " + socket);
            try {
                socket.close();
            } catch (IOException ignored) {
                // Best effort: the connection is being discarded anyway.
            }
        }
        closedSockets.clear();
    }

    /**
     * Starts the chat server on port 5555.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int port = 5555;
        new MultiplexingChatServer(port);
    }
}
|