#include <Reactor.h>
Public Member Functions | |
Reactor () | |
Constructor. | |
~Reactor () | |
Destructor. | |
TimerId | registerTimerHandler (EventHandler *eh_, const TimeVal &tv_, const std::string &name_="<unknown>") |
Register Timer Event handler with Reactor. | |
bool | registerIOHandler (EventHandler *eh_, handler_t fd_, EventType et_=RWE_EVENTS) |
Register I/O Event handler with Reactor. | |
bool | removeHandler (EventHandler *eh_, EventType et_=ALL_EVENTS) |
Remove Event handler from reactor for either all I/O events or timeout event or both. | |
bool | removeTimerHandler (TimerId id_) |
Remove Timer event from the queue. | |
bool | removeIOHandler (handler_t fd_) |
Remove IO Event handler from reactor. | |
void | waitForEvents (void) |
Main waiting loop that blocks indefinitely processing events. | |
void | waitForEvents (TimeVal *tv_) |
Wait for events for time specified. | |
void | stopReactor (void) |
Stop Reactor's activity. | |
void | deactivate (void) |
Deactivate Reactor. | |
Private Types | |
typedef std::map< u_int, EventHandler * > | Fd2Eh_Map_Type |
no cloning | |
typedef Fd2Eh_Map_Type::iterator | Fd2Eh_Map_Iter |
Private Member Functions | |
Reactor (const Reactor &) | |
Reactor & | operator= (const Reactor &) |
no cloning | |
void | adjust_maxfdp1 (handler_t fd_, handler_t rmax_, handler_t wmax_, handler_t emax_) |
Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether). | |
bool | handleError (void) |
Handle error in select(2) loop appropriately. | |
bool | dispatch (int minimum_) |
Notify all EventHandlers registered on respecful events occured. | |
int | isAnyReady (void) |
Return number of file descriptors ready accross all sets. | |
bool | checkFDs (void) |
Check mask for bad file descriptors. | |
void | dispatchHandler (FdSet &mask_, Fd2Eh_Map_Type &fdSet_, EH_IO_Callback callback_) |
Call handler's callback and, if callback returns negative value, remove it from the Reactor. | |
void | calculateTimeout (TimeVal *&howlong_, TimeVal *maxwait_) |
Calculate closest timeout. | |
Private Attributes | |
int | m_fd_setsize |
Max number of open files per process. | |
handler_t | m_maxfd_plus1 |
Max file descriptor number (in all sets) plus 1. | |
bool | m_active |
Flag that indicates whether Reactor is active or had been stopped. | |
Fd2Eh_Map_Type | m_readSet |
Event handlers awaiting on READ_EVENT. | |
Fd2Eh_Map_Type | m_writeSet |
Event handlers awaiting on WRITE_EVENT. | |
Fd2Eh_Map_Type | m_exceptSet |
Event handlers awaiting on EXCEPT_EVENT. | |
MaskSet | m_waitSet |
Handlers to wait for event on. | |
MaskSet | m_readySet |
Handlers that are ready for processing. | |
TimerQueue | m_tqueue |
The queue of Timers. |
Definition at line 57 of file Reactor.h.
typedef Fd2Eh_Map_Type::iterator ASSA::Reactor::Fd2Eh_Map_Iter [private] |
typedef std::map<u_int, EventHandler*> ASSA::Reactor::Fd2Eh_Map_Type [private] |
Reactor::Reactor | ( | ) |
Constructor.
Maximum number of sockets supported (per process) Win32 defines it to 64 in winsock2.h.
Initialize winsock2 library
Definition at line 24 of file Reactor.cpp.
References m_fd_setsize, ASSA::REACTTRACE, and trace_with_mask.
00024 : 00025 m_fd_setsize (1024), 00026 m_maxfd_plus1 (0), 00027 m_active (true) 00028 { 00029 trace_with_mask("Reactor::Reactor",REACTTRACE); 00030 00034 #if defined(WIN32) 00035 m_fd_setsize = FD_SETSIZE; 00036 00037 #else // POSIX 00038 struct rlimit rlim; 00039 rlim.rlim_max = 0; 00040 00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) { 00042 m_fd_setsize = rlim.rlim_cur; 00043 } 00044 #endif 00045 00048 #if defined (WIN32) 00049 WSADATA data; 00050 WSAStartup (MAKEWORD (2, 2), &data); 00051 #endif 00052 }
Reactor::~Reactor | ( | ) |
Destructor.
Definition at line 55 of file Reactor.cpp.
References deactivate(), m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, and trace_with_mask.
00056 { 00057 trace_with_mask("Reactor::~Reactor",REACTTRACE); 00058 00059 m_readSet.clear (); 00060 m_writeSet.clear (); 00061 m_exceptSet.clear (); 00062 deactivate (); 00063 }
ASSA::Reactor::Reactor | ( | const Reactor & | ) | [private] |
void Reactor::adjust_maxfdp1 | ( | handler_t | fd_, | |
handler_t | rmax_, | |||
handler_t | wmax_, | |||
handler_t | emax_ | |||
) | [private] |
Adjust maxfdp1 in a portable way (win32 ignores masfd alltogether).
Win32 implementation of select() ignores this value altogether.
Definition at line 718 of file Reactor.cpp.
References DL, m_maxfd_plus1, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by removeHandler(), and removeIOHandler().
00722 { 00723 #if !defined (WIN32) /* POSIX */ 00724 00725 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE); 00726 00727 if (m_maxfd_plus1 == fd_ + 1) { 00728 m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_)); 00729 00730 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00731 } 00732 #endif 00733 }
Calculate closest timeout.
If TimerQueue is not empty, then return smallest of maxtimeout and first in the queue. Otherwise, return maxtimeout.
maxwait_ | (in) how long we are expected to wait for event(s). | |
howlong_ | (out) how long we are going to wait. |
Definition at line 438 of file Reactor.cpp.
References DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::isEmpty(), m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::top(), trace_with_mask, and ASSA::TimeVal::zeroTime().
Referenced by waitForEvents().
00439 { 00440 trace_with_mask("Reactor::calculateTimeout",REACTTRACE); 00441 00442 TimeVal now; 00443 TimeVal tv; 00444 00445 if (m_tqueue.isEmpty () ) { 00446 howlong_ = maxwait_; 00447 goto done; 00448 } 00449 now = TimeVal::gettimeofday (); 00450 tv = m_tqueue.top (); 00451 00452 if (tv < now) { 00453 /*--- 00454 It took too long to get here (fraction of a millisecond), 00455 and top timer had already expired. In this case, 00456 perform non-blocking select in order to drain the timer queue. 00457 ---*/ 00458 *howlong_ = 0; 00459 } 00460 else { 00461 DL((REACT,"--------- Timer Queue ----------\n")); 00462 m_tqueue.dump(); 00463 DL((REACT,"--------------------------------\n")); 00464 00465 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) { 00466 *howlong_ = tv - now; 00467 } 00468 else { 00469 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now; 00470 } 00471 } 00472 00473 done: 00474 if (howlong_ != NULL) { 00475 DL((REACT,"delay (%f)\n", double (*howlong_) )); 00476 } 00477 else { 00478 DL((REACT,"delay (forever)\n")); 00479 } 00480 }
bool Reactor::checkFDs | ( | void | ) | [private] |
Check mask for bad file descriptors.
Definition at line 334 of file Reactor.cpp.
References ASSA::FdSet::clear(), DL, m_fd_setsize, m_readSet, ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), ASSA::FdSet::setFd(), and trace_with_mask.
Referenced by handleError().
00335 { 00336 trace_with_mask("Reactor::checkFDs",REACTTRACE); 00337 00338 bool num_removed = false; 00339 FdSet mask; 00340 timeval poll = { 0, 0 }; 00341 00342 for (handler_t fd = 0; fd < m_fd_setsize; fd++) { 00343 if ( m_readSet[fd] != NULL ) { 00344 mask.setFd (fd); 00345 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) { 00346 removeIOHandler (fd); 00347 num_removed = true; 00348 DL((REACT,"Detected BAD FD: %d\n", fd )); 00349 } 00350 mask.clear (fd); 00351 } 00352 } 00353 return (num_removed); 00354 }
void ASSA::Reactor::deactivate | ( | void | ) | [inline] |
Deactivate Reactor.
This function sets internal flag which notifies Reactor's internal event handling loop to abort its activity. It is mostly used when a *slow* system call is interrupted by the signal handler. The system call will be restarted by OS after control returns from the signal handler. Signal handler (GenServer::handle_signal()) should call this method to delay Reactor's deactivation.
Definition at line 237 of file Reactor.h.
References m_active.
Referenced by ASSA::GenServer::handle_signal(), ASSA::GenServer::stop_service(), and ~Reactor().
00237 { m_active = false; }
bool Reactor::dispatch | ( | int | minimum_ | ) | [private] |
Notify all EventHandlers registered on respecful events occured.
minimum_ | number of file descriptors ready. |
Definition at line 643 of file Reactor.cpp.
References ASSA::ASSAERR, dispatchHandler(), DL, ASSA::MaskSet::dump(), EL, ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), ASSA::EventHandler::handle_except(), ASSA::EventHandler::handle_read(), ASSA::EventHandler::handle_write(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00644 { 00645 trace_with_mask("Reactor::dispatch", REACTTRACE); 00646 00647 m_tqueue.expire (TimeVal::gettimeofday ()); 00648 00649 if ( ready_ < 0 ) 00650 { 00651 #if !defined (WIN32) 00652 EL((ASSAERR,"::select(3) error\n")); 00653 #endif 00654 return (false); 00655 } 00656 if ( ready_ == 0 ) { 00657 return (true); 00658 } 00659 00660 DL((REACT,"Dispatching %d FDs.\n",ready_)); 00661 DL((REACT,"m_readySet:\n")); 00662 m_readySet.dump (); 00663 00664 /*--- Writes first ---*/ 00665 dispatchHandler (m_readySet.m_wset, 00666 m_writeSet, 00667 &EventHandler::handle_write); 00668 00669 /*--- Exceptions next ---*/ 00670 dispatchHandler (m_readySet.m_eset, 00671 m_exceptSet, 00672 &EventHandler::handle_except); 00673 00674 /*--- Finally, the Reads ---*/ 00675 dispatchHandler (m_readySet.m_rset, 00676 m_readSet, 00677 &EventHandler::handle_read); 00678 00679 return (true); 00680 }
void Reactor::dispatchHandler | ( | FdSet & | mask_, | |
Fd2Eh_Map_Type & | fdSet_, | |||
EH_IO_Callback | callback_ | |||
) | [private] |
Call handler's callback and, if callback returns negative value, remove it from the Reactor.
When you have several high data-rate connections sending data at the same time, the one that had connected first would get lower FD number and would get data transfer preference over everybody else who has connected later on.
WIN32 HACK: Without having restarted scan from the beginning, this causes crash due to the fact that firing a callback of EventHandler might have invalidated the iterator (happens with Connector's in a sync mode).
Definition at line 585 of file Reactor.cpp.
References ASSA::FdSet::clear(), DL, ASSA::EventHandler::get_id(), ASSA::FdSet::isSet(), ASSA::REACT, ASSA::REACTTRACE, removeIOHandler(), and trace_with_mask.
Referenced by dispatch().
00586 { 00587 trace_with_mask("Reactor::dispatchHandler",REACTTRACE); 00588 00589 int ret = 0; 00590 handler_t fd; 00591 EventHandler* ehp = NULL; 00592 std::string eh_id; 00593 00594 Fd2Eh_Map_Iter iter = fdSet_.begin (); 00595 00596 while (iter != fdSet_.end ()) 00597 { 00598 fd = (*iter).first; 00599 ehp = (*iter).second; 00600 00601 if (mask_.isSet (fd) && ehp != NULL) 00602 { 00603 eh_id = ehp->get_id (); 00604 DL((REACT,"Data detected from \"%s\"(fd=%d)\n", 00605 eh_id.c_str (), fd)); 00606 00607 ret = (ehp->*callback_) (fd); /* Fire up a callback */ 00608 00609 if (ret == -1) { 00610 removeIOHandler (fd); 00611 } 00612 else if (ret > 0) { 00613 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n", 00614 ret, fd, eh_id.c_str ())); 00615 //return; <-- would starve other connections 00616 } 00617 else { 00618 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n", 00619 eh_id.c_str (), fd)); 00620 mask_.clear (fd); 00621 } 00628 iter = fdSet_.begin (); 00629 } 00630 else { 00631 iter++; 00632 } 00633 } 00634 }
bool Reactor::handleError | ( | void | ) | [private] |
Handle error in select(2) loop appropriately.
If commanded to stop, do so
Definition at line 358 of file Reactor.cpp.
References ASSA::ASSAERR, checkFDs(), DL, EL, m_active, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00359 { 00360 trace_with_mask("Reactor::handleError",REACTTRACE); 00361 00364 if ( !m_active ) { 00365 DL((REACT,"Received cmd to stop Reactor\n")); 00366 return (false); 00367 } 00368 00369 /*--- 00370 TODO: If select(2) returns before time expires, with 00371 a descriptor ready or with EINTR, timeval is not 00372 going to be updated with number of seconds remaining. 00373 This is true for all systems except Linux, which will 00374 do so. Therefore, to restart correctly in case of 00375 EINTR, we ought to take time measurement before and 00376 after select, and try to select() for remaining time. 00377 00378 For now, we restart with the initial timing value. 00379 ---*/ 00380 /*--- 00381 BSD kernel never restarts select(2). SVR4 will restart if 00382 the SA_RESTART flag is specified when the signal handler 00383 for the signal delivered is installed. This means taht for 00384 portability, we must handle signal interrupts. 00385 ---*/ 00386 00387 if ( errno == EINTR ) { 00388 EL((REACT,"EINTR: interrupted select(2)\n")); 00389 /* 00390 If I was sitting in select(2) and received SIGTERM, 00391 the signal handler would have set m_active to 'false', 00392 and this function would have returned 'false' as above. 00393 For any other non-critical signals (USR1,...), 00394 we retry select. 00395 */ 00396 return (true); 00397 } 00398 /* 00399 EBADF - bad file number. One of the file descriptors does 00400 not reference an open file to open(), close(), ioctl(). 00401 This can happen if user closed fd and forgot to remove 00402 handler from Reactor. 00403 */ 00404 if ( errno == EBADF ) { 00405 DL((REACT,"EBADF: bad file descriptor\n")); 00406 return (checkFDs ()); 00407 } 00408 /* 00409 Any other error from select 00410 */ 00411 #if defined (WIN32) 00412 DL ((REACT,"select(3) error = %d\n", WSAGetLastError())); 00413 #else 00414 EL((ASSAERR,"select(3) error\n")); 00415 #endif 00416 return (false); 00417 }
int Reactor::isAnyReady | ( | void | ) | [private] |
Return number of file descriptors ready accross all sets.
Definition at line 421 of file Reactor.cpp.
References DL, ASSA::MaskSet::dump(), ASSA::MaskSet::m_eset, m_readySet, ASSA::MaskSet::m_rset, ASSA::MaskSet::m_wset, ASSA::FdSet::numSet(), ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by waitForEvents().
00422 { 00423 trace_with_mask("Reactor::isAnyReady",REACTTRACE); 00424 00425 int n = m_readySet.m_rset.numSet () + 00426 m_readySet.m_wset.numSet () + 00427 m_readySet.m_eset.numSet (); 00428 00429 if ( n > 0 ) { 00430 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n)); 00431 m_readySet.dump (); 00432 } 00433 return (n); 00434 }
bool Reactor::registerIOHandler | ( | EventHandler * | eh_, | |
handler_t | fd_, | |||
EventType | et_ = RWE_EVENTS | |||
) |
Register I/O Event handler with Reactor.
Reactor will dispatch appropriate callback when event of EventType is received.
eh_ | Pointer to the EventHandler | |
fd_ | File descriptor | |
et_ | Event Type |
Definition at line 93 of file Reactor.cpp.
References ASSA::ASSAERR, Assure_return, DL, ASSA::MaskSet::dump(), ASSA::ends(), ASSA::EventHandler::get_id(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isSignalEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_maxfd_plus1, m_readSet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::FdSet::setFd(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::RemoteLogger::log_open(), and ASSA::Acceptor< SERVICE_HANDLER, PEER_ACCEPTOR >::open().
00094 { 00095 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE); 00096 00097 std::ostringstream msg; 00098 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_)); 00099 00100 if (isReadEvent (et_)) 00101 { 00102 if (!m_waitSet.m_rset.setFd (fd_)) 00103 { 00104 DL((ASSAERR,"readset: fd %d out of range\n", fd_)); 00105 return (false); 00106 } 00107 m_readSet[fd_] = eh_; 00108 msg << "READ_EVENT"; 00109 } 00110 00111 if (isWriteEvent (et_)) 00112 { 00113 if (!m_waitSet.m_wset.setFd (fd_)) 00114 { 00115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_)); 00116 return (false); 00117 } 00118 m_writeSet[fd_] = eh_; 00119 msg << " WRITE_EVENT"; 00120 } 00121 00122 if (isExceptEvent (et_)) 00123 { 00124 if (!m_waitSet.m_eset.setFd (fd_)) 00125 { 00126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_)); 00127 return (false); 00128 } 00129 m_exceptSet[fd_] = eh_; 00130 msg << " EXCEPT_EVENT"; 00131 } 00132 msg << std::ends; 00133 00134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n", 00135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () )); 00136 00137 #if !defined (WIN32) 00138 if (m_maxfd_plus1 < fd_+1) { 00139 m_maxfd_plus1 = fd_+1; 00140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1)); 00141 } 00142 #endif 00143 00144 DL((REACT,"Modified waitSet:\n")); 00145 m_waitSet.dump (); 00146 00147 return (true); 00148 }
TimerId Reactor::registerTimerHandler | ( | EventHandler * | eh_, | |
const TimeVal & | tv_, | |||
const std::string & | name_ = "<unknown>" | |||
) |
Register Timer Event handler with Reactor.
Reactor will dispatch appropriate callback when event of EventType is received.
eh_ | Pointer to the EventHandler | |
tv_ | Timeout value | |
name_ | Name of the timer |
Definition at line 67 of file Reactor.cpp.
References Assure_return, DL, ASSA::TimerQueue::dump(), ASSA::TimeVal::fmtString(), ASSA::TimeVal::gettimeofday(), ASSA::TimerQueue::insert(), m_tqueue, ASSA::TimeVal::msec(), ASSA::REACT, ASSA::REACTTRACE, ASSA::TimeVal::sec(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doAsync(), and ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().
00070 { 00071 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE); 00072 Assure_return (eh_); 00073 00074 TimeVal now (TimeVal::gettimeofday()); 00075 TimeVal t (now + timeout_); 00076 00077 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n", 00078 timeout_.sec(),timeout_.msec())); 00079 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() )); 00080 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() )); 00081 00082 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_); 00083 00084 DL((REACT,"---Modified Timer Queue----\n")); 00085 m_tqueue.dump(); 00086 DL((REACT,"---------------------------\n")); 00087 00088 return (tid); 00089 }
bool Reactor::removeHandler | ( | EventHandler * | eh_, | |
EventType | et_ = ALL_EVENTS | |||
) |
Remove Event handler from reactor for either all I/O events or timeout event or both.
If et_ is TIMEOUT_EVENT, all timers associated with Event Handler eh_ will be removed.
eh_ | Pointer to the EventHandler | |
et_ | Event Type to remove. Default will remove Event Handler for all events. |
Definition at line 173 of file Reactor.cpp.
References adjust_maxfdp1(), ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::isExceptEvent(), ASSA::isReadEvent(), ASSA::isTimeoutEvent(), ASSA::isWriteEvent(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_timeout(), ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write(), ASSA::RemoteLogger::log_close(), and stopReactor().
00174 { 00175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE); 00176 00177 bool ret = false; 00178 handler_t fd; 00179 handler_t rfdmax; 00180 handler_t wfdmax; 00181 handler_t efdmax; 00182 Fd2Eh_Map_Iter iter; 00183 00184 rfdmax = wfdmax = efdmax = 0; 00185 00186 if (eh_ == NULL) { 00187 return false; 00188 } 00189 00190 if (isTimeoutEvent (event_)) { 00191 ret = m_tqueue.remove (eh_); 00192 ret = true; 00193 } 00194 00195 if (isReadEvent (event_)) { 00196 iter = m_readSet.begin (); 00197 while (iter != m_readSet.end ()) { 00198 if ((*iter).second == eh_) { 00199 fd = (*iter).first; 00200 m_readSet.erase (iter); 00201 m_waitSet.m_rset.clear (fd); 00202 ret = true; 00203 break; 00204 } 00205 rfdmax = fd; 00206 iter++; 00207 } 00208 } 00209 00210 if (isWriteEvent (event_)) { 00211 iter = m_writeSet.begin (); 00212 while (iter != m_writeSet.end ()) { 00213 if ((*iter).second == eh_) { 00214 fd = (*iter).first; 00215 m_writeSet.erase (iter); 00216 m_waitSet.m_wset.clear (fd); 00217 ret = true; 00218 break; 00219 } 00220 wfdmax = fd; 00221 iter++; 00222 } 00223 } 00224 00225 if (isExceptEvent (event_)) { 00226 iter = m_exceptSet.begin (); 00227 while (iter != m_exceptSet.end ()) { 00228 if ((*iter).second == eh_) { 00229 fd = (*iter).first; 00230 m_exceptSet.erase (iter); 00231 m_waitSet.m_eset.clear (fd); 00232 ret = true; 00233 break; 00234 } 00235 efdmax = fd; 00236 iter++; 00237 } 00238 } 00239 00240 if (ret == true) { 00241 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_)); 00242 eh_->handle_close (fd); 00243 } 00244 00245 adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax); 00246 00247 DL((REACT,"Modifies waitSet:\n")); 00248 m_waitSet.dump (); 00249 00250 return (ret); 00251 }
bool Reactor::removeIOHandler | ( | handler_t | fd_ | ) |
Remove IO Event handler from reactor.
This will remove handler from receiving all I/O events.
fd_ | File descriptor |
We clear m_readySet mask here as well, because if we don't, it will be erroneously used by isAnyReady() before select().
Definition at line 255 of file Reactor.cpp.
References adjust_maxfdp1(), Assure_return, ASSA::FdSet::clear(), DL, ASSA::MaskSet::dump(), ASSA::EventHandler::get_id(), ASSA::EventHandler::handle_close(), ASSA::is_valid_handler(), ASSA::MaskSet::m_eset, m_exceptSet, m_readSet, m_readySet, ASSA::MaskSet::m_rset, m_waitSet, m_writeSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, and trace_with_mask.
Referenced by checkFDs(), and dispatchHandler().
00256 { 00257 trace_with_mask("Reactor::removeIOHandler",REACTTRACE); 00258 00259 bool ret = false; 00260 EventHandler* ehp = NULL; 00261 Fd2Eh_Map_Iter iter; 00262 00263 handler_t rfdmax; 00264 handler_t wfdmax; 00265 handler_t efdmax; 00266 00267 rfdmax = wfdmax = efdmax = 0; 00268 00269 Assure_return (ASSA::is_valid_handler (fd_)); 00270 00271 DL((REACT,"Removing handler for fd=%d\n",fd_)); 00272 00277 if ((iter = m_readSet.find (fd_)) != m_readSet.end ()) 00278 { 00279 ehp = (*iter).second; 00280 m_readSet.erase (iter); 00281 m_waitSet.m_rset.clear (fd_); 00282 m_readySet.m_rset.clear (fd_); 00283 if (m_readSet.size () > 0) { 00284 iter = m_readSet.end (); 00285 iter--; 00286 rfdmax = (*iter).first; 00287 } 00288 ret = true; 00289 } 00290 00291 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ()) 00292 { 00293 ehp = (*iter).second; 00294 m_writeSet.erase (iter); 00295 m_waitSet.m_wset.clear (fd_); 00296 m_readySet.m_wset.clear (fd_); 00297 if (m_writeSet.size () > 0) { 00298 iter = m_writeSet.end (); 00299 iter--; 00300 wfdmax = (*iter).first; 00301 } 00302 ret = true; 00303 } 00304 00305 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ()) 00306 { 00307 ehp = (*iter).second; 00308 m_exceptSet.erase (iter); 00309 m_waitSet.m_eset.clear (fd_); 00310 m_readySet.m_eset.clear (fd_); 00311 if (m_exceptSet.size () > 0) { 00312 iter = m_exceptSet.end (); 00313 iter--; 00314 efdmax = (*iter).first; 00315 } 00316 ret = true; 00317 } 00318 00319 if (ret == true && ehp != NULL) { 00320 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp)); 00321 ehp->handle_close (fd_); 00322 } 00323 00324 adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax); 00325 00326 DL((REACT,"Modifies waitSet:\n")); 00327 m_waitSet.dump (); 00328 00329 return (ret); 00330 }
bool Reactor::removeTimerHandler | ( | TimerId | id_ | ) |
Remove Timer event from the queue.
This removes particular event.
id_ | Timer Id returned by registerTimer. |
Definition at line 152 of file Reactor.cpp.
References ASSA::ASSAERR, DL, ASSA::TimerQueue::dump(), EL, m_tqueue, ASSA::REACT, ASSA::REACTTRACE, ASSA::TimerQueue::remove(), and trace_with_mask.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::handle_write().
00153 { 00154 trace_with_mask("Reactor::removeTimer",REACTTRACE); 00155 bool ret; 00156 00157 if ((ret = m_tqueue.remove (tid_))) { 00158 DL((REACT,"---Modified Timer Queue----\n")); 00159 m_tqueue.dump(); 00160 DL((REACT,"---------------------------\n")); 00161 } 00162 else { 00163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ )); 00164 } 00165 return (ret); 00166 }
void Reactor::stopReactor | ( | void | ) |
Stop Reactor's activity.
This effectively removes all handlers from under Reactor's supervision. As of now, there is no way to re-activate the Reactor. This method is typically called from method other then EventHandler::signal_handler(). EventHandler::handle_read () is a good candidate. Calling it from EventHandler::handle_close () will most likely cause an infinite loop of recursive calls.
Definition at line 684 of file Reactor.cpp.
References m_active, m_exceptSet, m_readSet, m_writeSet, ASSA::REACTTRACE, removeHandler(), and trace_with_mask.
00685 { 00686 trace_with_mask("Reactor::stopReactor", REACTTRACE); 00687 00688 m_active = false; 00689 00690 Fd2Eh_Map_Iter iter; 00691 EventHandler* ehp; 00692 00693 while (m_readSet.size () > 0) { 00694 iter = m_readSet.begin (); 00695 ehp = (*iter).second; 00696 removeHandler (ehp); 00697 } 00698 00699 while (m_writeSet.size () > 0) { 00700 iter = m_writeSet.begin (); 00701 ehp = (*iter).second; 00702 removeHandler (ehp); 00703 } 00704 00705 while (m_exceptSet.size () > 0) { 00706 iter = m_exceptSet.begin (); 00707 ehp = (*iter).second; 00708 removeHandler (ehp); 00709 } 00710 }
void Reactor::waitForEvents | ( | TimeVal * | tv_ | ) |
Wait for events for time specified.
Passing NULL replicates behavior of waitForEvents(void). Passing tv_ {0, 0} will cause non-blocking polling for all events. This method blocks up to tv_ time interval processing event. If an event occurs, it will process event(s) and return. tv_ time is adjusted by substracting time spent in event processing.
tv_ | [RW] is time to wait for. |
Definition at line 512 of file Reactor.cpp.
References calculateTimeout(), dispatch(), DL, ASSA::MaskSet::dump(), ASSA::TimerQueue::expire(), ASSA::TimeVal::gettimeofday(), handleError(), isAnyReady(), m_active, ASSA::MaskSet::m_eset, m_maxfd_plus1, m_readySet, ASSA::MaskSet::m_rset, m_tqueue, m_waitSet, ASSA::MaskSet::m_wset, ASSA::REACT, ASSA::REACTTRACE, ASSA::MaskSet::reset(), ASSA::MaskSet::sync(), and trace_with_mask.
00513 { 00514 trace_with_mask("Reactor::waitForEvents",REACTTRACE); 00515 00516 TimerCountdown traceTime (tv_); 00517 DL((REACT,"======================================\n")); 00518 00519 /*--- Expire all stale Timers ---*/ 00520 m_tqueue.expire (TimeVal::gettimeofday ()); 00521 00522 /* Test to see if Reactor has been deactivated as a result 00523 * of processing done by any TimerHandlers. 00524 */ 00525 if (!m_active) { 00526 return; 00527 } 00528 00529 int nReady; 00530 TimeVal delay; 00531 TimeVal* dlp = &delay; 00532 00533 /*--- 00534 In case if not all data have been processed by the EventHandler, 00535 and EventHandler stated so in its callback's return value 00536 to dispatcher (), it will be called again. This way 00537 underlying file/socket stream can efficiently utilize its 00538 buffering mechaninsm. 00539 ---*/ 00540 if ((nReady = isAnyReady ())) { 00541 DL((REACT,"isAnyReady returned: %d\n",nReady)); 00542 dispatch (nReady); 00543 return; 00544 } 00545 00546 DL((REACT,"=== m_waitSet ===\n")); 00547 m_waitSet.dump (); 00548 00549 do { 00550 m_readySet.reset (); 00551 DL ((REACT,"m_readySet after reset():\n")); 00552 m_readySet.dump (); 00553 00554 m_readySet = m_waitSet; 00555 DL ((REACT,"m_readySet after assign:\n")); 00556 m_readySet.dump (); 00557 00558 calculateTimeout (dlp, tv_); 00559 00560 nReady = ::select (m_maxfd_plus1, 00561 &m_readySet.m_rset, 00562 &m_readySet.m_wset, 00563 &m_readySet.m_eset, 00564 dlp); 00565 DL((REACT,"::select() returned: %d\n",nReady)); 00566 00567 m_readySet.sync (); 00568 DL ((REACT,"m_readySet after select:\n")); 00569 m_readySet.dump (); 00570 00571 } 00572 while (nReady < 0 && handleError ()); 00573 00574 dispatch (nReady); 00575 }
void Reactor::waitForEvents | ( | void | ) |
Main waiting loop that blocks indefinitely processing events.
Definition at line 487 of file Reactor.cpp.
References m_active.
Referenced by ASSA::Connector< SERVICE_HANDLER, PEER_CONNECTOR >::doSync().
00488 { 00489 while ( m_active ) { 00490 waitForEvents ((TimeVal*) NULL); 00491 } 00492 }
bool ASSA::Reactor::m_active [private] |
Flag that indicates whether Reactor is active or had been stopped.
Definition at line 212 of file Reactor.h.
Referenced by deactivate(), handleError(), stopReactor(), and waitForEvents().
Fd2Eh_Map_Type ASSA::Reactor::m_exceptSet [private] |
Event handlers awaiting on EXCEPT_EVENT.
Definition at line 221 of file Reactor.h.
Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().
int ASSA::Reactor::m_fd_setsize [private] |
Max number of open files per process.
This is the soft limit enforced by the kernel. It can be obtained/manipulated from the shell with ulimit/limit utilities, but may not exceed the hard limit.
Definition at line 203 of file Reactor.h.
Referenced by checkFDs(), and Reactor().
handler_t ASSA::Reactor::m_maxfd_plus1 [private] |
Max file descriptor number (in all sets) plus 1.
This value is ignored by WIN32 implementation of select()
Definition at line 209 of file Reactor.h.
Referenced by adjust_maxfdp1(), registerIOHandler(), and waitForEvents().
Fd2Eh_Map_Type ASSA::Reactor::m_readSet [private] |
Event handlers awaiting on READ_EVENT.
Definition at line 215 of file Reactor.h.
Referenced by checkFDs(), dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().
MaskSet ASSA::Reactor::m_readySet [private] |
Handlers that are ready for processing.
Definition at line 227 of file Reactor.h.
Referenced by dispatch(), isAnyReady(), removeIOHandler(), and waitForEvents().
TimerQueue ASSA::Reactor::m_tqueue [private] |
The queue of Timers.
Definition at line 230 of file Reactor.h.
Referenced by calculateTimeout(), dispatch(), registerTimerHandler(), removeHandler(), removeTimerHandler(), and waitForEvents().
MaskSet ASSA::Reactor::m_waitSet [private] |
Handlers to wait for event on.
Definition at line 224 of file Reactor.h.
Referenced by registerIOHandler(), removeHandler(), removeIOHandler(), and waitForEvents().
Fd2Eh_Map_Type ASSA::Reactor::m_writeSet [private] |
Event handlers awaiting on WRITE_EVENT.
Definition at line 218 of file Reactor.h.
Referenced by dispatch(), registerIOHandler(), removeHandler(), removeIOHandler(), stopReactor(), and ~Reactor().