xrootd
XrdClAsyncMsgWriter.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
20 #define SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_
21 
22 #include "XrdCl/XrdClMessage.hh"
25 #include "XrdCl/XrdClSocket.hh"
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClStream.hh"
28 #include "XrdSys/XrdSysE2T.hh"
29 
30 #include <memory>
31 
32 namespace XrdCl
33 {
34  //----------------------------------------------------------------------------
36  //----------------------------------------------------------------------------
38  {
39  public:
40  //------------------------------------------------------------------------
48  //------------------------------------------------------------------------
50  Socket &socket,
51  const std::string &strmname,
52  Stream &strm,
53  uint16_t substrmnb,
56  socket( socket ),
57  strmname( strmname ),
58  strm( strm ),
60  chdata( chdata ),
61  outmsg( nullptr ),
62  outmsgsize( 0 ),
63  outhandler( nullptr )
64  {
65  }
66 
67  //------------------------------------------------------------------------
69  //------------------------------------------------------------------------
70  inline void Reset()
71  {
73  outmsg = nullptr;
74  outmsgsize = 0;;
75  outhandler = nullptr;
76  outsign.reset();
77  }
78 
79  //------------------------------------------------------------------------
81  //------------------------------------------------------------------------
83  {
84  Log *log = DefaultEnv::GetLog();
85  while( true )
86  {
87  switch( writestage )
88  {
89  //------------------------------------------------------------------
90  // Pick up a message if we're not in process of writing something
91  //------------------------------------------------------------------
92  case WriteStart:
93  {
94  std::pair<Message *, MsgHandler *> toBeSent;
95  toBeSent = strm.OnReadyToWrite( substrmnb );
96  outmsg = toBeSent.first;
97  outhandler = toBeSent.second;
98  if( !outmsg ) return XRootDStatus( stOK, suAlreadyDone );
99 
100  outmsg->SetCursor( 0 );
102 
103  //----------------------------------------------------------------
104  // Secure the message if necessary
105  //----------------------------------------------------------------
106  Message *signature = nullptr;
107  XRootDStatus st = xrdTransport.GetSignature( outmsg, signature, chdata );
108  if( !st.IsOK() ) return st;
109  outsign.reset( signature );
110 
111  if( outsign )
112  outmsgsize += outsign->GetSize();
113 
114  //----------------------------------------------------------------
115  // The next step is to write the signature
116  //----------------------------------------------------------------
118  continue;
119  }
120  //------------------------------------------------------------------
121  // First write the signature (if there is one)
122  //------------------------------------------------------------------
123  case WriteSign:
124  {
125  //----------------------------------------------------------------
126  // If there is a signature for the request send it over the socket
127  //----------------------------------------------------------------
128  if( outsign )
129  {
131  if( !st.IsOK() || st.code == suRetry ) return st;
132  }
133  //----------------------------------------------------------------
134  // The next step is to write the signature
135  //----------------------------------------------------------------
137  continue;
138  }
139  //------------------------------------------------------------------
140  // Then write the request itself
141  //------------------------------------------------------------------
142  case WriteRequest:
143  {
145  if( !st.IsOK() || st.code == suRetry ) return st;
146  //----------------------------------------------------------------
147  // The next step is to write the signature
148  //----------------------------------------------------------------
150  continue;
151  }
152  //------------------------------------------------------------------
153  // And then write the raw data (if any)
154  //------------------------------------------------------------------
155  case WriteRawData:
156  {
157  if( outhandler->IsRaw() )
158  {
159  uint32_t wrtcnt = 0;
161  if( !st.IsOK() || st.code == suRetry ) return st;
162  outmsgsize += wrtcnt;
163  log->Dump( AsyncSockMsg, "[%s] Wrote %d bytes of raw data of message"
164  "(0x%x) body.", strmname.c_str(), wrtcnt, outmsg );
165  }
166  //----------------------------------------------------------------
167  // The next step is to finalize the write operation
168  //----------------------------------------------------------------
170  continue;
171  }
172  //------------------------------------------------------------------
173  // Finally, finalize the write operation
174  //------------------------------------------------------------------
175  case WriteDone:
176  {
177  XRootDStatus st = socket.Flash();
178  if( !st.IsOK() )
179  {
180  log->Error( AsyncSockMsg, "[%s] Unable to flash the socket: %s",
181  strmname.c_str(), XrdSysE2T( st.errNo ) );
182  return st;
183  }
184 
185  log->Dump( AsyncSockMsg, "[%s] Successfully sent message: %s (0x%x).",
186  strmname.c_str(), outmsg->GetDescription().c_str(), outmsg );
187 
189  return XRootDStatus();
190  }
191  }
192  // just in case ...
193  break;
194  }
195  //----------------------------------------------------------------------
196  // We are done
197  //----------------------------------------------------------------------
198  return XRootDStatus();
199  }
200 
201  private:
202 
203  //------------------------------------------------------------------------
205  //------------------------------------------------------------------------
206  enum Stage
207  {
208  WriteStart, //< the next step is to initialize the read
209  WriteSign, //< the next step is to write the signature
210  WriteRequest, //< the next step is to write the request
211  WriteRawData, //< the next step is to write the raw data
212  WriteDone //< the next step is to finalize the write
213  };
214 
215  //------------------------------------------------------------------------
216  // Current read stage
217  //------------------------------------------------------------------------
219 
220  //------------------------------------------------------------------------
221  // The context of the read operation
222  //------------------------------------------------------------------------
225  const std::string &strmname;
227  uint16_t substrmnb;
229 
230  //------------------------------------------------------------------------
231  // The internal state of the the reader
232  //------------------------------------------------------------------------
233  Message *outmsg; //< we don't own the message
234  uint32_t outmsgsize;
236  std::unique_ptr<Message> outsign;
237  };
238 
239 }
240 
241 #endif /* SRC_XRDCL_XRDCLASYNCMSGWRITER_HH_ */
Definition: XrdClAnyObject.hh:32
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
void SetCursor(uint32_t cursor)
Set the cursor.
Definition: XrdClBuffer.hh:148
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
XRootDStatus Write()
Write the request into the socket.
Definition: XrdClAsyncMsgWriter.hh:82
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
const uint64_t AsyncSockMsg
Definition: XrdClConstants.hh:41
Socket & socket
Definition: XrdClAsyncMsgWriter.hh:224
const std::string & GetDescription() const
Get the description of the message.
Definition: XrdClMessage.hh:95
Message handler.
Definition: XrdClPostMasterInterfaces.hh:50
uint32_t outmsgsize
Definition: XrdClAsyncMsgWriter.hh:234
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:146
bool IsOK() const
We&#39;re fine.
Definition: XrdClStatus.hh:123
virtual XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
Definition: XrdClPostMasterInterfaces.hh:196
Definition: XrdClAsyncMsgWriter.hh:211
Stream & strm
Definition: XrdClAsyncMsgWriter.hh:226
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
XRootDStatus Flash()
static Log * GetLog()
Get default log.
Stage
Stages of reading out a response from the socket.
Definition: XrdClAsyncMsgWriter.hh:206
Definition: XrdClAsyncMsgWriter.hh:210
virtual bool IsRaw() const
Definition: XrdClPostMasterInterfaces.hh:184
virtual Status GetSignature(Message *toSign, Message *&sign, AnyObject &channelData)=0
Get signature for given message.
const uint16_t suAlreadyDone
Definition: XrdClStatus.hh:42
Perform the handshake and the authentication for each physical stream.
Definition: XrdClPostMasterInterfaces.hh:289
const std::string & strmname
Definition: XrdClAsyncMsgWriter.hh:225
Request status.
Definition: XrdClXRootDResponses.hh:218
Definition: XrdClAnyObject.hh:25
AnyObject & chdata
Definition: XrdClAsyncMsgWriter.hh:228
std::unique_ptr< Message > outsign
Definition: XrdClAsyncMsgWriter.hh:236
Definition: XrdClAsyncMsgWriter.hh:212
TransportHandler & xrdTransport
Definition: XrdClAsyncMsgWriter.hh:223
const char * XrdSysE2T(int errcode)
void Error(uint64_t topic, const char *format,...)
Report an error.
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:147
Utility class encapsulating writing request logic.
Definition: XrdClAsyncMsgWriter.hh:37
uint16_t substrmnb
Definition: XrdClAsyncMsgWriter.hh:227
MsgHandler * outhandler
Definition: XrdClAsyncMsgWriter.hh:235
const uint16_t suRetry
Definition: XrdClStatus.hh:40
Stream.
Definition: XrdClStream.hh:49
Message * outmsg
Definition: XrdClAsyncMsgWriter.hh:233
Stage writestage
Definition: XrdClAsyncMsgWriter.hh:218
void Reset()
Reset the state of the object (makes it ready to read out next msg)
Definition: XrdClAsyncMsgWriter.hh:70
Definition: XrdClAsyncMsgWriter.hh:209
AsyncMsgWriter(TransportHandler &xrdTransport, Socket &socket, const std::string &strmname, Stream &strm, uint16_t substrmnb, AnyObject &chdata)
Definition: XrdClAsyncMsgWriter.hh:49
A network socket.
Definition: XrdClSocket.hh:42
Definition: XrdClAsyncMsgWriter.hh:208
Handle diagnostics.
Definition: XrdClLog.hh:100
uint32_t GetSize() const
Get the size of the message.
Definition: XrdClBuffer.hh:132
XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)