libzypp  17.34.1
asyncdatasource.cpp
Go to the documentation of this file.
2 
4 #include <zypp-core/zyppng/base/AutoDisconnect>
5 #include <zypp-core/zyppng/base/EventDispatcher>
7 
8 namespace zyppng {
9 
// Dispatches an activation of one of our socket notifiers.
//
// If `notify` is the write notifier: an Error event is logged and the function
// returns early, otherwise readyWrite() is called.
// If `notify` is not the write notifier: the read channel whose notifier matches
// is looked up in _readFds and readyRead() is called with its index. A notifier
// that belongs to no channel is silently ignored.
//
// NOTE(review): the embedded listing numbers jump from 14 to 16 — one statement
// between the DBG log and the `return` is missing from this extract (presumably
// the call that closes the write channel; confirm against the real source).
10  void AsyncDataSourcePrivate::notifierActivated( const SocketNotifier &notify, int evTypes )
11  {
12  if ( _writeNotifier.get() == &notify ) {
13  if ( evTypes & SocketNotifier::Error ) {
14  DBG << "Closing due to error when polling" << std::endl;
16  return;
17  }
18  readyWrite();
19  } else {
20 
// identify the read channel by comparing notifier addresses
21  auto dev = std::find_if( _readFds.begin(), _readFds.end(),
22  [ &notify ]( const auto &dev ){ return ( dev._readNotifier.get() == &notify ); } );
23 
24  if ( dev == _readFds.end() ) {
25  return;
26  }
27 
// the iterator distance from begin() is the channel index
28  readyRead( std::distance( _readFds.begin(), dev ) );
29  }
30  }
31 
// Pulls pending data of read channel `channel` into that channel's buffer.
//
// Asks rawBytesAvailable() how much can be read (falling back to 4096 when it
// reports 0), reserves that much space in _readChannels[channel] and calls
// readData(). On a result <= 0 the reserved space is chopped off again and the
// function returns without emitting anything. On success the unused part of the
// reservation is chopped, _readyRead is emitted if `channel` is the current read
// channel, and _channelReadyRead is always emitted with the channel index.
//
// NOTE(review): listing numbers 50 and 58 are missing from this extract, so the
// bodies of `case 0` and `case -1` are incomplete here (presumably each closes
// the read channel with a different reason; confirm against the real source).
32  void AsyncDataSourcePrivate::readyRead( uint channel )
33  {
34  auto bytesToRead = z_func()->rawBytesAvailable( channel );
35  if ( bytesToRead == 0 ) {
36  // make sure to check if bytes are available even if the ioctl call returns something different
37  bytesToRead = 4096;
38  }
39 
40  auto &_readBuf = _readChannels[channel];
41  char *buf = _readBuf.reserve( bytesToRead );
42  const auto bytesRead = z_func()->readData( channel, buf, bytesToRead );
43 
44  if ( bytesRead <= 0 ) {
// nothing was read: give back the complete reservation
45  _readBuf.chop( bytesToRead );
46 
47  switch( bytesRead ) {
48  // remote close , close the read channel
49  case 0: {
51  break;
52  }
53  // no data is available , just try again later
// (-2 is what readData() returns for EAGAIN / EWOULDBLOCK)
54  case -2: break;
55  // anything else
56  default:
57  case -1: {
59  break;
60  }
61  }
62  return;
63  }
64 
// short read: give back the part of the reservation that was not filled
65  if ( bytesToRead > bytesRead )
66  _readBuf.chop( bytesToRead-bytesRead );
67 
68  if ( channel == _currentReadChannel )
69  _readyRead.emit();
70 
71  _channelReadyRead.emit( channel );
72  return;
73  }
74 
76  {
77  const auto nwrite = _writeBuffer.frontSize();
78  if ( !nwrite ) {
79  // disable Write notifications so we do not wake up without the need to
80  _writeNotifier->setEnabled( false );
81  return;
82  }
83 
84  const auto nBuf = _writeBuffer.front();
85  const auto written = eintrSafeCall( ::write, _writeFd, nBuf, nwrite );
86  if ( written == -1 ) {
87  switch ( errno ) {
88  case EACCES:
90  return;
91  case EAGAIN:
92 #if EAGAIN != EWOULDBLOCK
93  case EWOULDBLOCK:
94 #endif
95  return;
96  case EPIPE:
97  case ECONNRESET:
99  return;
100  default:
102  return;
103  }
104  return;
105  }
106  _writeBuffer.discard( written );
107  _sigBytesWritten.emit( written );
108 
109  if ( _writeBuffer.size() == 0 )
110  _sigAllBytesWritten.emit();
111  }
112 
114  {
115  bool sig = _writeFd >= 0;
116  _writeNotifier.reset();
117  _writeFd = -1;
119  _mode.unsetFlag( AsyncDataSource::WriteOnly );
120  if ( sig )
121  _sigWriteFdClosed.emit( reason );
122  }
123 
125  {
126  auto &readFd = _readFds[channel];
127  // we do not clear the read buffer so code has the opportunity to read whats left in there
128  bool sig = readFd._readFd >= 0;
129  readFd._readNotifier.reset();
130  readFd._readFd = -1;
131  if ( sig )
132  _sigReadFdClosed.emit( channel, reason );
133  }
134 
136 
138  { }
139 
141  : IODevice(d)
142  {}
143 
145  {
146  return std::shared_ptr<AsyncDataSource>( new AsyncDataSource );
147  }
148 
149 
// Opens the data source on the given file descriptors.
//
// readFds: one entry per read channel; entries < 0 are skipped.
// writeFd: fd used for writing; a value < 0 means no write channel.
//
// Returns false if the device is not in the Closed state, if preparing one of
// the fds fails, or if IODevice::open() fails; in the latter two cases all
// partially set up state (_readFds, _writeNotifier, _writeFd, _mode) is reset.
// Returns true on success, after the read channel count has been set to the
// number of registered read fds.
//
// NOTE(review): listing numbers 165, 167 and 178 are missing from this extract
// (presumably the read notifier creation and the two checks that switch the
// fds to non-blocking mode; confirm against the real source).
150  bool AsyncDataSource::openFds ( const std::vector<int>& readFds, int writeFd )
151  {
152  Z_D();
153 
// only a closed device can be opened
154  if ( d->_mode != IODevice::Closed )
155  return false;
156 
157  IODevice::OpenMode mode;
158 
159  bool error = false;
160  for ( const auto readFd : readFds ) {
161  if ( readFd >= 0 ) {
162  mode |= IODevice::ReadOnly;
163  d->_readFds.push_back( {
164  readFd,
166  });
168  ERR << "Failed to set read FD to non blocking" << std::endl;
169  error = true;
170  break;
171  }
// route activations of this channel's notifier to notifierActivated()
172  d->_readFds.back()._readNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
173  }
174  }
175 
176  if ( writeFd >= 0 && !error ) {
177  mode |= IODevice::WriteOnly;
179  ERR << "Failed to set write FD to non blocking" << std::endl;
180  error = true;
181  } else {
182  d->_writeFd = writeFd;
// the write notifier is created with enable=false; writeData() enables it once data is queued
183  d->_writeNotifier = SocketNotifier::create( writeFd, SocketNotifier::Write | AbstractEventSource::Error, false );
184  d->_writeNotifier->connect( &SocketNotifier::sigActivated, *d, &AsyncDataSourcePrivate::notifierActivated );
185  }
186  }
187 
// roll back everything on failure so the device stays in a clean Closed state
188  if( error || !IODevice::open( mode ) ) {
189  d->_mode = IODevice::Closed;
190  d->_readFds.clear();
191  d->_writeNotifier.reset();
192  d->_writeFd = -1;
193  return false;
194  }
195 
196  // make sure we have enough read buffers
197  setReadChannelCount( d->_readFds.size() );
198  return true;
199  }
200 
201  int64_t zyppng::AsyncDataSource::writeData( const char *data, int64_t count )
202  {
203  Z_D();
204  if ( count > 0 ) {
205  // we always use the write buffer, to make sure the fd is actually writeable
206  d->_writeBuffer.append( data, count );
207  d->_writeNotifier->setEnabled( true );
208  }
209  return count;
210  }
211 
212  int64_t zyppng::AsyncDataSource::readData( uint channel, char *buffer, int64_t bufsize )
213  {
214  Z_D();
215  if ( channel >= d->_readFds.size() ) {
216  ERR << constants::outOfRangeErrMsg << std::endl;
217  throw std::logic_error( constants::outOfRangeErrMsg.data() );
218  }
219  const auto read = eintrSafeCall( ::read, d->_readFds[channel]._readFd, buffer, bufsize );
220  if ( read < 0 ) {
221  switch ( errno ) {
222  #if EAGAIN != EWOULDBLOCK
223  case EWOULDBLOCK:
224  #endif
225  case EAGAIN: {
226  return -2;
227  }
228  default:
229  break;
230  }
231  }
232  return read;
233  }
234 
235  int64_t AsyncDataSource::rawBytesAvailable( uint channel ) const
236  {
237  Z_D();
238 
239  if ( channel >= d->_readFds.size() ) {
240  ERR << constants::outOfRangeErrMsg << std::endl;
241  throw std::logic_error( constants::outOfRangeErrMsg.data() );
242  }
243 
244  if ( isOpen() && canRead() )
245  return zyppng::bytesAvailableOnFD( d->_readFds[channel]._readFd );
246  return 0;
247  }
248 
250  {
251  Z_D();
252  if ( channel >= d->_readFds.size() ) {
253  ERR << constants::outOfRangeErrMsg << std::endl;
254  throw std::logic_error( constants::outOfRangeErrMsg.data() );
255  }
256  }
257 
259  {
260  Z_D();
261  for( uint i = 0; i < d->_readFds.size(); ++i ) {
262  auto &readChan = d->_readFds[i];
263  readChan._readNotifier.reset();
264  if ( readChan._readFd >= 0)
265  d->_sigReadFdClosed.emit( i, UserRequest );
266  }
267  d->_readFds.clear();
268 
269  d->_writeNotifier.reset();
270  d->_writeBuffer.clear();
271  if ( d->_writeFd >= 0 ) {
272  d->_writeFd = -1;
273  d->_sigWriteFdClosed.emit( UserRequest );
274  }
275 
276  IODevice::close();
277  }
278 
280  {
281  Z_D();
282 
283  // if we are open writeOnly, simply call close();
284  if ( !canRead() ) {
285  close();
286  return;
287  }
288 
289  d->_mode = ReadOnly;
290  d->_writeNotifier.reset();
291  d->_writeBuffer.clear();
292 
293  if ( d->_writeFd >= 0 ) {
294  d->_writeFd = -1;
295  d->_sigWriteFdClosed.emit( UserRequest );
296  }
297  }
298 
299  bool AsyncDataSource::waitForReadyRead( uint channel, int timeout )
300  {
301  Z_D();
302  if ( !canRead() )
303  return false;
304 
305  if ( channel >= d->_readFds.size() ) {
306  ERR << constants::outOfRangeErrMsg << std::endl;
307  throw std::logic_error( constants::outOfRangeErrMsg.data() );
308  }
309 
310  bool gotRR = false;
311  auto rrConn = AutoDisconnect( d->_channelReadyRead.connect([&]( uint activated ){
312  gotRR = ( channel == activated );
313  }) );
314 
315  // we can only wait if we are open for reading and still have a valid fd
316  auto &channelRef = d->_readFds[ channel ];
317  while ( readFdOpen(channel) && canRead() && !gotRR ) {
318  int rEvents = 0;
319  if ( EventDispatcher::waitForFdEvent( channelRef._readFd, AbstractEventSource::Read | AbstractEventSource::Error , rEvents, timeout ) ) {
320  //simulate signal from read notifier
321  d->notifierActivated( *channelRef._readNotifier, rEvents );
322  } else {
323  //timeout
324  return false;
325  }
326  }
327  return gotRR;
328  }
329 
331  {
332  Z_D();
333  if ( !canWrite() )
334  return;
335 
336  int timeout = -1;
337  while ( canWrite() && d->_writeBuffer.frontSize() ) {
338  int rEvents = 0;
340  //simulate signal from write notifier
341  d->readyWrite();
342  } else {
343  //timeout
344  return;
345  }
346  }
347  }
348 
350  {
351  return d_func()->_sigWriteFdClosed;
352  }
353 
355  {
356  return d_func()->_sigReadFdClosed;
357  }
358 
360  {
361  Z_D();
362  if ( !d->_readChannels.size() )
363  return false;
364  return readFdOpen( d_func()->_currentReadChannel );
365  }
366 
367  bool AsyncDataSource::readFdOpen(uint channel) const
368  {
369  Z_D();
370  if ( channel >= d->_readFds.size() ) {
371  ERR << constants::outOfRangeErrMsg << std::endl;
372  throw std::logic_error( constants::outOfRangeErrMsg.data() );
373  }
374  auto &channelRef = d->_readFds[ channel ];
375  return ( channelRef._readNotifier && channelRef._readFd >= 0 );
376  }
377 
378 }
bool canRead() const
Definition: iodevice.cc:73
SignalProxy< void(AsyncDataSource::ChannelCloseReason)> sigWriteFdClosed()
virtual bool open(const OpenMode mode)
Definition: iodevice.cc:16
void notifierActivated(const SocketNotifier &notify, int evTypes)
void setReadChannelCount(uint channels)
Definition: iodevice.cc:37
Signal< void(AsyncDataSource::ChannelCloseReason)> _sigWriteFdClosed
char * front()
Definition: iobuffer.cc:35
bool isOpen() const
Definition: iodevice.cc:83
static Ptr create(int socket, int evTypes, bool enable=true)
int64_t size() const
Definition: iobuffer.cc:154
void closeReadChannel(uint channel, AsyncDataSource::ChannelCloseReason reason)
Signal< void()> _sigAllBytesWritten
Definition: iodevice_p.h:47
SignalProxy< void(uint, AsyncDataSource::ChannelCloseReason)> sigReadFdClosed()
#define ERR
Definition: Logger.h:100
std::vector< ReadChannelDev > _readFds
#define Z_D()
Definition: zyppglobal.h:104
Signal< void(uint, AsyncDataSource::ChannelCloseReason)> _sigReadFdClosed
void closeWriteChannel(AsyncDataSource::ChannelCloseReason reason)
static bool waitForFdEvent(const int fd, int events, int &revents, int &timeout)
int64_t rawBytesAvailable(uint channel) const override
int64_t writeData(const char *data, int64_t count) override
Failed to block or unblock the fd.
IODevice::OpenMode _mode
Definition: iodevice_p.h:43
void readChannelChanged(uint channel) override
SocketNotifier::Ptr _writeNotifier
int64_t discard(int64_t bytes)
Definition: iobuffer.cc:55
int64_t frontSize() const
Definition: iobuffer.cc:43
Signal< void()> _readyRead
Definition: iodevice_p.h:44
constexpr std::string_view outOfRangeErrMsg("Channel index out of range")
virtual void closeWriteChannel()
auto eintrSafeCall(Fun &&function, Args &&... args)
int64_t readData(uint channel, char *buffer, int64_t bufsize) override
ZYPP_IMPL_PRIVATE(UnixSignalSource)
bool openFds(const std::vector< int > &readFds, int writeFd=-1)
std::map< std::string, std::string > read(const Pathname &_path)
Read sysconfig file path_r and return (key,value) pairs.
Definition: sysconfig.cc:34
virtual void close()
Definition: iodevice.cc:30
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
bool write(const Pathname &path_r, const std::string &key_r, const std::string &val_r, const std::string &newcomment_r)
Add or change a value in sysconfig file path_r.
Definition: sysconfig.cc:80
bool canWrite() const
Definition: iodevice.cc:78
Signal< void(uint)> _channelReadyRead
Definition: iodevice_p.h:45
std::vector< IOBuffer > _readChannels
Definition: iodevice_p.h:39
Signal< void(int64_t)> _sigBytesWritten
Definition: iodevice_p.h:46
int64_t bytesAvailableOnFD(int fd)
Definition: linuxhelpers.cc:62
std::shared_ptr< AsyncDataSource > Ptr
BlockingMode setFDBlocking(int fd, bool mode)
Definition: IOTools.cc:31
#define DBG
Definition: Logger.h:97
bool waitForReadyRead(uint channel, int timeout) override