libzypp  17.34.1
providequeue.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 \---------------------------------------------------------------------*/
9 
10 #include "private/providequeue_p.h"
11 #include "private/provideitem_p.h"
12 #include "private/provide_p.h"
13 #include "private/providedbg_p.h"
14 
15 #include <zypp-core/fs/PathInfo.h>
16 #include <zypp-core/zyppng/rpc/MessageStream>
17 #include <zypp-core/base/StringV.h>
19 #include <zypp-media/MediaException>
20 #include <zypp-media/auth/CredentialManager>
21 
22 #include <zypp/APIConfig.h>
23 #include <bitset>
24 
25 namespace zyppng {
26 
28  {
29  if ( !_request )
30  return false;
31  return ( _request->code () == ProvideMessage::Code::Attach );
32  }
33 
35  {
36  if ( !_request )
37  return false;
38  return ( _request->code () == ProvideMessage::Code::Prov );
39  }
40 
42  {
43  if ( !_request )
44  return false;
45  return ( _request->code () == ProvideMessage::Code::Detach );
46  }
47 
49  { }
50 
52  {
54  if ( this->_activeItems.size() || this->_waitQueue.size() ) {
55  DBG << "Queue shutdown with Items still running" << std::endl;
56  }
57  }
58  immediateShutdown(std::make_exception_ptr(zypp::media::MediaException("Cancelled by queue shutdown")));
59  }
60 
61  bool ProvideQueue::startup(const std::string &workerScheme, const zypp::filesystem::Pathname &workDir, const std::string &hostname ) {
62 
63  if ( _workerProc ) {
64  ERR << "Queue Worker was already initialized" << std::endl;
65  return true;
66  }
67 
69 
70  const auto &pN = _parent.workerPath() / ( "zypp-media-"+workerScheme ) ;
71  MIL << "Trying to start " << pN << std::endl;
72  const auto &pi = zypp::PathInfo( pN );
73  if ( !pi.isExist() ) {
74  ERR << "Failed to find worker for " << workerScheme << std::endl;
75  return false;
76  }
77 
78  if ( !pi.userMayX() ) {
79  ERR << "Failed to start worker for " << workerScheme << " binary " << pi.asString() << " is not executable." << std::endl;
80  return false;
81  }
82 
83  if ( zypp::filesystem::assert_dir( workDir ) != 0 ) {
84  ERR << "Failed to assert working directory '" << workDir << "' for worker " << workerScheme << std::endl;
85  return false;
86  }
87 
88  _currentExe = pN;
90  _workerProc->setWorkingDirectory ( workDir );
92  return doStartup();
93  }
94 
95 
96  void ProvideQueue::enqueue( ProvideRequestRef request )
97  {
98  Item i;
99  i._request = request;
100  i._request->provideMessage().setRequestId( nextRequestId() );
101  request->setCurrentQueue( shared_this<ProvideQueue>() );
102  _waitQueue.push_back( std::move(i) );
103  if ( _parent.isRunning() )
104  scheduleNext();
105  }
106 
107  void ProvideQueue::cancel( ProvideRequest *item , std::exception_ptr error )
108  {
109  const auto &isSameItem = [item]( const Item &i ){
110  if ( i.isDetachRequest () )
111  return false;
112  return i._request.get() == item;
113  };
114 
115  if ( !item )
116  return;
117 
118  if ( item->code() != ProvideMessage::Code::Attach
119  && item->code() != ProvideMessage::Code::Prov ) {
120  ERR << "Can not cancel a " << item->code() << " request!" << std::endl;
121  return;
122  }
123 
124  if ( auto i = std::find_if( _waitQueue.begin(), _waitQueue.end(), isSameItem ); i != _waitQueue.end() ) {
125  auto &reqRef = i->_request;
126  reqRef->setCurrentQueue(nullptr);
127  if ( reqRef->owner() )
128  reqRef->owner()->finishReq( this, reqRef, error );
129  _waitQueue.erase(i);
130  _parent.schedule( ProvidePrivate::FinishReq ); // let the parent scheduler run since we have a open spot now
131  } else if ( auto i = std::find_if( _activeItems.begin(), _activeItems.end(), isSameItem ); i != _activeItems.end() ) {
132  cancelActiveItem(i, error);
133  }
134  }
135 
136  std::list<ProvideQueue::Item>::iterator ProvideQueue::dequeueActive( std::list<Item>::iterator it )
137  {
138  if ( it == _activeItems.end() )
139  return it;
140 
141  if ( it->_request )
142  it->_request->setCurrentQueue( nullptr );
143 
144  auto i = _activeItems.erase(it);
145  _parent.schedule ( ProvidePrivate::FinishReq ); // Trigger the scheduler
146  scheduleNext (); // keep the active items full
147  return i;
148  }
149 
150  void ProvideQueue::fatalWorkerError( const std::exception_ptr &reason )
151  {
152  immediateShutdown( reason ? reason : std::make_exception_ptr( zypp::media::MediaException("Fatal worker error")) );
153  }
154 
155  void ProvideQueue::immediateShutdown( const std::exception_ptr &reason )
156  {
157  _queueShuttingDown = true;
158 
159  while ( _waitQueue.size() ) {
160  auto &item = _waitQueue.front();
161  auto &reqRef = item._request;
162  if ( reqRef && reqRef->owner() && !item.isDetachRequest() )
163  reqRef->owner()->finishReq( this, reqRef, reason );
164  _waitQueue.pop_front();
165  }
166 
167  for ( auto i = _activeItems.begin(); i != _activeItems.end(); ) {
168  auto &reqRef = i->_request;
169  if ( reqRef && reqRef->owner() && !i->isDetachRequest() ) {
170  i = cancelActiveItem(i, reason );
171  } else {
172  i++;
173  }
174  }
175 
176  if ( _workerProc && _workerProc->isRunning() ) {
177  _workerProc->flush();
178  _workerProc->closeWriteChannel();
179  _workerProc->waitForExit();
180  readAllStderr();
181  }
182  }
183 
184  std::list< ProvideQueue::Item >::iterator ProvideQueue::cancelActiveItem( std::list< Item >::iterator i , const std::__exception_ptr::exception_ptr &error )
185  {
186  auto &reqRef = i->_request;
187 
188  // already in cancelling process or finished
189  if ( i->_state == Item::Cancelling || i->_state == Item::Finished )
190  return (++i);
191 
192  // not possible but lets be safe
193  if ( i->_state == Item::Pending ) {
194  reqRef->setCurrentQueue(nullptr);
195  if ( reqRef->owner() )
196  reqRef->owner()->finishReq( this, reqRef, error );
197  return dequeueActive(i);
198  }
199 
200  // we first need to cancel the item
201  auto c = ProvideMessage::createCancel ( i->_request->provideMessage().requestId() );
202  if( !_messageStream->sendMessage(c) )
203  ERR << "Failed to send cancel message to worker" << std::endl;
204 
205  i->_state = Item::Cancelling;
206  reqRef->setCurrentQueue(nullptr);
207  if ( reqRef->owner() )
208  reqRef->owner()->finishReq( this, reqRef, error );
209  reqRef.reset();
210  return (++i);
211  }
212 
214  {
215  if ( _queueShuttingDown )
216  return;
217 
218  while ( _waitQueue.size() && canScheduleMore() ) {
219  auto item = std::move( _waitQueue.front() );
220  _waitQueue.pop_front();
221 
222  auto &reqRef = item._request;
223  if ( !reqRef->activeUrl() ) {
224  ERR << "Item without active URL enqueued, this is a BUG." << std::endl;
225  if ( reqRef->owner() )
226  reqRef->owner()->finishReq( this, reqRef, ZYPP_EXCPT_PTR (zypp::media::MediaException("Item needs a activeURL to be queued.")) );
227  continue;
228  }
229 
230  if ( !_messageStream->sendMessage( reqRef->provideMessage() ) ) {
231  ERR << "Failed to send message to worker process." << std::endl;
232  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
233  return;
234  }
235 
236  item._state = Item::Queued;
237  _activeItems.push_back( std::move(item) );
238  _idleSince.reset();
239  }
240 
241  if ( _waitQueue.empty() && _activeItems.empty() ) {
243  if ( !_idleSince )
244  _idleSince = std::chrono::steady_clock::now();
245  _sigIdle.emit();
246  }
247  }
248 
250  {
252  }
253 
254  bool ProvideQueue::isIdle() const
255  {
256  return ( empty() );
257  }
258 
259  std::optional<ProvideQueue::TimePoint> ProvideQueue::idleSince() const
260  {
261  return _idleSince;
262  }
263 
264  bool ProvideQueue::empty() const
265  {
266  return ( _activeItems.empty() && _waitQueue.empty() );
267  }
268 
270  {
271  return _activeItems.size() + _waitQueue.size();
272  }
273 
275  {
276  return _activeItems.size();
277  }
278 
280  {
281  zypp::ByteCount dlSize;
282  for ( const auto &i : _waitQueue ) {
283  if ( i.isDetachRequest () )
284  continue;
285 
286  auto &reqRef = i._request;
287  if ( reqRef->code() != ProvideMessage::Code::Prov )
288  continue;
289  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
290  }
291  for ( const auto &i : _activeItems ) {
292  if ( i.isDetachRequest () )
293  continue;
294  auto &reqRef = i._request;
295  if ( reqRef->code() != ProvideMessage::Code::Prov )
296  continue;
297  dlSize += reqRef->provideMessage().value( ProvideMsgFields::ExpectedFilesize, int64_t(0) ).asInt64();
298  }
299  return dlSize;
300  }
301 
302  const std::string &ProvideQueue::hostname() const
303  {
304  return _myHostname;
305  }
306 
308  {
309  return _capabilities;
310  }
311 
313  {
314  return _sigIdle;
315  }
316 
318  {
319  if ( _currentExe.empty() )
320  return false;
321 
322  //const char *argv[] = { "gdbserver", ":10000", _currentExe.c_str(), nullptr };
323  const char *argv[] = { _currentExe.c_str(), nullptr };
324  if ( !_workerProc->start( argv) ) {
325  ERR << "Failed to execute worker" << std::endl;
326 
327  _messageStream.reset ();
328  _workerProc.reset ();
329 
330  return false;
331  }
332 
333  // make sure the default read channel is StdOut so RpcMessageStream gets all the rpc messages
334  _workerProc->setReadChannel ( Process::StdOut );
335 
336  // we are ready to send the data
337 
339  // @TODO actually write real config data :D
340  conf.insert ( { AGENT_STRING_CONF.data (), "ZYpp " LIBZYPP_VERSION_STRING } );
341  conf.insert ( { ATTACH_POINT.data (), _workerProc->workingDirectory().asString() } );
342  conf.insert ( { PROVIDER_ROOT.data (), _parent.z_func()->providerWorkdir().asString() } );
343 
344  const auto &cleanupOnErr = [&](){
345  readAllStderr();
346  _messageStream.reset ();
347  _workerProc->close();
348  _workerProc.reset();
349  return false;
350  };
351 
352  if ( !_messageStream->sendMessage( conf ) ) {
353  ERR << "Failed to send initial message to queue worker" << std::endl;
354  return cleanupOnErr();
355  }
356 
357  // wait for the data to be written
358  _workerProc->flush ();
359 
360  // wait until we receive a message
361  const auto &caps = _messageStream->nextMessageWait();
362  if ( !caps || caps->messagetypename() != WorkerCaps::staticTypeName() ) {
363  ERR << "Worker did not sent a capabilities message, aborting" << std::endl;
364  return cleanupOnErr();
365  }
366 
367  {
368  auto p = _messageStream->parseMessage<WorkerCaps>( *caps );
369  if ( !p )
370  return cleanupOnErr();
371 
372  _capabilities = std::move(*p);
373  }
374 
375  DBG << "Received config for worker: " << this->_currentExe.asString() << " Worker Type: " << this->_capabilities.worker_type() << " Flags: " << std::bitset<32>( _capabilities.cfg_flags() ).to_string() << std::endl;
376 
377  // now we can set up signals and start processing messages
381 
382  // make sure we do not miss messages
383  processMessage();
384  return true;
385  }
386 
388 
389  const auto &getRequest = [&]( const auto &exp ) -> decltype(_activeItems)::iterator {
390  if ( !exp ) {
391  ERR << "Ignoring invalid request!" << std::endl;
392  return _activeItems.end();
393  }
394 
395  auto i = std::find_if( _activeItems.begin(), _activeItems.end(), [&]( const auto &elem ) {
396  if ( ! elem._request )
397  return false;
398  return exp->requestId() == elem._request->provideMessage().requestId();
399  });
400 
401  if ( i == _activeItems.end() ) {
402  ERR << "Ignoring unknown request ID: " << exp->requestId() << std::endl;
403  return _activeItems.end();
404  }
405 
406  return i;
407  };
408 
409  const auto &sendErrorToWorker = [&]( const uint32_t reqId, const uint code, const std::string &reason, bool transient = false ) {
410  auto r = ProvideMessage::createErrorResponse ( reqId, code, reason, transient );
411  if ( !_messageStream->sendMessage( r ) ) {
412  ERR << "Failed to send Error message to worker process." << std::endl;
413  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
414  return false;
415  }
416  return true;
417  };
418 
419  const bool doesDownload = this->_capabilities.worker_type() == Config::Downloading;
420  const bool fileNeedsCleanup = doesDownload || ( _capabilities.worker_type() == Config::CPUBound && _capabilities.cfg_flags() & Config::FileArtifacts );
421 
422  while ( auto msg = _messageStream->nextMessage () ) {
423 
424  if ( msg->messagetypename() == ProvideMessage::staticTypeName() ) {
425 
426  const auto &provMsg = ProvideMessage::create(*msg);
427  if ( !provMsg ) {
428  fatalWorkerError( provMsg.error() );
429  return;
430  }
431 
432  const auto &reqIter = getRequest( provMsg );
433  if ( reqIter == _activeItems.end() ) {
434  if ( provMsg->code() == ProvideMessage::Code::ProvideFinished && fileNeedsCleanup ) {
435  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
436  if ( !_parent.isInCache(locFName) ) {
437  MIL << "Received a ProvideFinished message for a non existant request. Since this worker reported to create file artifacts, the file is cleaned up." << std::endl;
438  zypp::filesystem::unlink( locFName );
439  }
440  }
441  continue;
442  }
443 
444  auto &req = *reqIter;
445  auto &reqRef =req._request;
446 
447  const auto code = provMsg->code();
448 
450 
451  // send the message to the item but don't dequeue
452  if ( reqRef && reqRef->owner() )
453  reqRef->owner()->informalMessage ( *this, reqRef, *provMsg );
454  continue;
455 
457 
458  if ( req._state == Item::Cancelling ) {
459  req._state = Item::Finished;
460  dequeueActive( reqIter );
461  continue;
462  }
463 
465 
466  // we are going to register the file to the cache if this is a downloading worker, so it can not leak
467  // no matter if the item does the correct dance or not, this code is duplicated by all ProvideItems that receive ProvideFinished
468  // results that require file cleanups.
469  // we keep the ref around until after sending the result to the item. At that point it should take a reference
470  std::optional<zypp::ManagedFile> dataRef;
471 
472  if ( !reqIter->isFileRequest() ) {
473  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
475  return;
476  }
477 
478  // when a worker is downloading we keep a internal book of cache files
479  if ( doesDownload ) {
480  const auto locFName = provMsg->value( ProvideFinishedMsgFields::LocalFilename ).asString();
481  if ( provMsg->value( ProvideFinishedMsgFields::CacheHit, false ).asBool()) {
482  dataRef = _parent.addToFileCache ( locFName );
483  if ( !dataRef ) {
484  MIL << "CACHE MISS, file " << locFName << " was already removed, queueing again" << std::endl;
485  if ( reqRef->owner() )
486  reqRef->owner()->cacheMiss( reqRef );
487  reqRef->provideMessage().setRequestId( InvalidId );
488  req._state = Item::Pending;
489  _waitQueue.push_front( req );
490  dequeueActive( reqIter );
491  continue;
492  }
493  } else {
494  dataRef = _parent.addToFileCache ( locFName );
495 
496  // unlikely this can happen but better be safe than sorry
497  if ( !dataRef ) {
498  req._state = Item::Finished;
499  reqRef->setCurrentQueue(nullptr);
500  auto resp = ProvideMessage::createErrorResponse ( provMsg->requestId(), ProvideMessage::Code::InternalError, "File vanished between downloading and adding it to cache." );
501  if ( reqRef->owner() )
502  reqRef->owner()->finishReq( *this, reqRef, resp );
503  dequeueActive( reqIter );
504  continue;
505  }
506  }
507  }
508  }
509 
510  // send the message to the item and dequeue
511  reqRef->setCurrentQueue(nullptr);
512  if ( reqRef->owner() )
513  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
514  req._state = Item::Finished;
515  dequeueActive( reqIter );
516  continue;
517 
519 
520  if ( req._state == Item::Cancelling ) {
521  req._state = Item::Finished;
522  dequeueActive( reqIter );
523  continue;
524  }
525 
526  // send the message to the item and dequeue
527  reqRef->setCurrentQueue(nullptr);
528 
529  if ( reqRef->owner() )
530  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
531 
532  req._state = Item::Finished;
533  dequeueActive( reqIter );
534  continue;
535 
537 
538  // redir is like a finished message, we can simply forgot about a cancelling request
539  if ( req._state == Item::Cancelling ) {
540  req._state = Item::Finished;
541  dequeueActive( reqIter );
542  continue;
543  }
544 
545  // send the message to the item and dequeue
546  reqRef->setCurrentQueue(nullptr);
547  if ( reqRef->owner() )
548  reqRef->owner()->finishReq( *this, reqRef, *provMsg );
549  req._state = Item::Finished;
550  dequeueActive( reqIter );
551  continue;
552 
554 
555  ERR << "Received Controller message from worker, this is a fatal error. Cancelling all requests!" << std::endl;
556  fatalWorkerError ( ZYPP_EXCPT_PTR( zypp::media::MediaException("Controller message received from worker.") ) );
557  return;
558 
560 
562  if ( !reqIter->isFileRequest() && !reqIter->isAttachRequest() ) {
563  ERR << "Invalid message for request ID: " << reqRef->provideMessage().requestId() << std::endl;
565  return;
566  }
567 
568  // if the file was cancelled we send a failure back
569  if( reqIter->_state == Item::Cancelling ) {
570  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item was cancelled") )
571  return;
572  continue;
573  }
574 
575  // we need a owner item to fetch the auth data for us
576  if ( !reqRef->owner() ) {
577  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Request has no owner" ) )
578  return;
579  continue;
580  }
581 
582  if ( !reqRef->activeUrl() ) {
583  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "Item has no active URL, this is a bug." ) )
584  return;
585  continue;
586  }
587 
588  try {
589  zypp::Url u( provMsg->value( AuthDataRequestMsgFields::EffectiveUrl ).asString() );
590 
591  std::map<std::string, std::string> extraVals;
592  provMsg->forEachVal( [&]( const std::string &name, const zyppng::ProvideMessage::FieldVal &val ) {
593 
596  return true;
597 
598  if ( !val.isString() ) {
599  WAR << "Ignoring non string value for " << name << std::endl;
600  return true;
601  }
602 
603  extraVals[name] = val.asString();
604  return true;
605  });
606 
607  const auto &authOpt = reqRef->owner()->authenticationRequired( *this, reqRef, u, provMsg->value( AuthDataRequestMsgFields::LastAuthTimestamp ).asInt64(), extraVals );
608  if ( !authOpt ) {
609  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, "No auth given by user." ) )
610  return;
611  continue;
612  }
613 
614  auto r = ProvideMessage::createAuthInfo ( reqRef->provideMessage().requestId(), authOpt->username(), authOpt->password(), authOpt->lastDatabaseUpdate(), authOpt->extraValues() );
615  if ( !_messageStream->sendMessage( r ) ) {
616  ERR << "Failed to send AuthorizationInfo to worker process." << std::endl;
617  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
618  return;
619  }
620  continue;
621 
622  } catch ( const zypp::Exception &e ) {
623  ZYPP_CAUGHT(e);
624  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::NoAuthData, e.asString() ) )
625  return;
626  continue;
627  }
628 
629  } else if ( code == ProvideMessage::Code::MediaChangeRequest ) {
630 
631  if ( !reqIter->isAttachRequest() ) {
632  ERR << "Invalid message for request ID: " << reqIter->_request->provideMessage().requestId() << std::endl;
634  return;
635  }
636 
637  // if the file was cancelled we send a failure back
638  if( reqIter->_state == Item::Cancelling ) {
639  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Item was cancelled" ) )
640  return;
641  continue;
642  }
643 
644  MIL << "Worker sent a MediaChangeRequest, asking the user to insert the correct medium" << std::endl;
645 
646  //const std::string &label, const int32_t mediaNr, const std::vector<std::string> &devices, const std::optional<std::string> &desc
647  std::vector<std::string> freeDevs;
648  for ( const auto &val : provMsg->values( MediaChangeRequestMsgFields::Device) ) {
649  freeDevs.push_back( val.asString() );
650  }
651 
652  std::optional<std::string> desc;
653  const auto &descVal = provMsg->value( MediaChangeRequestMsgFields::Desc );
654  if ( descVal.valid () && descVal.isString() )
655  desc = descVal.asString();
656 
657  auto res = _parent._sigMediaChange.emit(
658  _parent.queueName(*this),
659  provMsg->value( MediaChangeRequestMsgFields::Label ).asString(),
660  provMsg->value( MediaChangeRequestMsgFields::MediaNr ).asInt(),
661  freeDevs,
662  desc
663  );
664 
665  auto action = res ? *res : Provide::Action::ABORT;
666  switch ( action ) {
667  case Provide::Action::RETRY: {
668  MIL << "Sending back a MediaChanged message, retrying to find medium " << std::endl;
669  auto r = ProvideMessage::createMediaChanged ( reqIter->_request->provideMessage().requestId() );
670  if ( !_messageStream->sendMessage( r ) ){
671  ERR << "Failed to send MediaChanged to worker process." << std::endl;
672  fatalWorkerError( ZYPP_EXCPT_PTR( zypp::media::MediaException("Failed to communicate with worker process.") ) );
673  return;
674  }
675  continue;
676  }
677  case Provide::Action::ABORT: {
678  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
679  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeAbort, "Cancelled by User" ) )
680  return;
681  continue;
682  }
683  case Provide::Action::SKIP: {
684  MIL << "Sending back a MediaChangeFailure message, request will fail " << std::endl;
685  if ( !sendErrorToWorker( reqRef->provideMessage().requestId(), ProvideMessage::Code::MediaChangeSkip, "Skipped by User" ) )
686  return;
687  continue;
688  }
689  }
690  } else {
691  // if there is a unsupported worker request we need to stop immediately because the worker will be blocked until it gets a answer
692  ERR << "Unsupported worker request: "<<code<<", this is a fatal error!" << std::endl;
694  return;
695  }
696 
697  } else {
698  // unknown code
699  ERR << "Received unsupported message " << msg->messagetypename() << " with code " << code << " ignoring! " << std::endl;
700  }
701 
702  } else {
703  ERR << "Received unsupported message " << msg->messagetypename() << "ignoring" << std::endl;
704  }
705  }
706  }
707 
713  {
714  // read all stderr data so we get the full logs
715  auto ba = _workerProc->channelReadLine(Process::StdErr);
716  while ( !ba.empty() ) {
717  forwardToLog(std::string( ba.data(), ba.size() ) );
718  ba = _workerProc->channelReadLine(Process::StdErr);
719  }
720  }
721 
722  void ProvideQueue::forwardToLog( std::string &&logLine )
723  {
725  zypp::base::LogControl::instance ().logRawLine( std::move(logLine) );
726  else
727  MIL << "Message from worker: " << _capabilities.worker_name() << ":" << logLine << std::endl;
728  }
729 
730  void ProvideQueue::processReadyRead(int channel) {
731  // ignore stdout here
732  if ( channel == Process::StdOut )
733  return;
734 
735  // forward the stderr output to the log bypassing the formatter
736  // the worker already formatted the line
737  while ( _workerProc->canReadLine(Process::StdErr) ) {
738  const auto &data = _workerProc->channelReadLine( Process::StdErr );
739  if ( data.empty() )
740  return;
741 
742  forwardToLog(std::string( data.data(), data.size() ) );
743  }
744  }
745 
746  void ProvideQueue::procFinished(int exitCode)
747  {
748  // process all pending messages
749  processMessage();
750 
751  // get all of the log lines
752  readAllStderr();
753 
754  // shut down
755  // @todo implement worker restart in case of a unexpected exit
756  if ( !_queueShuttingDown )
757  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
758 
759 #if 0
760  if ( !_queueShuttingDown ) {
761 
762  _crashCounter++;
763  if ( _crashCounter > 3 ) {
764  immediateShutdown( ZYPP_EXCPT_PTR( zypp::media::MediaException("Unexpected queue worker exit!") ) );
765  return;
766  }
767 
768  MIL << "Unexpected queue worker exit with code: " << exitCode << std::endl;
769  // try to spawn the worker again, move active items back to wait list and start over
770 
771  if ( !doStartup () ) {
772 
773  }
774  }
775 #endif
776  }
777 
779  return _parent.nextRequestId();
780  }
781 }
std::list< Item > _activeItems
static ProvideMessage createErrorResponse(const uint32_t reqId, const uint code, const std::string &reason, bool transient=false)
#define MIL
Definition: Logger.h:98
const std::string & asString() const
int assert_dir(const Pathname &path, unsigned mode)
Like &#39;mkdir -p&#39;.
Definition: PathInfo.cc:324
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:729
Signal< Provide::MediaChangeAction(const std::string &, const std::string &, const int32_t, const std::vector< std::string > &, const std::optional< std::string > &) > _sigMediaChange
Definition: provide_p.h:98
static Ptr create(IODevice::Ptr iostr)
SignalProxy< void(int)> sigFinished()
Definition: process.cpp:292
std::list< ProvideQueue::Item >::iterator dequeueActive(std::list< Item >::iterator it)
Store and operate with byte count.
Definition: ByteCount.h:31
static ProvideMessage createAuthInfo(const uint32_t reqId, const std::string &user, const std::string &pw, int64_t timestamp, const std::map< std::string, std::string > &extraValues={})
constexpr std::string_view ATTACH_POINT("zconfig://media/AttachPoint")
const zypp::Pathname & workerPath() const
Definition: provide.cc:829
RpcMessageStreamPtr _messageStream
const char * c_str() const
String representation.
Definition: Pathname.h:112
static ProvideMessage createMediaChanged(const uint32_t reqId)
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:96
bool provideDebugEnabled()
Definition: providedbg_p.h:28
static expected< ProvideMessage > create(const zyppng::RpcMessage &message)
uint32_t nextRequestId()
Definition: provide.cc:916
constexpr std::string_view PROVIDER_ROOT("zconfig://media/ProviderRoot")
bool canScheduleMore() const
std::optional< TimePoint > idleSince() const
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:433
static constexpr uint32_t InvalidId
SignalProxy< void()> sigMessageReceived()
ProvideRequestRef _request
constexpr std::string_view Label("label")
bool isString() const
#define ERR
Definition: Logger.h:100
Signal< void()> _sigIdle
void cancel(ProvideRequest *item, std::exception_ptr error)
static LogControl instance()
Singleton access.
Definition: LogControl.h:102
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:749
WeakPtr parent() const
Definition: base.cc:26
bool empty() const
Test for an empty path.
Definition: Pathname.h:116
uint32_t nextRequestId()
zypp::ByteCount expectedProvideSize() const
void immediateShutdown(const std::exception_ptr &reason)
constexpr std::string_view LocalFilename("local_filename")
uint activeRequests() const
static Ptr create()
Definition: process.cpp:47
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
constexpr std::string_view LastAuthTimestamp("last_auth_timestamp")
const std::string & asString() const
String representation.
Definition: Pathname.h:93
const Config & workerConfig() const
std::string asString() const
Error message provided by dumpOn as string.
Definition: Exception.cc:94
void procFinished(int exitCode)
Just inherits Exception to separate media exceptions.
constexpr std::string_view Device("device")
#define WAR
Definition: Logger.h:99
constexpr std::string_view MediaNr("media_nr")
bool isRunning() const
Definition: provide.cc:843
void forwardToLog(std::string &&logLine)
std::optional< TimePoint > _idleSince
ProvideQueue(ProvidePrivate &parent)
Definition: providequeue.cc:48
ProvidePrivate & _parent
void schedule(ScheduleReason reason)
Definition: provide.cc:38
WorkerType worker_type() const
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:834
int unlink(const Pathname &path)
Like &#39;unlink&#39;.
Definition: PathInfo.cc:705
constexpr std::string_view EffectiveUrl("effective_url")
Process::Ptr _workerProc
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:61
const std::string & worker_name() const
SignalProxy< void()> sigIdle()
#define ZYPP_CAUGHT(EXCPT)
Drops a logline telling the Exception was caught (in order to handle it).
Definition: Exception.h:437
SignalProxy< void(uint)> sigChannelReadyRead()
Definition: iodevice.cc:329
zypp::Pathname _currentExe
Base class for Exception.
Definition: Exception.h:146
static ProvideMessage createCancel(const uint32_t reqId)
void logRawLine(std::string &&line)
will push a line to the logthread without formatting it
Definition: LogControl.cc:934
void fatalWorkerError(const std::exception_ptr &reason=nullptr)
constexpr std::string_view AGENT_STRING_CONF("zconfig://media/UserAgent")
Wrapper class for ::stat/::lstat.
Definition: PathInfo.h:221
std::list< ProvideQueue::Item >::iterator cancelActiveItem(std::list< Item >::iterator i, const std::exception_ptr &error)
Flags cfg_flags() const
uint requestCount() const
std::deque< Item > _waitQueue
constexpr std::string_view Desc("desc")
constexpr std::string_view CacheHit("cacheHit")
void processReadyRead(int channel)
Url manipulation class.
Definition: Url.h:91
#define DBG
Definition: Logger.h:97
constexpr std::string_view ExpectedFilesize("expected_filesize")
const std::string & hostname() const