33#include "XrdVersion.hh"
45 if (oper_timeout == 0) {
50 if (oper_timeout <= 0) {
53 return {oper_timeout, 0};
56bool Factory::m_initialized =
false;
57std::shared_ptr<XrdClHttp::HandlerQueue> Factory::m_queue;
59std::once_flag Factory::m_init_once;
60std::string Factory::m_stats_location;
61std::chrono::system_clock::time_point Factory::m_start{};
63std::mutex Factory::m_shutdown_lock;
64std::thread Factory::m_monitor_tid;
65std::condition_variable Factory::m_shutdown_requested_cv;
66bool Factory::m_shutdown_requested =
false;
69Factory::shutdown_s Factory::m_shutdowns;
74 std::unique_lock lock(m_shutdown_lock);
75 if (m_shutdown_requested) {
78 std::call_once(m_init_once, [&] {
94 env->PutString(
"HttpStatisticsLocation",
"");
95 env->ImportString(
"HttpStatisticsLocation",
"XRD_HTTPSTATISTICSLOCATION");
96 if (env->GetString(
"HttpStatisticsLocation", m_stats_location)) {
97 m_log->Debug(
kLogXrdClHttp,
"Will write client statistics to %s", m_stats_location.c_str());
99 m_log->Debug(
kLogXrdClHttp,
"Not writing client statistics to disk");
101 m_start = std::chrono::system_clock::now();
105 env->PutString(
"HttpMinimumHeaderTimeout",
"");
106 env->ImportString(
"HttpMinimumHeaderTimeout",
"XRD_HTTPMINIMUMHEADERTIMEOUT");
110 env->PutString(
"HttpDefaultHeaderTimeout",
"");
111 env->ImportString(
"HttpDefaultHeaderTimeout",
"XRD_HTTPDEFAULTHEADERTIMEOUT");
115 env->ImportInt(
"HttpMaxPendingOps",
"XRD_HTTPMAXPENDINGOPS");
117 if (env->GetInt(
"HttpMaxPendingOps", max_pending)) {
118 if (max_pending <= 0 || max_pending > 10'000'000) {
120 "Invalid value for the maximum number of pending operations in the global work queue (%d); using default value of %d",
124 env->PutInt(
"HttpMaxPendingOps", max_pending);
126 m_log->Debug(
kLogXrdClHttp,
"Using %d pending operations in the global work queue", max_pending);
131 env->PutInt(
"HttpNumThreads", m_poll_threads);
132 env->ImportInt(
"HttpNumThreads",
"XRD_HTTPNUMTHREADS");
133 int num_threads = m_poll_threads;
134 if (env->GetInt(
"HttpNumThreads", num_threads)) {
135 if (num_threads <= 0 || num_threads > 1'000) {
136 m_log->Error(
kLogXrdClHttp,
"Invalid value for the number of threads to use for curl operations (%d); using default value of %d", num_threads, m_poll_threads);
137 num_threads = m_poll_threads;
138 env->PutInt(
"HttpNumThreads", num_threads);
140 m_log->Debug(
kLogXrdClHttp,
"Using %d threads for curl operations", num_threads);
145 env->ImportInt(
"HttpStallTimeout",
"XRD_HTTPSTALLTIMEOUT");
147 if (env->GetInt(
"HttpStallTimeout", stall_timeout)) {
148 if (stall_timeout < 0 || stall_timeout > 86'400) {
151 env->PutInt(
"HttpStallTimeout", stall_timeout);
153 m_log->Debug(
kLogXrdClHttp,
"Using %d seconds for the stall timeout", stall_timeout);
159 env->ImportInt(
"HttpSlowRateBytesSec",
"XRD_HTTPSLOWRATEBYTESSEC");
161 if (env->GetInt(
"HttpSlowRateBytesSec", slow_xfer_rate)) {
162 if (slow_xfer_rate < 0 || slow_xfer_rate > (1024 * 1024 * 1024)) {
165 env->PutInt(
"HttpSlowRateBytesSec", slow_xfer_rate);
167 m_log->Debug(
kLogXrdClHttp,
"Using %d bytes/sec for the slow transfer rate threshold", slow_xfer_rate);
175 struct timespec mct{2, 0};
176 if (env->GetString(
"HttpMinimumHeaderTimeout", val) && !val.empty()) {
178 if (!ParseTimeout(val, mct, errmsg)) {
179 m_log->Error(kLogXrdClHttp,
"Failed to parse the minimum client timeout (%s): %s", val.c_str(), errmsg.c_str());
184 struct timespec dht{9, 500'000'000};
185 if (env->GetString(
"HttpDefaultHeaderTimeout", val) && !val.empty()) {
188 m_log->Error(
kLogXrdClHttp,
"Failed to parse the default header timeout (%s): %s", val.c_str(), errmsg.c_str());
197 for (
unsigned idx=0; idx<m_poll_threads; idx++) {
198 auto wk = std::make_unique<XrdClHttp::CurlWorker>(m_queue, cache, m_log);
201 wkp->Start(std::move(wk), std::move(t));
204 std::thread t([
this]{Monitor();});
205 m_monitor_tid = std::move(t);
207 m_initialized =
true;
223 std::unique_lock lock(m_shutdown_lock);
224 m_shutdown_requested_cv.wait_for(
226 std::chrono::seconds(5),
227 []{
return m_shutdown_requested;}
229 if (m_shutdown_requested) {
234 auto now = std::chrono::system_clock::now();
236 std::string monitoring =
"{\"event\": \"xrdclhttp\", "
237 "\"start\": " + std::to_string(std::chrono::duration<double>(m_start.time_since_epoch()).count()) +
","
238 "\"now\": " + std::to_string(std::chrono::duration<double>(now.time_since_epoch()).count()) +
","
243 m_log->Info(
kLogXrdClHttp,
"Client monitoring statistics: %s", monitoring.c_str());
244 if (!m_stats_location.empty())
246 auto stats_tmp = m_stats_location +
".XXXXXX";
247 std::vector<char> stats_vector(stats_tmp.size() + 1,
'\0');
248 memcpy(&stats_vector[0], stats_tmp.data(), stats_tmp.size() + 1);
249 auto fd = mkstemp(&stats_vector[0]);
251 m_log->Warning(
kLogXrdClHttp,
"Failed to create temporary stats file %s: %s", m_stats_location.c_str(), strerror(errno));
254 auto nb =
write(fd, monitoring.data(), monitoring.size());
255 if (nb !=
static_cast<ssize_t
>(monitoring.size())) {
256 if (nb == -1) m_log->Warning(
kLogXrdClHttp,
"Failed to write statistics into temporary file %s: %s", &stats_vector[0], strerror(errno));
257 else m_log->Warning(
kLogXrdClHttp,
"Failed to write statistics into temporary file %s: short write", &stats_vector[0]);
262 auto rv =
rename(&stats_vector[0], m_stats_location.c_str());
264 m_log->Warning(
kLogXrdClHttp,
"Failed to atomically rename stats file to final destination %s: %s", m_stats_location.c_str(), strerror(errno));
272void SetIfEmpty(
XrdCl::Env *env,
XrdCl::Log &log,
const std::string &optName,
const std::string &envName) {
276 if (!env->
GetString(optName, val) || val.empty()) {
280 if (env->
GetString(optName, val) && !val.empty()) {
281 log.
Info(
kLogXrdClHttp,
"Setting %s to value '%s'", optName.c_str(), val.c_str());
288Factory::SetupX509() {
291 SetIfEmpty(env, *m_log,
"HttpCertFile",
"XRD_HTTPCERTFILE");
292 SetIfEmpty(env, *m_log,
"HttpCertDir",
"XRD_HTTPCERTDIR");
293 SetIfEmpty(env, *m_log,
"HttpClientCertFile",
"XRD_HTTPCLIENTCERTFILE");
294 SetIfEmpty(env, *m_log,
"HttpClientKeyFile",
"XRD_HTTPCLIENTKEYFILE");
296 int disable_proxy = 0;
297 env->
PutInt(
"HttpDisableX509", 0);
298 env->
ImportInt(
"HttpDisableX509",
"XRD_HTTPDISABLEX509");
300 std::string filename;
302 if (!disable_proxy && (!env->
GetString(
"HttpClientCertFile", filename) || filename.empty())) {
303 if ((filename_char = getenv(
"X509_USER_PROXY"))) {
304 filename = filename_char;
306 if (filename.empty()) {
307 filename =
"/tmp/x509up_u" + std::to_string(geteuid());
309 if (
access(filename.c_str(), R_OK) == 0) {
310 m_log->Debug(
kLogXrdClHttp,
"Using X509 proxy file found at %s for TLS client credential", filename.c_str());
311 env->
PutString(
"HttpClientCertFile", filename);
312 env->
PutString(
"HttpClientKeyFile", filename);
315 if ((!env->
GetString(
"HttpCertDir", filename) || filename.empty()) && (filename_char = getenv(
"X509_CERT_DIR"))) {
316 env->
PutString(
"HttpCertDir", filename_char);
324 std::unique_lock lock(m_shutdown_lock);
325 m_shutdown_requested =
true;
326 m_shutdown_requested_cv.notify_one();
328 if (m_monitor_tid.joinable()) {
329 m_monitor_tid.join();
336 m_queue->Produce(std::move(operation));
342 if (!m_initialized) {
return nullptr;}
343 return new File(m_queue, m_log);
349 if (!m_initialized) {
return nullptr;}
357 return static_cast<void*
>(
new Factory());
void * XrdClGetPlugIn(const void *)
XrdVERSIONINFO(XrdClGetPlugIn, XrdClGetPlugIn) using namespace XrdClHttp
static void SetSlowRateBytesSec(int rate)
static void SetStallTimeout(int stall_interval)
static int GetDefaultSlowRateBytesSec()
static int GetDefaultStallTimeout()
static void RunStatic(CurlWorker *myself)
static std::string GetMonitoringJson()
void Produce(std::unique_ptr< XrdClHttp::CurlOperation > operation)
virtual XrdCl::FilePlugIn * CreateFile(const std::string &url) override
Create a file plug-in for the given URL.
static struct timespec GetHeaderTimeoutWithDefault(time_t oper_timeout)
static std::string GetMonitoringJson()
static void SetDefaultHeaderTimeout(struct timespec &ts)
static void SetMinimumHeaderTimeout(struct timespec &ts)
static std::string GetMonitoringJson()
static unsigned GetDefaultMaxPendingOps()
static VerbsCache & Instance()
virtual XrdCl::FileSystemPlugIn * CreateFileSystem(const std::string &url) override
Create a file system plug-in for the given URL.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
bool PutString(const std::string &key, const std::string &value)
bool ImportString(const std::string &key, const std::string &shellKey)
bool ImportInt(const std::string &key, const std::string &shellKey)
bool GetString(const std::string &key, std::string &value)
bool GetInt(const std::string &key, int &value)
An interface for file plug-ins.
An interface for file plug-ins.
void Info(uint64_t topic, const char *format,...)
Print an info.
bool ParseTimeout(const std::string &duration, struct timespec &, std::string &errmsg)
const uint64_t kLogXrdClHttp
const int DefaultRequestTimeout