На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
Модераторы: Qraizer, Hsilgos
  
> Вопрос по работе std::promise и std::shared_future
    Всем привет.
    Есть следующий код(приведен ниже). Он работает, претензий к нему пока нет.
    Но у меня возникает ощущение что тут что то не ладное.
    Вообще это вопрос больше по тому - правильно ли тут все организовано?

    Немного опишу алгоритм:
    Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков.
    Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция
    void dump(std::promise<void>&& promise)
    В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", и дальше продолжает из другой очереди сохранять значения в файл, а остальные потоки, включая того, кто вызвал Flush продолжают забивать основную очередь данными.

    По тестам проблем нет никаких, как я только не извращался. Но у меня есть подозрение, что тут кроется ошибка. В частности, так как функцию Flush могут вызывать много потоков, не может ли произойти так, что пока поток shadow выгребает вторую очередь(резервную скажем так) - вдруг главная очередь забьется до предела, и получится так, что какой нибудь поток еще раз породит поток shadow, в котором произойдет гонка или еще что нибудь похуже?

    Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться.
    Ну или может тут другие косяки есть какие нибудь?

    ExpandedWrap disabled
          std::wofstream fout; //!  <<< Член класса
       
          void Flush(std::wstring&& msg)
          {
              std::unique_lock<std::mutex> lk(m_flushmtx);
       
              m_queue.push(msg);
       
              std::cout << m_queue.size() << std::endl;
              if (m_queue.size() >= 1000)
              {
                  std::promise<void> promise;
                  auto fut = promise.get_future();
                  auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise));
                  try
                  {
                      fut.wait();
                  }
                  catch (const std::exception& error)
                  {
                      std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl;
                  }
      //          shadow.wait();
              }
          }
       
          void dump(std::promise<void>&& promise)
          {
              std::unique_lock<std::mutex> guard(m_dumpmtx);
              m_queue.swap(m_second_queue);
              promise.set_value();
       
              fout.open("promise.log", std::ios_base::out | std::ios_base::app);
              while (m_second_queue.size() > 0u)
              {
                  std::wstring val;
                  if (m_second_queue.try_pop(val))
                      fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val;
              }
              fout.flush();
          }


    На всякий случай скину тестовый пример:
    ExpandedWrap disabled
      #include <concurrent_queue.h>
      #include <iostream>
      #include <fstream>
      #include <thread>
      #include <chrono>
      #include <algorithm>
      #include <mutex>
      #include <future>
      #include <atomic>
      #include <sstream>
      #include <string>
       
      using namespace std::chrono_literals;
       
      class Flusher
      {
      public:
          Flusher()
          : m_counter(1)
          {
              m_fout.open("promise.log", std::ios_base::out | std::ios_base::trunc);
              m_fout << "Start log." << std::endl;
              m_fout << "==========================================" << std::endl;
          }
       
          ~Flusher()
          {
              m_fout.close();
          }
       
          void generate_value()
          {
              while (m_counter.load() <= 10000)
              {
                  std::lock_guard<std::mutex> lk(m_mtx);
                  std::wostringstream stream;
                  stream << L"Thread: " << std::this_thread::get_id() << L" put value: " << m_counter.load() << std::endl;
                  Flush(std::move(stream.str()));
                  ++m_counter;
              }
          }
       
          void Flush(std::wstring&& msg)
          {
              std::unique_lock<std::mutex> lk(m_flushmtx);
       
              m_queue.push(msg);
       
              std::cout << m_queue.unsafe_size() << std::endl;
       
              if (m_queue.unsafe_size() >= 1000)
              {
                  std::promise<void> promise;
                  auto fut = promise.get_future();
                  auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise));
                  try
                  {
                      fut.wait();
                  }
                  catch (const std::exception& error)
                  {
                      std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl;
                  }
                  //          shadow.wait();
              }
          }
       
          void dump(std::promise<void>&& promise)
          {
              std::unique_lock<std::mutex> guard(m_dumpmtx);
       
              std::for_each(m_queue.unsafe_begin(), m_queue.unsafe_end(), [this](auto&& item) { this->m_second_queue.push(std::move(item)); });
              m_queue.clear();
       
              promise.set_value();
       
              while (m_second_queue.unsafe_size() > 0u)
              {
                  std::wstring val;
                  if (m_second_queue.try_pop(val))
                      m_fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val;
              }
              m_fout.flush();
          }
       
      private:
          concurrency::concurrent_queue<std::wstring> m_queue;
          concurrency::concurrent_queue<std::wstring> m_second_queue;
          std::atomic_int m_counter;
          std::mutex m_mtx;
          std::mutex m_flushmtx;
          std::mutex m_dumpmtx;
          std::wofstream m_fout;
      };
       
      void task(Flusher& fl)
      {
          fl.generate_value();
      }
       
      int main()
      {
          Flusher fl;
       
          std::vector<std::thread> threads;
       
          threads.emplace_back(task, std::ref(fl));
          threads.emplace_back(task, std::ref(fl));
          threads.emplace_back(task, std::ref(fl));
          
          std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); });
       
          std::cout << "Done...";
          std::cin.get();
          return 0;
      }
    Сообщение отредактировано: Wound -
      Цитата Wound @
      Немного опишу алгоритм:
      Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков.
      Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция
      void dump(std::promise<void>&& promise)
      В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов",

      Как-то очень сложно и не совсем понятно - зачем.
      1. Странно, что сначала объект помещается в очередь, а потом проверка на размер.
      Разумнее сделать наоборот.
      Можно такой проверки снаружи очереди вообще не делать.
      Функция добавления возвращает FALSE, получаем от очереди GetLastError и выясняем, что очередь переполнена.
      2. Вообще-то это критическая ситуация. Что можно сделать ?
      3. До какого-то предельного размера можно увеличивать размер самой очереди.
      Этот момент - увеличения размера - просто потребует дополнительного времени и всё.
      4. Изначально создать очередь значительных размеров.
      Чтобы она не занимала много места, перейди к очереди указателей на объекты.
      Получишь и другой бонус - время доступа к объекту в потокобезопасной очереди резко снизится.
      Не будут копироваться сами объекты, будут копироваться только указатели.
      ---
      Фактически, для доступа к объекту средства синхронизации использоваться не будут.
      Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума.
      Сообщение отредактировано: ЫукпШ -
        Цитата ЫукпШ @
        Как-то очень сложно и не совсем понятно - зачем.

        А что конкретно не понятно то?

        Просто выгрузка очереди в файл идет в другом потоке, чтоб остальные не тормозить забивкой очереди. По поводу контейнера - можно взять например и список, вопрос не в контейнере. Вопрос в самом алгоритме как бы.

        Цитата ЫукпШ @
        1. Странно, что сначала объект помещается в очередь, а потом проверка на размер.
        Разумнее сделать наоборот.

        Да это не важно сначало идет проверка или потом, это детали, я это сделал перед выкладкой на форум, потому как мне показалось логичнее сначало ложить данные в очередь, а потом проверять - достигла ли она нужного размера или нет. Ну типа в этом случае нет лишней проверки.

        Цитата ЫукпШ @
        Фактически, для доступа к объекту средства синхронизации использоваться не будут.
        Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума.

        В смысле?
          Цитата Wound @
          В смысле?

          Так ты подумай сам.
          Сначала объектом владеет какой-то поток, заполняет его данными.
          Затем указатель помещается в очередь, и именно она владеет объектом.
          Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов.
          Сам объект никуда не копируется. Поэтому затраты времени минимальны.
          После чего, допустим, другой поток получит этот указатель.
          Теперь этот указатель есть только у него, значит никакие средства синхронизации для
          доступа к объекту не нужны. И не используются.
          ---
          Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать
          внутреннее функционирование самой очереди.
          ---
          Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку
          таким способом можно обмениваться объектами неограниченных размеров.
          Сообщение отредактировано: ЫукпШ -
            Цитата ЫукпШ @
            Сначала объектом владеет какой-то поток, заполняет его данными.
            Затем указатель помещается в очередь, и именно она владеет объектом.
            Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов.
            Сам объект никуда не копируется. Поэтому затраты времени минимальны.
            После чего, допустим, другой поток получит этот указатель.
            Теперь этот указатель есть только у него, значит никакие средства синхронизации для
            доступа к объекту не нужны. И не используются.
            ---
            Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать
            внутреннее функционирование самой очереди.
            ---
            Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку
            таким способом можно обмениваться объектами неограниченных размеров.

            ИМХО, слишком сложно ты описал. Это поле для бесконечных ошибок. У меня объект - строка, плюс ко всему не совсем я уверен что указатели потокобезопасные. Плюс какие гарантии что при перемещении объекта в очередь, он не понадобиться еще там где использовался?

            Добавлено
            Тут придется использовать умные указатели, с семантикой перемещения - std::unique_ptr, а он не потокобезопасный, на сколько я знаю. Так что придется всеравно задействовать средства синхронизации.
              Цитата Wound @
              Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться.
              Ну или может тут другие косяки есть какие нибудь?



              Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
              А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.
                Цитата Олег М @
                Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
                А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.

                Ясненько, спасибо.
                Просто я хотел еще попутно понять как работает promise + shared_future, как работает promise сам по себе примерно понимаю, но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата?
                  Цитата Wound @
                  но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата?

                  Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно.
                    Цитата Олег М @
                    Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно.

                    Я понял, спасибо.

                    Добавлено
                    Просто я тут книжку одну читаю, там всюду этот promise мелькает по поводу и без, я так подумал видимо очень хорошая вещь. Вот решил тоже попробовать что за зверь.

                    Добавлено
                    Цитата Олег М @
                    В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....).
                    А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе.

                    Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный?
                    Там общая файловая переменная фигурирует, но ведь если мы делаем wait до вызова async, значит там и не будет ведь запуска нескольких потоков?
                    ExpandedWrap disabled
                          void Logger::Flush(std::wstring&& msg)
                          {
                              std::unique_lock<std::mutex> lk(m_flushmtx);
                       
                              m_log.push(msg);
                       
                              if (m_log.size() >= m_cachesize)
                              {
                                  if (m_shadow_flush.valid())
                                  {
                                      try
                                      {
                                          m_shadow_flush.wait();
                                      }
                                      catch (const std::exception& error)
                                      {
                                          std::cout << "Error during flush queue: " << error.what() << std::endl;
                                      }
                                  }
                                  m_shadow_flush = std::async(std::launch::async, &Logger::dump, this, std::move(m_log));
                              }
                          }
                       
                          void Logger::dump(Msi::Queue<std::wstring>&& queue)
                          {
                              //std::unique_lock<std::mutex> guard(m_dumpmtx); //! <<<<<<< Бессмысленный гард ведь?
                              while (queue.size() > 0u)
                              {
                                  std::wstring val;
                                  if (queue.try_pop(val))
                                  {
                                      m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val;
                                  }
                              }
                              m_fout.flush();
                          }
                      Цитата Wound @
                      Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный?


                      Ну да, не нужен. У тебя сейчас dump вызывается синхронно, под блокировкой m_flushmtx.

                      Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл.
                        Цитата Олег М @
                        Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл.

                        Так ведь thread тогда придется делать локальным, либо задавать какой нибудь defered launch(у него вообще есть такой режим?), плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. Ну и std::async с флагом std::launch::async - тот же thread, разве нет?
                          Цитата Wound @
                          ак ведь thread тогда придется делать локальным,

                          Ну да, сделай. Что в этом плохого.

                          Цитата Wound @
                          плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать.

                          А сейчас ты как его обрабатываешь? Делайшь try-catch внутри функции потока и всё.


                          Цитата Wound @
                          Ну и std::async с флагом std::launch::async - тот же thread, разве нет


                          Не совсем. Async - это thread pool, немного для других задач. В твоём случае проще и эффективнее использовать thread. Скорее всего у тебя будет один экземпляр Logger на всех, нафига тебе там пул потоков?
                          Кроме того, у тебя сейчас по-любому dump() должен отрабатывать синхронно. Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке.
                          Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица.
                            Цитата Олег М @
                            Ну да, сделай. Что в этом плохого.

                            Он запустит новый поток при вызове конструктора класса Log.

                            Цитата Олег М @
                            Не совсем. Async - это thread pool, немного для других задач.

                            У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения.

                            Цитата Олег М @
                            Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке.
                            Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица.

                            Хм...

                            Добавлено
                            ты имеешь ввиду вызвать что то типа такого в конструкторе?
                            ExpandedWrap disabled
                                          std::thread([this]
                                          {
                                              std::unique_lock<std::mutex> lk(m_flushmtx);
                                              m_cond_flush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; });
                                              this->dump(std::move(m_log));
                                          });
                              Цитата Wound @
                              Он запустит новый поток при вызове конструктора класса Log.

                              Обычно логгеров в приложении один, от силы два. В чём проблема.

                              Цитата Wound @
                              У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения.

                              О предпочтительности можно говорить только в контексте конкретной задачи. В твоём случае thread предпочтительнее.


                              Под обработкой исключении ты имеешь ввиду это? Если да, то что тебе мешает сделать то же самое в функции потока?
                              Цитата Wound @
                              std::cout << "Error during flush queue: " << error.what() << std::endl;
                                Цитата Олег М @
                                Обычно логгеров в приложении один, от силы два. В чём проблема.

                                Да в принципе не в чем, я щас глянул, у меня у логгера все конструкторы, кроме одного удалены.

                                Ладно, сейчас попробую прикрутить, посмотрим что получится. Возможно ты и прав.

                                Добавлено
                                Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB.
                                  Цитата Wound @
                                  Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB.

                                  Он стартанёт, когда ты его стартуешь, m_thread = std::thread(&ThreadProc.....), ни раньше ни позже.
                                    Ок, вот так получилось, вроде работает. Пожалуй на этом и остановлюсь. Надеюсь тут больше нет косяков?
                                    ExpandedWrap disabled
                                      //Constructor
                                          Logger::Logger(const std::wstring& logname, unsigned int cachesize)
                                          : m_cachesize(cachesize),
                                            m_logname(logname),
                                            m_isFinished(false)
                                          {
                                              openlog(m_logname);
                                              m_shadow_flusher = std::thread([this]
                                              {
                                                  while (!m_isFinished)
                                                  {
                                                      std::unique_lock<std::mutex> lk(m_flushmtx);
                                                      m_isflush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; });
                                                      this->dump(std::move(m_log));
                                                  }
                                              });
                                          }
                                       
                                          void Logger::Flush(std::wstring&& msg)
                                          {
                                              std::unique_lock<std::mutex> lk(m_flushmtx);
                                              m_log.push(msg);
                                              if (m_log.size() >= m_cachesize)
                                                  m_isflush.notify_one();
                                          }
                                       
                                          void Logger::dump(Msi::Queue<std::wstring>&& queue)
                                          {
                                              try
                                              {
                                                  while (queue.size() > 0u)
                                                  {
                                                      std::wstring val;
                                                      if (queue.try_pop(val))
                                                      {
                                                          m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val;
                                                      }
                                                  }
                                                  m_fout.flush();
                                              }
                                              catch (std::exception& err)
                                              {
                                                  std::cerr << "Error: " << err.what() << std::endl;
                                              }
                                          }
                                       
                                          Logger::~Logger()
                                          {
                                              m_isFinished.store(true);
                                              m_isflush.notify_one();
                                              m_shadow_flusher.join();
                                       
                                              m_fout.close();
                                          }
                                      Цитата Wound @
                                            m_shadow_flusher = std::thread([this]
                                              {
                                                  while (!m_isFinished)
                                                  {
                                                      std::unique_lock<std::mutex> lk(m_flushmtx);
                                                      m_isflush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; });
                                                      this->dump(std::move(m_log));
                                                  }
                                              });

                                      1. dump() должен вызываться без блокировки.
                                      2. m_cachesize вообще больше не нужен. Событие можно взводить на каждый вызов Flush(), это дешёвая операция

                                      ExpandedWrap disabled
                                        m_shadow_flusher = std::thread([this]
                                        {
                                            while (!m_isFinished)
                                                this->dump(WaitMessages());
                                         
                                             std::lock_guard lock(m_flushmtx);
                                             this->dump(std::move(m_log));
                                        });
                                         
                                         
                                        inline
                                        auto Logger::WaitMessages()
                                        {
                                            std::unique_lock<std::mutex> lk(m_flushmtx);
                                            m_isflush.wait(lk, [this] {return !m_log.empty() || m_isFinished; });
                                            return std::move(m_log);
                                        }


                                      3. m_isFinished можно сделать просто volatile bool. В деструкторе его нужно взводить под блокировкой
                                      ExpandedWrap disabled
                                        {
                                            std::lock_guard lock(m_flushmtx);
                                            m_isFinished  = true;
                                            m_isflush.notify_all();
                                        }
                                        m_shadow_flusher.join();
                                        Цитата Олег М @
                                        3. m_isFinished можно сделать просто volatile bool. В деструкторе его нужно взводить под блокировкой

                                        Он у меня как std::atomic_bool
                                          Цитата Wound @
                                          Он у меня как std::atomic_bool

                                          Можно без atomic
                                            Цитата Олег М @
                                            Можно без atomic

                                            Так atomic вроде дешевле блокировки.

                                            Добавлено
                                            И мне не совсем понятно зачем тут два раза dump вызывать? Ведь все равно WaitMessage все сделает.
                                            ExpandedWrap disabled
                                              m_shadow_flusher = std::thread([this]
                                              {
                                                  while (!m_isFinished)
                                                      this->dump(WaitMessages());
                                               
                                                   std::lock_guard lock(m_flushmtx);
                                                   this->dump(std::move(m_log));
                                              });
                                              Цитата Wound @
                                              Так atomic вроде дешевле блокировки.


                                              Начитывается он в любом случае под блокировкой, так что ничего у тебя там не дешевле.

                                              И, в общем случае, устанавливать такие переменные надо под блокировкой - там две операции =true b notify_all(), их надо синхронизировать с чтением

                                              Добавлено
                                              Цитата Wound @
                                              И мне не совсем понятно зачем тут два раза dump вызывать? Ведь все равно WaitMessage все сделает.


                                              Да, возможно. На всякиий случай, хуже точно не будет.
                                                Цитата Олег М @
                                                Начитывается он в любом случае под блокировкой, так что ничего у тебя там не дешевле.

                                                И, в общем случае, устанавливать такие переменные надо под блокировкой - там две операции =true b notify_all(), их надо синхронизировать с чтением

                                                На сколько я знаю использование std::atomic накладывает ограничение на порядок выполнения операций. Т.е. в данном случае notify_all буквально всегда обязано выполнятся только после присваивания значения atomic переменной. По крайней мере так написано у майерса:
                                                Цитата

                                                Однако применение s t d : : a t o m i c накладывает ограничения на разрешенные переупорядочения
                                                кода, и одно такое ограничение заключается в том, что никакой код, предшествующий
                                                в исходном тексте записи переменной s t d : : а t omi с, не может иметь место
                                                (или выглядеть таковым для друтих ядер) после нее7. Это означает, что в нашем коде
                                                auto imptValue = compute importantVa l ue ( ) ; / / Вычисление значения
                                                valAvai l aЫe = true ; 11 Сообщение об этом
                                                11 другому потоку
                                                компиляторы должны не только сохранять порядок присваиваний i m p t V a l u e
                                                и valAva i laЬle, но и генерировать код, который гарантирует, что так же поведет себя
                                                и аппаратное обеспечение. В результате объявление v a lAva i l a Ы e как s t d : : at o m i c
                                                гарантирует выполнение критичного требования к упорядоченности - что значение
                                                imptVal ue должно быть видимо всеми потоками как измененное не позже, чем значение
                                                valAva i l aЬle.

                                                Единственное - это файл может закрыться раньше, чем dump вызвался, но у нас ведь гарантированно 1 поток?

                                                Добавлено
                                                Ладно попробую переписать, посмотрю что получится.
                                                Сообщение отредактировано: Wound -
                                                  Цитата Wound @
                                                  На сколько я знаю использование std::atomic накладывает ограничение на порядок выполнения операция. Т.е. в данном случае notify_all буквально всегда обязано выполнятся только после присваивания значения atomic переменной. По крайней мере так написано у майерса:

                                                  notify_all в любом случае вызовется после присвоения значения. Здесь, в принципе, можно и без блокировки.
                                                  Я говорил про общий случай - лучше выставление значении всегда синхронизировать с проверкой условий в потоке. Проблем будет меньше.


                                                  Цитата Wound @
                                                  Единственное - это файл может закрыться раньше, чем dump вызвался, но у нас ведь гарантированно 1 поток?

                                                  Не может. Ты join делаешь перед закрытием файла.
                                                  Сообщение отредактировано: Олег М -
                                                    Цитата Олег М @
                                                    Я говорил про общий случай - лучше выставление значении всегда синхронизировать с проверкой условий в потоке. Проблем будет меньше.

                                                    А, ну тогда понял.

                                                    Цитата Олег М @
                                                    Не может. Ты join делаешь перед закрытием файла.

                                                    Ну да в принципе.

                                                    Ну вроде все работает с последними изменениями. Спасибо за помощь Олег М.

                                                    Добавлено
                                                    На всякий случай выложу что получилось, может быть кому пригодится:
                                                    ExpandedWrap disabled
                                                      #ifndef __LOGGER_H_INCLUDED__
                                                      #define __LOGGER_H_INCLUDED__
                                                       
                                                      #include <sstream>
                                                      #include <mutex>
                                                      #include <fstream>
                                                      #include <memory>
                                                      #include <atomic>
                                                      #include <future>
                                                      #include "MsiQueue.h"
                                                       
                                                      namespace Msi
                                                      {
                                                         class Logger;
                                                         class UnsafeLogger
                                                         {
                                                         public:
                                                            UnsafeLogger(Logger& logger);
                                                            UnsafeLogger(UnsafeLogger&& log);
                                                            ~UnsafeLogger();
                                                       
                                                         public:
                                                            UnsafeLogger() = delete;
                                                            UnsafeLogger(const UnsafeLogger& logger) = delete;
                                                            UnsafeLogger& operator=(UnsafeLogger&&) = delete;
                                                            UnsafeLogger& operator=(const UnsafeLogger&) = delete;
                                                       
                                                         public:
                                                            template <typename T>
                                                            UnsafeLogger& operator <<(T &&val);
                                                       
                                                            auto& operator <<(std::wostream& (*pfmanip)(std::wostream&))
                                                            {
                                                               m_stream << pfmanip;
                                                               return *this;
                                                            }
                                                       
                                                            auto& operator <<(UnsafeLogger& (*pfmanip)(UnsafeLogger&))
                                                            {
                                                               return pfmanip(*this);
                                                            }
                                                       
                                                         public:
                                                            void Flush() noexcept;
                                                       
                                                         private:
                                                            Logger& m_logger;
                                                            std::wostringstream m_stream;
                                                         };
                                                       
                                                         class Logger
                                                         {
                                                         public:
                                                            explicit Logger(const std::wstring& logname);
                                                            ~Logger();
                                                       
                                                         public:
                                                            Logger(const Logger&) = delete;
                                                            Logger(Logger&&) = delete;
                                                            Logger& operator=(const Logger&) = delete;
                                                            Logger& operator=(Logger&&) = delete;
                                                       
                                                            UnsafeLogger Log();
                                                            void Flush(std::wstring&& msg);
                                                         private:
                                                            
                                                            void openlog(const std::wstring& logname);
                                                            void dump(Msi::Queue<std::wstring>&& queue);
                                                            inline auto wait_messages()
                                                            {
                                                               std::unique_lock<std::mutex> lk(m_flushmtx);
                                                               m_isflush.wait(lk, [this] {return m_log.size() > 0u || m_isFinished.load(); });
                                                               return std::move(m_log);
                                                            }
                                                       
                                                         private:
                                                            std::mutex m_flushmtx;
                                                            std::wofstream m_fout;
                                                            std::atomic_bool m_isFinished;
                                                            Msi::Queue<std::wstring> m_log;
                                                            std::condition_variable m_isflush;
                                                            std::thread m_shadow_flusher;
                                                         };
                                                       
                                                         template<typename T>
                                                         inline UnsafeLogger& UnsafeLogger::operator<<(T&& val)
                                                         {
                                                            m_stream << std::forward<T>(val);
                                                            return *this;
                                                         }
                                                       
                                                         namespace log
                                                         {
                                                            UnsafeLogger& info(UnsafeLogger& stream);
                                                            UnsafeLogger& error(UnsafeLogger& stream);
                                                            UnsafeLogger& thread(UnsafeLogger& stream);
                                                            UnsafeLogger& warning(UnsafeLogger& stream);
                                                         }
                                                      }
                                                      #endif // !__LOGGER_H_INCLUDED__

                                                    ExpandedWrap disabled
                                                      #include "Logger.h"
                                                      #include <ctime>
                                                      #include <thread>
                                                      #include <future>
                                                      #include <iostream>
                                                       
                                                      namespace Msi
                                                      {
                                                         UnsafeLogger::UnsafeLogger(Logger& logger)
                                                         : m_logger(logger)
                                                         {
                                                         }
                                                       
                                                         UnsafeLogger::UnsafeLogger(UnsafeLogger&& log)
                                                         : m_logger(log.m_logger),
                                                           m_stream(std::move(log.m_stream))
                                                         {
                                                         }
                                                       
                                                         UnsafeLogger::~UnsafeLogger()
                                                         {
                                                            Flush();
                                                         }
                                                       
                                                         void UnsafeLogger::Flush() noexcept
                                                         {
                                                            m_stream.flush();
                                                            m_logger.Flush(std::move(m_stream.str()));
                                                         }
                                                       
                                                         Logger::Logger(const std::wstring& logname)
                                                         : m_isFinished(false)
                                                         {
                                                            openlog(logname);
                                                            m_shadow_flusher = std::thread([this]
                                                            {
                                                               while (!m_isFinished.load())
                                                                  dump(wait_messages());
                                                       
                                                               std::lock_guard<std::mutex> lk(m_flushmtx);
                                                               dump(std::move(m_log));
                                                            });
                                                         }
                                                       
                                                         Logger::~Logger()
                                                         {
                                                            m_isFinished.store(true);
                                                            m_isflush.notify_all();
                                                       
                                                            m_shadow_flusher.join();
                                                       
                                                            m_fout.close();
                                                         }
                                                       
                                                         UnsafeLogger Logger::Log()
                                                         {
                                                            return UnsafeLogger(*this);
                                                         }
                                                       
                                                         void Logger::openlog(const std::wstring& logname)
                                                         {
                                                            m_fout.open(logname, std::ios_base::out | std::ios_base::trunc);
                                                            if (!m_fout.is_open())
                                                               throw std::runtime_error("Error: can't open log file");
                                                            auto now = std::chrono::system_clock::now();
                                                            time_t start = std::chrono::system_clock::to_time_t(now);
                                                            m_fout << L"Log was created at: " << std::ctime(&start);
                                                            m_fout << L"============================================" << std::endl;
                                                         }
                                                       
                                                         void Logger::Flush(std::wstring&& msg)
                                                         {
                                                            {
                                                               std::unique_lock<std::mutex> lk(m_flushmtx);
                                                               m_log.push(msg);
                                                            }
                                                            m_isflush.notify_all();
                                                         }
                                                       
                                                         void Logger::dump(Msi::Queue<std::wstring>&& queue)
                                                         {
                                                            try
                                                            {
                                                               while (queue.size() > 0u)
                                                               {
                                                                  std::wstring val;
                                                                  if (queue.try_pop(val))
                                                                  {
                                                                     m_fout << val;
                                                                  }
                                                               }
                                                               m_fout.flush();
                                                            }
                                                            catch (std::exception& err)
                                                            {
                                                               std::cerr << "Error: " << err.what() << std::endl;
                                                            }
                                                         }
                                                       
                                                         namespace log
                                                         {
                                                            UnsafeLogger& info(UnsafeLogger& stream)
                                                            {
                                                               return stream << L"[Information]";
                                                            }
                                                       
                                                            UnsafeLogger& warning(UnsafeLogger& stream)
                                                            {
                                                               return stream << L"[Warning]";
                                                            }
                                                       
                                                            UnsafeLogger& thread(UnsafeLogger& stream)
                                                            {
                                                               return stream << L"Thread [" << std::this_thread::get_id() << L"]";
                                                            }
                                                       
                                                            UnsafeLogger& error(UnsafeLogger& stream)
                                                            {
                                                               return stream << L"[Error]";
                                                            }
                                                         }
                                                      }

                                                    Единственное тут есть два момента, во первых вместо моей очереди, нужно подставить любую другую(например нестандартную concurent_queue) или самому написать. Во вторых тут вызывается - std::ctime(&start); функция, которая в Microsoft компиляторах генерирует варнинг, нужно либо убрать вызов этой функции, либо добавить в препроцессор _CRT_SECURE_NO_WARNINGS
                                                      Цитата Wound @
                                                      Единственное тут есть два момента, во первых вместо моей очереди, нужно подставить любую другую(например нестандартную concurent_queue) или самому написать

                                                      Поставь std::list<std::sringstream>
                                                        Цитата Олег М @
                                                        Поставь std::list<std::sringstream>

                                                        А зачем? Мне просто интересно чем std::stringstream лучше?
                                                          Уберёшь из основного потока вызов m_stream.str(), какая-никакая экономия. (Точнее, тебе там надо std::wstringstream)
                                                          А с помощью std::list легко можно организовать пул этих стримов.
                                                          Сообщение отредактировано: Олег М -
                                                            Ок.
                                                            Вот версия с std::wstringstream:
                                                            ExpandedWrap disabled
                                                              #ifndef __LOGGER_H_INCLUDED__
                                                              #define __LOGGER_H_INCLUDED__
                                                               
                                                              #include <sstream>
                                                              #include <mutex>
                                                              #include <fstream>
                                                              #include <memory>
                                                              #include <atomic>
                                                              #include <future>
                                                              #include <list>
                                                               
                                                              namespace core
                                                              {
                                                                  class Logger;
                                                                  class UnsafeLogger
                                                                  {
                                                                  public:
                                                                      UnsafeLogger(Logger& logger);
                                                                      UnsafeLogger(UnsafeLogger&& log);
                                                                      ~UnsafeLogger();
                                                               
                                                                  public:
                                                                      UnsafeLogger() = delete;
                                                                      UnsafeLogger(const UnsafeLogger& logger) = delete;
                                                                      UnsafeLogger& operator=(UnsafeLogger&&) = delete;
                                                                      UnsafeLogger& operator=(const UnsafeLogger&) = delete;
                                                               
                                                                  public:
                                                                      template <typename T>
                                                                      UnsafeLogger& operator <<(T &&val);
                                                               
                                                                      auto& operator <<(std::wostream& (*pfmanip)(std::wostream&))
                                                                      {
                                                                          m_stream << pfmanip;
                                                                          return *this;
                                                                      }
                                                               
                                                                      auto& operator <<(UnsafeLogger& (*pfmanip)(UnsafeLogger&))
                                                                      {
                                                                          return pfmanip(*this);
                                                                      }
                                                               
                                                                  public:
                                                                      void Flush() noexcept;
                                                               
                                                                  private:
                                                                      Logger & m_logger;
                                                                      std::wstringstream m_stream;
                                                                  };
                                                               
                                                                  class Logger
                                                                  {
                                                                  public:
                                                                      explicit Logger(const std::wstring& logname);
                                                                      ~Logger();
                                                               
                                                                  public:
                                                                      Logger(const Logger&) = delete;
                                                                      Logger(Logger&&) = delete;
                                                                      Logger& operator=(const Logger&) = delete;
                                                                      Logger& operator=(Logger&&) = delete;
                                                               
                                                                      UnsafeLogger Log();
                                                                      void Flush(std::wstringstream&& msg);
                                                                  private:
                                                               
                                                                      void openlog(const std::wstring& logname);
                                                                      void dump(std::list<std::wstringstream>&& queue);
                                                                      inline auto wait_messages()
                                                                      {
                                                                          std::unique_lock<std::mutex> lk(m_flushmtx);
                                                                          m_isflush.wait(lk, [this] {return m_log.size() > 0u || m_isFinished.load(); });
                                                                          return std::move(m_log);
                                                                      }
                                                               
                                                                  private:
                                                                      std::mutex m_flushmtx;
                                                                      std::wofstream m_fout;
                                                                      std::atomic_bool m_isFinished;
                                                                      std::list<std::wstringstream> m_log;
                                                                      std::condition_variable m_isflush;
                                                                      std::thread m_shadow_flusher;
                                                                  };
                                                               
                                                                  template<typename T>
                                                                  inline UnsafeLogger& UnsafeLogger::operator<<(T&& val)
                                                                  {
                                                                      m_stream << std::forward<T>(val);
                                                                      return *this;
                                                                  }
                                                               
                                                                  namespace log
                                                                  {
                                                                      UnsafeLogger& info(UnsafeLogger& stream);
                                                                      UnsafeLogger& error(UnsafeLogger& stream);
                                                                      UnsafeLogger& thread(UnsafeLogger& stream);
                                                                      UnsafeLogger& warning(UnsafeLogger& stream);
                                                                  }
                                                              }
                                                              #endif // !__LOGGER_H_INCLUDED__

                                                            ExpandedWrap disabled
                                                              #include "Logger.h"
                                                              #include <ctime>
                                                              #include <thread>
                                                              #include <future>
                                                              #include <iostream>
                                                               
                                                              namespace core
                                                              {
                                                                  UnsafeLogger::UnsafeLogger(Logger& logger)
                                                                      : m_logger(logger)
                                                                  {
                                                                  }
                                                               
                                                                  UnsafeLogger::UnsafeLogger(UnsafeLogger&& log)
                                                                      : m_logger(log.m_logger),
                                                                      m_stream(std::move(log.m_stream))
                                                                  {
                                                                  }
                                                               
                                                                  UnsafeLogger::~UnsafeLogger()
                                                                  {
                                                                      Flush();
                                                                  }
                                                               
                                                                  void UnsafeLogger::Flush() noexcept
                                                                  {
                                                                      m_stream.flush();
                                                                      m_logger.Flush(std::move(m_stream));
                                                                  }
                                                               
                                                                  Logger::Logger(const std::wstring& logname)
                                                                  : m_isFinished(false)
                                                                  {
                                                                      openlog(logname);
                                                                      m_shadow_flusher = std::thread([this]
                                                                      {
                                                                          while (!m_isFinished.load())
                                                                              dump(wait_messages());
                                                               
                                                                          std::lock_guard<std::mutex> lk(m_flushmtx);
                                                                          dump(std::move(m_log));
                                                                      });
                                                                  }
                                                               
                                                                  Logger::~Logger()
                                                                  {
                                                                      m_isFinished.store(true);
                                                                      m_isflush.notify_all();
                                                               
                                                                      m_shadow_flusher.join();
                                                               
                                                                      m_fout.close();
                                                                  }
                                                               
                                                                  UnsafeLogger Logger::Log()
                                                                  {
                                                                      return UnsafeLogger(*this);
                                                                  }
                                                               
                                                                  void Logger::openlog(const std::wstring& logname)
                                                                  {
                                                                      m_fout.open(logname, std::ios_base::out | std::ios_base::trunc);
                                                                      if (!m_fout.is_open())
                                                                          throw std::runtime_error("Error: can't open log file");
                                                                      auto now = std::chrono::system_clock::now();
                                                                      time_t start = std::chrono::system_clock::to_time_t(now);
                                                                      m_fout << L"Log was created at: " << std::ctime(&start);
                                                                      m_fout << L"============================================" << std::endl;
                                                                  }
                                                               
                                                                  void Logger::Flush(std::wstringstream&& msg)
                                                                  {
                                                                      {
                                                                          std::unique_lock<std::mutex> lk(m_flushmtx);
                                                                          m_log.push_back(std::move(msg));
                                                                      }
                                                                      m_isflush.notify_all();
                                                                  }
                                                               
                                                                  void Logger::dump(std::list<std::wstringstream>&& queue)
                                                                  {
                                                                      try
                                                                      {
                                                                          while (queue.size() > 0u)
                                                                          {
                                                                              m_fout << queue.front().str();
                                                                              queue.pop_front();
                                                                          }
                                                                          m_fout.flush();
                                                                      }
                                                                      catch (std::exception& err)
                                                                      {
                                                                          std::cerr << "Error: " << err.what() << std::endl;
                                                                      }
                                                                  }
                                                               
                                                                  namespace log
                                                                  {
                                                                      UnsafeLogger& info(UnsafeLogger& stream)
                                                                      {
                                                                          return stream << L"[Information]";
                                                                      }
                                                               
                                                                      UnsafeLogger& warning(UnsafeLogger& stream)
                                                                      {
                                                                          return stream << L"[Warning]";
                                                                      }
                                                               
                                                                      UnsafeLogger& thread(UnsafeLogger& stream)
                                                                      {
                                                                          return stream << L"Thread [" << std::this_thread::get_id() << L"]";
                                                                      }
                                                               
                                                                      UnsafeLogger& error(UnsafeLogger& stream)
                                                                      {
                                                                          return stream << L"[Error]";
                                                                      }
                                                                  }
                                                              }
                                                              Неплохо бы тестовый пример еще.
                                                                Цитата JoeUser @
                                                                Неплохо бы тестовый пример еще.


                                                                ну использовать вот так можно:
                                                                ExpandedWrap disabled
                                                                  #include "Logger.h"
                                                                  #include <iostream>
                                                                  #include <thread>
                                                                  #include <vector>
                                                                  #include <algorithm>
                                                                   
                                                                  void Test(core::Logger& logger, const std::wstring& message)
                                                                  {
                                                                      int counter = 1;
                                                                      while (counter < 10000)
                                                                      {
                                                                          logger.Log() << core::log::warning << L" " << core::log::thread << L" " << message << L" counter: " << counter << std::endl;
                                                                          ++counter;
                                                                      }
                                                                  }
                                                                   
                                                                   
                                                                  int main()
                                                                  {
                                                                      core::Logger logger(L"main.log");
                                                                   
                                                                      std::wstring msg1 = L"This is message one";
                                                                      std::wstring msg2 = L"This is message two";
                                                                      std::wstring msg3 = L"This is message three";
                                                                   
                                                                      std::vector<std::thread> threads;
                                                                   
                                                                      threads.emplace_back(Test, std::ref(logger), std::ref(msg1));
                                                                      threads.emplace_back(Test, std::ref(logger), std::ref(msg2));
                                                                      threads.emplace_back(Test, std::ref(logger), std::ref(msg3));
                                                                   
                                                                      std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); });
                                                                   
                                                                      std::cout << "Done...";
                                                                      std::cin.get();
                                                                      return 0;
                                                                  }
                                                                Сообщение отредактировано: Wound -
                                                                0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
                                                                0 пользователей:


                                                                Рейтинг@Mail.ru
                                                                [ Script execution time: 0,0937 ]   [ 17 queries used ]   [ Generated: 23.04.24, 09:02 GMT ]