事务性发件箱模式:解决数据库事务和消息的一致性
admin
2024-03-25 02:31:44
0

参考:https://microservices.io/patterns/data/transactional-outbox.html

这是2022年10月28日创建的草稿,最终的文字内容和代码通过 ChatGPT 生成。

事务性发件箱模式是一种解决数据库事务和消息的一致性问题的方法。它的基本思想是,将消息的发送操作与数据库的更新操作绑定在一起,保证这两个操作是原子性的,要么都成功,要么都失败。

在传统的数据库应用中,数据库事务与消息的发送是独立的两个操作,如果数据库事务因为某种原因失败了,消息可能已经被发送出去了,这就导致了数据的不一致性。事务性发件箱模式的目的就是解决这个问题。

事务性发件箱模式的实现方式有多种,其中一种是将消息的发送操作与数据库事务放在同一个线程中进行。当数据库事务提交之前,将消息放入一个临时的发件箱中,如果数据库事务成功提交,则消息也会被发送出去;如果数据库事务失败,则消息也不会被发送。

另一种实现方式是在数据库事务提交之后,通过发布/订阅模式将消息发送给消息队列,然后由消息队列负责将消息发送出去。这种方式的优点是能够支持分布式系统,数据库事务和消息发送可以在不同的机器上进行。

无论哪种实现方式,事务性发件箱模式都能够保证数据库事务和消息的一致性。这对于需要处理大量数据和消息的应用来说,是非常有用的。它能够防止数据的不一致性,避免出现因消息发送失败而导致的数据错误。

架构图

       +----------------+|  ORDER SERVICE +-----------INSERT------++-----+----------+                       ||                                  |INSERT                               |
+------------+----------------------------------+-----------------+
| Transaction|                                  |                 |
|       ORDER|TABLE                       OUTBOX|TABLE            |
|     +------>--------+------+        +------+--v------+-----+    |
|     |      |        |      |        |      |         |     |    |
|     +------+--------+------+        +------+--^------+-----+    |
|                                               |                 |
+-----------------------------------------------+-----------------+|READ|+-------------+---------+|JOB          |         ||   +---------+------+  ||   |  Send Message  |  ||   +----------------+  ||                       |+-----------------------+

示例

  1. 在数据库中建立一张临时发件箱表,包含消息的 ID、内容和状态字段。比如:
    create table outbox (id int auto_increment primary key,message varchar(255),status tinyint
    );
    
  2. 在数据库事务处理类中,在更新数据库之后,将消息写入发件箱表。比如:
    public class UserDao {private Connection conn;private MessageSender sender;public UserDao(Connection conn, MessageSender sender) {this.conn = conn;this.sender = sender;}public void updateUser(int userId, String username) throws SQLException {// 开启事务conn.setAutoCommit(false);try {// 更新数据库中的用户信息String sql = "update user set username = ? where id = ?";PreparedStatement stmt = conn.prepareStatement(sql);stmt.setString(1, username);stmt.setInt(2, userId);stmt.executeUpdate();// 将消息写入发件箱表sql = "insert into outbox (message, status) values (?, 0)";stmt = conn.prepareStatement(sql);stmt.setString(1, "用户 " + userId + " 的用户名已经更新为 " + username);stmt.executeUpdate();// 提交事务conn.commit();} catch (SQLException e) {// 回滚事务conn.rollback();throw e;} finally {// 关闭数据库连接conn.close();}}
    }
    
  3. 在发送消息的过程中,从发件箱表中取出,发送成功后删除
    public class OutboxProcessor implements Runnable {private Connection conn;private MessageSender sender;public OutboxProcessor(Connection conn, MessageSender sender) {this.conn = conn;this.sender = sender;}public void run() {while (true) {try {// 使用独占锁锁定发件箱表中的消息String sql = "select * from outbox where status = 0 for update";PreparedStatement stmt = conn.prepareStatement(sql);ResultSet rs = stmt.executeQuery();// 遍历结果集,发送消息并删除发件箱表中的消息while (rs.next()) {// 发送消息到消息sender.send(rs.getString("message"));// 删除发件箱表中的消息sql = "delete from outbox where id = ?";PreparedStatement stmt2 = conn.prepareStatement(sql);stmt2.setInt(1, rs.getInt("id"));stmt2.executeUpdate();}// 休眠一段时间,避免频繁查询发件箱表Thread.sleep(1000);} catch (SQLException | InterruptedException e) {e.printStackTrace();}}}
    }
    
  4. 在主程序中,创建一个 OutboxProcessor 线程,并启动它。比如:
    public class Main {public static void main(String[] args) {// 创建消息发送器MessageSender sender = new RabbitMQSender();// 创建数据库连接Connection conn = DriverManager.getConnection(...);// 创建 OutboxProcessor 线程OutboxProcessor processor = new OutboxProcessor(conn, sender);// 启动 OutboxProcessor 线程new Thread(processor).start();}
    }
    

相关内容

热门资讯

国民性创新,越来越阳春白雪 问一个问题,最近两年爆火的创新,从生成式AI到人形机器人,到底是离大众越来越近,还是离大众越来越远?...
男子被显示欠银行1000万亿索... 男子被显示欠银行1000万亿索赔200万遭拒,银行仅愿赔偿3万元
研究显示美国散户投资者推动杠杆... 来源:环球市场播报 Direxion公司联合Vanda Research与The Compound ...
高瓴、李录、巴菲特最新持仓披露... 最近,随着美股13F文件的披露,多家私募机构2025Q4最新调仓情况浮出水面。之前的文章,证星研究院...
原创 老... 四十年代的北京珠市口路口,正处于从民国向新中国过渡的时期,它既保留着清末民初形成的鲜明社会分层特征,...
原创 相... 在金融圈,流行一个词叫“估值修复”。意思是股价跌狠了,总会涨回来。 但阿睿发现,自己在相亲市场的估值...
上海楼市重磅新政,非沪籍大松绑... wumiancaijing.com / 最热的泛财经新闻,都在这儿了。 重要提醒!!!为防失联,请“...
春节白酒消费:高端产品热度升高... 来源:新京报 春节是酒水消费传统旺季,马年春节期间,白酒动销稳健复苏。多家机构调研发现,白酒消费呈现...
黄金暴涨的秘密找到了!不是散户... 过去两年,黄金市场最容易被忽视的一条主线,并不是价格本身的起伏,而是一个更为深层的结构性变化——全球...
刘强东投资50亿进军游艇产业,... 极目新闻记者 陈红 刘强东近日创立了独立游艇品牌Sea Expandary,进入游艇产业,计划从研...
韩国驻美大使:密切关注美方新关... 据韩联社报道,韩国驻美国大使康京和2月24日就美国总统特朗普在联邦最高法院裁定“对等关税”违法后宣布...
现货黄金刚刚涨破5200美元关... 25日,现货黄金持续拉涨,盘中再次突破5200美元大关,涨超1.3%。 瑞银(UBS)分析师此前在...
【银行业展望系列】五篇大文章:... 当前银行息差持续承压、规模扩张的增长红利逐渐消退,已经陷入内卷式的同质化竞争。“规模为王”的旧模式将...
原创 一... 美国总统唐纳德·特朗普迅速恢复了之前被最高法院废除的关税政策,这一急转弯让本就面临成本激增压力的美国...
收盘:上证指数、深证成指涨1.... 上证指数(000001)涨0.72%,深证成指(399001)涨1.29%,创业板指(399006)...
蔡宏波、毛健:美国贸易逆差“转... 近年来,美国政府反复将“贸易失衡”描述为事关国家安全的核心问题。从政策实践看,自2018年以来,美国...
五粮液集团入股四川三江汇海融资... 天眼查显示,近日,四川三江汇海融资租赁有限公司发生工商变更,新增四川省宜宾五粮液集团有限公司为股东,...
A股高开高走:周期股延续强势,... A股三大股指2月25日集体高开。早盘震荡走高,午后震荡回落,全天呈现高位震荡走势。 从盘面上看,周期...