Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members

CPacketStreamFilter.cpp

00001 /*
00002  * synergy -- mouse and keyboard sharing utility
00003  * Copyright (C) 2004 Chris Schoeneman
00004  * 
00005  * This package is free software; you can redistribute it and/or
00006  * modify it under the terms of the GNU General Public License
00007  * found in the file COPYING that should have accompanied this file.
00008  * 
00009  * This package is distributed in the hope that it will be useful,
00010  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00011  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00012  * GNU General Public License for more details.
00013  */
00014 
00015 #include "CPacketStreamFilter.h"
00016 #include "IEventQueue.h"
00017 #include "CLock.h"
00018 #include "TMethodEventJob.h"
00019 #include <cstring>
00020 #include <memory>
00021 
00022 //
00023 // CPacketStreamFilter
00024 //
00025 
00026 CPacketStreamFilter::CPacketStreamFilter(IStream* stream, bool adoptStream) :
00027     CStreamFilter(stream, adoptStream),
00028     m_size(0),
00029     m_inputShutdown(false)
00030 {
00031     // do nothing
00032 }
00033 
00034 CPacketStreamFilter::~CPacketStreamFilter()
00035 {
00036     // do nothing
00037 }
00038 
00039 void
00040 CPacketStreamFilter::close()
00041 {
00042     CLock lock(&m_mutex);
00043     m_size = 0;
00044     m_buffer.pop(m_buffer.getSize());
00045     CStreamFilter::close();
00046 }
00047 
00048 UInt32
00049 CPacketStreamFilter::read(void* buffer, UInt32 n)
00050 {
00051     if (n == 0) {
00052         return 0;
00053     }
00054 
00055     CLock lock(&m_mutex);
00056 
00057     // if not enough data yet then give up
00058     if (!isReadyNoLock()) {
00059         return 0;
00060     }
00061 
00062     // read no more than what's left in the buffered packet
00063     if (n > m_size) {
00064         n = m_size;
00065     }
00066 
00067     // read it
00068     if (buffer != NULL) {
00069         memcpy(buffer, m_buffer.peek(n), n);
00070     }
00071     m_buffer.pop(n);
00072     m_size -= n;
00073 
00074     // get next packet's size if we've finished with this packet and
00075     // there's enough data to do so.
00076     readPacketSize();
00077 
00078     if (m_inputShutdown && m_size == 0) {
00079         EVENTQUEUE->addEvent(CEvent(getInputShutdownEvent(),
00080                         getEventTarget(), NULL));
00081     }
00082 
00083     return n;
00084 }
00085 
00086 void
00087 CPacketStreamFilter::write(const void* buffer, UInt32 count)
00088 {
00089     // write the length of the payload
00090     UInt8 length[4];
00091     length[0] = (UInt8)((count >> 24) & 0xff);
00092     length[1] = (UInt8)((count >> 16) & 0xff);
00093     length[2] = (UInt8)((count >>  8) & 0xff);
00094     length[3] = (UInt8)( count        & 0xff);
00095     getStream()->write(length, sizeof(length));
00096 
00097     // write the payload
00098     getStream()->write(buffer, count);
00099 }
00100 
00101 void
00102 CPacketStreamFilter::shutdownInput()
00103 {
00104     CLock lock(&m_mutex);
00105     m_size = 0;
00106     m_buffer.pop(m_buffer.getSize());
00107     CStreamFilter::shutdownInput();
00108 }
00109 
00110 bool
00111 CPacketStreamFilter::isReady() const
00112 {
00113     CLock lock(&m_mutex);
00114     return isReadyNoLock();
00115 }
00116 
00117 UInt32
00118 CPacketStreamFilter::getSize() const
00119 {
00120     CLock lock(&m_mutex);
00121     return isReadyNoLock() ? m_size : 0;
00122 }
00123 
00124 bool
00125 CPacketStreamFilter::isReadyNoLock() const
00126 {
00127     return (m_size != 0 && m_buffer.getSize() >= m_size);
00128 }
00129 
00130 void
00131 CPacketStreamFilter::readPacketSize()
00132 {
00133     // note -- m_mutex must be locked on entry
00134 
00135     if (m_size == 0 && m_buffer.getSize() >= 4) {
00136         UInt8 buffer[4];
00137         memcpy(buffer, m_buffer.peek(sizeof(buffer)), sizeof(buffer));
00138         m_buffer.pop(sizeof(buffer));
00139         m_size = ((UInt32)buffer[0] << 24) |
00140                  ((UInt32)buffer[1] << 16) |
00141                  ((UInt32)buffer[2] <<  8) |
00142                   (UInt32)buffer[3];
00143     }
00144 }
00145 
00146 bool
00147 CPacketStreamFilter::readMore()
00148 {
00149     // note if we have whole packet
00150     bool wasReady = isReadyNoLock();
00151 
00152     // read more data
00153     char buffer[4096];
00154     UInt32 n = getStream()->read(buffer, sizeof(buffer));
00155     while (n > 0) {
00156         m_buffer.write(buffer, n);
00157         n = getStream()->read(buffer, sizeof(buffer));
00158     }
00159 
00160     // if we don't yet have the next packet size then get it,
00161     // if possible.
00162     readPacketSize();
00163 
00164     // note if we now have a whole packet
00165     bool isReady = isReadyNoLock();
00166 
00167     // if we weren't ready before but now we are then send a
00168     // input ready event apparently from the filtered stream.
00169     return (wasReady != isReady);
00170 }
00171 
00172 void
00173 CPacketStreamFilter::filterEvent(const CEvent& event)
00174 {
00175     if (event.getType() == getInputReadyEvent()) {
00176         CLock lock(&m_mutex);
00177         if (!readMore()) {
00178             return;
00179         }
00180     }
00181     else if (event.getType() == getInputShutdownEvent()) {
00182         // discard this if we have buffered data
00183         CLock lock(&m_mutex);
00184         m_inputShutdown = true;
00185         if (m_size != 0) {
00186             return;
00187         }
00188     }
00189 
00190     // pass event
00191     CStreamFilter::filterEvent(event);
00192 }

Generated on Fri Nov 6 00:21:14 2009 for synergy-plus by  doxygen 1.3.9.1