selector 이용한 멀티플렉싱
Selector
- 멀티플렉싱의 중심이 되는 객체
- 다양한 종류의 채널을 감시
- 새로운 입력이 있음을 알림
- select() 는 채널이 준비되기 이전까지 블록킹 시킴. 시키고 새로운 정보가 들어오면 select()를 다시
호출함 - 모든 클라이언트에 대한 입력 대기를 할 수 있음
- 서버 소켓과 동일한 방식으로 정규 소켓을 처리 할 수 있음
- 서버 소켓 : 새로운 연결에 대한 이벤트 (ACCEPT/CONNECT)
- 정규 소켓 : I/O 이벤트 대한 처리 (READ/WRITE)
- select()를 사용 가능한 채널 (selectable channel)
- SelectableChannel 구현한 채널
- DatagramChannel
- Pipe.SinkChannel
- Pipe.SourceChannel
- Server-SocketChannel
- SocketChannel
호출 형식
while(true){
selector.select();
//새로운 입력에 대해 처리...
}
|
select() 사용하기
1. Selector 생성
Selector selector = Selector.open();
2. 서버 채널 및 서버 소켓 생성 그리고 바인딩하기
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); //non블록킹 설정
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(port));
3. 서버 소켓 채널을 등록
ssc.register(selector, SelectionKey.OP_ACCEPT);
4. 채널 이벤트 처리 준비 완료 및 대기
//numKeys는 발생된 이벤트 수
int numKeys = selector.select();
5. 이벤트 처리 (numKeys > 0 이상 일때)
Set<SelectionKey> skeys = selector.selectedKeys();
// 이벤트 처리 하기 ..
6. 이벤트 처리 완료후 selector에게 알리기
selector.selectedKeys().remove(selectionKey);
*채널 등록 삭제
selectionKey = sc.registor(selector , ops);
selectionKey.cancel();
select 감시 명령(OP)
명령
|
소켓
|
서버소켓
|
OP_READ
|
V
| |
OP_WRITE
|
V
| |
OP_CONNECT
|
V
| |
OP_ACCEPT
|
V
|
명령어 설명
- OP_READ (정규 소켓)
- 채널에 데이터가 들어와서 읽을 수 있을 때 발생
- 단일 바이트 오더라도 이벤트가 발생
- 그외 상황들
- 연결이 닫힐 때
- 에러가 발생하는 경우
- OP_WRITE (정규 소켓)
- 채널이 쓰기 가능한 상태가 될 때 발생
- 버퍼 공간이 여유 생길 때 까지 블록킹 되면 그후 블록킹이 풀릴 때
- OP_CONNECT (서버 소켓)
- 정규 소켓이 리모트 서버에 완벽하게 연결되어서 사용할 준비가 되었을 때 발생
- OP_ACCEPT (서버 소켓)
- 서버 소켓에서 하나 혹은 그 이상의 연결이 들어올 때에 발생
- validOps() : SelectableChannel에 대한 어떤 명령이 가능한 지 확인 함
- 혼합 사용가능
sc.register(selector, SelectorKey.OP_READ | SelectorKey.OP_WRITE);
이벤트 흐름도
package ns.jdk14;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
public class MultiplexingChatServer implements Runnable {
private int port;
private Vector<Socket> sockets = new Vector<>();
private Set<Socket> closedSockets = new HashSet<>();
public MultiplexingChatServer(int port) {
this.port = port;
Thread t = new Thread(this,"MultiplexingChatServer");
t.start();
}
@Override
public void run() {
try{
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ServerSocket ss = ssc.socket();
InetSocketAddress isa = new InetSocketAddress(port);
ss.bind(isa);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Listening on port : "+port);
ByteBuffer buffer = ByteBuffer.allocate(4096);
while(true){
//이벤트를 받기위해서 대기
int numKeys = selector.select();
//이벤트가 들어온 경우
if(numKeys > 0){
Set<SelectionKey> skeys =
selector.selectedKeys();
Iterator<SelectionKey> it = skeys.iterator();
while(it.hasNext()){
SelectionKey rsk = it.next();
int rskOps = rsk.readyOps(); //명령어 종류들
//새로 들어오는 연결
if((rskOps & SelectionKey.OP_ACCEPT)
== SelectionKey.OP_ACCEPT){
Socket socket = ss.accept();
System.out.println("Connection from : "
+socket);
sockets.addElement(socket);
SocketChannel sc = socket.getChannel();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
//처리하고 나면 지운다.
selector.selectedKeys().remove(rsk);
}
//데이터가 들어온 경우
else if((rskOps & SelectionKey.OP_READ)
== SelectionKey.OP_READ){
SocketChannel ch = (SocketChannel)rsk.channel();
selector.selectedKeys().remove(rsk);
buffer.clear();
ch.read(buffer);
buffer.flip();
System.out.println("Read : "+buffer.limit()
+" bytes from "+ch.socket());
//연결 해지 이벤트도 OP_READ 형태로 발생하므로
if(buffer.limit() == 0){
System.out.println("closing on 0 read");
//세렉터에 채널 등록 해지
rsk.cancel();
Socket socket = ch.socket();
close(socket);
}else{
sendTOAll(buffer);
}
}
}//-- end of while(it.hasNext())
removeClosedSockets();
}
}//--end of while(true)
}catch(IOException ie){
ie.printStackTrace();
}
}
/**
* 4) Data를 각 클라이언트 소켓에 씀
* @param bb
*/
private void sendTOAll(ByteBuffer bb){
for(Enumeration<Socket> e = sockets.elements();
e.hasMoreElements();){
Socket socket = null;
try{
socket = e.nextElement();
SocketChannel sc = socket.getChannel();
//버퍼의 포지션을 시작 위치로 변경함.
bb.rewind();
//보낼것이 있는 경우
while(bb.remaining() > 0){
sc.write(bb);
}
}catch(IOException ie){closedSockets.add(socket);}
}
}
/**
* 소켓을 to-close 리스트에 추가
* @param socket
*/
private void close(Socket socket){
closedSockets.add(socket);
}
/**
* 연결 종료된 소켓 제거
*/
private void removeClosedSockets(){
for(Iterator<Socket> it = closedSockets.iterator();
it.hasNext();){
Socket socket = it.next();
sockets.remove(socket);
System.out.println("Removed "+socket);
}
closedSockets.clear();
}
public static void main(String[] args) {
int port = 5555;
new MultiplexingChatServer(port);
}
}
|
출처 : 인포북 JDK 1.4 Tutorial