На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
! Друзья, соблюдайте, пожалуйста, правила форума и данного раздела:
Данный раздел не предназначен для вопросов и обсуждений, он содержит FAQ-заготовки для разных языков программирования. Любой желающий может разместить здесь свою статью. Вопросы же задавайте в тематических разделах!
• Если ваша статья может быть перенесена в FAQ соответствующего раздела, при условии, что она будет оформлена в соответствии с Требованиями к оформлению статей.
• Чтобы остальным было проще понять, указывайте в описании темы (подзаголовке) название языка в [квадратных скобках]!
Модераторы: Модераторы
  
> Обёртка IOCP с примерами , [Delphi] Решил размяться на выходных
    Захотел поразмять мозги в новогодние выходные.
    Поставил себе задачу:

    1. Написать некий класс-обёртку для использования порта завершения ввода/вывода (TIOCP)
    2. Написать класс для испльзования готового TIOCP. (TJob).

    То, что получилось, выкладываю с примерами - может кому пригодится.
    Не шедевр, конечно, но работает.
    Критика приветствуется.

    Логика использования:

    1. Создаётся экземпляр класса TIOCP вместе с пулом потоков
    2. Создаётся наследник класса TJob, в котором
    - В перекрытом конструкторе открываются нужные устройства (файлы, сокеты, пайпы и пр.)
    - В реализованном абстрактном методе TJob.ExecProc реализуется логика обработки в пуле потоков
    - Основной поток получает данные о выполнении задания и может реагировать в зависимости от ситуации

    Пример пока один. Сейчас пишу второй - сервер перенаправления портов TCP, выложу чуть позже.

    Модуль с классами
    Скрытый текст

    ExpandedWrap disabled
      (*-------------------------------------------------
      Обёртка для работы с портом завершения ввода/вывода
      2012, Демо
      sources.ru
      -------------------------------------------------*)
      unit uIOCP;
       
      interface
       
      uses
        Windows,
        SysUtils,
        Messages,
        Classes,
        SyncObjs;
       
      type
       
        TIOCP=class;
       
       
      //Состояния заданий
        TJobState=(jsNone, jsProcess, jsError, jsCompleted);
       
      //  Экземпляры этого класса используются
      //для выполнения заданий с использованием IOCP
        TJob=class
        private
          FIOCP: TIOCP;                     //Ссылка на класс с IOCP
          FOV: POverlapped;                 //OVERLAPPED-структура, используемая IOCP
          FState: TJobState;
          FNumBytes: Cardinal;              //Количество байт, прочитанных/записанных
                                            //последней операцией
          FCS: TCriticalSection;            //Критическая секция для использования в TJob
          FUD: Pointer;                     //Пользовательские данные
          FErrorCode: Integer;              //Последняя ошибка
          FCodeOp: Cardinal;                //Текущий код операции
          function GetPort: THandle;
        public
          constructor Create(IOCP: TIOCP);
          destructor Destroy; override;
          procedure ExecProc; virtual; abstract;                        //Процедура, выполняемая
                                                                        //в потоках IOCP
          function Complete(Code: Cardinal): Boolean;                   //Вызывает немедленную обработку
                                                                        //в потоке
          function Associate(DeviceHandle: THandle): THandle;           //Добавление устройства в таблицу IOCP
                                                                        //Вызывает метод из TIOCP
          function CheckIOCP: Boolean;                                  //Проверка, жив ли ещё IOCP
       
          property IOPort: THandle read GetPort;
          property State: TJobState read FState write FState;
          property NumBytes: Cardinal read FNumBytes;
          property CS: TCriticalSection read FCS;
          property ErrorCode: Integer read FErrorCode write FErrorCode;
          property UserData: Pointer read FUD write FUD;
          property OV: POVERLAPPED read FOV;
          property CodeOp: Cardinal Read FCodeOp Write FCodeOp;
        end;
       
      //Класс порта завершения
        TIOCP=class
        private
          FIOCP: THandle;                   //Порт IOCP
          FThreadList: TList;               //Список потоков, которые обрабатывают порт IOCP
          FWorkThreadsCount: Integer;       //Количество рабочих потоков. Обычно равно (кол-во процессоров)
          FMaxThreadsCount: Integer;        //Общееколичество потоков. Обычно ранво (кол-во процессоров)*2
        public
          constructor Create(ThreadsCount: Cardinal=0);
          destructor Destroy; override;
          function Associate(DeviceHandle: THandle; CompKey: Cardinal): THandle; //Добавление устройства в таблицу IOCP
       
          property IOPort: THandle read FIOCP;
          property WorkThreadsCount: Integer read FWorkThreadsCount;
        end;
       
      //Потоки, связанные с IOCP
        TWorkThread=class(TThread)
        private
          FIOCP: TIOCP;
        public
          constructor Create(IOCP: TIOCP);
          procedure Execute; override;
        end;
       
      implementation
       
      { TIOCP }
       
      function TIOCP.Associate(DeviceHandle: THandle;
        CompKey: Cardinal): THandle;
      begin
        Result := CreateIoCompletionPort(DeviceHandle,FIOCP,CompKey,0);
      end;
       
      constructor TIOCP.Create(ThreadsCount: Cardinal=0);
      var
        i: Integer;
        SI: TSystemInfo;
      begin
      //Количество потоков по-умолчанию = Кол-во процессоров*2
        if ThreadsCount=0 then
        begin
          GetSystemInfo(SI);
          FWorkThreadsCount := SI.dwNumberOfProcessors;
          FMaxThreadsCount := FWorkThreadsCount*2;
        end
        else
        begin
          FWorkThreadsCount := ThreadsCount;
          FMaxThreadsCount := FWorkThreadsCount;
        end;
       
      //Сооздаём порт заввершения в/в
        FIOCP := CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,FWorkThreadsCount);
        if FIOCP=0 then raise Exception.Create(SysErrorMessage(GetLastError));
       
        FThreadList := TList.Create;
       
        for i := 1 to FMaxThreadsCount do
        begin
          FThreadList.Add(TWorkThread.Create(Self));
        end;
      end;
       
      destructor TIOCP.Destroy;
      var
        i: Integer;
      begin
      //Посылаем всем потокам команду закончить работу (CompKey=0)
        for i := 0 to FThreadList.Count-1 do PostQueuedCompletionStatus(FIOCP,0,0,nil);
      //Ожидаем завершения всех потоков
        for i := 0 to FThreadList.Count-1 do
        begin
          with TWorkThread(FThreadList[i]) do
          begin
            WaitFor;
            Free;
          end;
        end;
        CloseHandle(FIOCP);
        FThreadList.Free;
        inherited Destroy;
      end;
       
       
      { TWorkThread }
       
      constructor TWorkThread.Create(IOCP: TIOCP);
      begin
        inherited Create(True);
        FIOCP := IOCP;
        FreeOnTerminate := False;
        Resume;
      end;
       
      procedure TWorkThread.Execute;
      var
        NumBytes: Cardinal;
        CompKey: Cardinal;
        ov: POVERLAPPED;
        J: TJob;
      begin
        while not Terminated do
        begin
          if GetQueuedCompletionStatus(FIOCP.IOPort,NumBytes,CompKey,ov,INFINITE) then
          begin
            if CompKey=0 then
            begin
              Terminate;
              Continue;
            end;
      //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1
            if Compkey=1 then Continue;
      //Полученный CompKey и есть наш TJob
          J := Pointer(CompKey);
          try
            J.FNumBytes := NumBytes;
      //Выполняем обработку
            J.ExecProc;
          except
      //Возвращаем признак ошибки
            J.FState := jsError;
          end;
          end
          else
          begin
            if CompKey<>0 then
            begin
      //Возвращаем признак ошибки
              J := Pointer(CompKey);
              J.FErrorCode := GetLastError;
              J.FState := jsError;
            end;
          end;
        end;
      end;
       
      { TJob }
       
      function TJob.Associate(DeviceHandle: THandle): THandle;
      begin
        Result := FIOCP.Associate(DeviceHandle,Cardinal(Self));
      end;
       
      //Для проверки того, что IOCP жив посылаем в порт запрос с CompKey=1
      function TJob.CheckIOCP: Boolean;
      var
        Err: Integer;
      begin
        Result := True;
        if not PostQueuedCompletionStatus(FIOCP.FIOCP,0,1,nil) then
        begin
          Err := GetLastError;
          if Err=ERROR_INVALID_HANDLE then Result := False;
        end;
      end;
       
      function TJob.Complete(Code: Cardinal): Boolean;
      begin
        CodeOp := Code;
        Result := PostQueuedCompletionStatus(FIOCP.IOPort,0,Cardinal(Self),POVERLAPPED(FOV));
      end;
       
      constructor TJob.Create(IOCP: TIOCP);
      begin
        FIOCP := IOCP;
        FState := jsNone;
        FCS := TCriticalSection.Create;
        New(FOV);
        FOV^.Internal := 0;
        FOV^.InternalHigh :=0;
        FOV^.Offset := 0;
        FOV^.OffsetHigh := 0;
      //При необходимости выполнения операции с устройством без передачи IOCP
      //перед выполнением операции устанавливаем младший бит hEvent в 1:
      //     FOV^.hEvent := FOV^.hEvent or 1;
        FOV^.hEvent := CreateEvent(nil,True,False,nil);
      end;
       
      destructor TJob.Destroy;
      begin
        CloseHandle(FOV^.hEvent);
        Dispose(FOV);
        FCS.Free;
        inherited;
      end;
       
      function TJob.GetPort: THandle;
      begin
        Result := FIOCP.FIOCP;
      end;
       
      end.




    Пример с копированием файлов:

    Скрытый текст

    ExpandedWrap disabled
      unit Unit1;
       
      interface
       
      uses
        Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
        Dialogs, StdCtrls, uIOCP, ExtCtrls;
       
      const
       
      //Коды операций
        coRead    = 1;
        coWrite   = 2;
       
        BufSize=64*1024;                                  //Размер буфера чтения/записи
       
      type
       
        TForm1 = class(TForm)
          Button1: TButton;
          Button2: TButton;
          Label1: TLabel;
          Timer1: TTimer;
          procedure Button1Click(Sender: TObject);
          procedure Button2Click(Sender: TObject);
          procedure Timer1Timer(Sender: TObject);
        private
          { Private declarations }
        public
          { Public declarations }
        end;
       
      //Наследник TJob
      //В этом классе реализована вся логика по копированию файла
        TMyJob=class(TJob)
          constructor Create(IOCP: TIOCP;const SrcPath, DestPath: String);
          destructor Destroy; override;
          procedure ExecProc; override;
        end;
       
      //Пользовательские данные
        PMyData=^TMyData;
        TMyData=record
          HSrc: THandle;                                  //Дескрипторы входного
          HDest: THandle;                                 //и выходного файлов
          FileSize: LARGE_INTEGER;                        //Размер исходного файла
          CopiedSize: LARGE_INTEGER;                      //Текщий размер скопированных данных
          Buffer: PByte;                                  //Буфер для копирования
        end;
       
      var
        Form1: TForm1;
        IOCP: TIOCP=nil;                                  //Экземпляр класса IOCP
        Job: TMyJob=nil;                                  //Задание для копирования
       
      //Функция получения размера файла
      function GetFileSizeEx(hFile: THandle; out lpFileSize: LARGE_INTEGER): BOOL; stdcall; external 'kernel32.dll';
       
      implementation
       
      {$R *.dfm}
       
      procedure TForm1.Button1Click(Sender: TObject);
      begin
      //Создаём экземпляр класса IOCP вместе с пулом потоков
        IOCP := TIOCP.Create;
       
      //В констркторе открываем входной и выходной файлы, инициализируем структуры
        Job:= TMyJob.Create(IOCP,'d:\temp\temp.iso', 'd:\temp\temp_.iso');
       
      //Связываем файлы в IOCP
        Job.Associate(PMyData(JOb.UserData)^.HSrc);
        Job.Associate(PMyData(JOb.UserData)^.HDest);
      //Фиктивный вызов IOCP для запуска проыесса копирования
        Job.Complete(coWrite);
      end;
       
      procedure TForm1.Button2Click(Sender: TObject);
      begin
      //Освобождение всех потоков и IOCP
        FreeAndNil(IOCP);
      end;
       
      procedure TForm1.Timer1Timer(Sender: TObject);
      var
        PD: PMyData;
        CopiedSize: Int64;
      begin
        (Sender as TTimer).Enabled := False;
        if Assigned(Job) then
        begin
      //Пользовательские данные заполняются в TMyJob.ExecProc
          PD := Job.UserData;
          Job.CS.Enter;                                   //Синхронизация доступа из разных потоков
          try
            CopiedSize := PD^.CopiedSize.QuadPart;
          finally
            Job.CS.Leave;
          end;
          Label1.Caption := 'Скопировано '+FormatFloat('#,##',CopiedSize);
          if Job.State = jsCompleted then
          begin
            Label1.Caption := 'Копирование закончено, скопировано '+FormatFloat('#,##',CopiedSize);
            FreeAndNil(Job);
          end
          else
          begin
      //Жив ли порт завершения в/в?
            if not Job.CheckIOCP then
            begin
              Label1.Caption := 'Копирование закончено (ошибка IOCP), скопировано '+FormatFloat('#,##',CopiedSize);
              FreeAndNil(Job);
            end;
          end;
       
        end;
        (Sender as TTimer).Enabled := True;
      end;
       
       
      { TMyJob }
       
      constructor TMyJob.Create(IOCP: TIOCP;const SrcPath, DestPath: String);
      var
        PD: PMyData;
      begin
        inherited Create(IOCP);
        New(PD);
        UserData :=PD;
        PD^.Buffer := nil;
       
      //VirtualAlloc используется для того, чтобы страницы памяти были выровнены по границе 64К
        PD^.Buffer := VirtualAlloc(nil,BufSize,MEM_COMMIT,PAGE_READWRITE);
       
        PD^.HSrc :=
          CreateFile(PChar(SrcPath),
          GENERIC_READ,
          FILE_SHARE_READ,
          nil,
          OPEN_EXISTING,
          FILE_FLAG_OVERLAPPED,
          0);
       
        if not GetFileSizeEx(PD^.HSrc, PD^.FileSize) then raise Exception.Create(SysErrorMessage(GetLastError));
       
        PD^.HDest :=
            CreateFile(PChar(DestPath),
            GENERIC_WRITE,
            0,
            nil,
            CREATE_ALWAYS,
            FILE_FLAG_OVERLAPPED,
            0);
       
        if PD^.HDest=INVALID_HANDLE_VALUE then
        begin
          raise Exception.Create(SysErrorMessage(GetLastError));
        end;
        PD^.CopiedSize.QuadPart := 0;
       
      end;
       
      destructor TMyJob.Destroy;
      begin
        if PMyData(UserData)^.Buffer<>nil
          then VirtualFree(PMyData(UserData)^.Buffer,0,MEM_RELEASE);
        if PMyData(UserData)^.HSrc<>0 then CloseHandle(PMyData(UserData)^.HSrc);
        if PMyData(UserData)^.HDest<>0 then CloseHandle(PMyData(UserData)^.HDest);
         Dispose(PMyData(UserData));
        inherited;
      end;
       
      //Эта процедура вызывается при обработке завершения операции ввода/вывода в пуле потоков
      procedure TMyJob.ExecProc;
      var
        PD: PMyData;
        dwNumBytes: Cardinal;
      begin
        PD := UserData;
       
      //  if (State=jsError) or (State=jsCompleted) then Exit;
       
        case CodeOp of
          coRead:                                         //Закончена операция чтения
            begin
              CodeOp := coWrite;                          //Следующая операия - запись
              if not WriteFile(PD^.HDest,PD^.Buffer^,NumBytes,dwNumBytes,OV) then
              begin
                ErrorCode := GetLastError;
                if ErrorCode<>ERROR_IO_PENDING then       //Операция жолжна быть поставлена в очередь
                begin
                  State := jsError;
                  Exit;
                end
                else ErrorCode := 0;
              end;
            end;
          coWrite:                                        //Закончена операция записи
            begin
              CodeOp := coRead;                           //Следующая операия - чтение
              CS.Enter;
              try
      //В пользовательских данных ведём подсчёт уже записанного объёма
                PD^.CopiedSize.QuadPart := PD^.CopiedSize.QuadPart + NumBytes;
              finally
                CS.Leave;
              end;
       
              OV^.Offset := PD^.CopiedSize.LowPart;
              OV^.OffsetHigh := PD^.CopiedSize.HighPart;
              if PD^.CopiedSize.QuadPart<PD^.FileSize.QuadPart then
              begin
                if not ReadFile(PD^.HSrc,PD^.Buffer^,BufSize,dwNumBytes,OV) then
                begin
                  ErrorCode := GetLastError;
                  if ErrorCode<>ERROR_IO_PENDING then
                  begin
                    State := jsError;
                    Exit;
                  end
                  else ErrorCode := 0;
                end;
              end
              else
              begin
                State := jsCompleted;
              end;
            end;
        end;
      end;
       
      end.

    Сообщение отредактировано: Демо -
    0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
    0 пользователей:


    Рейтинг@Mail.ru
    [ Script execution time: 0,0304 ]   [ 16 queries used ]   [ Generated: 19.03.24, 11:00 GMT ]