XRootD
Loading...
Searching...
No Matches
XrdClHttpFile.cc
Go to the documentation of this file.
1/***************************************************************
2 *
3 * Copyright (C) 2025, Morgridge Institute for Research
4 *
5 ***************************************************************/
6
7#include "XrdClHttpFile.hh"
9#include "XrdClHttpOps.hh"
11#include "XrdClHttpResponses.hh"
12#include "XrdClHttpUtil.hh"
13#include "XrdClHttpWorker.hh"
14
17#include <XrdCl/XrdClLog.hh>
18#include <XrdCl/XrdClStatus.hh>
19#include <XrdCl/XrdClURL.hh>
20#include <XrdOuc/XrdOucCRC.hh>
22#include <XrdOuc/XrdOucJson.hh>
23
24#include <charconv>
25#include <iostream>
26
27using namespace XrdClHttp;
28
29std::atomic<uint64_t> File::m_prefetch_count = 0;
30std::atomic<uint64_t> File::m_prefetch_expired_count = 0;
31std::atomic<uint64_t> File::m_prefetch_failed_count = 0;
32std::atomic<uint64_t> File::m_prefetch_reads_hit = 0;
33std::atomic<uint64_t> File::m_prefetch_reads_miss = 0;
34std::atomic<uint64_t> File::m_prefetch_bytes_used = 0;
35
36namespace {
37
38// A response handler for the file open operation when "full download" is requested.
39//
40// In this case, the open triggers a GET of the entire object with a zero-sized buffer;
41// that means the response handler is invoked as soon as the GET response is started.
42// Subsequent calls to Read() will return the data from the GET response.
43class OpenFullDownloadResponseHandler : public XrdCl::ResponseHandler {
44public:
45 OpenFullDownloadResponseHandler(bool *is_opened, bool send_response_info, XrdCl::ResponseHandler *handler)
46 : m_send_response_info(send_response_info), m_is_opened(is_opened), m_handler(handler)
47 {}
48
49 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
50 std::unique_ptr<OpenFullDownloadResponseHandler> holder(this);
51 std::unique_ptr<XrdCl::AnyObject> response_holder(response);
52 std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
53
54 if (!status || !status->IsOK()) {
55 if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
56 return;
57 }
58 if (m_is_opened) *m_is_opened = true;
59 if (!m_handler) {
60 return;
61 }
62 if (m_send_response_info) {
63 XrdCl::ChunkInfo *ci = nullptr;
64 response->Get(ci);
65 if (!ci) {
66 m_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal, ENOENT, "No ChunkInfo in response"), nullptr);
67 return;
68 }
69 std::unique_ptr<XrdClHttp::ReadResponseInfo> read_response_info(static_cast<XrdClHttp::ReadResponseInfo *>(ci));
70 auto info = read_response_info->GetResponseInfo();
71 XrdClHttp::OpenResponseInfo *open_info(new XrdClHttp::OpenResponseInfo());
72 open_info->SetResponseInfo(std::move(info));
73 auto obj = new XrdCl::AnyObject();
74 obj->Set(open_info);
75 m_handler->HandleResponse(status_holder.release(), obj);
76 } else {
77 m_handler->HandleResponse(status_holder.release(), nullptr);
78 }
79 }
80private:
81 bool m_send_response_info; // If true, the response handler will set the response info object.
82 bool *m_is_opened; // If the file-open is successful, this will be set to true.
83 XrdCl::ResponseHandler *m_handler; // The handler to call with the final result
84};
85
86// A response handler for the "normal" open mode (which typically translates
87// to a HEAD or PROPFIND).
88class OpenResponseHandler : public XrdCl::ResponseHandler {
89public:
90 OpenResponseHandler(bool *is_opened, XrdCl::ResponseHandler *handler)
91 : m_is_opened(is_opened), m_handler(handler)
92 {}
93
94 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
95 std::unique_ptr<OpenResponseHandler> holder(this);
96 std::unique_ptr<XrdCl::AnyObject> response_holder(response);
97 std::unique_ptr<XrdCl::XRootDStatus> status_holder(status);
98
99 if (!status || !status->IsOK()) {
100 if (m_handler) m_handler->HandleResponse(status_holder.release(), response_holder.release());
101 return;
102 }
103 if (m_is_opened) *m_is_opened = true;
104 if (!m_handler) {
105 return;
106 }
107 m_handler->HandleResponse(status_holder.release(), response_holder.release());
108 }
109
110private:
111 bool *m_is_opened; // If the file-open is successful, this will be set to true.
112 XrdCl::ResponseHandler *m_handler; // The handler to call with the final result
113};
114
115// A response handler that transforms the read result into a PageInfo object.
116// This is used for page reads which require a checksum of each page; note
117// this is computed client-side whereas for the xroot protocol the checksum is computed server-side.
118class PgReadResponseHandler : public XrdCl::ResponseHandler {
119public:
120 PgReadResponseHandler(XrdCl::ResponseHandler *handler)
121 : m_handler(handler)
122 {}
123
124 virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
125 std::unique_ptr<PgReadResponseHandler> holder(this);
126 if (!status || !status->IsOK()) {
127 if (m_handler) m_handler->HandleResponse(status, response);
128 else delete response;
129 return;
130 }
131 if (!m_handler) {
132 delete response;
133 return;
134 }
135
136 // Transform the read result ChunkInfo into a PageInfo.
137 XrdCl::ChunkInfo *ci = nullptr;
138 response->Get(ci);
139 if (!ci) {
140 delete response;
141 if (m_handler) m_handler->HandleResponse(status, nullptr);
142 return;
143 }
144 std::vector<uint32_t> cksums;
145 size_t nbpages = ci->GetLength() / XrdSys::PageSize;
146 if (ci->GetLength() % XrdSys::PageSize) ++nbpages;
147 cksums.reserve(nbpages);
148
149 auto buffer = static_cast<const char *>(ci->GetBuffer());
150 size_t size = ci->GetLength();
151 for (size_t pg=0; pg<nbpages; ++pg)
152 {
153 auto pgsize = static_cast<size_t>(XrdSys::PageSize);
154 if (pgsize > size) pgsize = size;
155 cksums.push_back(XrdOucCRC::Calc32C(buffer, pgsize));
156 buffer += pgsize;
157 size -= pgsize;
158 }
159
160 auto page_info = new XrdCl::PageInfo(ci->GetOffset(), ci->GetLength(), ci->GetBuffer(), std::move(cksums));
161 auto obj = new XrdCl::AnyObject();
162 obj->Set(page_info);
163 delete response;
164 auto handle = m_handler;
165 m_handler = nullptr;
166 handle->HandleResponse(status, obj);
167 }
168
169private:
170 XrdCl::ResponseHandler *m_handler;
171};
172
173// A response handler for close operations that require creating a zero-length
174// object.
175class CloseCreateHandler : public XrdCl::ResponseHandler {
176public:
177 CloseCreateHandler(XrdCl::ResponseHandler *handler)
178 : m_handler(handler)
179 {}
180
181 virtual void HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
182 std::unique_ptr<CloseCreateHandler> self(this);
183 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
184 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
185
186 if (m_handler) m_handler->HandleResponse(status.release(), nullptr);
187 }
188
189private:
190 XrdCl::ResponseHandler *m_handler;
191};
192
193} // anonymous namespace
194
195// Note: these values are typically overwritten by `CurlFactory::CurlFactory`;
196// they are set here just to avoid uninitialized globals.
197struct timespec XrdClHttp::File::m_min_client_timeout = {2, 0};
198struct timespec XrdClHttp::File::m_default_header_timeout = {9, 5};
199struct timespec XrdClHttp::File::m_fed_timeout = {5, 0};
200
201
202File::~File() noexcept {
203 auto handler = m_put_handler.load(std::memory_order_acquire);
204 if (handler) {
205 // We must wait for all ongoing writes to complete; the XrdCl::File
206 // destructor will trigger a Close() operation when it is called without
207 // waiting for the Close to finish, then invoke our destructor.
208 // If the Close() is still ongoing, then the handler will receive a
209 // callback after its memory is freed.
210 handler->WaitForCompletion();
211 delete handler;
212 }
213}
214
216File::GetConnCallout() const {
217 std::string pointer_str;
218 if (!GetProperty("XrdClConnectionCallout", pointer_str) && pointer_str.empty()) {
219 return nullptr;
220 }
221 long long pointer;
222 try {
223 pointer = std::stoll(pointer_str, nullptr, 16);
224 } catch (...) {
225 return nullptr;
226 }
227 if (!pointer) {
228 return nullptr;
229 }
230 return reinterpret_cast<CreateConnCalloutType>(pointer);
231}
232
233struct timespec
234File::ParseHeaderTimeout(const std::string &timeout_string, XrdCl::Log *logger)
235{
236 struct timespec ts = File::GetDefaultHeaderTimeout();
237 if (!timeout_string.empty()) {
238 std::string errmsg;
239 // Parse the provided timeout and decrease by a second if we can (if it's below a second, halve it).
240 // The thinking is that if the client needs a response in N seconds, then we ought to set the internal
241 // timeout to (N-1) seconds to provide enough time for our response to arrive at the client.
242 if (!XrdClHttp::ParseTimeout(timeout_string, ts, errmsg)) {
243 logger->Error(kLogXrdClHttp, "Failed to parse xrdclhttp.timeout parameter: %s", errmsg.c_str());
244 } else if (ts.tv_sec >= 1) {
245 ts.tv_sec--;
246 } else {
247 ts.tv_nsec /= 2;
248 }
249 }
250 const auto mct = File::GetMinimumHeaderTimeout();
251 if (ts.tv_sec < mct.tv_sec ||
252 (ts.tv_sec == mct.tv_sec && ts.tv_nsec < mct.tv_nsec))
253 {
254 ts.tv_sec = mct.tv_sec;
255 ts.tv_nsec = mct.tv_nsec;
256 }
257
258 return ts;
259}
260
261struct timespec
262File::GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
263{
264 if (oper_timeout == 0) {
266 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
267 oper_timeout = val;
268 }
269 if (oper_timeout <= 0) {
270 return header_timeout;
271 }
272 if (oper_timeout == header_timeout.tv_sec) {
273 return {header_timeout.tv_sec, 0};
274 } else if (header_timeout.tv_sec < oper_timeout) {
275 return header_timeout;
276 } else { // header timeout is larger than the operation timeout
277 return {oper_timeout, 0};
278 }
279}
280
281struct timespec
282File::GetHeaderTimeout(time_t oper_timeout) const
283{
284 return GetHeaderTimeoutWithDefault(oper_timeout, m_header_timeout);
285}
286
287std::string
289{
290 return "{\"prefetch\": {"
291 "\"count\": " + std::to_string(m_prefetch_count) + ","
292 "\"expired\": " + std::to_string(m_prefetch_expired_count) + ","
293 "\"failed\": " + std::to_string(m_prefetch_failed_count) + ","
294 "\"reads_hit\": " + std::to_string(m_prefetch_reads_hit) + ","
295 "\"reads_miss\": " + std::to_string(m_prefetch_reads_miss) + ","
296 "\"bytes_used\": " + std::to_string(m_prefetch_bytes_used) +
297 "}}";
298}
299
301File::Open(const std::string &url,
304 XrdCl::ResponseHandler *handler,
305 time_t timeout)
306{
307 if (m_is_opened) {
308 m_logger->Error(kLogXrdClHttp, "URL %s already open", url.c_str());
310 }
311
312 // Note: workaround for a design flaw of the XrdCl API.
313 //
314 // Any properties we set on the file *prior* to opening it are sent to the
315 // XrdCl base implementation, not the plugin object. Hence, they are effectively
316 // ignored because the later `GetProperty` accesses a different object. We want
317 // the SetProperty calls to take effect because they are needed for successfully
318 // `Open`ing the file. There's no way to "setup the plugin", "set properties", and
319 // then "open file" because the first and third operations are part of the same API
320 // call. We thus allow the caller to trigger the plugin loading by doing a special
321 // `Open` call (flags set to Compress, access mode None) that is a no-op.
322 //
323 // Contrast the XrdCl::File plugin loading style with XrdCl::Filesystem; the latter
324 // gets a target URL on construction, before any operations are done, allowing
325 // the `SetProperty` to work.
326 if ((flags == XrdCl::OpenFlags::Compress) && (mode == XrdCl::Access::None) &&
327 (handler == nullptr) && (timeout == 0))
328 {
329 return XrdCl::XRootDStatus();
330 }
331
332 m_open_flags = flags;
333
334 m_header_timeout.tv_nsec = m_default_header_timeout.tv_nsec;
335 m_header_timeout.tv_sec = m_default_header_timeout.tv_sec;
336 auto parsed_url = XrdCl::URL();
337 parsed_url.SetPort(0);
338 if (!parsed_url.FromString(url)) {
339 m_logger->Error(kLogXrdClHttp, "Failed to parse provided URL as a valid URL: %s", url.c_str());
341 }
342 auto pm = parsed_url.GetParams();
343 auto iter = pm.find("xrdclhttp.timeout");
344 std::string timeout_string = (iter == pm.end()) ? "" : iter->second;
345 m_header_timeout = ParseHeaderTimeout(timeout_string, m_logger);
346 pm["xrdclhttp.timeout"] = XrdClHttp::MarshalDuration(m_header_timeout);
347 parsed_url.SetParams(pm);
348 iter = pm.find("oss.asize");
349 if (iter != pm.end()) {
350 off_t asize;
351 auto ec = std::from_chars(iter->second.c_str(), iter->second.c_str() + iter->second.size(), asize);
352 if ((ec.ec == std::errc()) && (ec.ptr == iter->second.c_str() + iter->second.size()) && asize >= 0) {
353 m_asize = asize;
354 } else {
355 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Unable to parse oss.asize to a valid size");
356 }
357 pm.erase(iter);
358 parsed_url.SetParams(pm);
359 }
360
361 m_url = parsed_url.GetURL();
362 m_last_url = "";
363 m_url_current = "";
364
365 auto ts = GetHeaderTimeout(timeout);
366
367 bool full_download = m_full_download.load(std::memory_order_relaxed);
368 m_default_prefetch_handler.reset(new PrefetchDefaultHandler(*this));
369 if (full_download) {
370 m_default_prefetch_handler->m_prefetch_enabled.store(true, std::memory_order_relaxed);
371 }
372
373 if (full_download && !(flags & XrdCl::OpenFlags::Write)) {
374 m_logger->Debug(kLogXrdClHttp, "Opening %s in full download mode", m_url.c_str());
375
376 handler = new OpenFullDownloadResponseHandler(&m_is_opened, SendResponseInfo(), handler);
377 m_prefetch_size = std::numeric_limits<off_t>::max();
378 auto [status, ok] = ReadPrefetch(0, 0, nullptr, handler, timeout, false);
379 if (ok) {
380 return status;
381 } else {
382 m_logger->Error(kLogXrdClHttp, "Failed to start prefetch of data at open (URL %s): %s", m_url.c_str(), status.ToString().c_str());
383 return status;
384 }
385 }
386
387
388 m_logger->Debug(kLogXrdClHttp, "Opening %s (with timeout %lld)", m_url.c_str(), (long long) timeout);
389
390 // This response handler sets the m_is_opened flag to true if the open callback is successfully invoked.
391 handler = new OpenResponseHandler(&m_is_opened, handler);
392
393 std::shared_ptr<XrdClHttp::CurlOpenOp> openOp(
395 handler, GetCurrentURL(), ts, m_logger, this, SendResponseInfo(), GetConnCallout(),
396 &m_default_header_callout
397 )
398 );
399 try {
400 m_queue->Produce(std::move(openOp));
401 } catch (...) {
402 m_logger->Warning(kLogXrdClHttp, "Failed to add open op to queue");
404 }
405
406 return XrdCl::XRootDStatus();
407}
408
411 time_t timeout)
412{
413 if (!m_is_opened) {
414 m_logger->Error(kLogXrdClHttp, "Cannot close. URL isn't open");
416 }
417 m_is_opened = false;
418
419 std::unique_ptr<XrdCl::XRootDStatus> status(new XrdCl::XRootDStatus{});
420 if (m_put_op && !m_put_op->HasFailed()) {
421 auto put_size = m_put_offset.load(std::memory_order_relaxed);
422 if (m_asize >= 0 && put_size == m_asize) {
423 if (put_size == m_asize) {
424 m_logger->Debug(kLogXrdClHttp, "Closing a finished file %s", m_url.c_str());
425 } else {
426 m_logger->Debug(kLogXrdClHttp, "Closing a file %s with partial size (offset %llu, expected %lld)",
427 m_url.c_str(), static_cast<unsigned long long>(put_size), static_cast<long long>(m_asize));
429 0, "Cannot close file with partial size"));
430 }
431 } else {
432 m_logger->Debug(kLogXrdClHttp, "Flushing final write buffer on close");
433 auto put_handler = m_put_handler.load(std::memory_order_acquire);
434 if (put_handler) {
435 return put_handler->QueueWrite(std::make_pair(nullptr, 0), handler);
436 } else {
437 m_logger->Error(kLogXrdClHttp, "Internal state error - put operation ongoing without handle");
439 }
440 }
441 } else if (!m_put_op && m_open_flags & XrdCl::OpenFlags::Write) {
442 timespec ts;
443 timespec_get(&ts, TIME_UTC);
444 ts.tv_sec += timeout;
445 m_asize = 0;
446 auto handler_wrapper = new PutResponseHandler(new CloseCreateHandler(handler));
447 m_put_handler.store(handler_wrapper, std::memory_order_release);
448 m_put_op.reset(new XrdClHttp::CurlPutOp(
449 handler_wrapper, m_default_put_handler, m_url, nullptr, 0, ts, m_logger,
450 GetConnCallout(), &m_default_header_callout
451 ));
452 handler_wrapper->SetOp(m_put_op);
453 m_url_current = "";
454 m_last_url = "";
455 m_logger->Debug(kLogXrdClHttp, "Creating a zero-sized object at %s for close", m_url.c_str());
456 try {
457 m_queue->Produce(m_put_op);
458 } catch (...) {
459 m_put_handler.store(nullptr, std::memory_order_release);
460 m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
462 }
463 return {};
464 }
465
466 m_logger->Debug(kLogXrdClHttp, "Closed %s", m_url.c_str());
467 m_url_current = "";
468 m_last_url = "";
469
470 if (handler) {
471 handler->HandleResponse(status.release(), nullptr);
472 }
473 return XrdCl::XRootDStatus();
474}
475
477File::Stat(bool /*force*/,
478 XrdCl::ResponseHandler *handler,
479 time_t timeout)
480{
481 if (!m_is_opened) {
482 m_logger->Error(kLogXrdClHttp, "Cannot stat. URL isn't open");
484 }
485
486 std::string content_length_str;
487 int64_t content_length;
488 if (!GetProperty("ContentLength", content_length_str)) {
489 m_logger->Error(kLogXrdClHttp, "Content length missing for %s", m_url.c_str());
491 }
492 try {
493 content_length = std::stoll(content_length_str);
494 } catch (...) {
495 m_logger->Error(kLogXrdClHttp, "Content length not an integer for %s", m_url.c_str());
497 }
498 if (content_length < 0) {
499 m_logger->Error(kLogXrdClHttp, "Content length negative for %s", m_url.c_str());
501 }
502
503 m_logger->Debug(kLogXrdClHttp, "Successful stat operation on %s (size %lld)", m_url.c_str(), static_cast<long long>(content_length));
504 auto stat_info = new XrdCl::StatInfo("nobody", content_length,
506 auto obj = new XrdCl::AnyObject();
507 obj->Set(stat_info);
508
509 handler->HandleResponse(new XrdCl::XRootDStatus(), obj);
510 return XrdCl::XRootDStatus();
511}
512
515 time_t timeout)
516{
517 if (!m_is_opened) {
518 m_logger->Error(kLogXrdClHttp, "Cannot run fcntl. URL isn't open");
520 }
521
522 auto obj = new XrdCl::AnyObject();
523 std::string as = arg.ToString();
524 try
525 {
527 if (code == XrdCl::QueryCode::XAttr)
528 {
529 nlohmann::json xatt;
530 std::string etagRes;
531 if (GetProperty("ETag", etagRes))
532 {
533 xatt["ETag"] = etagRes;
534 }
535 std::string cc;
536 if (GetProperty("Cache-Control", cc))
537 {
538 if (cc.find("must-revalidate") != std::string::npos)
539 {
540 xatt["revalidate"] = true;
541 }
542 size_t fm = cc.find("max-age=");
543 if (fm != std::string::npos)
544 {
545 fm += 9; // idx of the first character after the make-age= match
546 for (size_t i = fm; i < cc.length(); i++)
547 {
548 if (!std::isdigit(cc[i]))
549 {
550 std::string sa = cc.substr(fm, i);
551 long int a = std::stol(sa);
552 time_t t = time(NULL) + a;
553 xatt["expire"] = t;
554 break;
555 }
556 }
557 }
558 }
559 XrdCl::Buffer *respBuff = new XrdCl::Buffer();
560 m_logger->Debug(kLogXrdClHttp, "Fcntl content %s", xatt.dump().c_str());
561 respBuff->FromString(xatt.dump());
562 obj->Set(respBuff);
563 }
564 //
565 // Query codes supported by XrdCl::File::Fctnl
566 //
567 else {
568 std::string msg;
570 switch (code) {
572 msg = "Server status query not supported.";
573 break;
574 case XrdCl::QueryCode::Checksum: // fallthrough
576 msg = "Checksum query not supported.";
577 break;
579 msg = "Server configuration query not supported.";
580 break;
582 msg = "Local space stats query not supported.";
583 break;
584 case XrdCl::QueryCode::Opaque: // fallthrough
586 // XrdCl implementation dependent
587 msg = "Opaque query not supported.";
588 break;
590 msg = "Prepare status query not supported.";
591 break;
592 default:
593 msg = "Invalid information query type code";
594 }
595 m_logger->Error(kLogXrdClHttp, "%s", msg.c_str());
596 return status;
597 }
598 }
599 catch (const std::exception& e)
600 {
601 m_logger->Warning(kLogXrdClHttp, "Failed to parse query code %s", e.what());
603 }
604
605 handler->HandleResponse(new XrdCl::XRootDStatus(), obj);
606 return XrdCl::XRootDStatus();
607}
608
610File::Read(uint64_t offset,
611 uint32_t size,
612 void *buffer,
613 XrdCl::ResponseHandler *handler,
614 time_t timeout)
615{
616 if (!m_is_opened) {
617 m_logger->Error(kLogXrdClHttp, "Cannot read. URL isn't open");
619 }
620 auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout, false);
621 if (ok) {
622 if (status.IsOK()) {
623 m_logger->Debug(kLogXrdClHttp, "Read %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size, static_cast<long long>(offset));
624 } else {
625 m_logger->Warning(kLogXrdClHttp, "Read %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size, static_cast<long long>(offset), status.GetErrorMessage().c_str());
626 }
627 return status;
628 } else if (m_full_download.load(std::memory_order_relaxed)) {
629 std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
630 if (m_prefetch_op && m_prefetch_op->IsDone() && (static_cast<off_t>(offset) == m_prefetch_offset.load(std::memory_order_acquire))) {
631 if (handler) {
632 auto ci = new XrdCl::ChunkInfo(offset, 0, buffer);
633 auto obj = new XrdCl::AnyObject();
634 obj->Set(ci);
635 handler->HandleResponse(new XrdCl::XRootDStatus{}, obj);
636 }
637 return XrdCl::XRootDStatus{};
638 }
639 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Non-sequential read detected when in full-download mode");
640 }
641
642 auto ts = GetHeaderTimeout(timeout);
643 auto url = GetCurrentURL();
644 m_logger->Debug(kLogXrdClHttp, "Read %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size, static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
645
646 std::shared_ptr<XrdClHttp::CurlReadOp> readOp(
648 handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, size),
649 static_cast<char*>(buffer), size, m_logger,
650 GetConnCallout(), &m_default_header_callout
651 )
652 );
653 try {
654 m_queue->Produce(std::move(readOp));
655 } catch (...) {
656 m_logger->Warning(kLogXrdClHttp, "Failed to add read op to queue");
658 }
659
660 return XrdCl::XRootDStatus();
661}
662
663std::tuple<XrdCl::XRootDStatus, bool>
664File::ReadPrefetch(uint64_t offset, uint64_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout, bool isPgRead)
665{
666 // Check if prefetching is enabled; if not, return early.
667 auto prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled.load(std::memory_order_relaxed);
668 if (!prefetch_enabled) {
669 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
670 m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to prefetching being disabled", isPgRead ? "Pg": "");
671 return std::make_tuple(XrdCl::XRootDStatus{}, false);
672 }
673 std::unique_lock lock(m_default_prefetch_handler->m_prefetch_mutex);
674 if (m_prefetch_size == -1) {
675 m_logger->Debug(kLogXrdClHttp, "%sRead prefetch skipping due to unknown file size", isPgRead ? "Pg": "");
676 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
677 m_default_prefetch_handler->m_prefetch_enabled = false;
678 }
679 prefetch_enabled = m_default_prefetch_handler->m_prefetch_enabled;
680 if (!prefetch_enabled) {
681 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
682 return std::make_tuple(XrdCl::XRootDStatus{}, false);
683 }
684
685 if (isPgRead) {
686 handler = new PgReadResponseHandler(handler);
687 }
688
689 auto url = GetCurrentURL();
690 if (!m_prefetch_op) {
691 auto ts = GetHeaderTimeout(timeout);
692 if (m_prefetch_size == INT64_MAX) {
693 m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch full object)", isPgRead ? "Pg" : "", url.c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
694 } else {
695 m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld with timeout %lld; starting prefetch of size %lld)", isPgRead ? "Pg" : "", url.c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec), static_cast<long long>(m_prefetch_size));
696 }
697
698 try {
699 // Note we don't set m_last_prefetch_handler here; the constructor will do this automatically if necessary.
700 new PrefetchResponseHandler(*this, offset, size, &m_prefetch_offset, static_cast<char *>(buffer), handler, nullptr, timeout);
701 } catch (std::runtime_error &exc) {
702 m_logger->Warning(kLogXrdClHttp, "Failed to create prefetch response handler: %s", exc.what());
703 m_default_prefetch_handler->m_prefetch_enabled = false;
704 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
705 return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
706 }
707
708 // If we are prefetching as part of an open (i.e., a "full download"), there's special handling logic
709 // to pass along the response headers as file properties.
710 m_prefetch_op.reset(
711 m_is_opened ?
712 new XrdClHttp::CurlReadOp(
713 m_last_prefetch_handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, m_prefetch_size),
714 static_cast<char*>(buffer), size, m_logger,
715 GetConnCallout(), &m_default_header_callout
716 )
717 :
718 new XrdClHttp::CurlPrefetchOpenOp(
719 *this, m_last_prefetch_handler, m_default_prefetch_handler, url, ts,
720 std::make_pair(offset, m_prefetch_size), static_cast<char*>(buffer), size, m_logger,
721 GetConnCallout(), &m_default_header_callout
722 )
723 );
724 lock.unlock();
725 m_prefetch_count.fetch_add(1, std::memory_order_relaxed);
726 m_prefetch_reads_hit.fetch_add(1, std::memory_order_relaxed);
727 m_prefetch_offset.store(offset + size, std::memory_order_release);
728 try {
729 m_queue->Produce(m_prefetch_op);
730 } catch (...) {
731 m_logger->Warning(kLogXrdClHttp, "Failed to add prefetch read op to queue");
732 lock.lock();
733 m_prefetch_op.reset();
734 m_default_prefetch_handler->m_prefetch_enabled = false;
735 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
736 return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
737 }
738 return std::make_tuple(XrdCl::XRootDStatus{}, true);
739 }
740 if (m_prefetch_op->IsDone()) {
741 // Prefetch operation has completed (maybe failed); cannot re-use it.
742 m_default_prefetch_handler->m_prefetch_enabled = false;
743 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
744 m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to prefetching being already complete", isPgRead ? "Pg": "");
745 return std::make_tuple(XrdCl::XRootDStatus{}, false);
746 }
747
748 auto expected_offset = static_cast<off_t>(offset);
749 if (!m_prefetch_offset.compare_exchange_strong(expected_offset, static_cast<off_t>(offset + size), std::memory_order_acq_rel)) {
750 // Out-of-order read; can't handle the prefetch.
751 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
752 m_logger->Dump(kLogXrdClHttp, "%sRead prefetch skipping due to out-of-order reads (requested %lld; current offset %lld)", isPgRead ? "Pg": "", static_cast<long long>(offset), static_cast<long long>(expected_offset));
753 return std::make_tuple(XrdCl::XRootDStatus{}, false);
754 }
755 if (m_logger->GetLevel() >= XrdCl::Log::LogLevel::DebugMsg) {
756 m_logger->Debug(kLogXrdClHttp, "%sRead %s (%llu bytes at offset %lld; using ongoing prefetch)", isPgRead ? "Pg" : "", GetCurrentURL().c_str(), static_cast<unsigned long long>(size), static_cast<long long>(offset));
757 }
758 try {
759 // Notice we don't set m_last_prefetch_handler here; as soon as the constructor is invoked, another thread could have
760 // invoked the handler's callback and deleted it.
761 new PrefetchResponseHandler(*this, offset, size, &m_prefetch_offset, static_cast<char *>(buffer), handler, &lock, timeout);
762 } catch (std::runtime_error &exc) {
763 m_logger->Warning(kLogXrdClHttp, "Failed to create prefetch response handler: %s", exc.what());
764 m_default_prefetch_handler->m_prefetch_enabled = false;
765 m_prefetch_reads_miss.fetch_add(1, std::memory_order_relaxed);
766 return std::make_tuple(XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError), true);
767 }
768
769 return std::make_tuple(XrdCl::XRootDStatus{}, true);
770}
771
772XrdCl::XRootDStatus
774 void *buffer,
775 XrdCl::ResponseHandler *handler,
776 time_t timeout )
777{
778 if (!m_is_opened) {
779 m_logger->Error(kLogXrdClHttp, "Cannot do vector read: URL isn't open");
781 } else if (m_full_download.load(std::memory_order_relaxed)) {
782 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Only sequential reads are supported when in full-download mode");
783 }
784 if (chunks.empty()) {
785 if (handler) {
786 auto status = new XrdCl::XRootDStatus();
787 auto vr = std::make_unique<XrdCl::VectorReadInfo>();
788 vr->SetSize(0);
789 auto obj = new XrdCl::AnyObject();
790 obj->Set(vr.release());
791 handler->HandleResponse(status, obj);
792 }
793 return XrdCl::XRootDStatus();
794 }
795
796 auto ts = GetHeaderTimeout(timeout);
797 auto url = GetCurrentURL();
798 m_logger->Debug(kLogXrdClHttp, "Read %s (%lld chunks; first chunk is %u bytes at offset %lld with timeout %lld)", url.c_str(), static_cast<long long>(chunks.size()), static_cast<unsigned>(chunks[0].GetLength()), static_cast<long long>(chunks[0].GetOffset()), static_cast<long long>(ts.tv_sec));
799
800 std::shared_ptr<XrdClHttp::CurlVectorReadOp> readOp(
802 handler, url, ts, chunks, m_logger, GetConnCallout(), &m_default_header_callout
803 )
804 );
805 try {
806 m_queue->Produce(std::move(readOp));
807 } catch (...) {
808 m_logger->Warning(kLogXrdClHttp, "Failed to add vector read op to queue");
810 }
811
812 return XrdCl::XRootDStatus();
813}
814
816File::Write(uint64_t offset,
817 uint32_t size,
818 const void *buffer,
819 XrdCl::ResponseHandler *handler,
820 time_t timeout)
821{
822 if (!m_is_opened) {
823 m_logger->Error(kLogXrdClHttp, "Cannot write: URL isn't open");
825 } else if (m_full_download.load(std::memory_order_relaxed)) {
826 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Only sequential reads are supported when in full-download mode");
827 }
828 m_default_prefetch_handler->DisablePrefetch();
829
830 auto ts = GetHeaderTimeout(timeout);
831 auto url = GetCurrentURL();
832 m_logger->Debug(kLogXrdClHttp, "Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), size, static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
833
834 auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
835 if (!handler_wrapper) {
836 handler_wrapper = new PutResponseHandler(handler);
837 PutResponseHandler *expected_value = nullptr;
838 if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
839 delete handler_wrapper;
840 return expected_value->QueueWrite(std::make_pair(buffer, size), handler);
841 }
842
843 if (offset != 0) {
844 m_put_handler.store(nullptr, std::memory_order_release);
845 delete handler_wrapper;
846 m_logger->Warning(kLogXrdClHttp, "Cannot start PUT operation at non-zero offset");
847 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "HTTP uploads must start at offset 0");
848 }
849 m_put_op.reset(new XrdClHttp::CurlPutOp(
850 handler_wrapper, m_default_put_handler, url, static_cast<const char*>(buffer), size, ts, m_logger,
851 GetConnCallout(), &m_default_header_callout
852 ));
853 handler_wrapper->SetOp(m_put_op);
854 m_put_offset.fetch_add(size, std::memory_order_acq_rel);
855 try {
856 m_queue->Produce(m_put_op);
857 } catch (...) {
858 m_put_handler.store(nullptr, std::memory_order_release);
859 delete handler_wrapper;
860 m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
862 }
863 return XrdCl::XRootDStatus();
864 }
865
866 auto old_offset = m_put_offset.fetch_add(size, std::memory_order_acq_rel);
867 if (static_cast<off_t>(offset) != old_offset) {
868 m_put_offset.fetch_sub(size, std::memory_order_acq_rel);
869 m_logger->Warning(kLogXrdClHttp, "Requested write offset at %lld does not match current file descriptor offset at %lld",
870 static_cast<long long>(offset), static_cast<long long>(old_offset));
871 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "Requested write offset does not match current offset");
872 }
873 return handler_wrapper->QueueWrite(std::make_pair(buffer, size), handler);
874}
875
// Queue a write whose data is carried in an owned XrdCl::Buffer, as part of the
// file's single streaming PUT.
//
// The first write (no PutResponseHandler published in m_put_handler yet) must be
// at offset 0: it creates the CurlPutOp and hands it to m_queue. Every later
// write must land exactly at m_put_offset and is forwarded to
// PutResponseHandler::QueueWrite.
//
// NOTE(review): the return-type line of this definition is not present in this
// listing.
// NOTE(review): the (offset, size, buffer) overload rejects writes when
// m_full_download is set; this overload has no such check -- confirm intended.
877File::Write(uint64_t offset,
878 XrdCl::Buffer &&buffer,
879 XrdCl::ResponseHandler *handler,
880 time_t timeout)
881{
882 if (!m_is_opened) {
883 m_logger->Error(kLogXrdClHttp, "Cannot write: URL isn't open");
// NOTE(review): no statement follows the log call in this listing; presumably an
// error status is returned here -- confirm against the real source.
885 }
// Writing and prefetching are not combined: stop any prefetch first.
886 m_default_prefetch_handler->DisablePrefetch();
887
888 auto ts = GetHeaderTimeout(timeout);
889 auto url = GetCurrentURL();
890 m_logger->Debug(kLogXrdClHttp, "Write %s (%d bytes at offset %lld with timeout %lld)", url.c_str(), static_cast<int>(buffer.GetSize()), static_cast<long long>(offset), static_cast<long long>(ts.tv_sec));
891
892 auto handler_wrapper = m_put_handler.load(std::memory_order_relaxed);
893 if (!handler_wrapper) {
// No PUT in flight yet: try to publish a fresh wrapper with a compare-exchange.
894 handler_wrapper = new PutResponseHandler(handler);
895 PutResponseHandler *expected_value = nullptr;
896 if (!m_put_handler.compare_exchange_strong(expected_value, handler_wrapper, std::memory_order_acq_rel)) {
// Another caller published a wrapper first; discard ours and queue on theirs.
897 delete handler_wrapper;
898 return expected_value->QueueWrite(std::move(buffer), handler);
899 }
900
901 if (offset != 0) {
// Undo the publication: the upload has to begin at the start of the object.
902 m_put_handler.store(nullptr, std::memory_order_release);
903 delete handler_wrapper;
904 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "HTTP uploads must start at offset 0");
905 }
906 m_put_op.reset(new XrdClHttp::CurlPutOp(
907 handler_wrapper, m_default_put_handler, url, std::move(buffer), ts, m_logger,
908 GetConnCallout(), &m_default_header_callout
909 ));
910 handler_wrapper->SetOp(m_put_op);
// NOTE(review): `buffer` was passed through std::move into CurlPutOp above. If
// that constructor takes ownership, GetSize() here reads a moved-from Buffer and
// m_put_offset may not advance by the bytes written -- confirm, and capture the
// size before the move if so.
911 m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
912 try {
913 m_queue->Produce(m_put_op);
914 } catch (...) {
915 m_put_handler.store(nullptr, std::memory_order_release);
916 delete handler_wrapper;
917 m_logger->Warning(kLogXrdClHttp, "Failed to add put op to queue");
// NOTE(review): no statement follows the log call in this listing; presumably an
// error status is returned here. m_put_op and m_put_offset are not rolled back
// in the visible code -- confirm.
919 }
920 return XrdCl::XRootDStatus();
921 }
922
// A PUT is already in progress: reserve [old_offset, old_offset + size) and
// require the caller's offset to match; undo the reservation otherwise.
923 auto old_offset = m_put_offset.fetch_add(buffer.GetSize(), std::memory_order_acq_rel);
924 if (static_cast<off_t>(offset) != old_offset) {
925 m_put_offset.fetch_sub(buffer.GetSize(), std::memory_order_acq_rel);
926 m_logger->Warning(kLogXrdClHttp, "Requested write offset at %lld does not match current file descriptor offset at %lld",
927 static_cast<long long>(offset), static_cast<long long>(old_offset));
928 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidArgs, 0, "Requested write offset does not match current offset");
929 }
930 return handler_wrapper->QueueWrite(std::move(buffer), handler);
931}
932
// Page-read `size` bytes at `offset` into `buffer`.
//
// The request is first offered to ReadPrefetch(); when that reports it took the
// request (`ok`), its status is returned as-is. Otherwise, in full-download mode
// the read is rejected, and in normal mode a CurlPgReadOp is created and handed
// to m_queue.
//
// NOTE(review): the return-type line of this definition is not present in this
// listing.
934File::PgRead(uint64_t offset,
935 uint32_t size,
936 void *buffer,
937 XrdCl::ResponseHandler *handler,
938 time_t timeout)
939{
940 if (!m_is_opened) {
941 m_logger->Error(kLogXrdClHttp, "Cannot pgread. URL isn't open");
// NOTE(review): no statement follows the log call in this listing; presumably an
// error status is returned here -- confirm against the real source.
943 }
// The trailing `true` argument's meaning is defined by ReadPrefetch, which is
// not visible here -- presumably it selects page-read mode; confirm.
944 auto [status, ok] = ReadPrefetch(offset, size, buffer, handler, timeout, true);
945 if (ok) {
946 if (status.IsOK()) {
947 m_logger->Debug(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld) will be served from prefetch handler", m_url.c_str(), size, static_cast<long long>(offset));
948 } else {
949 m_logger->Warning(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld) failed: %s", m_url.c_str(), size, static_cast<long long>(offset), status.GetErrorMessage().c_str());
950 }
951 return status;
952 } else if (m_full_download.load(std::memory_order_relaxed)) {
// In full-download mode only the prefetch path can serve reads.
953 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Non-sequential read detected when in full-download mode");
954 }
955
956 auto ts = GetHeaderTimeout(timeout);
957 auto url = GetCurrentURL();
958 m_logger->Debug(kLogXrdClHttp, "PgRead %s (%d bytes at offset %lld)", url.c_str(), size, static_cast<long long>(offset));
959
// NOTE(review): the expression that allocates the CurlPgReadOp (the line between
// the two below) is not present in this listing.
960 std::shared_ptr<XrdClHttp::CurlPgReadOp> readOp(
962 handler, m_default_prefetch_handler, url, ts, std::make_pair(offset, size),
963 static_cast<char*>(buffer), size, m_logger,
964 GetConnCallout(), &m_default_header_callout
965 )
966 );
967
968 try {
969 m_queue->Produce(std::move(readOp));
970 } catch (...) {
971 m_logger->Warning(kLogXrdClHttp, "Failed to add read op to queue");
// NOTE(review): no statement follows the log call in this listing; presumably an
// error status is returned here -- confirm against the real source.
973 }
974
975 return XrdCl::XRootDStatus();
976}
977
// Report whether the file is currently open by returning m_is_opened.
// NOTE(review): the line carrying the function name and parameter list is not
// present in this listing.
978bool
980{
981 return m_is_opened;
982}
983
984bool
985File::GetProperty(const std::string &name,
986 std::string &value) const
987{
988 if (name == "CurrentURL") {
989 value = GetCurrentURL();
990 return true;
991 }
992
993 if (name == "IsPrefetching") {
994 value = m_default_prefetch_handler->IsPrefetching() ? "true" : "false";
995 return true;
996 }
997
998 std::shared_lock lock(m_properties_mutex);
999 if (name == "LastURL") {
1000 value = m_last_url;
1001 return true;
1002 }
1003
1004 const auto p = m_properties.find(name);
1005 if (p == std::end(m_properties)) {
1006 return false;
1007 }
1008
1009 value = p->second;
1010 return true;
1011}
1012
1013bool File::SendResponseInfo() const {
1014 std::string val;
1015 return GetProperty(ResponseInfoProperty, val) && val == "true";
1016}
1017
// Store a named property and apply the side effects attached to well-known
// names. Every name/value pair is recorded in m_properties; the function always
// returns true, including when a value fails to parse.
1018bool
1019File::SetProperty(const std::string &name,
1020 const std::string &value)
1021{
// These two names are handled before m_properties_mutex is taken.
1022 if (name == "XrdClHttpHeaderCallout") {
// The value is parsed as a hexadecimal integer and reinterpreted as a
// HeaderCallout pointer; a parse failure stores a null pointer.
// NOTE(review): presumably the caller guarantees the address is a live
// HeaderCallout object -- nothing here can verify that.
1023 long long pointer;
1024 try {
1025 pointer = std::stoll(value, nullptr, 16);
1026 } catch (...) {
1027 pointer = 0;
1028 }
1029 m_header_callout.store(reinterpret_cast<XrdClHttp::HeaderCallout*>(pointer), std::memory_order_release);
1030 } else if (name == "XrdClHttpFullDownload") {
// Only the exact value "true" has an effect; there is no visible way to turn
// full-download mode back off through this property.
1031 if (value == "true") {
1032 auto prefetch_handler = m_default_prefetch_handler;
1033 if (prefetch_handler) {
1034 std::unique_lock lock(prefetch_handler->m_prefetch_mutex);
1035 prefetch_handler->m_prefetch_enabled.store(true, std::memory_order_relaxed);
1036 }
1037 m_full_download.store(true, std::memory_order_relaxed);
1038 }
1039 }
1040
1041 std::unique_lock lock(m_properties_mutex);
1042
1043 m_properties[name] = value;
1044 if (name == "LastURL") {
// Clearing m_url_current forces GetCurrentURL() to recompute it.
1045 m_last_url = value;
1046 m_url_current = "";
1047 }
1048 else if (name == "XrdClHttpQueryParam") {
1049 CalculateCurrentURL(value);
1050 }
1051 else if (name == "XrdClHttpMaintenancePeriod") {
// Accepted only when the whole string parses as a positive unsigned integer.
1052 unsigned period;
1053 auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), period);
1054 if ((ec.ec == std::errc()) && (ec.ptr == value.c_str() + value.size()) && period > 0) {
1055 m_logger->Debug(kLogXrdClHttp, "Setting maintenance period to %u", period);
// NOTE(review): no statement that applies `period` follows the log call in this
// listing -- confirm against the real source.
1057 }
1058 }
1059 else if (name == "XrdClHttpStallTimeout") {
1060 std::string errmsg;
1061 timespec ts;
1062 if (!ParseTimeout(value, ts, errmsg)) {
1063 m_logger->Debug(kLogXrdClHttp, "Failed to parse timeout value (%s): %s", value.c_str(), errmsg.c_str());
1064 } else {
1065 CurlOperation::SetStallTimeout(std::chrono::seconds{ts.tv_sec} + std::chrono::nanoseconds{ts.tv_nsec});
1066 }
1067 }
1068 else if (name == "XrdClHttpPrefetchSize") {
// The whole string must parse; negative values are not rejected here.
1069 off_t size;
1070 auto ec = std::from_chars(value.c_str(), value.c_str() + value.size(), size);
1071 if ((ec.ec == std::errc()) && (ec.ptr == value.c_str() + value.size())) {
// The properties lock is released before the prefetch mutex is taken, so the
// two are never held together on this path; m_prefetch_size is written under
// the prefetch mutex.
1072 lock.unlock();
1073 std::unique_lock lock2(m_default_prefetch_handler->m_prefetch_mutex);
1074 m_prefetch_size = size;
1075 } else {
1076 m_logger->Debug(kLogXrdClHttp, "XrdClHttpPrefetchSize value (%s) was not parseable", value.c_str());
1077 }
1078 }
1079 return true;
1080}
1081
1082const std::string
1083File::GetCurrentURL() const {
1084 {
1085 std::shared_lock lock(m_properties_mutex);
1086
1087 if (!m_url_current.empty()) {
1088 return m_url_current;
1089 } else if (m_url.empty() && m_last_url.empty()) {
1090 return "";
1091 }
1092 }
1093 std::unique_lock lock(m_properties_mutex);
1094
1095 auto iter = m_properties.find("XrdClHttpQueryParam");
1096 if (iter == m_properties.end()) {
1097 return m_last_url.empty() ? m_url : m_last_url;
1098 }
1099 CalculateCurrentURL(iter->second);
1100
1101 return m_url_current;
1102}
1103
1104void
1105File::CalculateCurrentURL(const std::string &value) const {
1106 const auto &last_url = m_last_url.empty() ? m_url : m_last_url;
1107 if (value.empty()) {
1108 m_url_current = last_url;
1109 } else {
1110 auto loc = last_url.find('?');
1111 if (loc == std::string::npos) {
1112 m_url_current = last_url + '?' + value;
1113 } else {
1114 XrdCl::URL url(last_url);
1115 auto map = url.GetParams(); // Make a copy of the pre-existing parameters
1116 url.SetParams(value); // Parse the new value
1117 auto update_map = url.GetParams();
1118 for (const auto &entry : map) {
1119 if (update_map.find(entry.first) == update_map.end()) {
1120 update_map[entry.first] = entry.second;
1121 }
1122 }
1123 bool first = true;
1124 std::stringstream ss;
1125 for (const auto &entry : update_map) {
1126 ss << (first ? "?" : "&") << entry.first << "=" << entry.second;
1127 first = false;
1128 }
1129 m_url_current = last_url.substr(0, loc) + ss.str();
1130 }
1131 }
1132}
1133
// Construct a handler for one read served from the ongoing prefetch and link it
// into the parent's chain of waiting handlers.
//
// If another handler is already last in the chain, this one is appended behind
// it and nothing else happens. Otherwise this handler becomes the last one and,
// when `lock` is non-null and a prefetch operation exists, the operation is
// continued immediately with this handler's buffer.
//
// `lock` is unlocked around the Continue() call. It is re-locked only when
// Continue() fails, so on the success path of that branch the lock is left
// released when the constructor returns.
//
// Throws std::runtime_error when Continue() reports failure.
1134File::PrefetchResponseHandler::PrefetchResponseHandler(
1135 File &parent, off_t offset, size_t size, std::atomic<off_t> *prefetch_offset, char *buffer,
1136 XrdCl::ResponseHandler *handler, std::unique_lock<std::mutex> *lock, time_t timeout
1137)
1138 : m_parent(parent),
1139 m_handler(handler),
1140 m_buffer(buffer),
1141 m_size(size),
1142 m_offset(offset),
1143 m_prefetch_offset(prefetch_offset),
1144 m_timeout(timeout)
1145{
1146 if (parent.m_last_prefetch_handler) {
// Append behind the current tail; the predecessor's HandleResponse will start us.
1147 parent.m_last_prefetch_handler->m_next = this;
1148 parent.m_last_prefetch_handler = this;
1149 } else {
1150 m_parent.m_last_prefetch_handler = this;
1151 // If lock is nullptr, then we are guaranteed that this is called during the creation
1152 // of the m_prefetch_op and can skip this check.
1153 if (lock && m_parent.m_prefetch_op) {
1154 // If continuing the prefetch operation fails, then the failure callback
1155 // will be invoked; the callback requires the mutex and hence we need to unlock it
1156 // here to avoid a deadlock.
1157 lock->unlock();
1158 if (!parent.m_prefetch_op->Continue(parent.m_prefetch_op, this, buffer, size)) {
1159 lock->lock();
1160 // As soon as we unlock the lock, another thread could have finished the
1161 // operation (which deletes the object); we must be careful to not touch the
1162 // object (reference m_*) in the meantime.
1163 if (parent.m_last_prefetch_handler == this)
1164 parent.m_last_prefetch_handler = nullptr;
1165 throw std::runtime_error("Failed to continue prefetch operation");
1166 }
1167 }
1168 }
1169}
1170
1171void
1172File::PrefetchResponseHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
1173 // Ensure that we are deleted once the callback is done.
1174 std::unique_ptr<PrefetchResponseHandler> owner(this);
1175
1176 bool mismatched_size = false;
1177 if (status) {
1178 if (status->IsOK() && response) {
1179 XrdCl::ChunkInfo *ci = nullptr;
1180 response->Get(ci);
1181 if (ci) {
1182 auto missing_bytes = m_size - ci->GetLength();
1183 if (missing_bytes) {
1184 mismatched_size = true;
1185 m_prefetch_offset->fetch_sub(missing_bytes, std::memory_order_relaxed);
1186 }
1187 m_prefetch_bytes_used.fetch_add(ci->GetLength(), std::memory_order_relaxed);
1188 }
1189 } else if (!status->IsOK()) {
1190 m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1191 }
1192 }
1193
1194 PrefetchResponseHandler *next;
1195 {
1196 std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1197 next = m_next;
1198 }
1199 if (next) {
1200 if (status && status->IsOK() && !mismatched_size) {
1201 m_parent.m_prefetch_op->Continue(m_parent.m_prefetch_op, next, next->m_buffer, next->m_size);
1202 } else {
1203 // On failure resubmit subsequent operations.
1204 // All the subsequent ops also depend on us having the expected read length (otherwise the
1205 // file offsets are incorrect). If there's a mismatched read size (shorter actual bytes available
1206 // than what is originally requested), then that's another sign of potential issue and we disable
1207 // the prefetch mechanism.
1208 m_parent.m_default_prefetch_handler->DisablePrefetch();
1209 next->ResubmitOperation();
1210 }
1211 }
1212
1213 {
1214 std::unique_lock lock(m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1215 if (m_parent.m_last_prefetch_handler == this) {
1216 m_parent.m_last_prefetch_handler = nullptr;
1217 }
1218 if (!status || !status->IsOK()) {
1219 m_parent.m_prefetch_op.reset();
1220 m_parent.m_default_prefetch_handler->m_prefetch_enabled = false;
1221 }
1222 }
1223
1224 if (m_handler) m_handler->HandleResponse(status, response);
1225 else delete response;
1226}
1227
1228void
1229File::PrefetchResponseHandler::ResubmitOperation()
1230{
1231 m_parent.m_logger->Debug(kLogXrdClHttp, "Resubmitting waiting prefetch operations as new reads due to prefetch failure");
1232 PrefetchResponseHandler *next = this;
1233 while (next) {
1234 auto cur = next;
1235 auto st = next->m_parent.Read(next->m_offset, next->m_size, next->m_buffer, next->m_handler, next->m_timeout);
1236 if (!st.IsOK() && next->m_handler) {
1237 next->m_handler->HandleResponse(new XrdCl::XRootDStatus(st), nullptr);
1238 }
1239 {
1240 std::unique_lock lock(next->m_parent.m_default_prefetch_handler->m_prefetch_mutex);
1241 next = next->m_next;
1242 }
1243 delete cur;
1244 }
1245}
1246
1247void
1248File::PrefetchDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw) {
1249 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1250 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
1251 if (status && !status->IsOK()) {
1252 if ((status->code == XrdCl::errOperationExpired) && (status->GetErrorMessage().find("Transfer stalled for too long") != std::string::npos)) {
1253 m_prefetch_expired_count.fetch_add(1, std::memory_order_relaxed);
1254 m_logger->Debug(kLogXrdClHttp, "Prefetch data for %s went unused; disabling.", m_url.c_str());
1255 } else {
1256 m_prefetch_failed_count.fetch_add(1, std::memory_order_relaxed);
1257 m_logger->Warning(kLogXrdClHttp, "Disabling prefetch of %s due to error: %s", m_url.c_str(), status->ToStr().c_str());
1258 }
1259 }
1260 DisablePrefetch();
1261}
1262
1263void
1264File::PutDefaultHandler::HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) {
1265 delete response;
1266 if (status) {
1267 m_logger->Warning(kLogXrdClHttp, "Failing future write calls due to error: %s", status->ToStr().c_str());
1268 delete status;
1269 }
1270}
1271
1272std::shared_ptr<XrdClHttp::HeaderCallout::HeaderList>
1273File::HeaderCallout::GetHeaders(const std::string &verb,
1274 const std::string &url,
1275 const HeaderList &headers)
1276{
1277 auto parent_callout = m_parent.m_header_callout.load(std::memory_order_acquire);
1278 std::shared_ptr<std::vector<std::pair<std::string, std::string>>> result_headers;
1279 if (parent_callout != nullptr) {
1280 result_headers = parent_callout->GetHeaders(verb, url, headers);
1281 } else {
1282 result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1283 for (const auto & info : headers) {
1284 result_headers->emplace_back(info.first, info.second);
1285 }
1286 }
1287 if (m_parent.m_asize >= 0 && verb == "PUT") {
1288 if (!result_headers) {
1289 result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1290 }
1291 auto iter = std::find_if(result_headers->begin(), result_headers->end(),
1292 [](const auto &pair) { return !strcasecmp(pair.first.c_str(), "Content-Length"); });
1293 if (iter == result_headers->end()) {
1294 result_headers->emplace_back("Content-Length", std::to_string(m_parent.m_asize));
1295 }
1296 } else if (!result_headers) {
1297 result_headers.reset(new std::vector<std::pair<std::string, std::string>>{});
1298 }
1299 return result_headers;
1300}
1301
1302File::PutResponseHandler::PutResponseHandler(XrdCl::ResponseHandler *handler)
1303 : m_active_handler(handler)
1304{}
1305
// Completion callback for one write (one chunk) of the streaming PUT.
//
// On failure: every queued write is failed with a copy of the same status, the
// handler is marked inactive, and waiters on m_cv are woken -- all before any
// user callback runs. On success: the next queued write is started through
// ProcessQueue() and then the completed write's callback is invoked.
//
// NOTE(review): `status` is dereferenced without a null check, unlike the
// prefetch handlers in this file -- confirm that callers never pass null.
1306void
1307File::PutResponseHandler::HandleResponse(XrdCl::XRootDStatus *status_raw, XrdCl::AnyObject *response_raw)
1308{
1309 std::unique_ptr<XrdCl::XRootDStatus> status(status_raw);
1310 std::unique_ptr<XrdCl::AnyObject> response(response_raw);
1311
1312 // Note: if the handler owns the file object (as in the case of Pelican's writeback
1313 // response handler), then the callback may cause the file to be deleted - and hence
1314 // this instance of PutResponseHandler to be deleted. However, if m_active is true,
1315 // the destructor will wait until it's set to false; that cannot occur until we clear
1316 // m_active (in ProcessQueue or in the cleanup path for pending writes).
1317 //
1318 // Hence, we must ensure that we clear m_active and run any queue logic before invoking
1319 // callback handlers, which may delete this object or generate work in other threads.
1320
1321 XrdCl::ResponseHandler *current_handler = nullptr;
1322 if (!status->IsOK()) {
1323 // Fail the remaining (pending) handlers with the same error.
1324 // Any write attempted by the client after the failure is
1325 // promptly declined.
1326 std::vector<XrdCl::ResponseHandler *> pending_handlers;
1327 {
// Snapshot the callbacks and reset all state under the lock; the callbacks
// themselves are invoked only after the lock is released.
1328 std::lock_guard<std::mutex> lg(m_mutex);
1329 current_handler = m_active_handler;
1330 for (auto &[_, h] : m_pending_writes) {
1331 if (h) pending_handlers.push_back(h);
1332 }
1333
1334 m_pending_writes.clear();
1335 m_active = false;
1336 m_active_handler = nullptr;
1337 m_cv.notify_all();
1338 }
1339
// The original status is handed to the active callback, so the pending
// callbacks each receive their own copy.
1340 XrdCl::XRootDStatus status_copy(*status);
1341 if (current_handler) {
1342 current_handler->HandleResponse(status.release(), response.release());
1343 }
1344
1345 for (auto *h : pending_handlers) {
1346 h->HandleResponse(new XrdCl::XRootDStatus(status_copy), nullptr);
1347 }
1348 return;
1349 }
1350
// NOTE(review): m_active_handler is read here without holding m_mutex, while
// QueueWrite() writes it under that mutex -- confirm this cannot race.
1351 current_handler = m_active_handler;
// NOTE(review): when ProcessQueue() returns false, the callback of the write
// that just completed successfully is not invoked; its status and response are
// freed by the unique_ptrs -- confirm this is intended.
1352 if (ProcessQueue() && current_handler) {
1353 current_handler->HandleResponse(status.release(), response.release());
1354 }
1355}
1356
1357XrdCl::XRootDStatus
1358File::PutResponseHandler::QueueWrite(std::variant<std::pair<const void *, size_t>, XrdCl::Buffer> buffer, XrdCl::ResponseHandler *handler)
1359{
1360 if (m_op->HasFailed()) {
1361 auto sc = m_op->GetStatusCode();
1362 if (HTTPStatusIsError(sc)){
1363 auto httpErr = HTTPStatusConvert(sc);
1364 auto err_msg = m_op->GetCurlErrorMessage();
1365 if (err_msg.empty()) {
1366 err_msg = m_op->GetStatusMessage();
1367 }
1368 return XrdCl::XRootDStatus(XrdCl::stError, httpErr.first, httpErr.second, err_msg);
1369 }
1370 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInvalidOp, 0, "Cannot continue writing to open file after error");
1371 }
1372 std::lock_guard<std::mutex> lg(m_mutex);
1373 if (!m_active) {
1374 m_active = true;
1375 m_active_handler = handler;
1376 if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1377 if (!m_op->Continue(m_op, this, std::move(std::get<XrdCl::Buffer>(buffer)))) {
1378 m_active = false;
1379 m_cv.notify_all();
1380 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation");
1381 }
1382 } else {
1383 auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1384 if (!m_op->Continue(m_op, this, static_cast<const char *>(buffer_info.first), buffer_info.second)) {
1385 m_active = false;
1386 m_cv.notify_all();
1387 return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation");
1388 }
1389 }
1390 } else {
1391 m_pending_writes.emplace_back(std::move(buffer), handler);
1392 }
1393 return XrdCl::XRootDStatus{};
1394}
1395
1396// Start the next pending write operation.
1397bool
1398File::PutResponseHandler::ProcessQueue() {
1399 std::lock_guard<std::mutex> lg(m_mutex);
1400 if (m_pending_writes.empty()) {
1401 // No pending writes; mark the operation as inactive.
1402 m_active = false;
1403 m_active_handler = nullptr;
1404 m_cv.notify_all();
1405 return true;
1406 }
1407
1408 // Start the next pending write.
1409 auto & [buffer, handler] = m_pending_writes.front();
1410 bool rv;
1411 m_active_handler = handler;
1412 if (std::holds_alternative<XrdCl::Buffer>(buffer)) {
1413 rv = m_op->Continue(m_op, this, std::move(std::get<XrdCl::Buffer>(buffer)));
1414 } else {
1415 auto buffer_info = std::get<std::pair<const void *, size_t>>(buffer);
1416 rv = m_op->Continue(m_op, this, static_cast<const char *>(buffer_info.first), buffer_info.second);
1417 }
1418 m_pending_writes.pop_front();
1419 if (!rv) {
1420 // The continuation failed; mark the operation as inactive and
1421 // invoke all pending handlers with the error.
1422 if (m_active_handler) {
1423 m_active_handler->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation"), nullptr);
1424 }
1425 for (auto& [_, h] : m_pending_writes) {
1426 if (h) {
1427 h->HandleResponse(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errOSError, ENOSPC, "Cannot continue PUT operation"), nullptr);
1428 }
1429 }
1430 m_active = false;
1431 m_cv.notify_all();
1432 return false;
1433 }
1434 return true;
1435}
1436
1437void
1438File::PutResponseHandler::WaitForCompletion() {
1439 std::unique_lock lock(m_mutex);
1440 m_cv.wait(lock, [&]{return !m_active;});
1441}
static std::string ts()
timestamp output for logging messages
Definition XrdCephOss.cc:53
static void parent()
#define ResponseInfoProperty
static void SetStallTimeout(int stall_interval)
static void SetMaintenancePeriod(unsigned maint)
virtual XrdCl::XRootDStatus Open(const std::string &url, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode mode, XrdCl::ResponseHandler *handler, time_t timeout) override
static struct timespec ParseHeaderTimeout(const std::string &header_value, XrdCl::Log *logger)
File(std::shared_ptr< XrdClHttp::HandlerQueue > queue, XrdCl::Log *log)
static const struct timespec & GetDefaultHeaderTimeout()
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout, const struct timespec &header_timeout)
virtual bool SetProperty(const std::string &name, const std::string &value) override
virtual bool IsOpen() const override
virtual XrdCl::XRootDStatus VectorRead(const XrdCl::ChunkList &chunks, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Fcntl(const XrdCl::Buffer &arg, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Write(uint64_t offset, uint32_t size, const void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual ~File() noexcept
static std::string GetMonitoringJson()
virtual bool GetProperty(const std::string &name, std::string &value) const override
static const struct timespec & GetMinimumHeaderTimeout()
virtual XrdCl::XRootDStatus PgRead(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Read(uint64_t offset, uint32_t size, void *buffer, XrdCl::ResponseHandler *handler, time_t timeout) override
virtual XrdCl::XRootDStatus Stat(bool force, XrdCl::ResponseHandler *handler, time_t timeout) override
struct timespec GetHeaderTimeout(time_t oper_timeout) const
virtual XrdCl::XRootDStatus Close(XrdCl::ResponseHandler *handler, time_t timeout) override
void Get(Type &object)
Retrieve the object being held.
Binary blob representation.
void FromString(const std::string str)
Fill the buffer from a string.
std::string ToString() const
Convert the buffer to a string.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
@ DebugMsg
print debug info
Definition XrdClLog.hh:112
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
Handle an async response.
virtual void HandleResponse(XRootDStatus *status, AnyObject *response)
Object stat info.
@ IsReadable
Read access is allowed.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetErrorMessage() const
Get error message.
std::string ToStr() const
Convert to string.
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
Definition XrdOucCRC.cc:190
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
std::pair< uint16_t, uint32_t > HTTPStatusConvert(unsigned status)
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
bool HTTPStatusIsError(unsigned status)
const uint64_t kLogXrdClHttp
std::string MarshalDuration(const struct timespec &timeout)
const uint16_t errOperationExpired
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errDataError
data is corrupted
const uint16_t errInternal
Internal error.
const uint16_t errInvalidOp
const uint16_t errOSError
const uint16_t errInvalidResponse
const uint16_t errInvalidArgs
const int DefaultRequestTimeout
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Flags
Open flags, may be or'd when appropriate.
@ Write
Open only for writing.
Code
XRootD query request codes.
@ OpaqueFile
Implementation dependent.
@ XAttr
Query file extended attributes.
@ Opaque
Implementation dependent.
@ Config
Query server configuration.
@ Stats
Query server stats.
@ ChecksumCancel
Query file checksum cancellation.
@ Checksum
Query file checksum.
@ Space
Query logical space stats.
@ Prepare
Query prepare status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.