Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[3.236.86.184] |
|
Сообщ.
#1
,
|
|
|
Всем привет!
Прошу найти ошибки (если есть) в моем решении тестового задания. Задание 1) найти количество строк в файлах в подкаталоге (не рекурсивно) 2) по умолчанию считать, что в подкаталоге все файлы текстовые 3) при пересчете загрузить все ядра CPU Вот мое решение, скажите что не так: #include <iostream> #include <fstream> #include <filesystem> #include <algorithm> #include <thread> #include <atomic> #include <mutex> #include <vector> #if __cplusplus < 201700L #error need c++17 compiler! #endif #if defined(__APPLE__) || defined(__MACH__) #define CH 13 #else #define CH 10 #endif using namespace std; namespace fs = std::filesystem; void Usage() { cout << "\nUsage: linecounter <valid-path>\n" << endl; exit(1); } int main(int argc, char* argv[]) { if (argc != 2 || !fs::directory_entry(argv[1]).exists() || ((fs::status(argv[1]).permissions() & fs::perms::others_read) == fs::perms::none)) Usage(); string work_dir(argv[1]); atomic<size_t> count = 0; atomic<size_t> error = 0; vector<string> file_pool; file_pool.reserve(distance(fs::directory_iterator(work_dir), fs::directory_iterator())); for (auto const& entry : fs::directory_iterator(work_dir)) file_pool.push_back(entry.path().filename().string()); vector<std::thread*> threads; size_t cores = thread::hardware_concurrency(); mutex mutex_vector; for (size_t i = 0; i < cores + 1; i++) { thread *th = new thread([&]() { while(true) { mutex_vector.lock(); if (file_pool.empty()) { mutex_vector.unlock(); break; } string file_name = file_pool.back(); file_pool.pop_back(); mutex_vector.unlock(); ifstream file (work_dir + "/" + file_name, std::ifstream::in|std::ifstream::binary); if (file.is_open()) { stringstream ss; ss << file.rdbuf(); string content = ss.str(); size_t cur_count = count_if (content.begin(), content.end(), [&](char c) { return c == CH; }); count += (cur_count) ? cur_count + 1 : (content.length()) ? 1 : 0; } else { error++; } } }); threads.push_back(th); }; for (auto &&i: threads) i->join(); cout << "Lines: " << count << ", I/O Errors: " << error << endl; for (auto &&i: threads) delete i; return 0; } |
Сообщ.
#2
,
|
|
|
Посмотрел по-диагонали, может другие участники чего добавят.
1. использовать defin'ы с целью констант - моветон. 2. сюда же и using namespace std; 3. exit(1) тоже непонятно зачем нужен. Эта функция аварийного завершения, у тебя в коде можно выйти из main через return. 4. хранить std::thread по указателю не нужно, new/delete тут тоже не нужен. 5. явно лочить мьютекс нежелательно, для этого есть raii lock_guard. 6. мне непонятен смысл этих танцев с бубном вокруг вектора и мьютекса. Можно не удалять имена файлов, а использовать тот же атомик индекс текущего имени. 7. к подсчету строк еще больше вопросов. зачем нужен stringstream? а если файл будет размером с террабайт, ты тоже его будешь пытаться в память целиком засунуть? Можно просто выделить буфер фиксированного размера (4кб например) и читать в него в цикле данные пока не дойдешь до конца файла через read(). 8. std::ifstream::in для ifstream можно не писать. 9. ошибки могут быть не только при открытии файла, а и при чтении, например. ifstream выставляет соответствующие флаги, подробнее см. в документации. 10. вместо "/" лучше заюзать std::filesystem::path::preferred_separator. Остальные куски кода работы с файловой системой не комментирую, т.к. не использовал fs, скорее всего там тоже проблем хватает. 11. можно вместо argv[1] вначале объявить const char* path = argv[1] для очевидности. И у текущего алгоритма есть фатальный недостаток. Допустим у тебя 4 ядра ЦП и 4 файла. 3 из них по 1Мб, а один - 1 Тб. Так вот 3 ядра будут простаивать, а один работать. Да и само задание бредовое. Я бы на месте тебя сперва вопросы задал по нему. Цитата Majestio @ 2) по умолчанию считать, что в подкаталоге все файлы текстовые Что значит "по умолчанию считать"? Это вообще ни о чем не говорит. А что если не текстовые? Цитата Majestio @ 3) при пересчете загрузить все ядра CPU Эта задача не ЦП грузит а дисковую подсистему. В реальных условиях как ты не тужься, а загрузка будет околонулевая (т.к. читает файл не ЦП, а дисковый контроллер, который работает асинхронно). Ну разве что RAM диск создать. |
Сообщ.
#3
,
|
|
|
shm отлично выше расписал, почти нечего добавить.
std::vector<thread*> - нафига? vector<thread> и всё. Или вообще jthread, если у тебя там 20-е плюсы, тогда и join не надо будет в конце юзать. |
Сообщ.
#4
,
|
|
|
Цитата shm @ У меня большое подозрение, что в тестовом задании ожидалось распараллелить работу не по файлам, а как раз по буферам. Типа, главный поток читает файлы и складывает буферы в очередь, потоки их оттуда изымают и считают строки. На это, как бы, намекает и тот факт, что строки нужны суммарно по файлам без индивидуального учёта, в конкретно каком сколько. + третий пункт "при пересчете"....(т.к. читает файл не ЦП, а дисковый контроллер, который работает асинхронно)... И всё равно выигрыш во времени сомнителен. Всё собирался накидать сравнение производительности, та всё времени не находится. |
Сообщ.
#5
,
|
|
|
Qraizer, тут в любом случае надо было уточнять задание. В такой постановке не могу считать его корректным. Имхо.
|
Сообщ.
#6
,
|
|
|
Всем спасибо.
|
Сообщ.
#7
,
|
|
|
Нашёл немного времени. До сравнения руки пока не дошли, однако могу предложить, Majestio, твой код в виде, который, думаю, отметившихся в этой теме больше устроит. Сорри, без обработки ошибок, ибо непонятно, что с ними делать. Так что просто синтетика.
#include <iostream> #include <filesystem> #include <fstream> #include <string> #include <queue> #include <vector> #include <iterator> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> namespace fs = std::filesystem; namespace chrono = std::chrono; using namespace std::literals; std::queue<fs::path> fileList; std::recursive_mutex queueLock; std::condition_variable_any quit; std::atomic<size_t> countLines = 0; auto getFile() { std::unique_lock<std::recursive_mutex> guard(queueLock); auto file(fileList.front()); fileList.pop(); quit.notify_all(); return file; } void threadFunc() { size_t curCount; for(;;) { fs::path fileName; { std::unique_lock<std::recursive_mutex> guard(queueLock); if (quit.wait_for(guard, 0ms, [&]{ return fileList.empty(); })) break; fileName = std::move(getFile()); std::cout << fileName << std::endl; } std::ifstream inFile(fileName); std::string buffer; for (curCount = 0; !inFile.eof(); ++curCount) std::getline(inFile, buffer); countLines += curCount; } } int main() { std::vector<std::thread> ths; auto startWork = chrono::system_clock::now(); decltype(startWork) endWork; for (fs::directory_iterator item(fs::current_path() / "texts"); item != fs::directory_iterator(); ++item) if (item->is_regular_file()) fileList.push(item->path()); startWork = chrono::system_clock::now(); for (int i = 0; i < std::thread::hardware_concurrency(); ++i) ths.emplace_back(std::thread(threadFunc)); for (auto &i : ths) i.join(); endWork = chrono::system_clock::now(); std::cout << "count lines "<< chrono::duration_cast<chrono::milliseconds>(endWork - startWork).count() << "ms\nlines " << countLines << std::endl; } |
Сообщ.
#8
,
|
|
|
Ну держите.
Подправил предыдущий вариант (к тому же он ранее пропускал пустые строки, что нехорошо) и чуток соптимизировал. (Во-первых, нефик все нитки будить, они всё равно передерутся за очередь и опять уснут все, кроме одной. Во-вторых, нечего буферу в цикле делать, он всё равно мувится туда/сюда, и размер у него фиксирован. В-третьих, нотификация теперь бросается вне лока очереди, что по идее уменьшит нагрузку на мультиядерное планирование, ибо на этом локе уже ожидаемо не одна нитка сидит.) #include <iostream> #include <filesystem> #include <fstream> #include <queue> #include <vector> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> namespace fs = std::filesystem; namespace chrono = std::chrono; using namespace std::literals; constexpr size_t bufferSize = 1024*1024; // размер буфера чтения std::queue<fs::path> fileList; // очередь файлов std::recursive_mutex queueLock; // лок для очереди std::condition_variable_any quit; // сигнал завершения std::atomic<size_t> countLines = 0; // подсчитанные строки /* получить очередной файл из очереди с блокировкой */ auto getFile() { decltype(fileList)::value_type file; { std::unique_lock<std::recursive_mutex> guard(queueLock); // лок file = std::move(fileList.front()); fileList.pop(); } // анлок quit.notify_one(); // дёрнуть сигнал return file; } /* функция нитки */ void threadFunc() { std::vector<char> buffer(bufferSize); // буфер чтения size_t curCount; // локальный счётчик, чтоб пореже дёргать глобальный for(;;) { fs::path fileName; { std::unique_lock<std::recursive_mutex> guard(queueLock); // лок // окончание работы, если очередь пуста if (quit.wait_for(guard, 0ms, [&]{ return fileList.empty(); })) break; // без таймаута fileName = std::move(getFile()); // забрать файл std::cout << fileName << std::endl; } // анлок std::ifstream inFile(fileName); // цикл чтения и подсчёта for (curCount = 0; !inFile.eof(); curCount+= std::count_if(buffer.begin(), // подсчёт buffer.begin() + inFile.gcount(), // тут .end() не канает! [](char ch){ return ch == '\n'; })) inFile.read(&buffer.front(), buffer.size()); // чтение // обновить глобальный счётчик countLines += curCount; } } int main() { std::vector<std::thread> ths; auto startWork = chrono::system_clock::now(); decltype(startWork) endWork; // параллелить заполнение очереди файлами не имеет смысла for (fs::directory_iterator item(fs::current_path() / "texts"); item != fs::directory_iterator(); ++item) if (item->is_regular_file()) fileList.push(item->path()); startWork = chrono::system_clock::now(); // старт // насоздавать ниток for (int i = 0; i < std::thread::hardware_concurrency(); ++i) ths.emplace_back(std::thread(threadFunc)); //подождать окончания работы for (auto &i : ths) i.join(); endWork = chrono::system_clock::now(); // стоп std::cout << "count lines "<< chrono::duration_cast<chrono::milliseconds>(endWork - startWork).count() << "ms\nlines " << countLines << std::endl; } #include <iostream> #include <filesystem> #include <fstream> #include <queue> #include <vector> #include <thread> #include <mutex> #include <condition_variable> #include <chrono> namespace fs = std::filesystem; namespace chrono = std::chrono; using namespace std::literals; constexpr size_t bufferSize = 1024*1024; // размер буфера чтения std::queue<std::vector<char>> bufferList; // очередь буферов std::recursive_mutex queueLock; // лок для очереди std::mutex dirLock; // лок для списка файлов std::condition_variable_any quit; // сигнал завершения std::atomic<size_t> countLines = 0; // подсчитанные строки /* получить очередной буфер из очереди с блокировкой */ auto getBuffer() { decltype(bufferList)::value_type buf; { std::unique_lock<std::recursive_mutex> guard(queueLock); // лок буферов buf = std::move(bufferList.front()); bufferList.pop(); } // анлок буферов quit.notify_one(); // дёрнуть сигнал return buf; } /* впихнуть очередной буфер в очередь с блокировкой */ void putBuffer(decltype(bufferList)::value_type&& buf) { { // лок буферов std::unique_lock<std::recursive_mutex> guard(queueLock); bufferList.emplace(std::move(buf)); } // анлок буферов quit.notify_one(); // дёрнуть сигнал } fs::directory_iterator item; // "список" файлов std::vector<std::ifstream> files; // файловые потоки, по потоку на нитку /* чекнуть буферы в очереди; возвращает false, если очередь пуста */ bool processBuffer() { decltype(bufferList)::value_type buffer; // буфер чтения size_t curCount; // локальный счётчик, чтоб пореже дёргать глобальный { // лок буферов std::unique_lock<std::recursive_mutex> guard(queueLock); // если очередь не пуста забрать очередной буфер к себе if (quit.wait_for(guard, 0ms, [&]{ return !bufferList.empty(); })) // "ждать" без таймаута buffer = std::move(getBuffer()); } // анлок буферов // если юуфер успешно забрался, потому что он есть, ... if (!buffer.empty()) { // ... подсчитать и обновить глобальный счётчик curCount = std::count_if(buffer.begin(), buffer.end(), [](char ch){ return ch == '\n'; }); countLines += curCount; } return !buffer.empty(); // а был ли буфер?.. } /* чекнуть файлы в списке и пополнить очередь; возвращает false, если файлы кончились параметр - индекс нитки в массиве файловых потоков */ bool processFile(int idx) { decltype(bufferList)::value_type buffer; /* если наш файл не открыт, то попробовать найти очередной и открыть */ if (!files[idx].is_open()) { std::lock_guard<std::mutex> guard(dirLock); // лок файлов // пока список не исчерпан, пропускать нефайлы while (item != fs::directory_iterator() && !item->is_regular_file()) ++item; if (item == fs::directory_iterator()) return false; // список файлов исчерпан? files[idx].open(item->path()); // попробовать открыть std::cout << item++->path() << std::endl; } // анлок файлов /* если наш файл открыт, то прочитать буфер и пополнить очередь */ if (files[idx].is_open()) { buffer.resize(bufferSize); // это максимальный размер files[idx].read(&buffer.front(), buffer.size()); buffer.resize(files[idx].gcount()); // а это сколько реально прочлось putBuffer(std::move(buffer)); // осчастливить другие нитки // ну или себя, тут как получится if (files[idx].eof()) files[idx].close(); // не забыть закрыть, если файл кончился } return true; } /* функция нитки параметр - индекс нитки для массива файловых потоков */ void threadFunc(int idx) { // стоп, если нет ни буферов, ни файлов while (processBuffer() || processFile(idx)) /* do nothing */; } int main() { std::vector<std::thread> ths; auto startWork = chrono::system_clock::now(); decltype(startWork) endWork; // инициализация "списка" файлов и массива файловых потоков item = fs::directory_iterator(fs::current_path() / "texts"); files.resize(std::thread::hardware_concurrency()); startWork = chrono::system_clock::now(); // старт // насоздавать ниток for (int i = 0; i < std::thread::hardware_concurrency(); ++i) ths.emplace_back(std::thread(threadFunc, i)); //подождать окончания работы for (auto &i : ths) i.join(); endWork = chrono::system_clock::now(); // стоп std::cout << "count lines "<< chrono::duration_cast<chrono::milliseconds>(endWork - startWork).count() << "ms\nlines " << countLines << std::endl; } Проверил на SSD, но это неинтересно, и ежу понятно, что на SSD разницу не особо увидишь. Вот HDD интереснее, но это уж из дома. |
Сообщ.
#9
,
|
|
|
Цитата Qraizer @ Если кто какие баги найдёт, тот будет молодец. А вот такая строчка из функции потока: std::cout << ... << std::endl; Это точно потоко - безопасный вариант ? А если поток прервётся при частично сделанном выводе ? ----- И вообще всё сложно. Полагаю, надо так: 1. Перечислим файлы в директории и спасём имена в контейнер по вкусу. Список, массив - не важно. 2. Определимся с числом потоков. 3. Создадим массив полноценных объектов - потоков, но без запуска поточных процедур. 4. Загрузим данные о файлах каждому объекту - потоку в их внутренний контейнер. Каждому свою порцию - часть общей работы. (Предусмотрим для этого метод для объекта - потока.) 5. Только после этого запустим поточные процедуры. Интересно будет по-исследовать, как всё "одновременно" будет работать. 6. Из главного потока дождёмся окончания работы всех потоков. 7. Получим результаты вычислений из каждого потока. (Предусмотрим для этого метод для объекта - потока.) 8. Выведем результат - из главного потока. При таком алгоритме даже никакая синхронизация не требуется. Только флаг окончания работы потока. |
Сообщ.
#10
,
|
|
|
Цитата ЫукпШ @ Каждому свою порцию - часть общей работы Осталось поделить "по-честному" |
Сообщ.
#11
,
|
|
|
В общем, задача исключительно академическая, как ни крути. Проверил на HDD, никакой особо разницы и там. Только по другой причине: фактически многопоточность там ни к селу ни к городу, HDD просто не поспевает подавать данные, так что работает тупо один поток. Попытался хоть как-то улучшить ситуацию – раскидал файлы по трём HDD (у меня их 2 по 1Тб и один 2 Тб) и объединил в рамках одного каталога символическими ссылками. Удалось поднять загрузку CPU на один поток, благодаря распараллеливанию дисковых операций по разным SATA контроллерам. Но всё равно иногда нагрузка с 55% падала до 30. Решился на последнее средство: в threadFunc() сменил порядок вызова processBuffer() и processFile() и в рамках последней ограничил количество буферов количеством ядер (иначе б тупо все файлы засасывались бы в память и только потом подсчитывались строки). Так удалось ещё на 15% поднять производительность, но это всё равно меньше одного потока (25%).
В итоге имеем, что если сию задачу делать для продакшна, то нужно реально регулировать количество потоков в пуле и их назначение динамически. Ну и, как и ожидалось, параллелить ли файлы или буферы, не играет сколько-нибудь существенной роли. На HDD буферы проигрывают файлам ~15%, на SDD порядка 8%. P.S. Увеличил общий размер файлов до 10Гб, ибо винда очень любит динамически подстраивать дисковый кеш под текущую активность. Добавлено Цитата ЫукпШ @ Да, они ж под локами на std::mutex.Это точно потоко - безопасный вариант ? Цитата ЫукпШ @ Фактически это и есть первый вариант, с очередью файлов. Второй вариант работает на очереди буферов, что теоретически плавнее нагружает потоки, т.к. они все (почти, файлы могут быть некратны размеру буфера) фиксированного размера, а файлы сильно разного. Но как показала практика, дисковые операции своими тормозами напрочь съедают весь выигрыш. Впрочем, не факт, что на особых сценариях с особо подобранными размерами файлов будет та же картина, синтетика есть синтетика.Полагаю, надо так: ... Цитата ЫукпШ @ Она и тут не особо влияет, только операции с очередью, которые и так очень оперативны, благодаря перемещениям элементов. Были бы копирования, было бы дороже.При таком алгоритме даже никакая синхронизация не требуется. P.S. Цитата ЫукпШ @ Файлы могут быть быть разного размера, так что примерно равные очереди отнюдь не обязательно равномерно нагрузят потоки. Потому я и задумался об очереди буферов вместо очереди файлов. 4. Загрузим данные о файлах каждому объекту - потоку в их внутренний контейнер. Добавлено P.P.S. Что любопытно, при реализации второго вариант допустил лишь одну алгоритмическую ошибку: забыл ++ сделать item в строке 90. Потом лишь оптимизировал архитектуру многопоточности. С первым вариантом, который проще, как ни странно, было не так шоколадно . Эх, люблю современный C++, рабочий код почти сразу из-под пальцев |
Сообщ.
#12
,
|
|
|
Цитата Majestio @ Цитата ЫукпШ @ Каждому свою порцию - часть общей работы Осталось поделить "по-честному" Самое не сложное из всего. (Конечно, прежде чем советовать, я сам всё это попробовал) Вот такой отрезок: // загрузим данные для работы потокам // sizeList - размер исходного контейнера // threads - число потоков // memWth - массив указателей на потоки // item - структура данных для обмена. // имя файла, счётчик строк, код возврата операции с файлом size_t j=0; for(size_t i=0;i<sizeList;++i) { FilList.ReadFromList(name); fwItem item; item.fileName = name; memWth[j++]->AddItem(item); if(j >= threads) j=0; } А там уж как повезёт, но всё-таки файлы берём не подряд, а в перемешку. Разброс в размерах текстовых файлов может быть, но не в дохреннилиард раз. Добавлено Цитата Qraizer @ P.P.S. Что любопытно, при реализации второго вариант допустил лишь одну алгоритмическую ошибку: А вот ещё такие соображения: 1. Есть ли предохранитель от использования исполнимого файла (самого себя) при работе в своём каталоге ? Фильтра при перечислении файлов, как я понял, в твоей реализации нет. 2. Не все строки текстового файла оканчиваются LF (\n). Это характерная ошибка при работе с текстовыми файлами. Иногда юзер забывает стелать Enter в конце файла. В результате последняя строка файла оканчивается не LF, а концом файла. Но от этого она не перестаёт быть строкой.. |