0xSEUNGJU
CodeMonkie
0xSEUNGJU
전체 방문자
오늘
어제
  • 분류 전체보기 (15)
    • linux (8)
    • web (2)
    • android (0)
    • data science (1)
    • algorithms (3)
    • 보안 (1)
    • hardware (0)

블로그 메뉴

  • github
  • playground

공지사항

인기 글

태그

최근 댓글

최근 글

티스토리

hELLO · Designed By 정상우.
0xSEUNGJU

CodeMonkie

Netty - 동기/비동기, 블로킹/논블로킹#2
web

Netty - 동기/비동기, 블로킹/논블로킹#2

2022. 10. 5. 17:59

동기/비동기

  • 멀티스레드 프로그래밍에서 동기화는 하나의 데이터에 대한 동시 접근을 하나의 스레드로 한정한다는 의미이다.
  • 비동기 호출을 지원하는 디자인 패턴은 다양하다.(Future, Observer, Callback, Reactor)
  • netty는 비동기 호출을 위한 API들을 프레임워크 레벨에서 제공해서 개발자가 스레드 동기화 이슈를 신경쓰는 대신 구현할 기능에 집중할 수 있다.

 

블로킹/논블로킹

  • 블로킹은 요청한 작업이 성공하거나 에러가 발생하기 전까지 응답을 돌려주지 않는 것
  • 논블로킹은 요청한 작업의 성공 여부와 상관없이 바로 결과를 돌려주는 것

 

블로킹 소켓

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class BlockingServer {

    public void run() throws IOException {
        ServerSocket server = new ServerSocket(8888);
        System.out.println("접속 대기중");

        while(true) {
            Socket socket = server.accept();
            System.out.println("클라이언트 연결됨");

            OutputStream out = socket.getOutputStream();
            InputStream in = socket.getInputStream();

            while(true) {
                try {
                    int request = in.read();
                    out.write(request);
                } catch (IOException e) {
                    break;
                }
            }
        }
    }
}
  • 디버그 모드로 실행하면 server.accept()에서 멈춘다.
  • 연결되는 클라이언트가 없으면 프로그램은 아무런 동작도 하지 않으며 스레드는 해당 함수의 완료를 기다리며 대기한다.

 

telnet localhost 8888

 

read 블로킹

1. 서버에 접속

2. accept() 실행이 완료

3. 클라이언트와 연결된 소켓을 생성

4. 데이터를 읽기 위한 read 명령에서 다시 멈춤

5. 클라이언트로부터 데이터가 수신되기를 기다리면서 스레드가 블로킹

 

write 블로킹

  • 클라이언트 또한 서버에서 클라이언트로 전송한 데이터를 읽기 위한 메서드에서 블로킹이 발생
  • read와 달리 운영체제의 송신 Buffer에 전송할 데이터를 기록
  • 이때 송신 Buffer의 남은 크기가 write 메서드에서 기록한 데이터의 크기보다 작다면 송신 Buffer가 비워질 때까지 블로킹

 

블로킹 소켓은 데이터 입출력에서 스레드의 블로킹이 발생하기 때문에 동시에 여러 클라이언트에 대한 처리가 불가능하다. 이 문제를 해결하기 위해 연결된 클라이언트 별로 각각 스레드를 할당하는 방법이다.

1. 클라이언트가 서버에 접속한다.

2. 서버 소켓의 accept 메서드를 통해 연결된 클라이언트 소켓을 얻어온다.

3. 블로킹 소켓은 I/O 처리에 블로킹이 발생하기 때문에 새로운 스레드를 하나 생성

4. 스레드에게 클라이언트 소켓에 대한 I/O 처리를 넘겨준다.

 

하지만 서버 소켓의 accept 메서드가 병목지점이 되어 동접 요청이 들어왔을 때 대기시간이 길어진다는 단점이 있다. 또한 접속할 클라이언트 수가 정해져 있지 않은 상황에서도 문제가 있을 수 있다. 서버에 접속하는 클라이언트 수가 증가하면 서버의 스레드 수가 증가하게 되는데 이때 자바의 힙 메모리 부족으로 인한 OOM(Out Of Memory) 오류가 발생할 수 있다. 

 

위와 같은 상황을 예방하려면 서버에서 생성되는 스레드 수를 제한하는 방법인 스레드 풀링을 사용하기도 한다. 

1. 클라이언트가 서버에 접속하면 서버 소켓으로부터 클라이언트 소켓을 얻어온다.

2. 스레드 풀에서 가용 스레드를 하나 가져오고 해당 스레드에 클라이언트 소켓을 할당한다.

3. 클라이언트 소켓에서 발생하는 I/O 처리를 할당된 스레드가 전담한다.

 

이와 같은 구조는 동접 가능 사용자 수가 스레드 풀에 지정된 스레드 수에 의존하는 현상이 발생한다. 동접 수를 늘리기 위해 스레드 풀의 크기를 자바 힙이 허용하는 최대 한도에 도달할 때까지 늘리는 것이 합당한지 두 가지 관점에서 생각해 볼 필요가 있다.

  • 자바의 가비지 컬랙션에 대한 관점 --만약 하드웨어에 장착된 메모리가 충분히 커서 자바 프로세스의 힙 메모리를 원하는 만큼 할당할 수 있다고 가정했을 때
    • 자바 프로세스가 가동되고 시간이 흐름에 따라 가비지 컬랙션의 대상이 되는 객체수가 늘어나게 되고 가비지 컬랙션이 동작하게 된다. 
    • 이때 자바 프로세스는 가비지 컬랙션을 완료하기 위해서 다른 스레드를 멈추게 되는데 그러면 애플리케이션이 먹통이 된 것처럼 보이게 된다.
    • 특히 힙 크기가 크면 클수록 가비지 컬랙션에 드는 시간이 길어진다
    • 힙 크기와 생성 가능한 스레드 수가 비례하게되고 힙헤 할당된 메모리가 클수록 가비지 컬랙션이 수행되는 횟수는 줄어들지만 수행시간은 길어진다.
  • 운영체제에서 사용되는 컨텍스트 스위칭 관점
    • 컨텍스트 스위칭이란 한 프로세스에서 수행되는 스레드들이 cpu 점유를 위해 자신의 상태를 변경하는 작업
    • 이 때 많은 스레드가 cpu 자원획득을 위해 경쟁하면서 cpu 자원을 소모하기 때문에 실제로 작업에 사용할 cpu 자원이 적어지게됨

 

논블로킹 소켓

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class NonBlockingServer {
    private Map<SocketChannel, List<byte[]>> keepDataTrack = new HashMap<>();
    private ByteBuffer buffer = ByteBuffer.allocate(2*1024);
    
    public void startEchoServer() {
        // finally에서 자원해제하는 걸 이렇게 할 수 있음
        try(
            Selector selector = Selector.open(); // 자신에게 등록된 채널에 변경 사항이 발생했는지 검사하고 변경 사항이 발생한 채널에 대한 접근을 가능하게 해준다.
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open() // 논블로킹 서버 소켓 채널을 생성/ 블로킹 소켓과 다르게 소켓 채널을 먼저 생성하고 사용할 포트를 바인딩한다.
        ) {
            if((serverSocketChannel.isOpen()) && (selector.isOpen())) { // 생성한 Selector 객체와 ServerSocketChannel 객체가 정상적으로 생성되었는지 확인
                serverSocketChannel.configureBlocking(false); // 소켓 채널의 블로킹모드의 기본값은 true. 설정하지 않으면 블로킹모드로 작동
                serverSocketChannel.bind(new InetSocketAddress(8888)); // 포트 지정하고 생성된 채널객체에 할당한다. 

                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 채널 객체를 selector 객체에 등록한다. selector가 감지할 이벤트는 연결 요쳥에해당하는 OP_ACCEPT
                System.out.println("접속 대기중");

                while(true) {
                    selector.select(); // selector에 등록된 채널에서 변경 사항이 발생했는지 검사. selector에 i/o 이벤트가 발생하지 않으면 스레드는 여기서 블로킹된다. 블로킹을 피하고 싶으면 selectNow 를 사용
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); // selector에 등록된 채널 중 i/o 이벤트가 발생한 채널 목록
                    
                    while(keys.hasNext()) {
                        SelectionKey key = (SelectionKey) keys.next();
                        keys.remove(); // 동일 이벤트 감지 방지
                    

                        if(!key.isValid()) {
                            continue;
                        }

                        if(key.isAcceptable()) { // 연결요청
                            this.acceptOP(key, selector);
                        } else if(key.isReadable()) { // 데이터 수신
                            this.readOP(key);
                        } else if(key.isWritable()) { // 데이터 쓰기가능
                            this.writeOP(key);
                        }
                    }
                }
            } else {
                System.out.println("서버 소켓을 생성하지 못했습니다.");
            }

        } catch(IOException ex) {
            
        }
    }

    private void acceptOP(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); // 연결 요청 이벤트가 발생한 채널은 항상 ServerSocketChannel이므로 이벤트가 발생한 채널을 타입 캐스팅
        SocketChannel socketChannel = serverChannel.accept(); // ServerSocketChannel을 이용하여 클라이언트의 연결을 수락하고 연결된 소켓 채널을 가져옴
        socketChannel.configureBlocking(false); // 연결된 클라이언트 소켓 채널을 논블로킹 모드로 설정

        System.out.println("클라이언트 연결됨 : " + socketChannel.getRemoteAddress());

        keepDataTrack.put(socketChannel, new ArrayList<byte[]>());
        socketChannel.register(selector, SelectionKey.OP_READ); // 클라이언트 소켓 채널을 selector에 등록하여 i/o 이벤트를 감시
    }

    private void readOP(SelectionKey key) {
        try {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            buffer.clear();
            int numRead = -1;
            try {
                numRead = socketChannel.read(buffer);
            } catch (IOException e) {
                System.err.println("데이터 읽기 에러!");
            }

            if(numRead == -1) {
                this.keepDataTrack.remove(socketChannel);
                System.out.println("클라이언트 연결 종료 :" + socketChannel.getRemoteAddress());
                socketChannel.close();
                key.cancel();
                return;
            }
            byte[] data = new byte[numRead];
            System.arraycopy(buffer.array(), 0, data, 0, numRead);
            System.out.println(new String(data, "UTF-8") + " from " + socketChannel.getRemoteAddress());

            doEchoJob(key, data);
        } catch(IOException ex) {

        }
    }

    private void writeOP(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();

        List<byte[]> channelData = keepDataTrack.get(socketChannel);
        Iterator<byte[]> its = channelData.iterator();

        while(its.hasNext()) {
            byte[] it = its.next();
            its.remove();
            socketChannel.write(ByteBuffer.wrap(it));
        }

        key.interestOps(SelectionKey.OP_READ);
    }

    private void doEchoJob(SelectionKey key, byte[] data) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        List<byte[]> channelData = keepDataTrack.get(socketChannel);
        channelData.add(data);

        key.interestOps(SelectionKey.OP_WRITE);
    }
}

 

TLDR

블로킹 소켓은 read, write, accept 등 입출력 메서드가 호출되면 처리가 완료될 때까지 스레드가 멈추게 되어 다른 처리를 할 수 없다. 이런 동작방식으로 인해 많은 동접을 수용하지 못한다. 동접을 늘리기 위해 스레드 풀 크기를 자바 힙이 허용하는 최대까지 늘리면 가비지 컬랙션이 수행되는 횟수는 줄지만 가비지 컬랙션에 드는 시간이 길어진다.

이런 단점을 개선한 방식이 논블로킹 방식이다. 논블로킹 방식은 읽은 데이터를 바로 소켓에 쓸 수 없고 각 이벤트가 공유하는 데이터 객체를 생성하여 그 객체를 통해 각 소켓 채널로 데이터를 전송한다.

(좌: 블로킹/ 우: 논블로킹)

 

References

  • Java Network 소녀 Netty
  • 소스코드

'web' 카테고리의 다른 글

Netty - Discard, Echo Service #1  (0) 2022.10.05
    'web' 카테고리의 다른 글
    • Netty - Discard, Echo Service #1
    0xSEUNGJU
    0xSEUNGJU

    티스토리툴바