分布式事务

Lou.Chen2022年10月9日
大约 29 分钟

1.什么是分布式事务问题

单体应用

单体应用中,例如一个业务操作需要调用三个模块完成,此时数据的一致性由本地事务来保证。

image-20220916103959904

微服务应用

随着业务需求的变化,单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用独立的数

据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致

性问题没法保证。

image-20220919093900496

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。
  • 订单服务:根据采购需求创建订单。
  • 帐户服务:从用户帐户中扣除余额。

由上面的图可知客户端请求Business,由业务模块Business通过RPC请求库存Stroage服务和Order订单服务,订单服务Order再通过RPC请求Acoount账户服务。这个过程无法保证全局事务的一致性,只能保证每个服务的本地事务的一致性。

2.Seate

2.1 什么是Seata

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 ATTCCSAGAXA 事务模式,为用户打造一站式的分布式解决方案。

image-20220916105153579

2.2 Seata原理和设计

2.2.1 定义一个分布式事务

我们可以把一个分布式事务理解成一个包含了若干分支事务全局事务,全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个满足ACID的本地事务。

扩展:事务的ACID特性

  • 原子性(Atomicity):一个事务必须是一系列操作的最小单元,这系列操作的过程中,要么整个执行,要么整个回滚
  • 一致性(Consistency):事务要保证数据库整体数据的完整性和业务数据的一致性,事务成功提交整体数据修改,事务错误则回滚到数据回到原来的状态。
  • 隔离性(Isolation):两个事务的执行都是独立隔离开来的,事务之前不会相互影响,多个事务操作一个对象时会以串行等待的方式保证事务相互之间是隔离的:
  • 持久性(Durability):事务成功提交后,只要修改的数据都会进行持久化(通常是指数据成功保存到磁盘),不会因为异常、宕机而造成数据错误或丢失。
image-20220916153958893

2.2.2 协调分布式事务处理过程的三个组件

  • TC:事务协调者(Transaction Coordinator),负责维护全局事务和分支事务的运行状态,负责协调并驱动全局事务提交和回滚

  • TM:事务管理器(Transaction Manager),定义全局事务的范围,全局事务什么时候开启,什么时候提交,什么时候回滚?

  • RM:资源管理器(Resource Manager),管理分支事务的资源,向TC注册分支事务的状态,并驱动分⽀事务提交和回滚

image-20220916133757016

2.2.3 一个典型的分布式事务过程

  • TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;
  • XID 在微服务调用链路的上下文中传播;
  • RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖;
  • TM 向 TC 发起针对 XID 的全局提交或回滚决议;
  • TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
image-20220916142418979

2.3 Seata-server的安装

https://github.com/seata/seataopen in new window

Seata 所提供的 seata-server 本质上就是⼀个 SpringBoot。

2.3.1 安装和启动

2.3.2 相关配置

相关的启动配置在 seata/conf/application.yml 文件中

server:
  # seata服务启动端口
  port: 7091

spring:
  application:
    name: seata-server

logging:
  config: classpath:logback-spring.xml
  file:
    path: ${user.home}/logs/seata
  extend:
    logstash-appender:
      destination: 127.0.0.1:4560
    kafka-appender:
      bootstrap-servers: 127.0.0.1:9092
      topic: logback_to_logstash

console:
  user:
    username: seata
    password: seata

seata:
  config:
    # support: nacos, consul, apollo, zk, etcd3
    # 配置中心,这里是注册到一个文件
    type: file
  registry:
    # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    # 注册中心,这里是注册到一个文件
    type: file
  store:
    # 存储位置,这里是注册到一个文件
    # support: file 、 db 、 redis
    mode: file
#  server:
#    service-port: 8091 #If not configured, the default is '${server.port} + 1000'
  security:
    secretKey: SeataSecretKey0c382ef121d778043159209298fd40bf3850a017
    tokenValidityInMilliseconds: 1800000
    ignore:
      urls: /,/**/*.css,/**/*.js,/**/*.html,/**/*.map,/**/*.svg,/**/*.png,/**/*.ico,/console-fe/public/**,/api/v1/auth/login

所有配置的示例在 seata/conf/application.example.yml文件中

2.3.3 其它问题

  • 不要删除 seata/logs/seata_gc.log 文件

  • 启动后的日志文件 seata/logs/start.out

  • 若出现报错,则在seata/log/start.out下找到报错信息

3.Seata各事务模式

3.1 AT模式

https://seata.io/zh-cn/docs/dev/mode/at-mode.htmlopen in new window

AT(Automatic Transaction): 自动的提供代码无侵入自动补偿的事务模式,对代码无入侵。

3.1.1 自动补偿/反向补偿

例如有微服务 A、B、C,现在在 A 中分别去调⽤ B 和 C,为了确保 B 和 C 的调⽤同时成功或者同时失败,那么就要使⽤分布式事务。

假设先调⽤ B 在调⽤ C,B调⽤完成后,事务就提交了,然后调⽤ C ,C 出错了,现在要回滚。此时 B 需要回滚,但是 B 的回滚并不是我们传统意义上所说的回滚,⽽是通过⼀条更新 SQL,将 B 中的数据复原,这个过程就叫做反向补偿。而在AT模式中,这条更新SQL由Seata自动帮我们生成,无需手动编写

3.1.2 AT模式原理

简介:

AT 模式基于 支持本地 ACID 事务关系型数据库:即数据库要支持事务

  • 一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
  • 二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
  • 二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。
  • 一阶段

    • 解析SQL:首先对业务SQL进行解析,得到SQL的类型(UPDATE),表(account),条件(where user_id='u_1001')等相关信息

    • 查询前镜像:根据解析SQL得到的条件信息生成查询语句,定位到该条数据

    • 执行业务SQL

    • 查询后镜像:根据前镜像的结果,根据主键定位该条数据修改后的结果

    • 插入回滚日志:将前后镜像数据分支事务ID全局事务XID等相关信息组成一条回滚日志记录在UNDO_LOG表中

    • 提交前,向TC注册分支

    • 本地事务提交:业务数据的更新和前面步骤生成的UNDO LOG一并提交

    • 将本地事务提交的结果上报给TC

  • 二阶段

    • 二阶段-回滚
      • 收到TC的分支回滚请求,开启一个本地事务,执行如下操作。
      • 通过全局事务XID和Branch ID分支事务ID查找出相应的UNDO LOG记录
      • 数据校验:拿到UNDO LOG记录的后镜像和当前数据进行比较,若相同则执行后续操作。否则说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理
      • 根据UNDO LOG中的前镜像和业务SQL的相关信息生成并执行回滚的语句
      • 提交本地事务,并将本地事务的执行结果(分支事务的回滚结果)上报给TC
    • 二阶段-提交
      • 收到TC的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC
      • 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

3.1.2 案例

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。
  • 订单服务:根据采购需求创建订单。
  • 帐户服务:从用户帐户中扣除余额。
image-20220919161653526
3.1.2.1 SQL准备
-- 每个数据库都要有一个回滚日志表
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

-- account数据库
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `money` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 初始化一条账户语句
INSERT INTO `account`.`account_tbl` (`id`, `user_id`, `money`) VALUES (1, 'u_001', 1000);

-- order数据库
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
  `count` int(11) DEFAULT '0',
  `money` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 	storage数据库
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 初始化一条库存语句
INSERT INTO `storage`.`storage_tbl` (`id`, `commodity_code`, `count`) VALUES (1, '1001', 90);

3.1.2.2 通用服务模块
@RestControllerAdvice
public class GlobalException {
    @ExceptionHandler(RuntimeException.class)
    //这里需要返回内部异常,否则Business控制器中的的catch无法触发
    @ResponseStatus(HttpStatus.INTERNAL_SERVER_ERROR)
    public ResponseBean runtimeException(RuntimeException e) {
        return ResponseBean.error(e.getMessage());
    }
}
public class ResponseBean extends HashMap<String, Object> {
    private static final long serialVersionUID = 1L;

    /**
     * 状态码
     */
    public static final String CODE_TAG = "code";

    /**
     * 返回内容
     */
    public static final String MSG_TAG = "msg";

    /**
     * 数据对象
     */
    public static final String DATA_TAG = "data";

    /**
     * 初始化一个新创建的 ResponseBean 对象,使其表示一个空消息。
     */
    public ResponseBean() {
    }

    /**
     * 初始化一个新创建的 ResponseBean 对象
     *
     * @param code 状态码
     * @param msg  返回内容
     */
    public ResponseBean(int code, String msg) {
        super.put(CODE_TAG, code);
        super.put(MSG_TAG, msg);
    }

    /**
     * 初始化一个新创建的 ResponseBean 对象
     *
     * @param code 状态码
     * @param msg  返回内容
     * @param data 数据对象
     */
    public ResponseBean(int code, String msg, Object data) {
        super.put(CODE_TAG, code);
        super.put(MSG_TAG, msg);
        super.put(DATA_TAG, data);
    }

    /**
     * 返回成功消息
     *
     * @return 成功消息
     */
    public static ResponseBean success() {
        return ResponseBean.success("操作成功");
    }

    /**
     * 返回成功数据
     *
     * @return 成功消息
     */
    public static ResponseBean success(Object data) {
        return ResponseBean.success("操作成功", data);
    }

    /**
     * 返回成功消息
     *
     * @param msg 返回内容
     * @return 成功消息
     */
    public static ResponseBean success(String msg) {
        return ResponseBean.success(msg, null);
    }

    /**
     * 返回成功消息
     *
     * @param msg  返回内容
     * @param data 数据对象
     * @return 成功消息
     */
    public static ResponseBean success(String msg, Object data) {
        return new ResponseBean(HttpStatus.SUCCESS, msg, data);
    }

    /**
     * 返回错误消息
     *
     * @return
     */
    public static ResponseBean error() {
        return ResponseBean.error("操作失败");
    }

    /**
     * 返回错误消息
     *
     * @param msg 返回内容
     * @return 警告消息
     */
    public static ResponseBean error(String msg) {
        return ResponseBean.error(msg, null);
    }

    /**
     * 返回错误消息
     *
     * @param msg  返回内容
     * @param data 数据对象
     * @return 警告消息
     */
    public static ResponseBean error(String msg, Object data) {
        return new ResponseBean(HttpStatus.ERROR, msg, data);
    }

    /**
     * 返回错误消息
     *
     * @param code 状态码
     * @param msg  返回内容
     * @return 警告消息
     */
    public static ResponseBean error(int code, String msg) {
        return new ResponseBean(code, msg, null);
    }

    /**
     * 方便链式调用
     *
     * @param key   键
     * @param value 值
     * @return 数据对象
     */
    @Override
    public ResponseBean put(String key, Object value) {
        super.put(key, value);
        return this;
    }
}
public class HttpStatus {
    /**
     * 操作成功
     */
    public static final int SUCCESS = 200;

    /**
     * 对象创建成功
     */
    public static final int CREATED = 201;

    /**
     * 请求已经被接受
     */
    public static final int ACCEPTED = 202;

    /**
     * 操作已经执行成功,但是没有返回数据
     */
    public static final int NO_CONTENT = 204;

    /**
     * 资源已被移除
     */
    public static final int MOVED_PERM = 301;

    /**
     * 重定向
     */
    public static final int SEE_OTHER = 303;

    /**
     * 资源没有被修改
     */
    public static final int NOT_MODIFIED = 304;

    /**
     * 参数列表错误(缺少,格式不匹配)
     */
    public static final int BAD_REQUEST = 400;

    /**
     * 未授权
     */
    public static final int UNAUTHORIZED = 401;

    /**
     * 访问受限,授权过期
     */
    public static final int FORBIDDEN = 403;

    /**
     * 资源,服务未找到
     */
    public static final int NOT_FOUND = 404;

    /**
     * 不允许的http方法
     */
    public static final int BAD_METHOD = 405;

    /**
     * 资源冲突,或者资源被锁
     */
    public static final int CONFLICT = 409;

    /**
     * 不支持的数据,媒体类型
     */
    public static final int UNSUPPORTED_TYPE = 415;

    /**
     * 系统内部错误
     */
    public static final int ERROR = 500;

    /**
     * 接口未实现
     */
    public static final int NOT_IMPLEMENTED = 501;
}

3.1.2.3 账户服务
# 应用名称
spring.application.name=business
# 应用服务 WEB 访问端口
server.port=8080

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
@Mapper
public interface AccountMapper {
    /**
     * 更新账户
     * @param account
     * @param money
     * @return
     */
    @Update("update account_tbl set money=money-#{money} where user_id=#{account}")
    int updateAccount(@Param("account") String account, @Param("money") Double money);

    /**
     * 根据账户获取金额
     * @param account
     * @return
     */
    @Select("select money from account_tbl where user_id=#{account}")
    Double getMoneyByAccount(@Param("account") String account);
}

@Service
public class AccountService {

    @Autowired
    private AccountMapper accountMapper;

    /**
     * 扣款
     * @param account
     * @param money
     * @return
     */
    public boolean deductAccount(String account,Double money) {
        //更新账户
        accountMapper.updateAccount(account, money);
        //查询账户余额
        Double balanceMoney = accountMapper.getMoneyByAccount(account);
        if (balanceMoney >= 0) {
            return true;
        }
        throw new RuntimeException("账户余额不足,扣款失败!");
    }

}
@RestController
public class AccountController {

    @Autowired
    private AccountService accountService;

    @PostMapping("/deduct")
    public ResponseBean deduct(String account,Double money) {
        if (accountService.deductAccount(account, money)) {
            return ResponseBean.success("扣款成功");
        }
        return ResponseBean.error("扣款失败");
    }

}

//保证能够扫描到全局捕获异常类
@SpringBootApplication(scanBasePackages = "org.lc")
public class AccountApplication {
    public static void main(String[] args) {
        SpringApplication.run(AccountApplication.class, args);
    }
}
3.1.2.4 订单服务
# 应用名称
spring.application.name=order
# 应用服务 WEB 访问端口
server.port=1112
#下面这些内容是为了让MyBatis映射
#指定Mybatis的Mapper文件
mybatis.mapper-locations=classpath:mappers/*xml
#指定Mybatis的实体目录
mybatis.type-aliases-package=org.lc.order.mybatis.entity
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 数据源名称
spring.datasource.name=defaultDataSource
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/order?serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=123456

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
@Mapper
public interface OrderMapper {
    @Insert("insert into order_tbl(user_id,commodity_code,count,money) values(#{userId},#{commodityCode},#{count},#{money})")
    int createOrder(@Param("userId") String userId, @Param("commodityCode") String commodityCode,@Param("count") Integer count,@Param("money") Double money);
}

@FeignClient("account")
public interface AccountFeign {
    @PostMapping("/deduct")
    ResponseBean deduct(@RequestParam("account") String account, @RequestParam("money") Double money);
}

@Service
public class OrderService {
    @Autowired
    private AccountFeign accountFeign;

    @Autowired
    private OrderMapper orderMapper;

    public boolean createOrder(String account, String productId, Integer count) {
        //先去账户扣款,默认商品单价为100
        ResponseBean deduct = accountFeign.deduct(account, count * 100.0);
        if (StringUtils.equals(deduct.get("code").toString(), "200")) {
            //账户扣款成功则去创建订单
            int order = orderMapper.createOrder(account, productId, count, count * 100.0);
            return order == 1;
        }
        return false;
    }
}

@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    @PostMapping("/createOrder")
    public ResponseBean createOrder(String account, String productId, Integer count) {
        if (orderService.createOrder(account, productId, count)) {
            return ResponseBean.success("下单成功");
        }
        return ResponseBean.error("下单失败");
    }
}

//保证能够扫描到全局捕获异常类
@SpringBootApplication(scanBasePackages = "org.lc")
@EnableEurekaClient
@EnableFeignClients
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

}
3.1.2.5 库存服务
# 应用名称
spring.application.name=storage
# 应用服务 WEB 访问端口
server.port=1113
#下面这些内容是为了让MyBatis映射
#指定Mybatis的Mapper文件
mybatis.mapper-locations=classpath:mappers/*xml
#指定Mybatis的实体目录
mybatis.type-aliases-package=org.lc.storage.mybatis.entity
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 数据源名称
spring.datasource.name=defaultDataSource
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/storage?serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=123456

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
@Mapper
public interface StorageMapper {
    @Update("update storage_tbl set count=count-#{count} where commodity_code=#{productId}")
    int deductStorage(@Param("productId") String productId, @Param("count") Integer count);

    @Select("select count from storage_tbl where commodity_code=#{productId}")
    int getCountStorage(String productId);
}

@Service
public class StorageService {
    @Autowired
    private StorageMapper storageMapper;

    public boolean deduct(String productId, Integer count) {
        storageMapper.deductStorage(productId, count);
        int countStorage = storageMapper.getCountStorage(productId);
        if (countStorage > 0) {
            return true;
        }
        throw new RuntimeException("库存不足");
    }
}

@RestController
public class StorageController {

    @Autowired
    private StorageService storageService;

    @PostMapping("/deduct")
    public ResponseBean deduct(String productId, Integer count) {
        if (storageService.deduct(productId, count)) {
            return ResponseBean.success("扣库存成功");
        }
        return ResponseBean.error("扣库存失败");
    }
}

//保证能够扫描到全局捕获异常类
@SpringBootApplication(scanBasePackages = "org.lc")
public class StorageApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageApplication.class, args);
    }

}
3.1.2.6 业务调用服务
# 应用名称
spring.application.name=business
# 应用服务 WEB 访问端口
server.port=8080

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
@FeignClient("order")
public interface OrderFeign {
    @PostMapping("/createOrder")
    ResponseBean createOrder(@RequestParam("account") String account,@RequestParam("productId") String productId,@RequestParam("count") Integer count);
}

@FeignClient("storage")
public interface StorageFeign {
    @PostMapping("/deduct")
    ResponseBean deduct(@RequestParam("productId") String productId,@RequestParam("count") Integer count);
}

@Service
public class BusinessService {
    @Autowired
    private OrderFeign orderFeign;
    @Autowired
    private StorageFeign storageFeign;

    @GlobalTransactional
    public void purchase(String account, String productId, Integer count) {
        //下订单
        orderFeign.createOrder(account, productId, count);
        //扣库存
        storageFeign.deduct(productId, count);
    }
}

@RestController
public class BusinessController {
    @Autowired
    private BusinessService businessService;

    @PostMapping("/order")
    public ResponseBean order(String account, String productId, Integer count) {
        try {
            businessService.purchase(account, productId, count);
            return ResponseBean.success("下单成功");
        } catch (Exception e) {
            e.printStackTrace();
           return ResponseBean.error("下单失败",e.getMessage());
        }
    }
}
@SpringBootApplication
@EnableFeignClients
@EnableEurekaClient
public class BusinessApplication {

    public static void main(String[] args) {
        SpringApplication.run(BusinessApplication.class, args);
    }

}
3.1.2.7 注册中心
# 应用名称
spring.application.name=eureka
# 应用服务 WEB 访问端口
server.port=8761

eureka.client.fetch-registry=false
eureka.client.register-with-eureka=false
@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaApplication.class, args);
    }

}
3.1.2.8 file.conf & registry.conf

以下两个文件在 账户服务订单服务库存服务业务调用服务中都要存在

通信配置文件file.conf

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = true
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThread-prefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}
service {
  #transaction service group mapping
  vgroupMapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #degrade, current not support
  enableDegrade = false
  #disable seata
  disableGlobalTransaction = false
}

client {
  rm {
    asyncCommitBufferLimit = 10000
    lock {
      retryInterval = 10
      retryTimes = 30
      retryPolicyBranchRollbackOnConflict = true
    }
    reportRetryCount = 5
    tableMetaCheckEnable = false
    reportSuccessEnable = false
  }
  tm {
    commitRetryCount = 5
    rollbackRetryCount = 5
  }
  undo {
    dataValidation = true
    logSerialization = "jackson"
    logTable = "undo_log"
  }
  log {
    exceptionRate = 100
  }
}

seata注册位置配置文件registry.conf

本案例注册到eureka上

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "localhost"
    namespace = ""
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = "0"
    password = ""
    timeout = "0"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3、springCloudConfig
  type = "file"

  nacos {
    serverAddr = "localhost"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}
3.1.2.8 请求示例

http://localhost:8080/order?account=u_001&productId=1001&count=10open in new window

若余额不足则下单失败,已完成的业务操作事务会进行回滚

{
    "msg": "下单失败",
    "code": 500,
    "data": "[500] during [POST] to [http://order/createOrder?account=u_001&productId=1001&count=10] [OrderFeign#createOrder(String,String,Integer)]: [{\"msg\":\"[500] during [POST] to [http://account/deduct?account=u_001&money=1000.0] [AccountFeign#deduct(String,Double)]: [{\\\"msg\\\":\\\"账户余额不足,扣款失败!\\\",\\\"code\\\":500,\\\"data\\\":null}]\",\"code\":500,\"data\":null}]"
}

3.2 TCC模式

TCC(Try Confirm Cancel): 手动实现分布式事务的方式,对代码入侵较高。

3.2.1 TCC模式原理

https://seata.io/zh-cn/docs/dev/mode/tcc-mode.htmlopen in new window

image-20220920094317625

TCC 模式,不依赖于底层数据资源的事务支持:即数据库可以不支持事务,

  • 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
  • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
  • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

3.2.2 案例

3.2.2.1 SQL准备
-- tcc_account数据库
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `userId` varchar(255) DEFAULT NULL,
  `money` double DEFAULT '0',
  `freezeMoney` double DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `account_tbl` (`id`, `userId`, `money`, `freezeMoney`)
VALUES
	(1,'u_001',1000,0);
	
	
-- tcc_order数据库
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `userId` varchar(255) DEFAULT NULL,
  `productId` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  `money` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
	
	
-- 	tcc_storage数据库
	CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `productId` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT '0',
  `freezeCount` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`productId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO `storage_tbl` (`id`, `productId`, `count`, `freezeCount`)
VALUES
	(1,'1001',100,0);

3.2.2.2 通用服务模块

全局异常处理统一数据返回参考AT模式

@LocalTCC
public interface AccountServiceApi {

    /**
     * 一阶段提交
     * 这个方法用来检查资源,例如检查用户是否存在,检查账户余额是否充足等
     * prepare方法是开发者手动调用的,commit 和 rollback 为seata 根具所有的prepare执行的情况,自动调用执行的。例如:只要有一个prepare调用失败,则执行rollback回滚。若全部prepare全部调用成功,则执行提交方法
     * @param actionContext
     * @param userId
     * @param money
     * @return
     */
    @TwoPhaseBusinessAction(name = "accountServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    @RequestMapping("/account/deduct/prepare")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("money") @BusinessActionContextParameter(paramName = "money") Double money);

    /**
     * 二阶段提交
     * @param actionContext
     * @return
     */
    @RequestMapping("/account/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    /**
     * 二阶段回滚
     * @param actionContext
     * @return
     */
    @RequestMapping("/account/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}
@LocalTCC
public interface OrderServiceApi {

    @TwoPhaseBusinessAction(name = "orderServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    @RequestMapping("/order/create/prepare")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("userId") @BusinessActionContextParameter(paramName = "userId") String userId, @RequestParam("productId") @BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/order/create/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/order/create/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);

}

@LocalTCC
public interface StorageServiceApi {

    @TwoPhaseBusinessAction(name = "storageServiceApi", commitMethod = "commit", rollbackMethod = "rollback")
    @RequestMapping("/storage/deduct/prepare")
    boolean prepare(@RequestBody BusinessActionContext actionContext, @RequestParam("productId") @BusinessActionContextParameter(paramName = "productId") String productId, @RequestParam("count") @BusinessActionContextParameter(paramName = "count") Integer count);

    @RequestMapping("/storage/deduct/commit")
    boolean commit(@RequestBody BusinessActionContext actionContext);

    @RequestMapping("/storage/deduct/rollback")
    boolean rollback(@RequestBody BusinessActionContext actionContext);
}

3.2.2.3 账户服务
@Data
public class Account {
    private Integer id;
    private String userId;
    private Double money;
    private Double freezeMoney;
}

@Mapper
public interface AccountMapper {

    @Select("select * from account_tbl where userId=#{userId}")
    Account getAccountByUserId(String userId);

    @Update("update account_tbl set money=#{money},freezeMoney=#{freezeMoney} where userId=#{userId}")
    int updateAccount(Account account);
}

@Service
public class AccountService {

    private static final Logger logger = LoggerFactory.getLogger(AccountService.class);

    @Autowired
    private AccountMapper accountMapper;

    /**
     * 一阶段的事务准备工作
     * 检查账户是否存在。检查账户余额是否充足,需要先把要扣的钱冻结起来
     *
     * @param userId
     * @param money
     * @return
     */
    //本地事务和分布式事务不冲突
    @Transactional(rollbackFor = Exception.class)
    public boolean prepare(String userId, Double money) {
        Account account = accountMapper.getAccountByUserId(userId);
        if (account == null) {
            throw new RuntimeException("账户不存在");
        }
        if (account.getMoney() < money) {
            throw new RuntimeException("账户余额不足,预扣款失败");
        }
        //把预扣款等金额冻结起来
        account.setFreezeMoney(account.getFreezeMoney() + money);
        account.setMoney(account.getMoney() - money);
        //再重新进行更新
        int result = accountMapper.updateAccount(account);
        logger.info("账户:{} 预处理扣款:{}", userId, money);
        return result == 1;
    }

    /**
     * 二阶段提交操作
     * 真正的扣款操作
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean commit(BusinessActionContext actionContext) {
        //获取 prepare 阶段的两个参数
        //要扣钱的用户id
        String userId = (String) actionContext.getActionContext("userId");
        //要扣的钱
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() < money) {
            throw new RuntimeException("账户余额不足,扣款失败");
        }
        //还原预处理时的冻结的钱
        account.setFreezeMoney(account.getFreezeMoney() - money);
        int result = accountMapper.updateAccount(account);
        logger.info("账户:{} 实际扣款:{}", userId, money);
        return result == 1;
    }

    /**
     * 二阶段回滚
     * 恢复之前的状态,即被冻结的钱
     *
     * @param actionContext
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    public boolean rollback(BusinessActionContext actionContext) {
        //获取 prepare 阶段的两个参数
        //要扣钱的用户id
        String userId = (String) actionContext.getActionContext("userId");
        //要扣的钱
        Double money = ((BigDecimal) actionContext.getActionContext("money")).doubleValue();
        Account account = accountMapper.getAccountByUserId(userId);
        if (account.getFreezeMoney() >= money) {
            account.setFreezeMoney(account.getFreezeMoney() - money);
            account.setMoney(account.getMoney() + money);
            int result = accountMapper.updateAccount(account);
            logger.info("账户:{} 恢复被冻结的金额:{}", userId, money);
            return result == 1;
        }
        logger.info("账户:{} 冻结的金额已被释放",userId);
        return true;
    }
}

@RestController
public class AccountController implements AccountServiceApi {

    @Autowired
    private AccountService accountService;


    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, Double money) {
        return accountService.prepare(userId,money);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return accountService.commit(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return accountService.rollback(actionContext);
    }
}

@SpringBootApplication(scanBasePackages = "org.lc")
public class AccountApplication {

    public static void main(String[] args) {
        SpringApplication.run(AccountApplication.class, args);
    }

}
# 应用名称
spring.application.name=account
# 应用服务 WEB 访问端口
server.port=1111
#下面这些内容是为了让MyBatis映射
#指定Mybatis的Mapper文件
mybatis.mapper-locations=classpath:mappers/*xml
#指定Mybatis的实体目录
mybatis.type-aliases-package=org.lc.account.mybatis.entity
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 数据源名称
spring.datasource.name=defaultDataSource
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/tcc_account?serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=123456

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group


3.2.2.4 订单服务
@Mapper
public interface OrderMapper {
    @Insert("insert into order_tbl(userId,productId,count,money) values(#{userId},#{productId},#{count},#{money})")
    int createOrder(@Param("userId") String userId,@Param("productId") String productId, @Param("count") Integer count,@Param("money") double money);
}

@FeignClient("account")
public interface AccountFeign extends AccountServiceApi {

}

	@Service
public class OrderService {

    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    private AccountFeign accountFeign;

    @Autowired
    private OrderMapper orderMapper;

    @Transactional(rollbackFor = Exception.class)
    public boolean prepare(BusinessActionContext actionContext, String userId, String productId, Integer count) {
        //假设每件商品100元。先去执行账户扣款操作
        boolean prepare = accountFeign.prepare(actionContext, userId, count * 100.0);
        logger.info("账户:{} ,购买商品:{}, 共购买了:{} 件,预下单成功", userId, productId, count);
        return prepare;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean commit(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        //实际去减库存
        int result = orderMapper.createOrder(userId, productId, count, count * 100.0);
        logger.info("账户:{} ,购买商品:{}, 共购买了:{} 件,实际下单成功", userId, productId, count);
        return result==1;
    }

    @Transactional(rollbackFor = Exception.class)
    public boolean rollback(BusinessActionContext actionContext) {
        String userId = (String) actionContext.getActionContext("userId");
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        logger.info("账户:{} ,购买商品:{}, 共购买了:{} 件,订单回滚成功", userId, productId, count);
        return true;
    }
}
@RestController
public class OrderController implements OrderServiceApi {

    @Autowired
    private OrderService orderService;

    @Override
    public boolean prepare(BusinessActionContext actionContext, String userId, String productId, Integer count) {
        return orderService.prepare(actionContext, userId, productId, count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return orderService.commit(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return orderService.rollback(actionContext);
    }
}

@SpringBootApplication(scanBasePackages = "org.lc")
@EnableEurekaClient
@EnableFeignClients
public class OrderApplication {

    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }

}

# 应用名称
spring.application.name=order
# 应用服务 WEB 访问端口
server.port=1112
#下面这些内容是为了让MyBatis映射
#指定Mybatis的Mapper文件
mybatis.mapper-locations=classpath:mappers/*xml
#指定Mybatis的实体目录
mybatis.type-aliases-package=org.lc.order.mybatis.entity
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 数据源名称
spring.datasource.name=defaultDataSource
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/tcc_order?serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=123456

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group

3.2.2.5 库存服务
@Data
public class Storage {
    private Integer id;
    private String productId;
    private Integer count;
    private Integer freezeCount;
}

@Mapper
public interface StorageMapper {

    @Select("select * from storage_tbl where productId=#{productId}")
    Storage getStorageProductId(String productId);

    @Update("update storage_tbl set count=#{count},freezeCount=#{freezeCount} where productId=#{productId}")
    int updateStorage(Storage storage);
}

@Service
public class StorageService {
    private static final Logger logger = LoggerFactory.getLogger(StorageService.class);
    @Autowired
    private StorageMapper storageMapper;

    public boolean prepare(String productId, Integer count) {
        Storage storage = storageMapper.getStorageProductId(productId);
        if (storage == null) {
            throw new RuntimeException("商品不存在");
        }
        if (storage.getCount() < count) {
            throw new RuntimeException("库存不足,预扣库存失败");
        }
        storage.setFreezeCount(storage.getFreezeCount() + count);
        storage.setCount(storage.getCount() - count);
        int result = storageMapper.updateStorage(storage);
        logger.info("商品:{},库存冻结数量:{}", productId, count);
        return result == 1;
    }

    public boolean commit(BusinessActionContext actionContext) {
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageProductId(productId);
        if (storage.getFreezeCount() < count) {
            throw new RuntimeException("库存不足,扣库存失败");
        }
        storage.setFreezeCount(storage.getFreezeCount() - count);
        int result = storageMapper.updateStorage(storage);
        logger.info("商品:{},实际扣库存:{}", productId, count);
        return result == 1;
    }

    public boolean rollback(BusinessActionContext actionContext) {
        String productId = (String) actionContext.getActionContext("productId");
        Integer count = (Integer) actionContext.getActionContext("count");
        Storage storage = storageMapper.getStorageProductId(productId);
        if (storage.getFreezeCount() >= count) {
            storage.setFreezeCount(storage.getFreezeCount() - count);
            storage.setCount(storage.getCount() + count);
            int result = storageMapper.updateStorage(storage);
            logger.info("商品:{},释放库存数量:{}", productId, count);
            return result == 1;
        }
        logger.info("商品:{},被冻结的商品已经释放", productId);
        return true;
    }
}
@RestController
public class StorageController implements StorageServiceApi {

    @Autowired
    private StorageService storageService;


    @Override
    public boolean prepare(BusinessActionContext actionContext, String productId, Integer count) {
        return storageService.prepare(productId,count);
    }

    @Override
    public boolean commit(BusinessActionContext actionContext) {
        return storageService.commit(actionContext);
    }

    @Override
    public boolean rollback(BusinessActionContext actionContext) {
        return storageService.rollback(actionContext);
    }
}

@SpringBootApplication(scanBasePackages = "org.lc")
public class StorageApplication {

    public static void main(String[] args) {
        SpringApplication.run(StorageApplication.class, args);
    }

}

# 应用名称
spring.application.name=storage
# 应用服务 WEB 访问端口
server.port=1113
#下面这些内容是为了让MyBatis映射
#指定Mybatis的Mapper文件
mybatis.mapper-locations=classpath:mappers/*xml
#指定Mybatis的实体目录
mybatis.type-aliases-package=org.lc.storage.mybatis.entity
# 数据库驱动:
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# 数据源名称
spring.datasource.name=defaultDataSource
# 数据库连接地址
spring.datasource.url=jdbc:mysql://localhost:3306/tcc_storage?serverTimezone=UTC
# 数据库用户名&密码:
spring.datasource.username=root
spring.datasource.password=123456

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
3.2.2.6 业务调用服务
@FeignClient("order")
public interface OrderFeign extends OrderServiceApi {

}
@FeignClient("storage")
public interface StorageFeign extends StorageServiceApi {

}

@Service
public class BusinessService {
    @Autowired
    private OrderFeign orderFeign;
    @Autowired
    private StorageFeign storageFeign;

    @GlobalTransactional
    public void purchase(String account, String productId, Integer count) {
        //获取全局事务的xid
        String xid = RootContext.getXID();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        //下订单
        orderFeign.prepare(actionContext, account, productId, count);
        //扣库存
        storageFeign.prepare(actionContext, productId, count);
    }
}

@RestController
public class BusinessController {
    @Autowired
    private BusinessService businessService;

    @PostMapping("/order")
    public ResponseBean order(String account, String productId, Integer count) {
        try {
            businessService.purchase(account, productId, count);
            return ResponseBean.success("下单成功");
        } catch (Exception e) {
            e.printStackTrace();
           return ResponseBean.error("下单失败",e.getMessage());
        }
    }
}
@SpringBootApplication
@EnableFeignClients
@EnableEurekaClient
public class BusinessApplication {

    public static void main(String[] args) {
        SpringApplication.run(BusinessApplication.class, args);
    }

}

# 应用名称
spring.application.name=business
# 应用服务 WEB 访问端口
server.port=8080

eureka.client.service-url.defaultzone=http://localhost:8761/eureka
spring.cloud.alibaba.seata.tx-service-group=my_test_tx_group
3.2.2.7 注册中心

参考AT模式

3.2.2.8 file.conf & registry.conf

参考AT模式

3.2.2.9 请求示例

http://localhost:8080/order?account=u_001&productId=1001&count=100open in new window

模拟下单操作,若库存不足,则会回滚

{
    "msg": "下单失败",
    "code": 500,
    "data": "[500] during [GET] to [http://storage/storage/deduct/prepare?productId=1001&count=100] [StorageFeign#prepare(BusinessActionContext,String,Integer)]: [{\"msg\":\"库存不足,预扣库存失败\",\"code\":500,\"data\":null}]"
}

3.3 XA模式

XA 模式是 X/Open 组织定义的分布式事务处理标准。

XA 规范描述了全局的事务管理器与局部的资源管理器之间的接口,利用 XA 规范可以实现多个资源,例如数据库、MQ 等,在同一个事务中进行访问,这样就可以使得数据库的 ACID 属性在跨应用程序的时候依然有效。

目前所有主流的数据库基本上都支持 XA 协议,包括 MySQL。

MySQL 中的 XA 事务分为两种:

  • 内部 XA:内部 XA 可以用作同一个 MySQL 实例下,跨多引擎的事务,这种一般使用 binlog 作为协调者。
  • 外部 XA:外部 XA 可以参与到外部的分布式事务中,这种一般需要应用层介入作为协调者。

3.3.1 XA模式原理

在 Seata 定义的分布式事务框架内,利用事务资源(数据库、消息服务等)对 XA 协议的支持,以 XA 协议的机制来管理分支事务的一种 事务模式。

image-20220921162110433
  • 首先TM开启全局分布式事务

  • 一阶段

    • 不同的微服务开始执行,各个微服务(RM)依次执行 xa start->SQL->xa end->xa prepare . xa prepare会将当前分支事务的状态报告给TC
  • 二阶段

    • 二阶段回滚:

      • TC判断各个分支如果有某一个分支执行失败,则就通知各个分支事务一起回滚

        这里的回滚指的是MySQL事务中的回滚

    • 二阶段提交:

      • TC判断各个分支如果全部分支执行成功,就通知各个分支事务提交。

3.3.2 案例

和AT模式大致相同,参考AT模式,这里介绍不同的地方。XA模式靠的是MySQL中的XA事务的支持,所以这里不需要undo log表

3.3.2.1 SQL准备
-- xa_account数据库
CREATE TABLE `account_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `money` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 初始化一条账户语句
INSERT INTO `xa_account`.`account_tbl` (`id`, `user_id`, `money`) VALUES (1, 'u_001', 1000);

-- xa_order数据库
CREATE TABLE `order_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `user_id` varchar(255) DEFAULT NULL,
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
  `count` int(11) DEFAULT '0',
  `money` int(11) DEFAULT '0',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- 	xa_storage数据库
CREATE TABLE `storage_tbl` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `commodity_code` varchar(255) DEFAULT NULL COMMENT '商品编号',
  `count` int(11) DEFAULT '0',
  PRIMARY KEY (`id`),
  UNIQUE KEY `commodity_code` (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 初始化一条库存语句
INSERT INTO `xa_storage`.`storage_tbl` (`id`, `commodity_code`, `count`) VALUES (1, '1001', 90);
3.3.2.2 修改和添加相关配置

order,account,sotrage模块:

  • 依次修改pom.xml中的MySQL驱动版本
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
            <version>8.0.11</version>
        </dependency>
  • 依次添加application.propertie配置
#关闭数据源自动代理配置,这里我选择手动配置数据源
seata.enable-auto-data-source-proxy=false
  • 数据库连接依次修改
spring.datasource.url=jdbc:mysql://localhost:3306/xa_storage?serverTimezone=UTC&useSSL=false
spring.datasource.url=jdbc:mysql://localhost:3306/xa_order?serverTimezone=UTC&useSSL=false
spring.datasource.url=jdbc:mysql://localhost:3306/xa_account?serverTimezone=UTC&useSSL=false
  • 依次添加XA数据源配置
@Configuration
public class DataSourceConfig {

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    @Primary
    public DataSource dataSourceProxy(DruidDataSource druidDataSource) {
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactory(DataSource dataSourceProxy) throws Exception {
        SqlSessionFactoryBean bean=new SqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        bean.setTransactionFactory(new SpringManagedTransactionFactory());
        return bean.getObject();
    }
}

  • 依次修改启动类,排除自动配置的数据源
@SpringBootApplication(scanBasePackages = "org.lc", exclude = DataSourceAutoConfiguration.class)
3.3.2.3 调整业务
  • 修改AccountService中的deductAccount方法
    public boolean deductAccount(String account,Double money) {
        //查询账户余额
        Double balanceMoney = accountMapper.getMoneyByAccount(account);
        if (balanceMoney < money) {
            throw new RuntimeException("账户余额不足,扣款失败!");
        }
        //更新账户
        int result = accountMapper.updateAccount(account, money);
        return result == 1;
    }
  • 修改StorageService中的deduct方法
    public boolean deduct(String productId, Integer count) {
        int countStorage = storageMapper.getCountStorage(productId);
        if (countStorage < count) {
            throw new RuntimeException("库存不足");
        }
        int result = storageMapper.deductStorage(productId, count);
        return result == 1;
    }
3.3.2.4 请求示例

http://localhost:8080/order?account=u_001&productId=1001&count=100open in new window

若库存不足,则会出现如下提示:

{
    "msg": "下单失败",
    "code": 500,
    "data": "[500] during [POST] to [http://storage/deduct?productId=1001&count=100] [StorageFeign#deduct(String,Integer)]: [{\"msg\":\"库存不足\",\"code\":500,\"data\":null}]"
}

4.分布式事务总结

**XA:**X/Open定义的分布式事务处理解决方案,不同数据库厂商根据此规范进行各自的实现。例如:MySQL、Oracle、SQLServer都实现了XA分布式事务规范

  • 二阶段提交模式
    • 一阶段:业务SQL放在XA分支事务中执行,即各个微服务(RM)依次执行 xa start->SQL->xa end->xa prepare . xa prepare会将当前分支事务的状态报告给TC。此操作也就相当于预提交操作。
    • 二阶段:
      • 二阶段回滚:TC判断各个分支如果有某一个分支执行失败,则就通知各个分支事务一起回滚
      • 二阶段提交:TC判断各个分支是否否执行成功,若都成功则提交。
  • 特点
    • 简单,开发较容易
    • 对资源锁定的时间比较长,会导致并发度降低(因为 XA 是两阶段提交,从一阶段开始,到二阶段回滚或者二阶段提交之间,数据都是处于被锁定的状态)
  • 缺点
    • 同步阻塞:所有分支事务都是阻塞的,当两阶段在执行的过程中,资源被锁定,其他的第三方应用都无法访问。
    • 单点故障。

AT:提供无侵入的自动补偿机制

  • 二阶段提交模式
    • 一阶段:
      • 首先各事务解析业务SQL,得到SQL类型,条件,表
      • 根据解析的结果定位数据,得到前镜像
      • 执行业务SQL
      • 根据前镜像通过主键定位数据,得到后镜像,也就是修改后的数据
      • 然后把前后镜像,全局事务XID和分支事务id等其它业务数据作为一条回滚日志插入到undo log表中
      • 然后进行本地事务的提交,并向TC报告执行的结果
    • 二阶段:
      • 二阶段回滚:若TC收到某个事务的RM的回滚分支请求,那么会根据全局事务XID和分支事务id通过undo log表找到该条回滚日志,根据前后镜像生成回滚sql并执行。然后提交本地事务,并向TC报告执行的结果
      • 二阶段提交:若TC收到各个分支的RM的提交请求,就会执行提交操作。将异步和批量地删除相应 UNDO LOG 记录

TCC:提供自定义的prepare、commit、rollback。两阶段的内容全部自定义,对代码入侵较高

  • 二阶段提交模式
    • 一阶段 :prepare 行为:调用 自定义 的 prepare 逻辑。
    • 二阶段:
      • 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
      • 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
  • 特点
    • 资源锁定粒度小。
    • 数据最终一致性。
    • 可靠性。
  • 缺点:
    • 业务入侵过强。
    • try、confirm、rollback 三个方法都应该有重试机制,有重试机制就需要保证幂等性了,这样代码就复杂了

SAGA:将一个长事务拆分为多个短事务,由由 Saga 的事务协调器进行协调。

  • 特点:
    • 并发能力高。
    • 开发工作量大一些(跟 XA 比)。
    • 一致性较弱。

https://juejin.cn/post/6844903936718012430#heading-24open in new window

https://seata.io/zh-cn/docs/dev/mode/tcc-mode.htmlopen in new window