XRootD
Loading...
Searching...
No Matches
XrdClHttpFactory.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#include "XrdClHttpFactory.hh"
22#include "XrdClHttpFile.hh"
24#include "XrdClHttpUtil.hh"
25#include "XrdClHttpOps.hh"
27#include "XrdClHttpWorker.hh"
28
31#include "XrdCl/XrdClLog.hh"
33#include "XrdVersion.hh"
34
35#include <stdio.h>
36#include <unistd.h>
37
39
40using namespace XrdClHttp;
41
// Resolve the timeout to use for an operation, returned as a timespec.
// NOTE(review): the line carrying this function's qualified name and parameter
// list is not visible here; the cross-reference index suggests it is
// GetHeaderTimeoutWithDefault(time_t oper_timeout) -- confirm against the header.
//  - oper_timeout == 0: fall back to the integer "RequestTimeout" value from the
//    default client environment.
//  - oper_timeout <= 0 after that fallback: return {0, 0}.
//  - otherwise: return {oper_timeout, 0}, i.e. whole seconds with no nanoseconds.
42struct timespec
44{
45 if (oper_timeout == 0) {
// NOTE(review): the declaration (and initial value) of `val` is not visible in
// this view; GetInt() is handed `val` by reference as its output argument.
47 XrdCl::DefaultEnv::GetEnv()->GetInt( "RequestTimeout", val );
48 oper_timeout = val;
49 }
// A non-positive timeout is reported as a zeroed timespec.
50 if (oper_timeout <= 0) {
51 return {0, 0};
52 }
53 return {oper_timeout, 0};
54}
55
56bool Factory::m_initialized = false;
57std::shared_ptr<XrdClHttp::HandlerQueue> Factory::m_queue;
58XrdCl::Log *Factory::m_log = nullptr;
59std::once_flag Factory::m_init_once;
60std::string Factory::m_stats_location;
61std::chrono::system_clock::time_point Factory::m_start{};
62
63std::mutex Factory::m_shutdown_lock;
64std::thread Factory::m_monitor_tid;
65std::condition_variable Factory::m_shutdown_requested_cv;
66bool Factory::m_shutdown_requested = false;
67
68// shutdown trigger, must be last of the static members
69Factory::shutdown_s Factory::m_shutdowns;
70
// One-time initialization of the XrdClHttp plugin.
//
// Does nothing once a shutdown has been requested. Otherwise, exactly once per
// process (std::call_once), it:
//   - registers the log topic and imports the X509 / TLS settings,
//   - imports and validates the Http* tunables from the client environment
//     (statistics location, pending-operation limit, worker-thread count,
//     stall timeout, slow-transfer rate, header timeouts),
//   - creates the global work queue, starts the curl workers and the
//     statistics monitor thread, and finally sets m_initialized.
// If the logger or the client environment is unavailable the one-time body
// returns early and m_initialized stays false.
//
// NOTE(review): several statements of this function are not visible in this
// view (for example the declarations of max_pending, stall_timeout and
// slow_xfer_rate, and the statement that first assigns m_log); every visible
// statement is reproduced as-is apart from the three fixes marked "FIX" below.
71void
72Factory::Initialize()
73{
74 std::unique_lock lock(m_shutdown_lock);
75 if (m_shutdown_requested) {
76 return;
77 }
78 std::call_once(m_init_once, [&] {
80 if (!m_log) {
81 return;
82 }
83 m_log->SetTopicName(kLogXrdClHttp, "XrdClHttp");
84
85 auto env = XrdCl::DefaultEnv::GetEnv();
86 if (!env) {
87 return;
88 }
89
90 SetupX509();
91
92 // The location for the client to write the statistics file; this will be dropped
93 // atomically every ~5 seconds and is meant to be complementary to the future g-stream.
94 env->PutString("HttpStatisticsLocation", "");
95 env->ImportString("HttpStatisticsLocation", "XRD_HTTPSTATISTICSLOCATION");
// FIX: the key is always present after the PutString() above, so also require a
// non-empty value before announcing that statistics will be written.
96 if (env->GetString("HttpStatisticsLocation", m_stats_location) && !m_stats_location.empty()) {
97 m_log->Debug(kLogXrdClHttp, "Will write client statistics to %s", m_stats_location.c_str());
98 } else {
99 m_log->Debug(kLogXrdClHttp, "Not writing client statistics to disk");
100 }
101 m_start = std::chrono::system_clock::now();
102
103 // The minimum value we will accept from the request for a header timeout.
104 // (i.e., the amount of time the plugin will wait to receive headers from the remote server)
105 env->PutString("HttpMinimumHeaderTimeout", "");
106 env->ImportString("HttpMinimumHeaderTimeout", "XRD_HTTPMINIMUMHEADERTIMEOUT");
107
108 // The default value of the header timeout (the amount of time the plugin will wait)
109 // to receive headers from the remote server.
110 env->PutString("HttpDefaultHeaderTimeout", "");
111 env->ImportString("HttpDefaultHeaderTimeout", "XRD_HTTPDEFAULTHEADERTIMEOUT");
112
113 // The number of pending operations allowed in the global work queue.
114 env->PutInt("HttpMaxPendingOps", XrdClHttp::HandlerQueue::GetDefaultMaxPendingOps());
115 env->ImportInt("HttpMaxPendingOps", "XRD_HTTPMAXPENDINGOPS");
117 if (env->GetInt("HttpMaxPendingOps", max_pending)) {
118 if (max_pending <= 0 || max_pending > 10'000'000) {
119 m_log->Error(kLogXrdClHttp,
120 "Invalid value for the maximum number of pending operations in the global work queue (%d); using default value of %d",
121 max_pending,
124 env->PutInt("HttpMaxPendingOps", max_pending);
125 }
126 m_log->Debug(kLogXrdClHttp, "Using %d pending operations in the global work queue", max_pending);
127 }
128 m_queue.reset(new XrdClHttp::HandlerQueue(max_pending));
129
130 // The number of threads to use for curl operations.
131 env->PutInt("HttpNumThreads", m_poll_threads);
132 env->ImportInt("HttpNumThreads", "XRD_HTTPNUMTHREADS");
133 int num_threads = m_poll_threads;
134 if (env->GetInt("HttpNumThreads", num_threads)) {
135 if (num_threads <= 0 || num_threads > 1'000) {
136 m_log->Error(kLogXrdClHttp, "Invalid value for the number of threads to use for curl operations (%d); using default value of %d", num_threads, m_poll_threads);
137 num_threads = m_poll_threads;
138 env->PutInt("HttpNumThreads", num_threads);
139 }
140 m_log->Debug(kLogXrdClHttp, "Using %d threads for curl operations", num_threads);
141 }
142
143 // The stall timeout to use for transfer operations.
144 env->PutInt("HttpStallTimeout", XrdClHttp::CurlOperation::GetDefaultStallTimeout());
145 env->ImportInt("HttpStallTimeout", "XRD_HTTPSTALLTIMEOUT");
147 if (env->GetInt("HttpStallTimeout", stall_timeout)) {
148 if (stall_timeout < 0 || stall_timeout > 86'400) {
149 m_log->Error(kLogXrdClHttp, "Invalid value for the stall timeout (%d); using default value of %d", stall_timeout, XrdClHttp::CurlOperation::GetDefaultStallTimeout());
151 env->PutInt("HttpStallTimeout", stall_timeout);
152 }
153 m_log->Debug(kLogXrdClHttp, "Using %d seconds for the stall timeout", stall_timeout);
154 }
156
157 // The slow transfer rate, in bytes per second, for timing out slow uploads/downloads.
158 env->PutInt("HttpSlowRateBytesSec", XrdClHttp::CurlOperation::GetDefaultSlowRateBytesSec());
159 env->ImportInt("HttpSlowRateBytesSec", "XRD_HTTPSLOWRATEBYTESSEC");
161 if (env->GetInt("HttpSlowRateBytesSec", slow_xfer_rate)) {
162 if (slow_xfer_rate < 0 || slow_xfer_rate > (1024 * 1024 * 1024)) {
// FIX: report the rejected slow-transfer rate itself; this message previously
// printed the unrelated stall_timeout value.
163 m_log->Error(kLogXrdClHttp, "Invalid value for the slow transfer rate threshold (%d); using default value of %d", slow_xfer_rate, XrdClHttp::CurlOperation::GetDefaultSlowRateBytesSec());
165 env->PutInt("HttpSlowRateBytesSec", slow_xfer_rate);
166 }
167 m_log->Debug(kLogXrdClHttp, "Using %d bytes/sec for the slow transfer rate threshold", slow_xfer_rate);
168 }
170
171 // Determine the minimum header timeout. It's somewhat arbitrarily defaulted to 2s; below
172 // that and timeouts could be caused by OS scheduling noise. If the client has unreasonable
173 // expectations of the origin, we don't want to cause it to generate lots of origin-side load.
174 std::string val;
175 struct timespec mct{2, 0};
176 if (env->GetString("HttpMinimumHeaderTimeout", val) && !val.empty()) {
177 std::string errmsg;
178 if (!ParseTimeout(val, mct, errmsg)) {
179 m_log->Error(kLogXrdClHttp, "Failed to parse the minimum client timeout (%s): %s", val.c_str(), errmsg.c_str());
180 }
181 }
183
// Default header timeout: 9.5 seconds unless overridden through the environment.
184 struct timespec dht{9, 500'000'000};
185 if (env->GetString("HttpDefaultHeaderTimeout", val) && !val.empty()) {
186 std::string errmsg;
187 if (!ParseTimeout(val, dht, errmsg)) {
188 m_log->Error(kLogXrdClHttp, "Failed to parse the default header timeout (%s): %s", val.c_str(), errmsg.c_str());
189 }
190 }
192
193 // Start up the cache for the OPTIONS response
194 auto &cache = XrdClHttp::VerbsCache::Instance();
195
196 // Startup curl workers after we've set the configs to avoid race conditions
// FIX: honour the validated HttpNumThreads setting. The loop previously always
// started m_poll_threads workers, ignoring the value logged above. num_threads is
// either m_poll_threads or within [1, 1000] here, so the cast cannot wrap.
197 for (unsigned idx=0; idx<static_cast<unsigned>(num_threads); idx++) {
198 auto wk = std::make_unique<XrdClHttp::CurlWorker>(m_queue, cache, m_log);
199 auto wkp = wk.get();
200 std::thread t(XrdClHttp::CurlWorker::RunStatic, wkp);
// Ownership of both the worker object and its thread is handed to the worker.
201 wkp->Start(std::move(wk), std::move(t));
202 }
203
// Launch the statistics monitor; Shutdown() joins it through m_monitor_tid.
204 std::thread t([this]{Monitor();});
205 m_monitor_tid = std::move(t);
206
207 m_initialized = true;
208 });
209}
210
211void
212Factory::Monitor()
213{
214 // This function is run in a separate thread to monitor the XrdClHttp statistics.
215 // It periodically saves the statistics to the stats file.
216 // Note: this previously had support for sending the statistics through the gstream.
217 // However, this was removed because gstream currently requires linking against XrdServer
218 // which is not available in the client; some further rearranging of headers and linkages
219 // is necessary.
220
221 while (true) {
222 {
223 std::unique_lock lock(m_shutdown_lock);
224 m_shutdown_requested_cv.wait_for(
225 lock,
226 std::chrono::seconds(5),
227 []{return m_shutdown_requested;}
228 );
229 if (m_shutdown_requested) {
230 break;
231 }
232 }
233
234 auto now = std::chrono::system_clock::now();
235
236 std::string monitoring = "{\"event\": \"xrdclhttp\", "
237 "\"start\": " + std::to_string(std::chrono::duration<double>(m_start.time_since_epoch()).count()) + ","
238 "\"now\": " + std::to_string(std::chrono::duration<double>(now.time_since_epoch()).count()) + ","
239 "\"file\": " + File::GetMonitoringJson() + ","
240 "\"workers\": " + CurlWorker::GetMonitoringJson() + ","
241 "\"queues\": " + HandlerQueue::GetMonitoringJson() +
242 " }";
243 m_log->Info(kLogXrdClHttp, "Client monitoring statistics: %s", monitoring.c_str());
244 if (!m_stats_location.empty())
245 {
246 auto stats_tmp = m_stats_location + ".XXXXXX";
247 std::vector<char> stats_vector(stats_tmp.size() + 1, '\0');
248 memcpy(&stats_vector[0], stats_tmp.data(), stats_tmp.size() + 1);
249 auto fd = mkstemp(&stats_vector[0]);
250 if (fd == -1) {
251 m_log->Warning(kLogXrdClHttp, "Failed to create temporary stats file %s: %s", m_stats_location.c_str(), strerror(errno));
252 continue;
253 }
254 auto nb = write(fd, monitoring.data(), monitoring.size());
255 if (nb != static_cast<ssize_t>(monitoring.size())) {
256 if (nb == -1) m_log->Warning(kLogXrdClHttp, "Failed to write statistics into temporary file %s: %s", &stats_vector[0], strerror(errno));
257 else m_log->Warning(kLogXrdClHttp, "Failed to write statistics into temporary file %s: short write", &stats_vector[0]);
258 close(fd);
259 continue;
260 }
261 close(fd);
262 auto rv = rename(&stats_vector[0], m_stats_location.c_str());
263 if (rv) {
264 m_log->Warning(kLogXrdClHttp, "Failed to atomically rename stats file to final destination %s: %s", m_stats_location.c_str(), strerror(errno));
265 }
266 }
267 }
268}
269
270namespace {
271
272void SetIfEmpty(XrdCl::Env *env, XrdCl::Log &log, const std::string &optName, const std::string &envName) {
273 if (!env) return;
274
275 std::string val;
276 if (!env->GetString(optName, val) || val.empty()) {
277 env->PutString(optName, "");
278 env->ImportString(optName, envName);
279 }
280 if (env->GetString(optName, val) && !val.empty()) {
281 log.Info(kLogXrdClHttp, "Setting %s to value '%s'", optName.c_str(), val.c_str());
282 }
283}
284
285} // namespace
286
287void
288Factory::SetupX509() {
289
290 auto env = XrdCl::DefaultEnv::GetEnv();
291 SetIfEmpty(env, *m_log, "HttpCertFile", "XRD_HTTPCERTFILE");
292 SetIfEmpty(env, *m_log, "HttpCertDir", "XRD_HTTPCERTDIR");
293 SetIfEmpty(env, *m_log, "HttpClientCertFile", "XRD_HTTPCLIENTCERTFILE");
294 SetIfEmpty(env, *m_log, "HttpClientKeyFile", "XRD_HTTPCLIENTKEYFILE");
295
296 int disable_proxy = 0;
297 env->PutInt("HttpDisableX509", 0);
298 env->ImportInt("HttpDisableX509", "XRD_HTTPDISABLEX509");
299
300 std::string filename;
301 char *filename_char;
302 if (!disable_proxy && (!env->GetString("HttpClientCertFile", filename) || filename.empty())) {
303 if ((filename_char = getenv("X509_USER_PROXY"))) {
304 filename = filename_char;
305 }
306 if (filename.empty()) {
307 filename = "/tmp/x509up_u" + std::to_string(geteuid());
308 }
309 if (access(filename.c_str(), R_OK) == 0) {
310 m_log->Debug(kLogXrdClHttp, "Using X509 proxy file found at %s for TLS client credential", filename.c_str());
311 env->PutString("HttpClientCertFile", filename);
312 env->PutString("HttpClientKeyFile", filename);
313 }
314 }
315 if ((!env->GetString("HttpCertDir", filename) || filename.empty()) && (filename_char = getenv("X509_CERT_DIR"))) {
316 env->PutString("HttpCertDir", filename_char);
317 }
318}
319
320void
321Factory::Shutdown()
322{
323 {
324 std::unique_lock lock(m_shutdown_lock);
325 m_shutdown_requested = true;
326 m_shutdown_requested_cv.notify_one();
327 }
328 if (m_monitor_tid.joinable()) {
329 m_monitor_tid.join();
330 }
331}
332
333void
334Factory::Produce(std::unique_ptr<XrdClHttp::CurlOperation> operation)
335{
336 m_queue->Produce(std::move(operation));
337}
338
340Factory::CreateFile(const std::string & /*url*/) {
// Create a file plug-in object; the URL argument is unused here.
// Initialize() is invoked first; if initialization did not complete
// (m_initialized is still false) no object is created and nullptr is returned.
// NOTE(review): the return-type line of this definition is not visible in this
// view; the result of `new` is returned as a raw pointer, so the caller
// presumably takes ownership -- confirm against the plug-in interface.
341 Initialize();
342 if (!m_initialized) {return nullptr;}
343 return new File(m_queue, m_log);
344}
345
347Factory::CreateFileSystem(const std::string & url) {
// Create a file-system plug-in object for `url`.
// Initialize() is invoked first; if initialization did not complete
// (m_initialized is still false) no object is created and nullptr is returned.
// NOTE(review): the return-type line of this definition is not visible in this
// view; the result of `new` is returned as a raw pointer, so the caller
// presumably takes ownership -- confirm against the plug-in interface.
348 Initialize();
349 if (!m_initialized) {return nullptr;}
350 return new Filesystem(url, m_queue, m_log);
351}
352
353extern "C"
354{
355 void *XrdClGetPlugIn(const void*)
356 {
357 return static_cast<void*>(new Factory());
358 }
359}
void * XrdClGetPlugIn(const void *)
XrdVERSIONINFO(XrdClGetPlugIn, XrdClGetPlugIn) using namespace XrdClHttp
#define access(a, b)
Definition XrdPosix.hh:44
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define rename(a, b)
Definition XrdPosix.hh:92
XrdOucString File
static void SetSlowRateBytesSec(int rate)
static void SetStallTimeout(int stall_interval)
static int GetDefaultSlowRateBytesSec()
static int GetDefaultStallTimeout()
static void RunStatic(CurlWorker *myself)
static std::string GetMonitoringJson()
void Produce(std::unique_ptr< XrdClHttp::CurlOperation > operation)
virtual XrdCl::FilePlugIn * CreateFile(const std::string &url) override
Create a file plug-in for the given URL.
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout)
static std::string GetMonitoringJson()
static void SetDefaultHeaderTimeout(struct timespec &ts)
static void SetMinimumHeaderTimeout(struct timespec &ts)
static std::string GetMonitoringJson()
static unsigned GetDefaultMaxPendingOps()
static VerbsCache & Instance()
virtual XrdCl::FileSystemPlugIn * CreateFileSystem(const std::string &url) override
Create a file system plug-in for the given URL.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
bool PutString(const std::string &key, const std::string &value)
Definition XrdClEnv.cc:52
bool ImportString(const std::string &key, const std::string &shellKey)
Definition XrdClEnv.cc:214
bool ImportInt(const std::string &key, const std::string &shellKey)
Definition XrdClEnv.cc:185
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
An interface for file plug-ins.
An interface for file plug-ins.
Handle diagnostics.
Definition XrdClLog.hh:101
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
const uint64_t kLogXrdClHttp
const int DefaultRequestTimeout