Литмир - Электронная Библиотека
Содержание  
A
A

int main(int argc, char* argv[]) {

 // создать TCP-сокет на порт

 ls = getsocket(THREAD_POOL_PORT);

 // создание атрибутной записи пула потоков:

 thread_pool_attr_t attr;

 memset(&attr, 0, sizeof(thread_pool_attr_t));

 // заполнение блока атрибутов пула

 /* - mm число блокированных потоков в пуле */

 attr.lo_water = 3;

 /* - max число блокированных потоков в пуле */

 attr.hi_water = 7;

 /* - инкремент шага создания потоков */

 attr.increment = 2;

 attr.maximum = 9;

 /* - общий предел числа потоков в пуле */

 attr.handle = dispatch_create();

 attr.context_alloc = alloc;

 attr.block_func = block;

 attr.handler_func = handler;

 // фактическое создание пула потоков:

 void* tpp = thread_pool_create(&attr, POOL_FLAG_USE_SELF);

 if (tpp == NULL) errx("create pool");

 // начало функционирования пула потоков:

 thread_pool_start(tpp);

 // ... выполнение никогда не дойдет до этой точки!

 exit(EXIT_SUCCESS);

}

Примечание

В примере используются, но не определены две функции, которые не столь существенны для понимания примера сточки зрения функционирования пула:

• 

errx()
— реакция на ошибку выполнения с выводом сообщения и последующим аварийным завершением;

• 

retrans()
— прием сообщения с присоединенного TCP-сокета с последующей ретрансляцией полученного содержимого в него же.

Итак, первая особенность пула потоков в том, что мы построили многопоточный сервер, почти не прописывая собственного кода, — большую часть рутинной работы за нас сделала библиотека пула.

Приведем описание логики работы пула потоков и показанного примера на самом качественном, простейшем уровне:

• Первоначально (при запуске пула потоков в работу вызовом

thread_pool_start()
) создается
attr.lo_water
потоков («нижняя ватерлиния» числа блокированных потоков).

• При создании любого потока (как в процессе начального, так и в процессе последующего создания) вызывается функция

attr.соntext_alloc()
(в контексте созданного потока).

• По завершении функция вызывает блокирующую функцию потока

attr.block_func()
, на которой созданный поток ожидает события активизации (в показанном примере событие активизации — это установление соединения новым клиентом по возврату из
accept()
).

• Блокирующая функция после наступления события активизации переведет поток в состояние READY и вызовет в контексте этого потока функцию обработчика

attr.handler_func()
.

• Если после предыдущего шага число оставшихся заблокированных потоков станет ниже

attr.lo_water
, механизм пула создаст дополнительно
attr.increment
потоков и «доведет» их до блокирующей функции.

• Активизированный поток производит всю обработку, предписанную функцией потока, и после выполнения потоковой функции будет опять переведен в блокированное состояние в функции блокирования…

• …но перед переводом потока вновь в блокированное состояние проверяется, не будет ли при этом превышено число блокированных потоков

attr.hi_water
(«верхняя ватерлиния»), и если это имеет место, то поток вместо перевода в блокированное состояние самоуничтожается.

• Все проверки числа потоков производятся для того, чтобы общее число потоков пула (т. e. число активизированных потоков вместе с блокированными) не превышало общее ограничение

attr.maximum
.

Разобрав общую логику функционирования пула потоков, можно теперь детальнее рассмотреть отдельные шаги всего процесса:

1. Прежде чем создавать пул потоков, мы должны создать атрибутную запись, определяющую все поведение пула. Атрибутная запись описана так (

<sys/dispatch.h>
):

typedef struct _thread_pool_attr {

 THREAD_POOL_HANDLE_T* handle;

 THREAD_POOL_PARAM_T*

 (*block_func)(THREAD_POOL_PARAM_T* ctp);

 void (*unblock_func)(THREAD_POOL_PARAM_T* ctp);

 int (*handler_func)(THREAD_POOL_PARAM_T* ctp);

 THREAD_POOL_PARAM_T*

  (*context_alloc)(THREAD_POOL_HANDLE_T* handle);

 void (*context_free)(THREAD_POOL_PARAM_T* ctp);

 pthread_attr_t* attr;

 unsigned short lo_water;

 unsigned short increment;

 unsigned short hi_water;

 unsigned short maximum;

 unsigned reserved[8];

} thread_pool_attr_t;

Дескриптор создаваемого пула потоков

handle
, посредством которого мы будем ссылаться на пул, является просто синонимом типа
dispatch_t
:

#ifndef THREAD_POOL_HANDLE_T

 #define THREAD_POOL_HANDLE_T dispatch_t

#endif

Атрибуты потоков, которые будут работать в составе пула, определяются полем

attr
типа
pthread_attr_t
(эту структуру мы детально рассматривали ранее при обсуждении создания единичных потоков).

Численные параметры пула определяют:

lo_water
— «нижняя ватерлиния», минимальное число потоков пула, находящихся в блокированном состоянии (в ожидании активизации). Если в результате некоторого события один из ожидающих потоков переходит в состояние активной обработки и число оставшихся блокированных потоков становится меньше
lo_water
, создается дополнительно increment потоков, которые переводятся в блокированное состояние.

82
{"b":"155449","o":1}