原文
讨论如何在asio
上扩展自定义的异步io
操作.包括asio
中经典的Context/Service
插件机制,及经常打交道的几个Service
,还有使用asio
分发器的一些注意等.
Context/Service
机制使用asio
开发时,会创建两类对象:一类是exectioncontext
,另一类是IOobject
.比如以asio
的tcpsocket
为例:
asio::io_context ioc{};
asio::ip::tcp::socket socket{ ioc };
asio::io_context
是执行环境
的实现类,而asio::ip::tcp::socket
则是IOobject
.以上短短两行代码,用户就已通过IOobject
对象,将tcp/ip
的socket
功能加载入io_context
中.
也就是将socketservice
,注册到了io_context
中.注册的扳机是asio::detail::io_object_impl
模板类.
io_object_impl
asio::ip::tcp::socket
类型,抛开中间的各种实例化和继承的关系,它是asio::detail::io_object_impl
实例化类型的外观.如下面代码所示(代码经过简化处理,方便展示概念,与asio
有出入)
// 在`asio`名字空间
class socket
{
private:detail::io_object_impl,io_context::executor> impl_;public:socket(const io_context::executor& ex): impl_(0, ex){}
};
可见socket
外观类,使用了reactive_socket_service
(这里以linux
平台为例),和io_context
导出的executor
,来实例化io_object_impl
;而socket
构造器,把关联io_context
的executor
传递给impl_
成员.再来看看io_object_impl
对应的构造器:
// 在`asio::detail`名字空间中
template
class io_object_impl
{
public:
//用来提供`I/O`操作服务的类型.typedef IoObjectService service_type;// `I/O`对象的底层实现类型.typedef typename service_type::实现类型 实现类型;// 与对象关联的执行器的类型.typedef Executor executor_type;// 用执行器构造`I/O`对象.explicit io_object_impl(int, const executor_type& ex): service_(&asio::use_service(io_object_impl::get_context(ex))),executor_(ex){service_->construct(implementation_);}private:// 与`I/O`对象关联的服务.service_type* service_;// `I/O`对象的底层实现.实现类型 implementation_;// 关联执行器executor_type executor_;
};
构造器非常明确的展示了io_object_impl
通过Executor
取执行环境
对象,并通过use_service
函数向执行环境
中注册Service
.
暂时跳过,实现类型
相关内容.这里着重关注use_service
函数,它是asio
向其异步IO
事件框架中,加载具体IO
功能(这里是reactor_socket_service
)的关键入口.
Context/Service
的对象模型Context/Service
大家也许很陌生,但换一个说法也许大家就很熟悉了.如COM
中使用的Component/Interface
组件对象模型,还有游戏引擎中常用的Object/Component
模型.如下所示:
执行环境<-注册服务
-----先--------|
|0服务->1服务->2服务
use_service
函数就是向执行环境
中管理Service
集合的ServiceRegistry
类中,申请指定的Service
对象.而service_registry
则保证每一个Service
只有一个,没有时则创建一个新的
为原则,最终返回有效的Service
引用.
当然,以上操作都是线安的.注意,ServiceRegistry
使用了一个id
来区分不同的Service
,而id
的生成则是通过CRTP
配合std::type_info
来生成的,如下代码所示:
//特殊的继承服务`ID`类型,使类仅头文件.
template
class service_id: public execution_context::id
{
};//使类仅头文件的特殊服务基类.
template
class execution_context_service_base: public execution_context::service
{
public:static service_id id;// 构造器.execution_context_service_base(execution_context& e): execution_context::service(e){}
};// 实现你私有服务
template
class rdma_core_service :public execution_context_service_base>
{// ....
};
注意CRTP
中的传递的实现类
千万不要写错了,这里是rdma_core_service
.否则底层计算的ID
会出错.如果ID
是某一个已有的Service
的ID
,会直接返回那个Service
给用户,这会非常危险.
虽然CRTP
可在正确
时,用户保证
运行时保证
类型安全,但是错误
使用并不会有编译期报错,并把灾难抛给运行时.
2
.如何扩展有效的Service
到现在为止,应该知道扩展asio
的关键是实现自己的Service
,并实现自己的io_object_impl
外观类.Service
负责实现
具体的同步或异步IO
的操作,而外观类
则提供封装好的接口,让用户方便地,正确地
使用Service
提供的能力.下面,展开讨论一下:
2.1
导出implement_type
回顾一下第一节贴出的io_object_impl
代码段,其中有一个implementation_
的成员.它的类型是io_object_impl
通过Service
中导出的实现类型
而来.
它是IOObject
真正的数据实现,并由各个Service
来定义.好处是,可让每个
平台来处理自己的实现细节.
比如Linux
平台的reactor_socket_service
使用的socket
类型是fd
,而窗口
平台的iocp_socket_service
则使用的是SOCKET
,使用实现类型
,可让io_object_impl
不关心平台相关数据类型
差异和实现细节.
如前面讨论的在asio
上封装rdma-core
功能的Service
,在此
template
class rdma_core_service :public execution_context_service_base>
{
public:
//`cmID`的实现类型.struct 实现类型{// 本地`cm`表示.rdma_cm_id_data* cm_id_data_;// 内存区域状态rdma_mr_state mr_state_;//如果有附加到该`IO`对象的队列对bool has_qp_;};
};
2.2
实现五规则
io_object_impl
会实现自己的五规则
.如第一节io_object_impl
中的构造器,转发给了Service
的construct
函数.此函数照例,需要实现如下:
template
class rdma_core_service :public execution_context_service_base>
{
public:// 构造新的实现inline void construct(实现类型& impl);// 破坏实现inline void destroy(实现类型& impl);// 移动构造器inline void move_construct(实现类型& impl, 实现类型& other_impl);// 移动赋值inline void move_assign(实现类型& impl,rdma_core_service& other_service, 实现类型& other_impl);
};
io_object_impl
是Noncopyable
,所以只需要实现移动构造器
和移动赋值符
.至于转换构造器
,则根据用户自己的需求来决定.
2.3
覆盖一些必要的函数Service
中还有几个需要覆盖
的函数,如下列代码所示:
template
class rdma_core_service :public execution_context_service_base>
{
public:/// 析构器ASIO_DECL virtual ~rdma_core_service()//析构服务拥有的所有用户定义的处理器对象ASIO_DECL virtual void 关机();//处理与`fork`相关的事件通知,以执行必要细节处理./***该函数不是纯虚的,所以服务只需要*必要时实现它.默认的实现闲着.*/ASIO_DECL virtual void notify_fork(execution_context::fork_event event);
};
虚析构器
就不多说了,因为ServiceRegistry
持有的是基类
指针,清理Service
时析构器得是虚
的.关机
则是给有状态的Service
在整个asio
发起关机
时清理和校验
自己扩展点.notify_fork
只适合Linux
平台,用来响应Fork
系统调用的事件.
2.4
实现IOObject
外观类如第一节代码所示,用户并不直接调用Service
的接口而是通过IOObject
,来适用asio
的功能.因此,还要实现对应的IOObject
外观类.
为了通用,Service
与IOObject
外观是一对多的关系,如在linux
平台上Socket
和SocketAcceptor
都是对应于reactive_socket_service
.反之,因为IOObject
会使用确定Service
导出的实现类型
,IOobject
只能对应一个确定的Service
.如一个rdma_connetion
的实现:
//`RDMA`的`IO`对象,提供`RDMA`功能
template
class rdma_connection
{
public:// ....
#if ASIO_HAS_IOCPusing service_type = detail::rdma_iocp_connector_service;
#elseusing service_type = detail::rdma_core_service;
#endifprivate:detail::io_object_impl impl_;public: // 实现接口void open(port_space_type const& port_space);bool is_open() const{return impl_.get_service().is_open(impl_.get_implementation());}// .....
};
通过外观类调用Service
的接口,可参考is_open
成员方法的实现.这里为了方便,简单只用了一个同步的,实现简单
的接口来展示.
至此,就可在asio
上扩展"插件"了.可发现并没有侵入式的修改asio
的代码,这都得益于Context/Service
对象模型强大的可扩展性.
3
.如何扩展异步操作asio
是异步IO
框架,如果需要支持自定义的异步IO
操作,还需要额外工作.在工作开始之前,需要了解一些asio
底层分发器的细节.
3.1asio
中的调度器
asio
的实现中没有具体统一的调度器
实现,原因是Linux
平台上使用了Reactor
模式,而窗口
平台则是基于IOCP
的Proactor
模式.
两个模式有差异,但asio
的选择是在Linux
平台上实现一个调度器
类,并与Reactor
一同工作,提供与Proactor
模式相同的接口.最后,asio
对Service
层提供了一套统一的调度器
机制,如下:
林操io环境实现 |IOCP
调度器 完成队列 |IOCP
epoll反应器 |IOCP开始操作 后(立即)延迟操作 跑(一个)
可见,Reactor
中额外实现了个内部管理完成队列
的调度器
类,来模拟与IOCP
中等同的功能,并允许用户直接向该队列提交
回调.
统一的调度器
机制,也就是各个平台实现的context_impl
类,并不能抹平Reactor
与Proactor
所有的差异.对Proactor
而言,调用IO
函数就直接
发起了异步IO
操作(窗口
平台中WSA
系列的函数).
而Reactor
依旧是同步IO
,因此Reactor
额外提供了start_op
接口.该接口将根据IO
的类型,注册IO
事件给Reactor
,并同步地在调度器
收到IO
事件后,选择适当时机执行IO
操作.
分发过程和细节,asio
封装的Reactor
与Proactor
也有所不同.基于IOCP
的完成队列
中,只有IO
完成事件回调
和用户主动提交
入队的回调;而基于Reactor
的完成队列
中,除了这两个
回调以外,还有从Reactor
中通知的同步IO
事件.
对asio
中Reactor
与Proactor
的机制有所了解之后,便可继续在不同平台上适配异步操作的工作了.
3.2
扩展Reactor
模式的异步操作首先是封装Reactor
模式下的异步
操作.在Reactor
模式下,asio
有同步调用IO
函数的机制.对asio
源码稍作阅读后,不难发现这些秘密都藏在asio::detail::reactor_op
类中.
则只要继承
该类,并覆盖
两个关键函数:一个是同步
执行IO
操作函数,另一个是IO
完成函数,就可实现自定义异步
操作.当然,在Reactor
模式下的异步操作,是模拟
异步操作.
// reactor_op 实现
class reactor_op : public operation
{ /******/ };class your_reactor_async_op : public reactor_op
{
public:your_reactor_async_op(asio::error_code const& success_ec, ...): reactor_op(success_ec, do_perform, do_complete)//, 其他初化{}static status do_perform(reactor_op* base){auto* this_op = static_cast(base);// TODO ... 调用你的同步`IO`}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... `io`完成逻辑}
};
reactor_op
类从操作
类继承,该类也是调度器
中完成队列
链表的节点类,因此可把要模拟
的某个异步IO
操作的处理(perform)
和完成(complete)
业务都实现在一起.
3.3
扩展IOCPProactor
模式的异步操作扩展IOCPProactor
的操作就比Reactor
模式下简单许多.操作
类从OVERLAPPED
结构继承
,可直接把op
操作传递给异步IO
函数的LPOVERLAPPED
指针参数.因此只需要考虑IOcomplete
回调即可.
class your_iocp_async_op : public operation
{
public:your_iocp_async_op(...): operation(do_complete)//, 其他初化{}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... 干`io`完成逻辑}
};
3.4
独立于调度器
之外的完成队列
有些IO
库或标准中的IO
完成事件,并不能统一到asio
的标准统一调度器
中.如rdma
中的完成队列(CQ)
.该设计也是合理的,毕竟使用统一的分发,并不能满足对低延迟极度敏感的场景.
适配类似rdma
中的CQ
,一个简单可行的做法,可参考在asio
上试RDMA
中的内容.除此外,既然rdma
提供了CQ
的概念和PollCQ
的接口,说明用户一定有自己主动使用它们的场景.于是笔者试提供了CQ
对象,让用户可在任意线程主动PollCQ
.
asio::io_context ioc{};
asio::rdma::completion_queue cq{ ioc };std::thread thread {[&] { cq.run(); }
}
该场景下,io_context
中的统一分发CQ
的Service
停止会工作,并把PollCQ
的权限交给用户.
3.5io_object_impl
通过初化器
转发给Service
有了适配各个平台的异步
操作定义之后,剩下的工作就是为io_object_impl
与Service
实现异步
接口了.io_object_impl
外观类的实现尽量统一,但是每个平台的操作
类型实现大概率不一样,asio
在外观类层通过初化器
机制,把实现上差异
转发给了各个平台的Service
,以rdma_connect
的实现为例:
//`RDMA`的`IO`对象,提供`RDMA`功能
template
class rdma_connection
{
public:// .......class initiate_async_connect;// 异步连接template ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,void (asio::error_code))async_connect(endpoint_type const& endpoint,ASIO_MOVE_ARG(ConnectToken) tokenASIO_DEFAULT_COMPLETION_TOKEN(executor_type))ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate(declval(), token,declval()))){asio::error_code open_ec{};if(!is_open()){open_ec = asio::error::try_again;}#if ASIO_HAS_IOCPif(!open_ec){impl_.get_service().allocate_qp(impl_.get_implementation(), open_ec);}
#endifreturn async_initiate(initiate_async_connect(this), token, endpoint, open_ec);}// `async_connect_op`的初化器class initiate_async_connect{public:using executor_type = Executor;explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT: self_(self) {}template void operator() (ASIO_MOVE_ARG(Handler) handler,endpoint_type const& endpoint,asio::error_code const& open_ec) const{ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;if(open_ec){asio::post(self_->impl_.get_executor(),asio::detail::bind_handler(ASIO_MOVE_CAST(Handler)(handler), open_ec));}else{detail::non_const_lvalue handler2(handler);self_->impl_.get_service().async_connect(self_->impl_.get_implementation(), endpoint,handler2.value, self_->impl_.get_executor());}}private:rdma_connection* self_;};
};
初化器
还有编译期检查,比如Handler
的类型是否满足,Buffer
是否满足ConstBufferSequence
的约束等.在跟祁宇的一次讨论中,他还提到了初化器
的设计还可让用户去特化async_result
来实现自定义
异步操作,是个很强大的用户扩展点
.
3.6Service
通过调度器
发起IO
操作io_object_impl
通过初化器
转发给Service
之后,剩下的事情就很简单了.Service
的异步操作接口主要负责创建
对应的异步
操作对象,初化
操作对象,并通知调度器
发起IO
操作.
以rdma
在Linux
和窗口
平台上的实现为例:
// `Linux`的`rdma-core`
template
template
void rdma_core_service::async_connect(实现类型& impl,endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{bool const is_continuation =asio_handler_cont_helpers::is_continuation(handler);// TODO ... 取消using op = rdma_cm_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(success_ec_, impl.cm_id_data_->cm_id_, get_context(), handler, io_ex);cm_event_service_.start_connect_op(impl.cm_id_data_, p.p, endpoint.data(), is_continuation);p.v = p.p = 0;
}// 窗口的iocp
template
template
void rdma_iocp_connector_service::async_connect(实现类型& impl, endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{// TODO ... 取消using op = rdma_nd_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);start_connect_op(impl, endpoint, p.p);p.v = p.p = 0;
}
注意,Reactor
与Proactor
模式的差异在此就显现出来了.Reactor
在此发起的是个reactor_op
操作,注册给连接管理器
也就是这里的cm_event_service_
对象,真正的connect
函数调用是在连接管理器
的事件Channel
对应的fd
有POLLIN
消息时,才会调用.
而对IOCPProactor
而言,start_connect_op
会直接非阻塞地调用Connector
的Connect
函数,并在Connection
创建成功后,通过IOCP
的完成队列
通知用户.
上一篇:QT常用位置函数区别