原文
讨论如何在asio上扩展自定义的异步io操作.包括asio中经典的Context/Service插件机制,及经常打交道的几个Service,还有使用asio分发器的一些注意等.
Context/Service机制使用asio开发时,会创建两类对象:一类是exectioncontext,另一类是IOobject.比如以asio的tcpsocket为例:
asio::io_context ioc{};
asio::ip::tcp::socket socket{ ioc };
asio::io_context是执行环境的实现类,而asio::ip::tcp::socket则是IOobject.以上短短两行代码,用户就已通过IOobject对象,将tcp/ip的socket功能加载入io_context中.
也就是将socketservice,注册到了io_context中.注册的扳机是asio::detail::io_object_impl模板类.
io_object_implasio::ip::tcp::socket类型,抛开中间的各种实例化和继承的关系,它是asio::detail::io_object_impl实例化类型的外观.如下面代码所示(代码经过简化处理,方便展示概念,与asio有出入)
// 在`asio`名字空间
class socket
{
private:detail::io_object_impl,io_context::executor> impl_;public:socket(const io_context::executor& ex): impl_(0, ex){}
};
可见socket外观类,使用了reactive_socket_service(这里以linux平台为例),和io_context导出的executor,来实例化io_object_impl;而socket构造器,把关联io_context的executor传递给impl_成员.再来看看io_object_impl对应的构造器:
// 在`asio::detail`名字空间中
template
class io_object_impl
{
public:
//用来提供`I/O`操作服务的类型.typedef IoObjectService service_type;// `I/O`对象的底层实现类型.typedef typename service_type::实现类型 实现类型;// 与对象关联的执行器的类型.typedef Executor executor_type;// 用执行器构造`I/O`对象.explicit io_object_impl(int, const executor_type& ex): service_(&asio::use_service(io_object_impl::get_context(ex))),executor_(ex){service_->construct(implementation_);}private:// 与`I/O`对象关联的服务.service_type* service_;// `I/O`对象的底层实现.实现类型 implementation_;// 关联执行器executor_type executor_;
};
构造器非常明确的展示了io_object_impl通过Executor取执行环境对象,并通过use_service函数向执行环境中注册Service.
暂时跳过,实现类型相关内容.这里着重关注use_service函数,它是asio向其异步IO事件框架中,加载具体IO功能(这里是reactor_socket_service)的关键入口.
Context/Service的对象模型Context/Service大家也许很陌生,但换一个说法也许大家就很熟悉了.如COM中使用的Component/Interface组件对象模型,还有游戏引擎中常用的Object/Component模型.如下所示:
执行环境<-注册服务
-----先--------|
|0服务->1服务->2服务
use_service函数就是向执行环境中管理Service集合的ServiceRegistry类中,申请指定的Service对象.而service_registry则保证每一个Service只有一个,没有时则创建一个新的为原则,最终返回有效的Service引用.
当然,以上操作都是线安的.注意,ServiceRegistry使用了一个id来区分不同的Service,而id的生成则是通过CRTP配合std::type_info来生成的,如下代码所示:
//特殊的继承服务`ID`类型,使类仅头文件.
template
class service_id: public execution_context::id
{
};//使类仅头文件的特殊服务基类.
template
class execution_context_service_base: public execution_context::service
{
public:static service_id id;// 构造器.execution_context_service_base(execution_context& e): execution_context::service(e){}
};// 实现你私有服务
template
class rdma_core_service :public execution_context_service_base>
{// ....
};
注意CRTP中的传递的实现类千万不要写错了,这里是rdma_core_service.否则底层计算的ID会出错.如果ID是某一个已有的Service的ID,会直接返回那个Service给用户,这会非常危险.
虽然CRTP可在正确时,用户保证运行时保证类型安全,但是错误使用并不会有编译期报错,并把灾难抛给运行时.
2.如何扩展有效的Service到现在为止,应该知道扩展asio的关键是实现自己的Service,并实现自己的io_object_impl外观类.Service负责实现具体的同步或异步IO的操作,而外观类则提供封装好的接口,让用户方便地,正确地使用Service提供的能力.下面,展开讨论一下:
2.1导出implement_type回顾一下第一节贴出的io_object_impl代码段,其中有一个implementation_的成员.它的类型是io_object_impl通过Service中导出的实现类型而来.
它是IOObject真正的数据实现,并由各个Service来定义.好处是,可让每个平台来处理自己的实现细节.
比如Linux平台的reactor_socket_service使用的socket类型是fd,而窗口平台的iocp_socket_service则使用的是SOCKET,使用实现类型,可让io_object_impl不关心平台相关数据类型差异和实现细节.
如前面讨论的在asio上封装rdma-core功能的Service,在此
template
class rdma_core_service :public execution_context_service_base>
{
public:
//`cmID`的实现类型.struct 实现类型{// 本地`cm`表示.rdma_cm_id_data* cm_id_data_;// 内存区域状态rdma_mr_state mr_state_;//如果有附加到该`IO`对象的队列对bool has_qp_;};
};
2.2实现五规则io_object_impl会实现自己的五规则.如第一节io_object_impl中的构造器,转发给了Service的construct函数.此函数照例,需要实现如下:
template
class rdma_core_service :public execution_context_service_base>
{
public:// 构造新的实现inline void construct(实现类型& impl);// 破坏实现inline void destroy(实现类型& impl);// 移动构造器inline void move_construct(实现类型& impl, 实现类型& other_impl);// 移动赋值inline void move_assign(实现类型& impl,rdma_core_service& other_service, 实现类型& other_impl);
};
io_object_impl是Noncopyable,所以只需要实现移动构造器和移动赋值符.至于转换构造器,则根据用户自己的需求来决定.
2.3覆盖一些必要的函数Service中还有几个需要覆盖的函数,如下列代码所示:
template
class rdma_core_service :public execution_context_service_base>
{
public:/// 析构器ASIO_DECL virtual ~rdma_core_service()//析构服务拥有的所有用户定义的处理器对象ASIO_DECL virtual void 关机();//处理与`fork`相关的事件通知,以执行必要细节处理./***该函数不是纯虚的,所以服务只需要*必要时实现它.默认的实现闲着.*/ASIO_DECL virtual void notify_fork(execution_context::fork_event event);
};
虚析构器就不多说了,因为ServiceRegistry持有的是基类指针,清理Service时析构器得是虚的.关机则是给有状态的Service在整个asio发起关机时清理和校验自己扩展点.notify_fork只适合Linux平台,用来响应Fork系统调用的事件.
2.4实现IOObject外观类如第一节代码所示,用户并不直接调用Service的接口而是通过IOObject,来适用asio的功能.因此,还要实现对应的IOObject外观类.
为了通用,Service与IOObject外观是一对多的关系,如在linux平台上Socket和SocketAcceptor都是对应于reactive_socket_service.反之,因为IOObject会使用确定Service导出的实现类型,IOobject只能对应一个确定的Service.如一个rdma_connetion的实现:
//`RDMA`的`IO`对象,提供`RDMA`功能
template
class rdma_connection
{
public:// ....
#if ASIO_HAS_IOCPusing service_type = detail::rdma_iocp_connector_service;
#elseusing service_type = detail::rdma_core_service;
#endifprivate:detail::io_object_impl impl_;public: // 实现接口void open(port_space_type const& port_space);bool is_open() const{return impl_.get_service().is_open(impl_.get_implementation());}// .....
};
通过外观类调用Service的接口,可参考is_open成员方法的实现.这里为了方便,简单只用了一个同步的,实现简单的接口来展示.
至此,就可在asio上扩展"插件"了.可发现并没有侵入式的修改asio的代码,这都得益于Context/Service对象模型强大的可扩展性.
3.如何扩展异步操作asio是异步IO框架,如果需要支持自定义的异步IO操作,还需要额外工作.在工作开始之前,需要了解一些asio底层分发器的细节.
3.1asio中的调度器asio的实现中没有具体统一的调度器实现,原因是Linux平台上使用了Reactor模式,而窗口平台则是基于IOCP的Proactor模式.
两个模式有差异,但asio的选择是在Linux平台上实现一个调度器类,并与Reactor一同工作,提供与Proactor模式相同的接口.最后,asio对Service层提供了一套统一的调度器机制,如下:
林操io环境实现 |IOCP
调度器 完成队列 |IOCP
epoll反应器 |IOCP开始操作 后(立即)延迟操作 跑(一个)
可见,Reactor中额外实现了个内部管理完成队列的调度器类,来模拟与IOCP中等同的功能,并允许用户直接向该队列提交回调.
统一的调度器机制,也就是各个平台实现的context_impl类,并不能抹平Reactor与Proactor所有的差异.对Proactor而言,调用IO函数就直接发起了异步IO操作(窗口平台中WSA系列的函数).
而Reactor依旧是同步IO,因此Reactor额外提供了start_op接口.该接口将根据IO的类型,注册IO事件给Reactor,并同步地在调度器收到IO事件后,选择适当时机执行IO操作.
分发过程和细节,asio封装的Reactor与Proactor也有所不同.基于IOCP的完成队列中,只有IO完成事件回调和用户主动提交入队的回调;而基于Reactor的完成队列中,除了这两个回调以外,还有从Reactor中通知的同步IO事件.
对asio中Reactor与Proactor的机制有所了解之后,便可继续在不同平台上适配异步操作的工作了.
3.2扩展Reactor模式的异步操作首先是封装Reactor模式下的异步操作.在Reactor模式下,asio有同步调用IO函数的机制.对asio源码稍作阅读后,不难发现这些秘密都藏在asio::detail::reactor_op类中.
则只要继承该类,并覆盖两个关键函数:一个是同步执行IO操作函数,另一个是IO完成函数,就可实现自定义异步操作.当然,在Reactor模式下的异步操作,是模拟异步操作.
// reactor_op 实现
class reactor_op : public operation
{ /******/ };class your_reactor_async_op : public reactor_op
{
public:your_reactor_async_op(asio::error_code const& success_ec, ...): reactor_op(success_ec, do_perform, do_complete)//, 其他初化{}static status do_perform(reactor_op* base){auto* this_op = static_cast(base);// TODO ... 调用你的同步`IO`}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... `io`完成逻辑}
};
reactor_op类从操作类继承,该类也是调度器中完成队列链表的节点类,因此可把要模拟的某个异步IO操作的处理(perform)和完成(complete)业务都实现在一起.
3.3扩展IOCPProactor模式的异步操作扩展IOCPProactor的操作就比Reactor模式下简单许多.操作类从OVERLAPPED结构继承,可直接把op操作传递给异步IO函数的LPOVERLAPPED指针参数.因此只需要考虑IOcomplete回调即可.
class your_iocp_async_op : public operation
{
public:your_iocp_async_op(...): operation(do_complete)//, 其他初化{}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... 干`io`完成逻辑}
};
3.4独立于调度器之外的完成队列有些IO库或标准中的IO完成事件,并不能统一到asio的标准统一调度器中.如rdma中的完成队列(CQ).该设计也是合理的,毕竟使用统一的分发,并不能满足对低延迟极度敏感的场景.
适配类似rdma中的CQ,一个简单可行的做法,可参考在asio上试RDMA中的内容.除此外,既然rdma提供了CQ的概念和PollCQ的接口,说明用户一定有自己主动使用它们的场景.于是笔者试提供了CQ对象,让用户可在任意线程主动PollCQ.
asio::io_context ioc{};
asio::rdma::completion_queue cq{ ioc };std::thread thread {[&] { cq.run(); }
}
该场景下,io_context中的统一分发CQ的Service停止会工作,并把PollCQ的权限交给用户.
3.5io_object_impl通过初化器转发给Service有了适配各个平台的异步操作定义之后,剩下的工作就是为io_object_impl与Service实现异步接口了.io_object_impl外观类的实现尽量统一,但是每个平台的操作类型实现大概率不一样,asio在外观类层通过初化器机制,把实现上差异转发给了各个平台的Service,以rdma_connect的实现为例:
//`RDMA`的`IO`对象,提供`RDMA`功能
template
class rdma_connection
{
public:// .......class initiate_async_connect;// 异步连接template ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,void (asio::error_code))async_connect(endpoint_type const& endpoint,ASIO_MOVE_ARG(ConnectToken) tokenASIO_DEFAULT_COMPLETION_TOKEN(executor_type))ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate(declval(), token,declval()))){asio::error_code open_ec{};if(!is_open()){open_ec = asio::error::try_again;}#if ASIO_HAS_IOCPif(!open_ec){impl_.get_service().allocate_qp(impl_.get_implementation(), open_ec);}
#endifreturn async_initiate(initiate_async_connect(this), token, endpoint, open_ec);}// `async_connect_op`的初化器class initiate_async_connect{public:using executor_type = Executor;explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT: self_(self) {}template void operator() (ASIO_MOVE_ARG(Handler) handler,endpoint_type const& endpoint,asio::error_code const& open_ec) const{ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;if(open_ec){asio::post(self_->impl_.get_executor(),asio::detail::bind_handler(ASIO_MOVE_CAST(Handler)(handler), open_ec));}else{detail::non_const_lvalue handler2(handler);self_->impl_.get_service().async_connect(self_->impl_.get_implementation(), endpoint,handler2.value, self_->impl_.get_executor());}}private:rdma_connection* self_;};
};
初化器还有编译期检查,比如Handler的类型是否满足,Buffer是否满足ConstBufferSequence的约束等.在跟祁宇的一次讨论中,他还提到了初化器的设计还可让用户去特化async_result来实现自定义异步操作,是个很强大的用户扩展点.
3.6Service通过调度器发起IO操作io_object_impl通过初化器转发给Service之后,剩下的事情就很简单了.Service的异步操作接口主要负责创建对应的异步操作对象,初化操作对象,并通知调度器发起IO操作.
以rdma在Linux和窗口平台上的实现为例:
// `Linux`的`rdma-core`
template
template
void rdma_core_service::async_connect(实现类型& impl,endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{bool const is_continuation =asio_handler_cont_helpers::is_continuation(handler);// TODO ... 取消using op = rdma_cm_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(success_ec_, impl.cm_id_data_->cm_id_, get_context(), handler, io_ex);cm_event_service_.start_connect_op(impl.cm_id_data_, p.p, endpoint.data(), is_continuation);p.v = p.p = 0;
}// 窗口的iocp
template
template
void rdma_iocp_connector_service::async_connect(实现类型& impl, endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{// TODO ... 取消using op = rdma_nd_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);start_connect_op(impl, endpoint, p.p);p.v = p.p = 0;
}
注意,Reactor与Proactor模式的差异在此就显现出来了.Reactor在此发起的是个reactor_op操作,注册给连接管理器也就是这里的cm_event_service_对象,真正的connect函数调用是在连接管理器的事件Channel对应的fd有POLLIN消息时,才会调用.
而对IOCPProactor而言,start_connect_op会直接非阻塞地调用Connector的Connect函数,并在Connection创建成功后,通过IOCP的完成队列通知用户.
上一篇:QT常用位置函数区别