Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[35.168.113.41] |
|
Сообщ.
#1
,
|
|
|
Доброе время суток.
Столкнулся с такой проблемой: Я организовал взаимодействие потоков таким образом: поток1 формирует данные while (...) { std::lock_guard<std::mutex> lg(mut); //Computing data exch_data.push(tmp); data_cond.notify_one(); } endWork = true; поток2 будет выводить на экран while (true) { if (endWork) break; std::unique_lock<std::mutex> lg(mut); data_cond.wait(lg,[]{return !exch_data.empty();}); tmp = exch_data.front(); exch_data.pop(); //Draw data lg.unlock(); } main: int _tmain(int argc, _TCHAR* argv[]) { CThread_synchro exampl; //... std::thread t2(&CThread_synchro::_prepereData,&exampl); std::thread t1(&CThread_synchro::_proccesData,&exampl); //... t2.join(); t1.join(); return 0; } Предполагалось, я сформировал данные, поместил в очередь в одном потоке, а во-втором выбрал и очистил очередь и ждет поступления данных . Предполагал, что данный механизм блокировок обеспечит мне необходимую работу программы Но в результате у меня не корректная работа проги: 1-й поток набивает в очередь н-е количество порций данных, а второй отображает только несколько. Может я не тот механизм выбрал для такой задачи? |
Сообщ.
#2
,
|
|
|
Механизм правильный. Дело в другом - в таймслайсах, которые отводятся каждому потоку. И переменной endWork, которую надо устанавливать либо под лочкой, либо делать atomic_bool. Если во втором потоке к проверке endWork добавить проверку очереди на пустоту, то всё должно заработать. И это, надо вытащить из под лочки notify_one.
|
Сообщ.
#3
,
|
|
|
В общем, я тут пованговал, как могло выглядеть неизвестное мне приложение, получилось нечто такое:
#include <thread> #include <mutex> #include <atomic> #include <queue> #include <iostream> #include <conio.h> #include <condition_variable> std::atomic_bool endWork = false; std::queue<int> exampl; std::condition_variable data_cond; std::mutex mut; void prepereData(std::queue<int> &exch_data) { int tmp = 0; while (!kbhit() || getch() != 27) { { std::lock_guard<std::mutex> lg(mut); //Computing data exch_data.push(++tmp); } data_cond.notify_one(); } endWork = true; data_cond.notify_one(); } void proccesData(std::queue<int> &exch_data) { while (!endWork || !exch_data.empty()) { int tmp; std::unique_lock<std::mutex> lg(mut); data_cond.wait(lg, [&exch_data] { return !exch_data.empty() || endWork; } ); if (!exch_data.empty()) { tmp = exch_data.front(); exch_data.pop(); //Draw data std::cout << tmp << ' '; } lg.unlock(); } std::cout << std::endl; } int main() { //... std::thread t2(prepereData, std::ref(exampl)); std::thread t1(proccesData, std::ref(exampl)); //... t2.join(); t1.join(); return 0; } |
Сообщ.
#4
,
|
|
|
Цитата agapa @ Может я не тот механизм выбрал для такой задачи? Может ты и отладишь твой алгоритм, но я бы так никогда не стал делать. я делаю так: 1. Обе потоковые функции, начав работать, останавливаются на функции ожидания многих событий. "WaitForMultipleObjects" 2. Очередь должна быть потокобезопасным объектом. Т.е. синхронизация происходит внутри очереди. 3. По какому-то событию 1-й поток разбужен, получил/обработал данные, поместил в очередь. 4. Послал уведомление диспетчеру. Надо как-то взвести событие, которого ожидает 2-й поток. Очевидно, что удобнее всего использовать семафор. 5. 2-й поток начинает работать, извлекает данные из очереди и гасит семафор, обрабатывает данные,выводит итд. После чего снова останавливается, ожидая взведения события. --- Такая структкра приложения максимально асинхронная, потоки никак не связаны между собой. Это похоже на конвейер обработки данных. В этом случае синхронизация между потоками вообще не нужна - 1-й может уже получить и поместить в очередь количество данных существенно больше, чем 2-й обработал. (по каким-то там причинам). Но впоследствии, если конкретный комп. вообще способен решить поставленную задачу, очередь рассосётся и все данные будут обработаны. --- Возможны варианты, но это самая удобная и перспективная архитектура. --- Например, таким образом очень удобно рисовать графику а GUI-приложениях. Рабочий поток рисует картинки, отправляет их в очередь, посылает асинхронное уведомление (окну) и дальнейшая судьба нарисованного его не волнует. Главный поток извлекает картинки из очереди и выводит на экран. |
Сообщ.
#5
,
|
|
|
Цитата agapa @ Предполагалось, я сформировал данные, поместил в очередь в одном потоке, а во-втором выбрал и очистил очередь и ждет поступления данных . Предполагал, что данный механизм блокировок обеспечит мне необходимую работу программы Вынеси в первом потоке "...Compute Data..." за блокировку. Поставь перед lock_guard. У тебя первый поток постоянно блокирует мьютекс и второй не может с ним работать. Кроме того, если в вычислениях, которые делай первый поток нет никаких ожидающих операций, то в начале цикла, за блокировкой не мешает сделать std::this_thread::yield() или sleep_for. Нужно во втором потоке полностью очищать очредь. Например так: ..... data_cond.wait(lg,[]{return !exch_data.empty();}); auto items = std::move(exch_data); for (auto &item: items) ..... Добавлено Цитата agapa @ lg.unlock(); Это можно не делать, там и так разблокирутся в деструкторе unique_lock |
Сообщ.
#6
,
|
|
|
Доброе время суток
Огромное спасибо за Ваши предложения ЫукпШ, "ДА" с использованием "старого" API все норм работает, но мне ударило в голову, что использования нового стандарта более элегантно. Ну и само мое желание использовать новый стандарт... Цитата ЫукпШ @ - я закрутил на "событиях". Использовать семафор? ...что удобнее всего использовать семафор. |
Сообщ.
#7
,
|
|
|
Цитата agapa @ ЫукпШ, "ДА" с использованием "старого" API все норм работает, но мне ударило в голову, что использования нового стандарта более элегантно. Ну и само мое желание использовать новый стандарт... В "старом API" работает скорее всего потому, что ты, наверное, используешь событие с ручным сбросом. При использовании события с автоматическим сбросом будет работать точно также. Здесь проблема не в апи и не в семафорах (вооббще не представляю зачем они тут нужны), а в том что ты некорректно блокируешь мьютекс - первый поток держит блокировку слишком долго и отпускает на слишком короткое время, за которое второй поток не успевает его заблокировать. Блокировки должны быть максимально короткими. В данном случае первый поток должен блокировать мьютекс только для того, чтоб сделать push и notify, второй поток - только для того, чтоб вычитать весь список - move(exch_data) или splice. Остальные операции - computing и draw должны быть вне блокировок. Тогда всё будет работать хорошо. Добавлено И, кстати, во втором потоке нужно вынести std::unique_lock<std::mutex> lg(mut) за цикл (а lg.unlock(); убрать вообще). Тогда может и так заработает. А то сейчас получается, что второй поток у тебя блокирует-разблокирует мьютекс вдвое чаще чам первый, соответственно отрабатывает вдвое реже. |
Сообщ.
#8
,
|
|
|
Цитата Олег М @ Блокировки должны быть максимально короткими. В данном случае первый поток должен блокировать мьютекс только для того, чтоб сделать push и notify, второй поток - только для того, чтоб вычитать весь список - move(exch_data) или splice. Остальные операции - computing и draw должны быть вне блокировок. Тогда всё будет работать хорошо. По моему представлению работы проги должно быть так: я рассчитал траекторию, сформировал ОДИН пакет и поместил его в очередь, предварительно залочил, что и делает первый поток. Блокировка должна быть на протяжении этого процесса. В конце итерации цикла мьютекс освобождается. Цитата Олег М @ ...первый поток держит блокировку слишком долго и отпускает на слишком короткое время... Может быть надо так : while (...) { { std::lock_guard<std::mutex> lg(mut); //Computing data exch_data.push(tmp); //Для того, чтобы здесь освободить мьютекс } data_cond.notify_one(); } И что значит "на короткое время"? Второй поток должен, захватив мьютекс, считать этот ЕДИНЫЙ пакет и отобразить его. Опять же я хотел бы залочить этот процесс, чтобы в этот момент очередь не заполнялась. Да, "lg.unlock(); " можно и убрать. Это больше для наглядности, для меня. Но... Увы... Но если переписать цикл во втором потоке, как это подсказал Qraizer, с: while (true) { if (endWork) break; //... На while (!endWork || !exch_data.empty()) { //... } То все работает норм. Но это не соответствует моему представлению (оно может быть слишком притензиозным ). Сделал вывод в консоль: сперва работает только первый поток, заполняя энным количеством пакетов очередь, только потом потоки начинаю работать синхронно. |
Сообщ.
#9
,
|
|
|
Цитата agapa @ И что значит "на короткое время"? Второй поток должен, захватив мьютекс, считать этот ЕДИНЫЙ пакет и отобразить его. Опять же я хотел бы залочить этот процесс, чтобы в этот момент очередь не заполнялась. У тебя второй поток разблокирует мьютекс в каждой итерации, и первый поток успевает в это время накидать туда кучу пакетов //thread1 { while (...) { //Computing data std::lock_guard<std::mutex> lg(mut); exch_data.push(tmp); data_cond.notify_one(); } std::lock_guard<std::mutex> lg(mut); endWork = true; data_cond.notify_one(); } //thread2 { std::unique_lock<std::mutex> lg(mut); for(;;) { data_cond.wait(lg,[]{return !exch_data.empty() || endWork;}); if (endWork) break; tmp = exch_data.front(); exch_data.pop(); //Draw data } } Здесь, пока второй поток не перейдёт в режим ожидания, пакеты не будут добавляться Добавлено А вообще, я бы сделал вот так, чтобы Computing не блокировал Draw. auto GetItems() { data_cond.wait(lg,[]{return !exch_data.empty() || endWork;}); return std::move(exch_data); } //thread2 while (!endWork) { auto items = GetItems(); for (auto &item: items) //Draw data } |
Сообщ.
#10
,
|
|
|
Доброе время суток
Цитата Олег М @ А вообще, я бы сделал вот так, Спасибо за совет Попробую |
Сообщ.
#11
,
|
|
|
Доброе время суток
Цитата Олег М @ Здесь, пока второй поток не перейдёт в режим ожидания, пакеты не будут добавляться По эксперементировал с этими вариантами - не выходит у меня: 1-й поток завершается раньше и, в результате, не все данные... ... std::lock_guard<std::mutex> lg(mut); endWork = true; data_cond.notify_one(); ... Этот момент остался для меня не понятен Какой смысл? Как и в выносе локера за пределы цикла. Без while (!endWork || !exch_data.empty()) { //... } данные теряются. |
Сообщ.
#12
,
|
|
|
Цитата agapa @ данные теряются. Данные лежат в очереди. Теряются потому что ты сначала проверяешь endWork и, если она true, сразу выходишь. Если сначала вычитать очередь, а потом выходить, ничего не потеряется. Кстати, у меня во втором случае, где GetItems() ничего теряться не должно - сначала обрабатывается очередь, потом выходит из цикла. Добавлено Цитата agapa @ Этот момент остался для меня не понятен Какой смысл? Как и в выносе локера за пределы цикла. Потому что у тебя второй поток спит в wait(). Если при завершении 1-го потока не сделать notify, то он там так и зависнет. Добавлено За пределы цикла выносится, чтоб второй поток разблокировал мьютекс только во время ожидания новых данных. Соответственно первый поток будет добалять данные только когда второй ничего не делает. И здесь, чтоб данные не терялись достаточно обработать их после выхода из цикла. |
Сообщ.
#13
,
|
|
|
Благодарю за разъяснение
|