51#include "XrdVersion.hh"
88 std::pair< std::set<std::string>::iterator,
bool > ret =
protocols.insert( protocol );
147 strmqueues.resize( size - 1, 0 );
155 strmqueues.resize( size - 1, 0);
163 uint16_t
Select(
const std::vector<bool> &connected )
166 size_t minval = std::numeric_limits<size_t>::max();
168 for( uint16_t i = 0; i < connected.size() && i < strmqueues.size(); ++i )
170 if( !connected[i] )
continue;
172 if( strmqueues[i] < minval )
175 minval = strmqueues[i];
189 --strmqueues[substrm - 1];
194 std::vector<size_t> strmqueues;
200 bindprefs( std::move( bindprefs ) ), next( 0 )
204 inline const std::string&
Get()
206 std::string &ret = bindprefs[next];
208 if( next >= bindprefs.size() )
214 std::vector<std::string> bindprefs;
303 delete pSecUnloadHandler; pSecUnloadHandler = 0;
322 size_t leftToBeRead = 8 - message.
GetCursor();
323 while( leftToBeRead )
327 leftToBeRead, bytesRead );
331 leftToBeRead -= bytesRead;
336 uint32_t bodySize = *(uint32_t*)(message.
GetBuffer(4));
339 "body", (
void*)&message, bodySize );
354 size_t leftToBeRead = 0;
355 uint32_t bodySize = 0;
357 bodySize = rsphdr->
dlen;
359 if( message.
GetSize() < bodySize + 8 )
362 leftToBeRead = bodySize-(message.
GetCursor()-8);
363 while( leftToBeRead )
371 leftToBeRead -= bytesRead;
394 uint32_t bodySize = rsphdr->
dlen;
397 "kXR_status: invalid message size." );
402 if( message.
GetSize() < bodySize + 8 )
405 size_t leftToBeRead = bodySize-(message.
GetCursor()-8);
406 while( leftToBeRead )
414 leftToBeRead -= bytesRead;
446 channelData.
Set( info );
450 env->
GetInt(
"SubStreamsPerChannel", streams );
451 if( streams < 1 ) streams = 1;
452 info->
stream.resize( streams );
473 channelData.
Get( info );
484 "[%s] Internal error: not enough substreams",
492 return HandShakeMain( handShakeData, channelData );
494 return HandShakeParallel( handShakeData, channelData );
504 channelData.
Get( info );
508 "[%s] Internal error: no channel info",
521 handShakeData->
out = GenerateInitialHSProtocol( handShakeData, info,
532 XRootDStatus st = ProcessServerHS( handShakeData, info );
546 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
556 handShakeData->
out = GenerateProtocol( handShakeData, info,
562 handShakeData->
out = GenerateLogIn( handShakeData, info );
573 XRootDStatus st = ProcessLogInResp( handShakeData, info );
581 if( st.IsOK() && st.code ==
suDone )
590 handShakeData->
out = GenerateEndSession( handShakeData, info );
600 st = DoAuthentication( handShakeData, info );
613 XRootDStatus st = DoAuthentication( handShakeData, info );
621 if( st.IsOK() && st.code ==
suDone )
628 handShakeData->
out = GenerateEndSession( handShakeData, info );
646 XRootDStatus st = ProcessEndSessionResp( handShakeData, info );
648 if( st.IsOK() && st.code ==
suDone )
652 else if( !st.IsOK() )
669 XRootDChannelInfo *info = 0;
670 channelData.Get( info );
674 "[%s] Internal error: no channel info",
675 handShakeData->streamName.c_str());
679 XRootDStreamInfo &sInfo = info->
stream[handShakeData->subStreamId];
687 handShakeData->out = GenerateInitialHSProtocol( handShakeData, info,
699 XRootDStatus st = ProcessServerHS( handShakeData, info );
713 XRootDStatus st = ProcessProtocolResp( handShakeData, info );
721 handShakeData->out = GenerateBind( handShakeData, info );
731 XRootDStatus st = ProcessBindResp( handShakeData, info );
739 return XRootDStatus();
741 return XRootDStatus();
752 channelData.
Get( info );
756 "[%s] Internal error: no channel info",
772 channelData.
Get( info );
779 "Internal error: no channel info, behaving as if TTL has elapsed");
790 env->
GetInt(
"DataServerTTL", ttl );
795 env->
GetInt(
"LoadBalancerTTL", ttl );
802 uint16_t allocatedSIDs = info->
sidManager->GetNumberOfAllocatedSIDs();
804 "TTL: %d, allocated SIDs: %d, open files: %d, bound file objects: %d",
805 info->
streamName.c_str(), (
long long) inactiveTime, ttl, allocatedSIDs,
808 if( info->
openFiles != 0 && info->
finstcnt.load( std::memory_order_relaxed ) != 0 )
811 if( !allocatedSIDs && inactiveTime > ttl )
825 channelData.
Get( info );
831 "Internal error: no channel info, behaving as if stream is broken");
836 env->
GetInt(
"StreamTimeout", streamTimeout );
840 const time_t now = time(0);
842 info->
sidManager->IsAnySIDOldAs( now - streamTimeout );
845 "stream timeout: %d, any SID: %d, wait barrier: %s",
846 info->
streamName.c_str(), (
long long) inactiveTime, streamTimeout,
849 if( inactiveTime < streamTimeout )
852 if( now < info->waitBarrier )
877 channelData.
Get( info );
881 "Internal error: no channel info, cannot multiplex");
898 uint16_t upStream = 0;
899 uint16_t downStream = 0;
904 downStream = hint->
down;
909 std::vector<bool> connected;
910 connected.reserve( info->
stream.size() - 1 );
911 size_t nbConnected = 0;
912 for(
size_t i = 1; i < info->
stream.size(); ++i )
915 connected.push_back(
true );
919 connected.push_back(
false );
921 if( nbConnected == 0 )
927 if( upStream >= info->
stream.size() )
930 "[%s] Up link stream %d does not exist, using 0",
935 if( downStream >= info->
stream.size() )
938 "[%s] Down link stream %d does not exist, using 0",
962 memset( newBuf, 0, 8 );
1036 return PathID( upStream, downStream );
1047 channelData.
Get( info );
1066 uint16_t ret = info->
stream.size();
1070 env->
GetInt(
"TlsNoData", nodata );
1083 if( ( usrTlsStrm0 && usrNoTlsData && srvNoTlsData ) ||
1084 ( srvTlsStrm0 && srvNoTlsData && usrNoTlsData ) )
1090 if( ret == 1 ) ++ret;
1093 if( ret > info->
stream.size() )
1095 info->
stream.resize( ret );
1195 uint16_t numChunks = (req->
readv.
dlen)/16;
1197 for(
size_t i = 0; i < numChunks; ++i )
1199 dataChunk[i].
rlen = htonl( dataChunk[i].rlen );
1200 dataChunk[i].
offset = htonll( dataChunk[i].offset );
1210 for(
size_t i = 0; i < numChunks; ++i )
1212 dataChunk[i].
srcOffs = htonll( dataChunk[i].srcOffs );
1213 dataChunk[i].
srcLen = htonll( dataChunk[i].srcLen );
1214 dataChunk[i].
dstOffs = htonll( dataChunk[i].dstOffs );
1227 for(
size_t i = 0; i < numChunks; ++i )
1229 wrtList[i].
wlen = htonl( wrtList[i].wlen );
1230 wrtList[i].
offset = htonll( wrtList[i].offset );
1314 m->
body.protocol.pval = ntohl( m->
body.protocol.pval );
1315 m->
body.protocol.flags = ntohl( m->
body.protocol.flags );
1326 m->
body.error.errnum = ntohl( m->
body.error.errnum );
1336 m->
body.wait.seconds = htonl( m->
body.wait.seconds );
1346 m->
body.redirect.port = htonl( m->
body.redirect.port );
1356 m->
body.waitresp.seconds = htonl( m->
body.waitresp.seconds );
1366 m->
body.attn.actnum = htonl( m->
body.attn.actnum );
1402 "kXR_status: invalid message size." );
1430 "corrupted (crc32c integrity check failed)." );
1437 "(stream ID mismatch)." );
1445 "(request ID mismatch)." );
1469 "kXR_status: invalid message size." );
1483 if( crcval != cse->
cseCRC )
1486 "corrupted (crc32c integrity check failed)." );
1497 for(
size_t i = 0; i < pgcnt; ++i )
1498 pgoffs[i] = ntohll( pgoffs[i] );
1518 header->
dlen = ntohl( header->
dlen );
1528 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
1529 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
1531 rsp->
body.error.errnum, errmsg );
1541 channelData.
Get( info );
1550 uint16_t nbConnected = 0;
1551 for(
size_t i = 1; i < info->
stream.size(); ++i )
1562 uint16_t subStreamId )
1565 channelData.
Get( info );
1574 if( !info->
stream.empty() )
1580 if( subStreamId == 0 )
1582 CleanUpProtection( info );
1599 channelData.
Get( info );
1612 result.
Set( (
const char*)
"XRootD",
false );
1651 channelData.
Get( info );
1673 "response that we're no longer interested in (timed out)",
1685 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1686 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1698 uint32_t seconds = 0;
1700 seconds = ntohl( rsp->
body.wait.seconds ) + 5;
1704 seconds = ntohl( rsp->
body.waitresp.seconds );
1706 log->
Dump(
XRootDMsg,
"[%s] Got kXR_waitresp response of %u seconds, "
1707 "setting up wait barrier.",
1712 time_t barrier = time(0) + seconds;
1720 uint16_t sid; memcpy( &sid, rsp->
hdr.
streamid, 2 );
1721 std::set<uint16_t>::iterator sidIt = info->
sentOpens.find( sid );
1730 info->
finstcnt.fetch_add( 1, std::memory_order_relaxed );
1766 channelData.
Get( info );
1791 channelData.
Get( info );
1819 sign->
Grab(
reinterpret_cast<char*
>( newreq ), rc );
1831 channelData.
Get( info );
1832 if( info->
finstcnt.load( std::memory_order_relaxed ) > 0 )
1833 info->
finstcnt.fetch_sub( 1, std::memory_order_relaxed );
1842 pSecUnloadHandler->unloaded =
true;
1852 channelData.
Get( info );
1856 env->
GetInt(
"NoTlsOK", notlsok );
1930 channelData.
Get( info );
1948 "[%s] Sending out the initial hand shake + kXR_protocol",
1958 init->
fifth = htonl(2012);
1961 InitProtocolReq( proto, info, expect );
1969 Message *XRootDTransport::GenerateProtocol( HandShakeData *hsData,
1970 XRootDChannelInfo *info,
1975 "[%s] Sending out the kXR_protocol",
1976 hsData->streamName.c_str() );
1978 Message *msg =
new Message();
1983 InitProtocolReq( proto, info, expect );
1991 void XRootDTransport::InitProtocolReq( ClientProtocolRequest *request,
2005 env->
GetInt(
"NoTlsOK", notlsok );
2008 env->
GetInt(
"TlsNoData", tlsnodata );
2010 if (info->encrypted ||
InitTLS())
2013 if (info->encrypted && !(notlsok || tlsnodata))
2016 request->
expect = expect;
2034 Message *msg = hsData->in;
2035 ServerResponseHeader *respHdr = (ServerResponseHeader *)msg->GetBuffer();
2036 ServerInitHandShake *hs = (ServerInitHandShake *)msg->GetBuffer(4);
2041 hsData->streamName.c_str() );
2046 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2047 const uint32_t pv = ntohl(hs->
protover);
2052 if( hsData->subStreamId == 0 )
2054 info->protocolVersion = pv;
2055 info->serverFlags = sInfo.serverFlags;
2059 "[%s] Got the server hand shake response (%s, protocol "
2061 hsData->streamName.c_str(),
2062 ServerFlagsToStr( sInfo.serverFlags ).c_str(),
2063 info->protocolVersion );
2080 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2086 hsData->streamName.c_str() );
2091 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2092 if( rsp->
body.protocol.pval >= 0x297 )
2093 sInfo.serverFlags = rsp->
body.protocol.flags;
2095 if( hsData->subStreamId > 0 )
2098 info->serverFlags = sInfo.serverFlags;
2102 env->
GetInt(
"NoTlsOK", notlsok );
2110 if( !notlsok )
return XRootDStatus(
stFatal,
errTlsError, ENOTSUP,
"TLS not supported" );
2117 "[%s] Falling back to unencrypted transmission, server does "
2118 "not support TLS encryption.",
2119 hsData->streamName.c_str() );
2120 info->encrypted =
false;
2123 if( rsp->
body.protocol.pval >= 0x297 )
2124 info->serverFlags = rsp->
body.protocol.flags;
2128 info->protRespBody =
new ServerResponseBody_Protocol();
2129 info->protRespBody->flags = rsp->
body.protocol.flags;
2130 info->protRespBody->pval = rsp->
body.protocol.pval;
2132 char* bodybuff =
reinterpret_cast<char*
>( &rsp->
body.protocol.secreq );
2133 size_t bodysize = rsp->
hdr.
dlen - 8;
2134 XRootDStatus st = ProcessProtocolBody( bodybuff, bodysize, info );
2140 "[%s] kXR_protocol successful (%s, protocol version %x)",
2141 hsData->streamName.c_str(),
2142 ServerFlagsToStr( info->serverFlags ).c_str(),
2143 info->protocolVersion );
2145 if( !( info->serverFlags &
kXR_haveTLS ) && info->encrypted )
2152 "Server was not configured to support encryption." );
2160 env->
GetInt(
"WantTlsOnNoPgrw", tlsOnNoPgrw );
2161 if( !( info->serverFlags &
kXR_suppgrw ) && tlsOnNoPgrw )
2167 if( info->encrypted )
2170 "[%s] Server does not support PgRead/PgWrite and"
2171 " WantTlsOnNoPgrw is on; enforcing encryption for data.",
2172 hsData->streamName.c_str() );
2182 info->encrypted =
true;
2190 XRootDStatus XRootDTransport::ProcessProtocolBody(
char *bodybuff,
2203 if( bodysize < bifreq->bifILen )
2205 "protocol response." );
2206 std::string bindprefs_str( bodybuff, bifreq->
bifILen );
2207 std::vector<std::string> bindprefs;
2209 info->bindSelector.reset(
new BindPrefSelector( std::move( bindprefs ) ) );
2217 if( bodysize >= 6 && secreq->
theTag ==
'S' )
2219 memcpy( &info->protRespBody->secreq, secreq, bodysize );
2220 info->protRespSize = bodysize + 8 ;
2223 return XRootDStatus();
2235 "[%s] Sending out the bind request",
2236 hsData->streamName.c_str() );
2239 Message *msg =
new Message(
sizeof( ClientBindRequest ) );
2240 ClientBindRequest *bindReq = (ClientBindRequest *)msg->GetBuffer();
2243 memcpy( bindReq->
sessid, info->sessionId, 16 );
2261 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2266 hsData->streamName.c_str() );
2270 info->stream[hsData->subStreamId].pathId = rsp->
body.bind.substreamid;
2272 hsData->streamName.c_str() );
2274 return XRootDStatus();
2292 char *cgiBuffer =
new char[1024 + info->logintoken.size()];
2293 std::string appName;
2294 std::string monInfo;
2295 env->GetString(
"AppName", appName );
2296 env->GetString(
"MonInfo", monInfo );
2297 if( info->logintoken.empty() )
2299 snprintf( cgiBuffer, 1024,
2300 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2301 "xrd.hostname=%s&xrd.rn=%s", countryCode.c_str(), timeZone,
2302 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION );
2306 snprintf( cgiBuffer, 1024,
2307 "xrd.cc=%s&xrd.tz=%d&xrd.appname=%s&xrd.info=%s&"
2308 "xrd.hostname=%s&xrd.rn=%s&%s", countryCode.c_str(), timeZone,
2309 appName.c_str(), monInfo.c_str(), hostName, XrdVERSION, info->logintoken.c_str() );
2311 uint16_t cgiLen = strlen( cgiBuffer );
2317 Message *msg =
new Message(
sizeof(ClientLoginRequest) + cgiLen );
2318 ClientLoginRequest *loginReq = (ClientLoginRequest *)msg->GetBuffer();
2321 loginReq->
pid = ::getpid();
2323 loginReq->
dlen = cgiLen;
2329 int multiProtocol = 0;
2330 env->GetInt(
"MultiProtocol", multiProtocol );
2338 bool dualStack =
false;
2339 bool privateIPv6 =
false;
2340 bool privateIPv4 =
false;
2364 if( !dualStack && hsData->serverAddr )
2377 std::string buffer( 8, 0 );
2378 if( hsData->url->GetUserName().length() )
2379 buffer = hsData->url->GetUserName();
2382 char *name =
new char[1024];
2389 buffer.resize( 8, 0 );
2390 std::copy( buffer.begin(), buffer.end(), (
char*)loginReq->
username );
2392 msg->Append( cgiBuffer, cgiLen, 24 );
2395 "username: %s, cgi: %s, dual-stack: %s, private IPv4: %s, "
2396 "private IPv6: %s", hsData->streamName.c_str(),
2397 loginReq->
username, cgiBuffer, dualStack ?
"true" :
"false",
2398 privateIPv4 ?
"true" :
"false",
2399 privateIPv6 ?
"true" :
"false" );
2401 delete [] cgiBuffer;
2418 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2423 hsData->streamName.c_str() );
2427 if( !info->firstLogIn )
2428 memcpy( info->oldSessionId, info->sessionId, 16 );
2430 if( rsp->
hdr.
dlen == 0 && info->protocolVersion <= 0x289 )
2437 memset( info->sessionId, 0, 16 );
2439 "[%s] Logged in, accepting empty login response.",
2440 hsData->streamName.c_str() );
2441 return XRootDStatus();
2447 memcpy( info->sessionId, rsp->
body.login.sessid, 16 );
2452 hsData->streamName.c_str(), sessId.c_str() );
2459 size_t len = rsp->
hdr.
dlen-16;
2460 info->authBuffer =
new char[len+1];
2461 info->authBuffer[len] = 0;
2462 memcpy( info->authBuffer, rsp->
body.login.sec, len );
2464 hsData->streamName.c_str(), info->authBuffer );
2469 return XRootDStatus();
2482 XRootDStreamInfo &sInfo = info->stream[hsData->subStreamId];
2484 std::string protocolName;
2492 hsData->streamName.c_str() );
2497 info->authEnv =
new XrdOucEnv();
2498 info->authEnv->Put(
"sockname", hsData->clientName.c_str() );
2499 info->authEnv->Put(
"username", hsData->url->GetUserName().c_str() );
2500 info->authEnv->Put(
"password", hsData->url->GetPassword().c_str() );
2503 URL::ParamsMap::const_iterator it;
2504 for( it = urlParams.begin(); it != urlParams.end(); ++it )
2506 if( it->first.compare( 0, 4,
"xrd." ) == 0 ||
2507 it->first.compare( 0, 6,
"xrdcl." ) == 0 )
2508 info->authEnv->Put( it->first.c_str(), it->second.c_str() );
2514 size_t authBuffLen = strlen( info->authBuffer );
2515 char *pars = (
char *)malloc( authBuffLen + 1 );
2516 memcpy( pars, info->authBuffer, authBuffLen );
2519 delete [] info->authBuffer;
2520 info->authBuffer = 0;
2525 XRootDStatus st = GetCredentials( credentials, hsData, info );
2528 CleanUpAuthentication( info );
2531 protocolName = info->authProtocol->Entity.prot;
2539 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2540 protocolName = info->authProtocol->Entity.prot;
2548 "[%s] Sending more authentication data for %s",
2549 hsData->streamName.c_str(), protocolName.c_str() );
2552 char *secTokenData = (
char*)malloc( len );
2553 memcpy( secTokenData, rsp->
body.authmore.data, len );
2555 XrdOucErrInfo ei(
"", info->authEnv);
2556 credentials = info->authProtocol->getCredentials( secToken, &ei );
2565 "[%s] Auth protocol handler for %s refuses to give "
2566 "us more credentials %s",
2567 hsData->streamName.c_str(), protocolName.c_str(),
2569 CleanUpAuthentication( info );
2579 info->authProtocolName = info->authProtocol->Entity.prot;
2584 if( info->protRespBody )
2586 int rc =
XrdSecGetProtection( info->protection, *info->authProtocol, *info->protRespBody, info->protRespSize );
2590 "[%s] XrdSecProtect loaded.", hsData->streamName.c_str() );
2595 "[%s] XrdSecProtect: no protection needed.",
2596 hsData->streamName.c_str() );
2601 "[%s] Failed to load XrdSecProtect: %s",
2602 hsData->streamName.c_str(),
XrdSysE2T( -rc ) );
2603 CleanUpAuthentication( info );
2609 if( !info->protection )
2610 CleanUpAuthentication( info );
2612 pSecUnloadHandler->Register( info->authProtocolName );
2615 "[%s] Authenticated with %s.", hsData->streamName.c_str(),
2616 protocolName.c_str() );
2624 return XRootDStatus();
2631 char *errmsg =
new char[rsp->
hdr.
dlen-3]; errmsg[rsp->
hdr.
dlen-4] = 0;
2632 memcpy( errmsg, rsp->
body.error.errmsg, rsp->
hdr.
dlen-4 );
2634 "[%s] Authentication with %s failed: %s",
2635 hsData->streamName.c_str(), protocolName.c_str(),
2639 info->authProtocol->Delete();
2640 info->authProtocol = 0;
2645 XRootDStatus st = GetCredentials( credentials, hsData, info );
2648 CleanUpAuthentication( info );
2651 protocolName = info->authProtocol->Entity.prot;
2658 info->authProtocolName = info->authProtocol->Entity.prot;
2659 CleanUpAuthentication( info );
2662 "[%s] Authentication with %s failed: unexpected answer",
2663 hsData->streamName.c_str(), protocolName.c_str() );
2671 Message *msg =
new Message(
sizeof(ClientAuthRequest)+credentials->
size );
2673 ClientRequest *req = (ClientRequest*)msg->GetBuffer();
2674 char *reqBuffer = msg->GetBuffer(
sizeof(ClientAuthRequest));
2679 protocolName.length() > 4 ? 4 : protocolName.length() );
2681 memcpy( reqBuffer, credentials->
buffer, credentials->
size );
2706 XrdOucErrInfo ei(
"", info->authEnv);
2716 char *secuidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secuid") : 0;
2717 char *secgidc = (ei.getEnv()) ? ei.getEnv()->Get(
"xrdcl.secgid") : 0;
2722 if(secuidc) secuid = atoi(secuidc);
2723 if(secgidc) secgid = atoi(secgidc);
2726 ScopedFsUidSetter uidSetter(secuid, secgid, hsData->streamName);
2727 if(!uidSetter.IsOk()) {
2728 log->Error(
XRootDTransportMsg,
"[%s] Error while setting (fsuid, fsgid) to (%d, %d)",
2729 hsData->streamName.c_str(), secuid, secgid );
2733 if(secuid >= 0 || secgid >= 0) {
2734 log->Error(
XRootDTransportMsg,
"[%s] xrdcl.secuid and xrdcl.secgid only supported on Linux.",
2735 hsData->streamName.c_str() );
2737 " only supported on Linux" );
2745 XrdNetAddr &srvAddrInfo = *
const_cast<XrdNetAddr *
>(hsData->serverAddr);
2746 srvAddrInfo.
SetTLS( info->encrypted );
2752 info->authProtocol = (*authHandler)( hsData->url->GetHostName().c_str(),
2756 if( !info->authProtocol )
2759 hsData->streamName.c_str() );
2763 std::string protocolName = info->authProtocol->Entity.prot;
2765 hsData->streamName.c_str(), protocolName.c_str() );
2770 credentials = info->authProtocol->getCredentials( 0, &ei );
2774 "[%s] Cannot get credentials for protocol %s: %s",
2775 hsData->streamName.c_str(), protocolName.c_str(),
2777 info->authProtocol->Delete();
2789 if( info->authProtocol )
2790 info->authProtocol->Delete();
2791 delete info->authParams;
2792 delete info->authEnv;
2793 info->authProtocol = 0;
2794 info->authParams = 0;
2805 XrdSysRWLockHelper scope( pSecUnloadHandler->lock );
2808 if( info->protection )
2810 info->protection->Delete();
2811 info->protection = 0;
2813 CleanUpAuthentication( info );
2816 if( info->protRespBody )
2818 delete info->protRespBody;
2819 info->protRespBody = 0;
2820 info->protRespSize = 0;
2832 char errorBuff[1024];
2837 auto ret = authHandler.load( std::memory_order_relaxed );
2838 if( ret )
return ret;
2844 static XrdSysMutex mtx;
2845 XrdSysMutexHelper lck( mtx );
2847 ret = authHandler.load( std::memory_order_relaxed );
2848 if( ret )
return ret;
2852 authHandler.store( ret, std::memory_order_relaxed );
2857 "Unable to get the security framework: %s", errorBuff );
2874 Message *msg =
new Message(
sizeof(ClientEndsessRequest) );
2875 ClientEndsessRequest *endsessReq = (ClientEndsessRequest *)msg->GetBuffer();
2878 memcpy( endsessReq->
sessid, info->oldSessionId, 16 );
2882 " %s", hsData->streamName.c_str(), sessId.c_str() );
2893 uint32_t size = sign->GetSize();
2894 sign->ReAllocate( size + msg->GetSize() );
2895 char* buffer = sign->GetBuffer( size );
2896 memcpy( buffer, msg->GetBuffer(), msg->GetSize() );
2897 msg->Grab( sign->GetBuffer(), sign->GetSize() );
2915 ServerResponse *rsp = (ServerResponse*)hsData->in->GetBuffer();
2929 std::string errorMsg( rsp->
body.error.errmsg, rsp->
hdr.
dlen - 4 );
2931 "kXR_endsess: %s", hsData->streamName.c_str(),
2939 std::string msg( rsp->
body.wait.infomsg, rsp->
hdr.
dlen - 4 );
2941 "kXR_endsess: %s", hsData->streamName.c_str(),
2943 hsData->out = GenerateEndSession( hsData, info );
2954 std::string XRootDTransport::ServerFlagsToStr( uint32_t flags )
2956 std::string repr =
"type: ";
2980 repr.erase( repr.length()-1, 1 );
2991 char *GetDataAsString(
char *msg )
2994 char *fn =
new char[req->
dlen+1];
2995 memcpy( fn, msg + 24, req->
dlen );
3022 char *fn = GetDataAsString( msg );
3023 o <<
"file: " << fn <<
", ";
3025 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
3026 o << std::setbase(10);
3033 o <<
"kXR_compress ";
3045 o <<
"kXR_open_apnd ";
3047 o <<
"kXR_open_read ";
3049 o <<
"kXR_open_updt ";
3051 o <<
"kXR_open_wrto ";
3055 o <<
"kXR_prefname ";
3057 o <<
"kXR_refresh ";
3059 o <<
"kXR_4dirlist ";
3061 o <<
"kXR_replica ";
3067 o <<
"kXR_retstat ";
3079 o <<
"fhtemplt: " << FileHandleToStr( sreq->
fhtemplt );
3091 o <<
"kXR_clone ( ";
3092 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3093 o << std::setbase(10);
3097 o <<
"(src_handle: ";
3098 o << FileHandleToStr( dataChunk[i].srcFH );
3100 o << std::setbase(10);
3101 o <<
"src_offset: " << dataChunk[i].
srcOffs;
3102 o <<
", src_length: " << dataChunk[i].
srcLen;
3103 o <<
", dst_offset: " << dataChunk[i].
dstOffs <<
"); ";
3117 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3131 char *fn = GetDataAsString( msg );;
3132 o <<
"path: " << fn <<
", ";
3137 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3159 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3160 o << std::setbase(10);
3162 o <<
"offset: " << sreq->
offset <<
", ";
3163 o <<
"size: " << sreq->
rlen <<
")";
3173 o <<
"kXR_pgread (";
3174 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3175 o << std::setbase(10);
3177 o <<
"offset: " << sreq->
offset <<
", ";
3178 o <<
"size: " << sreq->
rlen <<
")";
3189 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3190 o << std::setbase(10);
3192 o <<
"offset: " << sreq->
offset <<
", ";
3193 o <<
"size: " << sreq->
dlen <<
")";
3203 o <<
"kXR_pgwrite (";
3204 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3205 o << std::setbase(10);
3207 o <<
"offset: " << sreq->
offset <<
", ";
3208 o <<
"size: " << sreq->
dlen <<
")";
3235 o <<
" unknown subcode: " << sreq->
subcode;
3238 o <<
" (handle: " << FileHandleToStr( sreq->
fhandle );
3239 o << std::setbase(10);
3241 o <<
", numattr: " << nattr;
3249 o <<
", total size: " << req->
dlen <<
")";
3260 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3271 o <<
"kXR_truncate (";
3273 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3276 char *fn = GetDataAsString( msg );
3277 o <<
"file: " << fn;
3280 o << std::setbase(10);
3282 o <<
"offset: " << sreq->
offset;
3292 unsigned char *fhandle = 0;
3297 fhandle = dataChunk[0].
fhandle;
3299 o << FileHandleToStr( fhandle );
3303 o << std::setbase(10);
3308 size += dataChunk[i].
rlen;
3309 o <<
"(offset: " << dataChunk[i].
offset;
3310 o <<
", size: " << dataChunk[i].
rlen <<
"); ";
3313 o <<
"total size: " << size <<
")";
3322 unsigned char *fhandle = 0;
3323 o <<
"kXR_writev (";
3328 uint32_t numChunks = 0;
3332 size += wrtList[i].
wlen;
3337 o << FileHandleToStr( fhandle );
3341 o << std::setbase(10);
3342 o <<
"chunks: " << numChunks <<
", ";
3343 o <<
"total size: " << size <<
")";
3353 char *fn = GetDataAsString( msg );;
3354 o <<
"kXR_locate (";
3355 o <<
"path: " << fn <<
", ";
3363 o <<
"kXR_refresh ";
3365 o <<
"kXR_prefname ";
3371 o <<
"kXR_compress ";
3387 o <<
"destination: ";
3409 case kXR_QPrep: o <<
"kXR_QPrep";
break;
3412 case kXR_Qvisa: o <<
"kXR_Qvisa";
break;
3414 default: o << sreq->
infotype;
break;
3420 o <<
"handle: " << FileHandleToStr( sreq->
fhandle );
3424 o <<
"arg length: " << sreq->
dlen <<
")";
3434 char *fn = GetDataAsString( msg );;
3435 o <<
"path: " << fn <<
")";
3447 char *fn = GetDataAsString( msg );
3448 o <<
"path: " << fn <<
", ";
3450 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
", ";
3451 o << std::setbase(10);
3458 o <<
"kXR_mkdirpath";
3470 char *fn = GetDataAsString( msg );
3471 o <<
"path: " << fn <<
")";
3483 char *fn = GetDataAsString( msg );
3484 o <<
"path: " << fn <<
", ";
3486 o <<
"mode: 0" << std::setbase(8) << sreq->
mode <<
")";
3505 o <<
"kXR_protocol (";
3506 o <<
"clientpv: 0x" << std::setbase(16) << sreq->
clientpv <<
")";
3515 o <<
"kXR_dirlist (";
3516 char *fn = GetDataAsString( msg );;
3517 o <<
"path: " << fn <<
")";
3528 char *fn = GetDataAsString( msg );;
3529 o <<
"data: " << fn <<
")";
3540 o <<
"kXR_prepare (";
3557 o <<
", priority: " << (int) sreq->
prty <<
", ";
3559 char *fn = GetDataAsString( msg );
3561 for( cursor = fn; *cursor; ++cursor )
3562 if( *cursor ==
'\n' ) *cursor =
' ';
3564 o <<
"paths: " << fn <<
")";
3572 o <<
"kXR_chkpoint (";
3580 o <<
"kXR_ckpXeq) ";
3594 o <<
"kXR_unknown (length: " << req->
dlen <<
")";
3603 std::string XRootDTransport::FileHandleToStr(
const unsigned char handle[4] )
3605 std::ostringstream o;
3607 for( uint8_t i = 0; i < 4; ++i )
3609 o << std::setbase(16) << std::setfill(
'0') << std::setw(2);
3610 o << (int)handle[i];
static const int kXR_ckpRollback
struct ClientTruncateRequest truncate
ServerResponseStatus status
struct ClientPgReadRequest pgread
union ServerResponse::@040373375333017131300127053271011057331004327334 body
struct ClientMkdirRequest mkdir
struct ClientAuthRequest auth
static const int kXR_ckpXeq
struct ClientPgWriteRequest pgwrite
struct ClientReadVRequest readv
struct ClientOpenRequest open
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
struct ClientWriteVRequest writev
struct ClientLoginRequest login
struct ClientChmodRequest chmod
struct ClientQueryRequest query
struct ClientReadRequest read
struct ClientMvRequest mv
struct ClientChkPointRequest chkpoint
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
static const int kXR_ckpCommit
struct ClientPrepareRequest prepare
static const int kXR_ckpQuery
struct ClientWriteRequest write
#define kXR_PROTTLSVERSION
struct ClientProtocolRequest protocol
struct ClientLocateRequest locate
struct ClientCloneRequest clone
static const int kXR_ckpBegin
XrdSecBuffer XrdSecParameters
XrdSecProtocol *(* XrdSecGetProt_t)(const char *hostname, XrdNetAddrInfo &endPoint, XrdSecParameters &sectoken, XrdOucErrInfo *einfo)
Typedef to simplify the encoding of methods returning XrdSecProtocol.
XrdSecBuffer XrdSecCredentials
XrdSecGetProt_t XrdSecLoadSecFactory(char *eBuff, int eBlen, const char *seclib)
int XrdSecGetProtection(XrdSecProtect *&protP, XrdSecProtocol &aprot, ServerResponseBody_Protocol &resp, unsigned int resplen)
#define NEED2SECURE(protP)
This class implements the XRootD protocol security protection.
const char * XrdSysE2T(int errcode)
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
void Grab(char *buffer, uint32_t size)
Grab a buffer allocated outside.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
void ReAllocate(uint32_t size)
Reallocate the buffer to a new location of a given size.
void Allocate(uint32_t size)
Allocate the buffer.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
uint32_t GetCursor() const
Get append cursor.
uint32_t GetSize() const
Get the size of the message.
static TransportManager * GetTransportManager()
Get transport manager.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
void SetIsMarshalled(bool isMarshalled)
Set the marshalling status.
bool IsMarshalled() const
Check if the message is marshalled.
static SIDMgrPool & Instance()
std::shared_ptr< SIDManager > GetSIDMgr(const URL &url)
virtual XRootDStatus Read(char *buffer, size_t size, int &bytesRead)
static void ClearErrorQueue()
Clear the error queue for the calling thread.
Perform the handshake and the authentication for each physical stream.
@ RequestClose
Send a close request.
virtual void WaitBeforeExit()=0
Wait before exit.
Manage transport handler objects.
TransportHandler * GetHandler(const std::string &protocol)
Get a transport handler object for a given protocol.
std::string GetChannelId() const
std::map< std::string, std::string > ParamsMap
bool IsSecure() const
Does the protocol indicate encryption.
bool IsTPC() const
Is the URL used in TPC context.
std::string GetLoginToken() const
Get the login token if present in the opaque info.
static std::string TimeToString(time_t timestamp)
Convert timestamp to a string.
static std::string FQDNToCC(const std::string &fqdn)
Convert the fully qualified host name to country code.
static std::string Char2Hex(uint8_t *array, uint16_t size)
Print a char array as hex.
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
const std::string & GetErrorMessage() const
Get error message.
static uint16_t NbConnectedStrm(AnyObject &channelData)
Number of currently connected data streams.
virtual bool IsStreamTTLElapsed(time_t time, AnyObject &channelData)
Check if the stream should be disconnected.
virtual void Disconnect(AnyObject &channelData, uint16_t subStreamId)
The stream has been disconnected, do the cleanups.
XRootDTransport()
Constructor.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)
Check if the message invokes a stream action.
virtual void WaitBeforeExit()
Wait until the program can safely exit.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
virtual XRootDStatus GetBody(Message &message, Socket *socket)
virtual XRootDStatus GetHeader(Message &message, Socket *socket)
~XRootDTransport()
Destructor.
virtual uint16_t SubStreamNumber(AnyObject &channelData)
Return a number of substreams per stream that should be created.
virtual void FinalizeChannel(AnyObject &channelData)
Finalize channel.
virtual bool HandShakeDone(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)
Get signature for given message.
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)
Notify the transport about a message having been sent.
virtual XRootDStatus HandShake(HandShakeData *handShakeData, AnyObject &channelData)
HandShake.
virtual XRootDStatus GetMore(Message &message, Socket *socket)
static void GenerateDescription(char *msg, std::ostringstream &o)
Get the description of a message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarchalStatusMore(Message &msg)
Unmarshall the correction-segment of the status response for pgwrite.
static void LogErrorResponse(const Message &msg)
Log server error response.
virtual void DecFileInstCnt(AnyObject &channelData)
Decrement file object instance count bound to this channel.
virtual PathID Multiplex(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual void InitializeChannel(const URL &url, AnyObject &channelData)
Initialize channel.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)
Query the channel.
static void UnMarshallHeader(Message &msg)
Unmarshall the header of the incoming message.
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
friend struct PluginUnloadHandler
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)
Get bind preference for the next data stream.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)
virtual bool NeedEncryption(HandShakeData *handShakeData, AnyObject &channelData)
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)
static char * MyHostName(const char *eName="*unknown*", const char **eText=0)
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static uint32_t Calc32C(const void *data, size_t count, uint32_t prevcs=0)
static int UserName(uid_t uID, char *uName, int uNsz)
virtual int Secure(SecurityRequest *&newreq, ClientRequest &thereq, const char *thedata)
const uint16_t errQueryNotSupported
const int DefaultLoadBalancerTTL
const uint64_t XRootDTransportMsg
const uint16_t errTlsError
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errLoginFailed
const int DefaultWantTlsOnNoPgrw
const uint16_t errSocketTimeout
const uint16_t errDataError
Data is corrupted.
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const int DefaultSubStreamsPerChannel
const uint16_t errInvalidOp
const int DefaultDataServerTTL
const uint16_t errHandShakeFailed
const int DefaultStreamTimeout
const uint16_t suAlreadyDone
const uint16_t errNotSupported
const uint16_t suContinue
const int DefaultTlsNoData
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
struct ServerResponseBifs_Protocol bifReqs
struct ServerResponseReqs_Protocol secReqs
BindPrefSelector(std::vector< std::string > &&bindprefs)
const std::string & Get()
Data structure that carries the handshake information.
std::string streamName
Name of the stream.
uint16_t subStreamId
Sub-stream id.
Message * out
Message to be sent out.
static void UnloadHandler(const std::string &trProt)
void Register(const std::string &protocol)
static void UnloadHandler()
std::set< std::string > protocols
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
Selects the least loaded stream for read operations over multiple streams.
void AdjustQueues(uint16_t size)
StreamSelector(uint16_t size)
void MsgReceived(uint16_t substrm)
uint16_t Select(const std::vector< bool > &connected)
static const uint16_t Name
Transport name, returns const char *.
static const uint16_t Auth
Authentication protocol name, returns std::string *.
Information holder for xrootd channels.
std::vector< XRootDStreamInfo > StreamInfoVector
std::set< uint16_t > sentCloses
std::unique_ptr< StreamSelector > strmSelector
XrdSecParameters * authParams
XrdSecProtocol * authProtocol
XrdSecProtect * protection
std::unique_ptr< BindPrefSelector > bindSelector
std::string authProtocolName
std::atomic< uint32_t > finstcnt
unsigned int protRespSize
ServerResponseBody_Protocol * protRespBody
XRootDChannelInfo(const URL &url)
std::set< uint16_t > sentOpens
std::shared_ptr< SIDManager > sidManager
static const uint16_t ServerFlags
Returns the server flags.
static const uint16_t ProtocolVersion
Returns the protocol version.
static const uint16_t IsEncrypted
Returns true if the channel is encrypted.
Information holder for XRootDStreams.
char * buffer
Pointer to the buffer.
int size
Size of the buffer or length of data in the buffer.