xorp

asyncio.hh

00001 // -*- c-basic-offset: 4; tab-width: 8; indent-tabs-mode: t -*-
00002 // vim:set sts=4 ts=8:
00003 
00004 // Copyright (c) 2001-2011 XORP, Inc and Others
00005 //
00006 // This program is free software; you can redistribute it and/or modify
00007 // it under the terms of the GNU Lesser General Public License, Version
00008 // 2.1, June 1999 as published by the Free Software Foundation.
00009 // Redistribution and/or modification of this program under the terms of
00010 // any other version of the GNU Lesser General Public License is not
00011 // permitted.
00012 // 
00013 // This program is distributed in the hope that it will be useful, but
00014 // WITHOUT ANY WARRANTY; without even the implied warranty of
00015 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. For more details,
00016 // see the GNU Lesser General Public License, Version 2.1, a copy of
00017 // which can be found in the XORP LICENSE.lgpl file.
00018 // 
00019 // XORP, Inc, 2953 Bunker Hill Lane, Suite 204, Santa Clara, CA 95054, USA;
00020 // http://xorp.net
00021 
00022 #ifndef __LIBXORP_ASYNCIO_HH__
00023 #define __LIBXORP_ASYNCIO_HH__
00024 
00025 #include "libxorp/xorp.h"
00026 #ifdef HAVE_FCNTL_H
00027 #include <fcntl.h>
00028 #endif
00029 #include "libxorp/xorpfd.hh"
00030 #include "libxorp/callback.hh"
00031 #include "libxorp/eventloop.hh"
00032 #include "libxorp/ipvx.hh"
00033 
00034 struct iovec;
00035 
00036 // Asynchronous file transfer classes.  These utilize XORP EventLoop
00037 // and the IoEvent framework to read / write files asynchronously.  The
00038 // user creates and AsyncFile{Reader,Writer} and adds a buffer for
00039 // reading / writing with add_buffer().  A callback is provided with
00040 // each buffer is called every time I/O happens on the buffer.
00041 //
00042 // Note that in case of AsyncFileWriter the user can use add_data() to
00043 // add the data to write/send, and that data will be stored/buffered
00044 // internally by AsyncFileWriter itself.
00045 //
00046 // Reading/Writing only begins when start() is called, and normally
00047 // continues until there are no buffers left.
00048 
00049 // ----------------------------------------------------------------------------
00050 // AsyncFileOperator - Abstract base class for asynchronous file operators.
00051 
00059 class AsyncFileOperator {
00060 public:
00061     enum Event {
00062     DATA = 1,       // I/O occured
00063     FLUSHING = 2,       // Buffer is being flushed
00064     OS_ERROR = 4,       // I/O Error has occurred, check error()
00065     END_OF_FILE = 8,    // End of file reached (applies to read only)
00066     WOULDBLOCK = 16     // I/O would block the current thread
00067     };
00068 
00083     typedef XorpCallback4<void, Event, const uint8_t*, size_t, size_t>::RefPtr Callback;
00084 public:
00088     virtual size_t  buffers_remaining() const = 0;
00089 
00093     virtual void    flush_buffers() = 0;
00094 
00100     virtual bool    start() = 0;
00101 
00105     virtual void    stop()  = 0;
00106 
00112     bool        resume()        { return start(); }
00113 
00117     bool        running() const     { return _running; }
00118 
00122     XorpFd      fd() const      { return _fd; }
00123 
00127     int         error() const       { return _last_error; }
00128 
00129     virtual string toString() const;
00130 
00131 protected:
00132     AsyncFileOperator(EventLoop& e, XorpFd fd, 
00133               int priority = XorpTask::PRIORITY_DEFAULT)
00134     : _eventloop(e), _fd(fd), _running(false),
00135       _last_error(0), _priority(priority)
00136     {
00137 #ifndef HOST_OS_WINDOWS
00138     int fl = fcntl(fd, F_GETFL);
00139     assert(fl & O_NONBLOCK);
00140 #endif
00141     }
00142     virtual ~AsyncFileOperator();
00143 
00144     EventLoop&      _eventloop;
00145     XorpFd      _fd;
00146     bool        _running;
00147     int         _last_error;
00148     int         _priority;
00149 };
00150 
00154 class AsyncFileReader : public AsyncFileOperator {
00155 public:
00160     AsyncFileReader(EventLoop& e, XorpFd fd,
00161             int priority = XorpTask::PRIORITY_DEFAULT);
00162     virtual ~AsyncFileReader();
00163 
00173     void add_buffer(uint8_t* buffer, size_t buffer_bytes, const Callback& cb);
00174 
00185     void add_buffer_with_offset(uint8_t* buffer, size_t buffer_bytes,
00186                 size_t offset, const Callback& cb);
00187 
00193     bool start();
00194 
00198     void stop();
00199 
00203     size_t buffers_remaining() const { return _buffers.size(); }
00204 
00208     void flush_buffers();
00209 
00210     virtual string toString() const;
00211 
00212 protected:
00213     class BufferInfo :
00214         public NONCOPYABLE
00215     {
00216     public:
00217     BufferInfo(uint8_t* b, size_t bb, Callback cb)
00218         : _buffer(b), _buffer_bytes(bb), _offset(0), _cb(cb) {}
00219     BufferInfo(uint8_t* b, size_t bb, size_t off, Callback cb)
00220         : _buffer(b), _buffer_bytes(bb), _offset(off), _cb(cb) {}
00221 
00222     void dispatch_callback(AsyncFileOperator::Event e) {
00223         _cb->dispatch(e, _buffer, _buffer_bytes, _offset);
00224     }
00225 
00226     uint8_t* buffer() { return (_buffer); }
00227     size_t buffer_bytes() const { return (_buffer_bytes); }
00228     size_t offset() const { return (_offset); }
00229     void incr_offset(size_t done) { _offset += done; }
00230 
00231     private:
00232     BufferInfo();               // Not directly constructible
00233 
00234     uint8_t*    _buffer;
00235     size_t      _buffer_bytes;
00236     size_t      _offset;
00237     Callback    _cb;
00238     };
00239 
00240     void read(XorpFd fd, IoEventType type);
00241     void complete_transfer(int err, ssize_t done);
00242 
00243     list<BufferInfo *> _buffers;
00244 
00245 #ifdef HOST_OS_WINDOWS
00246     void disconnect(XorpFd fd, IoEventType type);
00247 
00248     XorpTask        _deferred_io_task;
00249     bool        _disconnect_added;
00250 #endif
00251 };
00252 
00253 
00257 class AsyncFileWriter :
00258     public NONCOPYABLE,
00259     public AsyncFileOperator
00260 {
00261 public:
00268     AsyncFileWriter(EventLoop& e, XorpFd fd, uint32_t coalesce = 1,
00269             int priority = XorpTask::PRIORITY_DEFAULT);
00270 
00271     virtual ~AsyncFileWriter();
00272 
00282     void add_buffer(const uint8_t*  buffer,
00283             size_t      buffer_bytes,
00284             const Callback& cb);
00285 
00298     void add_buffer_sendto(const uint8_t*   buffer,
00299                size_t       buffer_bytes,
00300                const IPvX&      dst_addr,
00301                uint16_t     dst_port,
00302                const Callback&  cb);
00303 
00312     void add_buffer_with_offset(const uint8_t*  buffer,
00313                 size_t      buffer_bytes,
00314                 size_t      offset,
00315                 const Callback& cb);
00316 
00326     void add_data(const vector<uint8_t>&    data,
00327           const Callback&       cb);
00328 
00342     void add_data_sendto(const vector<uint8_t>& data,
00343              const IPvX&        dst_addr,
00344              uint16_t       dst_port,
00345              const Callback&    cb);
00346 
00352     bool start();
00353 
00357     void stop();
00358 
00362     size_t buffers_remaining() const { return _buffers.size(); }
00363 
00367     void flush_buffers();
00368 
00369     virtual string toString() const;
00370 
00371 private:
00372     AsyncFileWriter();          // Not directly constructible
00373 
00374 protected:
00375     class BufferInfo :
00376     public NONCOPYABLE
00377     {
00378     public:
00379     BufferInfo(const uint8_t* b, size_t bb, const Callback& cb)
00380         : _buffer(b), _buffer_bytes(bb), _offset(0), _dst_port(0),
00381           _cb(cb), _is_sendto(false) {}
00382     BufferInfo(const uint8_t* b, size_t bb, const IPvX& dst_addr,
00383            uint16_t dst_port, const Callback& cb)
00384         : _buffer(b), _buffer_bytes(bb), _offset(0), _dst_addr(dst_addr),
00385           _dst_port(dst_port), _cb(cb), _is_sendto(true) {}
00386     BufferInfo(const uint8_t* b, size_t bb, size_t off, const Callback& cb)
00387         : _buffer(b), _buffer_bytes(bb), _offset(off), _dst_port(0),
00388           _cb(cb), _is_sendto(false) {}
00389 
00390     BufferInfo(const vector<uint8_t>& data, const Callback& cb)
00391         : _data(data), _buffer(&_data[0]), _buffer_bytes(_data.size()),
00392           _offset(0), _dst_port(0), _cb(cb), _is_sendto(false) {}
00393     BufferInfo(const vector<uint8_t>& data, const IPvX& dst_addr,
00394            uint16_t dst_port, const Callback& cb)
00395         : _data(data), _buffer(&_data[0]), _buffer_bytes(_data.size()),
00396           _offset(0), _dst_addr(dst_addr), _dst_port(dst_port),
00397           _cb(cb), _is_sendto(true) {}
00398 
00399     void dispatch_callback(AsyncFileOperator::Event e) {
00400         _cb->dispatch(e, _buffer, _buffer_bytes, _offset);
00401     }
00402 
00403     const uint8_t* buffer() const { return (_buffer); }
00404     size_t buffer_bytes() const { return (_buffer_bytes); }
00405     size_t offset() const { return (_offset); }
00406     void incr_offset(size_t done) { _offset += done; }
00407     const IPvX& dst_addr() const { return (_dst_addr); }
00408     uint16_t dst_port() const { return (_dst_port); }
00409     bool is_sendto() const { return (_is_sendto); }
00410 
00411     private:
00412     BufferInfo();           // Not directly constructible
00413 
00414     const vector<uint8_t>   _data;      // Local copy of the data
00415     const uint8_t*      _buffer;
00416     size_t          _buffer_bytes;
00417     size_t          _offset;
00418     const IPvX      _dst_addr;
00419     const uint16_t      _dst_port;
00420     Callback        _cb;
00421     bool            _is_sendto;
00422     };
00423 
00424     void write(XorpFd, IoEventType);
00425     void complete_transfer(ssize_t done);
00426 
00427     uint32_t        _coalesce;
00428     struct iovec*   _iov;
00429     ref_ptr<int>    _dtoken;
00430     list<BufferInfo *>  _buffers;
00431 
00432 #ifdef HOST_OS_WINDOWS
00433     void disconnect(XorpFd fd, IoEventType type);
00434 
00435     XorpTask        _deferred_io_task;
00436 #endif
00437 };
00438 
00439 #endif // __LIBXORP_ASYNCIO_HH__
 All Classes Namespaces Functions Variables Typedefs Enumerations