参考:https://microservices.io/patterns/data/transactional-outbox.html
这是2022年10月28日创建的草稿,最终的文字内容和代码通过 ChatGPT 生成。
事务性发件箱模式是一种解决数据库事务和消息的一致性问题的方法。它的基本思想是,将消息的发送操作与数据库的更新操作绑定在一起,保证这两个操作是原子性的,要么都成功,要么都失败。
在传统的数据库应用中,数据库事务与消息的发送是独立的两个操作,如果数据库事务因为某种原因失败了,消息可能已经被发送出去了,这就导致了数据的不一致性。事务性发件箱模式的目的就是解决这个问题。
事务性发件箱模式的实现方式有多种,其中一种是将消息的发送操作与数据库事务放在同一个线程中进行。当数据库事务提交之前,将消息放入一个临时的发件箱中,如果数据库事务成功提交,则消息也会被发送出去;如果数据库事务失败,则消息也不会被发送。
另一种实现方式是在数据库事务提交之后,通过发布/订阅模式将消息发送给消息队列,然后由消息队列负责将消息发送出去。这种方式的优点是能够支持分布式系统,数据库事务和消息发送可以在不同的机器上进行。
无论哪种实现方式,事务性发件箱模式都能够保证数据库事务和消息的一致性。这对于需要处理大量数据和消息的应用来说,是非常有用的。它能够防止数据的不一致性,避免出现因消息发送失败而导致的数据错误。
+----------------+| ORDER SERVICE +-----------INSERT------++-----+----------+ || |INSERT |
+------------+----------------------------------+-----------------+
| Transaction| | |
| ORDER|TABLE OUTBOX|TABLE |
| +------>--------+------+ +------+--v------+-----+ |
| | | | | | | | | |
| +------+--------+------+ +------+--^------+-----+ |
| | |
+-----------------------------------------------+-----------------+|READ|+-------------+---------+|JOB | || +---------+------+ || | Send Message | || +----------------+ || |+-----------------------+
create table outbox (id int auto_increment primary key,message varchar(255),status tinyint
);
public class UserDao {private Connection conn;private MessageSender sender;public UserDao(Connection conn, MessageSender sender) {this.conn = conn;this.sender = sender;}public void updateUser(int userId, String username) throws SQLException {// 开启事务conn.setAutoCommit(false);try {// 更新数据库中的用户信息String sql = "update user set username = ? where id = ?";PreparedStatement stmt = conn.prepareStatement(sql);stmt.setString(1, username);stmt.setInt(2, userId);stmt.executeUpdate();// 将消息写入发件箱表sql = "insert into outbox (message, status) values (?, 0)";stmt = conn.prepareStatement(sql);stmt.setString(1, "用户 " + userId + " 的用户名已经更新为 " + username);stmt.executeUpdate();// 提交事务conn.commit();} catch (SQLException e) {// 回滚事务conn.rollback();throw e;} finally {// 关闭数据库连接conn.close();}}
}
public class OutboxProcessor implements Runnable {private Connection conn;private MessageSender sender;public OutboxProcessor(Connection conn, MessageSender sender) {this.conn = conn;this.sender = sender;}public void run() {while (true) {try {// 使用独占锁锁定发件箱表中的消息String sql = "select * from outbox where status = 0 for update";PreparedStatement stmt = conn.prepareStatement(sql);ResultSet rs = stmt.executeQuery();// 遍历结果集,发送消息并删除发件箱表中的消息while (rs.next()) {// 发送消息到消息sender.send(rs.getString("message"));// 删除发件箱表中的消息sql = "delete from outbox where id = ?";PreparedStatement stmt2 = conn.prepareStatement(sql);stmt2.setInt(1, rs.getInt("id"));stmt2.executeUpdate();}// 休眠一段时间,避免频繁查询发件箱表Thread.sleep(1000);} catch (SQLException | InterruptedException e) {e.printStackTrace();}}}
}
OutboxProcessor 线程,并启动它。比如:public class Main {public static void main(String[] args) {// 创建消息发送器MessageSender sender = new RabbitMQSender();// 创建数据库连接Connection conn = DriverManager.getConnection(...);// 创建 OutboxProcessor 线程OutboxProcessor processor = new OutboxProcessor(conn, sender);// 启动 OutboxProcessor 线程new Thread(processor).start();}
}