00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014 #include <iostream>
00015 #include <sstream>
00016 #include <string>
00017
00018 #include "assa/Reactor.h"
00019 #include "assa/Logger.h"
00020
00021 using namespace ASSA;
00022
00023 Reactor::
00024 Reactor () :
00025 m_fd_setsize (1024),
00026 m_maxfd_plus1 (0),
00027 m_active (true)
00028 {
00029 trace_with_mask("Reactor::Reactor",REACTTRACE);
00030
00034 #if defined(WIN32)
00035 m_fd_setsize = FD_SETSIZE;
00036
00037 #else // POSIX
00038 struct rlimit rlim;
00039 rlim.rlim_max = 0;
00040
00041 if ( getrlimit (RLIMIT_NOFILE, &rlim) == 0 ) {
00042 m_fd_setsize = rlim.rlim_cur;
00043 }
00044 #endif
00045
00048 #if defined (WIN32)
00049 WSADATA data;
00050 WSAStartup (MAKEWORD (2, 2), &data);
00051 #endif
00052 }
00053
00054 Reactor::
00055 ~Reactor()
00056 {
00057 trace_with_mask("Reactor::~Reactor",REACTTRACE);
00058
00059 m_readSet.clear ();
00060 m_writeSet.clear ();
00061 m_exceptSet.clear ();
00062 deactivate ();
00063 }
00064
00065 TimerId
00066 Reactor::
00067 registerTimerHandler (EventHandler* eh_,
00068 const TimeVal& timeout_,
00069 const std::string& name_)
00070 {
00071 trace_with_mask( "Reactor::registerTimerHandler",REACTTRACE);
00072 Assure_return (eh_);
00073
00074 TimeVal now (TimeVal::gettimeofday());
00075 TimeVal t (now + timeout_);
00076
00077 DL((REACT,"TIMEOUT_EVENT......: (%d,%d)\n",
00078 timeout_.sec(),timeout_.msec()));
00079 DL((REACT,"Time now...........: %s\n", now.fmtString().c_str() ));
00080 DL((REACT,"Scheduled to expire: %s\n", t.fmtString().c_str() ));
00081
00082 TimerId tid = m_tqueue.insert (eh_, t, timeout_, name_);
00083
00084 DL((REACT,"---Modified Timer Queue----\n"));
00085 m_tqueue.dump();
00086 DL((REACT,"---------------------------\n"));
00087
00088 return (tid);
00089 }
00090
00091 bool
00092 Reactor::
00093 registerIOHandler (EventHandler* eh_, handler_t fd_, EventType et_)
00094 {
00095 trace_with_mask("Reactor::registerHandler(I/O)",REACTTRACE);
00096
00097 std::ostringstream msg;
00098 Assure_return (eh_ && !isSignalEvent (et_) && !isTimeoutEvent (et_));
00099
00100 if (isReadEvent (et_))
00101 {
00102 if (!m_waitSet.m_rset.setFd (fd_))
00103 {
00104 DL((ASSAERR,"readset: fd %d out of range\n", fd_));
00105 return (false);
00106 }
00107 m_readSet[fd_] = eh_;
00108 msg << "READ_EVENT";
00109 }
00110
00111 if (isWriteEvent (et_))
00112 {
00113 if (!m_waitSet.m_wset.setFd (fd_))
00114 {
00115 DL((ASSAERR,"writeset: fd %d out of range\n", fd_));
00116 return (false);
00117 }
00118 m_writeSet[fd_] = eh_;
00119 msg << " WRITE_EVENT";
00120 }
00121
00122 if (isExceptEvent (et_))
00123 {
00124 if (!m_waitSet.m_eset.setFd (fd_))
00125 {
00126 DL((ASSAERR,"exceptset: fd %d out of range\n", fd_));
00127 return (false);
00128 }
00129 m_exceptSet[fd_] = eh_;
00130 msg << " EXCEPT_EVENT";
00131 }
00132 msg << std::ends;
00133
00134 DL((REACT,"Registered EvtH(%s) fd=%d (0x%x) for event(s) %s\n",
00135 eh_->get_id ().c_str (), fd_, (u_long)eh_, msg.str ().c_str () ));
00136
00137 #if !defined (WIN32)
00138 if (m_maxfd_plus1 < fd_+1) {
00139 m_maxfd_plus1 = fd_+1;
00140 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00141 }
00142 #endif
00143
00144 DL((REACT,"Modified waitSet:\n"));
00145 m_waitSet.dump ();
00146
00147 return (true);
00148 }
00149
00150 bool
00151 Reactor::
00152 removeTimerHandler (TimerId tid_)
00153 {
00154 trace_with_mask("Reactor::removeTimer",REACTTRACE);
00155 bool ret;
00156
00157 if ((ret = m_tqueue.remove (tid_))) {
00158 DL((REACT,"---Modified Timer Queue----\n"));
00159 m_tqueue.dump();
00160 DL((REACT,"---------------------------\n"));
00161 }
00162 else {
00163 EL((ASSAERR,"Timer tid 0x%x wasn't found!\n", (u_long)tid_ ));
00164 }
00165 return (ret);
00166 }
00167
00171 bool
00172 Reactor::
00173 removeHandler (EventHandler* eh_, EventType event_)
00174 {
00175 trace_with_mask("Reactor::removeHandler(eh_,et_)",REACTTRACE);
00176
00177 bool ret = false;
00178 handler_t fd;
00179 handler_t rfdmax;
00180 handler_t wfdmax;
00181 handler_t efdmax;
00182 Fd2Eh_Map_Iter iter;
00183
00184 rfdmax = wfdmax = efdmax = 0;
00185
00186 if (eh_ == NULL) {
00187 return false;
00188 }
00189
00190 if (isTimeoutEvent (event_)) {
00191 ret = m_tqueue.remove (eh_);
00192 ret = true;
00193 }
00194
00195 if (isReadEvent (event_)) {
00196 iter = m_readSet.begin ();
00197 while (iter != m_readSet.end ()) {
00198 if ((*iter).second == eh_) {
00199 fd = (*iter).first;
00200 m_readSet.erase (iter);
00201 m_waitSet.m_rset.clear (fd);
00202 ret = true;
00203 break;
00204 }
00205 rfdmax = fd;
00206 iter++;
00207 }
00208 }
00209
00210 if (isWriteEvent (event_)) {
00211 iter = m_writeSet.begin ();
00212 while (iter != m_writeSet.end ()) {
00213 if ((*iter).second == eh_) {
00214 fd = (*iter).first;
00215 m_writeSet.erase (iter);
00216 m_waitSet.m_wset.clear (fd);
00217 ret = true;
00218 break;
00219 }
00220 wfdmax = fd;
00221 iter++;
00222 }
00223 }
00224
00225 if (isExceptEvent (event_)) {
00226 iter = m_exceptSet.begin ();
00227 while (iter != m_exceptSet.end ()) {
00228 if ((*iter).second == eh_) {
00229 fd = (*iter).first;
00230 m_exceptSet.erase (iter);
00231 m_waitSet.m_eset.clear (fd);
00232 ret = true;
00233 break;
00234 }
00235 efdmax = fd;
00236 iter++;
00237 }
00238 }
00239
00240 if (ret == true) {
00241 DL((REACT,"Found EvtH \"%s\"(%p)\n", eh_->get_id ().c_str (), eh_));
00242 eh_->handle_close (fd);
00243 }
00244
00245 adjust_maxfdp1 (fd, rfdmax, wfdmax, efdmax);
00246
00247 DL((REACT,"Modifies waitSet:\n"));
00248 m_waitSet.dump ();
00249
00250 return (ret);
00251 }
00252
00253 bool
00254 Reactor::
00255 removeIOHandler (handler_t fd_)
00256 {
00257 trace_with_mask("Reactor::removeIOHandler",REACTTRACE);
00258
00259 bool ret = false;
00260 EventHandler* ehp = NULL;
00261 Fd2Eh_Map_Iter iter;
00262
00263 handler_t rfdmax;
00264 handler_t wfdmax;
00265 handler_t efdmax;
00266
00267 rfdmax = wfdmax = efdmax = 0;
00268
00269 Assure_return (ASSA::is_valid_handler (fd_));
00270
00271 DL((REACT,"Removing handler for fd=%d\n",fd_));
00272
00277 if ((iter = m_readSet.find (fd_)) != m_readSet.end ())
00278 {
00279 ehp = (*iter).second;
00280 m_readSet.erase (iter);
00281 m_waitSet.m_rset.clear (fd_);
00282 m_readySet.m_rset.clear (fd_);
00283 if (m_readSet.size () > 0) {
00284 iter = m_readSet.end ();
00285 iter--;
00286 rfdmax = (*iter).first;
00287 }
00288 ret = true;
00289 }
00290
00291 if ((iter = m_writeSet.find (fd_)) != m_writeSet.end ())
00292 {
00293 ehp = (*iter).second;
00294 m_writeSet.erase (iter);
00295 m_waitSet.m_wset.clear (fd_);
00296 m_readySet.m_wset.clear (fd_);
00297 if (m_writeSet.size () > 0) {
00298 iter = m_writeSet.end ();
00299 iter--;
00300 wfdmax = (*iter).first;
00301 }
00302 ret = true;
00303 }
00304
00305 if ((iter = m_exceptSet.find (fd_)) != m_exceptSet.end ())
00306 {
00307 ehp = (*iter).second;
00308 m_exceptSet.erase (iter);
00309 m_waitSet.m_eset.clear (fd_);
00310 m_readySet.m_eset.clear (fd_);
00311 if (m_exceptSet.size () > 0) {
00312 iter = m_exceptSet.end ();
00313 iter--;
00314 efdmax = (*iter).first;
00315 }
00316 ret = true;
00317 }
00318
00319 if (ret == true && ehp != NULL) {
00320 DL((REACT,"Removed EvtH \"%s\"(%p)\n", ehp->get_id ().c_str (), ehp));
00321 ehp->handle_close (fd_);
00322 }
00323
00324 adjust_maxfdp1 (fd_, rfdmax, wfdmax, efdmax);
00325
00326 DL((REACT,"Modifies waitSet:\n"));
00327 m_waitSet.dump ();
00328
00329 return (ret);
00330 }
00331
00332 bool
00333 Reactor::
00334 checkFDs (void)
00335 {
00336 trace_with_mask("Reactor::checkFDs",REACTTRACE);
00337
00338 bool num_removed = false;
00339 FdSet mask;
00340 timeval poll = { 0, 0 };
00341
00342 for (handler_t fd = 0; fd < m_fd_setsize; fd++) {
00343 if ( m_readSet[fd] != NULL ) {
00344 mask.setFd (fd);
00345 if ( ::select (fd+1, &mask, NULL, NULL, &poll) < 0 ) {
00346 removeIOHandler (fd);
00347 num_removed = true;
00348 DL((REACT,"Detected BAD FD: %d\n", fd ));
00349 }
00350 mask.clear (fd);
00351 }
00352 }
00353 return (num_removed);
00354 }
00355
00356 bool
00357 Reactor::
00358 handleError (void)
00359 {
00360 trace_with_mask("Reactor::handleError",REACTTRACE);
00361
00364 if ( !m_active ) {
00365 DL((REACT,"Received cmd to stop Reactor\n"));
00366 return (false);
00367 }
00368
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378
00379
00380
00381
00382
00383
00384
00385
00386
00387 if ( errno == EINTR ) {
00388 EL((REACT,"EINTR: interrupted select(2)\n"));
00389
00390
00391
00392
00393
00394
00395
00396 return (true);
00397 }
00398
00399
00400
00401
00402
00403
00404 if ( errno == EBADF ) {
00405 DL((REACT,"EBADF: bad file descriptor\n"));
00406 return (checkFDs ());
00407 }
00408
00409
00410
00411 #if defined (WIN32)
00412 DL ((REACT,"select(3) error = %d\n", WSAGetLastError()));
00413 #else
00414 EL((ASSAERR,"select(3) error\n"));
00415 #endif
00416 return (false);
00417 }
00418
00419 int
00420 Reactor::
00421 isAnyReady (void)
00422 {
00423 trace_with_mask("Reactor::isAnyReady",REACTTRACE);
00424
00425 int n = m_readySet.m_rset.numSet () +
00426 m_readySet.m_wset.numSet () +
00427 m_readySet.m_eset.numSet ();
00428
00429 if ( n > 0 ) {
00430 DL((REACT,"m_readySet: %d FDs are ready for processing\n", n));
00431 m_readySet.dump ();
00432 }
00433 return (n);
00434 }
00435
00436 void
00437 Reactor::
00438 calculateTimeout (TimeVal*& howlong_, TimeVal* maxwait_)
00439 {
00440 trace_with_mask("Reactor::calculateTimeout",REACTTRACE);
00441
00442 TimeVal now;
00443 TimeVal tv;
00444
00445 if (m_tqueue.isEmpty () ) {
00446 howlong_ = maxwait_;
00447 goto done;
00448 }
00449 now = TimeVal::gettimeofday ();
00450 tv = m_tqueue.top ();
00451
00452 if (tv < now) {
00453
00454
00455
00456
00457
00458 *howlong_ = 0;
00459 }
00460 else {
00461 DL((REACT,"--------- Timer Queue ----------\n"));
00462 m_tqueue.dump();
00463 DL((REACT,"--------------------------------\n"));
00464
00465 if (maxwait_ == NULL || *maxwait_ == TimeVal::zeroTime ()) {
00466 *howlong_ = tv - now;
00467 }
00468 else {
00469 *howlong_ = (*maxwait_+now) < tv ? *maxwait_ : tv-now;
00470 }
00471 }
00472
00473 done:
00474 if (howlong_ != NULL) {
00475 DL((REACT,"delay (%f)\n", double (*howlong_) ));
00476 }
00477 else {
00478 DL((REACT,"delay (forever)\n"));
00479 }
00480 }
00481
00485 void
00486 Reactor::
00487 waitForEvents (void)
00488 {
00489 while ( m_active ) {
00490 waitForEvents ((TimeVal*) NULL);
00491 }
00492 }
00493
00510 void
00511 Reactor::
00512 waitForEvents (TimeVal* tv_)
00513 {
00514 trace_with_mask("Reactor::waitForEvents",REACTTRACE);
00515
00516 TimerCountdown traceTime (tv_);
00517 DL((REACT,"======================================\n"));
00518
00519
00520 m_tqueue.expire (TimeVal::gettimeofday ());
00521
00522
00523
00524
00525 if (!m_active) {
00526 return;
00527 }
00528
00529 int nReady;
00530 TimeVal delay;
00531 TimeVal* dlp = &delay;
00532
00533
00534
00535
00536
00537
00538
00539
00540 if ((nReady = isAnyReady ())) {
00541 DL((REACT,"isAnyReady returned: %d\n",nReady));
00542 dispatch (nReady);
00543 return;
00544 }
00545
00546 DL((REACT,"=== m_waitSet ===\n"));
00547 m_waitSet.dump ();
00548
00549 do {
00550 m_readySet.reset ();
00551 DL ((REACT,"m_readySet after reset():\n"));
00552 m_readySet.dump ();
00553
00554 m_readySet = m_waitSet;
00555 DL ((REACT,"m_readySet after assign:\n"));
00556 m_readySet.dump ();
00557
00558 calculateTimeout (dlp, tv_);
00559
00560 nReady = ::select (m_maxfd_plus1,
00561 &m_readySet.m_rset,
00562 &m_readySet.m_wset,
00563 &m_readySet.m_eset,
00564 dlp);
00565 DL((REACT,"::select() returned: %d\n",nReady));
00566
00567 m_readySet.sync ();
00568 DL ((REACT,"m_readySet after select:\n"));
00569 m_readySet.dump ();
00570
00571 }
00572 while (nReady < 0 && handleError ());
00573
00574 dispatch (nReady);
00575 }
00576
00583 void
00584 Reactor::
00585 dispatchHandler (FdSet& mask_, Fd2Eh_Map_Type& fdSet_, EH_IO_Callback callback_)
00586 {
00587 trace_with_mask("Reactor::dispatchHandler",REACTTRACE);
00588
00589 int ret = 0;
00590 handler_t fd;
00591 EventHandler* ehp = NULL;
00592 std::string eh_id;
00593
00594 Fd2Eh_Map_Iter iter = fdSet_.begin ();
00595
00596 while (iter != fdSet_.end ())
00597 {
00598 fd = (*iter).first;
00599 ehp = (*iter).second;
00600
00601 if (mask_.isSet (fd) && ehp != NULL)
00602 {
00603 eh_id = ehp->get_id ();
00604 DL((REACT,"Data detected from \"%s\"(fd=%d)\n",
00605 eh_id.c_str (), fd));
00606
00607 ret = (ehp->*callback_) (fd);
00608
00609 if (ret == -1) {
00610 removeIOHandler (fd);
00611 }
00612 else if (ret > 0) {
00613 DL((REACT,"%d bytes pending on fd=%d \"%s\"\n",
00614 ret, fd, eh_id.c_str ()));
00615
00616 }
00617 else {
00618 DL((REACT,"All data from \"%s\"(fd=%d) are consumed\n",
00619 eh_id.c_str (), fd));
00620 mask_.clear (fd);
00621 }
00628 iter = fdSet_.begin ();
00629 }
00630 else {
00631 iter++;
00632 }
00633 }
00634 }
00635
00641 bool
00642 Reactor::
00643 dispatch (int ready_)
00644 {
00645 trace_with_mask("Reactor::dispatch", REACTTRACE);
00646
00647 m_tqueue.expire (TimeVal::gettimeofday ());
00648
00649 if ( ready_ < 0 )
00650 {
00651 #if !defined (WIN32)
00652 EL((ASSAERR,"::select(3) error\n"));
00653 #endif
00654 return (false);
00655 }
00656 if ( ready_ == 0 ) {
00657 return (true);
00658 }
00659
00660 DL((REACT,"Dispatching %d FDs.\n",ready_));
00661 DL((REACT,"m_readySet:\n"));
00662 m_readySet.dump ();
00663
00664
00665 dispatchHandler (m_readySet.m_wset,
00666 m_writeSet,
00667 &EventHandler::handle_write);
00668
00669
00670 dispatchHandler (m_readySet.m_eset,
00671 m_exceptSet,
00672 &EventHandler::handle_except);
00673
00674
00675 dispatchHandler (m_readySet.m_rset,
00676 m_readSet,
00677 &EventHandler::handle_read);
00678
00679 return (true);
00680 }
00681
00682 void
00683 Reactor::
00684 stopReactor (void)
00685 {
00686 trace_with_mask("Reactor::stopReactor", REACTTRACE);
00687
00688 m_active = false;
00689
00690 Fd2Eh_Map_Iter iter;
00691 EventHandler* ehp;
00692
00693 while (m_readSet.size () > 0) {
00694 iter = m_readSet.begin ();
00695 ehp = (*iter).second;
00696 removeHandler (ehp);
00697 }
00698
00699 while (m_writeSet.size () > 0) {
00700 iter = m_writeSet.begin ();
00701 ehp = (*iter).second;
00702 removeHandler (ehp);
00703 }
00704
00705 while (m_exceptSet.size () > 0) {
00706 iter = m_exceptSet.begin ();
00707 ehp = (*iter).second;
00708 removeHandler (ehp);
00709 }
00710 }
00711
00716 void
00717 Reactor::
00718 adjust_maxfdp1 (handler_t fd_,
00719 handler_t rmax_,
00720 handler_t wmax_,
00721 handler_t emax_)
00722 {
00723 #if !defined (WIN32)
00724
00725 trace_with_mask("Reactor::adjust_maxfdp1", REACTTRACE);
00726
00727 if (m_maxfd_plus1 == fd_ + 1) {
00728 m_maxfd_plus1 = std::max (rmax_, std::max (wmax_, emax_));
00729
00730 DL((REACT,"maxfd+1 adjusted to %d\n", m_maxfd_plus1));
00731 }
00732 #endif
00733 }