扩展asio
创始人
2025-05-29 18:48:00
0

原文
讨论如何在asio上扩展自定义的异步io操作.包括asio中经典的Context/Service插件机制,及经常打交道的几个Service,还有使用asio分发器的一些注意等.

1.Context/Service机制

使用asio开发时,会创建两类对象:一类是exectioncontext,另一类是IOobject.比如以asiotcpsocket为例:

asio::io_context ioc{};
asio::ip::tcp::socket socket{ ioc };

asio::io_context执行环境的实现类,而asio::ip::tcp::socket则是IOobject.以上短短两行代码,用户就已通过IOobject对象,将tcp/ipsocket功能加载入io_context中.
也就是将socketservice,注册到了io_context中.注册的扳机是asio::detail::io_object_impl模板类.

1.1,io_object_impl

asio::ip::tcp::socket类型,抛开中间的各种实例化和继承的关系,它是asio::detail::io_object_impl实例化类型的外观.如下面代码所示(代码经过简化处理,方便展示概念,与asio有出入)

// 在`asio`名字空间
class socket
{
private:detail::io_object_impl,io_context::executor> impl_;public:socket(const io_context::executor& ex): impl_(0, ex){}
};

可见socket外观类,使用了reactive_socket_service(这里以linux平台为例),和io_context导出的executor,来实例化io_object_impl;而socket构造器,把关联io_contextexecutor传递给impl_成员.再来看看io_object_impl对应的构造器:

// 在`asio::detail`名字空间中
template 
class io_object_impl
{
public:
//用来提供`I/O`操作服务的类型.typedef IoObjectService service_type;// `I/O`对象的底层实现类型.typedef typename service_type::实现类型 实现类型;// 与对象关联的执行器的类型.typedef Executor executor_type;// 用执行器构造`I/O`对象.explicit io_object_impl(int, const executor_type& ex): service_(&asio::use_service(io_object_impl::get_context(ex))),executor_(ex){service_->construct(implementation_);}private:// 与`I/O`对象关联的服务.service_type* service_;// `I/O`对象的底层实现.实现类型 implementation_;// 关联执行器executor_type executor_;
};

构造器非常明确的展示了io_object_impl通过Executor执行环境对象,并通过use_service函数向执行环境中注册Service.

暂时跳过,实现类型相关内容.这里着重关注use_service函数,它是asio向其异步IO事件框架中,加载具体IO功能(这里是reactor_socket_service)的关键入口.

1.2Context/Service的对象模型

Context/Service大家也许很陌生,但换一个说法也许大家就很熟悉了.如COM中使用的Component/Interface组件对象模型,还有游戏引擎中常用的Object/Component模型.如下所示:

执行环境<-注册服务
-----先--------|
|0服务->1服务->2服务

use_service函数就是向执行环境中管理Service集合的ServiceRegistry类中,申请指定的Service对象.而service_registry则保证每一个Service只有一个,没有时则创建一个新的为原则,最终返回有效的Service引用.
当然,以上操作都是线安的.注意,ServiceRegistry使用了一个id来区分不同的Service,而id的生成则是通过CRTP配合std::type_info来生成的,如下代码所示:

//特殊的继承服务`ID`类型,使类仅头文件.
template 
class service_id: public execution_context::id
{
};//使类仅头文件的特殊服务基类.
template 
class execution_context_service_base: public execution_context::service
{
public:static service_id id;// 构造器.execution_context_service_base(execution_context& e): execution_context::service(e){}
};// 实现你私有服务
template 
class rdma_core_service :public execution_context_service_base>
{// ....
};

注意CRTP中的传递的实现类千万不要写错了,这里是rdma_core_service.否则底层计算的ID会出错.如果ID是某一个已有的ServiceID,会直接返回那个Service给用户,这会非常危险.
虽然CRTP可在正确时,用户保证运行时保证类型安全,但是错误使用并不会有编译期报错,并把灾难抛给运行时.

2.如何扩展有效的Service

到现在为止,应该知道扩展asio的关键是实现自己的Service,并实现自己的io_object_impl外观类.Service负责实现具体的同步或异步IO的操作,而外观类则提供封装好的接口,让用户方便地,正确地使用Service提供的能力.下面,展开讨论一下:

2.1导出implement_type

回顾一下第一节贴出的io_object_impl代码段,其中有一个implementation_的成员.它的类型是io_object_impl通过Service中导出的实现类型而来.
它是IOObject真正的数据实现,并由各个Service来定义.好处是,可让每个平台来处理自己的实现细节.
比如Linux平台的reactor_socket_service使用的socket类型是fd,而窗口平台的iocp_socket_service则使用的是SOCKET,使用实现类型,可让io_object_impl不关心平台相关数据类型差异和实现细节.

如前面讨论的在asio上封装rdma-core功能的Service,在此

template 
class rdma_core_service :public execution_context_service_base>
{
public:
//`cmID`的实现类型.struct 实现类型{// 本地`cm`表示.rdma_cm_id_data*      cm_id_data_;// 内存区域状态rdma_mr_state         mr_state_;//如果有附加到该`IO`对象的队列对bool                  has_qp_;};
};

2.2实现五规则

io_object_impl会实现自己的五规则.如第一节io_object_impl中的构造器,转发给了Serviceconstruct函数.此函数照例,需要实现如下:

template 
class rdma_core_service :public execution_context_service_base>
{
public:// 构造新的实现inline void construct(实现类型& impl);// 破坏实现inline void destroy(实现类型& impl);// 移动构造器inline void move_construct(实现类型& impl, 实现类型& other_impl);// 移动赋值inline void move_assign(实现类型& impl,rdma_core_service& other_service, 实现类型& other_impl);
};

io_object_implNoncopyable,所以只需要实现移动构造器移动赋值符.至于转换构造器,则根据用户自己的需求来决定.

2.3覆盖一些必要的函数

Service中还有几个需要覆盖的函数,如下列代码所示:

template 
class rdma_core_service :public execution_context_service_base>
{
public:/// 析构器ASIO_DECL virtual ~rdma_core_service()//析构服务拥有的所有用户定义的处理器对象ASIO_DECL virtual void 关机();//处理与`fork`相关的事件通知,以执行必要细节处理./***该函数不是纯虚的,所以服务只需要*必要时实现它.默认的实现闲着.*/ASIO_DECL virtual void notify_fork(execution_context::fork_event event);
};

虚析构器就不多说了,因为ServiceRegistry持有的是基类指针,清理Service时析构器得是的.关机则是给有状态的Service在整个asio发起关机清理和校验自己扩展点.notify_fork只适合Linux平台,用来响应Fork系统调用的事件.

2.4实现IOObject外观类

如第一节代码所示,用户并不直接调用Service的接口而是通过IOObject,来适用asio的功能.因此,还要实现对应的IOObject外观类.

为了通用,ServiceIOObject外观是一对多的关系,如在linux平台上SocketSocketAcceptor都是对应于reactive_socket_service.反之,因为IOObject会使用确定Service导出的实现类型,IOobject只能对应一个确定的Service.如一个rdma_connetion的实现:

//`RDMA`的`IO`对象,提供`RDMA`功能
template 
class rdma_connection
{
public:// ....
#if ASIO_HAS_IOCPusing service_type = detail::rdma_iocp_connector_service;
#elseusing service_type = detail::rdma_core_service;
#endifprivate:detail::io_object_impl impl_;public: // 实现接口void open(port_space_type const& port_space);bool is_open() const{return impl_.get_service().is_open(impl_.get_implementation());}// .....
};

通过外观类调用Service的接口,可参考is_open成员方法的实现.这里为了方便,简单只用了一个同步的,实现简单的接口来展示.
至此,就可在asio上扩展"插件"了.可发现并没有侵入式的修改asio的代码,这都得益于Context/Service对象模型强大的可扩展性.

3.如何扩展异步操作

asio是异步IO框架,如果需要支持自定义的异步IO操作,还需要额外工作.在工作开始之前,需要了解一些asio底层分发器的细节.

3.1asio中的调度器

asio的实现中没有具体统一的调度器实现,原因是Linux平台上使用了Reactor模式,而窗口平台则是基于IOCPProactor模式.

两个模式有差异,但asio的选择是在Linux平台上实现一个调度器类,并与Reactor一同工作,提供与Proactor模式相同的接口.最后,asioService层提供了一套统一的调度器机制,如下:

林操io环境实现    |IOCP
调度器  完成队列  |IOCP
epoll反应器       |IOCP开始操作 后(立即)延迟操作 跑(一个)

可见,Reactor中额外实现了个内部管理完成队列调度器类,来模拟与IOCP中等同的功能,并允许用户直接向该队列提交回调.

统一的调度器机制,也就是各个平台实现的context_impl类,并不能抹平ReactorProactor所有的差异.对Proactor而言,调用IO函数就直接发起了异步IO操作(窗口平台中WSA系列的函数).

Reactor依旧是同步IO,因此Reactor额外提供了start_op接口.该接口将根据IO的类型,注册IO事件给Reactor,并同步地在调度器收到IO事件后,选择适当时机执行IO操作.

分发过程和细节,asio封装的ReactorProactor也有所不同.基于IOCP完成队列中,只有IO完成事件回调和用户主动提交入队的回调;而基于Reactor完成队列中,除了这两个回调以外,还有从Reactor中通知的同步IO事件.

asioReactorProactor的机制有所了解之后,便可继续在不同平台上适配异步操作的工作了.

3.2扩展Reactor模式的异步操作

首先是封装Reactor模式下的异步操作.在Reactor模式下,asio有同步调用IO函数的机制.对asio源码稍作阅读后,不难发现这些秘密都藏在asio::detail::reactor_op类中.
则只要继承该类,并覆盖两个关键函数:一个是同步执行IO操作函数,另一个是IO完成函数,就可实现自定义异步操作.当然,在Reactor模式下的异步操作,是模拟异步操作.

// reactor_op 实现
class reactor_op : public operation
{ /******/ };class your_reactor_async_op : public reactor_op
{
public:your_reactor_async_op(asio::error_code const& success_ec, ...): reactor_op(success_ec, do_perform, do_complete)//, 其他初化{}static status do_perform(reactor_op* base){auto* this_op = static_cast(base);// TODO ... 调用你的同步`IO`}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... `io`完成逻辑}
};

reactor_op类从操作类继承,该类也是调度器完成队列链表的节点类,因此可把要模拟的某个异步IO操作的处理(perform)完成(complete)业务都实现在一起.

3.3扩展IOCPProactor模式的异步操作

扩展IOCPProactor的操作就比Reactor模式下简单许多.操作类从OVERLAPPED结构继承,可直接把op操作传递给异步IO函数的LPOVERLAPPED指针参数.因此只需要考虑IOcomplete回调即可.

class your_iocp_async_op : public operation
{
public:your_iocp_async_op(...): operation(do_complete)//, 其他初化{}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... 干`io`完成逻辑}
};

3.4独立于调度器之外的完成队列

有些IO库或标准中的IO完成事件,并不能统一到asio的标准统一调度器中.如rdma中的完成队列(CQ).该设计也是合理的,毕竟使用统一的分发,并不能满足对低延迟极度敏感的场景.

适配类似rdma中的CQ,一个简单可行的做法,可参考在asio上试RDMA中的内容.除此外,既然rdma提供了CQ的概念和PollCQ的接口,说明用户一定有自己主动使用它们的场景.于是笔者试提供了CQ对象,让用户可在任意线程主动PollCQ.

asio::io_context ioc{};
asio::rdma::completion_queue cq{ ioc };std::thread thread {[&] { cq.run(); }
}

该场景下,io_context中的统一分发CQService停止会工作,并把PollCQ的权限交给用户.

3.5io_object_impl通过初化器转发给Service

有了适配各个平台的异步操作定义之后,剩下的工作就是为io_object_implService实现异步接口了.io_object_impl外观类的实现尽量统一,但是每个平台的操作类型实现大概率不一样,asio在外观类层通过初化器机制,把实现上差异转发给了各个平台的Service,以rdma_connect的实现为例:

//`RDMA`的`IO`对象,提供`RDMA`功能
template 
class rdma_connection
{
public:// .......class initiate_async_connect;// 异步连接template ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,void (asio::error_code))async_connect(endpoint_type const& endpoint,ASIO_MOVE_ARG(ConnectToken) tokenASIO_DEFAULT_COMPLETION_TOKEN(executor_type))ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate(declval(), token,declval()))){asio::error_code open_ec{};if(!is_open()){open_ec = asio::error::try_again;}#if ASIO_HAS_IOCPif(!open_ec){impl_.get_service().allocate_qp(impl_.get_implementation(), open_ec);}
#endifreturn async_initiate(initiate_async_connect(this), token, endpoint, open_ec);}// `async_connect_op`的初化器class initiate_async_connect{public:using executor_type = Executor;explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT: self_(self) {}template void operator() (ASIO_MOVE_ARG(Handler) handler,endpoint_type const& endpoint,asio::error_code const& open_ec) const{ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;if(open_ec){asio::post(self_->impl_.get_executor(),asio::detail::bind_handler(ASIO_MOVE_CAST(Handler)(handler), open_ec));}else{detail::non_const_lvalue handler2(handler);self_->impl_.get_service().async_connect(self_->impl_.get_implementation(), endpoint,handler2.value, self_->impl_.get_executor());}}private:rdma_connection* self_;};
};

初化器还有编译期检查,比如Handler的类型是否满足,Buffer是否满足ConstBufferSequence的约束等.在跟祁宇的一次讨论中,他还提到了初化器的设计还可让用户去特化async_result来实现自定义异步操作,是个很强大的用户扩展点.

3.6Service通过调度器发起IO操作

io_object_impl通过初化器转发给Service之后,剩下的事情就很简单了.Service的异步操作接口主要负责创建对应的异步操作对象,初化操作对象,并通知调度器发起IO操作.
rdmaLinux窗口平台上的实现为例:

// `Linux`的`rdma-core`
template 
template 
void rdma_core_service::async_connect(实现类型& impl,endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{bool const is_continuation =asio_handler_cont_helpers::is_continuation(handler);// TODO ... 取消using op = rdma_cm_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(success_ec_, impl.cm_id_data_->cm_id_, get_context(), handler, io_ex);cm_event_service_.start_connect_op(impl.cm_id_data_, p.p, endpoint.data(), is_continuation);p.v = p.p = 0;
}// 窗口的iocp
template 
template 
void rdma_iocp_connector_service::async_connect(实现类型& impl, endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{// TODO ... 取消using op = rdma_nd_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);start_connect_op(impl, endpoint, p.p);p.v = p.p = 0;
}

注意,ReactorProactor模式的差异在此就显现出来了.Reactor在此发起的是个reactor_op操作,注册给连接管理器也就是这里的cm_event_service_对象,真正的connect函数调用是在连接管理器的事件Channel对应的fdPOLLIN消息时,才会调用.

而对IOCPProactor而言,start_connect_op会直接非阻塞地调用ConnectorConnect函数,并在Connection创建成功后,通过IOCP完成队列通知用户.

相关内容

热门资讯

现货黄金直线跳水,跌破5200... 新闻荐读 1月29日晚,现货黄金白银快速走低,回吐盘中全部涨幅。23:15左右,现货黄金跌破5300...
加拿大拟与多国联合设立国防银行 新华社北京1月31日电 加拿大财政部长商鹏飞1月30日说,加拿大将在未来数月与国际伙伴密切合作,推进...
马斯克大消息!SpaceX申请... 据券商中国,美东时间1月30日,路透社报道,据两位知情人士透露,马斯克旗下SpaceX公司2025年...
澳网:雷巴金娜2-1萨巴伦卡女... 北京时间1月31日,2026赛季网球大满贯澳大利亚公开赛继续进行,在女单决赛中,5号种子雷巴金娜6-...
春节前白酒促销热:“扫码抽黄金... 春节临近,白酒市场再现价格异动。 近日,飞天茅台批价拉升,有酒商直言“年前要冲2000元关口”,引发...
新安县人民医院让专业护理走进千... 由211名专业人员组成的服务团队,提供60项全维度服务,累计完成上门服务3217人次;实现“入院—出...
跨国企业负责人高度肯定中国经济... 本文转自【中国经济网-《经济日报》】; 参观者在第八届中国国际进口博览会美敦力公司一款超硬导丝产品...
中药配方颗粒标准化浪潮:数商云... 在中医药现代化与国际化加速推进的背景下,中药配方颗粒行业正经历一场以标准化为核心的深刻变革。截至20...
长江能科迪拜孙公司完成注册 拓... 来源:新浪财经-鹰眼工作室 【财经网讯】长江三星能源科技股份有限公司(证券代码:920158,证券简...
银行职工因贪污罪获刑后留任,在... 新京报记者 刘锦涵 制作 礼牧周 ▲新京报我们视频出品(ID:wevideo) 近日,农发行福建福鼎...
黄金创40年来最大单日跌幅!金... (来源:劳动报) 转自:劳动报 1月31日,国际金银价格同步大跌,创40余年来最大跌幅。国内金饰价...
“一人公司”近来何以兴起? 2026年开年,“一人公司”发展备受关注。这种新型创业模式正在上海、北京、江苏等地悄然兴起,凭借低成...
寒武纪预计 2025 年净利润... 消息,AI 芯片企业寒武纪今日发布 2025 年年度业绩预告: 经财务部门初步测算,公司预计 2...
和讯投顾徐剑波:ETF买入法! 这轮牛市是机构主导的ETF牛市,选对ETF往往比选股更加赚钱。那么如何投资ETF?今天教给大家一个非...
君乐宝上市申请已递交,国内乳品... 2026年 1月19日,中国领先的综合乳制品企业君乐宝乳业集团股份有限公司正式向香港联交所递交主板上...
大涨!马斯克,突传大消息!重磅... SpaceX的“赚钱能力”曝光。 据最新消息,世界首富埃隆·马斯克旗下的商业航天公司SpaceX去年...
原创 顶... 2025年微博之夜定档于2026年2月5日北京线上直播,这场已经走过二十多年风雨的互联网年度盛典,因...
体检查出肺结节?3个日常行为正... 太原龙城中医医院科普:如今越来越多人在体检中发现肺结节,看到报告上的“阴影”便忧心忡忡。其实研究表明...
记者观察丨美联储下任主席提名揭... 在经过长达一年反复挑选后,美国总统唐纳德·特朗普终于做出决定,提名凯文·沃什为下一任美联储主席,接替...
首饰金,一夜大跌上百元!金价暴... 【导读】多家首饰品牌金价出现大幅下跌 中国基金报记者 忆山 随着国际金价急速下跌,国内首饰金价也迎来...