XRootD
Loading...
Searching...
No Matches
XrdOssArcStage.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d O s s A r c S t a g e . c c */
4/* */
5/* (c) 2024 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <queue>
32#include <string>
33#include <set>
34
35#include <errno.h>
36#include <fcntl.h>
37#include <stdlib.h>
38
39#include "Xrd/XrdScheduler.hh"
43#include "XrdOuc/XrdOucEnv.hh"
44#include "XrdOuc/XrdOucProg.hh"
45#include "XrdSys/XrdSysFD.hh"
48
49/******************************************************************************/
50/* S t a t i c O b j e c t s */
51/******************************************************************************/
52
53namespace XrdOssArcGlobals
54{
55extern XrdScheduler* schedP;
56
57extern XrdSysError Elog;
58
59extern XrdOssArcConfig Config;
60
63
64struct ActInfo
65 {int rc;
66 const char* path;
67 char* pMem;
68
69 ActInfo(const char* p, bool cpy=false) : rc(0)
70 {if (cpy) path = pMem = strdup(p);
71 else {path = p; pMem = 0;}
72 }
73
74 ~ActInfo() {if (pMem) free(pMem);}
75 };
76
77bool cmpLess(const ActInfo* a, const ActInfo* b)
78 {return strcmp(a->path, b->path) < 0;}
79
80std::set<ActInfo*, decltype(&cmpLess)> Active(&cmpLess);
81
82std::queue<const char*> Pending;
83}
84using namespace XrdOssArcGlobals;
85
86/******************************************************************************/
87/* D o I t */
88/******************************************************************************/
89
91{
92 TraceInfo("Bring_Online",0);
93 const char* nxtPath;
94 time_t seTime;
95 int fd;
96
97// The arcvPath is the path of the file in the locally mounted tape buffer.
98// Simply open it to force it to be staged online.
99//
100do{DEBUG("Staging "<<arcvPath);
101 seTime = time(0);
102 if ((fd = XrdSysFD_Open(arcvPath, O_RDONLY)) < 0)
103 StageError(errno, "open/stage file", arcvPath);
104 else {close(fd);
105 seTime = time(0) - seTime;
106 DEBUG(arcvPath<<" staged in "<<seTime<<" second(s)");
107 }
108
109// Check if there is something pending that we can do now
110//
111 schedMtx.Lock();
112 if (Pending.empty()) break;
113 nxtPath = Pending.front();
114 Pending.pop();
115 schedMtx.UnLock();
116
117// Reset this object to handle the path
118//
119 Reset(nxtPath);
120
121 } while(true);
122
123// Do final reset as we finished processing
124//
125 Reset(0);
126
127// We are done, so delete this object and return
128//
129 int n = Config.maxStage++; // schedMtx is still held
130 schedMtx.UnLock();
131 DEBUG("Staging queue empty; MaxStage="<<n+1);
132 delete this;
133}
134
135/******************************************************************************/
136/* Private: i s O n l i n e */
137/******************************************************************************/
138
140{
141 TraceInfo("isOnline",0);
142 int rc, finrc;
143
144 DEBUG("Running "<<Config.MssComName<<" online "<<path);
145 rc = Config.MssComProg->Run("online", path);
146
147// Adjust return code. Note that XrdOucProg return -status!
148//
149 if (rc < -1 || rc > 1) finrc = -1;
150 else finrc = -rc;
151 DEBUG("MssComCmd returned "<<rc<<" -> "<<finrc);
152
153 return static_cast<MssRC>(finrc);
154}
155
156/******************************************************************************/
157/* Private: R e s e t */
158/******************************************************************************/
159
160void XrdOssArcStage::Reset(const char* path)
161{
162
163// Remove ourselves from the active set if we still have a path
164//
165 if (arcvPath)
166 {ActInfo aInfo(arcvPath);
167 stageMtx.Lock();
168 auto it = Active.find(&aInfo);
169 if (it != Active.end())
170 {ActInfo* aiP = *it;
171 Active.erase(it);
172 delete aiP;
173 }
175 }
176
177// Replace out path with the new path
178//
179 arcvPath = path;
180}
181
182/******************************************************************************/
183/* S t a g e */
184/******************************************************************************/
185
186int XrdOssArcStage::Stage(const char *path, const char* mssPath)
187{
188 TraceInfo("Stage",0);
189 ActInfo aInfo(path);
190
191// Check if this is being staged
192//
193 stageMtx.Lock();
194 auto it = Active.find(&aInfo);
195 if (it != Active.end())
196 {int rc;
197 if ((*it)->rc == 0) rc = EINPROGRESS;
198 else {rc = (*it)->rc;
199 ActInfo* aiP = *it;
200 Active.erase(it);
201 delete aiP;
202 }
203 stageMtx.UnLock();
204 return rc;
205 } else stageMtx.UnLock();
206
207// Make sure the path exists and is actually online
208//
209 MssRC mssRC = isOnline(mssPath);
210 switch(mssRC)
211 {case isFalse: break;
212 case isTrue: return 0; break;
213 default: return EINVAL; break;
214 }
215
216// Create a an action information object. This will copy the path and we
217// can use the copy in other places as the pointer i
218//
219 ActInfo* stageInfo = new ActInfo(path, true);
220
221// Add the path to the staging set. Another thread may have beat us to it.
222//
223 stageMtx.Lock();
224 auto iResult = Active.insert(stageInfo);
225 stageMtx.UnLock();
226 if (!iResult.second)
227 {delete stageInfo;
228 return EINPROGRESS;
229 }
230
231// Schedule this staging request if we are allowed to do so
232//
233 int smx;
234 schedMtx.Lock();
235 if (Config.maxStage)
236 {smx = Config.maxStage--;
237 schedMtx.UnLock();
238 XrdOssArcStage *asP = new XrdOssArcStage(stageInfo->path);
239 schedP->Schedule((XrdJob*)asP);
240 return EINPROGRESS;
241 } else smx = 0;
242
243// Too many things being staged, so queue this request
244//
245 Pending.push(stageInfo->path);
246 schedMtx.UnLock();
247
248// Do some debugging
249//
250 DEBUG("MaxStage="<<smx<<" staging '"<<path<<(smx?"' scheduled":"' queued"));
251
252// All done
253//
254 return EINPROGRESS;
255}
256
257/******************************************************************************/
258/* Private: S t a g e E r r o r */
259/******************************************************************************/
260
261void XrdOssArcStage::StageError(int rc, const char* what, const char* path)
262{
263 ActInfo aInfo(arcvPath);
264
265// Flag this request as failed
266//
267 stageMtx.Lock();
268 auto it = Active.find(&aInfo);
269 stageMtx.UnLock();
270 if (it != Active.end()) (*it)->rc = rc;
271
272// Issue error message
273//
274 Elog.Emsg("Stage", rc, what, path);
275
276// We now must clear our arcvPath to prevent removal from the active set
277//
278 arcvPath = 0;
279}
#define DEBUG(x)
#define TraceInfo(x, y)
#define close(a)
Definition XrdPosix.hh:48
XrdJob(const char *desc="")
Definition XrdJob.hh:51
virtual void DoIt() override
XrdOssArcStage(const char *aPath)
static MssRC isOnline(const char *path)
static int Stage(const char *path, const char *mssPath)
XrdOssArcConfig Config
Definition XrdOssArc.cc:68
XrdSysMutex stageMtx
XrdScheduler * schedP
Definition XrdOssArc.cc:66
std::set< ActInfo *, decltype(&cmpLess)> Active & cmpLess
std::queue< const char * > Pending
XrdSysError Elog(0, "OssArc_")
XrdSysMutex schedMtx
ActInfo(const char *p, bool cpy=false)