2016년 7월 22일 금요일

JDK 1.4 튜토리얼 - 2장 NIO- 논블록킹-Selector

selector 이용한 멀티플렉싱

Selector
  • 멀티플렉싱의 중심이 되는 객체
  • 다양한 종류의 채널을 감시
  • 새로운 입력이 있음을 알림
  • select() 는 채널이 준비되기 이전까지 블록킹 시킴. 시키고 새로운 정보가 들어오면 select()를 다시
    호출함
  • 모든 클라이언트에 대한 입력 대기를 할 수 있음
  • 서버 소켓과 동일한 방식으로 정규 소켓을 처리 할 수 있음
    • 서버 소켓 : 새로운 연결에 대한 이벤트 (ACCEPT/CONNECT)
    • 정규 소켓 : I/O 이벤트 대한 처리 (READ/WRITE)
  • select()를 사용 가능한 채널 (selectable channel)
    • SelectableChannel 구현한 채널
    • DatagramChannel
    • Pipe.SinkChannel
    • Pipe.SourceChannel
    • Server-SocketChannel
    • SocketChannel
호출 형식
while(true){
   selector.select();
   //새로운 입력에 대해 처리...
}


select() 사용하기
1. Selector 생성
Selector selector = Selector.open();
2. 서버 채널 및 서버 소켓 생성 그리고 바인딩하기
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); //non블록킹 설정
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(port));
3. 서버 소켓 채널을 등록
ssc.register(selector, SelectionKey.OP_ACCEPT);
4. 채널 이벤트 처리 준비 완료 및 대기
//numKeys는 발생된 이벤트 수
int numKeys = selector.select();
5. 이벤트 처리 (numKeys > 0 이상 일때)
Set<SelectionKey> skeys = selector.selectedKeys();
// 이벤트 처리 하기 ..
6. 이벤트 처리 완료후 selector에게 알리기
selector.selectedKeys().remove(selectionKey);


*채널 등록 삭제
selectionKey = sc.registor(selector , ops);
selectionKey.cancel();


select 감시 명령(OP)
명령
소켓
서버소켓
OP_READ
V

OP_WRITE
V

OP_CONNECT

V
OP_ACCEPT

V


명령어 설명
  • OP_READ (정규 소켓)
    • 채널에 데이터가 들어와서 읽을 수 있을 때 발생
    • 단일 바이트 오더라도 이벤트가 발생
    • 그외 상황들
      • 연결이 닫힐 때
      • 에러가 발생하는 경우
  • OP_WRITE (정규 소켓)
    • 채널이 쓰기 가능한 상태가 될 때 발생
    • 버퍼 공간이 여유 생길 때 까지 블록킹 되면 그후 블록킹이 풀릴 때
  • OP_CONNECT (서버 소켓)
    • 정규 소켓이 리모트 서버에 완벽하게 연결되어서 사용할 준비가 되었을 때 발생
  • OP_ACCEPT (서버 소켓)
    • 서버 소켓에서 하나 혹은 그 이상의 연결이 들어올 때에 발생
  • validOps() : SelectableChannel에 대한 어떤 명령이 가능한 지 확인 함
  • 혼합 사용가능
    sc.register(selector, SelectorKey.OP_READ | SelectorKey.OP_WRITE);


이벤트 흐름도


package ns.jdk14;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
public class MultiplexingChatServer implements Runnable {
   private int port;
   private Vector<Socket> sockets = new Vector<>();
   private Set<Socket> closedSockets = new HashSet<>();
   public MultiplexingChatServer(int port) {
       this.port = port;
       Thread t = new Thread(this,"MultiplexingChatServer");
       t.start();
   }
   @Override
   public void run() {
       try{
           ServerSocketChannel ssc = ServerSocketChannel.open();
           ssc.configureBlocking(false);
           ServerSocket ss = ssc.socket();
           InetSocketAddress isa = new InetSocketAddress(port);
           ss.bind(isa);

           Selector selector = Selector.open();
           ssc.register(selector, SelectionKey.OP_ACCEPT);
           System.out.println("Listening on port : "+port);
           ByteBuffer buffer = ByteBuffer.allocate(4096);

           while(true){
               //이벤트를 받기위해서 대기
               int numKeys = selector.select();
               //이벤트가 들어온 경우
               if(numKeys > 0){
                   Set<SelectionKey> skeys =
                           selector.selectedKeys();
                   Iterator<SelectionKey> it = skeys.iterator();
                   while(it.hasNext()){
                       SelectionKey rsk = it.next();
                       int rskOps = rsk.readyOps();  //명령어 종류들
                       //새로 들어오는 연결
                       if((rskOps & SelectionKey.OP_ACCEPT)
                               == SelectionKey.OP_ACCEPT){
                           Socket socket = ss.accept();
                           System.out.println("Connection from : "
                                   +socket);
                           sockets.addElement(socket);
                           SocketChannel sc = socket.getChannel();
                           sc.configureBlocking(false);
                           sc.register(selector,SelectionKey.OP_READ);
                           //처리하고 나면 지운다.
                           selector.selectedKeys().remove(rsk);
                       }
                       //데이터가 들어온 경우
                       else if((rskOps & SelectionKey.OP_READ)
                               == SelectionKey.OP_READ){
                           SocketChannel ch = (SocketChannel)rsk.channel();
                           selector.selectedKeys().remove(rsk);
                           buffer.clear();
                           ch.read(buffer);
                           buffer.flip();
                           System.out.println("Read : "+buffer.limit()
                                   +" bytes from "+ch.socket());
                           //연결 해지 이벤트도 OP_READ 형태로 발생하므로
                           if(buffer.limit() == 0){
                               System.out.println("closing on 0 read");
                               //세렉터에 채널 등록 해지
                               rsk.cancel();
                               Socket socket =  ch.socket();
                               close(socket);
                           }else{
                               sendTOAll(buffer);
                           }
                       }
                   }//-- end of while(it.hasNext())

                   removeClosedSockets();
               }
           }//--end of while(true)
       }catch(IOException ie){
           ie.printStackTrace();
       }
   }
   /**
    * 4) Data 클라이언트 소켓에
    * @param bb
    */
   private void sendTOAll(ByteBuffer bb){
       for(Enumeration<Socket> e = sockets.elements();
           e.hasMoreElements();){
           Socket socket = null;
           try{
               socket = e.nextElement();
               SocketChannel sc = socket.getChannel();
               //버퍼의 포지션을 시작 위치로 변경함.
               bb.rewind();
               //보낼것이 있는 경우
               while(bb.remaining() > 0){
                   sc.write(bb);
               }
           }catch(IOException ie){closedSockets.add(socket);}
       }
   }
   /**
    * 소켓을 to-close 리스트에 추가
    * @param socket
    */
   private void close(Socket socket){
       closedSockets.add(socket);
   }
   /**
    * 연결 종료된 소켓 제거
    */
   private void removeClosedSockets(){
       for(Iterator<Socket> it = closedSockets.iterator();
           it.hasNext();){
           Socket socket = it.next();
           sockets.remove(socket);
           System.out.println("Removed "+socket);
       }
       closedSockets.clear();
   }
   public static void main(String[] args) {
       int port = 5555;
       new MultiplexingChatServer(port);
   }
}

출처 : 인포북 JDK 1.4 Tutorial

댓글 없음: