00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015 #include "CSocketMultiplexer.h"
00016 #include "ISocketMultiplexerJob.h"
00017 #include "CCondVar.h"
00018 #include "CLock.h"
00019 #include "CMutex.h"
00020 #include "CThread.h"
00021 #include "CLog.h"
00022 #include "TMethodJob.h"
00023 #include "CArch.h"
00024 #include "XArch.h"
00025 #include "stdvector.h"
00026
00027
00028
00029
00030
00031 CSocketMultiplexer* CSocketMultiplexer::s_instance = NULL;
00032
00033 CSocketMultiplexer::CSocketMultiplexer() :
00034 m_mutex(new CMutex),
00035 m_thread(NULL),
00036 m_update(false),
00037 m_jobsReady(new CCondVar<bool>(m_mutex, false)),
00038 m_jobListLock(new CCondVar<bool>(m_mutex, false)),
00039 m_jobListLockLocked(new CCondVar<bool>(m_mutex, false)),
00040 m_jobListLocker(NULL),
00041 m_jobListLockLocker(NULL)
00042 {
00043 assert(s_instance == NULL);
00044
00045
00046
00047
00048 m_cursorMark = reinterpret_cast<ISocketMultiplexerJob*>(this);
00049
00050
00051 m_thread = new CThread(new TMethodJob<CSocketMultiplexer>(
00052 this, &CSocketMultiplexer::serviceThread));
00053
00054 s_instance = this;
00055 }
00056
00057 CSocketMultiplexer::~CSocketMultiplexer()
00058 {
00059 m_thread->cancel();
00060 m_thread->unblockPollSocket();
00061 m_thread->wait();
00062 delete m_thread;
00063 delete m_jobsReady;
00064 delete m_jobListLock;
00065 delete m_jobListLockLocked;
00066 delete m_jobListLocker;
00067 delete m_jobListLockLocker;
00068 delete m_mutex;
00069
00070
00071 for (CSocketJobMap::iterator i = m_socketJobMap.begin();
00072 i != m_socketJobMap.end(); ++i) {
00073 delete *(i->second);
00074 }
00075
00076 s_instance = NULL;
00077 }
00078
00079 CSocketMultiplexer*
00080 CSocketMultiplexer::getInstance()
00081 {
00082 return s_instance;
00083 }
00084
00085 void
00086 CSocketMultiplexer::addSocket(ISocket* socket, ISocketMultiplexerJob* job)
00087 {
00088 assert(socket != NULL);
00089 assert(job != NULL);
00090
00091
00092 lockJobListLock();
00093
00094
00095 m_thread->unblockPollSocket();
00096
00097
00098 lockJobList();
00099
00100
00101 CSocketJobMap::iterator i = m_socketJobMap.find(socket);
00102 if (i == m_socketJobMap.end()) {
00103
00104
00105
00106 CJobCursor j = m_socketJobs.insert(m_socketJobs.end(), job);
00107 m_update = true;
00108 m_socketJobMap.insert(std::make_pair(socket, j));
00109 }
00110 else {
00111 CJobCursor j = i->second;
00112 if (*j != job) {
00113 delete *j;
00114 *j = job;
00115 }
00116 m_update = true;
00117 }
00118
00119
00120 unlockJobList();
00121 }
00122
00123 void
00124 CSocketMultiplexer::removeSocket(ISocket* socket)
00125 {
00126 assert(socket != NULL);
00127
00128
00129 lockJobListLock();
00130
00131
00132 m_thread->unblockPollSocket();
00133
00134
00135 lockJobList();
00136
00137
00138
00139
00140 CSocketJobMap::iterator i = m_socketJobMap.find(socket);
00141 if (i != m_socketJobMap.end()) {
00142 if (*(i->second) != NULL) {
00143 delete *(i->second);
00144 *(i->second) = NULL;
00145 m_update = true;
00146 }
00147 }
00148
00149
00150 unlockJobList();
00151 }
00152
00153 void
00154 CSocketMultiplexer::serviceThread(void*)
00155 {
00156 std::vector<IArchNetwork::CPollEntry> pfds;
00157 IArchNetwork::CPollEntry pfd;
00158
00159
00160 for (;;) {
00161 CThread::testCancel();
00162
00163
00164 {
00165 CLock lock(m_mutex);
00166 while (!(bool)*m_jobsReady) {
00167 m_jobsReady->wait();
00168 }
00169 }
00170
00171
00172 lockJobListLock();
00173 lockJobList();
00174
00175
00176 if (m_update) {
00177 m_update = false;
00178 pfds.clear();
00179 pfds.reserve(m_socketJobMap.size());
00180
00181 CJobCursor cursor = newCursor();
00182 CJobCursor jobCursor = nextCursor(cursor);
00183 while (jobCursor != m_socketJobs.end()) {
00184 ISocketMultiplexerJob* job = *jobCursor;
00185 if (job != NULL) {
00186 pfd.m_socket = job->getSocket();
00187 pfd.m_events = 0;
00188 if (job->isReadable()) {
00189 pfd.m_events |= IArchNetwork::kPOLLIN;
00190 }
00191 if (job->isWritable()) {
00192 pfd.m_events |= IArchNetwork::kPOLLOUT;
00193 }
00194 pfds.push_back(pfd);
00195 }
00196 jobCursor = nextCursor(cursor);
00197 }
00198 deleteCursor(cursor);
00199 }
00200
00201 int status;
00202 try {
00203
00204 if (!pfds.empty()) {
00205 status = ARCH->pollSocket(&pfds[0], pfds.size(), -1);
00206 }
00207 else {
00208 status = 0;
00209 }
00210 }
00211 catch (XArchNetwork& e) {
00212 LOG((CLOG_WARN "error in socket multiplexer: %s", e.what().c_str()));
00213 status = 0;
00214 }
00215
00216 if (status != 0) {
00217
00218
00219 UInt32 i = 0;
00220 CJobCursor cursor = newCursor();
00221 CJobCursor jobCursor = nextCursor(cursor);
00222 while (i < pfds.size() && jobCursor != m_socketJobs.end()) {
00223 if (*jobCursor != NULL) {
00224
00225 unsigned short revents = pfds[i].m_revents;
00226 bool read = ((revents & IArchNetwork::kPOLLIN) != 0);
00227 bool write = ((revents & IArchNetwork::kPOLLOUT) != 0);
00228 bool error = ((revents & (IArchNetwork::kPOLLERR |
00229 IArchNetwork::kPOLLNVAL)) != 0);
00230
00231
00232 ISocketMultiplexerJob* job = *jobCursor;
00233 ISocketMultiplexerJob* newJob = job->run(read, write, error);
00234
00235
00236 if (newJob != job) {
00237 CLock lock(m_mutex);
00238 delete job;
00239 *jobCursor = newJob;
00240 m_update = true;
00241 }
00242 ++i;
00243 }
00244
00245
00246 jobCursor = nextCursor(cursor);
00247 }
00248 deleteCursor(cursor);
00249 }
00250
00251
00252 for (CSocketJobMap::iterator i = m_socketJobMap.begin();
00253 i != m_socketJobMap.end();) {
00254 if (*(i->second) == NULL) {
00255 m_socketJobMap.erase(i++);
00256 m_update = true;
00257 }
00258 else {
00259 ++i;
00260 }
00261 }
00262
00263
00264 unlockJobList();
00265 }
00266 }
00267
00268 CSocketMultiplexer::CJobCursor
00269 CSocketMultiplexer::newCursor()
00270 {
00271 CLock lock(m_mutex);
00272 return m_socketJobs.insert(m_socketJobs.begin(), m_cursorMark);
00273 }
00274
00275 CSocketMultiplexer::CJobCursor
00276 CSocketMultiplexer::nextCursor(CJobCursor cursor)
00277 {
00278 CLock lock(m_mutex);
00279 CJobCursor j = m_socketJobs.end();
00280 CJobCursor i = cursor;
00281 while (++i != m_socketJobs.end()) {
00282 if (*i != m_cursorMark) {
00283
00284 j = i;
00285
00286
00287 m_socketJobs.splice(++i, m_socketJobs, cursor);
00288 break;
00289 }
00290 }
00291 return j;
00292 }
00293
00294 void
00295 CSocketMultiplexer::deleteCursor(CJobCursor cursor)
00296 {
00297 CLock lock(m_mutex);
00298 m_socketJobs.erase(cursor);
00299 }
00300
00301 void
00302 CSocketMultiplexer::lockJobListLock()
00303 {
00304 CLock lock(m_mutex);
00305
00306
00307 while (*m_jobListLockLocked) {
00308 m_jobListLockLocked->wait();
00309 }
00310
00311
00312 *m_jobListLockLocked = true;
00313 m_jobListLockLocker = new CThread(CThread::getCurrentThread());
00314 }
00315
00316 void
00317 CSocketMultiplexer::lockJobList()
00318 {
00319 CLock lock(m_mutex);
00320
00321
00322 assert(*m_jobListLockLocker == CThread::getCurrentThread());
00323
00324
00325 while (*m_jobListLock) {
00326 m_jobListLock->wait();
00327 }
00328
00329
00330 *m_jobListLock = true;
00331 m_jobListLocker = m_jobListLockLocker;
00332 m_jobListLockLocker = NULL;
00333
00334
00335 *m_jobListLockLocked = false;
00336 m_jobListLockLocked->broadcast();
00337 }
00338
00339 void
00340 CSocketMultiplexer::unlockJobList()
00341 {
00342 CLock lock(m_mutex);
00343
00344
00345 assert(*m_jobListLocker == CThread::getCurrentThread());
00346
00347
00348 delete m_jobListLocker;
00349 m_jobListLocker = NULL;
00350 *m_jobListLock = false;
00351 m_jobListLock->signal();
00352
00353
00354 bool isReady = !m_socketJobMap.empty();
00355 if (*m_jobsReady != isReady) {
00356 *m_jobsReady = isReady;
00357 m_jobsReady->signal();
00358 }
00359 }