Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[18.221.27.155] |
|
Страницы: (3) [1] 2 3 все ( Перейти к последнему сообщению ) |
Сообщ.
#1
,
|
|
|
Всем привет.
Есть следующий код(приведен ниже). Он работает, претензий к нему пока нет. Но у меня возникает ощущение что тут что то не ладное. Вообще это вопрос больше по тому - правильно ли тут все организовано? Немного опишу алгоритм: Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков. Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция void dump(std::promise<void>&& promise) В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", и дальше продолжает из другой очереди сохранять значения в файл, а остальные потоки, включая того, кто вызвал Flush продолжают забивать основную очередь данными. По тестам проблем нет никаких, как я только не извращался. Но у меня есть подозрение, что тут кроется ошибка. В частности, так как функцию Flush могут вызывать много потоков, не может ли произойти так, что пока поток shadow выгребает вторую очередь(резервную скажем так) - вдруг главная очередь забьется до предела, и получится так, что какой нибудь поток еще раз породит поток shadow, в котором произойдет гонка или еще что нибудь похуже? Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться. Ну или может тут другие косяки есть какие нибудь? std::wofstream fout; //! <<< Член класса void Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_queue.push(msg); std::cout << m_queue.size() << std::endl; if (m_queue.size() >= 1000) { std::promise<void> promise; auto fut = promise.get_future(); auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise)); try { fut.wait(); } catch (const std::exception& error) { std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl; } // shadow.wait(); } } void dump(std::promise<void>&& promise) { std::unique_lock<std::mutex> guard(m_dumpmtx); m_queue.swap(m_second_queue); promise.set_value(); fout.open("promise.log", std::ios_base::out | std::ios_base::app); while (m_second_queue.size() > 0u) { std::wstring val; if (m_second_queue.try_pop(val)) fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val; } fout.flush(); } На всякий случай скину тестовый пример: #include <concurrent_queue.h> #include <iostream> #include <fstream> #include <thread> #include <chrono> #include <algorithm> #include <mutex> #include <future> #include <atomic> #include <sstream> #include <string> using namespace std::chrono_literals; class Flusher { public: Flusher() : m_counter(1) { m_fout.open("promise.log", std::ios_base::out | std::ios_base::trunc); m_fout << "Start log." << std::endl; m_fout << "==========================================" << std::endl; } ~Flusher() { m_fout.close(); } void generate_value() { while (m_counter.load() <= 10000) { std::lock_guard<std::mutex> lk(m_mtx); std::wostringstream stream; stream << L"Thread: " << std::this_thread::get_id() << L" put value: " << m_counter.load() << std::endl; Flush(std::move(stream.str())); ++m_counter; } } void Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_queue.push(msg); std::cout << m_queue.unsafe_size() << std::endl; if (m_queue.unsafe_size() >= 1000) { std::promise<void> promise; auto fut = promise.get_future(); auto shadow = std::async(std::launch::async, &Flusher::dump, this, std::move(promise)); try { fut.wait(); } catch (const std::exception& error) { std::cout << "Error occured in thread: " << std::this_thread::get_id() << ", description: " << error.what() << std::endl; } // shadow.wait(); } } void dump(std::promise<void>&& promise) { std::unique_lock<std::mutex> guard(m_dumpmtx); std::for_each(m_queue.unsafe_begin(), m_queue.unsafe_end(), [this](auto&& item) { this->m_second_queue.push(std::move(item)); }); m_queue.clear(); promise.set_value(); while (m_second_queue.unsafe_size() > 0u) { std::wstring val; if (m_second_queue.try_pop(val)) m_fout << L"Thread: " << std::this_thread::get_id() << L" value: " << val; } m_fout.flush(); } private: concurrency::concurrent_queue<std::wstring> m_queue; concurrency::concurrent_queue<std::wstring> m_second_queue; std::atomic_int m_counter; std::mutex m_mtx; std::mutex m_flushmtx; std::mutex m_dumpmtx; std::wofstream m_fout; }; void task(Flusher& fl) { fl.generate_value(); } int main() { Flusher fl; std::vector<std::thread> threads; threads.emplace_back(task, std::ref(fl)); threads.emplace_back(task, std::ref(fl)); threads.emplace_back(task, std::ref(fl)); std::for_each(threads.begin(), threads.end(), [](auto&& item) { item.join(); }); std::cout << "Done..."; std::cin.get(); return 0; } |
Сообщ.
#2
,
|
|
|
Цитата Wound @ Немного опишу алгоритм: Функцию void Flush(std::wstring&& msg) могут вызывать несколько потоков. Поток ложит данные в очередь. Затем проверяет, если размер очереди выше максимального, то в отдельном потоке вызывается функция void dump(std::promise<void>&& promise) В которую перемещается объект будущее, дальше в функции dump очередь копируется в другую очередь, и выставляет результат объекта будущее в состояни "Готов", Как-то очень сложно и не совсем понятно - зачем. 1. Странно, что сначала объект помещается в очередь, а потом проверка на размер. Разумнее сделать наоборот. Можно такой проверки снаружи очереди вообще не делать. Функция добавления возвращает FALSE, получаем от очереди GetLastError и выясняем, что очередь переполнена. 2. Вообще-то это критическая ситуация. Что можно сделать ? 3. До какого-то предельного размера можно увеличивать размер самой очереди. Этот момент - увеличения размера - просто потребует дополнительного времени и всё. 4. Изначально создать очередь значительных размеров. Чтобы она не занимала много места, перейди к очереди указателей на объекты. Получишь и другой бонус - время доступа к объекту в потокобезопасной очереди резко снизится. Не будут копироваться сами объекты, будут копироваться только указатели. --- Фактически, для доступа к объекту средства синхронизации использоваться не будут. Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума. |
Сообщ.
#3
,
|
|
|
Цитата ЫукпШ @ Как-то очень сложно и не совсем понятно - зачем. А что конкретно не понятно то? Просто выгрузка очереди в файл идет в другом потоке, чтоб остальные не тормозить забивкой очереди. По поводу контейнера - можно взять например и список, вопрос не в контейнере. Вопрос в самом алгоритме как бы. Цитата ЫукпШ @ 1. Странно, что сначала объект помещается в очередь, а потом проверка на размер. Разумнее сделать наоборот. Да это не важно сначало идет проверка или потом, это детали, я это сделал перед выкладкой на форум, потому как мне показалось логичнее сначало ложить данные в очередь, а потом проверять - достигла ли она нужного размера или нет. Ну типа в этом случае нет лишней проверки. Цитата ЫукпШ @ Фактически, для доступа к объекту средства синхронизации использоваться не будут. Синхронизация будет обеспечена ходом алгоритма, что увеличит производительность до максимума. В смысле? |
Сообщ.
#4
,
|
|
|
Цитата Wound @ В смысле? Так ты подумай сам. Сначала объектом владеет какой-то поток, заполняет его данными. Затем указатель помещается в очередь, и именно она владеет объектом. Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов. Сам объект никуда не копируется. Поэтому затраты времени минимальны. После чего, допустим, другой поток получит этот указатель. Теперь этот указатель есть только у него, значит никакие средства синхронизации для доступа к объекту не нужны. И не используются. --- Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать внутреннее функционирование самой очереди. --- Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку таким способом можно обмениваться объектами неограниченных размеров. |
Сообщ.
#5
,
|
|
|
Цитата ЫукпШ @ Сначала объектом владеет какой-то поток, заполняет его данными. Затем указатель помещается в очередь, и именно она владеет объектом. Добавление указателя в очередь - это фактически запись UINT в массив UINT-ов. Сам объект никуда не копируется. Поэтому затраты времени минимальны. После чего, допустим, другой поток получит этот указатель. Теперь этот указатель есть только у него, значит никакие средства синхронизации для доступа к объекту не нужны. И не используются. --- Фактически, средства синхронизации потокобезопасной очереди будут обеспечивать внутреннее функционирование самой очереди. --- Такой вариант даст возможность писать алгоритм обмена в более общем виде, поскольку таким способом можно обмениваться объектами неограниченных размеров. ИМХО, слишком сложно ты описал. Это поле для бесконечных ошибок. У меня объект - строка, плюс ко всему не совсем я уверен что указатели потокобезопасные. Плюс какие гарантии что при перемещении объекта в очередь, он не понадобиться еще там где использовался? Добавлено Тут придется использовать умные указатели, с семантикой перемещения - std::unique_ptr, а он не потокобезопасный, на сколько я знаю. Так что придется всеравно задействовать средства синхронизации. |
Сообщ.
#6
,
|
|
|
Цитата Wound @ Плюс у меня такое ощущение, что тут нужно локальную переменную std::promise<void> promise; сделать членом класса, и получать std::shared_future Объект, для того чтобы все потоки ждали результата, но я могу ошибаться. Ну или может тут другие косяки есть какие нибудь? Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. |
Сообщ.
#7
,
|
|
|
Цитата Олег М @ Нафига тебе вообще этот promice? В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. Ясненько, спасибо. Просто я хотел еще попутно понять как работает promise + shared_future, как работает promise сам по себе примерно понимаю, но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата? |
Сообщ.
#8
,
|
|
|
Цитата Wound @ но не совсем понимаю в каких случаях идеально подошел бы promise+shared_future. Как я понимаю он применяется тогда, когда несколько потоков ждут будущего результата? Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно. |
Сообщ.
#9
,
|
|
|
Цитата Олег М @ Это примерно то же самое, что и Events в Win32. В твоём примере используестя абсолютно правильно, за исключение того, что бессмысленно. Я понял, спасибо. Добавлено Просто я тут книжку одну читаю, там всюду этот promise мелькает по поводу и без, я так подумал видимо очень хорошая вещь. Вот решил тоже попробовать что за зверь. Добавлено Цитата Олег М @ В функции flush() делай move(queue) и передавай результат параметром в std::async(dump, .....). А вот shadow точно следует сделать членом класса и делать ей wait() перед вызовом async и в деструкторе. Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный? Там общая файловая переменная фигурирует, но ведь если мы делаем wait до вызова async, значит там и не будет ведь запуска нескольких потоков? void Logger::Flush(std::wstring&& msg) { std::unique_lock<std::mutex> lk(m_flushmtx); m_log.push(msg); if (m_log.size() >= m_cachesize) { if (m_shadow_flush.valid()) { try { m_shadow_flush.wait(); } catch (const std::exception& error) { std::cout << "Error during flush queue: " << error.what() << std::endl; } } m_shadow_flush = std::async(std::launch::async, &Logger::dump, this, std::move(m_log)); } } void Logger::dump(Msi::Queue<std::wstring>&& queue) { //std::unique_lock<std::mutex> guard(m_dumpmtx); //! <<<<<<< Бессмысленный гард ведь? while (queue.size() > 0u) { std::wstring val; if (queue.try_pop(val)) { m_fout << L"Trhread [" << std::this_thread::get_id() << L"]: " << val; } } m_fout.flush(); } |
Сообщ.
#10
,
|
|
|
Цитата Wound @ Я ведь правильно понимаю, что в таком случае мьютекс внутри dump - попросту бесмысленный? Ну да, не нужен. У тебя сейчас dump вызывается синхронно, под блокировкой m_flushmtx. Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл. |
Сообщ.
#11
,
|
|
|
Цитата Олег М @ Вообще, здесь лучше использовать не async, а thread, который будет ждать события, что размер очереди больше m_cachesize, делать под блокировкой m_flushmtx - move(m_log) и сохранять очередь в файл. Так ведь thread тогда придется делать локальным, либо задавать какой нибудь defered launch(у него вообще есть такой режим?), плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. Ну и std::async с флагом std::launch::async - тот же thread, разве нет? |
Сообщ.
#12
,
|
|
|
Цитата Wound @ ак ведь thread тогда придется делать локальным, Ну да, сделай. Что в этом плохого. Цитата Wound @ плюс ко всему если вдруг будет исключение внутри dump - в случае с thread я же не смогу его обработать. А сейчас ты как его обрабатываешь? Делайшь try-catch внутри функции потока и всё. Цитата Wound @ Ну и std::async с флагом std::launch::async - тот же thread, разве нет Не совсем. Async - это thread pool, немного для других задач. В твоём случае проще и эффективнее использовать thread. Скорее всего у тебя будет один экземпляр Logger на всех, нафига тебе там пул потоков? Кроме того, у тебя сейчас по-любому dump() должен отрабатывать синхронно. Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке. Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица. |
Сообщ.
#13
,
|
|
|
Цитата Олег М @ Ну да, сделай. Что в этом плохого. Он запустит новый поток при вызове конструктора класса Log. Цитата Олег М @ Не совсем. Async - это thread pool, немного для других задач. У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения. Цитата Олег М @ Для этого ты используешь m_shadow_flush. А так он будет гарантированно работать в одном потоке. Async пригодился бы, если бы один Logger делал несколько dump() одновременно, но это бессмыслица. Хм... Добавлено ты имеешь ввиду вызвать что то типа такого в конструкторе? std::thread([this] { std::unique_lock<std::mutex> lk(m_flushmtx); m_cond_flush.wait(lk, [this] {return m_log.size() >= m_cachesize || m_isFinished; }); this->dump(std::move(m_log)); }); |
Сообщ.
#14
,
|
|
|
Цитата Wound @ Он запустит новый поток при вызове конструктора класса Log. Обычно логгеров в приложении один, от силы два. В чём проблема. Цитата Wound @ У Майерса написано что std::async использовать предпочтительннее std::thread, вроде как дает даже совет везде юзать именно std::async, так как он более умный + можно обрабатывать исключения. О предпочтительности можно говорить только в контексте конкретной задачи. В твоём случае thread предпочтительнее. Под обработкой исключении ты имеешь ввиду это? Если да, то что тебе мешает сделать то же самое в функции потока? Цитата Wound @ std::cout << "Error during flush queue: " << error.what() << std::endl; |
Сообщ.
#15
,
|
|
|
Цитата Олег М @ Обычно логгеров в приложении один, от силы два. В чём проблема. Да в принципе не в чем, я щас глянул, у меня у логгера все конструкторы, кроме одного удалены. Ладно, сейчас попробую прикрутить, посмотрим что получится. Возможно ты и прав. Добавлено Просто тут фишка в том, что у меня в конструкторе лог открывается, сдается мне, что использование std::thread как члена класса, стартанет поток еще до тела конструктора, что приведет к UB. |