RESTinio
Loading...
Searching...
No Matches
connection.hpp
Go to the documentation of this file.
1/*
2 restinio
3*/
4
9#pragma once
10
12
13#include <http_parser.h>
14
16
29
32
33namespace restinio
34{
35
36namespace impl
37{
38
39//
40// http_parser_ctx_t
41//
42
44
49{
53 std::string m_body;
55
59 std::size_t m_last_value_total_size{ 0u };
60 bool m_last_was_value{ true };
61
66
72
74 bool m_message_complete{ false };
75
84 std::size_t m_total_field_count{ 0u };
85
92
100 : m_limits{ limits }
101 {}
102
104 void
106 {
108 m_body.clear();
109 m_current_field_name.clear();
111 m_last_was_value = true;
113 m_message_complete = false;
115 }
116
119
125 {
127
128 if( !m_chunked_info_block.m_chunks.empty() ||
130 {
131 result = std::make_unique< chunked_input_info_t >(
132 std::move( m_chunked_info_block ) );
133 }
134
135 return result;
136 }
137};
138
140#include "parser_callbacks.ipp"
141
142//
143// create_parser_settings()
144//
145
147
150template< typename Http_Methods >
151inline http_parser_settings
153{
154 http_parser_settings parser_settings;
155 http_parser_settings_init( &parser_settings );
156
157 parser_settings.on_url =
158 []( http_parser * parser, const char * at, size_t length ) -> int {
159 return restinio_url_cb( parser, at, length );
160 };
161
162 parser_settings.on_header_field =
163 []( http_parser * parser, const char * at, size_t length ) -> int {
164 return restinio_header_field_cb( parser, at, length );
165 };
166
167 parser_settings.on_header_value =
168 []( http_parser * parser, const char * at, size_t length ) -> int {
169 return restinio_header_value_cb( parser, at, length );
170 };
171
172 parser_settings.on_headers_complete =
173 []( http_parser * parser ) -> int {
174 return restinio_headers_complete_cb( parser );
175 };
176
177 parser_settings.on_body =
178 []( http_parser * parser, const char * at, size_t length ) -> int {
179 return restinio_body_cb( parser, at, length );
180 };
181
182 parser_settings.on_chunk_header =
183 []( http_parser * parser ) -> int {
184 return restinio_chunk_header_cb( parser );
185 };
186
187 parser_settings.on_chunk_complete =
188 []( http_parser * parser ) -> int {
189 return restinio_chunk_complete_cb( parser );
190 };
191
192 parser_settings.on_message_complete =
193 []( http_parser * parser ) -> int {
194 return restinio_message_complete_cb< Http_Methods >( parser );
195 };
196
197 return parser_settings;
198}
199
200//
201// connection_upgrade_stage_t
202//
203
205enum class connection_upgrade_stage_t : std::uint8_t
206{
208 none,
218};
219
220//
221// connection_input_t
222//
223
226{
228 std::size_t buffer_size,
230 : m_parser_ctx{ limits }
231 , m_buf{ buffer_size }
232 {}
233
236 http_parser m_parser;
239
242
246
249
251 void
253 {
254 // Reinit parser.
255 http_parser_init( &m_parser, HTTP_REQUEST);
256
257 // Reset context and attach it to parser.
259 m_parser.data = &m_parser_ctx;
260 }
261};
262
263template < typename Connection, typename Start_Read_CB, typename Failed_CB >
264void
266 asio_ns::ip::tcp::socket & ,
267 Connection & ,
268 Start_Read_CB start_read_cb,
269 Failed_CB )
270{
271 // No preparation is needed, start
272 start_read_cb();
273}
274
275// An overload for the case of non-TLS-connection.
276inline tls_socket_t *
278 asio_ns::ip::tcp::socket & ) noexcept
279{
280 return nullptr;
281}
282
283//
284// connection_t
285//
286
288/*
289 Working circle consists of the following steps:
290 * wait for request -- reading from socket and parsing header and body;
291 * handling request -- once the request is completely obtained it's handling
292 is deligated to a handler chosen by handler factory;
293 * writing response -- writing response to socket;
294 * back to first step o close connection -- depending on keep-alive property
295 of the last response the connection goes back to first step or
296 shutdowns.
297
298 Each step is controlled by timer (\see schedule_operation_timeout_callback())
299
300 In case of errors connection closes itself.
301*/
302template < typename Traits >
303class connection_t final
304 : public connection_base_t
305 , public executor_wrapper_t< typename Traits::strand_t >
306{
308
309 public:
310 using timer_manager_t = typename Traits::timer_manager_t;
311 using timer_guard_t = typename timer_manager_t::timer_guard_t;
314 using logger_t = typename Traits::logger_t;
315 using strand_t = typename Traits::strand_t;
316 using stream_socket_t = typename Traits::stream_socket_t;
319
322 connection_id_t conn_id,
324 stream_socket_t && socket,
328 endpoint_t remote_endpoint,
330 lifetime_monitor_t lifetime_monitor )
331 : connection_base_t{ conn_id }
333 , m_socket{ std::move( socket ) }
334 , m_settings{ std::move( settings ) }
335 , m_remote_endpoint{ std::move( remote_endpoint ) }
336 , m_input{
337 m_settings->m_buffer_size,
338 m_settings->m_incoming_http_msg_limits
339 }
340 , m_response_coordinator{ m_settings->m_max_pipelined_requests }
341 , m_timer_guard{ m_settings->create_timer_guard() }
343 , m_logger{ *( m_settings->m_logger ) }
344 , m_lifetime_monitor{ std::move(lifetime_monitor) }
345 {
346 // Notify of a new connection instance.
347 m_logger.trace( [&]{
348 return fmt::format(
350 "[connection:{}] start connection with {}" ),
353 } );
354 }
355
356 // Disable copy/move.
357 connection_t( const connection_t & ) = delete;
359 connection_t & operator = ( const connection_t & ) = delete;
361
362 ~connection_t() override
363 {
365 [&]{
366 return fmt::format(
368 "[connection:{}] destructor called" ),
369 connection_id() );
370 } );
371 }
372
373 void
375 {
377 m_socket,
378 *this,
379 [ & ]{
380 // Inform state listener if it used.
381 m_settings->call_state_listener( [this]() noexcept {
383 this->connection_id(),
384 this->m_remote_endpoint,
387 m_socket )
388 }
389 };
390 } );
391
392 // Start timeout checking.
393 m_prepared_weak_ctx = shared_from_this();
395
396 // Start reading request.
398 },
399 [ & ]( const asio_ns::error_code & ec ){
401 return fmt::format(
403 "[connection:{}] prepare connection error: {}" ),
405 ec.message() );
406 } );
407 } );
408 }
409
411 void
413 {
414 m_logger.trace( [&]{
415 return fmt::format(
417 "[connection:{}] start waiting for request" ),
418 connection_id() );
419 } );
420
421 // Prepare parser for consuming new request message.
423
424 // Guard total time for a request to be read.
425 // guarding here makes the total read process
426 // to run in read_next_http_message_timelimit.
428
429 if( 0 != m_input.m_buf.length() )
430 {
431 // If a pipeline requests were sent by client
432 // then the biginning (or even entire request) of it
433 // is in the buffer obtained from socket in previous
434 // read operation.
436 }
437 else
438 {
439 // Next request (if any) must be obtained from socket.
441 }
442 }
443
446 {
448 upgrade_internals_t && ) = default;
449
452 stream_socket_t socket,
453 lifetime_monitor_t lifetime_monitor )
454 : m_settings{ std::move(settings) }
455 , m_socket{ std::move( socket ) }
456 , m_lifetime_monitor{ std::move(lifetime_monitor) }
457 {}
458
462 };
463
467 {
468 return upgrade_internals_t{
470 std::move(m_socket),
471 std::move(m_lifetime_monitor)
472 };
473 }
474
475 private:
477 inline void
479 {
481 {
482 m_logger.trace( [&]{
483 return fmt::format(
485 "[connection:{}] continue reading request" ),
486 connection_id() );
487 } );
488
489
491 m_socket.async_read_some(
493 asio_ns::bind_executor(
494 this->get_executor(),
495 [this, ctx = shared_from_this()]
496 // NOTE: this lambda is noexcept since v.0.6.0.
497 ( const asio_ns::error_code & ec,
498 std::size_t length ) noexcept {
501 } ) );
502 }
503 else
504 {
505 m_logger.trace( [&]{
506 return fmt::format(
508 "[connection:{}] skip read operation: already running" ),
509 connection_id() );
510 } );
511 }
512 }
513
515 inline void
516 after_read( const asio_ns::error_code & ec, std::size_t length ) noexcept
517 {
518 if( !ec )
519 {
520 // Exceptions shouldn't go out of `after_read`.
521 // So intercept them and close the connection in the case
522 // of an exception.
523 try
524 {
525 m_logger.trace( [&]{
526 return fmt::format(
528 "[connection:{}] received {} bytes" ),
529 this->connection_id(),
530 length );
531 } );
532
533 m_input.m_buf.obtained_bytes( length );
534
535 consume_data( m_input.m_buf.bytes(), length );
536 }
537 catch( const std::exception & x )
538 {
540 return fmt::format(
542 "[connection:{}] unexpected exception during the "
543 "handling of incoming data: {}" ),
545 x.what() );
546 } );
547 }
548 }
549 else
550 {
551 // Well, if it is actually an error
552 // then close connection.
553 if( !error_is_operation_aborted( ec ) )
554 {
555 if ( !error_is_eof( ec ) || 0 != m_input.m_parser.nread )
557 return fmt::format(
559 "[connection:{}] read socket error: {}; "
560 "parsed bytes: {}" ),
562 ec.message(),
563 m_input.m_parser.nread );
564 } );
565 else
566 {
567 // A case that is not such an error:
568 // on a connection (most probably keeped alive
569 // after previous request, but a new also applied)
570 // no bytes were consumed and remote peer closes connection.
572 [&]{
573 return fmt::format(
575 "[connection:{}] EOF and no request, "
576 "close connection" ),
577 connection_id() );
578 } );
579
581 }
582 }
583 // else: read operation was cancelled.
584 }
585 }
586
588 void
589 consume_data( const char * data, std::size_t length )
590 {
591 auto & parser = m_input.m_parser;
592
593 const auto nparsed =
594 http_parser_execute(
595 &parser,
596 &( m_settings->m_parser_settings ),
597 data,
598 length );
599
600 // If entire http-message was obtained,
601 // parser is stopped and the might be a part of consecutive request
602 // left in buffer, so we mark how many bytes were obtained.
603 // and next message read (if any) will be started from already existing
604 // data left in buffer.
605 m_input.m_buf.consumed_bytes( nparsed );
606
607 if( HPE_OK != parser.http_errno &&
608 HPE_PAUSED != parser.http_errno )
609 {
610 // PARSE ERROR:
611 auto err = HTTP_PARSER_ERRNO( &parser );
612
613 // TODO: handle case when there are some request in process.
615 return fmt::format(
617 "[connection:{}] parser error {}: {}" ),
619 http_errno_name( err ),
620 http_errno_description( err ) );
621 } );
622
623 // nothing to do.
624 return;
625 }
626
628 {
630 }
631 else
633 }
634
636 void
638 {
639 try
640 {
641 auto & parser = m_input.m_parser;
642 auto & parser_ctx = m_input.m_parser_ctx;
643
644 if( m_input.m_parser.upgrade )
645 {
646 // Start upgrade connection operation.
647
648 // The first thing is to make sure
649 // that upgrade request will be handled in
650 // a non pipelined fashion.
653 }
654
657 {
658 // Run ordinary HTTP logic.
659 const auto request_id = m_response_coordinator.register_new_request();
660
661 m_logger.trace( [&]{
662 return fmt::format(
664 "[connection:{}] request received (#{}): {} {}" ),
666 request_id,
667 http_method_str(
668 static_cast<http_method>( parser.method ) ),
669 parser_ctx.m_header.request_target() );
670 } );
671
672 // TODO: mb there is a way to
673 // track if response was emmited immediately in handler
674 // or it was delegated
675 // so it is possible to omit this timer scheduling.
677
678 const auto handling_result =
680 std::make_shared< generic_request_t >(
681 request_id,
682 std::move( parser_ctx.m_header ),
683 std::move( parser_ctx.m_body ),
684 parser_ctx.make_chunked_input_info_if_necessary(),
685 shared_from_concrete< connection_base_t >(),
687 m_settings->extra_data_factory() ) );
688
689 switch( handling_result )
690 {
693 // If handler refused request, say not implemented.
695 request_id,
700 break;
701
704 {
705 // Request was accepted,
706 // didn't create immediate response that closes connection after,
707 // and it is possible to receive more requests
708 // then start consuming yet another request.
710 }
711 break;
712 }
713 }
714 else
715 {
716 m_logger.trace( [&]{
717 const std::string default_value{};
718
719 return fmt::format(
721 "[connection:{}] upgrade request received: {} {}; "
722 "Upgrade: '{}';" ),
724 http_method_str(
725 static_cast<http_method>( parser.method ) ),
726 parser_ctx.m_header.request_target(),
727 parser_ctx.m_header.get_field_or(
728 http_field::upgrade, default_value ) );
729 } );
730
732 {
733 // There are no requests in handling
734 // So the current request with upgrade
735 // is the only one and can be handled directly.
736 // It is safe to call a handler for it.
738 }
739 else
740 {
741 // There are pipelined request
742 m_logger.trace( [&]{
743 return fmt::format(
745 "[connection:{}] upgrade request happened to "
746 "be a pipelined one, "
747 "and will be handled after previous requests "
748 "are handled" ),
749 connection_id() );
750 } );
751 }
752
753 // No further actions (like continue reading) in both cases are needed.
754 }
755
756 }
757 catch( const std::exception & ex )
758 {
760 return fmt::format(
762 "[connection:{}] error while handling request: {}" ),
763 this->connection_id(),
764 ex.what() );
765 } );
766 }
767 }
768
770
773 void
775 {
776 auto & parser = m_input.m_parser;
777 auto & parser_ctx = m_input.m_parser_ctx;
778
779 // If user responses with error
780 // then connection must be able to send
781 // (hence to receive) response.
782
783 const auto request_id = m_response_coordinator.register_new_request();
784
785 m_logger.info( [&]{
786 return fmt::format(
788 "[connection:{}] handle upgrade request (#{}): {} {}" ),
790 request_id,
791 http_method_str(
792 static_cast<http_method>( parser.method ) ),
793 parser_ctx.m_header.request_target() );
794 } );
795
796 // Do not guard upgrade request.
798
799 // After calling handler we expect the results or
800 // no further operations with connection
803
804 const auto handling_result = m_request_handler(
805 std::make_shared< generic_request_t >(
806 request_id,
807 std::move( parser_ctx.m_header ),
808 std::move( parser_ctx.m_body ),
809 parser_ctx.make_chunked_input_info_if_necessary(),
810 shared_from_concrete< connection_base_t >(),
812 m_settings->extra_data_factory() ) );
813 switch( handling_result )
814 {
817 if( m_socket.is_open() )
818 {
819 // Request is rejected, so our socket
820 // must not be moved out to websocket connection.
821
822 // If handler refused request, say not implemented.
824 request_id,
829 }
830 else
831 {
832 // Request is rejected, but the socket
833 // was moved out to somewhere else???
834
835 m_logger.error( [&]{
836 return fmt::format(
838 "[connection:{}] upgrade request handler rejects "
839 "request, but socket was moved out from connection" ),
840 connection_id() );
841 } );
842 }
843 break;
844
846 /* nothing to do */
847 break;
848 }
849
850 // Else 2 cases:
851 // 1. request is handled asynchronously, so
852 // what happens next depends on handling.
853 // 2. handling was immediate, so further destiny
854 // of a connection was already determined.
855 //
856 // In both cases: here do nothing.
857 // We can't even do read-only access because
858 // upgrade handling might take place
859 // in distinct execution context.
860 // So no even a log messages here.
861 }
862
864 virtual void
867 request_id_t request_id,
869 response_output_flags_t response_output_flags,
871 write_group_t wg ) override
872 {
874 asio_ns::dispatch(
875 this->get_executor(),
876 [ this,
877 request_id,
878 response_output_flags,
879 actual_wg = std::move( wg ),
880 ctx = shared_from_this() ]
881 // NOTE that this lambda is noexcept since v.0.6.0.
882 () mutable noexcept
883 {
884 try
885 {
887 request_id,
888 response_output_flags,
889 std::move( actual_wg ) );
890 }
891 catch( const std::exception & ex )
892 {
894 return fmt::format(
896 "[connection:{}] unable to handle response: {}" ),
898 ex.what() );
899 } );
900 }
901 } );
902 }
903
905 void
908 request_id_t request_id,
910 response_output_flags_t response_output_flags,
912 write_group_t wg )
913 {
914 auto invoke_after_write_cb_with_error = [&]{
915 try
916 {
920 }
921 catch( const std::exception & ex )
922 {
923 m_logger.error( [&]{
924 return fmt::format(
926 "[connection:{}] notificator error: {}" ),
928 ex.what() );
929 } );
930 }
931 };
932
933 if( m_socket.is_open() )
934 {
938 {
939 // It is response for a connection-upgrade request.
940 // If we receive it here then it is constructed via
941 // message builder and so connection was not transformed
942 // to websocket connection.
943 // So it is necessary to resume pipeline logic that was stopped
944 // for upgrade-request to be handled as the only request
945 // on the connection for that moment.
947 {
949 }
950 }
951
953 {
954 m_logger.trace( [&]{
955 return fmt::format(
957 "[connection:{}] append response (#{}), "
958 "flags: {}, write group size: {}" ),
960 request_id,
961 fmtlib_tools::streamed( response_output_flags ),
962 wg.items_count() );
963 } );
964
966 request_id,
967 response_output_flags,
968 std::move( wg ) );
969
971 }
972 else
973 {
974 m_logger.warn( [&]{
975 return fmt::format(
977 "[connection:{}] receive response parts for "
978 "request (#{}), but response with connection-close "
979 "attribute happened before" ),
981 request_id );
982 } );
983 invoke_after_write_cb_with_error();
984 }
985 }
986 else
987 {
988 m_logger.warn( [&]{
989 return fmt::format(
991 "[connection:{}] try to write response, "
992 "while socket is closed" ),
993 connection_id() );
994 } );
995 invoke_after_write_cb_with_error();
996 }
997 }
998
999 // Check if there is something to write,
1000 // and if so starts write operation.
1001 void
1003 {
1004 assert( !m_response_coordinator.closed() );
1005
1007 {
1008 init_write();
1009 }
1010 }
1011
1013 void
1015 {
1016 // Here: not writing anything to socket, so
1017 // write operation can be initiated.
1018
1019 // Remember if all response cells were busy.
1020 const bool response_coordinator_full_before =
1022
1023 auto next_write_group = m_response_coordinator.pop_ready_buffers();
1024
1025 if( next_write_group )
1026 {
1027 m_logger.trace( [&]{
1028 return fmt::format(
1030 "[connection:{}] start next write group for response (#{}), "
1031 "size: {}" ),
1032 this->connection_id(),
1033 next_write_group->second,
1034 next_write_group->first.items_count() );
1035 } );
1036
1037 // Check if all response cells busy:
1038 const bool response_coordinator_full_after =
1040
1041 // Whether we need to resume read after this group is written?
1043 response_coordinator_full_before &&
1044 !response_coordinator_full_after;
1045
1046 if( 0 < next_write_group->first.status_line_size() )
1047 {
1048 // We need to extract status line out of the first buffer
1049 assert(
1051 next_write_group->first.items().front().write_type() );
1052
1053 m_logger.trace( [&]{
1054 // Get status line:
1055 const string_view_t status_line{
1056 asio_ns::buffer_cast< const char * >(
1057 next_write_group->first.items().front().buf() ),
1058 next_write_group->first.status_line_size() };
1059
1060 return fmt::format(
1062 "[connection:{}] start response (#{}): {}" ),
1063 this->connection_id(),
1064 next_write_group->second,
1065 fmtlib_tools::streamed( status_line ) );
1066 } );
1067 }
1068
1069 // Initialize write context with a new write group.
1071 std::move( next_write_group->first ) );
1072
1073 // Start the loop of sending data from current write group.
1075 }
1076 else
1077 {
1079 }
1080 }
1081
1082 // Use aliases for shorter names.
1086
1088
1099 void
1101 {
1102 try
1103 {
1105
1106 if( holds_alternative< trivial_write_operation_t >( wo ) )
1107 {
1108 handle_trivial_write_operation( get< trivial_write_operation_t >( wo ) );
1109 }
1110 else if( holds_alternative< file_write_operation_t >( wo ) )
1111 {
1112 handle_file_write_operation( get< file_write_operation_t >( wo ) );
1113 }
1114 else
1115 {
1116 assert( holds_alternative< none_write_operation_t >( wo ) );
1118 }
1119 }
1120 catch( const std::exception & ex )
1121 {
1123 return fmt::format(
1125 "[connection:{}] handle_current_write_ctx failed: {}" ),
1126 connection_id(),
1127 ex.what() );
1128 } );
1129 }
1130 }
1131
1133 void
1135 {
1136 // Asio buffers (param for async write):
1137 auto & bufs = op.get_trivial_bufs();
1138
1140 {
1141 m_logger.trace( [&]{
1142 return fmt::format(
1144 "[connection:{}] sending resp data with "
1145 "connection-close attribute "
1146 "buf count: {}, "
1147 "total size: {}" ),
1148 connection_id(),
1149 bufs.size(),
1150 op.size() );
1151 } );
1152
1153 // Reading new requests is useless.
1154 asio_ns::error_code ignored_ec;
1155 m_socket.cancel( ignored_ec );
1156 }
1157 else
1158 {
1159 m_logger.trace( [&]{
1160 return fmt::format(
1162 "[connection:{}] sending resp data, "
1163 "buf count: {}, "
1164 "total size: {}" ),
1165 connection_id(),
1166 bufs.size(),
1167 op.size() ); } );
1168 }
1169
1170 // There is somethig to write.
1171 asio_ns::async_write(
1172 m_socket,
1173 bufs,
1174 asio_ns::bind_executor(
1175 this->get_executor(),
1176 [this, ctx = shared_from_this()]
1177 // NOTE: since v.0.6.0 this lambda is noexcept.
1178 ( const asio_ns::error_code & ec, std::size_t written ) noexcept
1179 {
1180 if( !ec )
1181 {
1183 [&]{
1184 return fmt::format(
1186 "[connection:{}] outgoing data was "
1187 "sent: {} bytes" ),
1188 connection_id(),
1189 written );
1190 } );
1191 }
1192
1194 } ) );
1195
1197 }
1198
1200 void
1202 {
1204 {
1205 m_logger.trace( [&]{
1206 return fmt::format(
1208 "[connection:{}] sending resp file data with "
1209 "connection-close attribute, "
1210 "total size: {}" ),
1211 connection_id(),
1212 op.size() );
1213 } );
1214
1215 // Reading new requests is useless.
1216 asio_ns::error_code ignored_ec;
1217 m_socket.cancel( ignored_ec );
1218 }
1219 else
1220 {
1221 m_logger.trace( [&]{
1222 return fmt::format(
1224 "[connection:{}] sending resp file data, total size: {}" ),
1225 connection_id(),
1226 op.size() );
1227 } );
1228 }
1229
1231
1232 auto op_ctx = op;
1233
1235 this->get_executor(),
1236 m_socket,
1237 asio_ns::bind_executor(
1238 this->get_executor(),
1239 [this, ctx = shared_from_this(),
1240 // Store operation context till the end
1241 op_ctx ]
1242 // NOTE: since v.0.6.0 this lambda is noexcept
1243 (const asio_ns::error_code & ec, file_size_t written ) mutable noexcept
1244 {
1245 // NOTE: op_ctx should be reset just before return from
1246 // that lambda. We can't call reset() until the end of
1247 // the lambda because lambda object itself will be
1248 // destroyed.
1249 auto op_ctx_reseter = restinio::utils::at_scope_exit(
1250 [&op_ctx] {
1251 // Reset sendfile operation context.
1252 RESTINIO_ENSURE_NOEXCEPT_CALL( op_ctx.reset() );
1253 } );
1254
1255 if( !ec )
1256 {
1258 [&]{
1259 return fmt::format(
1261 "[connection:{}] file data was sent: "
1262 "{} bytes" ),
1263 connection_id(),
1264 written );
1265 } );
1266 }
1267 else
1268 {
1270 [&]{
1271 return fmt::format(
1273 "[connection:{}] send file data error: "
1274 "{} ({}) bytes" ),
1275 connection_id(),
1276 ec.value(),
1277 ec.message() );
1278 } );
1279 }
1280
1282 } ) );
1283 }
1284
1286 void
1288 {
1289 // Finishing writing this group.
1290 m_logger.trace( [&]{
1291 return fmt::format(
1293 "[connection:{}] finishing current write group" ),
1294 this->connection_id() );
1295 } );
1296
1297 // Group notificators are called from here (if exist):
1299
1301 {
1302 m_logger.trace( [&]{
1303 return fmt::format(
1305 "[connection:{}] should keep alive" ),
1306 this->connection_id() );
1307 } );
1308
1311 {
1312 // Run ordinary HTTP logic.
1314 {
1316 }
1317
1318 // Start another write opertion
1319 // if there is something to send.
1321 }
1322 else
1323 {
1325 {
1326 // Here upgrade req is the only request
1327 // to by handled by this connection.
1328 // So it is safe to call a handler for it.
1330 }
1331 else
1332 {
1333 // Do not start reading in any case,
1334 // but if there is at least one request preceding
1335 // upgrade-req, logic must continue http interaction.
1337 }
1338 }
1339 }
1340 else
1341 {
1342 // No keep-alive, close connection.
1343 close();
1344 }
1345 }
1346
1347 void
1349 {
1351 {
1352 // Bufs empty but there happened to
1353 // be a response context marked as complete
1354 // (final_parts) and having connection-close attr.
1355 // It is because `init_write_if_necessary()`
1356 // is called only under `!m_response_coordinator.closed()`
1357 // condition, so if no bufs were obtained
1358 // and response coordinator is closed means
1359 // that a first response stored by
1360 // response coordinator was marked as complete
1361 // without data.
1362
1363 m_logger.trace( [&]{
1364 return fmt::format(
1366 "[connection:{}] last sent response was marked "
1367 "as complete" ),
1368 connection_id() ); } );
1369 close();
1370 }
1371 else
1372 {
1373 // Not writing anything, so need to deal with timouts.
1375 {
1376 // No requests in processing.
1377 // So set read next request timeout.
1379 }
1380 else
1381 {
1382 // Have requests in process.
1383 // So take control over request handling.
1385 }
1386 }
1387 }
1388
1390
1394 void
1395 after_write( const asio_ns::error_code & ec ) noexcept
1396 {
1397 if( !ec )
1398 {
1400 }
1401 else
1402 {
1403 if( !error_is_operation_aborted( ec ) )
1404 {
1406 return fmt::format(
1408 "[connection:{}] unable to write: {}" ),
1409 connection_id(),
1410 ec.message() );
1411 } );
1412 }
1413 // else: Operation aborted only in case of close was called.
1414
1415 try
1416 {
1418 }
1419 catch( const std::exception & ex )
1420 {
1422 [&]{
1423 return fmt::format(
1425 "[connection:{}] notificator error: {}" ),
1426 connection_id(),
1427 ex.what() );
1428 } );
1429 }
1430 }
1431 }
1432
1435
1437 void
1438 close() noexcept
1439 {
1441 [&]{
1442 return fmt::format(
1443 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] close" ),
1444 connection_id() );
1445 } );
1446
1447 // shutdown() and close() should be called regardless of
1448 // possible exceptions.
1450 m_logger,
1451 "connection.socket.shutdown",
1452 [this] {
1453 asio_ns::error_code ignored_ec;
1454 m_socket.shutdown(
1455 asio_ns::ip::tcp::socket::shutdown_both,
1456 ignored_ec );
1457 } );
1459 m_logger,
1460 "connection.socket.close",
1461 [this] {
1462 m_socket.close();
1463 } );
1464
1466 [&]{
1467 return fmt::format(
1469 "[connection:{}] close: close socket" ),
1470 connection_id() );
1471 } );
1472
1473 // Clear stuff.
1475
1477 [&]{
1478 return fmt::format(
1480 "[connection:{}] close: timer canceled" ),
1481 connection_id() );
1482 } );
1483
1485
1487 [&]{
1488 return fmt::format(
1490 "[connection:{}] close: reset responses data" ),
1491 connection_id() );
1492 } );
1493
1494 // Inform state listener if it used.
1495 m_settings->call_state_listener_suppressing_exceptions(
1496 [this]() noexcept {
1498 this->connection_id(),
1499 this->m_remote_endpoint,
1501 };
1502 } );
1503 }
1504
1506
1510 template< typename Message_Builder >
1511 void
1512 trigger_error_and_close( Message_Builder msg_builder ) noexcept
1513 {
1514 // An exception from logger/msg_builder shouldn't prevent
1515 // a call to close().
1517 m_logger, std::move(msg_builder) );
1518
1520 }
1522
1525
1528
1531
1534
1537
1538 // Memo flag: whether we need to resume read after this group is written
1540
1543
1546
1548 static connection_t &
1550 {
1551 return static_cast< connection_t & >( base );
1552 }
1553
1556 virtual void
1558 {
1559 asio_ns::dispatch(
1560 this->get_executor(),
1561 [ ctx = std::move( self ) ]
1562 // NOTE: this lambda is noexcept since v.0.6.0.
1563 () noexcept {
1564 auto & conn_object = cast_to_self( *ctx );
1565 // If an exception will be thrown we can only
1566 // close the connection.
1567 try
1568 {
1569 conn_object.check_timeout_impl();
1570 }
1571 catch( const std::exception & x )
1572 {
1573 conn_object.trigger_error_and_close( [&] {
1574 return fmt::format(
1576 "[connection: {}] unexpected "
1577 "error during timeout handling: {}" ),
1578 conn_object.connection_id(),
1579 x.what() );
1580 } );
1581 }
1582 } );
1583 }
1584
1586 using timout_cb_t = void (connection_t::* )( void );
1587
1590
1592 std::chrono::steady_clock::time_point m_current_timeout_after;
1597
1599 void
1601 {
1602 if( std::chrono::steady_clock::now() > m_current_timeout_after )
1603 {
1605 (this->*m_current_timeout_cb)();
1606 }
1607 else
1608 {
1610 }
1611 }
1612
1614 void
1616 {
1617 m_timer_guard.schedule( m_prepared_weak_ctx );
1618 }
1619
1621 void
1623 {
1624 m_current_timeout_cb = nullptr;
1626 }
1627
1629 void
1631 std::chrono::steady_clock::time_point timeout_after,
1632 timout_cb_t timout_cb )
1633 {
1634 m_current_timeout_after = timeout_after;
1635 m_current_timeout_cb = timout_cb;
1636 }
1637
1638 void
1640 std::chrono::steady_clock::duration timeout,
1641 timout_cb_t timout_cb )
1642 {
1644 std::chrono::steady_clock::now() + timeout,
1645 timout_cb );
1646 }
1647
1648 void
1649 handle_xxx_timeout( const char * operation_name )
1650 {
1651 m_logger.trace( [&]{
1652 return fmt::format(
1653 RESTINIO_FMT_FORMAT_STRING( "[connection:{}] {} timed out" ),
1654 connection_id(),
1655 operation_name );
1656 } );
1657
1658 close();
1659 }
1660
1661 void
1663 {
1664 handle_xxx_timeout( "wait for request" );
1665 }
1666
1668 void
1670 {
1672 {
1674 m_settings->m_read_next_http_message_timelimit,
1676 }
1677 }
1678
1679 void
1681 {
1682 handle_xxx_timeout( "handle request" );
1683 }
1684
1686 void
1688 {
1690 {
1692 m_settings->m_handle_request_timeout,
1694 }
1695 }
1696
1697 void
1699 {
1700 handle_xxx_timeout( "writing response" );
1701 }
1702
1704 void
1706 {
1708 m_settings->m_write_http_response_timelimit,
1710 }
1711
1712 void
1714 {
1715 handle_xxx_timeout( "writing response (sendfile)" );
1716 }
1717
1718 void
1719 guard_sendfile_operation( std::chrono::steady_clock::duration timelimit )
1720 {
1721 if( std::chrono::steady_clock::duration::zero() == timelimit )
1722 timelimit = m_settings->m_write_http_response_timelimit;
1723
1725 timelimit,
1727 }
1729
1732
1735
1745};
1746
1747//
1748// connection_factory_t
1749//
1750
1752template < typename Traits >
1754{
1755 public:
1756 using logger_t = typename Traits::logger_t;
1757 using stream_socket_t = typename Traits::stream_socket_t;
1760
1762 connection_settings_handle_t< Traits > connection_settings,
1763 std::unique_ptr< socket_options_setter_t > socket_options_setter )
1764 : m_connection_settings{ std::move( connection_settings ) }
1765 , m_socket_options_setter{ std::move( socket_options_setter ) }
1767 {}
1768
1769 // NOTE: since v.0.6.3 it returns non-empty
1770 // shared_ptr<connection_t<Traits>> or an exception is thrown in
1771 // the case of an error.
1772 // NOTE: since v.0.6.12 it accepts yet another parameter: lifetime_monitor.
1773 auto
1775 stream_socket_t socket,
1776 endpoint_t remote_endpoint,
1777 lifetime_monitor_t lifetime_monitor )
1778 {
1779 using connection_type_t = connection_t< Traits >;
1780
1781 {
1782 socket_options_t options{ socket.lowest_layer() };
1783 (*m_socket_options_setter)( options );
1784 }
1785
1786 return std::make_shared< connection_type_t >(
1788 std::move( socket ),
1790 std::move( remote_endpoint ),
1791 std::move( lifetime_monitor ) );
1792 }
1793
1794 private:
1796
1798
1799 std::unique_ptr< socket_options_setter_t > m_socket_options_setter;
1800
1802};
1803
1804} /* namespace impl */
1805
1806} /* namespace restinio */
A simple implementation of at_scope_exit concept.
Helper type for controlling the lifetime of the connection.
Type of object that tells that new connection has been accepted.
Type of object that tells that the connection has been closed.
An object with info about connection to be passed to state listener.
auto fields_count() const noexcept
std::unique_ptr< socket_options_setter_t > m_socket_options_setter
connection_factory_t(connection_settings_handle_t< Traits > connection_settings, std::unique_ptr< socket_options_setter_t > socket_options_setter)
typename Traits::logger_t logger_t
connection_settings_handle_t< Traits > m_connection_settings
auto create_new_connection(stream_socket_t socket, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
typename Traits::stream_socket_t stream_socket_t
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
Context for handling http connections.
Definition: connection.hpp:306
virtual void check_timeout(tcp_connection_ctx_handle_t &self) override
Schedules real timedout operations check on the executer of a connection.
connection_t & operator=(const connection_t &)=delete
typename Traits::strand_t strand_t
Definition: connection.hpp:315
void handle_upgrade_request()
Calls handler for upgrade request.
Definition: connection.hpp:774
tcp_connection_ctx_weak_handle_t m_prepared_weak_ctx
A prepared weak handle for passing it to timer guard.
void on_request_message_complete()
Handle a given request message.
Definition: connection.hpp:637
typename connection_count_limit_types< Traits >::lifetime_monitor_t lifetime_monitor_t
Definition: connection.hpp:318
virtual void write_response_parts(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg) override
Write parts for specified request.
Definition: connection.hpp:865
void handle_xxx_timeout(const char *operation_name)
void guard_write_operation()
Start guard write operation if necessary.
void finish_handling_current_write_ctx()
Do post write actions for current write group.
const endpoint_t m_remote_endpoint
Remote endpoint for this connection.
lifetime_monitor_t m_lifetime_monitor
Monitor of the connection lifetime.
stream_socket_t m_socket
Connection.
connection_input_t m_input
Input routine.
void schedule_operation_timeout_callback(std::chrono::steady_clock::time_point timeout_after, timout_cb_t timout_cb)
Helper function to work with timer guard.
void guard_sendfile_operation(std::chrono::steady_clock::duration timelimit)
request_handler_t & m_request_handler
Request handler.
void handle_trivial_write_operation(const trivial_write_operation_t &op)
Run trivial buffers write operation.
void handle_file_write_operation(file_write_operation_t &op)
Run sendfile write operation.
timout_cb_t m_current_timeout_cb
Callback to all if timeout happened.
void wait_for_http_message()
Start reading next htttp-message.
Definition: connection.hpp:412
void after_read(const asio_ns::error_code &ec, std::size_t length) noexcept
Handle read operation result.
Definition: connection.hpp:516
logger_t & m_logger
Logger for operation.
std::chrono::steady_clock::time_point m_current_timeout_after
Timeout point of a current guarded operation.
void init_write()
Initiate write operation.
typename Traits::timer_manager_t timer_manager_t
Definition: connection.hpp:310
upgrade_internals_t move_upgrade_internals()
Move socket out of connection.
Definition: connection.hpp:466
void cancel_timeout_checking() noexcept
Stop timout guarding.
static connection_t & cast_to_self(tcp_connection_ctx_base_t &base)
Timer to controll operations.
connection_t(const connection_t &)=delete
void handle_current_write_ctx() noexcept
Start/continue/continue handling output data of current write group.
void(connection_t::*)(void) timout_cb_t
Callback type for timedout operations.
connection_t(connection_id_t conn_id, stream_socket_t &&socket, connection_settings_handle_t< Traits > settings, endpoint_t remote_endpoint, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:320
void close() noexcept
Close connection functions.
typename Traits::logger_t logger_t
Definition: connection.hpp:314
void after_write(const asio_ns::error_code &ec) noexcept
Handle write response finished.
void trigger_error_and_close(Message_Builder msg_builder) noexcept
Trigger an error.
response_coordinator_t m_response_coordinator
Response coordinator.
void guard_request_handling_operation()
Start guard request handling operation if necessary.
typename timer_manager_t::timer_guard_t timer_guard_t
Definition: connection.hpp:311
request_handler_type_from_traits_t< Traits > request_handler_t
Definition: connection.hpp:312
void init_next_timeout_checking()
Schedule next timeout checking.
void consume_data(const char *data, std::size_t length)
Parse some data.
Definition: connection.hpp:589
void consume_message()
Start (continue) a chain of read-parse-read-... operations.
Definition: connection.hpp:478
timer_guard_t m_timer_guard
Timer guard.
typename Traits::stream_socket_t stream_socket_t
Definition: connection.hpp:316
void check_timeout_impl()
Check timed out operation.
connection_settings_handle_t< Traits > m_settings
Common paramaters of a connection.
void guard_read_operation()
Statr guard read operation if necessary.
void write_response_parts_impl(request_id_t request_id, response_output_flags_t response_output_flags, write_group_t wg)
Write parts for specified request.
Definition: connection.hpp:906
connection_t(connection_t &&)=delete
void schedule_operation_timeout_callback(std::chrono::steady_clock::duration timeout, timout_cb_t timout_cb)
write_group_output_ctx_t m_write_output_ctx
Write to socket operation context.
Wrapper for an executor (strand) used by connections.
Traits::strand_t & get_executor() noexcept
An executor for callbacks on async operations.
Helper class for reading bytes and feeding them to parser.
std::size_t length() const noexcept
How many unconsumed bytes are there in buffer.
void consumed_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
void obtained_bytes(std::size_t length) noexcept
Mark how many bytes were obtained.
auto make_asio_buffer() noexcept
Make asio buffer for reading bytes from socket.
const char * bytes() const noexcept
Get pointer to unconsumed bytes.
Coordinator for process of sending responses with respect to http pipeline technique and chunk transf...
void reset() noexcept
Remove all contexts.
request_id_t register_new_request()
Create a new request and reserve context for its response.
bool is_able_to_get_more_messages() const noexcept
Check if it is possible to accept more requests.
optional_t< std::pair< write_group_t, request_id_t > > pop_ready_buffers()
Extract a portion of data available for write.
void append_response(request_id_t req_id, response_output_flags_t response_output_flags, write_group_t wg)
Add outgoing data for specified request.
Socket adapter for asio::ssl::stream< asio::ip::tcp::socket >.
Definition: tls_socket.hpp:37
auto timelimit() const noexcept
Get the timelimit on this sendfile operation.
void start_sendfile_operation(default_asio_executor executor, Socket &socket, After_Write_CB after_sendfile_cb)
Start a sendfile operation.
auto size() const noexcept
Get the size of sendfile operation.
auto size() const noexcept
The size of data within this operation.
const std::vector< asio_ns::const_buffer > & get_trivial_bufs() const noexcept
Get buffer "iovec" for performing gather write.
Helper class for writting response data.
void fail_write_group(const asio_ns::error_code &ec)
Handle current group write process failed.
solid_write_operation_variant_t extract_next_write_operation()
et an object with next write operation to perform.
void finish_write_group()
Finish writing group normally.
void start_next_write_group(optional_t< write_group_t > next_wg) noexcept
Start handlong next write group.
bool transmitting() const noexcept
Check if data is trunsmitting now.
A type of holder of limits related to an incoming HTTP message.
An adapter for setting acceptor options before running server.
Definition: settings.hpp:249
connection_id_t connection_id() const noexcept
Get connection id.
Group of writable items transported to the context of underlying connection as one solid piece.
Definition: buffers.hpp:729
void invoke_after_write_notificator_if_exists(const asio_ns::error_code &ec)
Get after write notificator.
Definition: buffers.hpp:852
auto items_count() const noexcept
Get the count of stored items.
Definition: buffers.hpp:869
#define RESTINIO_ENSURE_NOEXCEPT_CALL(expr)
A wrapper around static_assert for checking that an expression is noexcept and execution of that expr...
#define RESTINIO_NODISCARD
Stuff related to limits of active parallel connections.
A special wrapper around fmtlib include files.
#define RESTINIO_FMT_FORMAT_STRING(s)
decltype(auto) streamed(T &&v) noexcept
tls_socket_t * make_tls_socket_pointer_for_state_listener(asio_ns::ip::tcp::socket &) noexcept
Definition: connection.hpp:277
std::shared_ptr< connection_settings_t< Traits > > connection_settings_handle_t
http_parser_settings create_parser_settings() noexcept
Include parser callbacks.
Definition: connection.hpp:152
auto create_not_implemented_resp()
void prepare_connection_and_start_read(asio_ns::ip::tcp::socket &, Connection &, Start_Read_CB start_read_cb, Failed_CB)
Definition: connection.hpp:265
connection_upgrade_stage_t
Enum for a flag specifying that connection is going to upgrade or not.
Definition: connection.hpp:206
@ wait_for_upgrade_handling_result_or_nothing
Handler for request with connection-upgrade header was called so any response data comming is for tha...
@ none
No connection request in progress.
@ pending_upgrade_handling
Request with connection-upgrade header came and waits for request handler to be called in non pipelin...
void suppress_exceptions(Logger &&logger, const char *block_description, Lambda &&lambda) noexcept
Helper function for execution a block of code with suppression of any exceptions raised inside that b...
scope_exit_details::at_exit_t< L > at_scope_exit(L &&l)
Helper function for creation action to be performed at scope exit.
void log_error_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
void log_trace_noexcept(Logger &&logger, Message_Builder &&builder) noexcept
std::shared_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_handle_t
Alias for http connection handle.
std::uint64_t file_size_t
std::unique_ptr< chunked_input_info_t > chunked_input_info_unique_ptr_t
Alias of unique_ptr for chunked_input_info.
std::weak_ptr< tcp_connection_ctx_base_t > tcp_connection_ctx_weak_handle_t
Alias for http connection weak handle.
nonstd::string_view string_view_t
Definition: string_view.hpp:19
unsigned int request_id_t
Request id in scope of single connection.
@ trivial_write_operation
Item is a buffer and must be written trivially.
bool error_is_operation_aborted(const asio_ns::error_code &ec) noexcept
@ accepted
Request accepted for handling.
@ not_handled
The request wasn't handled. If there is another handler to be tried it should be tried....
@ rejected
Request wasn't accepted for handling.
typename details::actual_request_handler_type_detector< typename Traits::request_handler_t, typename Traits::extra_data_factory_t >::request_handler_t request_handler_type_from_traits_t
A metafunction for extraction a request-handler type from server's traits.
Definition: traits.hpp:379
@ write_was_not_executed
After write notificator error: data was not sent, connection closed (or aborted) before a given piece...
bool error_is_eof(const asio_ns::error_code &ec) noexcept
asio_ns::ip::tcp::endpoint endpoint_t
An alias for endpoint type from Asio.
@ connection_close
This response says to close connection.
asio_ns::error_code make_asio_compaible_error(asio_convertible_error_t err) noexcept
Make restinio error_code compatible with asio_ns::error_code.
@ final_parts
Final parts (response ands with these parts).
std::uint64_t connection_id_t
Type for ID of connection.
STL namespace.
int restinio_url_cb(http_parser *parser, const char *at, size_t length)
int restinio_header_value_cb(http_parser *parser, const char *at, size_t length)
int restinio_body_cb(http_parser *parser, const char *at, size_t length)
int restinio_chunk_complete_cb(http_parser *)
int restinio_headers_complete_cb(http_parser *parser)
int restinio_header_field_cb(http_parser *parser, const char *at, size_t length)
int restinio_chunk_header_cb(http_parser *parser)
Helpers for safe truncation of unsigned integers.
Bunch of data related to chunked input.
std::vector< chunk_info_t > m_chunks
All non-empty chunks from the input.
http_header_fields_t m_trailing_fields
Trailing fields found in the input.
Data associated with connection read routine.
Definition: connection.hpp:226
connection_input_t(std::size_t buffer_size, incoming_http_msg_limits_t limits)
Definition: connection.hpp:227
fixed_buffer_t m_buf
Input buffer.
Definition: connection.hpp:241
bool m_read_operation_is_running
Flag to track whether read operation is performed now.
Definition: connection.hpp:248
void reset_parser()
Prepare parser for reading new http-message.
Definition: connection.hpp:252
http_parser m_parser
HTTP-parser.
Definition: connection.hpp:236
connection_upgrade_stage_t m_connection_upgrade_stage
Connection upgrade request stage.
Definition: connection.hpp:244
Internals that are necessary for upgrade.
Definition: connection.hpp:446
connection_settings_handle_t< Traits > m_settings
Definition: connection.hpp:459
upgrade_internals_t(connection_settings_handle_t< Traits > settings, stream_socket_t socket, lifetime_monitor_t lifetime_monitor)
Definition: connection.hpp:450
upgrade_internals_t(upgrade_internals_t &&)=default
Parsing result context for using in parser callbacks.
Definition: connection.hpp:49
void reset()
Prepare context to handle new request.
Definition: connection.hpp:105
RESTINIO_NODISCARD chunked_input_info_unique_ptr_t make_chunked_input_info_if_necessary()
Creates an instance of chunked_input_info if there is an info about chunks in the body.
Definition: connection.hpp:124
http_request_header_t m_header
Request data.
Definition: connection.hpp:52
std::size_t m_total_field_count
Total number of parsed HTTP-fields.
Definition: connection.hpp:84
bool m_message_complete
Flag: is http message parsed completely.
Definition: connection.hpp:74
const incoming_http_msg_limits_t m_limits
Limits for the incoming message.
Definition: connection.hpp:91
chunked_input_info_block_t m_chunked_info_block
Definition: connection.hpp:70
std::string m_current_field_name
Parser context temp values and flags.
Definition: connection.hpp:58
http_parser_ctx_t(incoming_http_msg_limits_t limits)
The main constructor.
Definition: connection.hpp:98
Response output flags for buffers commited to response-coordinator.