Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[3.14.6.194] |
|
Сообщ.
#1
,
|
|
|
Всем привет.
Есть следующий код(приведен ниже). Он работает, претензий к нему пока нет. Но у меня возникает ощущение что тут что то не ладное. Вообще это вопрос больше по тому - правильно ли тут все организовано? Немного опишу алгоритм: Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков. Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция void dump(std::promise<void>&& promise) В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", и дальше продолжает из другой очереди сохранять значения в файл, а остальные потоки, включая того, кто вызвал Flush продолжают забивать основную очередь данными. По тестам проблем нет никаких, как я только не извращался. Но у меня есть подозрение, что тут кроется ошибка. В частности, так как функцию Flush могут вызывать много потоков, не может ли произойти так, что пока поток shadow выгребает вторую очередь(резервную скажем так) - вдруг главная очередь забьется до предела, и получится так, что какой нибудь поток еще раз породит поток shadow, в котором произойдет гонка или еще что нибудь похуже? Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться. Ну или может тут другие косяки есть какие нибудь? std::wofstream fout; //! <<< Член класса void Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_queue.push(msg); std::cout << m_queue.size() << std::endl; if (m_queue.size() >= 1000) { std::promise<void> promise; auto fut = promise.get_future(); auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise)); try { fut.wait(); } catch (const std::exception& error) { std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl; } // shadow.wait(); } } void dump(std::promise<void>&& promise) { std::unique_lock<std::mutex> guard(m_dumpmtx); m_queue.swap(m_second_queue); promise.set_value(); fout.open("promise.log", std::ios_base::out | std::ios_base::app); while (m_second_queue.size() > 0u) { std::wstring val; if (m_second_queue.try_pop(val)) fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val; } fout.flush(); } На всякий случай скину тестовый пример: #include <concurrent_queue.h> #include <iostream> #include <fstream> #include <thread> #include <chrono> #include <algorithm> #include <mutex> #include <future> #include <atomic> #include <sstream> #include <string> using namespace std::chrono_literals; class Flusher { public: Flusher() : m_counter(1) { m_fout.open("promise.log", std::ios_base::out | std::ios_base::trunc); m_fout << "Start log." << std::endl; m_fout << "==========================================" << std::endl; } ~Flusher() { m_fout.close(); } void generate_value() { while (m_counter.load() <= 10000) { std::lock_guard<std::mutex> lk(m_mtx); std::wostringstream stream; stream << L"Thread: " << std::this_thread::get_id() << L" put value: " << m_counter.load() << std::endl; Flush(std::move(stream.str())); ++m_counter; } } void Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_queue.push(msg); std::cout << m_queue.unsafe_size() << std::endl; if (m_queue.unsafe_size() >= 1000) { std::promise<void> promise; auto fut = promise.get_future(); auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise)); try { fut.wait(); } catch (const std::exception& error) { std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl; } // shadow.wait(); } } void dump(std::promise<void>&& promise) { std::unique_lock<std::mutex> guard(m_dumpmtx); std::for_each(m_queue.unsafe_begin(), m_queue.unsafe_end(), [this](auto&& item) { this->m_second_queue.push(std::move(item)); }); m_queue.clear(); promise.set_value(); while (m_second_queue.unsafe_size() > 0u) { std::wstring val; if (m_second_queue.try_pop(val)) m_fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val; } m_fout.flush(); } private: concurrency::concurrent_queue<std::wstring> m_queue; concurrency::concurrent_queue<std::wstring> m_second_queue; std::atomic_int m_counter; std::mutex m_mtx; std::mutex m_flushmtx; std::mutex m_dumpmtx; std::wofstream m_fout; }; void task(Flusher& fl) { fl.generate_value(); } int main() { Flusher fl; std::vector<std::thread> threads; threads.emplace_back(task, std::ref(fl)); threads.emplace_back(task, std::ref(fl)); threads.emplace_back(task, std::ref(fl)); std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); }); std::cout << "Done..."; std::cin.get(); return 0; } |
Сообщ.
#2
,
|
|
|
Цитата Wound @ Немного опишу алгоритм: Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков. Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция void dump(std::promise<void>&& promise) В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", Как-то очень сложно и не совсем понятно - зачем. 1. Странно, что сначала объект помещается в очередь, а потом проверка на размер. Разумнее сделать наоборот. Можно такой проверки снаружи очереди вообще не делать. Функция добавления возвращает FALSE, получаем от очереди GetLastError и выясняем, что очередь переполнена. 2. Вообще-то это критическая ситуация. Что можно сделать ? 3. До какого-то предельного размера можно увеличивать размер самой очереди. Этот момент - увеличения размера - просто потребует дополнительного времени и всё. 4. Изначально создать очередь значительных размеров. Чтобы она не занимала много места, перейди к очереди указателей на объекты. Получишь и другой бонус - время доступа к объекту в потокобезопасной очереди резко снизится. Не будут копироваться сами объекты, будут копироваться только указатели. --- Фактически, для доступа к объекту средства синхронизации использоваться не будут. Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума. |
Сообщ.
#3
,
|
|
|
Цитата ЫукпШ @ Как-то очень сложно и не совсем понятно - зачем. А что конкретно не понятно то? Просто выгрузка очереди в файл идет в другом потоке, чтоб остальные не тормозить забивкой очереди. По поводу контейнера - можно взять например и список, вопрос не в контейнере. Вопрос в самом алгоритме как бы. Цитата ЫукпШ @ 1. Странно, что сначала объект помещается в очередь, а потом проверка на размер. Разумнее сделать наоборот. Да это не важно сначало идет проверка или потом, это детали, я это сделал перед выкладкой на форум, потому как мне показалось логичнее сначало ложить данные в очередь, а потом проверять - достигла ли она нужного размера или нет. Ну типа в этом случае нет лишней проверки. Цитата ЫукпШ @ Фактически, для доступа к объекту средства синхронизации использоваться не будут. Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума. В смысле? |
Сообщ.
#4
,
|
|
|
Цитата Wound @ В смысле? Так ты подумай сам. Сначала объектом владеет какой-то поток, заполняет его данными. Затем указатель помещается в очередь, и именно она владеет объектом. Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов. Сам объект никуда не копируется. Поэтому затраты времени минимальны. После чего, допустим, другой поток получит этот указатель. Теперь этот указатель есть только у него, значит никакие средства синхронизации для доступа к объекту не нужны. И не используются. --- Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать внутреннее функционирование самой очереди. --- Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку таким способом можно обмениваться объектами неограниченных размеров. |
Сообщ.
#5
,
|
|
|
Цитата ЫукпШ @ Сначала объектом владеет какой-то поток, заполняет его данными. Затем указатель помещается в очередь, и именно она владеет объектом. Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов. Сам объект никуда не копируется. Поэтому затраты времени минимальны. После чего, допустим, другой поток получит этот указатель. Теперь этот указатель есть только у него, значит никакие средства синхронизации для доступа к объекту не нужны. И не используются. --- Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать внутреннее функционирование самой очереди. --- Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку таким способом можно обмениваться объектами неограниченных размеров. ИМХО, слишком сложно ты описал. Это поле для бесконечных ошибок. У меня объект - строка, плюс ко всему не совсем я уверен что указатели потокобезопасные. Плюс какие гарантии что при перемещении объекта в очередь, он не понадобиться еще там где использовался? Добавлено Тут придется использовать умные указатели, с семантикой перемещения - std::unique_ptr, а он не потокобезопасный, на сколько я знаю. Так что придется всеравно задействовать средства синхронизации. |
Сообщ.
#6
,
|
|
|
Цитата Wound @ Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться. Ну или может тут другие косяки есть какие нибудь? Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. |
Сообщ.
#7
,
|
|
|
Цитата Олег М @ Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. Ясненько, спасибо. Просто я хотел еще попутно понять как работает promise + shared_future, как работает promise сам по себе примерно понимаю, но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата? |
Сообщ.
#8
,
|
|
|
Цитата Wound @ но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата? Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно. |
Сообщ.
#9
,
|
|
|
Цитата Олег М @ Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно. Я понял, спасибо. Добавлено Просто я тут книжку одну читаю, там всюду этот promise мелькает по поводу и без, я так подумал видимо очень хорошая вещь. Вот решил тоже попробовать что за зверь. Добавлено Цитата Олег М @ В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный? Там общая файловая переменная фигурирует, но ведь если мы делаем wait до вызова async, значит там и не будет ведь запуска нескольких потоков? void Logger::Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_log.push(msg); if (m_log.size() >= m_cachesize) { if (m_shadow_flush.valid()) { try { m_shadow_flush.wait(); } catch (const std::exception& error) { std::cout << "Error during flush queue: " << error.what() << std::endl; } } m_shadow_flush = std::async(std::launch::async, &Logger::dump, this, std::move(m_log)); } } void Logger::dump(Msi::Queue<std::wstring>&& queue) { //std::unique_lock<std::mutex> guard(m_dumpmtx); //! <<<<<<< Бессмысленный гард ведь? while (queue.size() > 0u) { std::wstring val; if (queue.try_pop(val)) { m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val; } } m_fout.flush(); } |
Сообщ.
#10
,
|
|
|
Цитата Wound @ Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный? Ну да, не нужен. У тебя сейчас dump вызывается синхронно, под блокировкой m_flushmtx. Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл. |
Сообщ.
#11
,
|
|
|
Цитата Олег М @ Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл. Так ведь thread тогда придется делать локальным, либо задавать какой нибудь defered launch(у него вообще есть такой режим?), плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. Ну и std::async с флагом std::launch::async - тот же thread, разве нет? |
Сообщ.
#12
,
|
|
|
Цитата Wound @ ак ведь thread тогда придется делать локальным, Ну да, сделай. Что в этом плохого. Цитата Wound @ плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. А сейчас ты как его обрабатываешь? Делайшь try-catch внутри функции потока и всё. Цитата Wound @ Ну и std::async с флагом std::launch::async - тот же thread, разве нет Не совсем. Async - это thread pool, немного для других задач. В твоём случае проще и эффективнее использовать thread. Скорее всего у тебя будет один экземпляр Logger на всех, нафига тебе там пул потоков? Кроме того, у тебя сейчас по-любому dump() должен отрабатывать синхронно. Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке. Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица. |
Сообщ.
#13
,
|
|
|
Цитата Олег М @ Ну да, сделай. Что в этом плохого. Он запустит новый поток при вызове конструктора класса Log. Цитата Олег М @ Не совсем. Async - это thread pool, немного для других задач. У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения. Цитата Олег М @ Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке. Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица. Хм... Добавлено ты имеешь ввиду вызвать что то типа такого в конструкторе? std::thread([this] { std::unique_lock<std::mutex> lk(m_flushmtx); m_cond_flush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; }); this->dump(std::move(m_log)); }); |
Сообщ.
#14
,
|
|
|
Цитата Wound @ Он запустит новый поток при вызове конструктора класса Log. Обычно логгеров в приложении один, от силы два. В чём проблема. Цитата Wound @ У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения. О предпочтительности можно говорить только в контексте конкретной задачи. В твоём случае thread предпочтительнее. Под обработкой исключении ты имеешь ввиду это? Если да, то что тебе мешает сделать то же самое в функции потока? Цитата Wound @ std::cout << "Error during flush queue: " << error.what() << std::endl; |
Сообщ.
#15
,
|
|
|
Цитата Олег М @ Обычно логгеров в приложении один, от силы два. В чём проблема. Да в принципе не в чем, я щас глянул, у меня у логгера все конструкторы, кроме одного удалены. Ладно, сейчас попробую прикрутить, посмотрим что получится. Возможно ты и прав. Добавлено Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB. |
Сообщ.
#16
,
|
|
|
Цитата Wound @ Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB. Он стартанёт, когда ты его стартуешь, m_thread = std::thread(&ThreadProc.....), ни раньше ни позже. |
Сообщ.
#17
,
|
|
|
Ок, вот так получилось, вроде работает. Пожалуй на этом и остановлюсь. Надеюсь тут больше нет косяков?
//Constructor Logger::Logger(const std::wstring& logname, unsigned int cachesize) : m_cachesize(cachesize), m_logname(logname), m_isFinished(false) { openlog(m_logname); m_shadow_flusher = std::thread([this] { while (!m_isFinished) { std::unique_lock<std::mutex> lk(m_flushmtx); m_isflush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; }); this->dump(std::move(m_log)); } }); } void Logger::Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_log.push(msg); if (m_log.size() >= m_cachesize) m_isflush.notify_one(); } void Logger::dump(Msi::Queue<std::wstring>&& queue) { try { while (queue.size() > 0u) { std::wstring val; if (queue.try_pop(val)) { m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val; } } m_fout.flush(); } catch (std::exception& err) { std::cerr << "Error: " << err.what() << std::endl; } } Logger::~Logger() { m_isFinished.store(true); m_isflush.notify_one(); m_shadow_flusher.join(); m_fout.close(); } |
Сообщ.
#18
,
|
|
|
Цитата Wound @ m_shadow_flusher = std::thread([this] { while (!m_isFinished) { std::unique_lock<std::mutex> lk(m_flushmtx); m_isflush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; }); this->dump(std::move(m_log)); } }); 1. dump() должен вызываться без блокировки. 2. m_cachesize вообще больше не нужен. Событие можно взводить на каждый вызов Flush(), это дешёвая операция m_shadow_flusher = std::thread([this] { while (!m_isFinished) this->dump(WaitMessages()); std::lock_guard lock(m_flushmtx); this->dump(std::move(m_log)); }); inline auto Logger::WaitMessages() { std::unique_lock<std::mutex> lk(m_flushmtx); m_isflush.wait(lk, [this] {return !m_log.empty() || m_isFinished; }); return std::move(m_log); } 3. m_isFinished можно сделать просто volatile bool. В деструкторе его нужно взводить под блокировкой { std::lock_guard lock(m_flushmtx); m_isFinished = true; m_isflush.notify_all(); } m_shadow_flusher.join(); |
Сообщ.
#19
,
|
|
|
Цитата Олег М @ 3. m_isFinished можно сделать просто volatile bool. В деструкторе его нужно взводить под блокировкой Он у меня как std::atomic_bool |
Сообщ.
#20
,
|
|
|
Цитата Wound @ Он у меня как std::atomic_bool Можно без atomic |
Сообщ.
#21
,
|
|
|
Цитата Олег М @ Можно без atomic Так atomic вроде дешевле блокировки. Добавлено И мне не совсем понятно зачем тут два раза dump вызывать? Ведь все равно WaitMessage все сделает. m_shadow_flusher = std::thread([this] { while (!m_isFinished) this->dump(WaitMessages()); std::lock_guard lock(m_flushmtx); this->dump(std::move(m_log)); }); |
Сообщ.
#22
,
|
|
|
Цитата Wound @ Так atomic вроде дешевле блокировки. Начитывается он в любом случае под блокировкой, так что ничего у тебя там не дешевле. И, в общем случае, устанавливать такие переменные надо под блокировкой - там две операции =true b notify_all(), их надо синхронизировать с чтением Добавлено Цитата Wound @ И мне не совсем понятно зачем тут два раза dump вызывать? Ведь все равно WaitMessage все сделает. Да, возможно. На всякиий случай, хуже точно не будет. |
Сообщ.
#23
,
|
|
|
Цитата Олег М @ Начитывается он в любом случае под блокировкой, так что ничего у тебя там не дешевле. И, в общем случае, устанавливать такие переменные надо под блокировкой - там две операции =true b notify_all(), их надо синхронизировать с чтением На сколько я знаю использование std::atomic накладывает ограничение на порядок выполнения операций. Т.е. в данном случае notify_all буквально всегда обязано выполнятся только после присваивания значения atomic переменной. По крайней мере так написано у майерса: Цитата Однако применение s t d : : a t o m i c накладывает ограничения на разрешенные переупорядочения кода, и одно такое ограничение заключается в том, что никакой код, предшествующий в исходном тексте записи переменной s t d : : а t omi с, не может иметь место (или выглядеть таковым для друтих ядер) после нее7. Это означает, что в нашем коде auto imptValue = compute importantVa l ue ( ) ; / / Вычисление значения valAvai l aЫe = true ; 11 Сообщение об этом 11 другому потоку компиляторы должны не только сохранять порядок присваиваний i m p t V a l u e и valAva i laЬle, но и генерировать код, который гарантирует, что так же поведет себя и аппаратное обеспечение. В результате объявление v a lAva i l a Ы e как s t d : : at o m i c гарантирует выполнение критичного требования к упорядоченности - что значение imptVal ue должно быть видимо всеми потоками как измененное не позже, чем значение valAva i l aЬle. Единственное - это файл может закрыться раньше, чем dump вызвался, но у нас ведь гарантированно 1 поток? Добавлено Ладно попробую переписать, посмотрю что получится. |
Сообщ.
#24
,
|
|
|
Цитата Wound @ На сколько я знаю использование std::atomic накладывает ограничение на порядок выполнения операция. Т.е. в данном случае notify_all буквально всегда обязано выполнятся только после присваивания значения atomic переменной. По крайней мере так написано у майерса: notify_all в любом случае вызовется после присвоения значения. Здесь, в принципе, можно и без блокировки. Я говорил про общий случай - лучше выставление значении всегда синхронизировать с проверкой условий в потоке. Проблем будет меньше. Цитата Wound @ Единственное - это файл может закрыться раньше, чем dump вызвался, но у нас ведь гарантированно 1 поток? Не может. Ты join делаешь перед закрытием файла. |
Сообщ.
#25
,
|
|
|
Цитата Олег М @ Я говорил про общий случай - лучше выставление значении всегда синхронизировать с проверкой условий в потоке. Проблем будет меньше. А, ну тогда понял. Цитата Олег М @ Не может. Ты join делаешь перед закрытием файла. Ну да в принципе. Ну вроде все работает с последними изменениями. Спасибо за помощь Олег М. Добавлено На всякий случай выложу что получилось, может быть кому пригодится: #ifndef __LOGGER_H_INCLUDED__ #define __LOGGER_H_INCLUDED__ #include <sstream> #include <mutex> #include <fstream> #include <memory> #include <atomic> #include <future> #include "MsiQueue.h" namespace Msi { class Logger; class UnsafeLogger { public: UnsafeLogger(Logger& logger); UnsafeLogger(UnsafeLogger&& log); ~UnsafeLogger(); public: UnsafeLogger() = delete; UnsafeLogger(const UnsafeLogger& logger) = delete; UnsafeLogger& operator=(UnsafeLogger&&) = delete; UnsafeLogger& operator=(const UnsafeLogger&) = delete; public: template <typename T> UnsafeLogger& operator <<(T &&val); auto& operator <<(std::wostream& (*pfmanip)(std::wostream&)) { m_stream << pfmanip; return *this; } auto& operator <<(UnsafeLogger& (*pfmanip)(UnsafeLogger&)) { return pfmanip(*this); } public: void Flush() noexcept; private: Logger& m_logger; std::wostringstream m_stream; }; class Logger { public: explicit Logger(const std::wstring& logname); ~Logger(); public: Logger(const Logger&) = delete; Logger(Logger&&) = delete; Logger& operator=(const Logger&) = delete; Logger& operator=(Logger&&) = delete; UnsafeLogger Log(); void Flush(std::wstring&& msg); private: void openlog(const std::wstring& logname); void dump(Msi::Queue<std::wstring>&& queue); inline auto wait_messages() { std::unique_lock<std::mutex> lk(m_flushmtx); m_isflush.wait(lk, [this] {return m_log.size() > 0u || m_isFinished.load(); }); return std::move(m_log); } private: std::mutex m_flushmtx; std::wofstream m_fout; std::atomic_bool m_isFinished; Msi::Queue<std::wstring> m_log; std::condition_variable m_isflush; std::thread m_shadow_flusher; }; template<typename T> inline UnsafeLogger& UnsafeLogger::operator<<(T&& val) { m_stream << std::forward<T>(val); return *this; } namespace log { UnsafeLogger& info(UnsafeLogger& stream); UnsafeLogger& error(UnsafeLogger& stream); UnsafeLogger& thread(UnsafeLogger& stream); UnsafeLogger& warning(UnsafeLogger& stream); } } #endif // !__LOGGER_H_INCLUDED__ #include "Logger.h" #include <ctime> #include <thread> #include <future> #include <iostream> namespace Msi { UnsafeLogger::UnsafeLogger(Logger& logger) : m_logger(logger) { } UnsafeLogger::UnsafeLogger(UnsafeLogger&& log) : m_logger(log.m_logger), m_stream(std::move(log.m_stream)) { } UnsafeLogger::~UnsafeLogger() { Flush(); } void UnsafeLogger::Flush() noexcept { m_stream.flush(); m_logger.Flush(std::move(m_stream.str())); } Logger::Logger(const std::wstring& logname) : m_isFinished(false) { openlog(logname); m_shadow_flusher = std::thread([this] { while (!m_isFinished.load()) dump(wait_messages()); std::lock_guard<std::mutex> lk(m_flushmtx); dump(std::move(m_log)); }); } Logger::~Logger() { m_isFinished.store(true); m_isflush.notify_all(); m_shadow_flusher.join(); m_fout.close(); } UnsafeLogger Logger::Log() { return UnsafeLogger(*this); } void Logger::openlog(const std::wstring& logname) { m_fout.open(logname, std::ios_base::out | std::ios_base::trunc); if (!m_fout.is_open()) throw std::runtime_error("Error: can't open log file"); auto now = std::chrono::system_clock::now(); time_t start = std::chrono::system_clock::to_time_t(now); m_fout << L"Log was created at: " << std::ctime(&start); m_fout << L"============================================" << std::endl; } void Logger::Flush(std::wstring&& msg) { { std::unique_lock<std::mutex> lk(m_flushmtx); m_log.push(msg); } m_isflush.notify_all(); } void Logger::dump(Msi::Queue<std::wstring>&& queue) { try { while (queue.size() > 0u) { std::wstring val; if (queue.try_pop(val)) { m_fout << val; } } m_fout.flush(); } catch (std::exception& err) { std::cerr << "Error: " << err.what() << std::endl; } } namespace log { UnsafeLogger& info(UnsafeLogger& stream) { return stream << L"[Information]"; } UnsafeLogger& warning(UnsafeLogger& stream) { return stream << L"[Warning]"; } UnsafeLogger& thread(UnsafeLogger& stream) { return stream << L"Thread [" << std::this_thread::get_id() << L"]"; } UnsafeLogger& error(UnsafeLogger& stream) { return stream << L"[Error]"; } } } Единственное тут есть два момента, во первых вместо моей очереди, нужно подставить любую другую(например нестандартную concurent_queue) или самому написать. Во вторых тут вызывается - std::ctime(&start); функция, которая в Microsoft компиляторах генерирует варнинг, нужно либо убрать вызов этой функции, либо добавить в препроцессор _CRT_SECURE_NO_WARNINGS |
Сообщ.
#26
,
|
|
|
Цитата Wound @ Единственное тут есть два момента, во первых вместо моей очереди, нужно подставить любую другую(например нестандартную concurent_queue) или самому написать Поставь std::list<std::sringstream> |
Сообщ.
#27
,
|
|
|
Цитата Олег М @ Поставь std::list<std::sringstream> А зачем? Мне просто интересно чем std::stringstream лучше? |
Сообщ.
#28
,
|
|
|
Уберёшь из основного потока вызов m_stream.str(), какая-никакая экономия. (Точнее, тебе там надо std::wstringstream)
А с помощью std::list легко можно организовать пул этих стримов. |
Сообщ.
#29
,
|
|
|
Ок.
Вот версия с std::wstringstream: #ifndef __LOGGER_H_INCLUDED__ #define __LOGGER_H_INCLUDED__ #include <sstream> #include <mutex> #include <fstream> #include <memory> #include <atomic> #include <future> #include <list> namespace core { class Logger; class UnsafeLogger { public: UnsafeLogger(Logger& logger); UnsafeLogger(UnsafeLogger&& log); ~UnsafeLogger(); public: UnsafeLogger() = delete; UnsafeLogger(const UnsafeLogger& logger) = delete; UnsafeLogger& operator=(UnsafeLogger&&) = delete; UnsafeLogger& operator=(const UnsafeLogger&) = delete; public: template <typename T> UnsafeLogger& operator <<(T &&val); auto& operator <<(std::wostream& (*pfmanip)(std::wostream&)) { m_stream << pfmanip; return *this; } auto& operator <<(UnsafeLogger& (*pfmanip)(UnsafeLogger&)) { return pfmanip(*this); } public: void Flush() noexcept; private: Logger & m_logger; std::wstringstream m_stream; }; class Logger { public: explicit Logger(const std::wstring& logname); ~Logger(); public: Logger(const Logger&) = delete; Logger(Logger&&) = delete; Logger& operator=(const Logger&) = delete; Logger& operator=(Logger&&) = delete; UnsafeLogger Log(); void Flush(std::wstringstream&& msg); private: void openlog(const std::wstring& logname); void dump(std::list<std::wstringstream>&& queue); inline auto wait_messages() { std::unique_lock<std::mutex> lk(m_flushmtx); m_isflush.wait(lk, [this] {return m_log.size() > 0u || m_isFinished.load(); }); return std::move(m_log); } private: std::mutex m_flushmtx; std::wofstream m_fout; std::atomic_bool m_isFinished; std::list<std::wstringstream> m_log; std::condition_variable m_isflush; std::thread m_shadow_flusher; }; template<typename T> inline UnsafeLogger& UnsafeLogger::operator<<(T&& val) { m_stream << std::forward<T>(val); return *this; } namespace log { UnsafeLogger& info(UnsafeLogger& stream); UnsafeLogger& error(UnsafeLogger& stream); UnsafeLogger& thread(UnsafeLogger& stream); UnsafeLogger& warning(UnsafeLogger& stream); } } #endif // !__LOGGER_H_INCLUDED__ #include "Logger.h" #include <ctime> #include <thread> #include <future> #include <iostream> namespace core { UnsafeLogger::UnsafeLogger(Logger& logger) : m_logger(logger) { } UnsafeLogger::UnsafeLogger(UnsafeLogger&& log) : m_logger(log.m_logger), m_stream(std::move(log.m_stream)) { } UnsafeLogger::~UnsafeLogger() { Flush(); } void UnsafeLogger::Flush() noexcept { m_stream.flush(); m_logger.Flush(std::move(m_stream)); } Logger::Logger(const std::wstring& logname) : m_isFinished(false) { openlog(logname); m_shadow_flusher = std::thread([this] { while (!m_isFinished.load()) dump(wait_messages()); std::lock_guard<std::mutex> lk(m_flushmtx); dump(std::move(m_log)); }); } Logger::~Logger() { m_isFinished.store(true); m_isflush.notify_all(); m_shadow_flusher.join(); m_fout.close(); } UnsafeLogger Logger::Log() { return UnsafeLogger(*this); } void Logger::openlog(const std::wstring& logname) { m_fout.open(logname, std::ios_base::out | std::ios_base::trunc); if (!m_fout.is_open()) throw std::runtime_error("Error: can't open log file"); auto now = std::chrono::system_clock::now(); time_t start = std::chrono::system_clock::to_time_t(now); m_fout << L"Log was created at: " << std::ctime(&start); m_fout << L"============================================" << std::endl; } void Logger::Flush(std::wstringstream&& msg) { { std::unique_lock<std::mutex> lk(m_flushmtx); m_log.push_back(std::move(msg)); } m_isflush.notify_all(); } void Logger::dump(std::list<std::wstringstream>&& queue) { try { while (queue.size() > 0u) { m_fout << queue.front().str(); queue.pop_front(); } m_fout.flush(); } catch (std::exception& err) { std::cerr << "Error: " << err.what() << std::endl; } } namespace log { UnsafeLogger& info(UnsafeLogger& stream) { return stream << L"[Information]"; } UnsafeLogger& warning(UnsafeLogger& stream) { return stream << L"[Warning]"; } UnsafeLogger& thread(UnsafeLogger& stream) { return stream << L"Thread [" << std::this_thread::get_id() << L"]"; } UnsafeLogger& error(UnsafeLogger& stream) { return stream << L"[Error]"; } } } |
Сообщ.
#30
,
|
|
|
Неплохо бы тестовый пример еще.
|
Сообщ.
#31
,
|
|
|
Цитата JoeUser @ Неплохо бы тестовый пример еще. ну использовать вот так можно: #include "Logger.h" #include <iostream> #include <thread> #include <vector> #include <algorithm> void Test(core::Logger& logger, const std::wstring& message) { int counter = 1; while (counter < 10000) { logger.Log() << core::log::warning << L" " << core::log::thread << L" " << message << L" counter: " << counter << std::endl; ++counter; } } int main() { core::Logger logger(L"main.log"); std::wstring msg1 = L"This is message one"; std::wstring msg2 = L"This is message two"; std::wstring msg3 = L"This is message three"; std::vector<std::thread> threads; threads.emplace_back(Test, std::ref(logger), std::ref(msg1)); threads.emplace_back(Test, std::ref(logger), std::ref(msg2)); threads.emplace_back(Test, std::ref(logger), std::ref(msg3)); std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); }); std::cout << "Done..."; std::cin.get(); return 0; } |