XRootD
Loading...
Searching...
No Matches
XrdClCopyProcess.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
32#include "XrdCl/XrdClMonitor.hh"
33#include "XrdCl/XrdClCopyJob.hh"
34#include "XrdCl/XrdClUtils.hh"
39
40#include <sys/time.h>
41
42#include <memory>
43
44namespace
45{
46 class QueuedCopyJob: public XrdCl::Job
47 {
48 public:
49 QueuedCopyJob( XrdCl::CopyJob *job,
51 uint16_t currentJob,
52 uint16_t totalJobs,
53 XrdSysSemaphore *sem = 0 ):
54 pJob(job), pProgress(progress), pCurrentJob(currentJob),
55 pTotalJobs(totalJobs), pSem(sem),
56 pWrtRetryCnt( XrdCl::DefaultRetryWrtAtLBLimit ),
57 pRetryCnt( XrdCl::DefaultCpRetry ),
58 pRetryPolicy( XrdCl::DefaultCpRetryPolicy )
59 {
60 XrdCl::DefaultEnv::GetEnv()->GetInt( "RetryWrtAtLBLimit", pWrtRetryCnt );
61 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpRetry", pRetryCnt );
62 XrdCl::DefaultEnv::GetEnv()->GetString( "CpRetryPolicy", pRetryPolicy );
63 }
64
65 //------------------------------------------------------------------------
67 //------------------------------------------------------------------------
68 virtual void Run( void * )
69 {
71 timeval bTOD;
72
73 //----------------------------------------------------------------------
74 // Report beginning of the copy
75 //----------------------------------------------------------------------
76 if( pProgress )
77 pProgress->BeginJob( pCurrentJob, pTotalJobs,
78 &pJob->GetSource(),
79 &pJob->GetTarget() );
80
81 if( mon )
82 {
84 i.transfer.origin = &pJob->GetSource();
85 i.transfer.target = &pJob->GetTarget();
87 }
88
89 gettimeofday( &bTOD, 0 );
90
91 //----------------------------------------------------------------------
92 // Do the copy
93 //----------------------------------------------------------------------
95 while( true )
96 {
97 st = pJob->Run( pProgress );
98 //--------------------------------------------------------------------
99 // Retry due to write-recovery
100 //--------------------------------------------------------------------
101 if( !st.IsOK() && st.code == XrdCl::errRetry && pWrtRetryCnt > 0 )
102 {
103 std::string url;
104 pJob->GetResults()->Get( "LastURL", url );
105 XrdCl::URL lastURL( url );
106 XrdCl::URL::ParamsMap cgi = lastURL.GetParams();
107 auto itr = cgi.find( "tried" );
108 if( itr != cgi.end() )
109 {
110 std::string tried = itr->second;
111 if( tried[tried.size() - 1] != ',' ) tried += ',';
112 tried += lastURL.GetHostName();
113 cgi["tried"] = tried;
114 }
115 else
116 cgi["tried"] = lastURL.GetHostName();
117
118 std::string recoveryRedir;
119 pJob->GetResults()->Get( "WrtRecoveryRedir", recoveryRedir );
120 XrdCl::URL recRedirURL( recoveryRedir );
121
122 std::string target;
123 pJob->GetProperties()->Get( "target", target );
124 XrdCl::URL trgURL( target );
125 trgURL.SetHostName( recRedirURL.GetHostName() );
126 trgURL.SetPort( recRedirURL.GetPort() );
127 trgURL.SetProtocol( recRedirURL.GetProtocol() );
128 trgURL.SetParams( cgi );
129 pJob->GetProperties()->Set( "target", trgURL.GetURL() );
130 pJob->Init();
131
132 // we have a new job, let's try again
133 --pWrtRetryCnt;
134 continue;
135 }
136 //--------------------------------------------------------------------
137 // Copy job retry
138 //--------------------------------------------------------------------
139 if( !st.IsOK() && pRetryCnt > 0 &&
143 {
144 if( pRetryPolicy == "continue" )
145 {
146 pJob->GetProperties()->Set( "force", false );
147 pJob->GetProperties()->Set( "continue", true );
148 }
149 else
150 {
151 pJob->GetProperties()->Set( "force", true );
152 pJob->GetProperties()->Set( "continue", false );
153 }
154 --pRetryCnt;
155 continue;
156 }
157
158 // we only loop in case of retry error
159 break;
160 }
161
162 pJob->GetResults()->Set( "status", st );
163
164 //----------------------------------------------------------------------
165 // Report end of the copy
166 //----------------------------------------------------------------------
167 if( mon )
168 {
169 std::vector<std::string> sources;
170 pJob->GetResults()->Get( "sources", sources );
172 i.transfer.origin = &pJob->GetSource();
173 i.transfer.target = &pJob->GetTarget();
174 i.sources = sources.size();
175 i.bTOD = bTOD;
176 gettimeofday( &i.eTOD, 0 );
177 i.status = &st;
179 }
180
181 if( pProgress )
182 pProgress->EndJob( pCurrentJob, pJob->GetResults() );
183
184 if( pSem )
185 pSem->Post();
186 }
187
188 private:
189 XrdCl::CopyJob *pJob;
191 uint16_t pCurrentJob;
192 uint16_t pTotalJobs;
193 XrdSysSemaphore *pSem;
194 int pWrtRetryCnt;
195 int pRetryCnt;
196 std::string pRetryPolicy;
197 };
198};
199
200namespace XrdCl
201{
// Private implementation data of CopyProcess, reached through pImpl.
// NOTE(review): the declaration line of this struct is not part of this
// listing.
203 {
// One entry per accepted AddJob() call: the properties of a copy job, or a
// configuration entry (jobType == "configuration"); a configuration entry
// is merged into the last element when that one is already a configuration.
204 std::vector<PropertyList> pJobProperties;
// Result placeholders passed to AddJob(), recorded for copy jobs only, so
// indices only line up with pJobProperties while no configuration entry
// precedes a copy job.
205 std::vector<PropertyList*> pJobResults;
// Copy jobs created from pJobProperties; deleted by CleanUpJobs().
206 std::vector<CopyJob*> pJobs;
207 };
208
209 //----------------------------------------------------------------------------
210 // Constructor
211 //----------------------------------------------------------------------------
// The body is empty. pImpl (deleted by the destructor) is presumably
// allocated in the initializer list, which is not part of this listing -
// confirm.
213 {
214 }
215
216 //----------------------------------------------------------------------------
217 // Destructor
218 //----------------------------------------------------------------------------
// Deletes the copy jobs still held (CleanUpJobs() also releases the virtual
// redirectors registered for Metalink sources), then frees pImpl.
220 {
221 CleanUpJobs();
222 delete pImpl;
223 }
224
225 //----------------------------------------------------------------------------
226 // Add job
//
// Records either a copy job or process-wide configuration.
//
// properties - job description. "source" and "target" are required unless
//              jobType is "configuration". Missing boolean flags default to
//              false, numeric tuning values default from the environment
//              (CPParallelChunks, CPChunkSize, XCpBlockSize, ...), and
//              checkSumType is lower-cased.
// results    - placeholder later handed to the copy job; it is not recorded
//              for configuration entries.
// Returns an error status (and adds nothing) when source, target or, with
// checkSumMode set, checkSumType is missing.
227 //----------------------------------------------------------------------------
229 PropertyList *results )
230 {
231 Env *env = DefaultEnv::GetEnv();
232
233 //--------------------------------------------------------------------------
234 // Process a configuration job
// (merged into the last entry if that is already a configuration entry,
// otherwise appended; no results placeholder is stored for it)
235 //--------------------------------------------------------------------------
236 if( properties.HasProperty( "jobType" ) &&
237 properties.Get<std::string>( "jobType" ) == "configuration" )
238 {
239 if( pImpl->pJobProperties.size() > 0 &&
240 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
241 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
242 {
243 PropertyList &config = *pImpl->pJobProperties.rbegin();
244 PropertyList::PropertyMap::const_iterator it;
245 for( it = properties.begin(); it != properties.end(); ++it )
246 config.Set( it->first, it->second );
247 }
248 else
249 pImpl->pJobProperties.push_back( properties );
250 return XRootDStatus();
251 }
252
253 //--------------------------------------------------------------------------
254 // Validate properties
255 //--------------------------------------------------------------------------
256 if( !properties.HasProperty( "source" ) )
257 return XRootDStatus( stError, errInvalidArgs, 0, "source not specified" );
258
259 if( !properties.HasProperty( "target" ) )
260 return XRootDStatus( stError, errInvalidArgs, 0, "target not specified" );
261
262 pImpl->pJobProperties.push_back( properties );
263 PropertyList &p = pImpl->pJobProperties.back();
264
// Default every listed boolean flag to false when it was not given.
// NOTE(review): "target" is guaranteed present by the check above, so its
// entry never takes effect here; "targetIsDir" may have been intended -
// confirm before changing.
265 const char *bools[] = {"target", "force", "posc", "coerce", "makeDir",
266 "zipArchive", "xcp", "preserveXAttr", "rmOnBadCksum",
267 "continue", "zipAppend", "doServer", 0};
268 for( int i = 0; bools[i]; ++i )
269 if( !p.HasProperty( bools[i] ) )
270 p.Set( bools[i], false );
271
272 if( !p.HasProperty( "thirdParty" ) )
273 p.Set( "thirdParty", "none" );
274
275 if( !p.HasProperty( "checkSumMode" ) )
276 p.Set( "checkSumMode", "none" );
277 else
278 {
279 if( !p.HasProperty( "checkSumType" ) )
280 {
// A checksum mode without a checksum type is rejected: drop the entry
// pushed above so the job list is left unchanged, then return an error.
281 pImpl->pJobProperties.pop_back();
283 "checkSumType not specified" );
284 }
285 else
286 {
287 //----------------------------------------------------------------------
288 // Checksum type has to be case insensitive
// NOTE(review): ::tolower on a plain (possibly signed) char is undefined
// for negative values; harmless for ASCII checksum names.
289 //----------------------------------------------------------------------
290 std::string checkSumType;
291 p.Get( "checkSumType", checkSumType );
292 std::transform(checkSumType.begin(), checkSumType.end(),
293 checkSumType.begin(), ::tolower);
294 p.Set( "checkSumType", checkSumType );
295 }
296 }
297
// Numeric tuning values: keep what the caller set, otherwise take the
// environment setting, falling back to the library default.
298 if( !p.HasProperty( "parallelChunks" ) )
299 {
300 int val = DefaultCPParallelChunks;
301 env->GetInt( "CPParallelChunks", val );
302 p.Set( "parallelChunks", val );
303 }
304
305 if( !p.HasProperty( "chunkSize" ) )
306 {
307 int val = DefaultCPChunkSize;
308 env->GetInt( "CPChunkSize", val );
309 p.Set( "chunkSize", val );
310 }
311
312 if( !p.HasProperty( "xcpBlockSize" ) )
313 {
314 int val = DefaultXCpBlockSize;
315 env->GetInt( "XCpBlockSize", val );
316 p.Set( "xcpBlockSize", val );
317 }
318
319 if( !p.HasProperty( "initTimeout" ) )
320 {
321 int val = DefaultCPInitTimeout;
322 env->GetInt( "CPInitTimeout", val );
323 p.Set( "initTimeout", val );
324 }
325
326 if( !p.HasProperty( "tpcTimeout" ) )
327 {
328 int val = DefaultCPTPCTimeout;
329 env->GetInt( "CPTPCTimeout", val );
330 p.Set( "tpcTimeout", val );
331 }
332
333 if( !p.HasProperty( "cpTimeout" ) )
334 {
335 int val = DefaultCPTimeout;
336 env->GetInt( "CPTimeout", val );
337 p.Set( "cpTimeout", val );
338 }
339
340 if( !p.HasProperty( "dynamicSource" ) )
341 p.Set( "dynamicSource", false );
342
343 if( !p.HasProperty( "xrate" ) )
344 p.Set( "xrate", 0 );
345
// An explicit xrateThreshold of 0 is treated like a missing one.
346 if( !p.HasProperty( "xrateThreshold" ) || p.Get<long long>( "xrateThreshold" ) == 0 )
347 {
348 int val = DefaultXRateThreshold;
349 env->GetInt( "XRateThreshold", val );
350 p.Set( "xrateThreshold", val );
351 }
352
353 //--------------------------------------------------------------------------
354 // Insert the properties
355 //--------------------------------------------------------------------------
356 Log *log = DefaultEnv::GetLog();
357 Utils::LogPropertyList( log, UtilityMsg, "Adding job with properties: %s",
358 p );
359 pImpl->pJobResults.push_back( results );
360 return XRootDStatus();
361 }
362
363 //----------------------------------------------------------------------------
364 // Prepare the copy jobs
//
// Turns every non-configuration entry recorded by AddJob() into a CopyJob:
// - validates the source and target URLs;
// - registers a virtual redirector for Metalink sources;
// - maps the xrdcl.unzip CGI to zipArchive/zipSource;
// - appends the source file name to the target when targetIsDir is set;
// - creates a TPFallBackCopyJob (thirdParty != "none") or a ClassicCopyJob.
// NOTE(review): several lines of this function (its signature, the
// RedirectorRegistry lookups and the construction of `st` in the two
// "not specified" branches) are absent from this listing.
365 //----------------------------------------------------------------------------
367 {
368 Log *log = DefaultEnv::GetLog();
369 std::vector<PropertyList>::iterator it;
370
371 log->Debug( UtilityMsg, "CopyProcess: %llu jobs to prepare",
372 (unsigned long long) pImpl->pJobProperties.size() );
373
// NOTE(review): targetFlags is never used in this function.
374 std::map<std::string, uint32_t> targetFlags;
375 int i = 0;
376 for( it = pImpl->pJobProperties.begin(); it != pImpl->pJobProperties.end(); ++it, ++i )
377 {
378 PropertyList &props = *it;
379
380 if( props.HasProperty( "jobType" ) &&
381 props.Get<std::string>( "jobType" ) == "configuration" )
382 continue;
383
// NOTE(review): `i` counts every entry, configuration entries included, but
// AddJob() stores results only for copy jobs. If a configuration entry
// precedes a copy job this index is misaligned (and can run past the end of
// pJobResults); a separate counter for copy jobs would avoid that. `res` is
// also dereferenced below without a null check.
384 PropertyList *res = pImpl->pJobResults[i];
385 std::string tmp;
386
387 props.Get( "source", tmp );
388 URL source = tmp;
// NOTE(review): unlike the "no source/target specified" paths below, the
// invalid-URL returns do not call CleanUpJobs().
389 if( !source.IsValid() )
390 return XRootDStatus( stError, errInvalidArgs, 0, "invalid source" );
391
392 //--------------------------------------------------------------------------
393 // Create a virtual redirector if it is a Metalink file
// NOTE(review): if a later check fails for this entry, no job exists yet to
// release this registration in CleanUpJobs().
394 //--------------------------------------------------------------------------
395 if( source.IsMetalink() )
396 {
398 XRootDStatus st = registry.RegisterAndWait( source );
399 if( !st.IsOK() ) return st;
400 }
401
402 // handle UNZIP CGI
403 const URL::ParamsMap &cgi = source.GetParams();
404 URL::ParamsMap::const_iterator itr = cgi.find( "xrdcl.unzip" );
405 if( itr != cgi.end() )
406 {
407 props.Set( "zipArchive", true );
408 props.Set( "zipSource", itr->second );
409 }
410
411 props.Get( "target", tmp );
412 URL target = tmp;
413 if( !target.IsValid() )
414 return XRootDStatus( stError, errInvalidArgs, 0, "invalid target" );
415
416 if( target.GetProtocol() != "stdio" )
417 {
418 // handle directories
// (append the last path component of the zip member, the Metalink target
// name or the source path to the target directory)
419 bool targetIsDir = false;
420 props.Get( "targetIsDir", targetIsDir );
421
422 if( targetIsDir )
423 {
424 std::string path = target.GetPath() + '/';
425 std::string fn;
426
427 bool isZip = false;
428 props.Get( "zipArchive", isZip );
429 if( isZip )
430 {
431 props.Get( "zipSource", fn );
432 }
433 else if( source.IsMetalink() )
434 {
436 VirtualRedirector *redirector = registry.Get( source );
437 fn = redirector->GetTargetName();
438 }
439 else
440 {
441 fn = source.GetPath();
442 }
443
444 size_t pos = fn.rfind( '/' );
445 if( pos != std::string::npos )
446 fn = fn.substr( pos + 1 );
447 path += fn;
448 target.SetPath( path );
449 props.Set( "target", target.GetURL() );
450 }
451 }
452
453 bool tpc = false;
454 props.Get( "thirdParty", tmp );
455 if( tmp != "none" )
456 tpc = true;
457
458 //------------------------------------------------------------------------
459 // Check if we have all we need
// (on failure every job prepared so far is deleted and the error status is
// stored in this entry's results before returning)
460 //------------------------------------------------------------------------
461 if( source.GetProtocol() != "stdio" && source.GetPath().empty() )
462 {
463 log->Debug( UtilityMsg, "CopyProcess (job #%d): no source specified.",
464 i );
465 CleanUpJobs();
467 res->Set( "status", st );
468 return st;
469 }
470
471 if( target.GetProtocol() != "stdio" && target.GetPath().empty() )
472 {
473 log->Debug( UtilityMsg, "CopyProcess (job #%d): no target specified.",
474 i );
475 CleanUpJobs();
477 res->Set( "status", st );
478 return st;
479 }
480
481 //------------------------------------------------------------------------
482 // Check what kind of job we should do
// (the job number i+1 counts configuration entries too)
483 //------------------------------------------------------------------------
484 CopyJob *job = 0;
485
486 if( tpc == true )
487 {
488 MarkTPC( props );
489 job = new TPFallBackCopyJob( i+1, &props, res );
490 }
491 else
492 job = new ClassicCopyJob( i+1, &props, res );
493
494 pImpl->pJobs.push_back( job );
495 }
496 return XRootDStatus();
497 }
498
499 //----------------------------------------------------------------------------
500 // Run the copy jobs
501 //----------------------------------------------------------------------------
503 {
504 //--------------------------------------------------------------------------
505 // Get the configuration
506 //--------------------------------------------------------------------------
507 uint8_t parallelThreads = 1;
508 if( pImpl->pJobProperties.size() > 0 &&
509 pImpl->pJobProperties.rbegin()->HasProperty( "jobType" ) &&
510 pImpl->pJobProperties.rbegin()->Get<std::string>( "jobType" ) == "configuration" )
511 {
512 PropertyList &config = *pImpl->pJobProperties.rbegin();
513 if( config.HasProperty( "parallel" ) )
514 parallelThreads = (uint8_t)config.Get<int>( "parallel" );
515 }
516
517 //--------------------------------------------------------------------------
518 // Run the show
519 //--------------------------------------------------------------------------
520 std::vector<CopyJob *>::iterator it;
521 uint16_t currentJob = 1;
522 uint16_t totalJobs = pImpl->pJobs.size();
523
524 //--------------------------------------------------------------------------
525 // Single thread
526 //--------------------------------------------------------------------------
527 if( parallelThreads == 1 )
528 {
529 XRootDStatus err;
530
531 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
532 {
533 QueuedCopyJob j( *it, progress, currentJob, totalJobs );
534 j.Run(0);
535
536 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
537 if( err.IsOK() && !st.IsOK() )
538 {
539 err = st;
540 }
541 ++currentJob;
542 }
543
544 if( !err.IsOK() ) return err;
545 }
546 //--------------------------------------------------------------------------
547 // Multiple threads
548 //--------------------------------------------------------------------------
549 else
550 {
551 uint16_t workers = std::min( (uint16_t)parallelThreads,
552 (uint16_t)pImpl->pJobs.size() );
553 JobManager jm( workers );
554 jm.Initialize();
555 if( !jm.Start() )
556 return XRootDStatus( stError, errOSError, 0,
557 "Unable to start job manager" );
558
559 XrdSysSemaphore *sem = new XrdSysSemaphore(0);
560 std::vector<QueuedCopyJob*> queued;
561 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
562 {
563 QueuedCopyJob *j = new QueuedCopyJob( *it, progress, currentJob,
564 totalJobs, sem );
565
566 queued.push_back( j );
567 jm.QueueJob(j, 0);
568 ++currentJob;
569 }
570
571 std::vector<QueuedCopyJob*>::iterator itQ;
572 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
573 sem->Wait();
574 delete sem;
575
576 if( !jm.Stop() )
577 return XRootDStatus( stError, errOSError, 0,
578 "Unable to stop job manager" );
579 jm.Finalize();
580 for( itQ = queued.begin(); itQ != queued.end(); ++itQ )
581 delete *itQ;
582
583 for( it = pImpl->pJobs.begin(); it != pImpl->pJobs.end(); ++it )
584 {
585 XRootDStatus st = (*it)->GetResults()->Get<XRootDStatus>( "status" );
586 if( !st.IsOK() ) return st;
587 }
588 };
589 return XRootDStatus();
590 }
591
592 void CopyProcess::CleanUpJobs()
593 {
594 std::vector<CopyJob*>::iterator itJ;
595 for( itJ = pImpl->pJobs.begin(); itJ != pImpl->pJobs.end(); ++itJ )
596 {
597 CopyJob *job = *itJ;
598 URL src = job->GetSource();
599 if( src.IsMetalink() )
600 {
602 registry.Release( src );
603 }
604 delete job;
605 }
606 pImpl->pJobs.clear();
607 }
608}
const URL & GetSource() const
Get source.
virtual ~CopyProcess()
Destructor.
CopyProcess()
Constructor.
XRootDStatus Run(CopyProgressHandler *handler)
Run the copy jobs.
XRootDStatus AddJob(const PropertyList &properties, PropertyList *results)
Interface for copy progress notification.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
A synchronized queue.
bool Finalize()
Finalize the job manager, clear the queues.
bool Start()
Start the workers.
bool Initialize()
Initialize the job manager.
bool Stop()
Stop the workers.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
TransferInfo transfer
The transfer in question.
@ EvCopyBeg
CopyBInfo: Copy operation started.
@ EvCopyEnd
CopyEInfo: Copy operation ended.
virtual void Event(EventCode evCode, void *evData)=0
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
PropertyMap::const_iterator end() const
Get the end iterator.
bool Get(const std::string &name, Item &item) const
bool HasProperty(const std::string &name) const
Check if we know about the given name.
PropertyMap::const_iterator begin() const
Get the begin iterator.
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
void Release(const URL &url)
Release the virtual redirector associated with the given URL.
XRootDStatus RegisterAndWait(const URL &url)
Creates a new virtual redirector and registers it (sync).
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:458
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
void SetPath(const std::string &path)
Set the path.
Definition XrdClURL.hh:225
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
static void LogPropertyList(Log *log, uint64_t topic, const char *format, const PropertyList &list)
Log property list.
An interface for metadata redirectors.
virtual std::string GetTargetName() const =0
Gets the file name as specified in the metalink.
const int DefaultCPInitTimeout
const int DefaultXRateThreshold
const uint16_t errOperationExpired
const int DefaultCPChunkSize
const uint16_t stError
An error occurred that could potentially be retried.
const int DefaultRetryWrtAtLBLimit
std::vector< PropertyList * > pJobResults
const int DefaultCPParallelChunks
const uint16_t errOSError
const int DefaultXCpBlockSize
const uint64_t UtilityMsg
const int DefaultCPTimeout
const uint16_t errInvalidArgs
std::vector< PropertyList > pJobProperties
const int DefaultCpRetry
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errThresholdExceeded
const char *const DefaultCpRetryPolicy
const int DefaultCPTPCTimeout
std::vector< CopyJob * > pJobs
Describe an end of copy event.
TransferInfo transfer
The transfer in question.
int sources
Number of sources used for the copy.
timeval bTOD
Copy start time.
const XRootDStatus * status
Status of the copy.
timeval eTOD
Copy end time.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
static bool IsSocketError(uint16_t code)