На главную Наши проекты:
Журнал   ·   Discuz!ML   ·   Wiki   ·   DRKB   ·   Помощь проекту
ПРАВИЛА FAQ Помощь Участники Календарь Избранное RSS
msm.ru
  
    > Несколько параллельных асинхронных send()-ов/recv()-ов
      Прозвучит банально, но достаточно трудно найти авторитетную информацию в инете, которая бы дала мне полное и окончательное понимание вопроса.
      Речь идет об одном worker_thread-е и нескольких одновременных операциях чтения или записи в tcp-сокет.
      Под одновременностью я понимаю то, что делается send() или recv(), но до того, как был вызван completion handler,
      делается ещё один или несколько send() или recv(). Гарантирует ли TCP, что мои completion handler-ы будут вызваны
      строго в том порядке, в каком я выполнял операции send() и recv()?
      Пример:
      ExpandedWrap disabled
        on_connect_complete() {
          Send(s, buf1, bind(on_send_complete1));
          Send(s, buf2, bind(on_send_complete2));
          Recv(s, buf3, bind(on_recv_complete1));
          Send(s, buf2, bind(on_send_complete3));
          Recv(s, buf3, bind(on_recv_complete2));
        }


      Гарантирует ли TCP, что порядок вызова completion handler-ов будет таким же, то есть:
      on_send_complete1, on_send_complete2, on_recv_complete1, on_send_complete3, on_recv_complete2

      ?

      Если да, то гарантируется ли тоже самое в случае, если эти Send и Recv были сделаны из разных потоков?
      Внутри TCP имеет просто одну очередь запланированных (но ещё не завершенных) операций, в которой
      будут присутствовать в нужном порядке и send()-ы и recv()-ы?

      Дополнительный вопрос: бывает ли вообще хоть когда-нибудь оправдан запуск двух параллельных Recv()-ов?
      А двух параллельных Send()-ов? Как мне кажется, всегда можно разработать для приложения протокол,
      который бы подразумевал только строгую последовательность "запрос-ответ" или "сообщение-ответ".
      Параллельный send() и recv() - это понятно, это для случая, когда любая из сторон может внезапно инициировать
      последовательность "сообщение-ответ" и должна, соответственно, ждать того же самого и от другой стороны.
      Мои сетевые приложения (пример: Как сделать редиректор? ) всегда были построены на такой модели, поэтому после connect()-а/accept()-а запускался recv(), а при каком-то событии ещё запускался send_all(), но вот
      придумать зачем бы понадобилось делать параллельных чтения или параллельные записи я не могу. Это действительно
      неиспользуемая фича - параллельность асинхронных операций или есть случаи, которые я просто не знаю?

      В случае, если что-то из того, о чем я спрашиваю является platform-specific, добавлю, что я использую
      boost::asio и мне нужна кросс-платформенность, отталкиваться надо от этого.
      .
      Сообщение отредактировано: reinterpret_alexey -
        reinterpret_alexey
        Чутье мне говорит, что как минимум порядок send'ов относительно друг друга должен сохраниться. То же самое с recv'ами - они тоже не должны менять порядок, так как TCP - это по сути поток данных, переставлять в нем байты нельзя. А вот если взять один send и один recv, то они могут завершиться в произвольном порядке, если конечно они логически не связаны протоколом общения с клиентом.
          Pacific

          Ну да, это, впринципе, очевидно. Но мне интересно, зачем вообще интерфейс TCP-сокетов предоставляет возможность
          двух параллельных асинхронных recv()-ов? Какой-то он не прозрачный, этот интерфейс. Если такую возможность сделали,
          значит, логично предположить, что есть случаи, в которых без этого или совсем не обойдешься, или обойдешься только костылями.
          Но мне два параллельных recv()-а кажутся полностью бессмысленными. Зачем они нужны?
            Цитата reinterpret_alexey @
            Но мне два параллельных recv()-а кажутся полностью бессмысленными. Зачем они нужны?

            Два recv()из Беркли-сокетов не могут быть параллельными.
            В recv() всего 4 параметра, а для параллельной работы нужен режим overlapped с соответствующим указателем на overlapped структуру
            Сообщение отредактировано: Oleg2004 -
              Oleg2004

              Мне кажется или вы действительно придираетесь к словам? Ну конечно речь не об блокирующих сокетах. Речь идет об асинхронном recv() как об абстрактной операции, реализованной в boost::asio::async_read() или в винде, в конкретном случае, WSARecv().
                Цитата reinterpret_alexey @
                Мне кажется или вы действительно придираетесь к словам?

                Вы зря ищете черную кошку там где ее нет.
                К сожалению многих моих собеседников или студентов, я всегда и везде борюсь за чистоту терминологии - в сетевом программирование это имеет превалирующее значение.
                И когда кто то пишет "пакет ТСР", для меня это вопиющая безграмотность.
                Я преподаватель - и потому вынужден хош не хош делать замечания такого рода.
                Как например в вашем заголовке темы - асинхронных send/recv в интерфейсе сокетов Беркли просто нет. :)
                Так что простите мне такую манеру поведения, но исправлять я буду всегда, пока жив. :yes:
                Еще раз прошу прощения... :yes-sad:
                  Oleg2004

                  Что ж, возможно, я действительно излишне упростил термины, гонясь за краткостью. Так что насчет того, зачем вообще нужно две параллельных операции чтения? Две параллельных операции чтения поддерживаются в boost::asio, то есть это общая возможность и для Windows (WSARecv()) и в linux (epoll). Это лишнее в интерфейсе или такую возможность оставили сознательно? Если мысли на этот счет?
                    Знаете, я не настолько глубоко знаю эту тему. :(
                    Идея у меня только одна, и она состоит в том, что WSARecv()использует юниксовую идеологию Scatter & Gather - т.е. вектор буферов приема. Две параллельные операции, выполненные "параллельно" с совершенно автономными буферами могут при должной синхронизации увеличить скорость приема.
                    Это пока все мои мысли.
                    Почему то в конце "приснился" битторрент... :) там ведь все парралельно...из разных мест - но сливается все в один файл. Ну это так, рассуждение на свободную ткему.
                    Сообщение отредактировано: Oleg2004 -
                      Посмотри вот здесь:

                      ExpandedWrap disabled
                        //  Launchers
                         
                        //  listen
                         
                        UINT __cdecl datagram_listen_connection_thread(LPVOID parameter)
                        {
                            thread_listen_parameters_structure_type *local_listen_thread_parameters_structure_source = (thread_listen_parameters_structure_type *)parameter;
                         
                            if(local_listen_thread_parameters_structure_source==NULL)
                            {
                                return 0;
                            }
                         
                            {
                                void *local_listen_thread_parameters_structure = new thread_listen_parameters_structure_type;
                         
                                ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure_source)->parameter_main_dialog;
                         
                                CWinThread *local_thread = AfxBeginThread(datagram_listen_connection_thread_ip_4,local_listen_thread_parameters_structure);
                            }
                         
                            {
                                void *local_listen_thread_parameters_structure = new thread_listen_parameters_structure_type;
                         
                                ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure_source)->parameter_main_dialog;
                         
                                CWinThread *local_thread = AfxBeginThread(datagram_listen_connection_thread_ip_6,local_listen_thread_parameters_structure);
                            }
                         
                            delete parameter;
                         
                            return 1;
                        }


                      ExpandedWrap disabled
                        UINT __cdecl datagram_listen_connection_thread_ip_6(LPVOID parameter)
                        {
                            thread_listen_parameters_structure_type *local_listen_thread_parameters_structure = (thread_listen_parameters_structure_type *)parameter;
                         
                            if(local_listen_thread_parameters_structure==NULL)
                            {
                                return 0;
                            }
                         
                            Cstl_network_ip_4_ip_6_udp_engineDlg *local_main_dialog = (local_listen_thread_parameters_structure)->parameter_main_dialog;
                         
                            if(local_main_dialog==NULL)
                            {
                                delete local_listen_thread_parameters_structure;
                                return 0;
                            }
                         
                            CString local_parameter_address_local(L"::0");
                         
                            CStringA local_address_internet_address;
                            UINT local_parameter_port_number_local = 10000;
                         
                            if(network::ip_6::domain_name_to_internet_6_name(local_parameter_address_local,local_address_internet_address)==false)
                            {
                                const int local_error_message_size = 10000;
                                wchar_t local_error_message[local_error_message_size];
                         
                                const int local_system_error_message_size = local_error_message_size-1000;
                                wchar_t local_system_error_message[local_system_error_message_size];
                         
                                wcscpy_s(local_system_error_message,local_system_error_message_size,L"domain_name_to_internet_6_name завершилась неудачей");
                         
                                CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT");
                         
                                wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer());
                         
                                MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR);
                         
                                return 0;
                            }
                         
                         
                            for(;local_parameter_port_number_local<=0xFFFF;)
                            {
                                
                                network::ip_6::CBlockingSocket_ip_6 local_blocking_socket_temp;
                         
                                network::ip_6::CSockAddr_ip_6 local_socket_address_temp(local_address_internet_address,local_parameter_port_number_local);
                         
                                try
                                {
                                    local_blocking_socket_temp.Create(SOCK_DGRAM);
                         
                                    local_blocking_socket_temp.Bind(local_socket_address_temp);
                                }
                                catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception)
                                {
                                    const int local_error_message_size = 10000;
                                    wchar_t local_error_message[local_error_message_size];
                         
                                    const int local_system_error_message_size = local_error_message_size - 1000;
                                    wchar_t local_system_error_message[local_system_error_message_size];
                         
                                    CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT");
                         
                                    local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size);
                         
                                    wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer());
                                    
                                    local_blocking_socket_exception->Delete();
                                
                                    local_parameter_port_number_local++;
                         
                                    continue;
                         
                                    //MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR);
                         
                                    //return 0;
                                }
                         
                                local_blocking_socket_temp.Close();
                         
                                break;
                         
                            }
                         
                            if(local_parameter_port_number_local>0xFFFF)
                            {
                                return 0;
                            }
                         
                            network::ip_6::CBlockingSocket_ip_6 local_blocking_socket;
                         
                            network::ip_6::CSockAddr_ip_6 local_socket_address(local_address_internet_address,local_parameter_port_number_local);
                         
                            try
                            {
                                local_blocking_socket.Create(SOCK_DGRAM);
                         
                                local_blocking_socket.Bind(local_socket_address);
                            }
                            catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception)
                            {
                                const int local_error_message_size = 10000;
                                wchar_t local_error_message[local_error_message_size];
                         
                                const int local_system_error_message_size = local_error_message_size-1000;
                                wchar_t local_system_error_message[local_system_error_message_size];
                         
                                CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT");
                         
                                local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size);
                         
                                wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer());
                                    
                                local_blocking_socket_exception->Delete();
                         
                                delete local_listen_thread_parameters_structure;
                                
                                MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR);
                         
                                return 0;
                            }
                         
                         
                            char local_message_to_receive[CONST_MESSAGE_LENGTH];
                         
                            for(;;)
                            {
                                ZeroMemory(local_message_to_receive,CONST_MESSAGE_LENGTH);
                         
                                int local_bytes_received = 0;
                                    
                                network::ip_6::CSockAddr_ip_6 local_socket_address_peer;
                         
                                try
                                {
                                    local_bytes_received = local_blocking_socket.ReceiveDatagram(local_message_to_receive,CONST_MESSAGE_LENGTH,(LPSOCKADDR_IN6)&local_socket_address_peer,CONST_WAIT_TIME_LISTEN);
                         
                                    if(local_bytes_received>0)
                                    {
                                        {
                                            void *local_listen_answer_thread_parameters_structure = new thread_listen_answer_parameters_structure_type;
                         
                                            ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog;
                                            ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer = new char[local_bytes_received];
                                            ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer_size = local_bytes_received;
                         
                                            memcpy(((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer,local_message_to_receive,local_bytes_received);
                         
                                            CWinThread *local_thread = AfxBeginThread(datagram_listen_answer_connection_thread_ip_6,local_listen_answer_thread_parameters_structure);
                                        }
                                    }
                                }
                                catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception)
                                {
                                    const int local_error_message_size = 10000;
                                    wchar_t local_error_message[local_error_message_size];
                         
                                    const int local_system_error_message_size = local_error_message_size-1000;
                                    wchar_t local_system_error_message[local_system_error_message_size];
                         
                                    CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT");
                         
                                    local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size);
                         
                                    wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer());
                                    
                                    int local_error_number = local_blocking_socket_exception->GetErrorNumber();
                         
                                    local_blocking_socket_exception->Delete();
                         
                                    if(local_error_number!=0)
                                    {
                                        MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR);
                                    }
                                }
                         
                                if(local_main_dialog->get_command_threads_stop())
                                {
                                    break;
                                }
                            }
                         
                            {
                                delete local_listen_thread_parameters_structure;
                                return 1;
                            }
                        }


                      И здесь:

                      ExpandedWrap disabled
                        void Cdatagram_sendDlg::OnBnClickedButtonStartSend()
                        {
                            CString local_string_address;
                            CString local_string_port_number;
                            CString local_string_threads_number;
                            CString local_string_requests_number;
                            CString local_string_message_unicode_16;
                         
                            member_edit_address.GetWindowTextW(local_string_address);
                            member_edit_port_number.GetWindowTextW(local_string_port_number);
                            member_edit_threads_number.GetWindowTextW(local_string_threads_number);
                            member_edit_requests_number.GetWindowTextW(local_string_requests_number);
                            member_edit_message_unicode_16.GetWindowTextW(local_string_message_unicode_16);
                         
                            WORD local_port_number = (WORD)_wtoi(local_string_port_number);
                            int local_threads_number = _wtoi(local_string_threads_number);
                            WORD local_requests_number = (WORD)_wtoi(local_string_requests_number);
                         
                            USHORT local_maximum_message_length = member_maximum_message_length;
                         
                            for(int local_int_threads_number_counter=0;local_int_threads_number_counter<local_threads_number;local_int_threads_number_counter++)
                            {
                                void *local_thread_parameters_structure = new thread_parameters_structure_type;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address = local_string_address;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number = local_port_number;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_requests_number = local_requests_number;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_message_unicode_16 = local_string_message_unicode_16;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_maximum_message_length = local_maximum_message_length;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address_local = CString();
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number_local = 0;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_main_dialog = this;
                         
                                CWinThread *local_thread = AfxBeginThread(datagram_send_connection_thread,local_thread_parameters_structure);
                            }
                        }
                         
                        void Cdatagram_sendDlg::OnBnClickedButtonStartListen()
                        {
                            CString local_string_address_local;
                            CString local_string_port_number_local;
                         
                            member_edit_address_local.GetWindowTextW(local_string_address_local);
                            member_edit_port_number_local.GetWindowTextW(local_string_port_number_local);
                         
                            USHORT local_maximum_message_length = member_maximum_message_length;
                         
                            WORD local_port_number_local = (WORD)_wtoi(local_string_port_number_local);
                         
                            {
                                void *local_thread_parameters_structure = new thread_parameters_structure_type;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address = CString();
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number = 0;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_requests_number = 0;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_message_unicode_16 = CString();
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_maximum_message_length = local_maximum_message_length;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address_local = local_string_address_local;
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number_local = local_port_number_local;
                         
                                ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_main_dialog = this;
                         
                                CWinThread *local_thread = AfxBeginThread(datagram_receive_connection_thread,local_thread_parameters_structure);
                            }
                        }
                      0 пользователей читают эту тему (0 гостей и 0 скрытых пользователей)
                      0 пользователей:


                      Рейтинг@Mail.ru
                      [ Script execution time: 0,0376 ]   [ 17 queries used ]   [ Generated: 26.04.24, 06:58 GMT ]