На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
Модераторы: Qraizer, Hsilgos
  
> Блокировка асинхронной функции
    Всем привет! Помогите разобраться, как правильно сделать вот такую штуку.

    У меня есть класс, который реализует веб-сокет клиент.
    Пользователь может из нескольких потоков вызывать метод отправки сообщений Send().
    Этот метод кладет сообщение в очередь на отправку и вызывает SendNextRequest(),
    который извлекает первое сообщение из очереди и пишет его в сокет:

    ExpandedWrap disabled
      void SendNextRequest() {
          // (1)
          // Возьмем сообщение из очереди и асинхронно запишем данные в сокет
          ws_->async_write(OnWrite);
      }


    Функция отправки сообщения SendNextRequest вызывает бустовую функцию async_write,
    которая пишет данные в сокет и как только они записались, то будет вызван метод OnWrite

    ExpandedWrap disabled
      void OnWrite() {
          // Данные записались в сокет
          
          // Вот тут надо бы опять вызвать SendNextRequest()
          SendNextRequest();
          
          // (2)
      }


    Мне бы хотелось, чтобы по окончанию работы OnWrite я мог опять вызвать SendNextRequest, чтобы отправить следующее сообщение из очереди, и так пока очередь не опустеет.
    Загвоздка тут в том, что изначально Send() вызывается из разных потоков, соответственно SendNextRequest тоже будет работать в разных потоках.
    Каким образом я могу заблокировать SendNextRequest с начала ее выполнения (1) и до окончания выполнения OnWrite (2), чтобы все остальные потоки ждали своей очереди?
      Цитата =MOHAX= @
      Каким образом я могу заблокировать SendNextRequest..

      я такое делаю так:

      1. Допустим, есть рабочий поток, делающий полезное.
      Как я понял, в твоём случае полезное - это метод "SendNextRequest"
      2. Пусть поток ждёт событие - семафор. Посредством "WaitForMultipleObjects".
      3. Для связи между потоками я делаю коммуникационные объекты.
      Это та же очередь (или 2 - в обе стороны) + возбуждение события
      после занесения данных в очередь.
      Поскольку я не знаю, какой тип танных буду передавать (это зависит от проекта), такой
      коммуникационный объект реализован шаблонным классом.
      При этом коммуникационный объект потокобезопасен.
      4. Для возбуждения события напишем класс-интерфейс "I_RiseEvent"
      Например так:
      ExpandedWrap disabled
        //---------------------------------------------------------------------------
        class I_RiseEvent
        {
         private:
         
         protected:
         
         public:
         
        virtual BOOL WINAPI  Rize_Event  (UINT uiParam, WPARAM wParam=0, LPARAM lParam=0)  = 0;
        };
        //---------------------------------------------------------------------------


      Унаследуем класс поток от базового класса и интерфейса I_RiseEvent.
      Теперь реализация метода Rize_Event будет в рабочем потоке.
      Она зависит от конкретного проекта и потока.
      В твоём случае просто взведём семафор для запуска SendNextRequest.
      5. Во время старта приложения передадим указатель на интерфейс
      в коммуникационный объект. Таким образом, после занесения данных
      в очередь на отправку (из любого потока) комм-объект будет
      взводить семафор в рабочем потоке.
      ---
      Неважно, сколько потоков будут заносить данные в комм-объект.
      Каждое занесение будет взводить семафор.
      При запуске SendNextRequest семафор гасится один раз.
      Поэтому SendNextRequest будет вызвана требуемое количество раз,
      последовательно отправляя все порции данных из очереди.
      После выполнения последней транз-акции рабочий поток
      перейдёт в режим ожидания последующих запросов.
      ---
      Структура приложения становится асинхронной - "никто никого никогда не ждёт".
      И никто ни о ком ничего не знает. У рабочего потока могут быть
      входные очереди и выходные. И всё.
      Цикл такой : получил семафор - достал данные из очереди - отработал - заснул.
      Если надо во время работы какие-то данные записал в комм-объект.
      Тот же самый или другой. Для отсылки результатов куда-то ещё.
      Таким образом удобно организовывать конвейер из потоков для
      последовательной обработки данных.
      Сообщение отредактировано: ЫукпШ -
        std::recursive_mutex блокирует все нитки, кроме владеющей им. Т.е. если его захватить перед ws_->async_write(), и OnWrite() по этому пакету будет вызвана в этом же потоке, то повторный вызов SendNextRequest() пропустит поток управления, но если вызов будет из другого потока, то попытка овладеть мютексом будет приостановлена до его освобождения.
          ЫукпШ, Qraizer спасибо за советы, основную идею понял, вместо семафора использовал std::atomic_bool и вместе с std::recursive_mutex все получилось!

          Единственное, в интернете почему то не рекомендуют использовать std::recursive_mutex из-за его тормознутости, наверное надо эту реализацию переделать на обычный цикл.
            =MOHAX=, std::recursive_mutex по факту самый обычный стандартный WinAPI мьютекс. Его реализацию можно легко представить либо через CRITICAL_SECTION, либо Mutex Objects. Причём первые вообще очень лёгкие объекты. Вот если в системе нет объектов с поддержкой владения, и его надо эмулировать семайфором со счётчиком 2, тогда да, нужно программить дополнительную логику владения и запускать её также под критической секцией. Руками это будет дорого.
            В любом случае, прежде чем заниматься оптимизацией, всегда имеет смысл спрофилировать код. Абстрактное "тормознутость" понятие неконкретное.
            0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
            0 пользователей:


            Рейтинг@Mail.ru
            [ Script execution time: 0,0231 ]   [ 16 queries used ]   [ Generated: 27.04.24, 06:43 GMT ]