Seata分布式事务框架使用

Seata的介绍见 分布式事务解决方案和Seata的介绍

下载与运行

下载地址: https://github.com/seata/seata/releases

这里选择v1.0.0版本下载到本地解压(几个月前还是1.0.0来着,现在最新版本已经到1.2.0了 ←_← ,不过我觉得使用区别最大的还是0.9.0到1.0.0),进入conf/目录,修改file.conf(主要是修改数据库连接配置,建议先备份):

service {
  #transaction service group mapping
  vgroup_mapping.my_test_tx_group = "default"
  #only support when registry.type=file, please don't set multiple addresses
  default.grouplist = "127.0.0.1:8091"
  #disable seata
  disableGlobalTransaction = false
}

## transaction log store, only used in seata-server
store {
  ## store mode: file、db
  mode = "db"

  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
  }

  ## 将这部分修改为自己的数据库连接信息
  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
    db-type = "mysql"
    driver-class-name = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata"
    user = "root"
    password = "123456"
  }
}

在mysql中新建seata数据库,使用sql脚本建表:(现在的版本好像是启动了Seata服务会自动建立这个数据库,如果没有自动建立数据库可以去github上面找建表sql脚本)

https://github.com/seata/seata/blob/develop/script/server/db/mysql.sql

然后修改conf/registry.conf文件,配置nacos信息(seata需要注册到注册中心):

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
  }
...

启动nacos服务,再运行seata目录下的bin/startup.bat,即可启动seata服务。

使用

数据库环境搭建

这里我们会创建三个微服务, 一个订单服务, 一个库存服务, 一个账户服务。

当用户下单时,会在订单服务中创建一个订单, 然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。

该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。

建库建表:

create database seata_order;
create database seata_storage;
create database seata_account;

use seata_order;

DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  `user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `count` int(11) DEFAULT NULL COMMENT '数量',
  `money` decimal(11, 0) DEFAULT NULL COMMENT '金额',
  `status` int(1) DEFAULT NULL COMMENT '订单状态:  0:创建中 1:已完结'
) ENGINE = InnoDB  AUTO_INCREMENT=7 DEFAULT CHARSET = utf8;

/* 回滚日志表 */
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

use seata_storage;

DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage`  (
  `id` bigint(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
  `product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
  `total` int(11) DEFAULT NULL COMMENT '总库存',
  `used` int(11) DEFAULT NULL COMMENT '已用库存',
  `residue` int(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE = InnoDB auto_increment=2 CHARSET = utf8;

INSERT INTO `t_storage` VALUES (1, 1, 100, 0, 100);

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

use seata_account;

CREATE TABLE `t_account`  (
  `id` bigint(11) NOT NULL auto_increment PRIMARY KEY COMMENT 'id',
  `user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
  `total` decimal(10, 0) DEFAULT NULL COMMENT '总额度',
  `used` decimal(10, 0) DEFAULT NULL COMMENT '已用余额',
  `residue` decimal(10, 0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE = InnoDB auto_increment=2 CHARSET = utf8;

INSERT INTO `t_account` VALUES (1, 1, 1000, 0, 1000);

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

其中undo_log表是SEATA AT模式需要的回滚日志表。(现在的版本可能可以自动创建,如果没有自动创建可以去官方github上面找建表sql脚本)

测试模块搭建

新建三个模块:seata-order-service2001seata-storage-service2002seata-account-service2003

maven依赖

seata 版本v1.0.0之后使用seata-spring-boot-starter替换之前的spring-cloud-starter-alibaba-seata,可以省去许多配置。

特别注意必须检查seata-spring-boot-starter中引入依赖的seata-all的版本是否符合seata版本,否则就像我这样去掉传递依赖引入seata-all,如果版本不对应会出现很多问题。(浪费了几个小时的经验之谈)

<dependencies>
    <!-- nacos -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
    </dependency>
    <!-- nacos -->

    <!-- seata-->
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-all</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- seata-->

    <!--feign-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.mybatis.spring.boot</groupId>
        <artifactId>mybatis-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid-spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
    </dependency>
    <!--jdbc-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.rhett</groupId>
        <artifactId>cloud-api-commons</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
</dependencies>

yaml配置

三个微服务配置大体是一样的,只是端口号、应用名、连接的数据库有区别

server:
  port: 2001

spring:
  application:
    name: seata-order-service
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848
  datasource:
    # 当前数据源操作类型
    type: com.alibaba.druid.pool.DruidDataSource
    # mysql驱动类
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
    username: root
    password: root
feign:
  hystrix:
    enabled: false
logging:
  level:
    io:
      seata: info

######## Seata 配置,有一些是默认值,我还是加上来了 ########
seata:
  enabled: true
  tx-service-group: my_test_tx_group
  service:
    vgroup-mapping: default
    disable-global-transaction: false
  client:
    support:
      spring:
        # seata 1.0特性,需要加上这个,否则需要手动配置数据源代理
        datasource-autoproxy: true
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      cluster: default
  config:
    type: file
    file:
      name: file.conf

seata v1.0.0之前需要在类加载路径下面引入file.confregistry.conf,这个版本使用了seata-spring-boot-starter之后就可以把配置写在yaml或者properties文件里面。

订单微服务

Order实体:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    private Long id;

    private Long userId;

    private Long productId;

    private Integer count;

    private BigDecimal money;

    /**
     * 订单状态 0:创建中,1:已完结
     */
    private Integer status;
}

Mapper:

@Mapper
public interface OrderDao {
    /**
     * 1 新建订单
     * @param order
     * @return
     */
    int create(Order order);

    /**
     * 2 修改订单状态,从0改为1
     * @param userId
     * @param status
     * @return
     */
    int update(@Param("userId") Long userId, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.OrderDao">

    <resultMap id="BaseResultMap" type="com.rhett.springcloud.entities.Order">
        <id column="id" property="id" jdbcType="BIGINT"></id>
        <result column="user_id" property="userId" jdbcType="BIGINT"></result>
        <result column="product_id" property="productId" jdbcType="BIGINT"></result>
        <result column="count" property="count" jdbcType="INTEGER"></result>
        <result column="money" property="money" jdbcType="DECIMAL"></result>
        <result column="status" property="status" jdbcType="INTEGER"></result>
    </resultMap>

    <insert id="create" parameterType="com.rhett.springcloud.entities.Order" useGeneratedKeys="true"
            keyProperty="id">
        insert into t_order(user_id,product_id,count,money,status) values (#{userId},#{productId},#{count},#{money},0);
    </insert>

    <update id="update">
        update t_order set status =1 where user_id =#{userId} and status=#{status};
    </update>
</mapper>

service:

public interface OrderService {
    /**
     * 创建订单
     * @param order
     */
    void create(Order order);
}
@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderDao orderDao;
    @Resource
    private AccountService accountService;
    @Resource
    private StorageService storageService;

    /**
     * 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
     */
    @Override
    ///@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
    public void create(Order order) {
        // 1 新建订单
        orderDao.create(order);

        // 2 扣减库存
        storageService.decrease(order.getProductId(), order.getCount());

        // 3 扣减账户
        accountService.decrease(order.getUserId(), order.getMoney());

        // 4 修改订单状态,从0到1,1代表已完成
        orderDao.update(order.getUserId(), 0);
    }
}

controller:

@RestController
public class OrderController {
    @Resource
    private OrderService orderService;

    /**
     * 创建订单
     *
     * @param order
     * @return
     */
    @GetMapping("order/create")
    public CommonResult create(Order order) {
        orderService.create(order);
        return new CommonResult(200, "订单创建成功");
    }
}

feign-client:

@FeignClient(value = "seata-account-service")
public interface AccountService {
    /**
     * 减余额
     *
     * @param userId
     * @param money
     * @return
     */
    @PostMapping(value = "account/decrease")
    CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
    /**
     * 减库存
     *
     * @param productId
     * @param count
     * @return
     */
    @PostMapping(value = "storage/decrease")
    CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}

启动类:

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class SeataOrderMain2001 {
    public static void main(String[] args) {
        SpringApplication.run(SeataOrderMain2001.class, args);
    }
}

库存微服务

Storage实体:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {
    private Long id;

    //产品id
    private Long productId;

    //总库存
    private Integer total;

    //已用库存
    private Integer used;

    //剩余库存
    private Integer residue;
}

mapper:

@Mapper
public interface StorageDao {
    /**
     * 减库存
     * @param productId
     * @param count
     * @return
     */
    int decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.StorageDao">

    <resultMap id="BaseResultMap" type="com.rhett.springcloud.alibaba.domain.Storage">
        <id column="id" property="id" jdbcType="BIGINT"></id>
        <result column="product_id" property="productId" jdbcType="BIGINT"></result>
        <result column="total" property="total" jdbcType="INTEGER"></result>
        <result column="used" property="used" jdbcType="INTEGER"></result>
        <result column="residue" property="residue" jdbcType="INTEGER"></result>
    </resultMap>


    <!--减库存-->
    <update id="decrease">
        update t_storage
        set used =used + #{count},residue=residue-#{count}
        where product_id=#{productId};
    </update>
</mapper>

service:

public interface StorageService {
    /**
     * 减库存
     *
     * @param productId
     * @param count
     * @return
     */
    void decrease(Long productId, Integer count);
}
@Service
public class StorageServiceImpl implements StorageService {

    @Resource
    private StorageDao storageDao;

    @Override
    public void decrease(Long productId, Integer count) {
        storageDao.decrease(productId, count);
    }
}

controller:

@RestController
public class StorageController {

    @Resource
    private StorageService storageService;

    //扣除库存
    @PostMapping(value = "storage/decrease")
    public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count) {
        storageService.decrease(productId, count);
        return new CommonResult(200, "扣减库存成功");
    }
}

启动类:

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class StorageMain2002 {
    public static void main(String[] args) {
        SpringApplication.run(StorageMain2002.class, args);
    }
}

账户微服务

Account实体:

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
    private Long id;

    //用户id
    private Long userId;

    //总额度
    private Integer total;

    //已用额度
    private Integer used;

    //剩余额度
    private Integer residue;
}

mapper:

@Mapper
public interface AccountDao {
    /**
     * 扣减账户余额
     *
     * @param userId
     * @param money
     * @return
     */
    int decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.AccountDao">

    <update id="decrease">
        update t_account set residue = residue- #{money},used = used + #{money}
        where user_id =#{userId};
    </update>
</mapper>

service:

@Service
public interface AccountService {
    /**
     * 减库存
     *
     * @param userId 用户id
     * @param money  金额
     * @return
     */
    void decrease(Long userId, BigDecimal money);
}
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Resource
    private AccountDao accountDao;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("*******->account-service中扣减账户余额开始");
        // 模拟超时异常,全局事务回滚
        /*try {
            // 暂停20秒钟
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        accountDao.decrease(userId, money);
        log.info("*******->account-service中扣减账户余额结束");
    }
}

controller:

@RestController
public class AccountController {
    @Resource
    private AccountService accountService;

    @PostMapping(value = "account/decrease")
    public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
        accountService.decrease(userId, money);
        return new CommonResult(200, "扣减账户余额成功");
    }
}

启动类:

@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class SeataAccountMain2003 {
    public static void main(String[] args) {
        SpringApplication.run(SeataAccountMain2003.class, args);
    }
}

测试

模拟正常业务流程

我们先查看数据库中各表原始的数据:

账户表初始数据

库存表初始数据

订单表初始为空,这里不贴图上来了。

启动三个微服务,访问

http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100

进行下单(使用用户id为1的用户,购买10个产品id为1的产品,总金额为100),观察各表的变化:

订单表多出一行:

订单创建成功

账户表扣除余额:

Seata分布式事务框架使用

库存表扣除了库存量:

Seata分布式事务框架使用

可见,正常流程是没有问题的。

模拟某过程失败

我们将账户微服务中的service中注释的代码加上去,即模拟在扣除余额的的时候超时失败:

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
    @Resource
    private AccountDao accountDao;

    @Override
    public void decrease(Long userId, BigDecimal money) {
        log.info("*******->account-service中扣减账户余额开始");
        // 模拟超时异常,全局事务回滚
        try {
            // 暂停20秒钟
            TimeUnit.SECONDS.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        accountDao.decrease(userId, money);
        log.info("*******->account-service中扣减账户余额结束");
    }
}

在订单微服务中我们下单是先扣除的库存、然后扣除账户的余额,最后修改订单的状态,如果扣除账户的余额过程中发生异常,可能导致库存量白白减少。

重启账户微服务,再次调用接口下单,观察各表变化:

订单创建成功,但状态为失败:

Seata分布式事务框架使用

库存扣除成功,减至80:

Seata分布式事务框架使用

账户余额扣除失败,仍为900:

Seata分布式事务框架使用

20秒过完后,账户金额仍会被扣除100,但订单状态总是失败的。

加上Seata事务注解

我们在订单微服务的业务方法上加上注解@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)

name是为事务起的名字,rollbackFor指定需要回滚的异常

为了效果明显我先把库存和余额改回原来的数据。

重启订单微服务,再次进行下单,查看各表变化:

订单表无变化,无新增:

Seata分布式事务框架使用

库存无变化:

Seata分布式事务框架使用

账户余额无变化:

Seata分布式事务框架使用

实际上是因为出现异常,Seata帮我们回滚了,可以查看输出日志:

Seata分布式事务框架使用

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/seata%e5%88%86%e5%b8%83%e5%bc%8f%e4%ba%8b%e5%8a%a1%e6%a1%86%e6%9e%b6%e4%bd%bf%e7%94%a8/

(0)
彭晨涛彭晨涛管理者
上一篇 2020年5月8日 23:13
下一篇 2020年5月9日

相关推荐

发表回复

登录后才能评论