Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[3.236.100.210] |
|
Сообщ.
#1
,
|
|
|
Всем привет! Помогите разобраться, как правильно сделать вот такую штуку.
У меня есть класс, который реализует веб-сокет клиент. Пользователь может из нескольких потоков вызывать метод отправки сообщений Send(). Этот метод кладет сообщение в очередь на отправку и вызывает SendNextRequest(), который извлекает первое сообщение из очереди и пишет его в сокет: void SendNextRequest() { // (1) // Возьмем сообщение из очереди и асинхронно запишем данные в сокет ws_->async_write(OnWrite); } Функция отправки сообщения SendNextRequest вызывает бустовую функцию async_write, которая пишет данные в сокет и как только они записались, то будет вызван метод OnWrite void OnWrite() { // Данные записались в сокет // Вот тут надо бы опять вызвать SendNextRequest() SendNextRequest(); // (2) } Мне бы хотелось, чтобы по окончанию работы OnWrite я мог опять вызвать SendNextRequest, чтобы отправить следующее сообщение из очереди, и так пока очередь не опустеет. Загвоздка тут в том, что изначально Send() вызывается из разных потоков, соответственно SendNextRequest тоже будет работать в разных потоках. Каким образом я могу заблокировать SendNextRequest с начала ее выполнения (1) и до окончания выполнения OnWrite (2), чтобы все остальные потоки ждали своей очереди? |
Сообщ.
#2
,
|
|
|
Цитата =MOHAX= @ Каким образом я могу заблокировать SendNextRequest.. я такое делаю так: 1. Допустим, есть рабочий поток, делающий полезное. Как я понял, в твоём случае полезное - это метод "SendNextRequest" 2. Пусть поток ждёт событие - семафор. Посредством "WaitForMultipleObjects". 3. Для связи между потоками я делаю коммуникационные объекты. Это та же очередь (или 2 - в обе стороны) + возбуждение события после занесения данных в очередь. Поскольку я не знаю, какой тип танных буду передавать (это зависит от проекта), такой коммуникационный объект реализован шаблонным классом. При этом коммуникационный объект потокобезопасен. 4. Для возбуждения события напишем класс-интерфейс "I_RiseEvent" Например так: //--------------------------------------------------------------------------- class I_RiseEvent { private: protected: public: virtual BOOL WINAPI Rize_Event (UINT uiParam, WPARAM wParam=0, LPARAM lParam=0) = 0; }; //--------------------------------------------------------------------------- Унаследуем класс поток от базового класса и интерфейса I_RiseEvent. Теперь реализация метода Rize_Event будет в рабочем потоке. Она зависит от конкретного проекта и потока. В твоём случае просто взведём семафор для запуска SendNextRequest. 5. Во время старта приложения передадим указатель на интерфейс в коммуникационный объект. Таким образом, после занесения данных в очередь на отправку (из любого потока) комм-объект будет взводить семафор в рабочем потоке. --- Неважно, сколько потоков будут заносить данные в комм-объект. Каждое занесение будет взводить семафор. При запуске SendNextRequest семафор гасится один раз. Поэтому SendNextRequest будет вызвана требуемое количество раз, последовательно отправляя все порции данных из очереди. После выполнения последней транз-акции рабочий поток перейдёт в режим ожидания последующих запросов. --- Структура приложения становится асинхронной - "никто никого никогда не ждёт". И никто ни о ком ничего не знает. У рабочего потока могут быть входные очереди и выходные. И всё. Цикл такой : получил семафор - достал данные из очереди - отработал - заснул. Если надо во время работы какие-то данные записал в комм-объект. Тот же самый или другой. Для отсылки результатов куда-то ещё. Таким образом удобно организовывать конвейер из потоков для последовательной обработки данных. |
Сообщ.
#3
,
|
|
|
std::recursive_mutex блокирует все нитки, кроме владеющей им. Т.е. если его захватить перед ws_->async_write(), и OnWrite() по этому пакету будет вызвана в этом же потоке, то повторный вызов SendNextRequest() пропустит поток управления, но если вызов будет из другого потока, то попытка овладеть мютексом будет приостановлена до его освобождения.
|
Сообщ.
#4
,
|
|
|
ЫукпШ, Qraizer спасибо за советы, основную идею понял, вместо семафора использовал std::atomic_bool и вместе с std::recursive_mutex все получилось!
Единственное, в интернете почему то не рекомендуют использовать std::recursive_mutex из-за его тормознутости, наверное надо эту реализацию переделать на обычный цикл. |
Сообщ.
#5
,
|
|
|
=MOHAX=, std::recursive_mutex по факту самый обычный стандартный WinAPI мьютекс. Его реализацию можно легко представить либо через CRITICAL_SECTION, либо Mutex Objects. Причём первые вообще очень лёгкие объекты. Вот если в системе нет объектов с поддержкой владения, и его надо эмулировать семайфором со счётчиком 2, тогда да, нужно программить дополнительную логику владения и запускать её также под критической секцией. Руками это будет дорого.
В любом случае, прежде чем заниматься оптимизацией, всегда имеет смысл спрофилировать код. Абстрактное "тормознутость" понятие неконкретное. |