Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[18.97.9.175] |
|
Сообщ.
#1
,
|
|
|
Захотел поразмять мозги в новогодние выходные.
Поставил себе задачу: 1. Написать некий класс-обёртку для использования порта завершения ввода/вывода (TIOCP) 2. Написать класс для испльзования готового TIOCP. (TJob). То, что получилось, выкладываю с примерами - может кому пригодится. Не шедевр, конечно, но работает. Критика приветствуется. Логика использования: 1. Создаётся экземпляр класса TIOCP вместе с пулом потоков 2. Создаётся наследник класса TJob, в котором - В перекрытом конструкторе открываются нужные устройства (файлы, сокеты, пайпы и пр.) - В реализованном абстрактном методе TJob.ExecProc реализуется логика обработки в пуле потоков - Основной поток получает данные о выполнении задания и может реагировать в зависимости от ситуации Пример пока один. Сейчас пишу второй - сервер перенаправления портов TCP, выложу чуть позже. Модуль с классами Скрытый текст (*------------------------------------------------- Обёртка для работы с портом завершения ввода/вывода 2012, Демо sources.ru -------------------------------------------------*) unit uIOCP; interface uses Windows, SysUtils, Messages, Classes, SyncObjs; type TIOCP=class; //Состояния заданий TJobState=(jsNone, jsProcess, jsError, jsCompleted); // Экземпляры этого класса используются //для выполнения заданий с использованием IOCP TJob=class private FIOCP: TIOCP; //Ссылка на класс с IOCP FOV: POverlapped; //OVERLAPPED-структура, используемая IOCP FState: TJobState; FNumBytes: Cardinal; //Количество байт, прочитанных/записанных //последней операцией FCS: TCriticalSection; //Критическая секция для использования в TJob FUD: Pointer; //Пользовательские данные FErrorCode: Integer; //Последняя ошибка FCodeOp: Cardinal; //Текущий код операции function GetPort: THandle; public constructor Create(IOCP: TIOCP); destructor Destroy; override; procedure ExecProc; virtual; abstract; //Процедура, выполняемая //в потоках IOCP function Complete(Code: Cardinal): Boolean; //Вызывает немедленную обработку //в потоке function Associate(DeviceHandle: THandle): THandle; //Добавление устройства в таблицу IOCP //Вызывает метод из TIOCP function CheckIOCP: Boolean; //Проверка, жив ли ещё IOCP property IOPort: THandle read GetPort; property State: TJobState read FState write FState; property NumBytes: Cardinal read FNumBytes; property CS: TCriticalSection read FCS; property ErrorCode: Integer read FErrorCode write FErrorCode; property UserData: Pointer read FUD write FUD; property OV: POVERLAPPED read FOV; property CodeOp: Cardinal Read FCodeOp Write FCodeOp; end; //Класс порта завершения TIOCP=class private FIOCP: THandle; //Порт IOCP FThreadList: TList; //Список потоков, которые обрабатывают порт IOCP FWorkThreadsCount: Integer; //Количество рабочих потоков. Обычно равно (кол-во процессоров) FMaxThreadsCount: Integer; //Общееколичество потоков. Обычно ранво (кол-во процессоров)*2 public constructor Create(ThreadsCount: Cardinal=0); destructor Destroy; override; function Associate(DeviceHandle: THandle; CompKey: Cardinal): THandle; //Добавление устройства в таблицу IOCP property IOPort: THandle read FIOCP; property WorkThreadsCount: Integer read FWorkThreadsCount; end; //Потоки, связанные с IOCP TWorkThread=class(TThread) private FIOCP: TIOCP; public constructor Create(IOCP: TIOCP); procedure Execute; override; end; implementation { TIOCP } function TIOCP.Associate(DeviceHandle: THandle; CompKey: Cardinal): THandle; begin Result := CreateIoCompletionPort(DeviceHandle,FIOCP,CompKey,0); end; constructor TIOCP.Create(ThreadsCount: Cardinal=0); var i: Integer; SI: TSystemInfo; begin //Количество потоков по-умолчанию = Кол-во процессоров*2 if ThreadsCount=0 then begin GetSystemInfo(SI); FWorkThreadsCount := SI.dwNumberOfProcessors; FMaxThreadsCount := FWorkThreadsCount*2; end else begin FWorkThreadsCount := ThreadsCount; FMaxThreadsCount := FWorkThreadsCount; end; //Сооздаём порт заввершения в/в FIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,FWorkThreadsCount); if FIOCP=0 then raise Exception.Create(SysErrorMessage(GetLastError)); FThreadList := TList.Create; for i := 1 to FMaxThreadsCount do begin FThreadList.Add(TWorkThread.Create(Self)); end; end; destructor TIOCP.Destroy; var i: Integer; begin //Посылаем всем потокам команду закончить работу (CompKey=0) for i := 0 to FThreadList.Count-1 do PostQueuedCompletionStatus(FIOCP,0,0,nil); //Ожидаем завершения всех потоков for i := 0 to FThreadList.Count-1 do begin with TWorkThread(FThreadList[i]) do begin WaitFor; Free; end; end; CloseHandle(FIOCP); FThreadList.Free; inherited Destroy; end; { TWorkThread } constructor TWorkThread.Create(IOCP: TIOCP); begin inherited Create(True); FIOCP := IOCP; FreeOnTerminate := False; Resume; end; procedure TWorkThread.Execute; var NumBytes: Cardinal; CompKey: Cardinal; ov: POVERLAPPED; J: TJob; begin while not Terminated do begin if GetQueuedCompletionStatus(FIOCP.IOPort,NumBytes,CompKey,ov,INFINITE) then begin if CompKey=0 then begin Terminate; Continue; end; //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1 if Compkey=1 then Continue; //Полученный CompKey и есть наш TJob J := Pointer(CompKey); try J.FNumBytes := NumBytes; //Выполняем обработку J.ExecProc; except //Возвращаем признак ошибки J.FState := jsError; end; end else begin if CompKey<>0 then begin //Возвращаем признак ошибки J := Pointer(CompKey); J.FErrorCode := GetLastError; J.FState := jsError; end; end; end; end; { TJob } function TJob.Associate(DeviceHandle: THandle): THandle; begin Result := FIOCP.Associate(DeviceHandle,Cardinal(Self)); end; //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1 function TJob.CheckIOCP: Boolean; var Err: Integer; begin Result := True; if not PostQueuedCompletionStatus(FIOCP.FIOCP,0,1,nil) then begin Err := GetLastError; if Err=ERROR_INVALID_HANDLE then Result := False; end; end; function TJob.Complete(Code: Cardinal): Boolean; begin CodeOp := Code; Result := PostQueuedCompletionStatus(FIOCP.IOPort,0,Cardinal(Self),POVERLAPPED(FOV)); end; constructor TJob.Create(IOCP: TIOCP); begin FIOCP := IOCP; FState := jsNone; FCS := TCriticalSection.Create; New(FOV); FOV^.Internal := 0; FOV^.InternalHigh :=0; FOV^.Offset := 0; FOV^.OffsetHigh := 0; //При необходимости выполнения операции с устройством без передачи IOCP //перед выполнением операции устанавливаем младший бит hEvent в 1: // FOV^.hEvent := FOV^.hEvent or 1; FOV^.hEvent := CreateEvent(nil,True,False,nil); end; destructor TJob.Destroy; begin CloseHandle(FOV^.hEvent); Dispose(FOV); FCS.Free; inherited; end; function TJob.GetPort: THandle; begin Result := FIOCP.FIOCP; end; end. Пример с копированием файлов: Скрытый текст unit Unit1; interface uses Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms, Dialogs, StdCtrls, uIOCP, ExtCtrls; const //Коды операций coRead = 1; coWrite = 2; BufSize=64*1024; //Размер буфера чтения/записи type TForm1 = class(TForm) Button1: TButton; Button2: TButton; Label1: TLabel; Timer1: TTimer; procedure Button1Click(Sender: TObject); procedure Button2Click(Sender: TObject); procedure Timer1Timer(Sender: TObject); private { Private declarations } public { Public declarations } end; //Наследник TJob //В этом классе реализована вся логика по копированию файла TMyJob=class(TJob) constructor Create(IOCP: TIOCP;const SrcPath, DestPath: String); destructor Destroy; override; procedure ExecProc; override; end; //Пользовательские данные PMyData=^TMyData; TMyData=record HSrc: THandle; //Дескрипторы входного HDest: THandle; //и выходного файлов FileSize: LARGE_INTEGER; //Размер исходного файла CopiedSize: LARGE_INTEGER; //Текщий размер скопированных данных Buffer: PByte; //Буфер для копирования end; var Form1: TForm1; IOCP: TIOCP=nil; //Экземпляр класса IOCP Job: TMyJob=nil; //Задание для копирования //Функция получения размера файла function GetFileSizeEx(hFile: THandle; out lpFileSize: LARGE_INTEGER): BOOL; stdcall; external 'kernel32.dll'; implementation {$R *.dfm} procedure TForm1.Button1Click(Sender: TObject); begin //Создаём экземпляр класса IOCP вместе с пулом потоков IOCP := TIOCP.Create; //В констркторе открываем входной и выходной файлы, инициализируем структуры Job:= TMyJob.Create(IOCP,'d:\temp\temp.iso', 'd:\temp\temp_.iso'); //Связываем файлы в IOCP Job.Associate(PMyData(JOb.UserData)^.HSrc); Job.Associate(PMyData(JOb.UserData)^.HDest); //Фиктивный вызов IOCP для запуска проыесса копирования Job.Complete(coWrite); end; procedure TForm1.Button2Click(Sender: TObject); begin //Освобождение всех потоков и IOCP FreeAndNil(IOCP); end; procedure TForm1.Timer1Timer(Sender: TObject); var PD: PMyData; CopiedSize: Int64; begin (Sender as TTimer).Enabled := False; if Assigned(Job) then begin //Пользовательские данные заполняются в TMyJob.ExecProc PD := Job.UserData; Job.CS.Enter; //Синхронизация доступа из разных потоков try CopiedSize := PD^.CopiedSize.QuadPart; finally Job.CS.Leave; end; Label1.Caption := 'Скопировано '+FormatFloat('#,##',CopiedSize); if Job.State = jsCompleted then begin Label1.Caption := 'Копирование закончено, скопировано '+FormatFloat('#,##',CopiedSize); FreeAndNil(Job); end else begin //Жив ли порт завершения в/в? if not Job.CheckIOCP then begin Label1.Caption := 'Копирование закончено (ошибка IOCP), скопировано '+FormatFloat('#,##',CopiedSize); FreeAndNil(Job); end; end; end; (Sender as TTimer).Enabled := True; end; { TMyJob } constructor TMyJob.Create(IOCP: TIOCP;const SrcPath, DestPath: String); var PD: PMyData; begin inherited Create(IOCP); New(PD); UserData :=PD; PD^.Buffer := nil; //VirtualAlloc используется для того, чтобы страницы памяти были выровнены по границе 64К PD^.Buffer := VirtualAlloc(nil,BufSize,MEM_COMMIT,PAGE_READWRITE); PD^.HSrc := CreateFile(PChar(SrcPath), GENERIC_READ, FILE_SHARE_READ, nil, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, 0); if not GetFileSizeEx(PD^.HSrc, PD^.FileSize) then raise Exception.Create(SysErrorMessage(GetLastError)); PD^.HDest := CreateFile(PChar(DestPath), GENERIC_WRITE, 0, nil, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED, 0); if PD^.HDest=INVALID_HANDLE_VALUE then begin raise Exception.Create(SysErrorMessage(GetLastError)); end; PD^.CopiedSize.QuadPart := 0; end; destructor TMyJob.Destroy; begin if PMyData(UserData)^.Buffer<>nil then VirtualFree(PMyData(UserData)^.Buffer,0,MEM_RELEASE); if PMyData(UserData)^.HSrc<>0 then CloseHandle(PMyData(UserData)^.HSrc); if PMyData(UserData)^.HDest<>0 then CloseHandle(PMyData(UserData)^.HDest); Dispose(PMyData(UserData)); inherited; end; //Эта процедура вызывается при обработке завершения операции ввода/вывода в пуле потоков procedure TMyJob.ExecProc; var PD: PMyData; dwNumBytes: Cardinal; begin PD := UserData; // if (State=jsError) or (State=jsCompleted) then Exit; case CodeOp of coRead: //Закончена операция чтения begin CodeOp := coWrite; //Следующая операия - запись if not WriteFile(PD^.HDest,PD^.Buffer^,NumBytes,dwNumBytes,OV) then begin ErrorCode := GetLastError; if ErrorCode<>ERROR_IO_PENDING then //Операция жолжна быть поставлена в очередь begin State := jsError; Exit; end else ErrorCode := 0; end; end; coWrite: //Закончена операция записи begin CodeOp := coRead; //Следующая операия - чтение CS.Enter; try //В пользовательских данных ведём подсчёт уже записанного объёма PD^.CopiedSize.QuadPart := PD^.CopiedSize.QuadPart + NumBytes; finally CS.Leave; end; OV^.Offset := PD^.CopiedSize.LowPart; OV^.OffsetHigh := PD^.CopiedSize.HighPart; if PD^.CopiedSize.QuadPart<PD^.FileSize.QuadPart then begin if not ReadFile(PD^.HSrc,PD^.Buffer^,BufSize,dwNumBytes,OV) then begin ErrorCode := GetLastError; if ErrorCode<>ERROR_IO_PENDING then begin State := jsError; Exit; end else ErrorCode := 0; end; end else begin State := jsCompleted; end; end; end; end; end. |