Joinc 팀블로그 리눅스 메뉴얼 정리 Joinc 위키
댓글

Recent Comments

Powered by Disqus
팀블로그 카테고리
  전체 (1105)
   공지사항 (1)
   검색엔진 (21)
   기술동향 (58)
   게임 (2)
   독서 (6)
   리눅스 (12)
   보안 (1)
   사회문제 (22)
   어셈블리 (43)
   영화 (3)
   오픈소스 (10)
   음악 (9)
   인물 (1)
   포인터 (4)
   프로그래머 (23)
   팀블로그 (20)
   테터툴즈 (29)
   C/C++ (152)
   FireFox (11)
   Gimp (2)
   Google (98)
   Java (13)
   Perl (2)
   Pthread (11)
   STL (13)
   TCP/IP (8)
   Tools (31)
   Web2.0 (42)
   Wiki (1)
«   2010/09   »
      1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30    
2007/07/24 00:03

Pthread - RealTime signal을 이용한 Thread Pool의 구현

RTS와 Thread Pool의 결합

윤 상배

dreamyun@yahoo.co.kr

교정 과정
교정 0.82003년 9월 16일 23시
최초 문서작성


1절. 소개

RTS는 기본적으로 고성능 네트워크 애플리케이션을 작성하기 위한 목적으로 사용된다. 이런 이유로 네트워크 애플리케이션의 처리능력을 향상시키기 위해서 사용되는 Thread/Process pool 등과 함께 사용되는 경우가 많다.

이번 강좌에서는 Thread Pool과 RTS를 조합해서 고성능 네트워크 애플리케이션을 작성하는 방법에 대해서 알아보도록 하겠다.

이 문서를 읽기전에 반드시 이전의 RTS문서들을 읽어 보기바란다. 그렇지 않다면 이해하기 힘든 내용이 많을 것이다.


2절. 좀더 효율적인 네트워크 애플리케이션을 위해서

2.1절. RTS와 쓰레드의 조합

RTS와 쓰레드풀의 조합에 대해서 알아보기 전에 그 전단계 과정이라고 볼수 있는 RTS와 쓰레드와의 조합에 대해서 알아보자. 1

일반적인 쓰레드및 프로세스 기반의 네트워크 서버 애플리케이션과 동일한 방법으로 제작된다. 즉 메인 쓰레드에서 socket->bind->listen 의 순서를 따라서 듣기 소켓을 작성하고 accept로 클라이언트의 연결을 대기하고 있다가 클라이언트의 연결이 만들어지면 클라이언트와 데이터 통신을 위한 쓰레드를 생성하는 방법이다.

이들 고전적인 쓰레드/포크방식과의 차이점이라면 데이터 통신을 위해서 RTS를 사용한다는 점이다. RTS를 제대로 적용하기 위해서는 유닉스의 쓰레드에서 작동되는 시그널 매커니즘에 대해서 이해를 하고 있어야 한다.

몇번 이 사이트의 문서를 통해서 간접적으로 언급되었을 건데, 쓰레드는 기본적으로 스택, 쓰레드, 파일, 시그널 등의 자원을 서로 공유하게 된다. 이것은 유닉스 표준 시그널의 확장판인 RTS에도 동일하게 적용되며, 모든 쓰레드에서 공유할 수 있다. 핵심은 프로세스에서 시그널을 받을경우 특정 쓰레드로 시그널을 전달되도록 해야한다는 것이다.

다행히(혹은 당연히) pthread라이브러리에서는 다음과 같은 시그널과 관련된 라이브러리를 제공한다.

#include <pthread.h>
#include <signal.h>

int pthread_sigmask(int how, const sigset_t *newmask, sigset_t *oldmask);
pthread_sigmask(3)를 이용하면 각각의 쓰레드가 어떤 시그널셋(newmask)에 대해서 특정한 행동(how)를 할 수 있도록 만들 수 있다.

또 한가지 신경써야할 점은 각각의 쓰레드가 받아들여야할 RTS가 달라야 한다는 점이다. 모든 생성된 쓰레드가 동일한 SIGRTMIN만을 사용하게 된다면 SIGRTMIN이 발생했을 때 어느 쓰레드로 RTS가 전달되어야 할지 알 수 없을 것이다.

위의 문제는 각 생성되는 쓰레드마다 다른 RTS가 전달되게 하므로써 해결할 수 있다. 리눅스의 경우 SIGRTMIN에서 부터 SIGRTMAX까지 32개의 RTS가 존재하므로 각 쓰레드마다 다른 RTS를 처리하도록 만들어 줄 수 있다. 클라이언트가 연결해서 A쓰레드가 생성되었다면, 클라이언트와의 연결소켓에 이벤트가 발생했을 때 SIGRTMIN+1이 전달되도록 하고, 또다른 클라이언트가 연결되어서 B쓰레드가 생성되었다면 이 연결소켓에 대해서 SIGRTMIN+2가 발생하도록 하는 식이다.

여기에서는 방법론적인 얘기만 하고 실제 코드를 작성하지는 않을 것이다. 어차피 다음장에서 모두 다루어질 내용이기 때문이다.


2.2절. RTS와 Thread Pool의 조합

그럼 이제부터 진정으로 관심있어하는 RTS와 쓰레드 풀과의 조합에 대해서 알아보도록 하겠다. 여기에 있는 내용을 충분히 이해한다면 덤으로 2.1절까지 이해할 수 있게 될 것이다.

여기에서는 RTS와 쓰레드풀을 조합함으로써 얻을 수 있는 장점과 어떤 방식으로 조합이 가능한지에 대해서 설명하도록 하겠다.


2.2.1절. 얻을 수 있는 장점

쓰레드 풀에 대해서는 쓰레드 풀 작성에서 간단하게 언급했었다. 그렇다면 RTS와 쓰레드 풀을 조합해서 사용할 경우 얻을 수 있는 장점에 대해서 우선알아보도록 하자.

  1. 미리 생성된 쓰레드에서 클라이언트를 처리하게 되므로 쓰레드 생성에 대한 비용을 줄일 수 있다. 이러한 기술은 웹서버와 같이 연결과 종료가 빈번한 서버 애플리케이션에 특히 유용할 것이다. 실제 많은 웹서버들은 쓰레드풀(혹은 프로세스 풀)을 이용해서 구현된다.

    이 외에도 빈번한 데이터 교환이 발생하는 게임서버와 같은 곳에도 훌륭하게 응용 될수 있을 것이다.

  2. 특정 소켓에 이벤트가 발생했을 때 별다른 연산없이 해당 소켓을 처리하는 쓰레드에서 이벤트를 (자동적으로) 감지하고 처리 할 수 있도록 도와준다.

    쓰레드 풀 혹은 프로세스 풀 기반의 경우 소켓을 전달하기 위해서 꽤나 복잡한 과정을 거쳐야 하는 것에 비하면 매우 간단하게 소켓이벤트의 처리가 가능하다.

  3. 몇몇의 서버는 프로세스(쓰레드) 풀과 select(2)등을 동시에 사용한다. 대개의 경우 이러한 프로세스(쓰레드)풀과 select(2)의 사용은 상당한 수준의 프로그래밍 능력을 요구한다.

    RTS를 이용할 경우 쓰레드 풀에 있는 각각의 쓰레드에서 여러개의 소켓을 처리하는 과정을 비교접 손쉽게 구현할 수 있으며, select(2)나 poll(2)보다 훨씬 저렴한 비용으로 구현가능하다.

첫번째 장점은 쓰레드 풀을 사용하게 됨으로써 얻는 장점이고, 두번째 세번째 장점은 RTS를 사용하게 됨으로써 얻는 장점들이다.

2.2.2절. 구현 프로시져

다음은 구현을 위한 대략적인 프로세스를 나타낸 슈도코드이다.

int main()
{
accept 쓰레드를 생성한다.
int k=1 ; // 쓰레드 일련번호로 RTS번호를 지정하기 위해서 사용한다.
SIRTMIN에 대한 시그널 마스크 설정

for (지정한 갯수 만큼)
{
1. connect(k) 쓰레드를 생성한다.
k++;
}
}

accept() 쓰레드
{
socket->bind->listen;
만들어진 듣기 소켓에 대하여 RTS반응하도록 한다.
while(1)
{
2. accept()를 통해서 연결이 발생하면 fcntl을 통해서 해당 소켓이
RTS를 발생하도록 한다.
}
}

connect() 쓰레드
{
pthread_sigmask를 이용해서 RTS에 쓰레드가 반응하도록 한다.
반응하는 RTS번호는 쓰레드마다 다르다.
첫번째 생성된 쓰레드는 SIGRTMIN+1, 그다음은 SIGRTMIN+2.. 식이다.
while(1)
{
sigwaitinfo()를 이용해서 RTS를 기다린다.
RTS가 발생하면 소켓지정자를 통해서 클라이언트와 통신한다.
}
}

  1. connect()쓰레드를 생성할때 각 쓰레드는 고유의 일련번호를 가지며 이 일련번호는 쓰레드가 반응할 RTS번호를 할당받기 위해서 사용한다. 예를 들어 첫번째 쓰레드는 1이 인자로 넘어가므로 이 쓰레드는 SIGRTMIN+1에 대해서 반응한다. 두번째 쓰레드는 SIGRTMIN+2에 대해서 반응한다.

  2. 각 쓰레드가 반응해야될 RTS번호에 대해서 지정을 했으므로 이제 accept()쓰레드를 통해서 만들어진 연결 소켓이 적당한 RTS번호로 시그널을 발생시키도록 하면 될것이다. 이작업은 fcntl(2)을 통해서 이루어지며, 만약 fcntl을 이용해서 SIGRTMIN+1시그널을 발생하도록 연결 소켓을 설정한다면, 앞으로 이 소켓에 데이터 이벤트가 발생하면 SIGRTMIN+1이 발생할 것이며 자동적으로 첫번째 connect()쓰레드로 시그널이 전달될 것이다.

  3. connect()로 시그널이 전달된다면, 쓰레드는 sigwaitinfo()를 통해서 RTS정보를 얻어올 수 있으며, 이벤트가 발생한 소켓을 통하여 데이터 통신을 하면 된다. connect()쓰레드가 여러개의 소켓을 관리하고 있다고 하더라도 sigwaitinfo()를 이용해서 이벤트가 발생한 소켓에 대한 정보를 얻어 올 수 있으므로 쉽게 여러개의 소켓관리가 가능하다.

쓰레드 풀을 작성하는 이유는 네트워크 처리를 각각의 쓰레드로 분산시키기 위한 목적이다. 여기에서의 로드밸런싱은 단순히 하나의 쓰레드가 몇개의 클라이언트를 관리하는지를 검사해서 가장 적은 클라이언트를 처리하고 있는 쓰레드에 소켓에 대한 처리를 넘기는 간단한 방식을 사용한다. 이러한 처리를 위해서 각각의 쓰레드가 몇개의 소켓을 처리하고 있는지에 대한 정보를 유지하고 있어야 한다. 다음은 쓰레드 소켓 분배를 위한 자료 구조다.

typedf struct _fd_sig
{
int signum;
int pid;
} fd_sig;

multimap<int, fd_sig> pool_list;
pool_list는 multimap으로 구성된다. key값은 쓰레드에서 처리중인 소켓의 갯수를 나타낸다. value는 fd_sig구조체이다. fd_sig구조체의 멤버변수인 signum은 RTS시그널 번호이며, 동시에 쓰레드를 지정하기 위한 번호로도 사용된다. pid는 파일(소켓)에 이벤트가 발생했을 때 이벤트 통보를 받게될 쓰레드의 pid로 fcntl()을 통해서 시그널을 전달받을 쓰레드를 지정하기 위해서 사용된다. 아시다시피 리눅스에서의 쓰레드는 clone(2)호출을 통한 프로세스개념으로 작동되기 때문에 반드시 각 쓰레드별 pid를 구분해줘야 한다.

참고: 솔라리스 같은 경우 완전한 쓰레드를 지원하므로 모든 쓰레드가 동일한 pid를 가지게 된다 그러므로 굳이 각 쓰레드의 pid를 넘겨줄 필요 없이 필요할때 getpid(2)만 호출하면 된다.

리눅스도 최근 커널 2.4.20 에서는 하나의 pid만으로 생성되는걸 확인했다. 자세한 커널문서를 읽어 보지 않아서 확신할 수는 없지만 리눅스도 clone()을 사용하지 않는 완전한 쓰레드를 지원하는 것으로 보인다. 이러한 최신 리눅스 커널에서의 경우 getpid()만을 이용해도 관계없이 작동 되었다.

그러나 Unix와 리눅스 그리고 리눅스 커널버젼간 호환을 유지하길 원한다면 쓰레드별 PID를 읽어와서 작업하는 것을 추천한다.

fd_sig lfd_sig;
for (i = 0, k=1; i < thread_num; i++, k++)
{
// 변수 k는 쓰레드가 기다릴 시그널의 번호이다.
// (SIGRTMIN+k)형식으로 사용된다.
pthread_create(&th_t, NULL, thread_func, (void *)&k);
lfd_sig.signum = (k);
lfd_sig.pid = pid; // pthread_create를 통해서 생성된 쓰레드의 pid
pool_list.insert(pair<int, fd_sig>(0, lfd_sig));
}
accept()가 발생해서 연결 소켓이 만들어지면 pool_list컨테이너의 첫번째 데이터를 가져온후 해당 key를 +1 증가시켜주고 연결 소켓에 대해서는 fcntl()을 이용해서 SIGRTMIN+fd_sig.signum 시그널을 발생시키도록 세팅하면 된다. 멀티맵은 오름차순으로 정렬이 되므로 우리는 언제나 가장 적은 소켓을 처리하는 쓰레드에 소켓처리를 위임 할것이라는걸 보증할 수 있게된다. 소켓연결이 종료되거나 어떤 이유로 끊겼을 경우에는 k-1을 하면 된다. 소켓이 종료되었을 경우에는 종료된 소켓을 처리하는 컨테이너 멤버데이터를 찾아내야 하므로 pool_list 컨테이너를 순환하면서 fd_sig.signum과 쓰레드번호가 일치하는 멤버를 찾아내야 한다.

참고: 여기에서는 설명의 편의를 위해서 key+1한다고 했는데, 멀티맵에서 키의 값은 변경불가능 하다. 그러므로 실제로 삽입후 삭제 하는 과정이 필요하다.

위의 자료구조는 물론 효율성등을 고려한건 아니다. 단지 편의성만을 고려한 것이니 마음에 들지 않는다면 적당한 자료구조를 만들어서 사용하기 바란다. 보통 쓰레드 풀을 구성한다고 하면 아마도 20개 이상의 쓰레드를 생성해서 사용하는 경우는 매우 드물 것이므로, 단지 배열로 만들어서 비교하는 방식을 사용해도 별 문제는 없을 것이다.


2.2.3절. 실제 구현

이러한 네트워크 프로그램 구현을 위한 여러가지 개발방법들 자체가 "이거다"라고 정해진게 없이 환경과 필요에 따라 달라지기 때문에 단지 예제코드만 달랑 설명해서는 너무 경직될 수가 있다. 이런 이유로 단지 예제 셈플만 보여주는게 아닌 기타 이런 저런 아이디얼한 내용까지 담게 되었다. 지루했더라도 이해해 주길 바라면서 다음의 예제를 분석하고 테스트 해보기 바란다.

예제 : rts_poll.cc

#include <pthread.h>
#include <iostream>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <vector>

#include <netinet/in.h>
#include <arpa/inet.h>

#include <map>

#ifndef __USE_GNU
#define __USE_GNU
#endif
#include <fcntl.h>

using namespace std;

pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t sync_cond = PTHREAD_COND_INITIALIZER;

int gpid;
typedef struct _fd_sig
{
int signum;
int pid;
} fd_sig;

/*
* 1
* 쓰레드당 처리중인 소켓의 정보를 유지하기
* 위한 자료구조.
* key : 처리중인 소켓의 수
* value : 쓰레드(RTS 정보)
*/
multimap<int, fd_sig> pool_list;
multimap<int, fd_sig>::iterator mi;

// RTS overflow가 발생했을 때 실행되는 핸들러
void do_sigio(int signo)
{
printf("SIGIO : RTS signal queue overflow\n");
}

/*
* 기본 시그널 핸들로 초기화 및 등록
* 여기에서는 RTS overflow의 처리를 위한 시그널
* 핸들러를 등록한다.
*/
void init_signal_handler()
{
struct sigaction sigact;
sigemptyset(&sigact.sa_mask);
sigact.sa_flags = SA_SIGINFO;
sigact.sa_restorer = NULL;
sigact.sa_handler = do_sigio;
if (sigaction(SIGIO, &sigact, NULL) < 0)
{
perror("sigaction SIGIO ");
exit(0);
}

return ;
}

/*
* 2
* 인자로 주어지는 소켓지정자 fd가 RTS시그널을
* 발생하도록 설정한다.
* 발생시키는 RTS시그널 번호는 sig_num에 의해서
* 결정된다.
*/
int setup_sigio(int fd, int sig_num, int pid)
{
if (fcntl(fd, F_SETFL, O_RDWR|O_NONBLOCK|O_ASYNC) < 0)
{
perror("fcntl NONBLOCK ");
return -1;
}
if (fcntl(fd, F_SETSIG, SIGRTMIN+sig_num) < 0)
{
perror("fcntl SETSIG ");
return -1;
}

// 인자로 주어진 파일지정자 fd에서 이벤트가 발생할 경우
// pid를 가지는 쓰레드로 RTS SIGRTMIN+sig_num 시그널이 전달된다.
if (fcntl(fd, F_SETOWN, pid) < 0)
{
perror("fcntl SETOWN ");
return -1;
}
return 0;
}

/*
* 3
* 듣기 소켓을 만들고
* 연결을 기다린다.
* 만약 연결이 들어온다면 pool_list자료구조를 통해서
* 가장 적은 소켓을 처리하는 쓰레드를 알아오고
* 그 쓰레드에 해당되는 RTS시그널 번호로 RTS시그널을
* 발생하도록 소켓을 설정한다.
*/
void *accept_listener(void *data)
{
int server_sockfd, cli_sockfd;
fd_sig lfd_sig;
int count;
sigset_t set;
int client_sockfd;
socklen_t clilen;
int ret;

int signum = *((int *)data);
struct sockaddr_in serveraddr, clientaddr;
struct siginfo si;

if ((server_sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("socket error ");
exit(0);
}

bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(1234);

if (bind (server_sockfd, (struct sockaddr *)&serveraddr,
sizeof(serveraddr)) == -1)
{
perror("bind error ");
exit(0);
}

if (listen(server_sockfd, 5) == -1)
{
perror("listen error ");
exit(0);
}
if (setup_sigio(server_sockfd, signum, getpid()) == -1)
{
printf("sigio error\n");
exit(0);
}

sigemptyset(&set);
sigaddset(&set, SIGRTMIN+signum);
sigprocmask(SIG_BLOCK, &set, NULL);

pthread_mutex_lock(&mutex_lock);
pthread_cond_signal(&sync_cond);
pthread_mutex_unlock(&mutex_lock);

while(1)
{
int ret;
ret = sigwaitinfo(&set, &si);
if (ret == SIGRTMIN+signum)
{
if (si.si_fd == server_sockfd)
{
cli_sockfd = accept(server_sockfd,
(struct sockaddr *)&clientaddr,
&clilen);
if(cli_sockfd < 0)
{
printf("Accept error\n");
continue;
}
mi = pool_list.begin();
lfd_sig = mi->second;
count = mi->first+1;

/*
* 연결소켓에 대해서 SIGRTMIN+signum RTS를 발생하도록
* 설정한다.
* fcntl()을 위해서 pid를 넘기는걸 주목하기 바란다.
*/
cout << "Accept " << cli_sockfd << " : "
<< mi->second.signum << " : "
<< mi->second.pid << endl;
setup_sigio(cli_sockfd, mi->second.signum, mi->second.pid);
pool_list.erase(mi);
pool_list.insert(pair<int, fd_sig>(count, lfd_sig));
}
else
{
}
}
}
}

/*
* 클라이언트와 데이터를 주고 받을 쓰레드 함수이다.
* 클라이언트로 부터 읽은 데이터를 반향(echo)한다.
*/
void *jecho(void *rts_num)
{

int signum = *((int *)rts_num);
int ret;
socklen_t clen;
char buf[256];
struct sockaddr_in cname;
int n;

// 쓰레드의 PID를 얻어온다.
gpid = getpid();

sigset_t set;

sigemptyset(&set);
sigaddset(&set, SIGRTMIN+signum);
pthread_sigmask(SIG_BLOCK, &set, NULL);

pthread_mutex_lock(&mutex_lock);
pthread_cond_signal(&sync_cond);
pthread_mutex_unlock(&mutex_lock);

struct siginfo si;
fd_sig lfd_sig;
int count;
int lpid;

while(1)
{
clen = sizeof(cname);
ret = sigwaitinfo(&set, &si);
memset(buf,0x00, 256);
if (ret == SIGRTMIN+signum)
{
if ((n = read (si.si_fd, buf, 255)) <= 0)
{
printf("read error \n");
close(si.si_fd);
mi = pool_list.begin();
while(mi != pool_list.end())
{
if (mi->second.signum == signum)
{
lfd_sig = mi->second;
count = mi->first - 1;
pool_list.erase(mi);
pool_list.insert(pair<int, fd_sig>(count, lfd_sig));
}
*mi++;
}
}
else
{
getsockname(si.si_fd, (struct sockaddr *)&cname, &clen);
printf("%s(%d) : %s", inet_ntoa(cname.sin_addr), signum, buf);
write(si.si_fd, buf, strlen(buf));
}
}
else
{
}
}
}

int main(int argc, char **argv)
{
struct siginfo si;
int status;
int k;
fd_sig lfd_sig;
sigset_t set;
unsigned int i;

if (argc !=2 )
{
printf("Usage : ./rts_th [thread num]\n");
exit(1);
}
int thread_num = atoi(argv[1]);
vector<void *(*)(void *)> thread_list;
vector<pthread_t> tident(thread_num);
pthread_attr_t myattr;
init_signal_handler();

sigemptyset(&set);
sigaddset(&set, SIGRTMIN);
sigprocmask(SIG_BLOCK,&set, NULL);

thread_list.push_back(accept_listener);
for (i = 0; i < thread_num; i++)
{
thread_list.push_back(jecho);
}

/*
* 쓰레드를 생성한다.
* 첫번째 쓰레드는 accept()전용 쓰레드이며
* 이후 생성되는 쓰레드가 클라이언트 통신전용 쓰레드이다.
* 쓰레드를 생성할때 넘어가는 인자 K는 쓰레드가 기다릴
* RTS 시그널 번호이다.
* 각 쓰레드는 SIGRTSMIN+k 번호를 가지는 RTS를 기다리게된다.
*/
for (i = 0, k = 1; i < thread_list.size(); i++, k++)
{
/*
* 메인 쓰레드와 생성되는 쓰레드간에 정확한 데이터
* 전달이 필요하므로 뮤텍스와 조건변수를 이용해서
* 쓰레드 동기화를 시켜준다.
*/
pthread_mutex_lock(&mutex_lock);
pthread_create(&tident[i], NULL, thread_list[i], (void *)&k);

/*
* 쓰레드 자료구조
* 각 쓰레드에서 처리하는 RTS번호와 처리중인 소켓의 갯수를
* 유지한다.
*/
lfd_sig.signum = k;
pthread_cond_wait(&sync_cond, &mutex_lock);
lfd_sig.pid = gpid;
if (i !=0)
pool_list.insert(pair<int, fd_sig>(0, lfd_sig));
pthread_mutex_unlock(&mutex_lock);
}

cout << "Thread Join " << endl;
for (i = 0; i < thread_list.size(); i++)
{
pthread_join(tident[i], (void **)&status);
}
return 1;
}
쓰레드, 프로세스, 시그널에 대한 내용을 이해하고 있다면 이해하기에 어려운 부분이 없을 것이다. 코드는 최소한의 에러처리만 신경썼으며 효율,유지보수 같은 것들 역시 신경쓰지 않았다.


3절. 결론

이상 RTS와 쓰레드풀간의 조합에 대해서 알아보았다. 아마도 꽤 흥미있는 내용이 되었으리라고 생각된다. 이번에 다루었던 주제에 대해서는 아직도 고민해야될 부분이 많이 있으므로 틈틈히 더 효율적이고 깔끔한 방법에 대해서 고민해 보도록 하자..

이 문서는 수정될 수 있습니다. 최신 문서는 Joinc Wiki에서...
:::
2007/07/15 22:39

Pthread - 쓰레드간 메시지 전달및 공유


1 소개

쓰레드 프로그래밍을 할 때 가장 신경쓰이는건 역시 쓰레드동기화와 쓰레드간 메시지 전달과 관련된 문제일 것이다. 또한 쓰레드간 메시지 전달에는 쓰레드 동기화 문제까지 함께 고민해야 한다. 이 문서는 다중쓰레드에서 쓰레드간 메시지를 효과적으로 전달하기 위한 다양한 방법들을 기술한다.

여기에서 소개하는 방법들은 수많은 방법들 중 몇가지 방법들일 뿐이다. 실제 프로젝트에서는 다양한 응용을 생각해야 할 것이다.

2 시나리오

영어문서를 파싱해서 Term을 얻어오고, 출현한 Term의 빈도수를 계수하는 프로그램을 만들도록 하겠다. 빠른 파싱을 위해서, 문서가 주어지면 문서를 라인수를 기준으로 4등분 한다음, 4개의 쓰레드를 돌려서 병렬로 처리하도록 할 것이다. 이 프로그램은 다음의 사항을 만족시켜야 한다.
  1. Main 쓰레드는 문서를 4등분한다음 생성된 work thread갯수를 파악한다음
  2. 파일을 open()한 후, 4개의 쓰레드에게 읽어야될 파일지정자와 파싱할 줄의 범위를 알려준다.
  3. 파일지정자와 범위를 전달받은 work thread는 해당범위의 문장을 분석해서 <Term, count> 자료구조를 만든다.
  4. 모든 작업이 끝났다면, Main Thread에게 작업이 끝났음을 알려준다. 이때, 자신이 작업한 결과에 대한 정보를 Main Thread에게 알려줘야 한다.
  5. 모든 work Thread에서 작업종료 메시지를 받았다면, Main Thread는 각 work Thread의 <Term, count>를 취합해서, 하나의 파일로 만든다.

3 구현방안

다음은 문서를 파싱해서 <Term, count>를 얻어오는 프로그램이다. 단일 쓰레드로 작동하는 프로토타입의 프로그램으로 아래의 코드를 멀티쓰레드방식으로 수정할 것이다.

#include <sys/types.h>
#include <regex.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <vector>
#include <string>
#include <iostream>

#include <fcntl.h>

using namespace std;

/*
* 주어진 문서에서 단어를 얻어온다.
* 입력된 라인은 strtok를 통해서 토큰으로 먼저 분리되고
* 분리된 문자열은 정규표현을 만족하면 Term으로 판단하고 출력한다.
* 실행인자 : 정규표현식
* 사 용 예 :
*/

int main(int argc, char **argv)
{
FILE *fp;
int rtv;
regex_t preg;
char linebuf[1024];
char *tr;
char seps[] = " -.,()\";:{}'+@/<>[]|!?#";
char msg[64];
int i;
vector<string> FileList;

// 파싱할 파일을 등록한다.
FileList.push_back("rfc.dat");

if (argc != 2)
{
printf("Usage : %s [pattern]\n", argv[0]);
return 1;
}

// 정규표현을 위한 컴파일러 생성
rtv = regcomp(&preg, argv[1], REG_EXTENDED|REG_NOSUB);
if(rtv != 0)
{
regerror(rtv, &preg, NULL, 0);
return 1;
}

for(int i = 0; i < FileList.size(); i++)
{
fp = fopen(FileList[i].c_str(), "r");
if (fp == NULL)
{
perror("Error ");
return 1;
}

while(fgets(linebuf, 1024, fp) != NULL)
{
linebuf[strlen(linebuf)-1] = '\0';

// 토큰으로 구분한다음
tr = strtok(linebuf, seps);
while(tr != NULL)
{
tr = strtok(NULL, seps);
if (tr != NULL)
{
// 정규표현을 만족하는지 확인한다.
if (regexec(&preg, tr, 0, NULL, 0) == 0)
{
cout << "Find Term : " << tr << endl;
}
}
}
}
fclose(fp);
}
}
다음과 같이 실행하면 된다.
# ./getterm "[a-zA-Z0-9]"

3.1 메시지큐 구현

메시지큐 구현에 대한 아이디어는 다음과 같다.
  • worker Thread가 하나의 Term을 얻어오면, 해당 Term을 메시지 형태로 Main Thread에게 바로 전달한다.
  • Main Thread는 <Term, count>자료구조를 유지하면서, 계수를 한다.
  • 메시지큐의 ID는 하나로 통일한다.
  • 메시지큐를 이용한 메시지의 전달 가능성 확인을 목적으로 한다.(병렬성 효율등은 부차적 문제로 언급한다)

messagequeue.png

3.1.1 worker Thread 관리

주고 받는 데이터에 타입을 두어서 관리하도록 할 것이다.
struct Data
{
int type;
char *Data;
int size;
};
Type는 다음과 같이 정의 할 것이다.
1 << 1 일반 데이터
1 << 2 제어 데이터
  • 일반데이터라면 Data를 파싱된 Term으로 인식해서 처리한다.
  • 제어데이터라면 Data를 제어 명령이라고 인식해서 처리한다. Data가 1 이라면, 모든 일을 끝냈다고 판단한다.

이 방식은 구현이 단순하긴 하지만 worker thread -> Main thread로의 단방향 데이터 전송만 가능하다는 단점이 있다. 메시지큐를 하나더 만드는등 다양한 방법이 있을 수 있는데, 이는 뒤에서(몇가지만) 다루도록 하겠다.

3.1.2 쓰레드 동기화

쓰레드간 동기화는 mutex 잠금과 조건변수를 이용할 것이다.

3.1.3 프로시져

대충 그림을 그려야 코드가 만들어 지는 스타일이라서...
main
{
생성 쓰레드 갯수는 4개로 한다.
문서를 읽어들여서 문서의 Line수를 계수한다.
메시지큐를 생성한다.
for(i = 0; i < 4; i++)
{
// 쓰레드 동기화 Start
Worker 쓰레드를 생성한다. 인자로 문서의 Offset정보를 넘긴다.
쓰레드 함수명은 WThread로 한다.
// 쓰레드 동기화 End
}
메인 쓰레드를 수행한다.
while(1)
{
메시지 큐로부터 데이터를 읽는다.
switch(데이터 타입)
case 일반데이터
{
Term을 계수한다.
}
case 제어데이터
{
종료루틴을 수행한다.
모든 쓰레드가 작업을 종료했다면, Break;
}
}
<Term,Count>결과를 출력한다.
}

3.1.4 코드 구현

<!> 미완성 코드다. g++ 로 컴파일 하면, 메시지큐를 통해서 worker thread에서 main thread로 분석된 Term이 전달되는걸 확인할 수 있다. 메시지큐를 통한 데이터 통신의 기본적인 구현은 끝났다고 볼 수 있다. 코드를 좀더 깔끔하게 하고, 몇 군데 예외처리를 해주어야 한다.
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <regex.h>

#include <fcntl.h>
#include <string.h>

#define ThreadNum 4

pthread_mutex_t mutex_lock;
pthread_cond_t sync_cond;

pthread_mutex_t gmutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t gcond = PTHREAD_COND_INITIALIZER;

const char seps[] = " -.,()\";:{}'+@/<>[]|!?#\n"; // Token

// 주고 받을 데이터
struct Data
{
int queuenum;
int msgtype;
int tid;
char msg[256];
int size;
key_t key_id;
};

// worker 쓰레드에 넘겨줄 정보
struct DocInfo
{
int tnum; // thread id
int start; // 문서시작 위치
int end; // 문서 끝 위치
char fname[80]; // 작업 파일명
char regex[24]; // 정규표현 문자열
};

/*
* worker 쓰레드 함수
*/
void *Tfunction(void *data)
{
struct Data lData;
struct DocInfo *lDocInfo;
FILE *fp;
char line[256];
int rtv;
int bsize;
int readn = 0;
regex_t preg;
char *tr;

lData = *(struct Data *)(data);
lDocInfo = (struct DocInfo *)lData.msg;

bsize = lDocInfo->end - lDocInfo->start;
printf("DEBUG Thread %d %d\n", lDocInfo->tnum, bsize);
if((fp = fopen(lDocInfo->fname, "r")) == NULL)
{
perror("Fopen Error");
exit(0);
}

lseek(fileno(fp), lDocInfo->start, SEEK_SET);
rtv = regcomp(&preg, lDocInfo->regex, REG_EXTENDED|REG_NOSUB);

pthread_mutex_lock(&mutex_lock);
pthread_cond_signal(&sync_cond);
pthread_mutex_unlock(&mutex_lock);

if (rtv != 0)
{
regerror(rtv, &preg, lDocInfo->regex, REG_EXTENDED|REG_NOSUB);
exit(0);
}
for (readn = 0; readn < bsize; )
{
if(fgets(line, 256, fp) == NULL)
break;
tr = strtok(line, seps);
if (tr != NULL)
{
if (regexec(&preg, tr, 0, NULL, 0) == 0)
{
lData.queuenum = 1;
lData.msgtype = 1 << 1;
lData.size = strlen(tr);
sprintf(lData.msg, "%s", tr);

if(msgsnd(lData.key_id, (void *)&lData, sizeof(lData), IPC_NOWAIT) <0)
{
perror("msg snd error");
exit(0);
}
}
}
readn += strlen(line);
sleep(1);
}
// 종료 메시지를 보낸다.
}


int main(int argc, char **argv)
{
struct Data lData;
struct DocInfo lDocInfo;

pthread_t p_thread[ThreadNum];

int fd;
char *fname;
int fsize = 0;
int blocksize = 0;
struct stat fileinfo;
char *regex;

// Message queue
key_t key_id;
int msgtype;

fname = argv[1];
regex = argv[2];

// 메시지큐 생성
key_id = msgget((key_t)8888, IPC_CREAT|0666);
if (key_id == -1)
{
perror("msgget error : ");
exit(0);
}

// Open File
fd = open(fname, O_RDONLY);
if (fd < 0)
{
perror("Open file error");
return 1;
}
if(fstat(fd, &fileinfo)< 0)
{
return 1;
}
fsize = fileinfo.st_size;
printf("T Size is %d\n", fsize);

blocksize = fsize / ThreadNum;

for (int i = 0; i < ThreadNum; i++)
{
lDocInfo.start = (i*blocksize);
lDocInfo.end = lDocInfo.start+blocksize;
sprintf(lDocInfo.fname, "%s", fname);
sprintf(lDocInfo.regex, "%s", regex);
lDocInfo.tnum = i;

if (i == (ThreadNum -1))
lDocInfo.end += fsize%ThreadNum;
lData.queuenum = 1;
lData.msgtype = 1 << 2;
lData.size = sizeof(lDocInfo);
lData.key_id = key_id;
lData.tid = lDocInfo.tnum;
memcpy((void *)lData.msg, (void *)&lDocInfo, sizeof(lDocInfo));

pthread_mutex_lock(&mutex_lock);
pthread_create(&p_thread[i], NULL, Tfunction, (void *)&lData);
pthread_cond_wait(&sync_cond, &mutex_lock);
pthread_mutex_unlock(&mutex_lock);
}

while(1)
{
if(msgrcv(key_id, (void *)&lData, sizeof(lData), (1 >> 1), 0) == -1)
{
perror("msgrcv error: ");
}
printf("Get Term %d %s\n", lData.tid, lData.msg);
}
}

4 공유메모리 구현

공유메모리는 커널에 메모리 공간을 할당함으로써, 시스템 전역적으로 자원을 사용할 수 있도록 지원하는 IPC 설비중 하나다. 이러한 특성을 이용 쓰레드간 메시지 교환은 공유메모리상에 환형큐를 만드는 것으로 구현가능할 수 있을 것이다.

이 환형큐에는 여러개의 쓰레드가 접근을 하게 될 것이므로, 쓰레드간 동기화가 필수 적일건데, mutex가 아닌 레코드 잠금으로도 쓰레드간 접근제어를 할 수 있다. 레코드 잠금을 통해서 쓰레드동기화를 제어하는 방법은 공유메모리와 파일잠금을 이용한 프로세스간 데이터 공유에서 이미 다룬바가 있다.

쓸만한 예제라고 생각은 하지만 이번 구현에 그대로 적용하기에는 성격이 다르다. 이번 구현은 여러개의 생산자와 하나의 소비자가 존재하는 형식이기 때문이다. 뭐 그렇다고 크게 문제될건 없다. 아래와 같이 생산/소비자 방식을 약간 수정하는 것으로 해결책을 삼았다.

thread.png

쓰레드간 동기화는 그대로 파일의 레코드 잠금을 이용할 것이다. 이 경우 다수의 생산자를 제어해야 하는데, 처음 8 byte를 생산자간 접근제어를 위한 잠금으로 사용할 것이다. 처음 레코드에 대한 생산자만이 레코드에 큐(빨간색)에 쓸권한을 가지게 된다.

데이터 쓰기 권한을 얻은 쓰레드는 공유메모리에 데이터를 쓰고, 해당 공유메모리에 대응되는 레코드에 잠금을 풀게된다. 그러면 소비자는 잠금을 얻고, 잠금에 대응되는 공유메모리를 찾아가서 정보를 읽어오면 된다. 예제로 제시한 문서를 이해하고 있다면, 위의 방식으로 수정하는건 그리 어렵지 않을 것이니, 굳이 코드를 만들진 않도록 하겠다.

이 문서는 수정될 수 있습니다. 최신문서는 Joinc Wiki에서.
:::
2007/07/15 01:43

Pthread - 쓰레드 풀 사용하기

Thread Pooling

윤 상배

dreamyun@yahoo.co.kr

교정 과정
교정 0.72003년 1월 24일 14시
Thread Pool 구성도 이미지 추가,문서 히스토리 추가


1절. Thread Pooling

1.1절. Thread Pooling 이란

pool 의 사전적인 뜻을 찾아보면 연못, 저수지, 수영장 풀 등 "무엇을 담아놓는" 의 뜻을 가진다. 이대로 해석하자면 Thread Pooling 이란 쓰레드를 담아 놓는 용기(메모리가 될것이다) 를 뜻하며, 프로그래밍 측면에서 해석하자면, "미리 쓰레드를 할당시켜 놓는기법" 을 뜻한다.

그렇다면 쓰레드를 미리 할당시켜 놓는 이유에 대해서 생각해보자, 지금까지 이 사이트에서 다루었던 쓰레드프로그래밍 기법은 기본적으로 fork(2) 방식과 매우 비슷하며, 쓰레드를 생성시켜야 될 필요가 있을때 pthread_create(3)등의 함수를 이용하여 새로운 작업쓰레드를 생성시키는 방식을 사용했다. 보통 쓰레드프로그래밍은 네트웍 프로그래밍시 주로 사용됨으로 accept(2) 로 연결을 기다리다가 연결이 만들어지면 accept 에서 넘어온 소켓 지시자를 인자로 하는 쓰레드를 생성했다.

이러한 방식 - 요청이 있을때 쓰레드를 생성시키는 - 의 쓰레드 프로그래밍기법은 대부분의 작업을 처리하기에 충분히 효율적이며, 빠르긴하지만 클라이언트로 부터의 연결과 종료가 매우 바쁘게 일어나는 서버의 경우, 계속적으로 쓰레드를 생성하고 종료해야 하는 비용을 무시할수 없게 된다. 쓰레드가 비록 fork()에 비해서 생성과 소멸시에 훨씬 적은 비용을 소모한다고는 하지만, 이건 어디까지나 상대적인 것으로 실상은 꽤 많은 시간과 비용을 소비하는 작업이다. 특히 Linux 에서의 Pthread 의 경우 clone(2)를 이용한 구현임으로 더욱더 많은 비용을 소비하게 된다.

Thread Pooling 은 이러한 반복적인 쓰레드의 생성/소멸에 의한 비효율적인 측면을 없애고자 하는 목적으로 만들어진 프로그래밍 기법이다.


1.1.1절. Thread Pool의 구현방식

개념적으로 보자면 Thread Pool 을 구성하는건 매우 간단하다. 생성하고자 하는 크기만큼 ptread_create() 함수를 돌리면 되기 때문이다.

하지만 이건 어디까지나 개념적인 것으로 대부분의 경우 각각의 쓰레드를 스케쥴링 해주어야 함으로, 때에 따라서는 구현을 위해서 매우 복잡한 프로그래밍 기법을 동원해야 할때도 있다. 간단히 웹 서버를 Thread Pool 로 구현한다고 가정을 해보자 - 보통 웹서버는 HTTP 의 특성상 연결과/종료가 빈번하게 일어 남으로 쓰레드풀을 사용할경우 많은 이익을 얻을수 있다 -, 만약 100 개의 Thread 를 미리 생성시켰고, 각각의 Thread 는 하나의 클라이언트 연결을 처리한다고 가정했을때, main 쓰레드는 accept(2) 를 통해서 클라이언트를 받아들였을때, accept() 로 만들어진 소켓 지정번호를 미리 만들어진 100 개의 쓰레드중 "놀고" 있는 쓰레드에게 넘겨주어야 할것이다. 그러기 위해서는 main 쓰레드에서 각각의 쓰레드 상태를 유지해서 적당한 쓰레드에게 파일지정자를 넘겨줘야 할것이다.

그나마 위의 경우는 하나의 쓰레드가 하나의 연결을 처리함으로 어렵지 않게 구현하겠지만, 만약 100개의 쓰레드가 있고, 거기에 각각의 쓰레드가 10개 씩의 클라이언트 연결을 처리하도록 구성한다면, 거기에다가 적당한 로드밸런싱 기능 까지 포함시키고자 한다면, 구현이 꽤 복잡해 질수도 있다.

그림 1. Thread Pool 구성도

위는 Thread Pool 의 대략적인 구현상태를 그림? 으로 나타낸 것이다. Thread Pool 에 들어있는 각각의 쓰레드를 관리하기 위해서는 필수적으로 각각의 쓰레드의 상태를 가지고 있는 Schedul 자료구조 를 가지고 있어야한다. 그래야만 MAIN THREAD 에서 쓰레드 상태를 확인해서 적당한 쓰레드로 작업분배가 가능할것이기 때문이다. - 실제 Linux 커널도 각각의 task 의 스케쥴링을 위해서 task 구조체를 유지한다. -

1.1.2절. 구현 프로세스

이제 구현방식에 대한 밑그림이 나왔으니, 실제로 구현을 위한 프로세스를 만들어 보도록 하자. 프로세스는 슈도코드로 구성을 하도록 하겠다. 네트웍 서버 작성을 기준으로 하겠다.

스케쥴관련 자료구조
{
현재 연결된 클라이언트수
현재 처리해야될 클라이언트 소켓지시자

쓰레드풀에 만들어진 쓰레드 상태 : 쓰레드풀 크기만큼의 배열
{
0 이면 휴식상태
1 이면 작업상태
처리중인 소켓지시자
}
};

main 함수시작
{
아규먼트로 몇개의 쓰레드를 생성할지를 받음
while(쓰레드 생성수만큼)
{
pthread_create 를 이용해서 쓰레드 생성
// 통신쓰레드 함수
{
WAIT:
main 쓰레드가 깨우길 기다린다.
만약 main 쓰레드로 부터 깨움이 있다면
{
스케쥴 자료구조->현재 처리해야될 소켓지시자 를 읽어온다.
스케쥴 자료구조->자신의 상태를 1로 세팅한다.
스케쥴 자료구조->처리중인 소켓지시자를 세팅한다.
while(1)
{
클라이언트와 통신한다.
만약 에러가 발생하면
{
스케쥴 자료구조->처리중인 소켓지시자를 0으로 세팅
스케쥴 자료구조->자신의 상태를 0으로 세팅
스케쥴 자료구조->현재 연결된 클라이언트수 --;
goto WAIT:
}
}
}
}
}

// main 쓰레드
while(1)
{
만약 accept 를 통해서 연결이 발생한다면
{
스케쥴관련 자료구조->현재연결된 클라이언트수가 MAX 를 초과하지 않았다면
{
스케쥴관련 자료구조->현재연결된 클라이언트수 ++;
스케쥴관련 자료구조->현재처리해야될 클라이언트 소켓지시자 = accept();
스케줄관련 자료구조->쓰레드풀에 만들어진 쓰레드상태 가 0인
쓰레드를 찾아서 해당 쓰레드를 깨운다.
}
그렇지 않고 초과했을경우
{
클랑리언트에게 에러메시지를 전송한다.
}
}
}
}
구현은 구현하는 프로그래머가 상황에 따라서 선택하기 나름이긴 하지만 보통은 위의 방법을 기본으로 해서, 약간의 변경을 가하는 정도가 될것이다. 위의 슈도코드를 보면 main 쓰레드에서 accept 를 받으면 휴식상태에 있는 쓰레드를 깨운다고 되어있는데, 이때 깨우기 위해서는 쓰레드 조건변수를 사용하면 될것이다.

그렇다면 스케쥴관련 자료구조는 어떻게 구현하는게 쉬운방법인지 생각해보도록 하자. 구현하는 방법은 프로그래머 맘이겠지만, 필자가 구현하고자 한다면 multimap 을 이용해서 구현할것이다. 이 자료구조는 아마 다음과 같을것이다.

// 쓰레드 정보 구조체
struct ph
{
int sockfd; // 처리중인 소켓지정번호
int index_num; // 쓰레드의 인덱스 번호
};

// 쓰레드 구조체 MAP
multimap<int, struct ph> phinfo;

struct schedul_info
{
int client_num; // 총 연결중인 클라이언트수
int current_sockfd; // 가장최근에 연결된 소켓지정번호
phinfo mphinfo; // 쓰레드 구조체 map
}
멀티맵의 key 는 쓰레드의 활성화 여부로 1 혹은 0이 된다. 그리고 value 는 해당 쓰레드 정보가 될것이다. 이렇게 멀티맵으로 만든이유는 간단하다. 멀티맵은 정렬연관 컨테이너 임으로 key 를 기준으로 자동적으로 정렬이 될것이다. 만약 첫번째 쓰레드가 처리중(1)로 변경되었다면 이 원소는 multimap 의 가장 뒤로 정렬이 될것이다. 그럼으로 우리는 클라이언트의 수가 총연결가능한 클라이언트수(Thread Pool 에 생성된 쓰레드수) 를 초과하지 않는한 phinfo.begin() 으로 가져온 쓰레드는 휴식상태(0) 이라는걸 믿을수 있게 된다. 다시 말해서 복잡해서 쓰레드상태가 0인지 1인지 처음부터 검사할 필요가 없다는 뜻이다.
  1 2 3 4 5 6 7    99 100  : 쓰레드 번호
+-+-+-+-+-+-+-+---+-+-+
|0|0|0|0|0|0|0|...|0|0|
+-+-+-+-+-+-+-+---+-+-+

--> 연결이 들어왔다면
1 2 3 4 5 6 7 99 100 : 쓰레드 번호
+-+-+-+-+-+-+-+---+-+-+
|1|0|0|0|0|0|0|...|0|0|
+-+-+-+-+-+-+-+---+-+-+
| |
+----------->-------+
가장 뒤로 자동으로 sort 됨

--> Sort 후
2 3 4 5 6 7 8 100 1 : 쓰레드 번호
+-+-+-+-+-+-+-+---+-+-+
|0|0|0|0|0|0|0|...|0|1|
+-+-+-+-+-+-+-+---+-+-+

--> 클라이언트가 99개가 접속해 있을경우
+-+-+-+-+-+-+-+---+-+-+
|0|1|1|1|1|1|1|...|1|1|
+-+-+-+-+-+-+-+---+-+-+

그럼으로 begin() 을 사용하게 될경우
언제나 휴식상태에 있는 쓰레드를 가져올수 있음
사실 multimap 을 쓴다면 굳이 "현재 연결된 클라이언트 수" 를 유지하기 위해서 별도의 변수를 둘 필요가 없을것이다. multimap 에서 제공하는 count() 를 이용해서 key 가 "1" 인 요소의 수를 구하면 되기 때문이다. 만약 multimp 의 begin() 값이 1 이라면 MAX 클라이언트가 가득찼다는걸 의미할것이다.

물론 multimap 의 경우 기본적으로 key 값의 수정은 허용하지 않기 때문에 0 을 1로 변경할경우 실제로는 0 을 가지는 요소를 삭제하고, 1을 가지는 새로운 요소를 삽입하는 방식을 취해야 할것이다. 마찬가지로 클라이언트가 종료해서 1을 0으로 변경할때에도 삭제/인서트를 해야할것이다. Value(값) 는 그대로 복사해서 삭제/인서트를 해야 한다.

이 방법이 번거롭다면, 그냥 배열을 쓰거나 혹은 다른 어떤 자료구조를 쓰더라도 전혀 관계없기는 하다. 그건 자기의 기호에 맞게 선택해서 사용하면 될문제이다.



1.2절. 예제

지금까지 Thread POOL 의 구현방법에 대해서 알아봤으니, 간단하게 구현해 보도록 하겠다. 이 코드는 지극히 기능구현에만 신경쓴 코드이다. 에러처리와 몇군데 mutex잠금처리는 각자의 재량에 맡기겠다.

예제 : pool_echo.cc

#include <map>
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <signal.h>
#include <arpa/inet.h>
#include <sys/un.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

// 최대 쓰레드 POOL 크기
#define MAX_THREAD_POOL 256
using namespace std;

// 전역 쓰레드 구조체
typedef struct _ph
{
int sockfd; // 현재 사용중인 소켓 fd
int index_num; // 인덱스 번호
} ph;

// 전역쓰레드 구조체로써
// 현재 쓰레드 상황을 파악함
struct schedul_info
{
int client_num; // 현재 연결된 클라이언트수
int current_sockfd; // 가장최근에 만들어진 소켓지시자
multimap<int, ph> phinfo;
};

// 각 쓰레드별 조건변수
pthread_cond_t *mycond;
// 쓰레드 동기화를 위한 조건변수
pthread_cond_t async_cond = PTHREAD_COND_INITIALIZER;

// 각 쓰레드별 조건변수의 크리티컬세션 지정을 위한
// 뮤텍스
pthread_mutex_t mutex_lock= PTHREAD_MUTEX_INITIALIZER;
// 쓰레드 동기화용 조건변수의 크리티컬세션 지정을 위한
// 뮤텍스
pthread_mutex_t async_mutex = PTHREAD_MUTEX_INITIALIZER;

// 클라이언트와의 통신용 쓰레드
void *thread_func(void *data);
// 현재 클라이언트 상태 모니터용 쓰레드
// 한마디로 디버깅용
void *mon_thread(void *data);

schedul_info s_info;

// 메인 함수
int main(int argc, char **argv)
{
int i;
ph myph;
int status;
int pool_size = atoi(argv[2]);
pthread_t p_thread;
struct sockaddr_in clientaddr, serveraddr;
int server_sockfd;
int client_sockfd;
int client_len;

// 풀사이즈 검사
if ((pool_size < 0) || (pool_size > MAX_THREAD_POOL))
{
cout << "Pool size Error" << endl;
exit(0);
}

// Make Socket
if ((server_sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
perror("error : ");
exit(0);
}

// Bind
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(atoi(argv[1]));

if (bind (server_sockfd, (struct sockaddr *)&serveraddr,
sizeof(serveraddr)) == -1)
{
perror("bind error : ");
exit(0);
}

// Listen
if (listen(server_sockfd, 5) == -1)
{
perror("listen error : ");
exit(0);
}

// 쓰레드 갯수만큼 조건변수 생성
mycond = (pthread_cond_t *)malloc(sizeof(pthread_cond_t)*pool_size);

// 쓰레드 전역변수 초기화
s_info.client_num = 0;
s_info.current_sockfd = 0;

// 쓰레드 POOL 새성
for (i = 0; i < pool_size; i++)
{
memset((void *)&myph, 0x00, sizeof(myph));
myph.index_num = i;
s_info.phinfo.insert(pair<int, ph>(0, myph));

// 조건변수를 이용해서 쓰레드간 동기화를 실시한다.
pthread_mutex_lock(&async_mutex);
if (pthread_create(&p_thread, NULL, thread_func, (void *)&i) < 0)
{
perror("thread Create error : ");
exit(0);
}
pthread_cond_wait(&async_cond, &async_mutex);
pthread_mutex_unlock(&async_mutex);
}

// 디버깅용 쓰레드 생성
pthread_create(&p_thread, NULL, mon_thread, (void *)NULL);

// MAIN THREAD accept wait
client_len = sizeof(clientaddr);

// 클라이언트 ACCEPT 처리를 위한
// MAIN 쓰레드
while(1)
{
multimap<int, ph>::iterator mi;
client_sockfd = accept(server_sockfd, (struct sockaddr *)&clientaddr,
(socklen_t *)&client_len);
if (client_sockfd > 0)
{
// 만약 쓰레드풀이 가득찼다면 클라이언트 연결을
// 종료시킨다.
mi = s_info.phinfo.begin();
if (mi->first == 1)
{
//error message send to client_sockfd
cout << "SOCKET IS FULL" << endl;
close(client_sockfd);
}
// 그렇지 않다면 연결을 받아들이고
// 클라이언트 전역 변수를 세팅한다.
// 세팅후 해당 처리쓰레드에게 시그널을 보내어서
// 처리하게 한다.
else
{
ph tmpph;
int psockfd;
int pindex_num;
s_info.current_sockfd = client_sockfd;

tmpph.sockfd = client_sockfd;
tmpph.index_num = mi->second.index_num;
s_info.phinfo.erase(mi);
s_info.phinfo.insert(pair<int, ph>(1,tmpph));
s_info.client_num ++;
cout << "SEND SIGNAL " << mi->second.index_num << endl;
pthread_cond_signal(&mycond[mi->second.index_num]);
}
}
else
{
cout << "ACCEPT ERROR " << endl;
}
}
pthread_join(p_thread, (void **)status);
}

void *thread_func(void *data)
{
char buf[255];
int mysocket;
int mynum = *((int *)data);
multimap<int, ph>::iterator mi;
// 쓰레드 동기화용 조건변수
pthread_mutex_lock(&async_mutex);
pthread_cond_signal(&async_cond);
pthread_mutex_unlock(&async_mutex);

cout << "Thread create " << mynum << endl;
while(1)
{
// MAIN 쓰레드로 부터 신호를 기다린다.
// 신호가 도착하면 쓰레드 전역변수로 부터
// 현재 처리해야할 소켓지정값을 가져온다.
pthread_mutex_lock(&mutex_lock);
pthread_cond_wait(&mycond[mynum], &mutex_lock);
mysocket = s_info.current_sockfd;
pthread_mutex_unlock(&mutex_lock);
memset(buf, 0x00, 255);

// 데이타를 처리한다.
// 만약 quit 문자열을 만나면
// 쓰레드 전역변수를 세팅한다음 연결종료 한다.
while(1)
{
read(mysocket, buf, 255);
if (strstr(buf, "quit") == NULL)
{
write(mysocket, buf, 255);
}
else
{
mi = s_info.phinfo.begin();
while(mi != s_info.phinfo.end())
{
cout << "search " << mi->second.index_num << endl;
if (mi->second.index_num == mynum)
{
ph tmpph;
tmpph.index_num = mynum;
tmpph.sockfd = 0;
s_info.phinfo.erase(mi);
s_info.phinfo.insert(pair<int, ph>(0, tmpph));
s_info.client_num --;
close(mysocket);
break;
}
mi ++;
}
break;
}
memset(buf, 0x00, 255);
}
}
}

void *mon_thread(void *data)
{
cout << "moniter thread" << endl;
while(1)
{
sleep(10);
multimap<int, ph>::iterator mi;
mi = s_info.phinfo.begin();
cout << "size " << s_info.phinfo.size() << endl;
while(mi != s_info.phinfo.end())
{
cout << mi->first << " : " << mi->second.index_num
<< " : " << mi->second.sockfd << endl;
mi ++;
}
}
}
이 프로그램은 2개의 인자를 받아들이며, 클라이언트의 입력을 되돌려주는 일을한다 (echo 서버). 첫번째 인자는 서비스할 PORT 번호이고, 두번째 인자는 쓰레드 생성갯수이다. 프로그램은 인자의 정보를 이용해서 PORT 를 열고 클라이언트를 받아들인다. 클라이언트가 연결하면, Thread Pool 에 남는 공간이 있는지를 확인하고, 남는 공간이 있다면 클라이언트와 통신하게 된다.

단지 쓰레드를 미리 생성시키고 나서, 이것을 스케쥴링하기 위한 코드가 몇줄 추가되었을 뿐 특별히 복잡한 코드는 아닐거라고 생각된다.


2절. 결론

이상 간단한 쓰레드 풀의 작성요령에 대해서 알아보았다. 위에서 설명했듯이 쓰레드 풀이란 개념적인 요소에 가까움으로 어떻게 구현할지는 상황에 따라서 매우 달라지게 되며, 위의 예제는 그러한 여러가지 상황중 가장 기본적인 상황을 예로 해서 만들어진 것이다. 어쨋든 위의 예제를 충분히 이해한다면 다른 상황으로의 응용역시 별 어려움없을 것이라고 생각된다.

쓰레드 풀은 보통 매우 효율적인 성능을 보장해주는 어플리케이션의 작성을 위해서 사용되어짐으로, 가능한한 빠른 쓰레드간 전환이 가능하도록 고민해서 코딩을 해야 한다. 위의 경우 쓰레드간 전환을 위해서 multimap 을 사용하고 있는데, accept 가 들어왔을경우 해당 클라이언트에 대한 쓰레드 할당은 매우 빠르다고 볼수 있을것이다. 그러나 종료할경우에는 multimap 의 첫번째 원소부터 마지막번 원소까지 search 해야 한다. 이것은 매우 비효율적임으로 개선할 여지가 있다. 가장 간단하게 생각할수 있는 것은 multimap 의 key 값이 1인 원소내에서만 검색하는 것이다. 우리는 쓰레드 풀의 크기와 현재 연결된 클라이언트의 수를 알고 있음으로, multimap 의 몇번째 요소부터 key 값이 1인지를 계산해 낼수 있기 때문이다. 이렇게 할경우 약간의 시간단축효과를 기대할수 있을것이다.

   1 2 3 4 5 6 7 8 9 0 1 2 3 4 5
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|0|0|0|0|0|0|0|0|1|1|1|1|1|1|1|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| |
+-------------+
15 - 8
이 시간단축효과는 연결된 클라이언트의 수가 전체 POOL 사이즈에 비례해서 작을 수록 커질것이다.

나머지 방법은 각자 고민을 해보기 바란다. 아마 전혀 다른 자료구조를 사용할수도 있을것이다.

이 문서는 수정될 수 있습니다. 최신 문서는 Joinc Wiki 에서.
:::
2007/07/13 23:33

Pthread - 쓰레드 종료와 취소

Thread 취소와 종료

윤 상배

yundream@joinc.co.kr

교정 과정
교정 1.12004년 2월 20일 21시
취소상태에 대한 설명오류 수정
교정 1.02004년 2월 17일 21시
취소종류에 대한 내용 추가
교정 0.92004년 2월 17일 20시
취소상태에 대한 잘못된 부분 수정
교정 0.82004년 2월 13일 19시
최초 문서작성


1절. 소개

쓰레드에 대해서 학습을 해본적이 있다면 Thread 취소와 종료에 대한 내용은 뻔한 것 아니냐 라고 생각할 수 있을 것이다. 하지만 이 문서를 읽어 보면 왜 별도의 문서를 만들어서 종료와 취소에 대해서 다루었는지 이해하게 될 것이다.


2절. Thread 취소(cancellation)와 종료

쓰레드는 제어가능한 객체로 필요에 따라 생성시킬 수 있듯이 필요에 따라서 중단 시킬 수도 있다. 이 쓰레드 중단이라는 것이 매우 단순한 행위라고 생각되지만 생각처럼 그렇게 단순한 행위가 아니다. 멀티 쓰레드 프로그램이라면 쓰레드간 동기화를 위해서 조건변수 뮤텍스등 을 사용하고 있을 것이며, 여러가지 공유 자원들 역시 가지고 있을 것이다.

몇명의 인원이 같이 참가해서 진행하는 프로젝트가 있다고 생각해 보자. 그중 한명이 프로젝트에서 빠지면 나머지 인원이 프로젝트를 진행하는데 문제가 생기지 않도록 이런 저런 뒷수습을 해주는게 매우 중요하다. 쓰레드 역시 마찬가지로 중단(종료)시 뒷수습을 해주는 것은 매우 중요한 일이다. 이 문서는 쓰레드 취소에 관련된 내용과 쓰레드 종료시 신경써야될 (자원정리 와 같은)것들에 대해서 알아보도록 한다.


2.1절. Thread 취소

멀티 쓰레드 프로그램에서 특정 쓰레드를 중단 시키고자 할때를 위해서 Pthread는 ptread_cancel()이라는 함수를 제공한다.

int pthread_cancel(pthread_t thread);
이 함수는 인자로 주어진 쓰레드 식별번호 thread를 가지는 쓰레드를 중지시킨다. 명확히 말하자면 쓰레드를 중지 시키는게 아니고 쓰레드에 취소 요청을 하는 것으로 봐야 한다. 취소 요청을 받은 쓰레드가 어떻게 반응 할런지는 요청을 받은 쓰레드의 취소 상태 설정에 의존한다. 취소 요청을 받은 쓰레드는 취소 상태에 의해서 필요한 작업을 한 후 종료 하게 된다. 취소 요청을 받아서 종료하는 쓰레드는 pthread_exit(PTHREAD_CANCELED)를 호출하고 종료한다.

pthread_cancel()에 의해서 취소가 통보된 쓰레드는 쓰레드 취소 상태의 설정에 따라서 취소 요청을 무시할 수도 취소지점(cancellation point) 지점까지 수행한뒤에 종료 될수도 있기 때문이다.

쓰레드 취소와 종료는 엄연히 다르다는 것을 이해하기 바란다. 그렇지 않으면 앞으로 문서의 내용을 읽는데 헛갈릴 수 있다.


2.1.1절. 쓰레드 취소상태의 설정

쓰레드가 pthread_cancel()에 의해서 취소요청을 받았을 때 어떻게 반응할런지를 결정하는 쓰레드 취소상태는 여러가지 방법에 의해서 결정된다. 취소 상태는 pthread_setcancelstat() 함수에 의해 결정한다.

int pthread_setcancelstate(int state, int *oldstate);
첫번째 인자인 state는 새로운 취소상태를 설정하기 위해서 사용된다. 두번째 인자인 oldstate는 이전의 취소상태의 설정값을 받아오기 위해서 사용된다. 이 함수는 다음과 같이 사용할 수 있다.
int old_cancel_state;
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_cancel_state);
만약 이전의 취소상태 설정값이 필요 없다면 old_cancel_state대신 NULL을 사용하면 된다. 위의 사용에서 처럼 쓰레드는 PTHREAD_CANCEL_DISABLE와 PTHREAD_CANCEL_ENABLE 둘중 하나의 취소상태를 가질 수 있다.

만약 PTHREAD_CANCEL_ENABLE 상태라면 쓰레드는 취소요청을 받아들이고 취소지점까지 진행한다음 취소 지점을 벗어나야지 종료한다. ENABLE 상태일 경우 별도로 취소지점까지 진행한다음 종료할 것인지 아니면 바로 종료할 것인지를 pthread_setcanceltype() 를 통해 지정할 수 있다. 만약 PTHREAD_CANCEL_DISABLE 상태라면 취소요청을 받은 후 취소지점까지의 진행을 하지 않고 바로 종료한다.


2.1.2절. 쓰레드 취소종류의 설정

쓰레드 취소상태가 PTHREAD_CANCEL_ENABLE인 경우 취소의 종류를 결정할 수 있다. 취소종류의 결정은 pthread_setcanceltype()을 통해서 이루어진다.

int pthread_setcanceltype(int type, int *oldtype);
취소종류는 type를 통해서 결정된다. PTHREAD_CANCEL_ASYNCHRONOUSPTHREAD_CANCEL_DEFERRED 둘 중 하나를 선택할 수 있다. 전자일 경우 바로 종료하며, 후자의 경우 취소지점을 벗어날 때까지 기다리게 된다. oldtype를 이용해서 이전 취소타입을 얻어 올 수 있다. NULL이라면 받아오지 않는다. 이 함수는 당연하지만 취소상태가 PTHREAD_CANCEL_DISABLE 라면 의미 없는 함수다.


2.1.3절. 취소지점

쓰레드에게 취소요청이 왔다고 해서 무조건 취소해 버리면 문제가 생길 수도 있다. 어떤 일을 처리하고 있는 중에 취소요청이 전달했는데, 별로 중요하지 않는 (무시해도 될만한) 일이라면 중단후 바로 취소해도 되겠지만 중요한 일을 처리하는 중이라면 일을 처리한후 종료 해야 할것이다. 이 마지 노선이 취소지점이다.

취소지점으로 설정될 수 있는 영역은 다음과 같다.

pthread_join(3)
pthread_cond_wait(3)
pthread_cond_timedwait(3)
pthread_testcancel(3)
sem_wait(3)
sigwait(3)
pthread_setcancelstate()함수에 의해서 PTHREAD_CANCEL_ENABLE 상태로 되어 있다면 취소지점을 무시하고 즉시 종료 된다. PTHREAD_CANCEL_DISABLE로 되어 있다면 위의 취소지점을 벗어날 때까지 기다린다. 즉 취소요청을 받은 쓰레드가 pthread_cond_wait()에서 조건변수를 기다리는고 있다면 조건변수 를 받을 때까지 취소를 유보하게 된다.



2.2절. 쓰레드 기본 취소상태와 취소 종류

별다른 설정이 없을 경우 pthread_create()로 만들어 지는 쓰레드는 PTHREAD_CANCEL_ENABLE, PTHREAD_CANCEL_DEFERRED로 상태로 생성된다.


2.3절. 쓰레드 종료시 자원정리

pthread_cancel()등을 통해서 종료 통보를 받은 쓰레드는 종료하기 전에 여러가지 일을 해주어야 할 것이다. 뭐 간단한 쓰레드라면 관계 없겠지만 복잡하게 얽혀 있는 멀티 쓰레드 프로그램이라면 이런 저런 정리해줘야 할 것들이 많을 것이다.


2.3.1절. 쓰레드 종료시 자원해제

쓰레드에서 malloc()등을 호출해서 메모리 공간을 확보했다거나 DB나 파일, 소켓등을 열어서 작업했다면 반드시 이들 자원을 해제시켜줘야 한다. 간단하게 생각하자면 쓰레드 종료시점에서 free(), close(), DB라면 이런 저런 정리를 해주면 될것이다. 그러나 pthrad_cancel()등에 의해서 작업중간에 요청을 받았다면 그리 간단한 문제가 아니다. 쓰레드 마지막까지 루틴을 진행할 수 없기 때문이다. 이럴 경우를 대비해서 pthread_cleanup_push(), pthread_cleanup_pop()와 같은 함수를 제공한다.

이 함수들을 이용해서 쓰레드가 종료할 때 호출해야할 함수를 지정할 수 있다. 그러므로 프로그래머는 이들 함수에 자원해제와 같은 필요한 코드를 넣어두기만 하면 된다. 이들 함수에 대한 자세한 내용은 Pthread_API를 참고하기 바란다.


2.4절. 총정리

지금까지의 내용을 예제를 통해서 정리해보도록 하자.

#include <pthread.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/time.h>

// 쓰레드 종료시 호출될 함수
void clean_up(void *);

// 쓰레드 함수
void *thread_func(void *);

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t lmu = PTHREAD_MUTEX_INITIALIZER;

int main(int argc, char **argv)
{
pthread_t pt;
pthread_create(&pt, NULL, thread_func, NULL);

// 생성된 쓰레드 pt에 취소 요청을 보낸다.
pthread_cancel(pt);

// 5초를 쉰 후에 시그널을 보낸다.
sleep(5);
pthread_cond_signal(&cond);

// join후 종료한다.
pthread_join(pt, NULL);
printf("exit\n");
exit(1);
}

// 쓰레드 종료시 효출될 함수
// 여기에 자원해제루틴을 입력할 수 있을 것이다.
void clean_up(void *arg)
{
printf("Thread cancel Clean_up function\n");
}

void *thread_func(void *arg)
{
// DISABLE 상태다.
// 이경우 쓰레드에 대해서 취소 요청이 들어오면
// 바로 취소된다.
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);

// 쓰레드 종료시 호출될 함수 등록
pthread_cleanup_push(clean_up, (void *)NULL);

while(1)
{
pthread_mutex_lock(&lmu);

printf("THREAD cond wait\n");
pthread_cond_wait(&cond, &lmu);
printf("NO WAIT COND\n");
pthread_mutex_unlock(&lmu);
}

printf("EXIT\n");
pthread_cleanup_pop(0);
}
main 쓰레드는 thread_func를 실행시킨다. thread_func는 pthread_cond_wait()에서 신호가 발생하기를 기다리게 된다. 잠시후 main 쓰레드는 pthread_cancel()을 이용해서 thread_func 쓰레드에 취소 요청을 하게 된다. 위의 경우 thread_func쓰레드는 취소상태가 PTHREAD_CANCEL_DISABLE로 되어 있기 때문에 취소 지점등은 부시하고 바로 종료된다. 이때 pthread_cleanup_push()에 등록된 clean_up함수가 실행되는걸 확인할 수 있을 것이다.

PTHREAD_CANCEL_ENABLE로 되어 있을 경우 취소 요청을 받아들이기 때문에 취소 요청이 들어오더 라도 thread_func가 현재 종료 지점에 머물러 있기 때문에 곧바로 종료하지 않는다. main 쓰레드에서 pthread_cond_signal()를 호출해서 thread_func가 cond_wait상태를 벗어난 후 종료된다. 물론 cleanup 함수도 실행된다.


3절. 참고문헌

  1. pthread API 레퍼런스

  2. mutex와 조건변수

이 문서는 수정될 수 있습니다. 최신문서는 Joinc Wiki에서
:::
2007/07/11 23:57

Pthread : 쓰레드와 시그널

쓰레드와 시그널

윤 상배

dreamyun@yahoo.co.kr

고친 과정
고침 0.92004년 1월 29일 23시
시그널을 이용한 쓰레드 작동/중지 제어
고침 0.82003년 10월 7일 23시
최초 문서작성

1. 쓰레드에서의 시그널 사용

쓰레드에서의 시그널 사용은 시그널에 대한 기본적인 이해만 가지고 있다면 약간의 응용으로 충분히 해결할 수 있는 문제이긴 하지만 범 유닉스적으로 응용하고자 한다면(특히 리눅스가 포함된다면) 운영체제간 신경써줘야할 문제가 있다. 이번장에서는 쓰레드에서의 시그널을 이용하는 방법과 운영체제가 다름으로 인해 발생할 수 있는 문제들에 대해서 알아보도록 하겠다.


1.1. 시그널을 특정 쓰레드로 보내기

쓰레드에서 시그널은 서로 공유된다는걸 알고 있을 것이다. 문제는 공유된다는 점인데 만약 프로세스에 시그널을 보낼 경우 해당 프로세스에서 생성된 모든 쓰레드에 시그널이 전달이 되게 된다. 이것은 우리가 원하는게 아니다.

우리가 원하는 것은 특정 쓰레드에서만 시그널을 받도록 하는 것이다. 이러한 작업을 위해서 우리는 시그널 마스크를 사용한다. 시그널 마스크는 말그대로 특정 시그널에 대해서 마스크를 씌우는 것으로 해당 쓰레드에서 특정 시그널에 대해서 마스크를 씌우면 마스킹된 시그널은 해당 쓰레드로 전달되지 않는다. 이 시그널을 받기를 원하는 쓰레드에서는 이 시그널에 대한 마스크를 제거시킨다. 그러면 블럭되어 있는 시그널은 마스크가 제거된 쓰레드로 전달될 것이다. 일종의 필터기다.

그림 1. 시그널 마스크의 작동원리

위의 그림은 시그널 마스크의 작동원리를 보여준다. 메인 쓰레드에서는 SIGINT와 SIGUSR2에 대해서 시그널 마스크를 설치한다. 그리고 쓰레드 1에서는 SIGINT에 대한 마스크를 제거하고, 쓰레드 2에서는 SIGUSR2에 대한 마스크를 제거한다. 이렇게 되면 SIGINT가 메인 쓰레드에 도착했을 때 마스크 때문에 메인 쓰레드에는 도착하지 못하고 쓰레드 1로 전달될 것이다. SIGUSR2가 도착했을 경우 메인 쓰레드와 쓰레드 1에서는 마스크 때문에 전달되지 못하고 쓰레드 2로 시그널이 전달된다. 1.1.1절에서는 위의 작동원리 대로 구현된 예제 코드를 다루고 있다.

이러한 쓰레드별 시그널 마스킹을 위해서 pthread는 pthread_sigmask(3)라는 함수를 제공한다.

#include <pthread.h>
#include <signal.h>

int pthread_sigmask(int how, const sigset_t *newmask, sigset_t *oldmask);
이 함수는 현재 쓰레드에 시그널newmaskhow 를 이용해서 시그널 마스크를 만든다. how는 SIG_BLOCK, SIG_UNBLOCK, SIG_SETMASK중 하나를 선택할 수 있다. SIG_BLOCK는 현재 설정된 시그널 마스크에 newmask를 추가하며 SIG_UNBLOCK는 현재 설정된 시그널 마스크에서 newmask를 제거하고 SIG_SETMASK는 newmask로 현재 시그널 마스크를 설정한다.


1.1.1. 간단 예제

그럼 pthread_mask(3)를 이용한 간다한 예제를 만들어 보도록 하겠다. 코드는 여러분이 시그널과 쓰레드에 관한 최소한의 지식을 가지고 있다는 가정하에 작성될 것이며, 설명은 주석으로 대신하도록 하겠다.

예제 : th_signal.c

#include <pthread.h>
#include <sys/wait.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <signal.h>

/*
* 시그널 핸들러
* 핸들러가 호출된 쓰레드의 ID와 시그널 번호를 출력한다.
*/
void sig_handler(int signo)
{
printf("SIGNAL thid %d : %d\n", pthread_self(),signo);
}

void *threadfunc2(void *arg);
void *threadfunc(void *arg);

int main()
{
int n, i, j;
pthread_t threadid;

// SIGINT와 SIGUSR2 시그널을
// 시그널 마스크에 등록한다.
// 시그널 마스크는 모든 쓰레드에서 공유된다.
// 고로 다른 쓰레드에서도
// SIGINT, SIGUSR2시그널에 대해서 마스크 된다.
sigset_t newmask;
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
sigaddset(&newmask, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &newmask, NULL);

// 원하는 쓰레드로 시그널이 전달하는지 확인하기 위해서
// 쓰레드 ID를 확인한다.
if ((n = pthread_create(&threadid, NULL, threadfunc2, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
printf("thread2 id %d\n", threadid);

if ((n = pthread_create(&threadid, NULL, threadfunc, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
printf("thread id %d\n", threadid);

pthread_join(threadid, NULL);
}

void *threadfunc(void *arg)
{
int i=0, j;
struct sigaction act;
sigset_t newmask;

// 결과의 확인을 위해서 쓰레드 ID를 출력한다.
printf("SIGINT Thread Start %d\n", pthread_self());

// SIGINT에 대한 시그널 핸들러를 설치하고
// 시그널 마스크에서 SIGINT를 제거한다.
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
act.sa_handler = sig_handler;
sigaction(SIGINT, &act, NULL);
pthread_sigmask(SIG_UNBLOCK, &newmask, NULL);

while(1)
{
printf("%d\n", i);
i++;
sleep(1);
}
return ;
}

void *threadfunc2(void *arg)
{
struct sigaction act;

// SIGUSR2에 대한 시그널 핸들러를 설치하고
// 시그널 마스크에서 SIGUSR2를 제거한다.
sigset_t newmask;
sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR2);
act.sa_handler = sig_handler;
sigaction(SIGUSR2, &act, NULL);
pthread_sigmask(SIG_UNBLOCK, &newmask, NULL);

while(1)
{
sleep(1);
}
}
위 프로그램을 실행시킨뒤 kill명령으로 SIGINT와 SIGUSR2 시그널을 PID로 보내보면 해당 쓰레드로 시그널이 전달되고 시그널 핸들러가 실행되는걸 확인할 수 있을 것이다.


1.2. 쓰레드간 시그널 전송

외부의 다른 프로세스에서 시그널을 발생시키는 것 외에도 같은 프로세스에서 작동하는 쓰레드간에 시그널을 전송해야 하는 경우도 생길 것이다.

이러한 쓰레드간 시그널 전송은 여러가지 목적으로 사용할 수 있다. 일정시간마다 특정 쓰레드에 시그널을 전송하므로써 쓰레드를 깨워서 코드를 실행시키게 한다거나 네트워크 애플리케이션에서 write, read에 타임아웃을 검사하는 용도로도 사용가능 하다.

네트워크 애플리케이션에서 스레드간 시그널 전달을 통해 타임아웃을 검사한다는 생각은 좀 생소할 수도 있을것 같다. 보통은 select나 alarm을 사용할 건데, 멀티 쓰레드 프로그램의 경우 alarm(2)의 사용은 사실상 어렵다고 볼 수 있다. 여러개의 쓰레드에서 alarm(2)을 사용할 경우 단지 하나의 alarm(마지막 alarm값)만이 등록되어서 사용할 수 있기 때문이다. 그렇다면 select를 사용해야 할 건데, select대신에 전용의 시그널을 발생하는 쓰레드를 이용해서 사용할 수 있다.

read(2)를 예로 들어서 설명해 보자 read(2)를 하기전에 특정 (전역)값을 0으로 세팅하고 read를 수행한후 1로 값을 변경하도록 한다. 그리고 타임아웃 체크를 위한 쓰레드에서는 타임아웃 시간 간격으로(sleep(2)를 이용하면 된다) 이 값을 검사한다. 만약 값이 0으로 세팅되어 있는걸 확인 했는데, 다음 시간이 돌아온 뒤에도 이 값이 0이라면 read영역에서 타임아웃이 발생했다고 판단 할 수 있을 것이다. 그러면 타임아웃이 발생한 쓰레드에 시그널을 전송하도록 한다. 쓰레드에 시그널이 전송하면 인터럽트가 발생하고 read에서 빠져나오게 된다.

if (read(..) < 0)
{
// 만약 인터럽트로 인하여 빠져나온 거라면..
if (errno == EINTR)
{
...
}
}
시그널 발생시 인터럽트가 전달되게 하려면 약간의 부가적인 작업이 필요한데, 이것은 소켓 타임아웃을 참고하기 바란다.


1.2.1. 다른 쓰레드로 시그널 전송

이러한 쓰레드간 시그널 전송을 위해서 pthread_kill(3)이라는 함수가 제공된다.

#include <pthread.h>
#include <signal.h>

int pthread_kill(pthread_t thread, int signo);
첫번째 인자thread는 시그널을 전달받을 쓰레드의 식별자이고 signo는 전달하고자 하는 시그널 번호이다. 보내는 쪽은 pthread_kill(3)을 이용해서 비교적 간단하게 구현이 가능하다.


1.2.2. 시그널 받기

시그널을 받는 쓰레드의 경우 동기와 비동기 두가지 방식을 통해서 받을 수 있다. 동기 방식으로 받을 경우는 sigwait(3)함수를 이용해서 시그널이 전달될 때까지 블럭되면서 기다린다.

#include <pthread.h>
#include <signal.h>

int sigwait(const sigset_t *set, int *sig);
이 함수는 시그널 셋set에 설정된 시그널중 하나가 전달될 때까지 호출된 영역에서 대기한다. 시그널을 받았다면 리턴되고 전달 받은 시그널 번호는 sig를 통해서 넘어온다. 시그널을 기다린다는 특징을 이용해서 쓰레드간 동기화를 위한 목적으로도 유용하게 사용할 수 있을 것이다.

두번째는 비동기적인 방식으로 코드 실행중에 시그널이 전달되면 인터럽트가 걸리고 시그널 핸들러가 수행되는 방식이다. 일반적인 시그널 사용방식과 동일하다.


1.2.3. 예제

sigwait(3)를 통해서 동기적으로 기다리는 것은 구현이 간단하므로 따로 다루지 않고 시그널 핸들러를 등록해서 비동기적으로 시그널을 기다리는 코드를 구현해 보도록 하겠다. 1.1.1절의 코드를 약간 수정했다.

예제 : thtoth_sig.c

#include <pthread.h>
#include <sys/wait.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <signal.h>

/*
* 시그널 핸들러
* 핸들러가 호출된 쓰레드의 ID와 시그널 번호를 출력한다.
*/
void sig_handler(int signo)
{
printf("SIGNAL RECV TH ID %d : %d\n", pthread_self(),signo);
}

void *threadfunc2(void *arg);
void *threadfunc(void *arg);
void *s_signal(void *arg);

// 쓰레드 ID를 저장한다.
int sigid[2];

int main()
{
int n, i, j;
pthread_t threadid;

// 원하는 쓰레드로 시그널이 전달하는지 확인하기 위해서
// 쓰레드 ID를 확인한다.
if ((n = pthread_create(&threadid, NULL, threadfunc2, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
sigid[0] = threadid;
printf("thread2 id %d\n", threadid);

if ((n = pthread_create(&threadid, NULL, threadfunc, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
sigid[1] = threadid;
printf("thread id %d\n", threadid);

if ((n = pthread_create(&threadid, NULL, s_signal, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}

pthread_join(threadid, NULL);
}

void *threadfunc(void *arg)
{
int i=0, j;
struct sigaction act;
sigset_t newmask;

// 결과의 확인을 위해서 쓰레드 ID를 출력한다.
printf("SIGINT Thread Start %d\n", pthread_self());

sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
act.sa_handler = sig_handler;
sigaction(SIGINT, &act, NULL);

while(1)
{
printf("%d\n", i);
i++;
sleep(1);
}
return ;
}

void *threadfunc2(void *arg)
{
struct sigaction act;

sigset_t newmask;
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
act.sa_handler = sig_handler;
sigaction(SIGINT, &ct, NULL);

while(1)
{
sleep(1);
}
}

/*
* SIGINT를 두개의 쓰레드로 서로다른 시간간격으로
* 전달한다.
*/
void *s_signal(void *arg)
{
int i = 1;
while(1)
{
sleep(1);
i++;
if((i % 7) == 0)
{
printf("Send SIGINT %d\n", sigid[0]);
pthread_kill(sigid[0], SIGINT);
}
if((i % 11) == 0)
{
printf("Send SIGINT %d\n", sigid[1]);
pthread_kill(sigid[1], SIGINT);
}
}
}
위의 코드의 경우 시그널을 받을 쓰레드를 명시해줄 수 있으므로 시그널 마스크등을 설치할 필요가 없다. SIGINT가 원하는 쓰레드로 정확하게 전달되는걸 확인할 수 있을 것이다.


1.2.4. 시그널을 이용한 쓰레드 작동 제어

쓰레드 프로그래밍을 하다보면 비동기 적으로 특정 쓰레드를 중단 시켜야 되는 경우가 발생한다. 물론 임의의 시점에서 중단된 쓰레드를 다시 작동하도록 만들어 주어야 할것이다.

다른 우회적인 몇가지 구현 방법이 있겠지만 비동기적인 처리를 위해서는 역시 시그널만한게 없는 것 같다.

#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <signal.h>
#include <stdio.h>

pthread_t thread_t[3];

void sig_handler(int signo)
{
printf("Thread Stop %d:\n", (int)pthread_self());
sleep(100);
}

void null(int signo)
{
printf("Thread Start\n");
}

void *test(void *data)
{
sigset_t newmask;
struct sigaction act, act2;
int i = 0;

sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR1);
sigaddset(&newmask, SIGCONT);
act.sa_handler = sig_handler;
act2.sa_handler = null;

sigaction(SIGUSR1, &act, NULL);
sigaction(SIGCONT, &act2, NULL);

pthread_sigmask(SIG_UNBLOCK, &newmask, NULL);

while(1)
{
printf("Im child Thread %d %d\n", (int)pthread_self(),i);
i++;
sleep(1);
}
}

void *worker(void *data)
{

sigset_t newmask;
sigemptyset(&newmask);
sigaddset(&newmask, SIGUSR1);
sigaddset(&newmask, SIGCONT);

pthread_sigmask(SIG_BLOCK, &newmask, NULL);
while(1)
{
sleep(2);
pthread_kill(thread_t[0], SIGUSR1);
sleep(3);
pthread_kill(thread_t[0], SIGCONT);
}
}

int main()
{
pthread_create(&thread_t[0], NULL, test, NULL);
pthread_create(&thread_t[1], NULL, test, NULL);
pthread_create(&thread_t[2], NULL, worker, NULL);

pthread_join(thread_t[0], NULL);
pthread_join(thread_t[1], NULL);
return 1;
}


1.3. 운영체제별 차이점

쓰레드의 작동방식은 운영체제별로 많은 차이를 보여줄 수 있으며, 차이점에 유의해서 프로그램을 작성해야 한다. 여기에서는 솔라리스와 리눅스를 비교해서 설명하도록 하겠다.

지금까지의 쓰레드와 시그널에 대해서 다루었던 내용은 솔라리스와 같이 하나의 프로세스에서 다중의 쓰레드를 관리하는 경우를 기준으로 했다. 그러나 리눅스의 경우 clone(2)를 통한 다중 프로세스형태로 쓰레드가 생성된다. 때문에 ps를 이용해서 확인할 경우 다중 쓰레드 프로세스임에도 불구하고 각각의 PID를 가지는 프로세스로 쓰레드가 생성되는걸 확인 할 수 있다.

이런 특징 때문에 리눅스 시스템에서 외부 프로세스에서 시그널을 특정 쓰레드로 보낼 경우에는 메인 쓰레드가 아닌 해당 쓰레드의 PID를 명시해 주어야 한다.


이 문서는 수정될 수 있습니다. 최신 문서는 Joinc Wiki 에서 확인하세요.
:::
2007/06/14 10:30

pthread : 쓰레드 우선순위 문제

이 문서는 수정될 수 있습니다. 이글의 최신문서는 Joinc Wiki에서 확인하실 수 있습니다.

쓰레드 우선순위 문제 해결

윤 상배

dreamyun@yahoo.co.kr

교정 과정
교정 0.82003년 8월 3일 18시
최초 문서작성

차례
1절. 소개
2절. 문제 발생
3절. 문제 해결
4절. 결론

1절. 소개

프로그래밍을 하다보면 "분명 이러한 상황에서는 이러한 문제가 발생하지 않아야 하는데", "코드 상으론 아무런 문제가 없는데" 프로그램의 실행에 문제가 생기는 경우가 있다.

pthread를 이용해서 쓰레드 프로그래밍을 할때도 이런 일이 간혹 발생한다. 이중 "쓰레드 우선순위"에 따라서 발생하는 문제점에 대해서 알아보겠다. 이 문제는 주의해서 프로그래밍 하지 않을 경우 간혹 발생하기도 한다.

이 글은 여러분이 pthread를 이용한 쓰레드 프로그래밍에 대한 기본적인 이해를 가지고 있다는 가정하에 씌여져 있다. 예제로 제시된 코드의 테스트는 리눅스(kernel-2.4.x)와 솔라리스 5.7환경에서 이루어 졌다.


2절. 문제 발생

다음은 이번 테스트에 사용될 예제코드다.

예제 : thread_test.c

#include <pthread.h>
#include <sys/wait.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>

void *threadfunc(void *arg);

main()
{
int n, i, j;
pthread_t threadid;

if ((n = pthread_create(&threadid, NULL, threadfunc, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
printf("Main Thread START\n");
for (i = 0; i < 100000000; i++)
{
j = i*5;
}
printf("Main Thread END\n");
pthread_join(threadid, NULL);
}

void *threadfunc(void *arg)
{
int i, j;
printf("Thread Start\n");

for (i = 0; i < 100000000; i++)
{
j = i*5;
}

printf("Thread:end\n");
return ;
}
코드는 최소한의 테스트가 가능한 수준에서 작성되었다. 위의 코드는 언뜻 보기에 아무 문제가 없을 것 같고, 실제 특정 플랫폼에서는 문제 없이 작동하지만 플랫폼을 이동했을 경우 상당히 심각한 문제가 발생한다.

위의 코드를 리눅스에서 컴파일 시켜서 실행 시켜보면 아래와 같은 출력 결과 물을 보이면서 정상적으로 작동하는걸 확인 할 수 있을 것이다.

[root@localhost test]# ./thread_test 
Main Thread START
Thread Start
Main Thread END
Thread:end
그러나 위의 예제를 솔라리스에서 컴파일 시키면 다음과 같은 결과물을 보여준다.
[root@sun5.7 /]$ ./thread_test 
Main Thread Start
Main Thread END
Thread Start
Thread:end
리눅스에서 처럼 2개의 쓰레드가 동시에(정확히는 동시가 아니지만 이해하기 쉽게)수행 되는게 아니고 메인 쓰레드가 끝날 때까지 쓰레드 생성이 되지 않음을 알 수 있다. 코드에 따라서는 특정 쓰레드가 영원히 실행되지 않는 문제가 발생할 수도 있다. 리눅스에서 개발되는 많은 프로그램들이 다른 유닉스로도 배포 되는 것을 감안한다면 이는 매우 심각한 문제이다.

솔라리스에서 이러한 문제가 발생하는 이유는 쓰레드라 할지라도 개별적인 프로세스로 실행되는 리눅스와 달리 하나의 LWP를 나눠서 사용하기 때문에 하나의 쓰레드에서 매우 바쁘게 작동할경우 다음 쓰레드 생성을 위한 스케쥴링에 문제가 생기기 때문이다.

작은 정보: 리눅스에서의 쓰레드는 fork()와 유사한 clone()호출을 통한 개별 프로세스로 생성된다. 아직까진(2.4.x) 완전한 쓰레드라고 볼 수 없으며 완전한 쓰레드의 지원은 아마도 커널 2.6.x를 기다려봐야 할것 같다.


3절. 문제 해결

비록 위의 문제가 리눅스에서 발생하진 않지만 많은 리눅스에서 개발되는 프로그램이 솔라리스등의 다른 유닉스로 포팅되고 있다는 것을 감안한다면 반드시 잡아줘야 할 문제다.

문제의 해결방법은 비교적 간단한데, sleep(3)계열의 함수를 이용해서 쓰레드와 쓰레드 사이에 약간의 시간간격을 두어서 쓰레드 스케쥴링을 할 수 있도록 시간을 벌여주면 된다. 다음은 문제를 해결한 코드이다.

  if ((n = pthread_create(&threadid, NULL, threadfunc, NULL)) != 0 )
{
perror("Thread create error ");
exit(0);
}
usleep(100);
printf("Main Thread START\n");

4절. 결론

어떻게 보면 팁수준의 매우 간단한 내용인것 같지만 이런 문제일수록 문제의 원인이 애매 모호해서 해결책을 찾기 어려운 경우가 많다. 이런 경우도 마찬가지다. 아마도 pthread 라이브러리의 (그리 심각하지 않은?)문제라고 생각되는데 막상 문제가 발생했을 때는 정말 사람을 짜증나게 할 수 있는 문제다.

:::
2007/06/10 11:42

Pthread : 쓰레드 동기화, 쓰레드모드에 대해서

Pthread(3) - 더 깊이

윤 상배

dreamyun@yahoo.co.kr



1절. 소개

그동안 Pthread(1) 과 Pthread(2) 를 통해서 pthread 에 대한 기본적인 개요와 사용방법을 알아 보았다. 이번문서는 pthread 에 대한 좀더 깊이있는 내용에 대해서 알아보도록 하겠다.

이 문서에서는 그동안의 Pthread 문서에서 다루지 못한 몇가지 이슈들과 다중 쓰레드 프로그래밍시 필요로하는 Condition variables 를 통한 좀더 정교한 쓰레드간 동기화 방법에 대해서 다루게 된다.


2절. 쓰레드 종류

2.1절. User Space 과 Kernel Space 쓰레드

Pthread 는 Posix Thread 로 Posix 에서 표준으로 규격화한 쓰레드 라이브러리이며, User Space 에서 작동하는 쓰레드 라이브러리이다. 즉 쓰레드 관리를 위해서 Kernel 이 관여를하지 않고 쓰레드 관련 라이브러리에서 대부분의 쓰레드 관리를 처리하게 된다. Kernel 이 직접 쓰레드를 관리하는 방식을 Kernel Space 쓰레드 라고 하는데, User Space 쓰레드 보다 비용이 좀더 많이 드는 관계로 최근에는 User Space 라이브러리를 좀더 선호하는 편이다.

Kernel Space 이 User Space 쓰레드 보다 비용이 더 많이 든다는데 의아해 할수도 있을것이다. 일반적으로 생각했을때 Kernel 은 좀더 시스템에 가까우므로 좀더 비용이 적고 빠를것 이라고 생각할수 있지만 실은 그렇지 않다. 보통 응용프로그램은 2가지 레벨에서 작동하게 된다. 하나는 Kernel Space 모드 이고 다른 하나는 User Space 모드이다. 이렇게 2가지 모드로 나누는 이유는 일반응용 프로그램이 시스템에 직접 접근하는 일을 막기 위함으로, 일반 응용 프로그램이 시스템에 접근하기 위해서는 System Call 을 불러서 Kernel 에게 시스템자원을 사용할것을 요청하는 방식으로 간접 접근해야 한다. 이때 커널은 보호모드 상태에서 System Call 을 처리해야 하므로 여러가지 연산을 가지게 되된다. 최종적으로 User 모드에서 Kernel 모드로 전환하는데 드는비용, 보호모드를 위한 여러가지 연산을 하는데 드는 비용등 3가지의 비용을 소비하게 된다.

반면 User Level 쓰레드는 대부분의 작업을 system call 을 사용하지 않고 사용자라이브러리 에서 대부분 처리하므로 Kernel Level 쓰레드가 가지는 비용을 줄일수 있다. 단점이라면 Kernel Level 쓰레드가 각 쓰레드를 보호해주는 반면 유저레벨 쓰레드는 보호를 받을수 없다는 점이다. 그러므로 유저레벨쓰레드 사용시에는 각 쓰레드를 보호하기 위한 여러가지 동기화 장치등을 사용해야 하며, 이것은 때때로 세밀하고 복잡한 코딩 기법을 요구한다.

어쨋든 현재 Unix 쪽은 Pthread 를 표준으로 가져가는것 같다.


2.1.1절. User Space쓰레드

쓰레드를 유지하기 위해서는 여러가지 정보를 필요로 한다. 이러한 정보는 "프로그램 코드", Data, Stack, File I/O, Signal 테이블 등 크게 5가지이다. User Space 쓰레드는 이러한 정보를 커널에서 관리하도록 하지 않고 쓰레드 라이브러리에서 직접 처리하게 된다. 또한 각 쓰레드간 협력이 가능하도록 switch 기능을 제공하는데, 커널쓰레드보다 일반적으로 빠른 switch 기능을 제공한다.

단점은 SMPs(Symmetric MultiProcessor system)의 장점들을 이용할수 없다는 것이다. 즉 dual 혹은 quad 의 cpu를 사용하고 있는 시스템이라 할지라도, User Space 쓰레드로는 진정한 멀티쓰레드 프로그래밍을 할수 없다는 것이다. 오직 하나의 CPU에서만 작동할수 있을뿐이다. 그러므로 설사 CPU가 2개 혹은 4개 있다고 하더라도, 수행속도차이는 그리 크지 않게 된다. 또한 하나의 쓰레드에서 I/O 봉쇄가 일어나게 되면 다른 모든 쓰레드까지 봉쇄되게 된다.

물론 이러한 문제들을 위한 해법역시 존재한다. I/O 봉쇄를 아예 라이브러리 차원에서 해결할수 있도록 도와주는 쓰레드 라이브러리도 있으며, I/O 작업을 비봉쇄 모드로 작업함으로써 봉쇄를 막을수도 있다. 그리고 몇몇의 SMPs 는 User Space 쓰레드로도 모든 SMP 자원을 이용할수 있도록 방법을 제공해 주기도 한다.


2.1.2절. Kernel Space 쓰레드

커널에서 직접 쓰레드를 관리하며, 커널자체에서 쓰레드 정보를 가진다. 또한 각 프로세스 간의 스케쥴역시 커널에서 맡는다. 이경우 I/O 봉쇄같은 문제는 신경쓰지 않아도 되며, 또한 SMPs 자원을 제대로 활용할수 있다는 장점을 가진다.


3절. Pthread 에서의 세밀한 동기화

우리는 이전의 Pthread(2) 문서에서 mutex 잠금을 이용한 데이타 동기화에 대해서 이미 알아보았다. 그러나 mutex 는 어디까지나 잠금을 위한 것으로 동기화시킬수 있는 범위는 좁은 범위로 한정되며, mutex 만 가지고는 세밀한 수준에서의 동기화는 어렵다. 그래서 mutex 와 더불어 다른 쓰레드 도구를 사용해야 한다. 이번장에서는 Condition variables (조건변수) 와 mutex 를 이용한 데이터 동기화에 대해서 알아보도록 하겠다.


3.1절. 조건변수

뮤텍스가 잠금에 의해서 소극적인 동기화를 제공한다면, 조건변수는 "시그널 발생"과 "시그널에 대한 기다림" 을 통한 좀더 적극적인 동기화 방법을 제공한다. 이러한 시그널을 기다리기 위해서 pthread_cond_wait() 와 pthread_cond_timedwait() 2개의 함수를 제공한다. 이 함수들은 pthread_cond_signal()에 의해서 발생되는 시그널을 기다리고 있다가. 시그널이 도착하면 기다림을 멈추고 다음 루틴을 실행하게 된다. 이러한 시그널의 전달은 "조건변수" 를 통해서 이루어지게 된다. 아래는 이러한 순서를 간략하게 정리한 것이다(슈도코드형식).

임계영역에 들어간다. 
{
요청을 기다린다.(pthread_cond_wait 를 이용해서 singal이 도착하는지 확인한다)
만약 요청이 도착한다면 (요청에 대한 해결이 끝날때 까지 요청영역을 임계영역으로 설정한다)
{
요청을 해결한다.
}
}
임계영역을 빠져나온다.
임계영역에 진입하고 빠져나오는 것은 mutex 를 사용한다. 위의 슈도코드를 보면 단순히 잠금만을 제공하는것이 아닌, 요청/해결 의 서버/클라이언트 모델을 구현할수 있으며, 다중의 요청/해결에 대한 동기화 문제까지 해결할수 있음을 알수 있다.

3.1.1절. 조건변수 생성및 초기화

조건변수는 pthread_cont_t 의 타입으로 선언되어 있다. pthread_cond_t 는 다음과 같이 선언되어 있다.

typedef struct
{
struct _pthread_fastlock __c_lock; // 구조체의 잠금을 위해서 사용된다.
_pthread_descr __c_waiting; // 쓰레드는 이 값의 상태변화를 기다린다.
} pthread_cond_t
pthread_cont_t 의 초기화를 위한 방법은 2가지가 있다. 첫번째는 프로그램의 시작시에 선언하고 초기화 해버리는 방법과, 프로그램 실행중에 함수를 호출해서 사용하는 방법이 있다. 프로그램 시작시에 선언해서 사용하기 위해서는 다음과 같은 방법을 사용한다.
pthread_cond_t       thread_con1 =   PTHREAD_COND_INITIALIZER;
PTHREAD_COND_INITIALIZER 은 {__LOCK_INITIALIZER, 0} 으로 디파인 되어 있다. 위의 초기화를 사용하게 되면 pthread_cont_t 구조체를 위의 값으로 초기화 하게 된다. 또다른 방법은 프로그램 시행중에 사용하는 방법으로 pthread_cond_init() 함수를 사용하면 된다.

3.1.2절. 조건변수에 신호 보내기

우리는 조건변수에 신호를 보냄으로써, 조건변수를 (신호를)기다리는 다른 쓰레드를 깨울수 있게 된다. 보통은 이러한 신호를 보내서 깨우고자 하는 쓰레드가 하나일수도 있지만, 동시에 여러개의 쓰레드를 깨워야할 필요도 있을것이다. 다음은 이러한 시그널을 보내는 함수들이다.

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
pthread_cond_signal() 은 하나의 쓰레드를 깨우기 위한, pthread_cond_broadcast() 는 모든 쓰레드를 깨우기 위한 신호를 보내기 위해서 사용된다.

성공적으로 시그널이 전달되면 0이 리턴된다.


3.1.3절. 조건변수를 통한 신호 기다리기

신호를 보내는 쓰레드가 있다면, 이 신호를 받기 위해 기다리는 쓰레드도 있어야 할것이다. 신호를 받기 위해서는 pthread_cond_wait()와 pthread_cond_timewait() 두개의 함수가 사용된다. 이두개의 함수는 신호가 전달될때까지 기다리게 된다.

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
첫번째 아규먼트는 신호를 기다리기 위한 "조건변수" 이며 2번째는 mutext 변수이다. 이 두개의 함수는 실행되면 mutext 잠금풀고(unlock) 신호를 기다린다. 그러다가 신호가 전달되면 뮤텍스 잠금을 얻은 다음 해당영역을 실행하게 된다.

pthread_cond_wait() 와 pthread_cond_timedwait() 의 다른점은 timeout 을 기다리는지에 따라서 구별된다. pthread_cond_timedwait 는 기다림의 시간을 설정하여서 일정한 시간동안 시그널이 도착하지 않으면 ETIMEDOUT 값을 리턴한다. 반면 pthread_cond_wait 는 시그널이 도착할때까지 무한정 기다린다.

지금까지의 설명한 내용의 이해를 돕기 위해 간단한 흐름도를 그려보았다. 아래의 흐름도는 pthread_cond_wait()를 사용했을경우다.

// 뮤텍스 와 조건변수를 초기화 한다.  
pthread_mutext_t request_lock = PTHREAD_MUTEX_INITILIZER;
pthread_cond_t producer_cv = PTHREAD_COND_INITIALIZER;

// 임계 영역에 대한 뮤텍스 잠금을 얻는다.
int rc;
rc = pthread_mutext(&mutext_a);
if (rc)
{
perror("mutext lock error ");
phread_exit(NULL);
}

// 조건변수 producer_cv 에 대한 시그널을 기다린다.
// wait 에 들어가기전에 mutext 는 request_lock에 대한 잠금을
// 얻는다.
mutext_rc1 = pthread_cond_wait(&producer_cv,&request_lock)
if
{
// mutext 잠금이 이루어진다.
// 코드 실행
}

// 뮤텍스 잠금을 돌려준다.
pthread_mutex_unlock(&request_lock);

다음은 pthread_cond_timedwait()를 사용했을때의 흐름도이다. pthread_cond_wait()와 비교해서 시간설정을 위한부분이 들어가고, 타임아웃이 발생했을때 처리하는 부분이 들어가는걸 빼고는 나머지 부분은 동일하다.

...
struct timeval now;
struct timespec timeout;
int done;
...
rc = pthread_mutext(&mutext_a);
if(rc)
{
...
}

// timeout 시간 설정을 위해서 사용한다.
gettimeofday(&now);
timeout.tv_sec = noew.tv_sec + 5
timeout.tv_nsec = now.tv_usec * 1000;

// timeout 시간동안 signal 을 기다린다.
rc = pthread_cond_timedwait(&got_request, &request_mutext, &timeout);

// pthread_cond_timedwait 가 넘겨준 값을 이용해서 분기한다.
switch(rc)
{
// 시그널이 발생했을때..
case 0:
...
break;
// timeout 동안 시그널이 발생하지 않았을경우
case ETIMEDOUT:
...
break;
// 그밖의 경우(에러)
default:
...
break;
}
pthread_mutext_unlock(&request_mutex);

3.1.4절. 조건 변수 없애기(Destory)

더이상 사용할 필요가 없는 조건변수는 없애주도록 하자. 이렇게 함으로써 시스템자원의 낭비를 막을수 있다. 조건변수를 없애기 위해서는 pthread_cond_destroy() 함수를 이용한다. 조견변수를 없애기 전에 조건변수를 사용하는 쓰레드가 없음을 확인해야 한다.

// 조건변수 got_request 를 삭제한다. 리턴값이 EBUSY일경우는 
// 아직 사용하는 쓰레드가 있으서 조건변수 삭제에 실패했을경우이다.
int rc = pthread_cond_destory(&got_request);
if (rc == EBUSY)
{
....
}

4절. Thread-Specific Data

4.1절. 쓰레드간 전역변수 사용의 문제점

일반적인 단일 쓰레드 프로그래밍을 할때, (비록 권장하지는 않지만) 전역변수를 사용하는 경우가 종종 있을것이다. 전역변수를 씀으로써 그렇지 않을때 보다 코드를 훨씬 쉽게 기술할수도 있기때문이다.

멀티쓰레드 프로그램에서도 이러한 전역 변수가 필요할때가 있다. 그런데 전역변수를 쓰는데 약간의 문제가 발생한다. 이유는 모든 쓰레드가 전역변수를 공유해 버리기 때문에, 쓰레드별 전역변수를 사용할수 없기 때문이다. fork() 를 이용한 멀티 프로세스 모델에서는 프로세스간 전역변수 공유가 되지 않기 때문에, 각 프로세스 개별적인 전역변수를 사용할수 있는것과는 대조적이다.

그럼 어떻게 해야지 쓰레드 개별적으로 전역변수를 사용하게 할수 있는지 알아보도록 하겠다.


4.2절. Thread-Specific Data

4.2.1절. Thread-Specific Data 의 개념

이러한 문제의 해결을 위해 등장한것이, Thread-Specific Data (이하 TSD)이다. 같은 전역변수가 서로 다른 메모리 영역을 가르키게 하는 방법을 사용하는데, 다른 메모리 영역의 주소를 가리키는 각각의 다른 key 를 사용함으로써, 쓰레드간 개별적인 전역변수를 엑세스하게끔 만들어준다.

이것은 간단한 데이타베이스 형태인 key-value 관계로 나타낼수 있는데, 각각의 key 에 이름을 부여하고 이 key 가 value 를 가리킨다. 이 value 는 메모리 영역이 될것이다.

+-----------------------------------+
| Process |
| |
| Global_data |
| +-------+-------+-------+ |
| | data1 | data2 | data3 | |
| +-------+-------+-------+ |
| ^ ^ ^ |
| | | | |
| | | | |
| key1 key2 key3 |
| | | | |
| +---+ +---+ +---+ |
| | | | | | | |
| | A | | B | | C | |
| +---+ +---+ +---+ |
+-----------------------------------+
위의 그림은 쓰레드 A, B, C각 하나의 전역변수인 Global_data 에 어떻게 자신만의 전역데이타를 읽고 쓸수 있는지에 대한 개념을 나타낸것이다.

4.2.2절. TSD 블럭의 할당(생성)

pthread_key_create() 함수를 이용해서 모든 쓰레드에서 효력을 가지는 새로운 key를 할당받을수 있다. 최초에 key 를 만들게 되면 이 key 는 기본적으로 NULL을 가르키게 된다. 나중에 쓰레드는 이 key 값을 변경하게 됨으로써 자신이 원하는 메모리 영역을 참조할수 있게 된다. 다음은 이 함수가 어떻게 사용되는지에 대한 간단한 예이다. pthread_key_t 는 unsigned int 형이다.

int rc;
pthread_key_t list_key;

// clean_list 는 사용자 정의 함수로, 나중에 데이타가 필요없을때
// 데이타를 제거(free) 하기 위해서 필요한 함수이다.
extern void* clean_list(void*);

// key 를 생성하며 데이타 제거를 위한 함수를 지정해준다.
// cleanup_list 함수는 NULL 로 지정될수도 있으며,
// 이경우 나중에 key 를 삭제하더라도 데이터가 제거되지는 않는다.
rc=pthread_key_create(&list_key, cleanup_list);
pthread_key_create()를 실행한후에 list_key 는 새로 만들어진 key 를 가르킨다. pthread_key_create 는 성공적으로 실행될경우 0을 반환하고 그렇지 않을경우에는 적당한 에러코드를 반환하도록 되어있다. 생성할수 있는 key 의 숫자는 제한되어 있는데, 생성할수 있는 최대크기는 PTHREAD_KEYS_MAX 에 정의 되어 있다. 이값은 OS 마다 다를수 있으며 보통 1024 의 크기를 가진다.

4.2.3절. TSD 블럭 에 대한 Access

key 를 만들었다면 이제 value 에 엑세스를 해야 한다. 엑세스를 위해서 pthread_getspecific()와 pthread_setspecific()두개의 함수가 제공된다. 첫번째 함수는 key 에 대한 value를 가져오기 위해서 사용되며, 두번째 함수는 key 에 value 를 세팅하기 위해서 사용된다. value 는 (void *)형으로 원하는 어떤 데이타라도 넘길수 있도록 되어있다. 다음은 간단한 사용 예이다.

    
phtread_key_t a_key;
int rc;

int *p_num = (int *)malloc(sizeof(int));
(*p_num) = 4;

rc = pthread_setspecific(a_key, (void *)p_num);
...
// 만약 a_key 에 대해서 p_num 값을 가져오고 싶다면
{
int *p_keyval = (int *)pthread_getspecific(a_key);

if (p_keyval != NULL)
{
printf("value of 'a_key' is: %d\n", *p_keyval);
}
}

4.2.4절. TSD 블럭 삭제

더이상 사용할 필요가 없는 TSD 블럭은 pthread_key_delete()를 이용해서 삭제한다. 주의 할것은 이것은 key 를 삭제하는 것이지 실제 데이타의 메모리를 삭제하지는 않는다는 것이다. 데이타 메모리의 삭제는 pthread_key_create() 로 key 를 만들때 지정한 데이타 삭제 함수가 이용된다. 만약 데이타 삭제 함수가 NULL 이였다면 데이타 삭제는 일어나지 않을것이다.

int rc = pthread_key_delete(key);

5절. 결론

지금까지 Pthread 에 대한 몇가지 이슈들을 살펴보았는데, 여전히 다루지 못한 사항들이 있습니다. 그리고 실제로 어떻게 응용이 가능한지에 대한 방향제시도 좀 미약하구요. 이러한 부족한 점들은 나중에 다루게 될 "다중 연결 서버구성"의 쓰레드 편을 다루면서 보강하도록 하겠습니다.

이 문서들은 인터넷상의 온라인문서들과 man 페이지들을 참고해서 만들었읍니다. 그리고 제 스스로 쓰레드에 대해서 그리 깊은 지식을 가지고 있지 않음으로, 잘못된 내용이 있을수 있습니다. 이러한 내용은 게시판 등 에댓글을 남겨주시면 수정하도록 하겠습니다.

:::
2007/06/06 01:46

Pthread - mutex와 조건변수

이 문서는 수정될 수 있습니다. 최신 문서는 Joinc Wiki를 참고하세요.

Pthread(4) - mutex & 조건변수

윤 상배

yundream@joinc.co.kr



1절. 소개

그동안 Pthread_1, Pthread_2, Pthread_3, 을 통해서 pthread 에 대한 몇가지 기본적인 내용들에 대해서 알아 보았다.

그중 Pthread_3 에서 조건변수와, mutex 잠금에 대한 설명이 있었는데, 설명만 있었고 실질적인 예를 이용한 테스트는 없었다.

이번에는 mutex 잠금과 조건변수에 대한 이해를 도울수 있는 간단한 어플리케이션을 제작해보고 어떠한 문제점을 가질수 있는지에 대한 테스트도 하게 될것이다.


2절. Mutex 잠금과 조건변수 테스트

2.1절. 테스트용 어플리케이션 개요

테스트용 어플리케이션의 이름은 mutex_con.c 로 하도록 하겠다. 이 프로그램은 3개의 쓰레드로 이루어진다. 첫 번째 쓰레드는 main 쓰레드로 나머지 2개의 쓰레드(thread 1, thread 2) 를 생성하고 (pthread_create) join 하는 일을 하게 될것이다(즉 특별히 하는일 없다). 2번째 쓰레드는 2개의 int 형 멤버변수를 가지는 구조체에 접근해서 특정한 숫자를 입력하게 된다. 3번째 쓰레드는 이 구조체에 접근해서 멤버변수의 값을 읽어와서 "뎃셈" 하고 이를 화면에 출력시켜주는 일을한다.

이때 이 구조체는 2번 쓰레드와 3번쓰레드 모두가 접근하게 되므로 mutex 잠금을 이용해서 한번에 하나의 쓰레드만 접근하도록 제어해야 될것이다.

mutex 잠금을 이용한 접근제어 외에도, 3번째 쓰레드는 2번째 쓰레드에 의해서 구조체의 값이 변경되었다는걸 감지하고, 값이 변경된 시점에서 구조체에 접근해야 한다. 즉 구조체의 값이 변경될때까지 기다려야 한다. 이 "기다림" 을 위해서 조건변수를 사용하게 된다.

이 조건변수라는 것은 간단히 말해서 신호(signal)를 주고 받는 개념이다. 한쪽에서는 신호를 기다리다가, 신호가 오면 신호를 감지해서 필요한 일을 하게 되는 개념이다.


2.2절. 조건변수를 통해 얻는 프로그래밍 상의 이점

조건변수를 사용하지 않는 다면 어떻게 될까. (물론 조건변수 대신 세마포어를 사용할수도 있으나 이는 논외로 하자.) 그렇다면 thread 2 에서는 구조체의 정보가 변경되었는지 알수 없음으로 구조체의 정보가 변경되었는지 확인하기 위해서 busy wait(바쁜대기 상태)에 놓이면서 지속적으로 값이 변경되었는지를 확인해야 할것이다.

하지만 이건 좋은 방법이 아니다. busy wait 상태란 점도 맘에 들지 않지만 실제로 thread 1 에서 값을 변경했는데 기존의 값과 같을수도 있기 때문이다. 기존의 값과 같든지 아니든지 간에 thread 2 에서는 값을 읽어들여야 하는데, 값의 변경을 확인하는 방법으론 체크자체가 불가능해 질수가 있다.

위의 문제를 해결하기 위해서 별도의 변수를 하나더 둘수 있을것이다. 그래서 thread 1 에서 구조체의 값을 변경시켰다면 이 별도의 준비한 변수의 값을 변경하는 것이다. 그리고 thread 2 에서는 바쁜 대기 상태에서 이 변수의 값이 변경되었는지 확인해서 구조체의 값을 가져오는 것이다. 이 방법을 사용하면 위의 문제를 해결할수 있겠지만, 역시 바쁜 대기 상태에 놓이게 된다는 단점을 가지게 된다.

조건 변수를 사용하면 이러한 모든 문제를 해결할수 있다. 조건 변수를 사용하게 되면 thread 2 에서는 thread 1 에서 조건변수를 통해서 시그널을 보내기 전까지 대기 상태에 놓일수 있을것이기 때문이다.

조건변수는 메모리 buffer 처리등에 유용하게 사용될수 있을것이다. 조건변수를 사용함으로써, 만약에 메모리 buffer 에 처리해야될 자료가 없다면 busy wait 상태에 놓일 필요 없이 signal 을 기다리면 될것이기 때문이다. 그래서 signal 이 도착하면 메모리 buffer 에 엑세스를 시도해서 최근의 정보를 가져오면 될것이다. 이 외에도 조건변수는 쓰레드간 동기화등과 같은 다른 영역에도 매우 유용하게 사용할수 있다.


2.3절. 작동 프로세스

작동 프로세스는 어떻게 mutex 잠금과 조건변수를 이용해서 임계영역을 보호하고 구조체의 값의 변경시점을 알수 있는지에 대한 내용을 중심으로 해서 기술할것이다.

  thread 2  
while (1)
{
mutex 잠금을 얻는다.
// 임계영역 시작 ----------------------------------------------
구조체에 접근해서 값을 가져온다.
구조체 멤버변수의 값을 변경한다.(2씩 더한다)
pthrad_cond_signal 를 이용해서 조건변수를 통해 신호를 보낸다.
// 임계영역 끝 ------------------------------------------------
mutex 잠금을 돌려준다.
sleep(1);
}

thread 3
while(1)
{
mutex 잠금을 얻는다.
// 임계영역 시작 ----------------------------------------------
pthread_cond_wait 를 이용해서 조건변수를 통해 신호가 오는지 기다린다.
if (신호가 도착한다면)
두개의 구조체 멤버변수의 값을 덧셈 하고 이를 출력한다.
// 임계영역 끝 ------------------------------------------------
mutex 잠금을 돌려준다.
}


2.4절. 코딩

이제 작동프로세스까지 만들어졌으니, 코딩에 들어가도록 한다. 코딩에 들어가기 위해서는 작동프로세스 외에도 설계서가 필요할것이지만, 이러한 경우 매우 간단한 프로그램으로 작동프로세스 자체가 설계서나 마찬가지임으로 설계서 이런건 생략하도록 하겠다.

예제 : mutex_con.c

#include <pthread.h>
#include <string.h>
#include <unistd.h>

pthread_mutex_t mutex_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t thread_cond = PTHREAD_COND_INITIALIZER;

struct com_data
{
int a;
int b;
};

struct com_data mydata;

void *do_write(void *data)
{
mydata.a = 0;
mydata.b = 0;
while(1)
{
pthread_mutex_lock(&mutex_lock);
mydata.a = random() % 6000;
mydata.b = random() % 6000;
pthread_cond_signal(&thread_cond);
pthread_mutex_unlock(&mutex_lock);
sleep(2);
}
}

void *do_read(void *data)
{
while(1)
{
pthread_mutex_lock(&mutex_lock);
pthread_cond_wait(&thread_cond, &mutex_lock);
printf("%4d + %4d = %4d\n", mydata.a, mydata.b, mydata.a + mydata.b);
pthread_mutex_unlock(&mutex_lock);
}
}

int main()
{
pthread_t p_thread[2];
int thr_id;
int status;
int a = 1;
int b = 2;

thr_id = pthread_create(&p_thread[0], NULL, do_write, (void *)&a);
thr_id = pthread_create(&p_thread[1], NULL, do_read, (void *)&b);

pthread_join(p_thread[0], (void **) status);
pthread_join(p_thread[1], (void **) status);

return 0;
}

프로그램자체는 매우 간단하지만 조건변수의 기본적인 사용방법을 알수 있을것이다.


2.5절. 조건변수 사용시 주의해야될 사항

조건변수에는 pthread_cond_signal(3) 과 ptherad_cond_wait(3) 를 이용해서 신호를 주고, 기다리는 방식을 사용한다고 했다. 그렇다면 생각할수 있는게, 과연 신호가 실시간으로 전달이 될것이란걸 믿을수 있을까?

실시간으로 전달되는지 아닌지가 중요한 이유는 쓰레드가 신호를 보내고 나서 신호를 잘받았는지 기다리지 않고 바로 다음으로 넘어가 버리기 때문이다.

이건 꽤 중요한 문제가 될수도 있다. 왜냐하면 만약 신호가 실시간으로 전달되지 않는다면 신호가 미쳐 전달되기 전에 어떤 데이타가 변경되어 버리는 경우가 발생할수 있기 때문이다.

            쓰레드 공유변수 A = 0

thread 1 thread 2
while(1) while(1)
{ {
쓰레드 공유변수 A++
신호 보냄 ------------------> 신호 기다림
} }

위의 상황을 생각해 보자 최초 공유변수 A 에 0 이 들어간다. thread 1 에서 여기에 1 을 증가시키고 신호를 보낸다. thread 2 는 신호를 받고 A 의 값을 읽어 들여서 이것을 100 으로 나눈다. 그런데 신호가 늦게 보내져서 - thread 1 의 loop 회전속도가 신호를 보내는 시간보다 빠른경우 - thread 2 에서 신호를 미쳐 받기전에 A ++ 이 한번더 실행되고 A 의 값은 2가 될것이다. 이때 서야 thread 2 로 신호가 전달되었다면 결국 thread 1 에서는 2번의 데이타를 보냈는데 thread 1 는 한번의 연산만 실행한것으로 데이타 하나를 잃어 버린것과 같은 문제가 발생해 버린다.

신호는 매우 빠른 시간에 전달됨으로 보통의 경우 신호전달시간을 염두에 두어야 하는 경우는 발생하지 않을것이다. 하지만 불행하게도 염두에 두어야 하는 경우가 발생하기도 한다.

물론 우리 프로그래머들의 사전에 불가능이란 없으므로 위의 문제도 간단하게 해결가능 하다. 조건변수를 2개 쓰면 된다. thread 1 에서 신호를 보냈다면, thread 1 은 다음 루틴으로 넘어가기 전에 thread 2 에서 넘어오는 신호를 기다리도록 하면 될것이다. thread 2 는 thread 1의 신호를 받은뒤 thread 1으로 신호를 보내게 될것임으로 반드시 신호가 전달될것을 확신할수 있을것이다. 2개의 조건변수를 지원하기 위해서 2개의 mutex 잠금이 필요할것이다. 여기에서는 그 구현까지 설명하지는 않을것이다. 조금만 생각해보면 간단하게 구현 가능할것이기 때문이다.

 thread 1                                    thread 2
while(1) while(1)
{ {
쓰레드 공유변수 A++
신호 1 보냄 -----------------> 신호 1 기다림
신호 2 기다림 <---------------- 신호 2 보냄
.... ....
} }

신호의 전달에 걸리는 시간은 운영체제에 따라 상당한 차이를 보인다. 그러므로 이러한 오차시간까지도 염두에 두어야할 상황이 발생한다면 시간테스트를 해야할것이다.

:::
2007/05/26 14:51

Mutex를 이용한 쓰레드 동기화

관련문서 : 쓰레드 미니홈피
오래된 문서라 수정해야될 부분이 많습니다. 최신문서는 Joinc Wiki 에서 확인하세요.

쓰레드에서의 데이타 일치 문제

예를 들어서 두개의 쓰레드가 2개의 변수를 업데이트하려고 하는데, 하나의 쓰레드는 2개의 변수를 0으로 다른 하나는 1로 변경을 시도한다고 하자. 만약 이러한 변경이 같은 시간(same time)에 일어난다면 하나는 0으로 변경이 되었지만 다른 하나는 1로 변경되는 문제가 발생할수도 있을것이다. 이러한 문제를 context-switch 라고 하는데, 하나의 쓰레드가 변경하고자 하는 첫번째 변수를 0으로 바꾸었는데, 2번째 변수를 0으로 바꾸기 전에 다른 쓰레드가 변수를 모두 1로 바꾸어 버릴수 있을것이다. 그리고 나서 다시 첫번째 쓰레드가 두번째 변수를 0으로 바꾸게 되면, 첫번째 변수는 1이지만 두번째 변수는 0이 되는 데이타 불일치 상황이 발생하게 될것이다.

Mutex를 이용하면 된다.

pthread 라이브러리는 이러한 문제의 해결을 위해서 mutex 라는 것을 사용한다. mutex 는 IPC 에서 데이타 접근 통제를 위해서 사용하는 "세마포어"와 매우 비슷한 일을한다. mutex 는 세마포어와 마찬가지로 lock 을 사용하는데, mutex 는 lock 에 대해서 다음과 같은 3가지 사항에 대해서 보증한다.
  1. Atomicity - mutex 잠금(lock)는 최소단위 연적(atomic operation) 으로 작동한다. 이말의 뜻은 하나의 쓰레드가 mutex 를 이용해서 잠금을 시도하는 도중에 다른 쓰레드가 mutex 잠금을 할수없도록 해준다는 뜻이다. 한번에 하나의 mutex 잠금을 하도록 보증해준다.
  2. Singularity - 만약 스레드가 mutex 잠금을 했다면, 잠금을 한 쓰레드가 mutex 잠금을 해제 하기 전까지 다른 어떠한 쓰레드도 mutex 잠금을 할수 없도록 보증해준다.
  3. Non-Busy Wait - 바쁜대기 상태에 놓이지 않는다는 뜻으로, 하나의 쓰레드가 mutex 잠금을 시도하는데 이미 다른 쓰레드가 mutex 잠금을 사용하고 있다면 이쓰레드는 다른 쓰레드가 락을 해제하기전까지 해당 지점에 머물러 있으며 이동안은 어떠한 CPU 자원도 소비하지 않는다(이를테면 sleep).
mutex 는 위의 3가지 사항을 보증해 줌으로 우리는 쓰레드 공유되는 메모리를 충돌없이 사용할수 있게 된다. mutex 를 통해서 잠금을 얻는 것을 간단하게 나타내면 아래와 같다(pseude-code 형식으로 설명하겠다). 일단 첫번째 쓰레드는 다음과 같이 mutex 를 이용해서 변수 업데이트 작업을 하게 될것이다.
'X1' 에 대해서 뮤텍스 잠금을 한다.
첫번째 변수를 0으로 세팅한다.
두번째 변수를 0으로 세팅한다.
'X1' 뮤택스의 잠금을 해제한다.
그리고 나서 2번째 쓰레드가 뮤텍스를 이용한 변수 업데이트 작업을 하게 될것이다.
'X1' 에 대해서 뮤텍스 잠금을 한다.
첫번째 변수를 1으로 세팅한다.
두번째 변수를 1으로 세팅한다.
'X1' 뮤택스의 잠금을 해제한다.
2개의 쓰레드가 같은 mutex 'X1' 을 사용한다고 가정하면 이 코드는 어떠한상황 에서도 두개의 변수를 '0' 혹은 '1' 로 유지할것이라는걸 보증할수 있을것이다. mutex 를 이용해서 이러한 작업을 할때 프로그래머는 어떠한 상황에서 mutex 를 써야할지에 대해서 주의를 기울여야 한다. 만약 3번째 쓰레드가 mutex 를 사용하지 않은체 변수에 업데이트 작업을 하게 될 경우 앞의 2개의 쓰레드가 mutex 를 사용한 이유가 사라지게 된다.

뮤텍스 생성과 초기화

뮤텍스를 생성하기 위해서 우리는 먼저, 뮤텍스정보를 저장하기 위한 타입인 pthread_mutex_t 를 선언해주고 이것을 초기화 해주어야 한다. 선언과 초기화의 가장간단한 방법은 PTHREAD_MUTEX_INITIALIZER 상수를 할당하는 것으로 아래와 같이 사용할수 있다.
pthread_mutex_t a_mutex = PTHREAD_MUTEX_INITIALIZER;

뮤텍스 잠금, 잠금해제, 제거

뮤텍스 잠금을 위한 함수로는 pthread_mutex_lock() 함수를 제공한다. 이 함수는 해당 뮤텍스에 대해서 잠금을 시도하는데, 만약 잠그려는 뮤텍스가 다른 쓰레드에 의해서 이미 잠겨있다면, 잠금을 얻을수 있을때까지 - 이미 잠근 다른 쓰레드가 뮤텍스의 잠금을 해제할때까지 - 봉쇄(블럭)되게 된다. 다음은 이러한 뮤텍스 잠금을 얻기 위한 지원함수들이다.
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_destory(pthread_mutex_t *mutex);
pthread_mutex_trylock 를 사용하면 잠금을 얻을수 없을경우 해당 코드에서 블럭되지 않고 바로 에러코드를 돌려준다. 즉 pthread_mutex_lock 의 비봉쇄 버젼이라고 생각하면 된다.

뮤텍스 잠금을 얻은후 해당 영역에서의 작업을 마친후 잠금을 해제하기 위해서 사용한다. 사용되는 함수는 pthread_mutex_unlock 이며 함수원형은 다음과 같다.
int pthread_mutex_unlock(pthread_mutex_t *mutex);
다음은 쓰레드간 공유되는 자원을 위해서 잠금을 어떻게 사용하는지를 보여주는 간단한 예제이다.
에제: mutex_lock.c
#include <stdio.h> 
#include <unistd.h>
#include <pthread.h>

int ncount; // 쓰레드간 공유되는 자원
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 쓰레드 초기화

void* do_loop(void *data)
{
int i;

pthread_mutex_lock(&mutex); // 잠금을 생성한다.
for (i = 0; i < 10; i++)
{
printf("loop1 : %d", ncount);
ncount ++;
sleep(1);
}
pthread_mutex_unlock(&mutex); // 잠금을 해제한다.
}

void* do_loop2(void *data)
{
int i;

// 잠금을 얻으려고 하지만 do_loop 에서 이미 잠금을
// 얻었음으로 잠금이 해제될때까지 기다린다.
pthread_mutex_lock(&mutex); // 잠금을 생성한다.
for (i = 0; i < 10; i++)
{
printf("loop2 : %d", ncount);
ncount ++;
sleep(1);
}
pthread_mutex_unlock(&mutex); // 잠금을 해제한다.
}

int main()
{
int thr_id;
pthread_t p_thread[2];
int status;
int a = 1;

ncount = 0;
thr_id = pthread_create(&p_thread[0], NULL, do_loop, (void *)&a);
sleep(1);
thr_id = pthread_create(&p_thread[1], NULL, do_loop2, (void *)&a);

pthread_join(p_thread[0], (void *) &status);
pthread_join(p_thread[1], (void *) &status);

status = pthread_mutex_destroy(&mutex);
printf("code = %d", status);
printf("programing is end");
return 0;
}
위의 코드를 우선 mutex 잠금을 하지 않은체 컴파일후 실행해보자. 간단하게 pthread_mutext_lock 와 pthread_mutex_unlock 부만 주석처리하면 된다. 그러면 do_loop2 와 do_loop 가 일정한 간격을 두고 ncount 자원에 접근하는 것을 볼수 있을것이다. 그러나 우리는 do_loop 가 ncount 자원을 접근하고 있는동안 다른 쓰레드가 접근하지 않기를 원할때가 있을것이다. 이럴때 뮤텍스 잠금을 사용하면 된다.

위의 코드에서 뮤텍스 잠금 부분의 주석을 풀고 다시 컴파일해서 실행시켜보면, do_loop 쓰레드가 ncount 증가 작업을 모두 마칠때까지 do_loop2 쓰레드는 해당 영역에서 블럭됨을 알수 있을것이다. 이런식으로 하나의 쓰레드가 특정자원에 접근할때 다른 쓰레드가 접근하지 못하도록(한번에 하나의 쓰레드만 해당 자원에 접근할수 있도록) 제어할수 있다.

컴파일 방법은 gcc -o mutex_lock mutex_lock.c -lpthread 이다

더이상 뮤텍스를 사용할일이 없다면 pthread_mutex_destory 를 이용해서 뮤텍스 자원을 제거(free) 하도록 한다. 만일 뮤텍스자원을 사용하는 쓰레드가 하나라도 존재한다면 에러코드(EBUSY)를 리턴한다. 그러므로 모든 쓰레드의 뮤텍스에 대해서 pthread_mutex_unlock 을 이용해서 잠겨져야만 뮤텍스 제거가 성공할수 있다. 성공할경우 0을 넘겨준다.
:::