Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members

CTCPSocket.cpp

00001 /*
00002  * synergy -- mouse and keyboard sharing utility
00003  * Copyright (C) 2002 Chris Schoeneman
00004  * 
00005  * This package is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU General Public License
00007  * found in the file COPYING that should have accompanied this file.
00008  * 
00009  * This package is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  */
00014 
#include "CTCPSocket.h"
#include "CNetworkAddress.h"
#include "CSocketMultiplexer.h"
#include "TSocketMultiplexerMethodJob.h"
#include "XSocket.h"
#include "CLock.h"
#include "CLog.h"
#include "IEventQueue.h"
#include "IEventJob.h"
#include "CArch.h"
#include "XArch.h"
#include <cstdlib>
#include <cstring>
#include <memory>
#include <new>
00029 
00030 //
00031 // CTCPSocket
00032 //
00033 
00034 CTCPSocket::CTCPSocket() :
00035     m_mutex(),
00036     m_flushed(&m_mutex, true)
00037 {
00038     try {
00039         m_socket = ARCH->newSocket(IArchNetwork::kINET, IArchNetwork::kSTREAM);
00040     }
00041     catch (XArchNetwork& e) {
00042         throw XSocketCreate(e.what());
00043     }
00044 
00045     init();
00046 }
00047 
00048 CTCPSocket::CTCPSocket(CArchSocket socket) :
00049     m_mutex(),
00050     m_socket(socket),
00051     m_flushed(&m_mutex, true)
00052 {
00053     assert(m_socket != NULL);
00054 
00055     // socket starts in connected state
00056     init();
00057     onConnected();
00058     setJob(newJob());
00059 }
00060 
00061 CTCPSocket::~CTCPSocket()
00062 {
00063     try {
00064         close();
00065     }
00066     catch (...) {
00067         // ignore
00068     }
00069 }
00070 
00071 void
00072 CTCPSocket::bind(const CNetworkAddress& addr)
00073 {
00074     try {
00075         ARCH->bindSocket(m_socket, addr.getAddress());
00076     }
00077     catch (XArchNetworkAddressInUse& e) {
00078         throw XSocketAddressInUse(e.what());
00079     }
00080     catch (XArchNetwork& e) {
00081         throw XSocketBind(e.what());
00082     }
00083 }
00084 
00085 void
00086 CTCPSocket::close()
00087 {
00088     // remove ourself from the multiplexer
00089     setJob(NULL);
00090 
00091     CLock lock(&m_mutex);
00092 
00093     // clear buffers and enter disconnected state
00094     if (m_connected) {
00095         sendEvent(getDisconnectedEvent());
00096     }
00097     onDisconnected();
00098 
00099     // close the socket
00100     if (m_socket != NULL) {
00101         CArchSocket socket = m_socket;
00102         m_socket = NULL;
00103         try {
00104             ARCH->closeSocket(socket);
00105         }
00106         catch (XArchNetwork& e) {
00107             // ignore, there's not much we can do
00108             LOG((CLOG_WARN "error closing socket: %s", e.what().c_str()));
00109         }
00110     }
00111 }
00112 
00113 void*
00114 CTCPSocket::getEventTarget() const
00115 {
00116     return const_cast<void*>(reinterpret_cast<const void*>(this));
00117 }
00118 
00119 UInt32
00120 CTCPSocket::read(void* buffer, UInt32 n)
00121 {
00122     // copy data directly from our input buffer
00123     CLock lock(&m_mutex);
00124     UInt32 size = m_inputBuffer.getSize();
00125     if (n > size) {
00126         n = size;
00127     }
00128     if (buffer != NULL && n != 0) {
00129         memcpy(buffer, m_inputBuffer.peek(n), n);
00130     }
00131     m_inputBuffer.pop(n);
00132 
00133     // if no more data and we cannot read or write then send disconnected
00134     if (n > 0 && m_inputBuffer.getSize() == 0 && !m_readable && !m_writable) {
00135         sendEvent(getDisconnectedEvent());
00136         m_connected = false;
00137     }
00138 
00139     return n;
00140 }
00141 
00142 void
00143 CTCPSocket::write(const void* buffer, UInt32 n)
00144 {
00145     bool wasEmpty;
00146     {
00147         CLock lock(&m_mutex);
00148 
00149         // must not have shutdown output
00150         if (!m_writable) {
00151             sendEvent(getOutputErrorEvent());
00152             return;
00153         }
00154 
00155         // ignore empty writes
00156         if (n == 0) {
00157             return;
00158         }
00159 
00160         // copy data to the output buffer
00161         wasEmpty = (m_outputBuffer.getSize() == 0);
00162         m_outputBuffer.write(buffer, n);
00163 
00164         // there's data to write
00165         m_flushed = false;
00166     }
00167 
00168     // make sure we're waiting to write
00169     if (wasEmpty) {
00170         setJob(newJob());
00171     }
00172 }
00173 
00174 void
00175 CTCPSocket::flush()
00176 {
00177     CLock lock(&m_mutex);
00178     while (m_flushed == false) {
00179         m_flushed.wait();
00180     }
00181 }
00182 
00183 void
00184 CTCPSocket::shutdownInput()
00185 {
00186     bool useNewJob = false;
00187     {
00188         CLock lock(&m_mutex);
00189 
00190         // shutdown socket for reading
00191         try {
00192             ARCH->closeSocketForRead(m_socket);
00193         }
00194         catch (XArchNetwork&) {
00195             // ignore
00196         }
00197 
00198         // shutdown buffer for reading
00199         if (m_readable) {
00200             sendEvent(getInputShutdownEvent());
00201             onInputShutdown();
00202             useNewJob = true;
00203         }
00204     }
00205     if (useNewJob) {
00206         setJob(newJob());
00207     }
00208 }
00209 
00210 void
00211 CTCPSocket::shutdownOutput()
00212 {
00213     bool useNewJob = false;
00214     {
00215         CLock lock(&m_mutex);
00216 
00217         // shutdown socket for writing
00218         try {
00219             ARCH->closeSocketForWrite(m_socket);
00220         }
00221         catch (XArchNetwork&) {
00222             // ignore
00223         }
00224 
00225         // shutdown buffer for writing
00226         if (m_writable) {
00227             sendEvent(getOutputShutdownEvent());
00228             onOutputShutdown();
00229             useNewJob = true;
00230         }
00231     }
00232     if (useNewJob) {
00233         setJob(newJob());
00234     }
00235 }
00236 
00237 bool
00238 CTCPSocket::isReady() const
00239 {
00240     CLock lock(&m_mutex);
00241     return (m_inputBuffer.getSize() > 0);
00242 }
00243 
00244 UInt32
00245 CTCPSocket::getSize() const
00246 {
00247     CLock lock(&m_mutex);
00248     return m_inputBuffer.getSize();
00249 }
00250 
00251 void
00252 CTCPSocket::connect(const CNetworkAddress& addr)
00253 {
00254     {
00255         CLock lock(&m_mutex);
00256 
00257         // fail on attempts to reconnect
00258         if (m_socket == NULL || m_connected) {
00259             sendConnectionFailedEvent("busy");
00260             return;
00261         }
00262 
00263         try {
00264             if (ARCH->connectSocket(m_socket, addr.getAddress())) {
00265                 sendEvent(getConnectedEvent());
00266                 onConnected();
00267             }
00268             else {
00269                 // connection is in progress
00270                 m_writable = true;
00271             }
00272         }
00273         catch (XArchNetwork& e) {
00274             throw XSocketConnect(e.what());
00275         }
00276     }
00277     setJob(newJob());
00278 }
00279 
00280 void
00281 CTCPSocket::init()
00282 {
00283     // default state
00284     m_connected = false;
00285     m_readable  = false;
00286     m_writable  = false;
00287 
00288     try {
00289         // turn off Nagle algorithm.  we send lots of very short messages
00290         // that should be sent without (much) delay.  for example, the
00291         // mouse motion messages are much less useful if they're delayed.
00292         ARCH->setNoDelayOnSocket(m_socket, true);
00293     }
00294     catch (XArchNetwork& e) {
00295         try {
00296             ARCH->closeSocket(m_socket);
00297             m_socket = NULL;
00298         }
00299         catch (XArchNetwork&) {
00300             // ignore
00301         }
00302         throw XSocketCreate(e.what());
00303     }
00304 }
00305 
00306 void
00307 CTCPSocket::setJob(ISocketMultiplexerJob* job)
00308 {
00309     // multiplexer will delete the old job
00310     if (job == NULL) {
00311         CSocketMultiplexer::getInstance()->removeSocket(this);
00312     }
00313     else {
00314         CSocketMultiplexer::getInstance()->addSocket(this, job);
00315     }
00316 }
00317 
00318 ISocketMultiplexerJob*
00319 CTCPSocket::newJob()
00320 {
00321     // note -- must have m_mutex locked on entry
00322 
00323     if (m_socket == NULL) {
00324         return NULL;
00325     }
00326     else if (!m_connected) {
00327         assert(!m_readable);
00328         if (!(m_readable || m_writable)) {
00329             return NULL;
00330         }
00331         return new TSocketMultiplexerMethodJob<CTCPSocket>(
00332                                 this, &CTCPSocket::serviceConnecting,
00333                                 m_socket, m_readable, m_writable);
00334     }
00335     else {
00336         if (!(m_readable || (m_writable && (m_outputBuffer.getSize() > 0)))) {
00337             return NULL;
00338         }
00339         return new TSocketMultiplexerMethodJob<CTCPSocket>(
00340                                 this, &CTCPSocket::serviceConnected,
00341                                 m_socket, m_readable,
00342                                 m_writable && (m_outputBuffer.getSize() > 0));
00343     }
00344 }
00345 
00346 void
00347 CTCPSocket::sendConnectionFailedEvent(const char* msg)
00348 {
00349     CConnectionFailedInfo* info = (CConnectionFailedInfo*)malloc(
00350                             sizeof(CConnectionFailedInfo) + strlen(msg));
00351     strcpy(info->m_what, msg);
00352     EVENTQUEUE->addEvent(CEvent(getConnectionFailedEvent(),
00353                             getEventTarget(), info));
00354 }
00355 
00356 void
00357 CTCPSocket::sendEvent(CEvent::Type type)
00358 {
00359     EVENTQUEUE->addEvent(CEvent(type, getEventTarget(), NULL));
00360 }
00361 
00362 void
00363 CTCPSocket::onConnected()
00364 {
00365     m_connected = true;
00366     m_readable  = true;
00367     m_writable  = true;
00368 }
00369 
00370 void
00371 CTCPSocket::onInputShutdown()
00372 {
00373     m_inputBuffer.pop(m_inputBuffer.getSize());
00374     m_readable = false;
00375 }
00376 
00377 void
00378 CTCPSocket::onOutputShutdown()
00379 {
00380     m_outputBuffer.pop(m_outputBuffer.getSize());
00381     m_writable = false;
00382 
00383     // we're now flushed
00384     m_flushed = true;
00385     m_flushed.broadcast();
00386 }
00387 
00388 void
00389 CTCPSocket::onDisconnected()
00390 {
00391     // disconnected
00392     onInputShutdown();
00393     onOutputShutdown();
00394     m_connected = false;
00395 }
00396 
00397 ISocketMultiplexerJob*
00398 CTCPSocket::serviceConnecting(ISocketMultiplexerJob* job,
00399                 bool, bool write, bool error)
00400 {
00401     CLock lock(&m_mutex);
00402 
00403     // should only check for errors if error is true but checking a new
00404     // socket (and a socket that's connecting should be new) for errors
00405     // should be safe and Mac OS X appears to have a bug where a
00406     // non-blocking stream socket that fails to connect immediately is
00407     // reported by select as being writable (i.e. connected) even when
00408     // the connection has failed.  this is easily demonstrated on OS X
00409     // 10.3.4 by starting a synergy client and telling to connect to
00410     // another system that's not running a synergy server.  it will
00411     // claim to have connected then quickly disconnect (i guess because
00412     // read returns 0 bytes).  unfortunately, synergy attempts to
00413     // reconnect immediately, the process repeats and we end up
00414     // spinning the CPU.  luckily, OS X does set SO_ERROR on the
00415     // socket correctly when the connection has failed so checking for
00416     // errors works.  (curiously, sometimes OS X doesn't report
00417     // connection refused.  when that happens it at least doesn't
00418     // report the socket as being writable so synergy is able to time
00419     // out the attempt.)
00420     if (error || true) {
00421         try {
00422             // connection may have failed or succeeded
00423             ARCH->throwErrorOnSocket(m_socket);
00424         }
00425         catch (XArchNetwork& e) {
00426             sendConnectionFailedEvent(e.what().c_str());
00427             onDisconnected();
00428             return newJob();
00429         }
00430     }
00431 
00432     if (write) {
00433         sendEvent(getConnectedEvent());
00434         onConnected();
00435         return newJob();
00436     }
00437 
00438     return job;
00439 }
00440 
00441 ISocketMultiplexerJob*
00442 CTCPSocket::serviceConnected(ISocketMultiplexerJob* job,
00443                 bool read, bool write, bool error)
00444 {
00445     CLock lock(&m_mutex);
00446 
00447     if (error) {
00448         sendEvent(getDisconnectedEvent());
00449         onDisconnected();
00450         return newJob();
00451     }
00452 
00453     bool needNewJob = false;
00454 
00455     if (write) {
00456         try {
00457             // write data
00458             UInt32 n = m_outputBuffer.getSize();
00459             const void* buffer = m_outputBuffer.peek(n);
00460             n = (UInt32)ARCH->writeSocket(m_socket, buffer, n);
00461 
00462             // discard written data
00463             if (n > 0) {
00464                 m_outputBuffer.pop(n);
00465                 if (m_outputBuffer.getSize() == 0) {
00466                     sendEvent(getOutputFlushedEvent());
00467                     m_flushed = true;
00468                     m_flushed.broadcast();
00469                     needNewJob = true;
00470                 }
00471             }
00472         }
00473         catch (XArchNetworkShutdown&) {
00474             // remote read end of stream hungup.  our output side
00475             // has therefore shutdown.
00476             onOutputShutdown();
00477             sendEvent(getOutputShutdownEvent());
00478             if (!m_readable && m_inputBuffer.getSize() == 0) {
00479                 sendEvent(getDisconnectedEvent());
00480                 m_connected = false;
00481             }
00482             needNewJob = true;
00483         }
00484         catch (XArchNetworkDisconnected&) {
00485             // stream hungup
00486             onDisconnected();
00487             sendEvent(getDisconnectedEvent());
00488             needNewJob = true;
00489         }
00490         catch (XArchNetwork& e) {
00491             // other write error
00492             LOG((CLOG_WARN "error writing socket: %s", e.what().c_str()));
00493             onDisconnected();
00494             sendEvent(getOutputErrorEvent());
00495             sendEvent(getDisconnectedEvent());
00496             needNewJob = true;
00497         }
00498     }
00499 
00500     if (read && m_readable) {
00501         try {
00502             UInt8 buffer[4096];
00503             size_t n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00504             if (n > 0) {
00505                 bool wasEmpty = (m_inputBuffer.getSize() == 0);
00506 
00507                 // slurp up as much as possible
00508                 do {
00509                     m_inputBuffer.write(buffer, n);
00510                     n = ARCH->readSocket(m_socket, buffer, sizeof(buffer));
00511                 } while (n > 0);
00512 
00513                 // send input ready if input buffer was empty
00514                 if (wasEmpty) {
00515                     sendEvent(getInputReadyEvent());
00516                 }
00517             }
00518             else {
00519                 // remote write end of stream hungup.  our input side
00520                 // has therefore shutdown but don't flush our buffer
00521                 // since there's still data to be read.
00522                 sendEvent(getInputShutdownEvent());
00523                 if (!m_writable && m_inputBuffer.getSize() == 0) {
00524                     sendEvent(getDisconnectedEvent());
00525                     m_connected = false;
00526                 }
00527                 m_readable = false;
00528                 needNewJob = true;
00529             }
00530         }
00531         catch (XArchNetworkDisconnected&) {
00532             // stream hungup
00533             sendEvent(getDisconnectedEvent());
00534             onDisconnected();
00535             needNewJob = true;
00536         }
00537         catch (XArchNetwork& e) {
00538             // ignore other read error
00539             LOG((CLOG_WARN "error reading socket: %s", e.what().c_str()));
00540         }
00541     }
00542 
00543     return needNewJob ? newJob() : job;
00544 }

Generated on Fri Nov 6 00:21:14 2009 for synergy-plus by  doxygen 1.3.9.1