Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[18.191.189.120] |
|
Страницы: (2) [1] 2 все ( Перейти к последнему сообщению ) |
Сообщ.
#1
,
|
|
|
Всем привет!
Думал, что может, лучше разместить эту тему в Алгоритмах, но всё же решил тут, т.к. тема довольно сильно завязана на low-level. Решил написать небольшой include для fasm под Windows, который будет позволять выполнять параллельные вычисления для for-циклов по аналогии с TParallel.For в Delphi или Parallel.For в C#, но без таких заморочек, как там, а что-то простенькое (строк на 500 асм-кода, ну может, чуть больше). Результат потом выложу. Кто не в теме – схема такая. Запускается функция инициализации, которая создаёт ждущие потоки. Далее (сразу или в нужный момент) вызывается функция, которой передаётся адрес процедуры цикла (ProcAddr) диапазон цикла for (либо кол-во итераций). Эта функция будит потоки (SetEvent) и либо завершает свою работу, либо ждёт окончания цикла. Потоки, в свою очередь, инициализируют цикл по определённым правилам (см. ниже вопросы) и вызывают на каждом цикле процедуру цикла (ProcAddr), передавая ей номер цикла и порядковый номер потока (чтобы было удобнее группировать результаты вычислений без использования lock и критических секций). В конце программист вызывает функцию завершения, которая даёт потокам команду завершить работу. В ходе реализации возникло несколько моментов, о которых я решил посоветоваться с местными спецами Итак, вопрос №1: какое кол-во потоков оптимально? Тут два варианта: Как лучше, на ваш взгляд, и почему? Кроме того, хочу по умолчанию умножать это число на 2 (потому что функция цикла может грузить процессор не на все 100%, а так всё же надёжнее будет). Ну и давать программисту возможность ограничивать общее число потоков сверху. Вопрос №1А (туда же): есть ли смысл назначать affinity mask или хотя бы номер идеального процессора для каждого потока? Мне кажется, что нет (система сама разберётся как лучше). Вопрос №2: как лучше распределять итерации циклов между потоками? Представим, что у нас 4 потока и мы делаем цикл i=0...999. А может, у вас есть более интересная идея (но не чересчур замороченная)? Вопрос №3: достаточно ли указания кол-ва итераций (причём, знаковым значением, т.е. 0x7FFFFFFF для 32-х бит) или обязательно нужно указывать и начальное, и конечное значение счётчика? Вариант с указанием обоих границ имеет некоторые сложности в реализации при кол-ве итераций, приближённых к максимально возможным. Но всё решаемо, конечно, при необходимости. Другой вопрос: есть ли такая необходимость? Что думаете обо всём этом? Повторюсь, что это не претензия на какую-то грандиозную задумку, а довольно простой, но всё же полезный инструмент для ускорения циклических вычислений. |
Сообщ.
#2
,
|
|
|
Не понял реализации захвата итераций. На мой дилетантский взгляд это выглядит приблизительно так: имеется массив итераций, и захват итерации номер N потоком K cостоит в том, что в N-й элемент этого массива вместо нуля записывается K. Если так, то можно модифицировать схему блочного захвата. Установил ты длину блока в 3, и некий поток захватил себе некие 3 элемента. После выполнения одного цикла он просто пытается "добить" количество захваченных себе элементов до установленного количества, ничего не локая и глубоко плюя на возможные интерференции. Т.е. тупо сканит массив по принципу: мой? плюс один к счётчику... ноль? захватить себе и плюс один... счётчик = 3? угомониться и начать выполнение итерации с наименьшим из номеров.
|
Сообщ.
#3
,
|
|
|
Akina, ты про 2 и 3 методы? Зачем тут массив? Просто имеется глобальная (для всех потоков) переменная, которая хранит номер следующей итерации. Перед запуском итерации поток делает
mov eax,1 lock xadd [NextIteration],eax В 3-м методе схема примерно та же, только там будет не номер итерации, а номер блока. |
Сообщ.
#4
,
|
|
|
Ааа... ну так это же ж догадаться нужно.
У тебя размер блока статический. А это значит, что выигрыш от многопоточности ты слегка теряешь, поскольку всё одно ждать окончания работы самого последнего из работающих потоков. Уменьшение BLC снижает среднее время этой потери, конечно... но при этом увеличивается количество перезапросов очередного блока на обработку и, соответственно, локов. А потому кажется мне, что вполне себе можно сделать количество захватываемых на обработку блоков динамическим. Навскидку что-то вроде (текущий BLC) = ОкруглитьВверх{ константа * (количество оставшихся итераций) / (количество потоков) } где константа - опять же навскидку нечто вроде 1/e..1/2. Округление же вверх гарантирует захват как минимум одной итерации. |
Сообщ.
#5
,
|
|
|
Есть ещё вот такая идея...
Для каждого потока задано кол-во оставшихся итераций (обозначим его как IC). К примеру, если у нас цикл 0...999 и 4 потока, то на старте для всех нитей IC=250 (и при запуске итерации это число будет уменьшаться без блокировки). Допустим, thread1 закончил раньше. При этом у thread2:IC=30, thread3:IC=20, thread4:IC=10. 1. Ищем максимальное IC. Если это значение = 0, завершаем свою работу. Иначе мы нашли нить, у которой будем отбирать работу (это thread2, у него IC=30, обозначим его как ICx). 2. Вычисляем NewIC = ICx/2 = 15. 3. Делаем xchg ICx,NewIC (тут срабатывает блокировка). 4. Смотрим какое значение мы получили через этот обмен. Если оно <= NewIC, значит goto 1 (такое возможно, когда ICx=1 или около того, либо ещё какой-то поток тоже закончил работу и влез раньше нас). 5. Устанавливаем у себя IC = ICx-NewIC, выполняем оставшиеся итерации. Нормальная схема? |
Сообщ.
#6
,
|
|
|
Цитата Jin X @ Ищем максимальное IC. Если это значение = 0 0 или 1, наверное... если у потока осталась одна итерация, есть ли смысл её отбирать? Цитата Jin X @ Для каждого потока задано кол-во оставшихся итераций По одному значению нельзя определить, какие именно итерации "отбираются". Должны храниться два значения: "сколько" и "начиная с". Тогда при "отборе" достаточно корректировать (с блокировкой) только значение "сколько" у ограбленного потока, забирая остаток себе и соотв. образом пересчитав для себя "начиная с". А значение "начиная с" чужого потока только читается, так что тут блокировка не требуется. |
Сообщ.
#7
,
|
|
|
Ещё такой момент интересует: т.к. итерации всё равно будут идти не совсем последовательно (скажем, 1,3,2,5,4,6,7,9,8,11,10,13,12...), может, вообще стоит делить весь цикл на равные части и отдавать потокам целиковые куски?
Т.е. не так: thread1=0,4,8,12... thread2=1,5,9,13... thread3=2,6,10,14... thread4=3,7,11,15... А вот так: thread1=0..249 thread2=250..499 thread3=500..749 thread4=750..999 Для упрощения реализации (и ускорения). В итоге итерации будут происходить примерно в таком виде: (0,250,750,1,1000,251,751,1001,252,2,752,253,1002,3,753,503,1003...). Или не стоит, т.к. иногда этот вариант может быть более напряжным? Хотя и там, и там цикл не последовательный... Добавлено Цитата Akina @ А почему нет, если он сейчас занят другой? 0 или 1, наверное... если у потока осталась одна итерация, есть ли смысл её отбирать? Добавлено Цитата Akina @ Ну да. Я сначала подумал, что это будет хранится "внутри" потока. Но теперь понимаю, что так не пойдёт... потому что отбирающий поток не знает, с какого счёта начал работать отбираемый (т.к. он мог сам отобрать у кого-то). Должны храниться два значения: "сколько" и "начиная с". Добавлено Цитата Akina @ Главное, чтобы этот поток между xchg "осталось" и чтением "начиная с" не успел быстро закончить свою работу и отобрать работу у кого-нибудь другого. Так что, сдаётся мне, что тут попахивает cmpxchg8b... А значение "начиная с" чужого потока только читается, так что тут блокировка не требуется. Добавлено Нет... не "начиная с" нужно хранить, а "номер следующей итерации". Т.к. по "начиная с" и "осталось" мы не поймём какая итерация следующая... |
Сообщ.
#8
,
|
|
|
Цитата Jin X @ Главное, чтобы этот поток между xchg "осталось" и чтением "начиная с" не успел быстро закончить свою работу Именно от этой "неприятности" защищает не-отбор последней итерации. Цитата Jin X @ не "начиная с" нужно хранить, а "номер следующей итерации". Т.к. по "начиная с" и "осталось" мы не поймём какая итерация следующая... Если мы храним начало и текущее количество, мы обновляем только одно значение. И второе не надо лочить. А если хранить номер следующей - надо лочить оба значения. Цитата Jin X @ Или не стоит, т.к. иногда этот вариант может быть более напряжным? Хотя и там, и там цикл не последовательный... Предполагается, что итерации независимы. Иначе какое в пень распараллеливание? А коли так, то последовательные блоки упрощают расчёты при перехватах. Цитата Jin X @ может, вообще стоит делить весь цикл на равные части и отдавать потокам целиковые куски? Несомненно. Иначе в конце процесса начнутся сложности с расчётом, когда потоки начнут выбывать. |
Сообщ.
#9
,
|
|
|
Цитата Akina @ А где гарантия, что в этом же промежутке не успеют отработать 2, 3 или 5 итераций?Именно от этой "неприятности" защищает не-отбор последней итерации. Цитата Akina @ А мне и не надо трогать чужое "следующее" значение. Я его только читать буду.А если хранить номер следующей - надо лочить оба значения. И как ты определишь с какой позиции нужно "отобрать" итерации у потока, если, скажем, известно, что поток начал с 100-й итерации и осталось 50? На какой он сейчас находится? Цитата Akina @ Последовательные блоки - это...А коли так, то последовательные блоки упрощают расчёты при перехватах. thread1=0..249 thread2=250..499 thread3=500..749 thread4=750..999 ??? Цитата Akina @ Ну вот и я про то же... Несомненно. Иначе в конце процесса начнутся сложности с расчётом, когда потоки начнут выбывать. Добавлено Кстати, лучше хранить номер последней итерации и кол-во оставшихся. Рабочий поток будет на каждом цикле уменьшать кол-во оставшихся (без lock'а), а номер текущей итерации будет храниться просто в регистре. Обирающий поток будет проверять кол-во оставшихся итераций и просто читать номер последней итерации, чтобы вычислить откуда ему продолжить (это всё без lock'а), а lock cmpxchg8b здесь будет работать эффективнее, т.к. поток меняет только одно из значений, и меньше вероятности "несрабатывания" (сравнения с ложным результатом). При этом решается ещё одна проблема: если хранить номер следующей итерации и кол-во оставшихся, рабочему потоку нужно как-то менять эти значения одновременно. А тут только одно. |
Сообщ.
#10
,
|
|
|
Цитата Jin X @ Кстати, лучше хранить номер последней итерации и кол-во оставшихся. Кстати, да. Хранить только "мне остались итерации от и до", не включая текущую обрабатываемую итерацию. Соответственно сам поток меняет только своё "от". И это "от" меняет только он. Лок соответственно не требуется - как я сказал выше, последнюю итерацию не отбираем. Сторонние потоки же меняют, наоборот, только его "до", а себе забирают итерации от (новое "до" плюс один) по (старое "до"). Тут как раз лок нужен - потому как сразу два потока могут попытаться ограбить текущий поток. Цитата Jin X @ За одну процессорную команду? да ещё с локом? вот что-то я засомневался... где гарантия, что в этом же промежутке не успеют отработать 2, 3 или 5 итераций? |
Сообщ.
#11
,
|
|
|
Цитата Akina @ Дело не в этом.За одну процессорную команду? да ещё с локом? вот что-то я засомневался... У нас 2 команды. Между ними может произойти переключение потоков (т.к. эти 2 потока могут исполняться на одном физическом ядре). И этот поток может хоть 10 циклов прокрутить. |
Сообщ.
#12
,
|
|
|
Я придумал, как сделать без локов!!!
Локи будут только у перехватчиков (это для алгоритма с перехватами, описанного здесь (#5), там я его немного модифицировал, но суть та же). Теперь у каждого потока есть счётчик текущей и последней выполняемой для данного потока итерации (Cur и Last). В массиве ThreadCounters они расположены друг за другом (Cur0,Last0, Cur1,Last1, Cur2,Last2...). Код работающего потока: ; ebx - номер текущего цикла (Cur), на старте - начальный номер ; esi - номер текущего потока, начиная с 0 cmp ebx,[ThreadCounters+esi*8+4] ; Cur <= Last ? jle .start ; да, начинаем jmp .finish ; нет, пустой цикл, сразу выходим .loop: mov [ThreadCounters+esi*8],ebx ; Cur (mov атомарен, lock ненужен) :) .start: stdcall [MainProc] ; вызов основной функции цикла inc ebx cmp ebx,[ThreadCounters+esi*8+4] ; Cur+1 <= Last ? jle .loop ; да, продолжаем цикл .findjob: ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток, ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает) Перехватчик (отбирающий поток) делает так: ; edi содержит номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 0 ; [хотя меньше 0 она не должна быть], завершаем работу, не доходя до этого места в коде) mov ebp,ATTEMPT_COUNT ; кол-во попыток отобрать работу у какого-либо потока, пусть будет = 4 mov edx,[ThreadCounters+edi*8+4] ; edx = Last (обновляем данные) mov eax,[ThreadCounters+edi*8] ; eax = Cur .tryagain: mov ebx,edx sub ebx,eax ; Last - Cur ; какой результат мы можем тут получить? ; если поток завершил работу или выполняет последнюю итерацию, то Cur = Last, ebx = 0 ; если поток выполняет не самую последнюю итерацию, то ebx > 0 (даже если кол-во итераций = 0x80000000, а больше ; быть не может, т.к. при работе в 1 потоке мы будет действовать по-другому, там всё это нагромождение ни к чему) cmp ebx,1 jle .findjob ; если этот поток уже завершён, выполняет последнюю итерацию или у него осталась ещё лишь 1 итерация, ; попробуем найти какой-нибудь другой поток (хотя... может, лучше сразу на выход?); последнюю итерацию отбирать ; нельзя, иначе может произойти казус, если lock cmpxchg8b попадёт между cmp-jle-mov в цикле кода работающего потока ; inc ebx ; раскомментировать, если мы хотим делить нечётные числа в свою пользу, а не в пользу потока-владельца shr ebx,1 ; кол-во отбираемых итераций mov ecx,edx sub ecx,ebx ; ecx = новый номер последней итерации, т.е. новый Last = (Last-(Last-Cur)/2) mov ebx,eax ; ebx = Cur, его мы не меняем, а только проверяем (чтобы он случайно не вышел за границы нового Last'а) lock cmpxchg8b [ThreadCounters+edi*8] ; if (edx:eax == Last:Cur) { Last = ecx; Cur = ebx; } jnz .robfail ; облом! :( ; ура, успех! lea ebx,[ecx+1] ; наш новый Cur = Last+1 потока, у которого мы отобрали работу mov [ThreadCounters+esi*8+4],0x80000000 ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет ; (влезая между двумя следующими mov'ами) mov [ThreadCounters+esi*8],ebx ; Cur mov [ThreadCounters+esi*8+4],edx ; наш новый Last jmp .start .robfail: cmp edx,[ThreadCounters+edi*8+4] jne .findjob ; какая-то зараза (другой поток) успел влезть и перехватить работающий поток раньше нас - идём искать новую работу! :( dec ebp jnz .tryagain ; мы просто не успели, поток перешёл на следующую итерацию, пробуем ещё ebp раз... ; если кол-во попыток исчерпано, продолжать смысла нет, поток слишком быстро отрабатывает итерации .finish: ; завершение цикла для данного потока Вроде всё должно быть чётко. Но прошу проследить за моей мыслью, вдруг я что-то не учёл... |
Сообщ.
#13
,
|
|
|
Итак, новый вариант (реализация накладывает свои коррективы на теорию).
Имеем структуру ThreadData, которая содержит 4 поля dword для каждого потока: Handle, Lock (флаг запрета обработки итераций), Current (номер текущей итерации), Last (номер последней итерации). Обозначу смещения до них как @Handle, @Lock, @Current и @Last (0, 4, 8 и 12). Работающий поток: Код работающего потока: ; esi = номер текущего потока, начиная с 0. mov ebp,[ForProc] ; ebp = адрес основной процедуры цикла lea edi,[esi+esi] lea [edi*8+ThreadData] ; edi = ThreadData + esi*16 mov ebx,[edi+@Current] cmp ebx,[edi+@Last] ; Current <= Last ? jle .start ; да, стартуем jmp .finish ; нет, это пустой цикл, сразу выходим ; Spin-цикл ожидания разблокировки (пока кто-то пытается отобрать у нас работу) .pause: mov eax,16 @@: pause cmp dword [edi+@Lock],0 je .start ; выходим из цикла dec eax jnz @B invoke SwitchToThread ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре jmp .pause .loop: mov [edi+@Current],ebx ; mov атомарен, lock не нужен cmp dword [edi+@Lock],0 jne .pause ; у нас пытаются отобрать работу? .start: stdcall ebp, ebx, esi ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока) inc ebx ; Current++ cmp ebx,[edi+@Last] ; Current+1 <= Last ? jle .loop ; да, продолжаем цикл .findjob: ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток, ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает) Перехватчик (отбирающий поток) делает так: ; edx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1 ; завершаем работу, не доходя до этого места в коде) ; eax = Last этого потока lea ecx,[edx+edx] lea ecx,[ecx*8+ThreadData] ; ecx = ThreadData + edx*16 mov edx,1 xchg [ecx+@Lock],edx ; блокируем работающий поток (даже если мы попали не туда, и он ; прокрутит ещё одну итерацию, это не страшно, дальше заблокируется, у нас есть запас) test edx,edx jnz .findjob ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас - идём искать новую работу mov edx,eax sub edx,[ecx+@Current] ; кол-во оставшихся итераций cmp edx,1 ; Last-Current <= 1 ? jle .toolate ; да, осталось слишком мало итераций shr edx,1 ; кол-во отбираемых итераций mov ebx,eax sub ebx,edx ; новый @Last cmpxchg [ecx+@Last],ebx ; if (eax = Last) Last = ebx mov [ecx+@Lock],0 ; освобождаем поток jnz .findjob ; оказывается, другой поток успел отхватить работу аж до xchg! inc ebx ; наш новый Current = Last+1 потока, у которого мы отобрали работу mov [edi+@Last],0x80000000 ; Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет ; (влезая между двумя следующими mov'ами) mov [edi+@Current],ebx mov [edi+@Last],eax ; наш новый Last jmp .start .toolate: mov [ecx+@Lock],0 ; освобождаем поток jmp .findjob ; идём искать новую работу .finish: ; завершение цикла для данного потока И этот вариант мне тоже не до конца нравится из-за длинной блокировки основного потока при перехвате (и SwitchToThread). Более длинной, чем при cmpxchg8b. Но вариант с cmpxchg8b может обломаться при быстрых циклах. Короче, х/з. Ещё что-то придумывать??? |
Сообщ.
#14
,
|
|
|
Итак, новый вариант...
Суть в том, что мы будем записывать в Lock не просто 1 и 0, а изначально хранить там 0x7FFFFFFF, а потом заносить новый Last. А работающий в цикле поток будет останавливаться не при любом ненулевом значении, а только когда Current доходит до Lock. Т.е. по факту он почти никогда не будет подвисать. Работающий поток: ; THREAD_UNLOCKED = 0x7FFFFFFF - значение, при котором поток разблокирован (MaxInt) ; esi = номер текущего потока, начиная с 0. mov ebp,[ForProc] ; ebp = адрес основной процедуры цикла lea edi,[esi+esi] lea [edi*8+ThreadData] ; edi = ThreadData + esi*16 mov ebx,[edi+@Current] cmp ebx,[edi+@Last] ; Current <= Last ? jle .start ; да, стартуем jmp .finish ; нет, это пустой цикл, сразу выходим .loop: mov [edi+@Current],ebx ; Current++ (mov атомарен, lock не нужен) .start: stdcall ebp, ebx, esi ; вызов основной функции цикла (с 2 параметрами: номером итерации и потока) inc ebx ; Current+1 cmp ebx,[edi+@Lock] ; Current+1 <= Lock ? jle .continue ; да, продолжаем ; делаем паузу, если достигли значение Lock (spin-цикл ожидания разблокировки) .pause: mov eax,16 @@: pause cmp dword [edi+@Lock],THREAD_UNLOCKED je .continue ; выходим из spin-цикла dec eax jnz @B invoke SwitchToThread ; раз в 16 итераций переключаем контекст, т.к. при таком большом числе ; циклов есть немалая вероятность, что перехватывающий поток находится на том же ядре jmp .pause ; продолжаем цикл .continue: cmp ebx,[edi+@Last] ; Current+1 <= Last ? jle .loop ; да, продолжаем цикл .findjob: ; тут мы переходим в режим перехватчика и начинаем искать незавершённый поток, ; у которого есть необработанные итерации (не считая текущей, которую он обрабатывает) Перехватчик: ; ecx = номер потока с наибольшей разницей Last-Cur (если у всех потоков эта разница <= 1 ; завершаем работу, не доходя до этого места в коде) ; ebx = Last этого потока add ecx,ecx lea ecx,[ecx*8+ThreadData] ; ecx = ThreadData + ecx*16 mov eax,ebx mov edx,ebx sub eax,[ecx+@Current] shr eax,1 ; (Last - Current) / 2 sub edx,eax ; edx = Last - eax = новый Lock, он же новый Last mov eax,THREAD_UNLOCKED lock cmpxchg [ecx+@Lock],edx ; if (Lock == THREAD_UNLOCKED) { Lock = edx ; zf = 1; } else { eax = Lock; zf = 0; } jnz .findjob ; какая-то зараза (другой поток) успела влезть и перехватить работающий поток раньше нас, идём искать новую работу cmp [ecx+@Current],edx jge .toolate ; поток уже дошёл до итерации Lock (ситуация Current = Lock = новый Last нам тоже не подходит) mov eax,ebx cmpxchg [ecx+@Last],edx ; if (eax == Last) { Last = edx; } mov [ecx+@Lock],THREAD_UNLOCKED ; освобождаем поток jnz .findjob ; оказывается, поток закончил работу и взял итерации у кого-то другого, ; либо другой поток успел отжать работу у нашего подопечного ещё до lock cmpxchg! inc edx ; наш новый Current = новый Last потока, у которого мы отобрали работу + 1 mov [edi+@Last],0x80000000 ; наш Last = MinInt, чтобы другой поток не отжал у нас то, чего ещё у нас пока нет ; (влезая между двумя следующими mov'ами) mov [edi+@Current],edx mov [edi+@Last],eax jmp .start .toolate: mov [ecx+@Lock],THREAD_UNLOCKED ; освобождаем поток jmp .findjob ; идём искать новую работу .finish: ; завершение цикла для данного потока Добавлено Осталось теперь проверить на практике |
Сообщ.
#15
,
|
|
|
Можно слегонца распараллелить. Не так
add ecx,ecx lea ecx,[ecx*8+ThreadData] ; ecx = ThreadData + ecx*16 mov eax,ebx mov edx,ebx add ecx,ecx mov eax,ebx lea ecx,[ecx*8+ThreadData] ; ecx = ThreadData + ecx*16 mov edx,ebx |