Наши проекты:
Журнал · Discuz!ML · Wiki · DRKB · Помощь проекту |
||
ПРАВИЛА | FAQ | Помощь | Поиск | Участники | Календарь | Избранное | RSS |
[3.149.233.72] |
|
Сообщ.
#1
,
|
|
|
Прозвучит банально, но достаточно трудно найти авторитетную информацию в инете, которая бы дала мне полное и окончательное понимание вопроса.
Речь идет об одном worker_thread-е и нескольких одновременных операциях чтения или записи в tcp-сокет. Под одновременностью я понимаю то, что делается send() или recv(), но до того, как был вызван completion handler, делается ещё один или несколько send() или recv(). Гарантирует ли TCP, что мои completion handler-ы будут вызваны строго в том порядке, в каком я выполнял операции send() и recv()? Пример: on_connect_complete() { Send(s, buf1, bind(on_send_complete1)); Send(s, buf2, bind(on_send_complete2)); Recv(s, buf3, bind(on_recv_complete1)); Send(s, buf2, bind(on_send_complete3)); Recv(s, buf3, bind(on_recv_complete2)); } Гарантирует ли TCP, что порядок вызова completion handler-ов будет таким же, то есть: on_send_complete1, on_send_complete2, on_recv_complete1, on_send_complete3, on_recv_complete2 ? Если да, то гарантируется ли тоже самое в случае, если эти Send и Recv были сделаны из разных потоков? Внутри TCP имеет просто одну очередь запланированных (но ещё не завершенных) операций, в которой будут присутствовать в нужном порядке и send()-ы и recv()-ы? Дополнительный вопрос: бывает ли вообще хоть когда-нибудь оправдан запуск двух параллельных Recv()-ов? А двух параллельных Send()-ов? Как мне кажется, всегда можно разработать для приложения протокол, который бы подразумевал только строгую последовательность "запрос-ответ" или "сообщение-ответ". Параллельный send() и recv() - это понятно, это для случая, когда любая из сторон может внезапно инициировать последовательность "сообщение-ответ" и должна, соответственно, ждать того же самого и от другой стороны. Мои сетевые приложения (пример: Как сделать редиректор? ) всегда были построены на такой модели, поэтому после connect()-а/accept()-а запускался recv(), а при каком-то событии ещё запускался send_all(), но вот придумать зачем бы понадобилось делать параллельных чтения или параллельные записи я не могу. Это действительно неиспользуемая фича - параллельность асинхронных операций или есть случаи, которые я просто не знаю? В случае, если что-то из того, о чем я спрашиваю является platform-specific, добавлю, что я использую boost::asio и мне нужна кросс-платформенность, отталкиваться надо от этого.. |
Сообщ.
#2
,
|
|
|
reinterpret_alexey
Чутье мне говорит, что как минимум порядок send'ов относительно друг друга должен сохраниться. То же самое с recv'ами - они тоже не должны менять порядок, так как TCP - это по сути поток данных, переставлять в нем байты нельзя. А вот если взять один send и один recv, то они могут завершиться в произвольном порядке, если конечно они логически не связаны протоколом общения с клиентом. |
Сообщ.
#3
,
|
|
|
Pacific
Ну да, это, впринципе, очевидно. Но мне интересно, зачем вообще интерфейс TCP-сокетов предоставляет возможность двух параллельных асинхронных recv()-ов? Какой-то он не прозрачный, этот интерфейс. Если такую возможность сделали, значит, логично предположить, что есть случаи, в которых без этого или совсем не обойдешься, или обойдешься только костылями. Но мне два параллельных recv()-а кажутся полностью бессмысленными. Зачем они нужны? |
Сообщ.
#4
,
|
|
|
Цитата reinterpret_alexey @ Но мне два параллельных recv()-а кажутся полностью бессмысленными. Зачем они нужны? Два recv()из Беркли-сокетов не могут быть параллельными. В recv() всего 4 параметра, а для параллельной работы нужен режим overlapped с соответствующим указателем на overlapped структуру |
Сообщ.
#5
,
|
|
|
Oleg2004
Мне кажется или вы действительно придираетесь к словам? Ну конечно речь не об блокирующих сокетах. Речь идет об асинхронном recv() как об абстрактной операции, реализованной в boost::asio::async_read() или в винде, в конкретном случае, WSARecv(). |
Сообщ.
#6
,
|
|
|
Цитата reinterpret_alexey @ Мне кажется или вы действительно придираетесь к словам? Вы зря ищете черную кошку там где ее нет. К сожалению многих моих собеседников или студентов, я всегда и везде борюсь за чистоту терминологии - в сетевом программирование это имеет превалирующее значение. И когда кто то пишет "пакет ТСР", для меня это вопиющая безграмотность. Я преподаватель - и потому вынужден хош не хош делать замечания такого рода. Как например в вашем заголовке темы - асинхронных send/recv в интерфейсе сокетов Беркли просто нет. Так что простите мне такую манеру поведения, но исправлять я буду всегда, пока жив. Еще раз прошу прощения... |
Сообщ.
#7
,
|
|
|
Oleg2004
Что ж, возможно, я действительно излишне упростил термины, гонясь за краткостью. Так что насчет того, зачем вообще нужно две параллельных операции чтения? Две параллельных операции чтения поддерживаются в boost::asio, то есть это общая возможность и для Windows (WSARecv()) и в linux (epoll). Это лишнее в интерфейсе или такую возможность оставили сознательно? Если мысли на этот счет? |
Сообщ.
#8
,
|
|
|
Знаете, я не настолько глубоко знаю эту тему.
Идея у меня только одна, и она состоит в том, что WSARecv()использует юниксовую идеологию Scatter & Gather - т.е. вектор буферов приема. Две параллельные операции, выполненные "параллельно" с совершенно автономными буферами могут при должной синхронизации увеличить скорость приема. Это пока все мои мысли. Почему то в конце "приснился" битторрент... там ведь все парралельно...из разных мест - но сливается все в один файл. Ну это так, рассуждение на свободную ткему. |
Сообщ.
#9
,
|
|
|
Посмотри вот здесь:
// Launchers // listen UINT __cdecl datagram_listen_connection_thread(LPVOID parameter) { thread_listen_parameters_structure_type *local_listen_thread_parameters_structure_source = (thread_listen_parameters_structure_type *)parameter; if(local_listen_thread_parameters_structure_source==NULL) { return 0; } { void *local_listen_thread_parameters_structure = new thread_listen_parameters_structure_type; ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure_source)->parameter_main_dialog; CWinThread *local_thread = AfxBeginThread(datagram_listen_connection_thread_ip_4,local_listen_thread_parameters_structure); } { void *local_listen_thread_parameters_structure = new thread_listen_parameters_structure_type; ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure_source)->parameter_main_dialog; CWinThread *local_thread = AfxBeginThread(datagram_listen_connection_thread_ip_6,local_listen_thread_parameters_structure); } delete parameter; return 1; } UINT __cdecl datagram_listen_connection_thread_ip_6(LPVOID parameter) { thread_listen_parameters_structure_type *local_listen_thread_parameters_structure = (thread_listen_parameters_structure_type *)parameter; if(local_listen_thread_parameters_structure==NULL) { return 0; } Cstl_network_ip_4_ip_6_udp_engineDlg *local_main_dialog = (local_listen_thread_parameters_structure)->parameter_main_dialog; if(local_main_dialog==NULL) { delete local_listen_thread_parameters_structure; return 0; } CString local_parameter_address_local(L"::0"); CStringA local_address_internet_address; UINT local_parameter_port_number_local = 10000; if(network::ip_6::domain_name_to_internet_6_name(local_parameter_address_local,local_address_internet_address)==false) { const int local_error_message_size = 10000; wchar_t local_error_message[local_error_message_size]; const int local_system_error_message_size = local_error_message_size-1000; wchar_t local_system_error_message[local_system_error_message_size]; wcscpy_s(local_system_error_message,local_system_error_message_size,L"domain_name_to_internet_6_name завершилась неудачей"); CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT"); wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer()); MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR); return 0; } for(;local_parameter_port_number_local<=0xFFFF;) { network::ip_6::CBlockingSocket_ip_6 local_blocking_socket_temp; network::ip_6::CSockAddr_ip_6 local_socket_address_temp(local_address_internet_address,local_parameter_port_number_local); try { local_blocking_socket_temp.Create(SOCK_DGRAM); local_blocking_socket_temp.Bind(local_socket_address_temp); } catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception) { const int local_error_message_size = 10000; wchar_t local_error_message[local_error_message_size]; const int local_system_error_message_size = local_error_message_size - 1000; wchar_t local_system_error_message[local_system_error_message_size]; CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT"); local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size); wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer()); local_blocking_socket_exception->Delete(); local_parameter_port_number_local++; continue; //MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR); //return 0; } local_blocking_socket_temp.Close(); break; } if(local_parameter_port_number_local>0xFFFF) { return 0; } network::ip_6::CBlockingSocket_ip_6 local_blocking_socket; network::ip_6::CSockAddr_ip_6 local_socket_address(local_address_internet_address,local_parameter_port_number_local); try { local_blocking_socket.Create(SOCK_DGRAM); local_blocking_socket.Bind(local_socket_address); } catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception) { const int local_error_message_size = 10000; wchar_t local_error_message[local_error_message_size]; const int local_system_error_message_size = local_error_message_size-1000; wchar_t local_system_error_message[local_system_error_message_size]; CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT"); local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size); wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer()); local_blocking_socket_exception->Delete(); delete local_listen_thread_parameters_structure; MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR); return 0; } char local_message_to_receive[CONST_MESSAGE_LENGTH]; for(;;) { ZeroMemory(local_message_to_receive,CONST_MESSAGE_LENGTH); int local_bytes_received = 0; network::ip_6::CSockAddr_ip_6 local_socket_address_peer; try { local_bytes_received = local_blocking_socket.ReceiveDatagram(local_message_to_receive,CONST_MESSAGE_LENGTH,(LPSOCKADDR_IN6)&local_socket_address_peer,CONST_WAIT_TIME_LISTEN); if(local_bytes_received>0) { { void *local_listen_answer_thread_parameters_structure = new thread_listen_answer_parameters_structure_type; ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_main_dialog = ((thread_listen_parameters_structure_type*)local_listen_thread_parameters_structure)->parameter_main_dialog; ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer = new char[local_bytes_received]; ((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer_size = local_bytes_received; memcpy(((thread_listen_answer_parameters_structure_type*)local_listen_answer_thread_parameters_structure)->parameter_buffer,local_message_to_receive,local_bytes_received); CWinThread *local_thread = AfxBeginThread(datagram_listen_answer_connection_thread_ip_6,local_listen_answer_thread_parameters_structure); } } } catch(network::ip_6::CBlockingSocketException_ip_6 *local_blocking_socket_exception) { const int local_error_message_size = 10000; wchar_t local_error_message[local_error_message_size]; const int local_system_error_message_size = local_error_message_size-1000; wchar_t local_system_error_message[local_system_error_message_size]; CString local_time_string = CTime::GetCurrentTime().FormatGmt("%d/%m/%y %H:%M:%S GMT"); local_blocking_socket_exception->GetErrorMessage(local_system_error_message,local_system_error_message_size); wsprintf((wchar_t*)local_error_message, L"Сетевая ошибка -- %s -- %s\r\n", local_system_error_message, local_time_string.GetBuffer()); int local_error_number = local_blocking_socket_exception->GetErrorNumber(); local_blocking_socket_exception->Delete(); if(local_error_number!=0) { MessageBox(0,local_error_message,CString(L"Error"),MB_ICONERROR); } } if(local_main_dialog->get_command_threads_stop()) { break; } } { delete local_listen_thread_parameters_structure; return 1; } } И здесь: void Cdatagram_sendDlg::OnBnClickedButtonStartSend() { CString local_string_address; CString local_string_port_number; CString local_string_threads_number; CString local_string_requests_number; CString local_string_message_unicode_16; member_edit_address.GetWindowTextW(local_string_address); member_edit_port_number.GetWindowTextW(local_string_port_number); member_edit_threads_number.GetWindowTextW(local_string_threads_number); member_edit_requests_number.GetWindowTextW(local_string_requests_number); member_edit_message_unicode_16.GetWindowTextW(local_string_message_unicode_16); WORD local_port_number = (WORD)_wtoi(local_string_port_number); int local_threads_number = _wtoi(local_string_threads_number); WORD local_requests_number = (WORD)_wtoi(local_string_requests_number); USHORT local_maximum_message_length = member_maximum_message_length; for(int local_int_threads_number_counter=0;local_int_threads_number_counter<local_threads_number;local_int_threads_number_counter++) { void *local_thread_parameters_structure = new thread_parameters_structure_type; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address = local_string_address; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number = local_port_number; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_requests_number = local_requests_number; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_message_unicode_16 = local_string_message_unicode_16; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_maximum_message_length = local_maximum_message_length; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address_local = CString(); ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number_local = 0; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_main_dialog = this; CWinThread *local_thread = AfxBeginThread(datagram_send_connection_thread,local_thread_parameters_structure); } } void Cdatagram_sendDlg::OnBnClickedButtonStartListen() { CString local_string_address_local; CString local_string_port_number_local; member_edit_address_local.GetWindowTextW(local_string_address_local); member_edit_port_number_local.GetWindowTextW(local_string_port_number_local); USHORT local_maximum_message_length = member_maximum_message_length; WORD local_port_number_local = (WORD)_wtoi(local_string_port_number_local); { void *local_thread_parameters_structure = new thread_parameters_structure_type; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address = CString(); ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number = 0; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_requests_number = 0; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_message_unicode_16 = CString(); ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_maximum_message_length = local_maximum_message_length; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_address_local = local_string_address_local; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_port_number_local = local_port_number_local; ((thread_parameters_structure_type*)local_thread_parameters_structure)->parameter_main_dialog = this; CWinThread *local_thread = AfxBeginThread(datagram_receive_connection_thread,local_thread_parameters_structure); } } |