扩展asio
创始人
2025-05-29 18:48:00
0

原文
讨论如何在asio上扩展自定义的异步io操作.包括asio中经典的Context/Service插件机制,及经常打交道的几个Service,还有使用asio分发器的一些注意等.

1.Context/Service机制

使用asio开发时,会创建两类对象:一类是exectioncontext,另一类是IOobject.比如以asiotcpsocket为例:

asio::io_context ioc{};
asio::ip::tcp::socket socket{ ioc };

asio::io_context执行环境的实现类,而asio::ip::tcp::socket则是IOobject.以上短短两行代码,用户就已通过IOobject对象,将tcp/ipsocket功能加载入io_context中.
也就是将socketservice,注册到了io_context中.注册的扳机是asio::detail::io_object_impl模板类.

1.1,io_object_impl

asio::ip::tcp::socket类型,抛开中间的各种实例化和继承的关系,它是asio::detail::io_object_impl实例化类型的外观.如下面代码所示(代码经过简化处理,方便展示概念,与asio有出入)

// 在`asio`名字空间
class socket
{
private:detail::io_object_impl,io_context::executor> impl_;public:socket(const io_context::executor& ex): impl_(0, ex){}
};

可见socket外观类,使用了reactive_socket_service(这里以linux平台为例),和io_context导出的executor,来实例化io_object_impl;而socket构造器,把关联io_contextexecutor传递给impl_成员.再来看看io_object_impl对应的构造器:

// 在`asio::detail`名字空间中
template 
class io_object_impl
{
public:
//用来提供`I/O`操作服务的类型.typedef IoObjectService service_type;// `I/O`对象的底层实现类型.typedef typename service_type::实现类型 实现类型;// 与对象关联的执行器的类型.typedef Executor executor_type;// 用执行器构造`I/O`对象.explicit io_object_impl(int, const executor_type& ex): service_(&asio::use_service(io_object_impl::get_context(ex))),executor_(ex){service_->construct(implementation_);}private:// 与`I/O`对象关联的服务.service_type* service_;// `I/O`对象的底层实现.实现类型 implementation_;// 关联执行器executor_type executor_;
};

构造器非常明确的展示了io_object_impl通过Executor执行环境对象,并通过use_service函数向执行环境中注册Service.

暂时跳过,实现类型相关内容.这里着重关注use_service函数,它是asio向其异步IO事件框架中,加载具体IO功能(这里是reactor_socket_service)的关键入口.

1.2Context/Service的对象模型

Context/Service大家也许很陌生,但换一个说法也许大家就很熟悉了.如COM中使用的Component/Interface组件对象模型,还有游戏引擎中常用的Object/Component模型.如下所示:

执行环境<-注册服务
-----先--------|
|0服务->1服务->2服务

use_service函数就是向执行环境中管理Service集合的ServiceRegistry类中,申请指定的Service对象.而service_registry则保证每一个Service只有一个,没有时则创建一个新的为原则,最终返回有效的Service引用.
当然,以上操作都是线安的.注意,ServiceRegistry使用了一个id来区分不同的Service,而id的生成则是通过CRTP配合std::type_info来生成的,如下代码所示:

//特殊的继承服务`ID`类型,使类仅头文件.
template 
class service_id: public execution_context::id
{
};//使类仅头文件的特殊服务基类.
template 
class execution_context_service_base: public execution_context::service
{
public:static service_id id;// 构造器.execution_context_service_base(execution_context& e): execution_context::service(e){}
};// 实现你私有服务
template 
class rdma_core_service :public execution_context_service_base>
{// ....
};

注意CRTP中的传递的实现类千万不要写错了,这里是rdma_core_service.否则底层计算的ID会出错.如果ID是某一个已有的ServiceID,会直接返回那个Service给用户,这会非常危险.
虽然CRTP可在正确时,用户保证运行时保证类型安全,但是错误使用并不会有编译期报错,并把灾难抛给运行时.

2.如何扩展有效的Service

到现在为止,应该知道扩展asio的关键是实现自己的Service,并实现自己的io_object_impl外观类.Service负责实现具体的同步或异步IO的操作,而外观类则提供封装好的接口,让用户方便地,正确地使用Service提供的能力.下面,展开讨论一下:

2.1导出implement_type

回顾一下第一节贴出的io_object_impl代码段,其中有一个implementation_的成员.它的类型是io_object_impl通过Service中导出的实现类型而来.
它是IOObject真正的数据实现,并由各个Service来定义.好处是,可让每个平台来处理自己的实现细节.
比如Linux平台的reactor_socket_service使用的socket类型是fd,而窗口平台的iocp_socket_service则使用的是SOCKET,使用实现类型,可让io_object_impl不关心平台相关数据类型差异和实现细节.

如前面讨论的在asio上封装rdma-core功能的Service,在此

template 
class rdma_core_service :public execution_context_service_base>
{
public:
//`cmID`的实现类型.struct 实现类型{// 本地`cm`表示.rdma_cm_id_data*      cm_id_data_;// 内存区域状态rdma_mr_state         mr_state_;//如果有附加到该`IO`对象的队列对bool                  has_qp_;};
};

2.2实现五规则

io_object_impl会实现自己的五规则.如第一节io_object_impl中的构造器,转发给了Serviceconstruct函数.此函数照例,需要实现如下:

template 
class rdma_core_service :public execution_context_service_base>
{
public:// 构造新的实现inline void construct(实现类型& impl);// 破坏实现inline void destroy(实现类型& impl);// 移动构造器inline void move_construct(实现类型& impl, 实现类型& other_impl);// 移动赋值inline void move_assign(实现类型& impl,rdma_core_service& other_service, 实现类型& other_impl);
};

io_object_implNoncopyable,所以只需要实现移动构造器移动赋值符.至于转换构造器,则根据用户自己的需求来决定.

2.3覆盖一些必要的函数

Service中还有几个需要覆盖的函数,如下列代码所示:

template 
class rdma_core_service :public execution_context_service_base>
{
public:/// 析构器ASIO_DECL virtual ~rdma_core_service()//析构服务拥有的所有用户定义的处理器对象ASIO_DECL virtual void 关机();//处理与`fork`相关的事件通知,以执行必要细节处理./***该函数不是纯虚的,所以服务只需要*必要时实现它.默认的实现闲着.*/ASIO_DECL virtual void notify_fork(execution_context::fork_event event);
};

虚析构器就不多说了,因为ServiceRegistry持有的是基类指针,清理Service时析构器得是的.关机则是给有状态的Service在整个asio发起关机清理和校验自己扩展点.notify_fork只适合Linux平台,用来响应Fork系统调用的事件.

2.4实现IOObject外观类

如第一节代码所示,用户并不直接调用Service的接口而是通过IOObject,来适用asio的功能.因此,还要实现对应的IOObject外观类.

为了通用,ServiceIOObject外观是一对多的关系,如在linux平台上SocketSocketAcceptor都是对应于reactive_socket_service.反之,因为IOObject会使用确定Service导出的实现类型,IOobject只能对应一个确定的Service.如一个rdma_connetion的实现:

//`RDMA`的`IO`对象,提供`RDMA`功能
template 
class rdma_connection
{
public:// ....
#if ASIO_HAS_IOCPusing service_type = detail::rdma_iocp_connector_service;
#elseusing service_type = detail::rdma_core_service;
#endifprivate:detail::io_object_impl impl_;public: // 实现接口void open(port_space_type const& port_space);bool is_open() const{return impl_.get_service().is_open(impl_.get_implementation());}// .....
};

通过外观类调用Service的接口,可参考is_open成员方法的实现.这里为了方便,简单只用了一个同步的,实现简单的接口来展示.
至此,就可在asio上扩展"插件"了.可发现并没有侵入式的修改asio的代码,这都得益于Context/Service对象模型强大的可扩展性.

3.如何扩展异步操作

asio是异步IO框架,如果需要支持自定义的异步IO操作,还需要额外工作.在工作开始之前,需要了解一些asio底层分发器的细节.

3.1asio中的调度器

asio的实现中没有具体统一的调度器实现,原因是Linux平台上使用了Reactor模式,而窗口平台则是基于IOCPProactor模式.

两个模式有差异,但asio的选择是在Linux平台上实现一个调度器类,并与Reactor一同工作,提供与Proactor模式相同的接口.最后,asioService层提供了一套统一的调度器机制,如下:

林操io环境实现    |IOCP
调度器  完成队列  |IOCP
epoll反应器       |IOCP开始操作 后(立即)延迟操作 跑(一个)

可见,Reactor中额外实现了个内部管理完成队列调度器类,来模拟与IOCP中等同的功能,并允许用户直接向该队列提交回调.

统一的调度器机制,也就是各个平台实现的context_impl类,并不能抹平ReactorProactor所有的差异.对Proactor而言,调用IO函数就直接发起了异步IO操作(窗口平台中WSA系列的函数).

Reactor依旧是同步IO,因此Reactor额外提供了start_op接口.该接口将根据IO的类型,注册IO事件给Reactor,并同步地在调度器收到IO事件后,选择适当时机执行IO操作.

分发过程和细节,asio封装的ReactorProactor也有所不同.基于IOCP完成队列中,只有IO完成事件回调和用户主动提交入队的回调;而基于Reactor完成队列中,除了这两个回调以外,还有从Reactor中通知的同步IO事件.

asioReactorProactor的机制有所了解之后,便可继续在不同平台上适配异步操作的工作了.

3.2扩展Reactor模式的异步操作

首先是封装Reactor模式下的异步操作.在Reactor模式下,asio有同步调用IO函数的机制.对asio源码稍作阅读后,不难发现这些秘密都藏在asio::detail::reactor_op类中.
则只要继承该类,并覆盖两个关键函数:一个是同步执行IO操作函数,另一个是IO完成函数,就可实现自定义异步操作.当然,在Reactor模式下的异步操作,是模拟异步操作.

// reactor_op 实现
class reactor_op : public operation
{ /******/ };class your_reactor_async_op : public reactor_op
{
public:your_reactor_async_op(asio::error_code const& success_ec, ...): reactor_op(success_ec, do_perform, do_complete)//, 其他初化{}static status do_perform(reactor_op* base){auto* this_op = static_cast(base);// TODO ... 调用你的同步`IO`}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... `io`完成逻辑}
};

reactor_op类从操作类继承,该类也是调度器完成队列链表的节点类,因此可把要模拟的某个异步IO操作的处理(perform)完成(complete)业务都实现在一起.

3.3扩展IOCPProactor模式的异步操作

扩展IOCPProactor的操作就比Reactor模式下简单许多.操作类从OVERLAPPED结构继承,可直接把op操作传递给异步IO函数的LPOVERLAPPED指针参数.因此只需要考虑IOcomplete回调即可.

class your_iocp_async_op : public operation
{
public:your_iocp_async_op(...): operation(do_complete)//, 其他初化{}static void do_complete(void* owner, operation* base,const asio::error_code& /*ec*/,std::size_t /*传输字节数*/){auto* this_op = static_cast(base);// TODO ... 干`io`完成逻辑}
};

3.4独立于调度器之外的完成队列

有些IO库或标准中的IO完成事件,并不能统一到asio的标准统一调度器中.如rdma中的完成队列(CQ).该设计也是合理的,毕竟使用统一的分发,并不能满足对低延迟极度敏感的场景.

适配类似rdma中的CQ,一个简单可行的做法,可参考在asio上试RDMA中的内容.除此外,既然rdma提供了CQ的概念和PollCQ的接口,说明用户一定有自己主动使用它们的场景.于是笔者试提供了CQ对象,让用户可在任意线程主动PollCQ.

asio::io_context ioc{};
asio::rdma::completion_queue cq{ ioc };std::thread thread {[&] { cq.run(); }
}

该场景下,io_context中的统一分发CQService停止会工作,并把PollCQ的权限交给用户.

3.5io_object_impl通过初化器转发给Service

有了适配各个平台的异步操作定义之后,剩下的工作就是为io_object_implService实现异步接口了.io_object_impl外观类的实现尽量统一,但是每个平台的操作类型实现大概率不一样,asio在外观类层通过初化器机制,把实现上差异转发给了各个平台的Service,以rdma_connect的实现为例:

//`RDMA`的`IO`对象,提供`RDMA`功能
template 
class rdma_connection
{
public:// .......class initiate_async_connect;// 异步连接template ASIO_INITFN_AUTO_RESULT_TYPE_PREFIX(ConnectToken,void (asio::error_code))async_connect(endpoint_type const& endpoint,ASIO_MOVE_ARG(ConnectToken) tokenASIO_DEFAULT_COMPLETION_TOKEN(executor_type))ASIO_INITFN_AUTO_RESULT_TYPE_SUFFIX((async_initiate(declval(), token,declval()))){asio::error_code open_ec{};if(!is_open()){open_ec = asio::error::try_again;}#if ASIO_HAS_IOCPif(!open_ec){impl_.get_service().allocate_qp(impl_.get_implementation(), open_ec);}
#endifreturn async_initiate(initiate_async_connect(this), token, endpoint, open_ec);}// `async_connect_op`的初化器class initiate_async_connect{public:using executor_type = Executor;explicit initiate_async_connect(rdma_connection* self) ASIO_NOEXCEPT: self_(self) {}template void operator() (ASIO_MOVE_ARG(Handler) handler,endpoint_type const& endpoint,asio::error_code const& open_ec) const{ASIO_CONNECT_HANDLER_CHECK(Handler, handler) type_check;if(open_ec){asio::post(self_->impl_.get_executor(),asio::detail::bind_handler(ASIO_MOVE_CAST(Handler)(handler), open_ec));}else{detail::non_const_lvalue handler2(handler);self_->impl_.get_service().async_connect(self_->impl_.get_implementation(), endpoint,handler2.value, self_->impl_.get_executor());}}private:rdma_connection* self_;};
};

初化器还有编译期检查,比如Handler的类型是否满足,Buffer是否满足ConstBufferSequence的约束等.在跟祁宇的一次讨论中,他还提到了初化器的设计还可让用户去特化async_result来实现自定义异步操作,是个很强大的用户扩展点.

3.6Service通过调度器发起IO操作

io_object_impl通过初化器转发给Service之后,剩下的事情就很简单了.Service的异步操作接口主要负责创建对应的异步操作对象,初化操作对象,并通知调度器发起IO操作.
rdmaLinux窗口平台上的实现为例:

// `Linux`的`rdma-core`
template 
template 
void rdma_core_service::async_connect(实现类型& impl,endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{bool const is_continuation =asio_handler_cont_helpers::is_continuation(handler);// TODO ... 取消using op = rdma_cm_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(success_ec_, impl.cm_id_data_->cm_id_, get_context(), handler, io_ex);cm_event_service_.start_connect_op(impl.cm_id_data_, p.p, endpoint.data(), is_continuation);p.v = p.p = 0;
}// 窗口的iocp
template 
template 
void rdma_iocp_connector_service::async_connect(实现类型& impl, endpoint_type const& endpoint,Handler& handler, const IoExecutor& io_ex)
{// TODO ... 取消using op = rdma_nd_connect_op;typename op::ptr p = { asio::detail::addressof(handler),op::ptr::allocate(handler), 0 };p.p = new (p.v) op(impl.connector_.Get(), handler, io_ex);start_connect_op(impl, endpoint, p.p);p.v = p.p = 0;
}

注意,ReactorProactor模式的差异在此就显现出来了.Reactor在此发起的是个reactor_op操作,注册给连接管理器也就是这里的cm_event_service_对象,真正的connect函数调用是在连接管理器的事件Channel对应的fdPOLLIN消息时,才会调用.

而对IOCPProactor而言,start_connect_op会直接非阻塞地调用ConnectorConnect函数,并在Connection创建成功后,通过IOCP完成队列通知用户.

相关内容

热门资讯

超6300亿元!A股银行“分红... 7月25日,成都银行完成权益分派股权登记,将于7月28日发放现金红利,这标志着A股上市银行2024年...
老铺黄金:2025年上半年单个... 7月27日晚,老铺黄金(HK06181)披露2025年中期业绩预告。预计2025年上半年实现销售业绩...
保险行业2025年上半年回顾与... 今天分享的是:保险行业2025年上半年回顾与未来展望 报告共计:59页 2025年上半年保险行业回顾...
数币App上新!消费者、商户两... 数字人民币试点持续推进,相关数字钱包手机应用程序功能也在优化中。7月21日,北京商报记者注意到,日前...
A股热点迭出,个股连续涨停!资... 近段时间以来A股市场整体走势较为强劲,上周以来在雅江概念集体上行的推动下涨势更为明显,主要指数不同程...
原创 印... 令人惊讶的是,印度人开始反思自身制造业的发展状况。印度经济学家帕纳加利亚指出,印度原本有机会在20年...
首创证券拟赴港上市,“A+H”... 首创证券在A股上市不足三年便启动赴港上市计划。近日,首创证券公告称,公司董事会已审议通过了公司拟发行...
肥东杨大爷要帮“儿子”还钱,银... “儿子”在外借了2万元还不上 “要债人”电话直接打了过来 还?还是不还? 7月6日 肥东县公安局梁园...
A股上周16家上市公司公布并购... 转自:扬子晚报 扬子晚报网7月27日讯(记者 范晓林 薄云峰)近段时间以来,A股市场并购重组活跃度持...
独家|某股份行改动零售业务关键... 在资产端信贷“投不动”(多家行零售信贷增速连续几个季度放缓、更有甚者个贷投放负增长)、负债端存款“定...
四川五日游报团指南及详细行程,... 四川,这片位于中国西南的神奇土地,以其独特的自然风光、丰富的文化遗产和诱人的美食而闻名遐迩。从成都的...
原创 中... 在2025年4月初,时任美国总统的特朗普正式启动了针对世界各国的关税战,旨在通过实施经济制裁来促进美...
牛市主升浪开启了?别急!珍惜布... 本周,A股市场上行,主要宽基指数都收获了或多或少的周涨幅,其中,科创50、微盘股涨幅居前。板块方面,...
公募二季报两大看点!港股配置逼... 本报(chinatimes.net.cn)记者栗鹏菲 叶青 北京报道 2025年公募基金二季报披露收...
长和出售港口磋商期或延长 随着可能出现的各方介入及交易结构变化,此次长和港口出售交易如继续进行,其复杂性会提升 文 |《财经》...
中航重机涨0.17%,成交额4... 来源:新浪证券-红岸工作室 7月25日,中航重机涨0.17%,成交额4.14亿元,换手率1.52%,...
重仓电子和新能源行业 【深圳商报讯】(记者 陈燕青)基金二季报出炉,公募二季度依然重仓电子、新能源、食品饮料等行业。公募排...
大婚之后,大笔减持!昔日全球首... 当地时间7月25日,亚马逊公司提交至美国证券交易委员会的文件显示,前全球首富、亚马逊创始人杰夫·贝索...
创源股份涨2.32%,成交额3... 来源:新浪证券-红岸工作室 7月25日,创源股份涨2.32%,成交额3.50亿元,换手率8.32%,...
筹备登陆韩国综合股价指数!大韩... 近日,大韩造船(Daehan Shipbuilding)的首次公开募股(IPO)发行价最终确定为每股...