Версия для печати
Нажмите сюда для просмотра этой темы в оригинальном формате
Форум на Исходниках.RU > Все языки: Статьи, заготовки в FAQ > Обёртка IOCP с примерами


Автор: Демо 05.01.12, 23:17
Захотел поразмять мозги в новогодние выходные.
Поставил себе задачу:

1. Написать некий класс-обёртку для использования порта завершения ввода/вывода (TIOCP)
2. Написать класс для испльзования готового TIOCP. (TJob).

То, что получилось, выкладываю с примерами - может кому пригодится.
Не шедевр, конечно, но работает.
Критика приветствуется.

Логика использования:

1. Создаётся экземпляр класса TIOCP вместе с пулом потоков
2. Создаётся наследник класса TJob, в котором
- В перекрытом конструкторе открываются нужные устройства (файлы, сокеты, пайпы и пр.)
- В реализованном абстрактном методе TJob.ExecProc реализуется логика обработки в пуле потоков
- Основной поток получает данные о выполнении задания и может реагировать в зависимости от ситуации

Пример пока один. Сейчас пишу второй - сервер перенаправления портов TCP, выложу чуть позже.

Модуль с классами
Скрытый текст

<{CODE_COLLAPSE_OFF}><{CODE_WRAP_OFF}>
    (*-------------------------------------------------
    Обёртка для работы с портом завершения ввода/вывода
    2012, Демо
    sources.ru
    -------------------------------------------------*)
    unit uIOCP;
     
    interface
     
    uses
      Windows,
      SysUtils,
      Messages,
      Classes,
      SyncObjs;
     
    type
     
      TIOCP=class;
     
     
    //Состояния заданий
      TJobState=(jsNone, jsProcess, jsError, jsCompleted);
     
    //  Экземпляры этого класса используются
    //для выполнения заданий с использованием IOCP
      TJob=class
      private
        FIOCP: TIOCP;                     //Ссылка на класс с IOCP
        FOV: POverlapped;                 //OVERLAPPED-структура, используемая IOCP
        FState: TJobState;
        FNumBytes: Cardinal;              //Количество байт, прочитанных/записанных
                                          //последней операцией
        FCS: TCriticalSection;            //Критическая секция для использования в TJob
        FUD: Pointer;                     //Пользовательские данные
        FErrorCode: Integer;              //Последняя ошибка
        FCodeOp: Cardinal;                //Текущий код операции
        function GetPort: THandle;
      public
        constructor Create(IOCP: TIOCP);
        destructor Destroy; override;
        procedure ExecProc; virtual; abstract;                        //Процедура, выполняемая
                                                                      //в потоках IOCP
        function Complete(Code: Cardinal): Boolean;                   //Вызывает немедленную обработку
                                                                      //в потоке
        function Associate(DeviceHandle: THandle): THandle;           //Добавление устройства в таблицу IOCP
                                                                      //Вызывает метод из TIOCP
        function CheckIOCP: Boolean;                                  //Проверка, жив ли ещё IOCP
     
        property IOPort: THandle read GetPort;
        property State: TJobState read FState write FState;
        property NumBytes: Cardinal read FNumBytes;
        property CS: TCriticalSection read FCS;
        property ErrorCode: Integer read FErrorCode write FErrorCode;
        property UserData: Pointer read FUD write FUD;
        property OV: POVERLAPPED read FOV;
        property CodeOp: Cardinal Read FCodeOp Write FCodeOp;
      end;
     
    //Класс порта завершения
      TIOCP=class
      private
        FIOCP: THandle;                   //Порт IOCP
        FThreadList: TList;               //Список потоков, которые обрабатывают порт IOCP
        FWorkThreadsCount: Integer;       //Количество рабочих потоков. Обычно равно (кол-во процессоров)
        FMaxThreadsCount: Integer;        //Общееколичество потоков. Обычно ранво (кол-во процессоров)*2
      public
        constructor Create(ThreadsCount: Cardinal=0);
        destructor Destroy; override;
        function Associate(DeviceHandle: THandle; CompKey: Cardinal): THandle; //Добавление устройства в таблицу IOCP
     
        property IOPort: THandle read FIOCP;
        property WorkThreadsCount: Integer read FWorkThreadsCount;
      end;
     
    //Потоки, связанные с IOCP
      TWorkThread=class(TThread)
      private
        FIOCP: TIOCP;
      public
        constructor Create(IOCP: TIOCP);
        procedure Execute; override;
      end;
     
    implementation
     
    { TIOCP }
     
    function TIOCP.Associate(DeviceHandle: THandle;
      CompKey: Cardinal): THandle;
    begin
      Result := CreateIoCompletionPort(DeviceHandle,FIOCP,CompKey,0);
    end;
     
    constructor TIOCP.Create(ThreadsCount: Cardinal=0);
    var
      i: Integer;
      SI: TSystemInfo;
    begin
    //Количество потоков по-умолчанию = Кол-во процессоров*2
      if ThreadsCount=0 then
      begin
        GetSystemInfo(SI);
        FWorkThreadsCount := SI.dwNumberOfProcessors;
        FMaxThreadsCount := FWorkThreadsCount*2;
      end
      else
      begin
        FWorkThreadsCount := ThreadsCount;
        FMaxThreadsCount := FWorkThreadsCount;
      end;
     
    //Сооздаём порт заввершения в/в
      FIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,FWorkThreadsCount);
      if FIOCP=0 then raise Exception.Create(SysErrorMessage(GetLastError));
     
      FThreadList := TList.Create;
     
      for i := 1 to FMaxThreadsCount do
      begin
        FThreadList.Add(TWorkThread.Create(Self));
      end;
    end;
     
    destructor TIOCP.Destroy;
    var
      i: Integer;
    begin
    //Посылаем всем потокам команду закончить работу (CompKey=0)
      for i := 0 to FThreadList.Count-1 do PostQueuedCompletionStatus(FIOCP,0,0,nil);
    //Ожидаем завершения всех потоков
      for i := 0 to FThreadList.Count-1 do
      begin
        with TWorkThread(FThreadList[i]) do
        begin
          WaitFor;
          Free;
        end;
      end;
      CloseHandle(FIOCP);
      FThreadList.Free;
      inherited Destroy;
    end;
     
     
    { TWorkThread }
     
    constructor TWorkThread.Create(IOCP: TIOCP);
    begin
      inherited Create(True);
      FIOCP := IOCP;
      FreeOnTerminate := False;
      Resume;
    end;
     
    procedure TWorkThread.Execute;
    var
      NumBytes: Cardinal;
      CompKey: Cardinal;
      ov: POVERLAPPED;
      J: TJob;
    begin
      while not Terminated do
      begin
        if GetQueuedCompletionStatus(FIOCP.IOPort,NumBytes,CompKey,ov,INFINITE) then
        begin
          if CompKey=0 then
          begin
            Terminate;
            Continue;
          end;
    //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1
          if Compkey=1 then Continue;
    //Полученный CompKey и есть наш TJob
        J := Pointer(CompKey);
        try
          J.FNumBytes := NumBytes;
    //Выполняем обработку
          J.ExecProc;
        except
    //Возвращаем признак ошибки
          J.FState := jsError;
        end;
        end
        else
        begin
          if CompKey<>0 then
          begin
    //Возвращаем признак ошибки
            J := Pointer(CompKey);
            J.FErrorCode := GetLastError;
            J.FState := jsError;
          end;
        end;
      end;
    end;
     
    { TJob }
     
    function TJob.Associate(DeviceHandle: THandle): THandle;
    begin
      Result := FIOCP.Associate(DeviceHandle,Cardinal(Self));
    end;
     
    //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1
    function TJob.CheckIOCP: Boolean;
    var
      Err: Integer;
    begin
      Result := True;
      if not PostQueuedCompletionStatus(FIOCP.FIOCP,0,1,nil) then
      begin
        Err := GetLastError;
        if Err=ERROR_INVALID_HANDLE then Result := False;
      end;
    end;
     
    function TJob.Complete(Code: Cardinal): Boolean;
    begin
      CodeOp := Code;
      Result := PostQueuedCompletionStatus(FIOCP.IOPort,0,Cardinal(Self),POVERLAPPED(FOV));
    end;
     
    constructor TJob.Create(IOCP: TIOCP);
    begin
      FIOCP := IOCP;
      FState := jsNone;
      FCS := TCriticalSection.Create;
      New(FOV);
      FOV^.Internal := 0;
      FOV^.InternalHigh :=0;
      FOV^.Offset := 0;
      FOV^.OffsetHigh := 0;
    //При необходимости выполнения операции с устройством без передачи IOCP
    //перед выполнением операции устанавливаем младший бит hEvent в 1:
    //     FOV^.hEvent := FOV^.hEvent or 1;
      FOV^.hEvent := CreateEvent(nil,True,False,nil);
    end;
     
    destructor TJob.Destroy;
    begin
      CloseHandle(FOV^.hEvent);
      Dispose(FOV);
      FCS.Free;
      inherited;
    end;
     
    function TJob.GetPort: THandle;
    begin
      Result := FIOCP.FIOCP;
    end;
     
    end.




Пример с копированием файлов:

Скрытый текст

<{CODE_COLLAPSE_OFF}><{CODE_WRAP_OFF}>
    unit Unit1;
     
    interface
     
    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls, uIOCP, ExtCtrls;
     
    const
     
    //Коды операций
      coRead    = 1;
      coWrite   = 2;
     
      BufSize=64*1024;                                  //Размер буфера чтения/записи
     
    type
     
      TForm1 = class(TForm)
        Button1: TButton;
        Button2: TButton;
        Label1: TLabel;
        Timer1: TTimer;
        procedure Button1Click(Sender: TObject);
        procedure Button2Click(Sender: TObject);
        procedure Timer1Timer(Sender: TObject);
      private
        { Private declarations }
      public
        { Public declarations }
      end;
     
    //Наследник TJob
    //В этом классе реализована вся логика по копированию файла
      TMyJob=class(TJob)
        constructor Create(IOCP: TIOCP;const SrcPath, DestPath: String);
        destructor Destroy; override;
        procedure ExecProc; override;
      end;
     
    //Пользовательские данные
      PMyData=^TMyData;
      TMyData=record
        HSrc: THandle;                                  //Дескрипторы входного
        HDest: THandle;                                 //и выходного файлов
        FileSize: LARGE_INTEGER;                        //Размер исходного файла
        CopiedSize: LARGE_INTEGER;                      //Текщий размер скопированных данных
        Buffer: PByte;                                  //Буфер для копирования
      end;
     
    var
      Form1: TForm1;
      IOCP: TIOCP=nil;                                  //Экземпляр класса IOCP
      Job: TMyJob=nil;                                  //Задание для копирования
     
    //Функция получения размера файла
    function GetFileSizeEx(hFile: THandle; out lpFileSize: LARGE_INTEGER): BOOL; stdcall; external 'kernel32.dll';
     
    implementation
     
    {$R *.dfm}
     
    procedure TForm1.Button1Click(Sender: TObject);
    begin
    //Создаём экземпляр класса IOCP вместе с пулом потоков
      IOCP := TIOCP.Create;
     
    //В констркторе открываем входной и выходной файлы, инициализируем структуры
      Job:= TMyJob.Create(IOCP,'d:\temp\temp.iso', 'd:\temp\temp_.iso');
     
    //Связываем файлы в IOCP
      Job.Associate(PMyData(JOb.UserData)^.HSrc);
      Job.Associate(PMyData(JOb.UserData)^.HDest);
    //Фиктивный вызов IOCP для запуска проыесса копирования
      Job.Complete(coWrite);
    end;
     
    procedure TForm1.Button2Click(Sender: TObject);
    begin
    //Освобождение всех потоков и IOCP
      FreeAndNil(IOCP);
    end;
     
    procedure TForm1.Timer1Timer(Sender: TObject);
    var
      PD: PMyData;
      CopiedSize: Int64;
    begin
      (Sender as TTimer).Enabled := False;
      if Assigned(Job) then
      begin
    //Пользовательские данные заполняются в TMyJob.ExecProc
        PD := Job.UserData;
        Job.CS.Enter;                                   //Синхронизация доступа из разных потоков
        try
          CopiedSize := PD^.CopiedSize.QuadPart;
        finally
          Job.CS.Leave;
        end;
        Label1.Caption := 'Скопировано '+FormatFloat('#,##',CopiedSize);
        if Job.State = jsCompleted then
        begin
          Label1.Caption := 'Копирование закончено, скопировано '+FormatFloat('#,##',CopiedSize);
          FreeAndNil(Job);
        end
        else
        begin
    //Жив ли порт завершения в/в?
          if not Job.CheckIOCP then
          begin
            Label1.Caption := 'Копирование закончено (ошибка IOCP), скопировано '+FormatFloat('#,##',CopiedSize);
            FreeAndNil(Job);
          end;
        end;
     
      end;
      (Sender as TTimer).Enabled := True;
    end;
     
     
    { TMyJob }
     
    constructor TMyJob.Create(IOCP: TIOCP;const SrcPath, DestPath: String);
    var
      PD: PMyData;
    begin
      inherited Create(IOCP);
      New(PD);
      UserData :=PD;
      PD^.Buffer := nil;
     
    //VirtualAlloc используется для того, чтобы страницы памяти были выровнены по границе 64К
      PD^.Buffer := VirtualAlloc(nil,BufSize,MEM_COMMIT,PAGE_READWRITE);
     
      PD^.HSrc :=
        CreateFile(PChar(SrcPath),
        GENERIC_READ,
        FILE_SHARE_READ,
        nil,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        0);
     
      if not GetFileSizeEx(PD^.HSrc, PD^.FileSize) then raise Exception.Create(SysErrorMessage(GetLastError));
     
      PD^.HDest :=
          CreateFile(PChar(DestPath),
          GENERIC_WRITE,
          0,
          nil,
          CREATE_ALWAYS,
          FILE_FLAG_OVERLAPPED,
          0);
     
      if PD^.HDest=INVALID_HANDLE_VALUE then
      begin
        raise Exception.Create(SysErrorMessage(GetLastError));
      end;
      PD^.CopiedSize.QuadPart := 0;
     
    end;
     
    destructor TMyJob.Destroy;
    begin
      if PMyData(UserData)^.Buffer<>nil
        then VirtualFree(PMyData(UserData)^.Buffer,0,MEM_RELEASE);
      if PMyData(UserData)^.HSrc<>0 then CloseHandle(PMyData(UserData)^.HSrc);
      if PMyData(UserData)^.HDest<>0 then CloseHandle(PMyData(UserData)^.HDest);
       Dispose(PMyData(UserData));
      inherited;
    end;
     
    //Эта процедура вызывается при обработке завершения операции ввода/вывода в пуле потоков
    procedure TMyJob.ExecProc;
    var
      PD: PMyData;
      dwNumBytes: Cardinal;
    begin
      PD := UserData;
     
    //  if (State=jsError) or (State=jsCompleted) then Exit;
     
      case CodeOp of
        coRead:                                         //Закончена операция чтения
          begin
            CodeOp := coWrite;                          //Следующая операия - запись
            if not WriteFile(PD^.HDest,PD^.Buffer^,NumBytes,dwNumBytes,OV) then
            begin
              ErrorCode := GetLastError;
              if ErrorCode<>ERROR_IO_PENDING then       //Операция жолжна быть поставлена в очередь
              begin
                State := jsError;
                Exit;
              end
              else ErrorCode := 0;
            end;
          end;
        coWrite:                                        //Закончена операция записи
          begin
            CodeOp := coRead;                           //Следующая операия - чтение
            CS.Enter;
            try
    //В пользовательских данных ведём подсчёт уже записанного объёма
              PD^.CopiedSize.QuadPart := PD^.CopiedSize.QuadPart + NumBytes;
            finally
              CS.Leave;
            end;
     
            OV^.Offset := PD^.CopiedSize.LowPart;
            OV^.OffsetHigh := PD^.CopiedSize.HighPart;
            if PD^.CopiedSize.QuadPart<PD^.FileSize.QuadPart then
            begin
              if not ReadFile(PD^.HSrc,PD^.Buffer^,BufSize,dwNumBytes,OV) then
              begin
                ErrorCode := GetLastError;
                if ErrorCode<>ERROR_IO_PENDING then
                begin
                  State := jsError;
                  Exit;
                end
                else ErrorCode := 0;
              end;
            end
            else
            begin
              State := jsCompleted;
            end;
          end;
      end;
    end;
     
    end.


Powered by Invision Power Board (https://www.invisionboard.com)
© Invision Power Services (https://www.invisionpower.com)