XRootD
Loading...
Searching...
No Matches
XrdNetPMarkFF.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d N e t P M a r k C f g . h h */
4/* */
5/* (c) 2021 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdint>
32#include <stdio.h>
33#include <stdlib.h>
34#include <string.h>
35#include <time.h>
36#include <unistd.h>
37#include <sys/socket.h>
38#include <sys/time.h>
39#include <sys/types.h>
40
41#include "Xrd/XrdScheduler.hh"
43#include "XrdNet/XrdNetMsg.hh"
45#include "XrdNet/XrdNetUtils.hh"
46#include "XrdSys/XrdSysError.hh"
47#include "XrdSys/XrdSysTrace.hh"
48
49/******************************************************************************/
50/* L o c a l M a c r o s */
51/******************************************************************************/
52
53#define TRACE(txt) if (doTrace) SYSTRACE(Trace->, tident, epName, 0, txt)
54
55#define DEBUG(txt) if (doDebug) SYSTRACE(Trace->, tident, epName, 0, txt)
56
57#define EPName(ep) const char *epName = ep
58
59/******************************************************************************/
60/* F i r e f l y P a c k e t T e m p l a t e */
61/******************************************************************************/
62
63namespace
64{
65const char *ffFmt0 =
66"<134>1 - %s xrootd - firefly-json - " //RFC5424 syslog header (abbreviated)
67"{"
68 "\"version\":1,"
69 "\"flow-lifecycle\":{"
70 "\"state\":\"%%s\"," //-> start | ongoing | end
71 "\"current-time\":\"%%s\"," //-> yyyy-mm-ddThh:mm:ss.uuuuuu+00:00
72 "\"start-time\":\"%s\""
73 "%%s" //-> ,"end-time":"<date-time>"
74 "},"
75 "\"usage\":{\"received\":%%llu,\"sent\":%%llu},"
76 "\"netlink\":{\"rtt\":%%u.%%.03u},";
77
78const char *ffFmt1 =
79 "\"context\":{"
80 "\"experiment-id\":%d,"
81 "\"activity-id\":%d"
82 "%s" //-> ,application:<appname>
83 "},";
84
85const char *ffFmt2 =
86 "\"flow-id\":{"
87 "\"afi\":\"ipv%c\"," //-> ipv4 | ipv6
88 "\"src-ip\":\"%s\"," // source which is always server (us)
89 "\"dst-ip\":\"%s\"," // dest which is always client
90 "\"protocol\":\"tcp\","
91 "\"src-port\":%d,"
92 "\"dst-port\":%d"
93 "}"
94"}";
95
96const char *ffApp = ",\"application\":\"%.*s\"";
97
98const char *ffEnd = ",\"end-time\":\"%s\"";
99}
100
101/******************************************************************************/
102/* s t a t i c O b j e c t s */
103/******************************************************************************/
104
105namespace XrdNetPMarkConfig
106{
107
108// Other configuration values
109//
110extern XrdSysError *eDest;
111extern XrdNetMsg *netMsg;
112extern XrdNetMsg *netOrg;
113extern XrdScheduler *Sched;
114extern XrdSysTrace *Trace;
115
116extern char *ffDest;
117extern int ffPortO;
118extern int ffEcho;
119extern bool doDebug;
120extern bool doTrace;
121
122extern const char *myHostName;
123}
124using namespace XrdNetPMarkConfig;
125
126/******************************************************************************/
127/* T h r e a d I n t e r f a c e s */
128/******************************************************************************/
129/*
130namespace
131{
132void *Refresh(void *carg)
133 {int intvl = *(int *)carg;
134 while(true) {XrdSysTimer::Snooze(intvl); XrdNetPMarkCfg::Ping();}
135 }
136XrdSysMutex ffMutex;
137}
138*/
139
140/******************************************************************************/
141/* Private: E m i t */
142/******************************************************************************/
143
144bool XrdNetPMarkFF::Emit(const char *state, const char *cT, const char *eT)
145{
146 EPName("Emit");
147 struct sockStats ss;
148 char msgBuff[1024];
149
150 SockStats(ss);
151
152 int n = snprintf(msgBuff, sizeof(msgBuff), ffHdr, state, cT, eT,
153 ss.bRecv, ss.bSent, ss.msRTT, ss.usRTT);
154
155 if (n + ffTailsz >= (int)sizeof(msgBuff))
156 {eDest->Emsg("PMarkFF", "invalid json; msgBuff truncated.");
157 fdOK = odOK = false;
158 return false;
159 }
160
161 memcpy(msgBuff+n, ffTail, ffTailsz+1);
162
163 if (fdOK)
164 {DEBUG("Sending pmark s-msg: " <<msgBuff);
165 if (netMsg->Send(msgBuff, n+ffTailsz) < 0)
166 {fdOK = false;
167 return false;
168 }
169 }
170
171 if (odOK)
172 {DEBUG("Sending pmark o-msg: " <<(netMsg ? "=s-msg" : msgBuff));
173 if (netOrg->Send(oDest, *mySad, msgBuff, n+ffTailsz) < 0)
174 {odOK = false;
175 return false;
176 }
177 }
178
179 return true;
180}
181
182/******************************************************************************/
183/* Private: g e t U T C */
184/******************************************************************************/
185
186const char *XrdNetPMarkFF::getUTC(char *utcBuff, int utcBLen)
187{
188 struct timeval tod;
189 struct tm utcDT;
190 char *bP;
191
192// Get the current time in UTC
193//
194 gettimeofday(&tod, 0);
195 gmtime_r(&tod.tv_sec, &utcDT);
196
197// Format this ISO-style
198//
199 size_t n = strftime(utcBuff, utcBLen, "%FT%T", &utcDT);
200 bP = utcBuff + n; utcBLen -= n;
201 snprintf(bP, utcBLen, ".%06u+00:00", static_cast<unsigned int>(tod.tv_usec));
202
203// Return result
204//
205 return utcBuff;
206}
207
208/******************************************************************************/
209/* P i n g */
210/******************************************************************************/
211/*
212void XrdNetPMarkCfg::Ping()
213{
214// Tell every registered task to send out a continuation
215//
216 ffMutex.Lock();
217 for (std::set<XdNetPMarkFF*> it = ffTasks.begin(); it!= ffTasks.end(); it++)
218???
219 ffMutex.UnLock();
220}
221*/
222/******************************************************************************/
223/* R e g i s t r y */
224/******************************************************************************/
225/*
226XrdNetMsg *XrdNetPMarkCfg::netMsg = 0;
227std::set<XrdNetPMarkFF*> XrdNetPMarkCfg::ffTasks;
228
229void XrdNetPMarkCfg::Registry(XrdNetPMarkFF *ffobj, bool doadd)
230{
231// Add or delete ityem from task list
232//
233 ffMutex.Lock();
234 if (doadd) ffTasks.insert(ffObj);
235 else ffTasks.erase(ffObj);
236 ffMutex.UnLock();
237}
238
239// This is firefly so we must get a netmsg object
240//
241 bool aOK;
242 netMsg = new XrdNetMsg(eLog, ffDest, aOK);
243 if (!aOK)
244 {eLog->Emsg("Config", "Unable to create UDP tunnel to", ffDest);
245 return 0;
246 }
247
248// If there is an interval, start a thread to handle continuations
249//
250 if (ffIntvl && XrdSysThread::Run(&tid,Refresh,(void *)&ffIntvl,0,"pmark")
251 {eDest->Emsg(epname, errno, "start pmark refresh timer");
252 return 0;
253 }
254*/
255
256/******************************************************************************/
257/* D e s t r u c t o r */
258/******************************************************************************/
259
261{
262// If all is well, emit the closing message
263//
264 if (fdOK || odOK)
265 {char utcBuff[40], endBuff[80];
266 snprintf(endBuff, sizeof(endBuff), ffEnd,
267 getUTC(utcBuff, sizeof(utcBuff)));
268 Emit("end", utcBuff, endBuff);
269 }
270
271// Cleanup
272//
273 if (mySad) delete(mySad);
274 if (oDest) free(oDest);
275 if (ffHdr) free(ffHdr);
276 if (ffTail) free(ffTail);
277 if (xtraFH) delete xtraFH;
278};
279
280/******************************************************************************/
281/* S o c k S t a t s */
282/******************************************************************************/
283
284#ifdef __linux__
285#include <linux/tcp.h>
286#endif
287
288void XrdNetPMarkFF::SockStats(struct sockStats &ss)
289{
290#ifndef __linux__
291 memset(&ss, 0, sizeof(struct sockStats));
292#else
293 EPName("SockStats");
294 struct tcp_info tcpInfo;
295 socklen_t tiLen = sizeof(tcpInfo);
296
297 if (getsockopt(sockFD, IPPROTO_TCP, TCP_INFO, (void *)&tcpInfo, &tiLen) == 0)
298 {ss.bRecv = static_cast<uint64_t>(tcpInfo.tcpi_bytes_received);
299 ss.bSent = static_cast<uint64_t>(tcpInfo.tcpi_bytes_acked);
300 ss.msRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt/1000);
301 ss.usRTT = static_cast<uint32_t>(tcpInfo.tcpi_rtt%1000);
302 } else {
303 memset(&ss, 0, sizeof(struct sockStats));
304 DEBUG("Unable to get TCP information errno=" << strerror(errno));
305 }
306#endif
307}
308
309/******************************************************************************/
310/* S t a r t */
311/******************************************************************************/
312
314{
315 char appInfo[128], clIP[INET6_ADDRSTRLEN+2], svIP[INET6_ADDRSTRLEN+2];
316 int clPort, svPort;
317 char clType, svType;
318 bool fdok = false, odok = false;
319
320// Preform app if we need to
321//
322 if (!appName) *appInfo = 0;
323 else snprintf(appInfo,sizeof(appInfo),ffApp,sizeof(appInfo)-20,appName);
324
325// Get the file descriptor for the socket
326//
327 sockFD = addr.SockFD();
328
329// Obtain connectivity information about the peer and ourselves. We really
330// should obtain our external address and use that but the issue is that
331// we may have multiple external addresses and the client determines which
332// one actually gets used. So, it's complicated. A TODO.
333//
334 clPort = XrdNetUtils::GetSokInfo( sockFD, clIP, sizeof(clIP), clType);
335 if (clPort < 0)
336 {eDest->Emsg("PMarkFF", clPort, "get peer information.");
337 return false;
338 }
339
340 svPort = XrdNetUtils::GetSokInfo(-sockFD, svIP, sizeof(svIP), svType);
341 if (svPort < 0)
342 {eDest->Emsg("PMarkFF", clPort, "get self information.");
343 return false;
344 }
345
346// If there is no special collector, indicate so
347//
348 if (netMsg) fdok = true;
349
350// If the messages need to flow to the origin, get the destination information
351//
352 if (netOrg)
353 {const XrdNetSockAddr *urSad = addr.NetAddr();
354 if (!urSad) eDest->Emsg("PMarkFF", "unable to get origin address.");
355 else {char buff[1024];
356 mySad = new XrdNetSockAddr;
357 memcpy(mySad, urSad, sizeof(XrdNetSockAddr));
358 mySad->v4.sin_port = htons(static_cast<uint16_t>(ffPortO));
359 snprintf(buff, sizeof(buff), "%s:%d", clIP, ffPortO);
360 oDest = strdup(buff);
361 odok = true;
362 }
363 }
364
365// If we cannot report anywhere then indicate we failed
366//
367 if (!fdok && !odok) return false;
368
369// Format the base firefly template. Note that the client determines the
370// address family that is being used.
371//
372 char utcBuff[40], bseg0[512];
373 int len0 = snprintf(bseg0, sizeof(bseg0), ffFmt0, myHostName,
374 getUTC(utcBuff, sizeof(utcBuff)));
375 if (len0 >= (int)sizeof(bseg0))
376 {eDest->Emsg("PMarkFF", "invalid json; bseg0 truncated.");
377 return false;
378 }
379
380 ffHdr = strdup(bseg0);
381
382 char bseg1[256];
383 int len1 = snprintf(bseg1, sizeof(bseg1), ffFmt1, eCode, aCode, appInfo);
384 if (len1 >= (int)sizeof(bseg1))
385 {eDest->Emsg("PMarkFF", "invalid json; bseg1 truncated.");
386 return false;
387 }
388
389 char bseg2[256];
390 int len2 = snprintf(bseg2, sizeof(bseg2), ffFmt2,
391 clType, svIP, clIP, svPort, clPort);
392 if (len2 >= (int)sizeof(bseg2))
393 {eDest->Emsg("PMarkFF", "invalid json; cl bseg2 truncated.");
394 return false;
395 }
396
397 ffTailsz = len1 + len2;
398 ffTail = (char *)malloc(ffTailsz + 1);
399 strcpy(ffTail, bseg1);
400 strcpy(ffTail+len1, bseg2);
401
402// OK, we now can emit the starting packet
403//
404 fdOK = fdok;
405 odOK = odok;
406 return Emit("start", utcBuff, "");
407}
#define DEBUG(x)
static XrdSysError eDest(0,"crypto_")
#define EPName(ep)
struct sockaddr_in v4
#define IPPROTO_TCP
const XrdNetSockAddr * NetAddr()
int Send(const char *buff, int blen=0, const char *dest=0, int tmo=-1)
Definition XrdNetMsg.cc:70
bool Start(XrdNetAddrInfo &addr)
virtual ~XrdNetPMarkFF()
const char * appName
static int GetSokInfo(int fd, char *theAddr, int theALen, char &theType)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysTrace * Trace
XrdScheduler * Sched
XrdSysError * eDest
const char * myHostName