XRootD
XrdClAsyncPageReader.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Michal Simon <michal.simon@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
20 #define SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_
21 
23 #include "XrdCl/XrdClSocket.hh"
25 #include "XrdSys/XrdSysPageSize.hh"
26 
27 #include <sys/uio.h>
28 #include <memory>
29 #include <arpa/inet.h>
30 
31 namespace XrdCl
32 {
33 
34 //------------------------------------------------------------------------------
36 //------------------------------------------------------------------------------
38 {
39  public:
40 
41  //--------------------------------------------------------------------------
46  //--------------------------------------------------------------------------
48  std::vector<uint32_t> &digests ) :
49  chunks( chunks ),
50  digests( digests ),
51  dlen( 0 ),
52  rspoff( 0 ),
53  chindex( 0 ),
54  choff( 0 ),
55  dgindex( 0 ),
56  dgoff( 0 ),
57  iovcnt( 0 ),
58  iovindex( 0 )
59  {
60  uint64_t rdoff = chunks.front().offset;
61  uint32_t rdlen = 0;
62  for( auto &ch : chunks )
63  rdlen += ch.length;
64  int fpglen, lpglen;
65  int pgcnt = XrdOucPgrwUtils::csNum( rdoff, rdlen, fpglen, lpglen);
66  digests.resize( pgcnt );
67  }
68 
69  //--------------------------------------------------------------------------
71  //--------------------------------------------------------------------------
72  virtual ~AsyncPageReader()
73  {
74  }
75 
76  //--------------------------------------------------------------------------
78  //--------------------------------------------------------------------------
79  void SetRsp( ServerResponseV2 *rsp )
80  {
81  dlen = rsp->status.bdy.dlen;
82  rspoff = rsp->info.pgread.offset;
83 
84  uint64_t bufoff = rspoff - chunks[0].offset;
85  chindex = 0;
86 
87  for( chindex = 0; chindex < chunks.size(); ++chindex )
88  {
89  if( chunks[chindex].length < bufoff )
90  {
91  bufoff -= chunks[chindex].length;
92  continue;
93  }
94  break;
95  }
96  choff = bufoff;
97  dgindex = rspoff/XrdSys::PageSize - chunks[0].offset/XrdSys::PageSize;
98  }
99 
100  //--------------------------------------------------------------------------
105  //--------------------------------------------------------------------------
106  XRootDStatus Read( Socket &socket, uint32_t &btsread )
107  {
108  if( dlen == 0 || chindex >= chunks.size() )
109  return XRootDStatus();
110  btsread = 0;
111  int nbbts = 0;
112  do
113  {
114  // Prepare the IO vector for receiving the data
115  if( iov.empty() )
116  InitIOV();
117  // read the data into the buffer
118  nbbts = 0;
119  auto st = socket.ReadV( iov.data() + iovindex, iovcnt, nbbts );
120  if( !st.IsOK() )
121  return st;
122  btsread += nbbts;
123  dlen -= nbbts;
124  ShiftIOV( nbbts );
125  if( st.code == suRetry )
126  return st;
127  }
128  while( nbbts > 0 && dlen > 0 && chindex < chunks.size() );
129 
130  return XRootDStatus();
131  }
132 
133  private:
134 
135  //--------------------------------------------------------------------------
137  //--------------------------------------------------------------------------
138  inline static int max_iovcnt()
139  {
140  // make sure it is an even number
141  static const int iovmax = XrdSys::getIovMax() & ~1;
142  return iovmax;
143  }
144 
145  //--------------------------------------------------------------------------
147  //--------------------------------------------------------------------------
148  inline void addiov( char *&buf, size_t len )
149  {
150  iov.emplace_back();
151  iov.back().iov_base = buf;
152  iov.back().iov_len = len;
153  buf += len;
154  ++iovcnt;
155  }
156 
157  //--------------------------------------------------------------------------
159  //--------------------------------------------------------------------------
160  inline void addiov( char *&buf, uint32_t len, uint32_t &dleft )
161  {
162  if( len > dleft ) len = dleft;
163  addiov( buf, len );
164  dleft -= len;
165  }
166 
167  //--------------------------------------------------------------------------
170  //--------------------------------------------------------------------------
171  inline static uint32_t CalcIOVSize( uint32_t dleft )
172  {
173  uint32_t ret = ( dleft / PageWithDigest + 2 ) * 2;
174  return ( ret > uint32_t( max_iovcnt() ) ? max_iovcnt() : ret );
175  }
176 
177  //--------------------------------------------------------------------------
179  //--------------------------------------------------------------------------
180  uint32_t CalcRdSize()
181  {
182  // data size in the server response (including digests)
183  uint32_t dleft = dlen;
184  // space in our page buffer
185  uint32_t pgspace = chunks[chindex].length - choff;
186  // space in our digest buffer
187  uint32_t dgspace = sizeof( uint32_t ) * (digests.size() - dgindex ) - dgoff;
188  if( dleft > pgspace + dgspace )
189  dleft = pgspace + dgspace;
190  return dleft;
191  }
192 
193  //--------------------------------------------------------------------------
195  //--------------------------------------------------------------------------
196  void InitIOV()
197  {
198  iovindex = 0;
199  // figure out the number of data we can read in one go
200  uint32_t dleft = CalcRdSize();
201  // and reset the I/O vector
202  iov.clear();
203  iovcnt = 0;
204  iov.reserve( CalcIOVSize( dleft ) );
205  // now prepare the page and digest buffers
206  ChunkInfo ch = chunks[chindex];
207  char* pgbuf = static_cast<char*>( ch.buffer ) + choff;
208  uint64_t rdoff = ch.offset + choff;
209  char* dgbuf = reinterpret_cast<char*>( digests.data() + dgindex ) + dgoff;
210  // handle the first digest
211  uint32_t fdglen = sizeof( uint32_t ) - dgoff;
212  addiov( dgbuf, fdglen, dleft );
213  if( dleft == 0 || iovcnt >= max_iovcnt() )
214  return;
215  // handle the first page
216  uint32_t fpglen = XrdSys::PageSize - rdoff % XrdSys::PageSize;
217  addiov( pgbuf, fpglen, dleft );
218  if( dleft == 0 || iovcnt >= max_iovcnt() )
219  return;
220  // handle all the subsequent aligned pages
221  size_t fullpgs = dleft / PageWithDigest;
222  for( size_t i = 0; i < fullpgs; ++i )
223  {
224  addiov( dgbuf, sizeof( uint32_t ), dleft );
225  if( dleft == 0 || iovcnt >= max_iovcnt() )
226  return;
227  addiov( pgbuf, XrdSys::PageSize, dleft );
228  if( dleft == 0 || iovcnt >= max_iovcnt() )
229  return;
230  }
231  // handle the last digest
232  uint32_t ldglen = sizeof( uint32_t );
233  addiov( dgbuf, ldglen, dleft );
234  if( dleft == 0 || iovcnt >= max_iovcnt() )
235  return;
236  // handle the last page
237  addiov( pgbuf, dleft, dleft );
238  }
239 
240  //--------------------------------------------------------------------------
242  //--------------------------------------------------------------------------
243  inline void shift( void *&buffer, size_t nbbts )
244  {
245  char *buf = static_cast<char*>( buffer );
246  buf += nbbts;
247  buffer = buf;
248  }
249 
250  //--------------------------------------------------------------------------
254  //--------------------------------------------------------------------------
255  inline void shiftdgbuf( uint32_t &btsread )
256  {
257  if( iov[iovindex].iov_len > btsread )
258  {
259  iov[iovindex].iov_len -= btsread;
260  shift( iov[iovindex].iov_base, btsread );
261  dgoff += btsread;
262  btsread = 0;
263  return;
264  }
265 
266  btsread -= iov[iovindex].iov_len;
267  iov[iovindex].iov_len = 0;
268  dgoff = 0;
269  digests[dgindex] = ntohl( digests[dgindex] );
270  ++dgindex;
271  ++iovindex;
272  --iovcnt;
273  }
274 
275  //--------------------------------------------------------------------------
279  //--------------------------------------------------------------------------
280  inline void shiftpgbuf( uint32_t &btsread )
281  {
282  if( iov[iovindex].iov_len > btsread )
283  {
284  iov[iovindex].iov_len -= btsread;
285  shift( iov[iovindex].iov_base, btsread );
286  choff += btsread;
287  btsread = 0;
288  return;
289  }
290 
291  btsread -= iov[iovindex].iov_len;
292  choff += iov[iovindex].iov_len;
293  iov[iovindex].iov_len = 0;
294  ++iovindex;
295  --iovcnt;
296  }
297 
298  //--------------------------------------------------------------------------
300  //--------------------------------------------------------------------------
301  void ShiftIOV( uint32_t btsread )
302  {
303  // if iovindex is even it point to digest, otherwise it points to a page
304  if( iovindex % 2 == 0 )
305  shiftdgbuf( btsread );
306  // adjust as many I/O buffers as necessary
307  while( btsread > 0 )
308  {
309  // handle page
310  shiftpgbuf( btsread );
311  if( btsread == 0 ) break;
312  // handle digest
313  shiftdgbuf( btsread );
314  }
315  // if we filled the buffer, move to the next one
316  if( iovcnt == 0 )
317  iov.clear();
318  // do we need to move to the next chunk?
319  if( choff >= chunks[chindex].length )
320  {
321  ++chindex;
322  choff = 0;
323  }
324  }
325 
326  ChunkList &chunks; //< list of data chunks to be filled with user data
327  std::vector<uint32_t> &digests; //< list of crc32c digests for every 4KB page of data
328  uint32_t dlen; //< size of the data in the message
329  uint64_t rspoff; //< response offset
330 
331  size_t chindex; //< index of the current data buffer
332  size_t choff; //< offset within the current buffer
333  size_t dgindex; //< index of the current digest buffer
334  size_t dgoff; //< offset within the current digest buffer
335 
336  std::vector<iovec> iov; //< I/O vector
337  int iovcnt; //< size of the I/O vector
338  size_t iovindex; //< index of the first valid element in the I/O vector
339 
340  static const int PageWithDigest = XrdSys::PageSize + sizeof( uint32_t );
341 };
342 
343 } /* namespace XrdEc */
344 
345 #endif /* SRC_XRDCL_XRDCLASYNCPAGEREADER_HH_ */
ServerResponseStatus status
Definition: XProtocol.hh:1310
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
union ServerResponseV2::@1 info
Object for reading out data from the PgRead response.
void SetRsp(ServerResponseV2 *rsp)
Sets message data size.
virtual ~AsyncPageReader()
Destructor.
AsyncPageReader(ChunkList &chunks, std::vector< uint32_t > &digests)
XRootDStatus Read(Socket &socket, uint32_t &btsread)
A network socket.
Definition: XrdClSocket.hh:43
XRootDStatus ReadV(iovec *iov, int iocnt, int &bytesRead)
Definition: XrdClSocket.cc:761
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
Definition: XrdClStatus.hh:40
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
int getIovMax()
char * data
Definition: XrdOucIOVec.hh:45