19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
67 assert(
mlist.empty() );
70 assert(
hasfn ==
false );
91 void Lock( uint16_t subStream,
bool &isclosing );
99 void Lock(
const std::function<
void()> &func,
bool &isclosing );
113 std::function<void()>
fn;
119 std::map<pthread_t, std::list<MtxInfo>::iterator>
mthmap;
138 bool &isclosing ):
mtx( &sm )
140 mtx->Lock( idx, isclosing );
141 if( isclosing )
mtx =
nullptr;
145 bool &isclosing ):
mtx( &sm )
147 mtx->Lock( func, isclosing );
148 if( isclosing )
mtx =
nullptr;
211 pTransport = transport;
227 pIncomingQueue = incomingQueue;
243 pChannelData = channelData;
251 pTaskManager = taskManager;
259 pJobManager = jobManager;
279 void Tick( time_t now );
311 std::shared_ptr<Message> msg,
312 uint32_t bytesReceived );
317 std::pair<Message *, MsgHandler *>
325 uint32_t bytesSent );
346 const uint64_t sess );
394 pOnDataConnJob = onConnJob;
416 return pChannel.lock();
424 static bool IsPartial(
Message &msg );
429 inline static bool HasNetAddr(
const XrdNetAddr &addr,
430 std::vector<XrdNetAddr> &addresses )
432 auto itr = addresses.begin();
433 for( ; itr != addresses.end() ; ++itr )
435 if( itr->Same( &addr ) )
return true;
445 void Reinsert( uint16_t subStream );
450 class HandleIncMsgJob:
public Job
453 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
454 virtual ~HandleIncMsgJob() {};
455 virtual void Run(
void* )
461 MsgHandler *pHandler;
467 void OnFatalError( uint16_t subStream,
469 StreamMutexHelper &lock );
474 void MonitorDisconnection( XRootDStatus status );
479 XRootDStatus RequestClose( Message &resp );
484 void SockHandlerClose( uint16_t subStream );
487 typedef std::vector<SubStreamData*> SubStreamList;
494 std::string pStreamName;
495 TransportHandler *pTransport;
497 TaskManager *pTaskManager;
498 JobManager *pJobManager;
500 InQueue *pIncomingQueue;
501 AnyObject *pChannelData;
502 uint32_t pLastStreamError;
503 XRootDStatus pLastFatalError;
504 uint16_t pStreamErrorWindow;
505 uint16_t pConnectionCount;
506 uint16_t pConnectionRetry;
507 time_t pConnectionInitTime;
508 uint16_t pConnectionWindow;
509 SubStreamList pSubStreams;
510 std::vector<XrdNetAddr> pAddresses;
512 ChannelHandlerList pChannelEvHandlers;
518 timeval pConnectionStarted;
519 timeval pConnectionDone;
520 std::atomic<uint64_t> pBytesSent;
521 std::atomic<uint64_t> pBytesReceived;
526 std::shared_ptr<Job> pOnDataConnJob;
536 std::weak_ptr<Channel> pChannel;
#define XRD_WARN_UNUSED_RESULT
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
A synchronize queue for incoming data.
The message representation used throughout the system.
Interface for socket pollers.
StreamMutexHelper(StreamMutex &sm, const std::function< void()> &func, bool &isclosing)
StreamMutexHelper(StreamMutex &sm)
StreamMutexHelper(StreamMutex &sm, uint16_t idx, bool &isclosing)
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
std::list< MtxInfo >::iterator fnlistit
void Lock()
Lock. Regular, non-subStream aware recursive lock.
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
std::map< uint16_t, size_t > mclosing
std::list< MtxInfo > mlist
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void SetTransport(TransportHandler *transport)
Set the transport.
StreamStatus
Status of the stream.
@ Disconnected
Not connected.
@ Connecting
In the process of being connected.
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
void SetChannel(std::weak_ptr< Channel > &channel)
Sets a weak_ptr of our owning Channel.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
void ForceConnect()
Force connection.
void SetTaskManager(TaskManager *taskManager)
Set task manager.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void SetJobManager(JobManager *jobManager)
Set job manager.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
const std::string & GetName() const
Return stream name.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
std::shared_ptr< Channel > GetChannel()
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void SetChannelData(AnyObject *channelData)
Set the channel data.
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Perform the handshake and the authentication for each physical stream.
Procedure execution status.
std::function< void()> fn
MtxInfo(const std::function< void()> &func)