На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
Модераторы: Qraizer, Hsilgos
  
> Обмен данными между потоками
    Доброе время суток.
    Столкнулся с такой проблемой:
    Я организовал взаимодействие потоков таким образом:
    поток1 формирует данные
    ExpandedWrap disabled
       while (...)
              {  
            
                  std::lock_guard<std::mutex> lg(mut);
                  
                  //Computing data      
          
                  exch_data.push(tmp);
                  data_cond.notify_one();
          
              }
          
              endWork = true;


    поток2 будет выводить на экран
    ExpandedWrap disabled
      while (true)
              {
                  if (endWork)
                      break;
          
                  std::unique_lock<std::mutex> lg(mut);
                  data_cond.wait(lg,[]{return !exch_data.empty();});
          
                  tmp = exch_data.front();
                  exch_data.pop();
          
                  //Draw data
          
                  lg.unlock();
              }


    main:
    ExpandedWrap disabled
      int _tmain(int argc, _TCHAR* argv[])
          {
              CThread_synchro exampl;
          
              //...
              std::thread t2(&CThread_synchro::_prepereData,&exampl);
              std::thread t1(&CThread_synchro::_proccesData,&exampl);
              
              //...
          
              t2.join();  
              t1.join();  
              return 0;
          }


    Предполагалось, я сформировал данные, поместил в очередь в одном потоке, а во-втором выбрал и очистил очередь и ждет поступления данных . Предполагал, что данный механизм блокировок обеспечит мне необходимую работу программы :unsure: :unsure:
    Но в результате у меня не корректная работа проги: 1-й поток набивает в очередь н-е количество порций данных, а второй отображает только несколько.
    Может я не тот механизм выбрал для такой задачи? :unsure:
      Механизм правильный. Дело в другом - в таймслайсах, которые отводятся каждому потоку. И переменной endWork, которую надо устанавливать либо под лочкой, либо делать atomic_bool. Если во втором потоке к проверке endWork добавить проверку очереди на пустоту, то всё должно заработать. И это, надо вытащить из под лочки notify_one.
        В общем, я тут пованговал, как могло выглядеть неизвестное мне приложение, получилось нечто такое:
        ExpandedWrap disabled
          #include <thread>
          #include <mutex>
          #include <atomic>
          #include <queue>
          #include <iostream>
          #include <conio.h>
          #include <condition_variable>
           
          std::atomic_bool endWork = false;
          std::queue<int>  exampl;
           
          std::condition_variable data_cond;
          std::mutex              mut;
           
          void prepereData(std::queue<int> &exch_data)
          {
            int tmp = 0;
           
            while (!kbhit() || getch() != 27)
            {
              {
                std::lock_guard<std::mutex> lg(mut);
           
                //Computing data
           
                exch_data.push(++tmp);
              }
              data_cond.notify_one();
            }
            endWork = true;
            data_cond.notify_one();
          }
           
          void proccesData(std::queue<int> &exch_data)
          {
            while (!endWork || !exch_data.empty())
            {
              int tmp;
           
              std::unique_lock<std::mutex> lg(mut);
           
              data_cond.wait(lg, [&exch_data] { return !exch_data.empty() || endWork; } );
           
              if (!exch_data.empty())
              {
                tmp = exch_data.front();
                exch_data.pop();
           
                //Draw data
                std::cout << tmp << ' ';
              }
              lg.unlock();
            }
            std::cout << std::endl;
          }
           
          int main()
          {
            //...
            std::thread t2(prepereData, std::ref(exampl));
            std::thread t1(proccesData, std::ref(exampl));
           
            //...
           
            t2.join();
            t1.join();
            return 0;
          }
        Оно работает. Выход по Esc.
          Цитата agapa @
          Может я не тот механизм выбрал для такой задачи? :unsure:

          Может ты и отладишь твой алгоритм, но я бы так никогда не стал делать.
          я делаю так:
          1. Обе потоковые функции, начав работать, останавливаются на функции ожидания
          многих событий. "WaitForMultipleObjects"
          2. Очередь должна быть потокобезопасным объектом. Т.е. синхронизация происходит внутри очереди.
          3. По какому-то событию 1-й поток разбужен, получил/обработал данные, поместил в очередь.
          4. Послал уведомление диспетчеру. Надо как-то взвести событие, которого
          ожидает 2-й поток. Очевидно, что удобнее всего использовать семафор.
          5. 2-й поток начинает работать, извлекает данные из очереди и гасит семафор, обрабатывает данные,выводит итд.
          После чего снова останавливается, ожидая взведения события.
          ---
          Такая структкра приложения максимально асинхронная, потоки никак не связаны между собой.
          Это похоже на конвейер обработки данных.
          В этом случае синхронизация между потоками вообще не нужна - 1-й может уже получить и поместить в очередь
          количество данных существенно больше, чем 2-й обработал. (по каким-то там причинам).
          Но впоследствии, если конкретный комп. вообще способен решить поставленную задачу,
          очередь рассосётся и все данные будут обработаны.
          ---
          Возможны варианты, но это самая удобная и перспективная архитектура.
          ---
          Например, таким образом очень удобно рисовать графику а GUI-приложениях.
          Рабочий поток рисует картинки, отправляет их в очередь, посылает асинхронное
          уведомление (окну) и дальнейшая судьба нарисованного его не волнует.
          Главный поток извлекает картинки из очереди и выводит на экран.
          Сообщение отредактировано: ЫукпШ -
            Цитата agapa @
            Предполагалось, я сформировал данные, поместил в очередь в одном потоке, а во-втором выбрал и очистил очередь и ждет поступления данных . Предполагал, что данный механизм блокировок обеспечит мне необходимую работу программы


            Вынеси в первом потоке "...Compute Data..." за блокировку. Поставь перед lock_guard. У тебя первый поток постоянно блокирует мьютекс и второй не может с ним работать.
            Кроме того, если в вычислениях, которые делай первый поток нет никаких ожидающих операций, то в начале цикла, за блокировкой не мешает сделать std::this_thread::yield() или sleep_for.

            Проблема в том, что у тебя второй поток вычитывает только одно событие из очереди, тогда как за это время первый может туда поместить несколько.
            Нужно во втором потоке полностью очищать очредь. Например так:

            ExpandedWrap disabled
                 .....
                 data_cond.wait(lg,[]{return !exch_data.empty();});
               
                 auto items = std::move(exch_data);
                 for (auto &item: items) .....


            Добавлено
            Цитата agapa @
            lg.unlock();

            Это можно не делать, там и так разблокирутся в деструкторе unique_lock
            Сообщение отредактировано: Олег М -
              Доброе время суток
              Огромное спасибо за Ваши предложения :thanks:

              ЫукпШ, "ДА" с использованием "старого" API все норм работает, но мне ударило в голову, что использования нового стандарта более элегантно. Ну и само мое желание использовать новый стандарт...
              Цитата ЫукпШ @
              ...что удобнее всего использовать семафор.
              - я закрутил на "событиях". Использовать семафор? :unsure:
                Цитата agapa @
                ЫукпШ, "ДА" с использованием "старого" API все норм работает, но мне ударило в голову, что использования нового стандарта более элегантно. Ну и само мое желание использовать новый стандарт...


                В "старом API" работает скорее всего потому, что ты, наверное, используешь событие с ручным сбросом. При использовании события с автоматическим сбросом будет работать точно также.

                Здесь проблема не в апи и не в семафорах (вооббще не представляю зачем они тут нужны), а в том что ты некорректно блокируешь мьютекс - первый поток держит блокировку слишком долго и отпускает на слишком короткое время, за которое второй поток не успевает его заблокировать.

                Блокировки должны быть максимально короткими. В данном случае первый поток должен блокировать мьютекс только для того, чтоб сделать push и notify, второй поток - только для того, чтоб вычитать весь список - move(exch_data) или splice. Остальные операции - computing и draw должны быть вне блокировок.
                Тогда всё будет работать хорошо.

                Добавлено
                И, кстати, во втором потоке нужно вынести std::unique_lock<std::mutex> lg(mut) за цикл (а lg.unlock(); убрать вообще). Тогда может и так заработает.
                А то сейчас получается, что второй поток у тебя блокирует-разблокирует мьютекс вдвое чаще чам первый, соответственно отрабатывает вдвое реже.
                  Цитата Олег М @
                  Блокировки должны быть максимально короткими. В данном случае первый поток должен блокировать мьютекс только для того, чтоб сделать push и notify, второй поток - только для того, чтоб вычитать весь список - move(exch_data) или splice. Остальные операции - computing и draw должны быть вне блокировок.
                  Тогда всё будет работать хорошо.

                  По моему представлению работы проги должно быть так: я рассчитал траекторию, сформировал ОДИН пакет и поместил его в очередь, предварительно залочил, что и делает первый поток. Блокировка должна быть на протяжении этого процесса. В конце итерации цикла мьютекс освобождается.

                  Цитата Олег М @
                  ...первый поток держит блокировку слишком долго и отпускает на слишком короткое время...

                  Может быть надо так :unsure: :
                  ExpandedWrap disabled
                    while (...)
                            {  
                                {
                                    std::lock_guard<std::mutex> lg(mut);
                     
                                    //Computing data      
                     
                                    exch_data.push(tmp);
                                    //Для того, чтобы здесь освободить мьютекс
                                }
                                data_cond.notify_one();
                     
                            }

                  И что значит "на короткое время"?

                  Второй поток должен, захватив мьютекс, считать этот ЕДИНЫЙ пакет и отобразить его. Опять же я хотел бы залочить этот процесс, чтобы в этот момент очередь не заполнялась.

                  Да, "lg.unlock(); " можно и убрать. Это больше для наглядности, для меня.

                  Но... Увы...

                  Но если переписать цикл во втором потоке, как это подсказал Qraizer, с:
                  ExpandedWrap disabled
                    while (true)
                            {
                                if (endWork)
                                    break;
                                //...

                  На
                  ExpandedWrap disabled
                    while (!endWork || !exch_data.empty())
                    {
                      //...
                    }

                  То все работает норм.
                  Но это не соответствует моему представлению (оно может быть слишком притензиозным :unsure: ).
                  Сделал вывод в консоль: сперва работает только первый поток, заполняя энным количеством пакетов очередь, только потом потоки начинаю работать синхронно.
                    Цитата agapa @
                    И что значит "на короткое время"?

                    Второй поток должен, захватив мьютекс, считать этот ЕДИНЫЙ пакет и отобразить его. Опять же я хотел бы залочить этот процесс, чтобы в этот момент очередь не заполнялась.


                    У тебя второй поток разблокирует мьютекс в каждой итерации, и первый поток успевает в это время накидать туда кучу пакетов
                    ExpandedWrap disabled
                      //thread1
                      {
                          while (...)
                          {  
                              //Computing data
                          
                              std::lock_guard<std::mutex> lg(mut);
                              exch_data.push(tmp);
                              data_cond.notify_one();
                          }
                       
                          std::lock_guard<std::mutex> lg(mut);
                          endWork = true;
                          data_cond.notify_one();
                      }
                       
                       
                      //thread2
                      {
                          std::unique_lock<std::mutex> lg(mut);
                          for(;;)
                          {
                              data_cond.wait(lg,[]{return !exch_data.empty() || endWork;});
                              if (endWork)
                                  break;
                       
                              tmp = exch_data.front();
                              exch_data.pop();
                       
                              //Draw data
                          }
                      }


                    Здесь, пока второй поток не перейдёт в режим ожидания, пакеты не будут добавляться

                    Добавлено
                    А вообще, я бы сделал вот так, чтобы Computing не блокировал Draw.
                    ExpandedWrap disabled
                      auto GetItems()
                      {
                          
                          data_cond.wait(lg,[]{return !exch_data.empty() || endWork;});
                          return std::move(exch_data);
                      }
                       
                      //thread2
                      while (!endWork)
                      {
                          auto items = GetItems();
                          for (auto &item: items)
                              //Draw data
                      }
                    Сообщение отредактировано: Олег М -
                      Доброе время суток
                      Цитата Олег М @
                      А вообще, я бы сделал вот так,


                      Спасибо за совет :thanks: Попробую
                        Доброе время суток
                        Цитата Олег М @
                        Здесь, пока второй поток не перейдёт в режим ожидания, пакеты не будут добавляться

                        По эксперементировал с этими вариантами - не выходит у меня: 1-й поток завершается раньше и, в результате, не все данные...
                        ExpandedWrap disabled
                              ...
                              std::lock_guard<std::mutex> lg(mut);
                              endWork = true;
                              data_cond.notify_one();
                              ...

                        Этот момент остался для меня не понятен :unsure: Какой смысл? Как и в выносе локера за пределы цикла.
                        Без
                        ExpandedWrap disabled
                              while (!endWork || !exch_data.empty())
                              {
                                //...
                              }

                        данные теряются.
                          Цитата agapa @
                          данные теряются.


                          Данные лежат в очереди. Теряются потому что ты сначала проверяешь endWork и, если она true, сразу выходишь. Если сначала вычитать очередь, а потом выходить, ничего не потеряется.
                          Кстати, у меня во втором случае, где GetItems() ничего теряться не должно - сначала обрабатывается очередь, потом выходит из цикла.

                          Добавлено
                          Цитата agapa @
                          Этот момент остался для меня не понятен Какой смысл? Как и в выносе локера за пределы цикла.


                          Потому что у тебя второй поток спит в wait(). Если при завершении 1-го потока не сделать notify, то он там так и зависнет.

                          Добавлено
                          За пределы цикла выносится, чтоб второй поток разблокировал мьютекс только во время ожидания новых данных. Соответственно первый поток будет добалять данные только когда второй ничего не делает.
                          И здесь, чтоб данные не терялись достаточно обработать их после выхода из цикла.
                          Сообщение отредактировано: Олег М -
                            Благодарю за разъяснение :thanks:
                            0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
                            0 пользователей:


                            Рейтинг@Mail.ru
                            [ Script execution time: 0,0548 ]   [ 17 queries used ]   [ Generated: 19.03.24, 14:03 GMT ]