XRootD
Loading...
Searching...
No Matches
XrdFrmMigrate.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d F r m M i g r a t e . c c */
4/* */
5/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstring>
33#include <strings.h>
34#include <utime.h>
35#include <sys/param.h>
36#include <sys/types.h>
37
38#include "XrdOss/XrdOss.hh"
39#include "XrdOss/XrdOssPath.hh"
41#include "XrdOuc/XrdOucTList.hh"
43#include "XrdFrc/XrdFrcTrace.hh"
44#include "XrdFrm/XrdFrmFiles.hh"
51#include "XrdSys/XrdSysTimer.hh"
52
53using namespace XrdFrc;
54using namespace XrdFrm;
55
56/******************************************************************************/
57/* S t a t i c M e m b e r s */
58/******************************************************************************/
59
60XrdFrmFileset *XrdFrmMigrate::fsDefer = 0;
61
62int XrdFrmMigrate::numMig = 0;
63
64/******************************************************************************/
65/* Private: A d d */
66/******************************************************************************/
67
68void XrdFrmMigrate::Add(XrdFrmFileset *sP)
69{
70 EPNAME("Add");
71 const char *Why;
72 time_t xTime;
73
74// Check to see if the file is really eligible for purging
75//
76 if ((Why = Eligible(sP, xTime)))
77 {DEBUG(sP->basePath() <<"cannot be migrated; " <<Why);
78 delete sP;
79 return;
80 }
81
82// Add the file to the migr queue or the defer queue based on mod time
83//
84 if (xTime < Config.IdleHold) Defer(sP);
85 else Queue(sP);
86}
87
88/******************************************************************************/
89/* Private: A d v a n c e */
90/******************************************************************************/
91
92int XrdFrmMigrate::Advance()
93{
94 XrdFrmFileset *fP;
95 int xTime = 0;
96
97// Try to re-add everything in this queue
98//
99 while(fsDefer)
100 {xTime = static_cast<int>(time(0) - fsDefer->baseFile()->Stat.st_mtime);
101 if (xTime < Config.IdleHold) break;
102 fP = fsDefer; fsDefer = fsDefer->Next;
103 if (fP->Refresh(1,0)) Add(fP);
104 else delete fP;
105 }
106
107// Return number of seconds to next advance event
108//
109 return fsDefer ? Config.IdleHold - xTime : 0;
110}
111
112/******************************************************************************/
113/* Private: D e f e r */
114/******************************************************************************/
115
116void XrdFrmMigrate::Defer(XrdFrmFileset *sP)
117{
118 XrdFrmFileset *fP = fsDefer, *pfP = 0;
119 time_t mTime = sP->baseFile()->Stat.st_mtime;
120
121// Insert this entry into the defer queue in ascending mtime order
122//
123 while(fP && fP->baseFile()->Stat.st_mtime < mTime)
124 {pfP = fP; fP = fP->Next;}
125
126// Chain in the fileset
127//
128 sP->Next = fP;
129 if (pfP) pfP->Next = sP;
130 else fsDefer = sP;
131}
132
133/******************************************************************************/
134/* D i s p l a y */
135/******************************************************************************/
136
// Display: write the migrate configuration through Say. A header line is
// emitted first; then, for every entry on the vP chain, one "Scanning" line
// (tagged "r/w: " when vP->Val is non-zero and "r/o: " otherwise) followed by
// one "Excluded" line per element of that entry's Dir list.
// NOTE(review): the function signature and the declaration/initialisation of
// vP are not visible in this block -- confirm them against the repository.
138{
140 XrdOucTList *tP;
141
142// Type header
143//
144 Say.Say("=====> ", "Migrate configuration:");
145
146// Display what we will scan: walk the vP chain via Next, and for each entry
// walk its Dir list via next.
147//
148 while(vP)
149 {Say.Say("=====> ", "Scanning ", (vP->Val?"r/w: ":"r/o: "), vP->Name);
150 tP = vP->Dir;
151 while(tP) {Say.Say("=====> ", "Excluded ", tP->text); tP = tP->next;}
152 vP = vP->Next;
153 }
154}
155
156/******************************************************************************/
157/* Private: E l i g i b l e */
158/******************************************************************************/
159
160const char *XrdFrmMigrate::Eligible(XrdFrmFileset *sP, time_t &xTime)
161{
162 XrdOucNSWalk::NSEnt *baseFile = sP->baseFile();
163 XrdOucNSWalk::NSEnt *failFile = sP->failFile();
164 time_t mTimeBF, mTimeLK, nowTime = time(0);
165 const char *eTxt;
166
167// File is inelegible if lockfile mtime is zero (i.e., an mstore placeholder)
168//
169 mTimeLK = static_cast<time_t>(sP->cpyInfo.Attr.cpyTime);
170 if (!mTimeLK) return "migration deferred";
171
172// File is ineligible if it has not changed since last migration
173//
174 mTimeBF = baseFile->Stat.st_mtime;
175 if (mTimeLK >= mTimeBF) return "file unchanged";
176
177// File is ineligible if it has a fail file that is still recent
178//
179 if (failFile && (eTxt=XrdFrmTransfer::checkFF(sP->failPath()))) return eTxt;
180
181// Migration may need to be deferred if the file has been modified too recently
182// (caller will check)
183//
184 xTime = static_cast<int>(nowTime - mTimeBF);
185
186// File can be migrated
187//
188 return 0;
189}
190
191/******************************************************************************/
192/* M i g r a t e */
193/******************************************************************************/
194
// XrdMigrateStart: thread entry point; it is the routine that Migrate() passes
// to XrdSysThread::Run(). The argument is unused and the return value is
// always a null pointer.
// NOTE(review): as shown here the body does nothing between discarding parg
// and returning, so the started thread would exit immediately. A statement
// appears to be missing at this point -- presumably the call that enters the
// migration loop; confirm against the repository.
195void *XrdMigrateStart(void *parg)
196{
197 (void)parg;
199 return (void *)0;
200}
201
// Migrate: driver for the migration cycle. When doinit is non-zero it only
// starts a thread running XrdMigrateStart and returns; otherwise it loops
// forever, performing one scan / drain / report / sleep cycle per iteration.
// NOTE(review): the function signature line is not visible in this block;
// the body reads a parameter named doinit -- confirm against the repository.
203{
204 XrdFrmFileset *fP;
205 char buff[80];
206 int migWait, wTime;
207
208// If we have not initialized yet, start a thread to handle this
// NOTE(review): the message text below reads "migrtion" (sic); it is emitted
// at runtime, so correcting the spelling is a code change, not a comment fix.
209//
210 if (doinit)
211 {pthread_t tid;
212 int retc;
213 if ((retc = XrdSysThread::Run(&tid, XrdMigrateStart, (void *)0,
214 XRDSYSTHREAD_BIND, "migration scan")))
215 Say.Emsg("Migrate", retc, "create migrtion thread");
216 return;
217 }
218
219// Start the migration sequence, first do a name space scan which will trigger
220// all eligible migrations and defer any that need to wait. We then drain the
221// defer queue and wait for the next period to start.
// Each cycle: migWait starts at Config.WaitMigr and is reduced by every wait
// time Advance() reports. While budget remains we sleep that long and call
// Advance() again; once the budget is used up, or Advance() returns 0 (empty
// defer queue), the inner loop ends.
// NOTE(review): when the inner loop ends because migWait dropped to <= 0, no
// Snooze() is issued for that step and the final Snooze(migWait) is skipped
// too, so the next Scan() starts immediately -- confirm this is intended.
222//
223do{migWait = Config.WaitMigr; numMig = 0;
224 Scan();
225 while((wTime = Advance()))
226 {if ((migWait -= wTime) <= 0) break;
227 else XrdSysTimer::Snooze(wTime);
228 }
// Whatever is still deferred is discarded before the count of files selected
// in this cycle is reported.
229 while(fsDefer) {fP = fsDefer; fsDefer = fsDefer->Next; delete fP;}
230 sprintf(buff, "%d file%s selected for transfer.",numMig,(numMig==1?"":"s"));
231 Say.Emsg("Migrate", buff);
// Sleep out the remainder of the period, if any, before the next cycle.
232 if (migWait > 0) XrdSysTimer::Snooze(migWait);
233 } while(1);
234}
235
236/******************************************************************************/
237/* Private: Q u e u e */
238/******************************************************************************/
239
// Queue: turn a fileset into a transfer request. A zeroed XrdFrcRequest is
// filled in with Config.myProg as the user, an ID of the form "Internal<n>"
// taken from a function-local counter, and the current time as addTOD. If
// Config.LogicalPath() returns non-zero for the fileset's base path, the
// request is added to the migQ queue and numMig is incremented. The fileset
// is deleted in every case.
// NOTE(review): the function signature line is not visible in this block;
// the body reads a parameter named sP -- confirm against the repository.
241{
242 static int reqID = 0;
243 XrdFrcRequest myReq;
244
245// Convert the fileset to a request element
// NOTE(review): sprintf() into myReq.ID is unbounded and the size of ID is
// not visible here -- confirm it can hold "Internal" plus the widest int.
// NOTE(review): no other request fields are set in the visible code; a
// statement may be missing between the ID and addTOD assignments -- confirm.
246//
247 memset(&myReq, 0, sizeof(myReq));
248 strlcpy(myReq.User, Config.myProg, sizeof(myReq.User));
249 sprintf(myReq.ID, "Internal%d", reqID++);
251 myReq.addTOD = static_cast<long long>(time(0));
252 if (Config.LogicalPath(sP->basePath(), myReq.LFN, sizeof(myReq.LFN)))
253 {XrdFrmXfrQueue::Add(&myReq, 0, XrdFrcRequest::migQ); numMig++;}
254
255// All done: the request holds its own copy of what it needs, so the fileset
// is released here whether or not it was queued.
256//
257 delete sP;
258}
259
260/******************************************************************************/
261/* Private: S c a n */
262/******************************************************************************/
263
264void XrdFrmMigrate::Scan()
265{
266 static const int Opts = XrdFrmFiles::Recursive | XrdFrmFiles::CompressD
268 static time_t lastHP = time(0), nowT = time(0);
269
271 XrdFrmFileset *sP;
272 XrdFrmFiles *fP;
273 char buff[128];
274 int ec = 0, Bad = 0, aFiles = 0, bFiles = 0;
275
276// Purge that bad file table evey 24 hours to keep complaints down
277//
278 if (nowT - lastHP >= 86400) {XrdFrmFileset::Purge(); lastHP = nowT;}
279
280// Indicate scan started
281//
282 VMSG("Scan", "Name space scan started. . .");
283
284// Process each directory
285//
286 do {fP = new XrdFrmFiles(vP->Name, Opts, vP->Dir);
287 while((sP = fP->Get(ec,1)))
288 {aFiles++;
289 if (sP->Screen()) Add(sP);
290 else {delete sP; bFiles++;}
291 }
292 if (ec) Bad = 1;
293 delete fP;
294 } while((vP = vP->Next));
295
296// Indicate scan ended
297//
298 sprintf(buff, "%d file%s with %d error%s", aFiles, (aFiles != 1 ? "s":""),
299 bFiles, (bFiles != 1 ? "s":""));
300 VMSG("Scan", "Name space scan ended;", buff);
301
302// Issue warning if we encountered errors
303//
304 if (Bad) Say.Emsg("Scan", "Errors encountered while scanning for "
305 "migratable files.");
306}
#define DEBUG(x)
#define EPNAME(x)
XrdOucPup XrdCmsParser::Pup & Say
#define VMSG(a,...)
void * XrdMigrateStart(void *parg)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define XRDSYSTHREAD_BIND
char LFN[3072]
static const int migQ
static const int Migrate
long long addTOD
long long cpyTime
int LogicalPath(const char *oldp, char *newp, int newpsz)
VPInfo * pathList
const char * myProg
static const int CompressD
static const int NoAutoDel
XrdFrmFileset * Get(int &rc, int noBase=0)
static const int Recursive
int Refresh(int isMig=0, int doLock=1)
const char * basePath()
XrdOucNSWalk::NSEnt * baseFile()
XrdOucXAttr< XrdFrcXAttrCpy > cpyInfo
int Screen(int needLF=1)
XrdFrmFileset * Next
const char * failPath()
static void Purge()
XrdOucNSWalk::NSEnt * failFile()
static void Queue(XrdFrmFileset *sP)
static void Migrate(int doinit=1)
static void Display()
static const char * checkFF(const char *Path)
static int Add(XrdFrcRequest *rP, XrdFrcReqFile *reqF, int theQ)
XrdOucTList * next
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Snooze(int seconds)
XrdFrmConfig Config