На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
Модераторы: Qraizer, Hsilgos
Страницы: (3) [1] 2 3  все  ( Перейти к последнему сообщению )  
> Вопрос по работе std::promise и std::shared_future
    Всем привет.
    Есть следующий код(приведен ниже). Он работает, претензий к нему пока нет.
    Но у меня возникает ощущение что тут что то не ладное.
    Вообще это вопрос больше по тому - правильно ли тут все организовано?

    Немного опишу алгоритм:
    Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков.
    Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция
    void dump(std::promise<void>&& promise)
    В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", и дальше продолжает из другой очереди сохранять значения в файл, а остальные потоки, включая того, кто вызвал Flush продолжают забивать основную очередь данными.

    По тестам проблем нет никаких, как я только не извращался. Но у меня есть подозрение, что тут кроется ошибка. В частности, так как функцию Flush могут вызывать много потоков, не может ли произойти так, что пока поток shadow выгребает вторую очередь(резервную скажем так) - вдруг главная очередь забьется до предела, и получится так, что какой нибудь поток еще раз породит поток shadow, в котором произойдет гонка или еще что нибудь похуже?

    Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться.
    Ну или может тут другие косяки есть какие нибудь?

    ExpandedWrap disabled
          std::wofstream fout; //!  <<< Член класса
       
          void Flush(std::wstring&& msg)
          {
              std::unique_lock<std::mutex> lk(m_flushmtx);
       
              m_queue.push(msg);
       
              std::cout << m_queue.size() << std::endl;
              if (m_queue.size() >= 1000)
              {
                  std::promise<void> promise;
                  auto fut = promise.get_future();
                  auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise));
                  try
                  {
                      fut.wait();
                  }
                  catch (const std::exception& error)
                  {
                      std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl;
                  }
      //          shadow.wait();
              }
          }
       
          void dump(std::promise<void>&& promise)
          {
              std::unique_lock<std::mutex> guard(m_dumpmtx);
              m_queue.swap(m_second_queue);
              promise.set_value();
       
              fout.open("promise.log", std::ios_base::out | std::ios_base::app);
              while (m_second_queue.size() > 0u)
              {
                  std::wstring val;
                  if (m_second_queue.try_pop(val))
                      fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val;
              }
              fout.flush();
          }


    На всякий случай скину тестовый пример:
    ExpandedWrap disabled
      #include <concurrent_queue.h>
      #include <iostream>
      #include <fstream>
      #include <thread>
      #include <chrono>
      #include <algorithm>
      #include <mutex>
      #include <future>
      #include <atomic>
      #include <sstream>
      #include <string>
       
      using namespace std::chrono_literals;
       
      class Flusher
      {
      public:
          Flusher()
          : m_counter(1)
          {
              m_fout.open("promise.log", std::ios_base::out | std::ios_base::trunc);
              m_fout << "Start log." << std::endl;
              m_fout << "==========================================" << std::endl;
          }
       
          ~Flusher()
          {
              m_fout.close();
          }
       
          void generate_value()
          {
              while (m_counter.load() <= 10000)
              {
                  std::lock_guard<std::mutex> lk(m_mtx);
                  std::wostringstream stream;
                  stream << L"Thread: " << std::this_thread::get_id() << L" put value: " << m_counter.load() << std::endl;
                  Flush(std::move(stream.str()));
                  ++m_counter;
              }
          }
       
          void Flush(std::wstring&& msg)
          {
              std::unique_lock<std::mutex> lk(m_flushmtx);
       
              m_queue.push(msg);
       
              std::cout << m_queue.unsafe_size() << std::endl;
       
              if (m_queue.unsafe_size() >= 1000)
              {
                  std::promise<void> promise;
                  auto fut = promise.get_future();
                  auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise));
                  try
                  {
                      fut.wait();
                  }
                  catch (const std::exception& error)
                  {
                      std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl;
                  }
                  //          shadow.wait();
              }
          }
       
          void dump(std::promise<void>&& promise)
          {
              std::unique_lock<std::mutex> guard(m_dumpmtx);
       
              std::for_each(m_queue.unsafe_begin(), m_queue.unsafe_end(), [this](auto&& item) { this->m_second_queue.push(std::move(item)); });
              m_queue.clear();
       
              promise.set_value();
       
              while (m_second_queue.unsafe_size() > 0u)
              {
                  std::wstring val;
                  if (m_second_queue.try_pop(val))
                      m_fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val;
              }
              m_fout.flush();
          }
       
      private:
          concurrency::concurrent_queue<std::wstring> m_queue;
          concurrency::concurrent_queue<std::wstring> m_second_queue;
          std::atomic_int m_counter;
          std::mutex m_mtx;
          std::mutex m_flushmtx;
          std::mutex m_dumpmtx;
          std::wofstream m_fout;
      };
       
      void task(Flusher& fl)
      {
          fl.generate_value();
      }
       
      int main()
      {
          Flusher fl;
       
          std::vector<std::thread> threads;
       
          threads.emplace_back(task, std::ref(fl));
          threads.emplace_back(task, std::ref(fl));
          threads.emplace_back(task, std::ref(fl));
          
          std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); });
       
          std::cout << "Done...";
          std::cin.get();
          return 0;
      }
    Сообщение отредактировано: Wound -
      Цитата Wound @
      Немного опишу алгоритм:
      Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков.
      Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция
      void dump(std::promise<void>&& promise)
      В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов",

      Как-то очень сложно и не совсем понятно - зачем.
      1. Странно, что сначала объект помещается в очередь, а потом проверка на размер.
      Разумнее сделать наоборот.
      Можно такой проверки снаружи очереди вообще не делать.
      Функция добавления возвращает FALSE, получаем от очереди GetLastError и выясняем, что очередь переполнена.
      2. Вообще-то это критическая ситуация. Что можно сделать ?
      3. До какого-то предельного размера можно увеличивать размер самой очереди.
      Этот момент - увеличения размера - просто потребует дополнительного времени и всё.
      4. Изначально создать очередь значительных размеров.
      Чтобы она не занимала много места, перейди к очереди указателей на объекты.
      Получишь и другой бонус - время доступа к объекту в потокобезопасной очереди резко снизится.
      Не будут копироваться сами объекты, будут копироваться только указатели.
      ---
      Фактически, для доступа к объекту средства синхронизации использоваться не будут.
      Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума.
      Сообщение отредактировано: ЫукпШ -
        Цитата ЫукпШ @
        Как-то очень сложно и не совсем понятно - зачем.

        А что конкретно не понятно то?

        Просто выгрузка очереди в файл идет в другом потоке, чтоб остальные не тормозить забивкой очереди. По поводу контейнера - можно взять например и список, вопрос не в контейнере. Вопрос в самом алгоритме как бы.

        Цитата ЫукпШ @
        1. Странно, что сначала объект помещается в очередь, а потом проверка на размер.
        Разумнее сделать наоборот.

        Да это не важно сначало идет проверка или потом, это детали, я это сделал перед выкладкой на форум, потому как мне показалось логичнее сначало ложить данные в очередь, а потом проверять - достигла ли она нужного размера или нет. Ну типа в этом случае нет лишней проверки.

        Цитата ЫукпШ @
        Фактически, для доступа к объекту средства синхронизации использоваться не будут.
        Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума.

        В смысле?
          Цитата Wound @
          В смысле?

          Так ты подумай сам.
          Сначала объектом владеет какой-то поток, заполняет его данными.
          Затем указатель помещается в очередь, и именно она владеет объектом.
          Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов.
          Сам объект никуда не копируется. Поэтому затраты времени минимальны.
          После чего, допустим, другой поток получит этот указатель.
          Теперь этот указатель есть только у него, значит никакие средства синхронизации для
          доступа к объекту не нужны. И не используются.
          ---
          Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать
          внутреннее функционирование самой очереди.
          ---
          Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку
          таким способом можно обмениваться объектами неограниченных размеров.
          Сообщение отредактировано: ЫукпШ -
            Цитата ЫукпШ @
            Сначала объектом владеет какой-то поток, заполняет его данными.
            Затем указатель помещается в очередь, и именно она владеет объектом.
            Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов.
            Сам объект никуда не копируется. Поэтому затраты времени минимальны.
            После чего, допустим, другой поток получит этот указатель.
            Теперь этот указатель есть только у него, значит никакие средства синхронизации для
            доступа к объекту не нужны. И не используются.
            ---
            Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать
            внутреннее функционирование самой очереди.
            ---
            Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку
            таким способом можно обмениваться объектами неограниченных размеров.

            ИМХО, слишком сложно ты описал. Это поле для бесконечных ошибок. У меня объект - строка, плюс ко всему не совсем я уверен что указатели потокобезопасные. Плюс какие гарантии что при перемещении объекта в очередь, он не понадобиться еще там где использовался?

            Добавлено
            Тут придется использовать умные указатели, с семантикой перемещения - std::unique_ptr, а он не потокобезопасный, на сколько я знаю. Так что придется всеравно задействовать средства синхронизации.
              Цитата Wound @
              Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться.
              Ну или может тут другие косяки есть какие нибудь?



              Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
              А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.
                Цитата Олег М @
                Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
                А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.

                Ясненько, спасибо.
                Просто я хотел еще попутно понять как работает promise + shared_future, как работает promise сам по себе примерно понимаю, но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата?
                  Цитата Wound @
                  но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата?

                  Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно.
                    Цитата Олег М @
                    Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно.

                    Я понял, спасибо.

                    Добавлено
                    Просто я тут книжку одну читаю, там всюду этот promise мелькает по поводу и без, я так подумал видимо очень хорошая вещь. Вот решил тоже попробовать что за зверь.

                    Добавлено
                    Цитата Олег М @
                    В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
                    А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.

                    Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный?
                    Там общая файловая переменная фигурирует, но ведь если мы делаем wait до вызова async, значит там и не будет ведь запуска нескольких потоков?
                    ExpandedWrap disabled
                          void Logger::Flush(std::wstring&& msg)
                          {
                              std::unique_lock<std::mutex> lk(m_flushmtx);
                       
                              m_log.push(msg);
                       
                              if (m_log.size() >= m_cachesize)
                              {
                                  if (m_shadow_flush.valid())
                                  {
                                      try
                                      {
                                          m_shadow_flush.wait();
                                      }
                                      catch (const std::exception& error)
                                      {
                                          std::cout << "Error during flush queue: " << error.what() << std::endl;
                                      }
                                  }
                                  m_shadow_flush = std::async(std::launch::async, &Logger::dump, this, std::move(m_log));
                              }
                          }
                       
                          void Logger::dump(Msi::Queue<std::wstring>&& queue)
                          {
                              //std::unique_lock<std::mutex> guard(m_dumpmtx); //! <<<<<<< Бессмысленный гард ведь?
                              while (queue.size() > 0u)
                              {
                                  std::wstring val;
                                  if (queue.try_pop(val))
                                  {
                                      m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val;
                                  }
                              }
                              m_fout.flush();
                          }
                      Цитата Wound @
                      Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный?


                      Ну да, не нужен. У тебя сейчас dump вызывается синхронно, под блокировкой m_flushmtx.

                      Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл.
                        Цитата Олег М @
                        Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл.

                        Так ведь thread тогда придется делать локальным, либо задавать какой нибудь defered launch(у него вообще есть такой режим?), плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. Ну и std::async с флагом std::launch::async - тот же thread, разве нет?
                          Цитата Wound @
                          ак ведь thread тогда придется делать локальным,

                          Ну да, сделай. Что в этом плохого.

                          Цитата Wound @
                          плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать.

                          А сейчас ты как его обрабатываешь? Делайшь try-catch внутри функции потока и всё.


                          Цитата Wound @
                          Ну и std::async с флагом std::launch::async - тот же thread, разве нет


                          Не совсем. Async - это thread pool, немного для других задач. В твоём случае проще и эффективнее использовать thread. Скорее всего у тебя будет один экземпляр Logger на всех, нафига тебе там пул потоков?
                          Кроме того, у тебя сейчас по-любому dump() должен отрабатывать синхронно. Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке.
                          Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица.
                            Цитата Олег М @
                            Ну да, сделай. Что в этом плохого.

                            Он запустит новый поток при вызове конструктора класса Log.

                            Цитата Олег М @
                            Не совсем. Async - это thread pool, немного для других задач.

                            У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения.

                            Цитата Олег М @
                            Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке.
                            Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица.

                            Хм...

                            Добавлено
                            ты имеешь ввиду вызвать что то типа такого в конструкторе?
                            ExpandedWrap disabled
                                          std::thread([this]
                                          {
                                              std::unique_lock<std::mutex> lk(m_flushmtx);
                                              m_cond_flush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; });
                                              this->dump(std::move(m_log));
                                          });
                              Цитата Wound @
                              Он запустит новый поток при вызове конструктора класса Log.

                              Обычно логгеров в приложении один, от силы два. В чём проблема.

                              Цитата Wound @
                              У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения.

                              О предпочтительности можно говорить только в контексте конкретной задачи. В твоём случае thread предпочтительнее.


                              Под обработкой исключении ты имеешь ввиду это? Если да, то что тебе мешает сделать то же самое в функции потока?
                              Цитата Wound @
                              std::cout << "Error during flush queue: " << error.what() << std::endl;
                                Цитата Олег М @
                                Обычно логгеров в приложении один, от силы два. В чём проблема.

                                Да в принципе не в чем, я щас глянул, у меня у логгера все конструкторы, кроме одного удалены.

                                Ладно, сейчас попробую прикрутить, посмотрим что получится. Возможно ты и прав.

                                Добавлено
                                Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB.
                                0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
                                0 пользователей:
                                Страницы: (3) [1] 2 3  все


                                Рейтинг@Mail.ru
                                [ Script execution time: 0,0677 ]   [ 17 queries used ]   [ Generated: 28.03.24, 20:43 GMT ]