Seata的介绍见 分布式事务解决方案和Seata的介绍
下载与运行
下载地址: https://github.com/seata/seata/releases
这里选择v1.0.0
版本下载到本地解压(几个月前还是1.0.0来着,现在最新版本已经到1.2.0了 ←_← ,不过我觉得使用区别最大的还是0.9.0到1.0.0),进入conf/
目录,修改file.conf
(主要是修改数据库连接配置,建议先备份):
service {
#transaction service group mapping
vgroup_mapping.my_test_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#disable seata
disableGlobalTransaction = false
}
## transaction log store, only used in seata-server
store {
## store mode: file、db
mode = "db"
## file store property
file {
## store location dir
dir = "sessionStore"
}
## 将这部分修改为自己的数据库连接信息
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
db-type = "mysql"
driver-class-name = "com.mysql.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata"
user = "root"
password = "123456"
}
}
在mysql中新建seata
数据库,使用sql脚本建表:(现在的版本好像是启动了Seata服务会自动建立这个数据库,如果没有自动建立数据库可以去github上面找建表sql脚本)
https://github.com/seata/seata/blob/develop/script/server/db/mysql.sql
然后修改conf/registry.conf
文件,配置nacos信息(seata需要注册到注册中心):
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
...
启动nacos服务,再运行seata目录下的bin/startup.bat
,即可启动seata服务。
使用
数据库环境搭建
这里我们会创建三个微服务, 一个订单服务, 一个库存服务, 一个账户服务。
当用户下单时,会在订单服务中创建一个订单, 然后通过远程调用库存服务来扣减下单商品的库存,再通过远程调用账户服务来扣减用户账户里面的余额,最后在订单服务中修改订单状态为已完成。
该操作跨越三个数据库,有两次远程调用,很明显会有分布式事务问题。
建库建表:
create database seata_order;
create database seata_storage;
create database seata_account;
use seata_order;
DROP TABLE IF EXISTS `t_order`;
CREATE TABLE `t_order` (
`id` bigint(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`user_id` bigint(20) DEFAULT NULL COMMENT '用户id',
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`count` int(11) DEFAULT NULL COMMENT '数量',
`money` decimal(11, 0) DEFAULT NULL COMMENT '金额',
`status` int(1) DEFAULT NULL COMMENT '订单状态: 0:创建中 1:已完结'
) ENGINE = InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET = utf8;
/* 回滚日志表 */
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
use seata_storage;
DROP TABLE IF EXISTS `t_storage`;
CREATE TABLE `t_storage` (
`id` bigint(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',
`total` int(11) DEFAULT NULL COMMENT '总库存',
`used` int(11) DEFAULT NULL COMMENT '已用库存',
`residue` int(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE = InnoDB auto_increment=2 CHARSET = utf8;
INSERT INTO `t_storage` VALUES (1, 1, 100, 0, 100);
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
use seata_account;
CREATE TABLE `t_account` (
`id` bigint(11) NOT NULL auto_increment PRIMARY KEY COMMENT 'id',
`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',
`total` decimal(10, 0) DEFAULT NULL COMMENT '总额度',
`used` decimal(10, 0) DEFAULT NULL COMMENT '已用余额',
`residue` decimal(10, 0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE = InnoDB auto_increment=2 CHARSET = utf8;
INSERT INTO `t_account` VALUES (1, 1, 1000, 0, 1000);
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
其中undo_log
表是SEATA AT模式
需要的回滚日志表。(现在的版本可能可以自动创建,如果没有自动创建可以去官方github上面找建表sql脚本)
测试模块搭建
新建三个模块:seata-order-service2001
、seata-storage-service2002
、seata-account-service2003
。
maven依赖
seata 版本v1.0.0
之后使用seata-spring-boot-starter
替换之前的spring-cloud-starter-alibaba-seata
,可以省去许多配置。
特别注意必须检查seata-spring-boot-starter
中引入依赖的seata-all
的版本是否符合seata版本,否则就像我这样去掉传递依赖引入seata-all
,如果版本不对应会出现很多问题。(浪费了几个小时的经验之谈)
<dependencies>
<!-- nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- nacos -->
<!-- seata-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.0.0</version>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- seata-->
<!--feign-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rhett</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
yaml配置
三个微服务配置大体是一样的,只是端口号、应用名、连接的数据库有区别
server:
port: 2001
spring:
application:
name: seata-order-service
cloud:
nacos:
discovery:
server-addr: 127.0.0.1:8848
datasource:
# 当前数据源操作类型
type: com.alibaba.druid.pool.DruidDataSource
# mysql驱动类
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata_order?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: root
feign:
hystrix:
enabled: false
logging:
level:
io:
seata: info
######## Seata 配置,有一些是默认值,我还是加上来了 ########
seata:
enabled: true
tx-service-group: my_test_tx_group
service:
vgroup-mapping: default
disable-global-transaction: false
client:
support:
spring:
# seata 1.0特性,需要加上这个,否则需要手动配置数据源代理
datasource-autoproxy: true
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
cluster: default
config:
type: file
file:
name: file.conf
seata v1.0.0
之前需要在类加载路径下面引入file.conf
和registry.conf
,这个版本使用了seata-spring-boot-starter
之后就可以把配置写在yaml或者properties文件里面。
订单微服务
Order实体:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private Long id;
private Long userId;
private Long productId;
private Integer count;
private BigDecimal money;
/**
* 订单状态 0:创建中,1:已完结
*/
private Integer status;
}
Mapper:
@Mapper
public interface OrderDao {
/**
* 1 新建订单
* @param order
* @return
*/
int create(Order order);
/**
* 2 修改订单状态,从0改为1
* @param userId
* @param status
* @return
*/
int update(@Param("userId") Long userId, @Param("status") Integer status);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.OrderDao">
<resultMap id="BaseResultMap" type="com.rhett.springcloud.entities.Order">
<id column="id" property="id" jdbcType="BIGINT"></id>
<result column="user_id" property="userId" jdbcType="BIGINT"></result>
<result column="product_id" property="productId" jdbcType="BIGINT"></result>
<result column="count" property="count" jdbcType="INTEGER"></result>
<result column="money" property="money" jdbcType="DECIMAL"></result>
<result column="status" property="status" jdbcType="INTEGER"></result>
</resultMap>
<insert id="create" parameterType="com.rhett.springcloud.entities.Order" useGeneratedKeys="true"
keyProperty="id">
insert into t_order(user_id,product_id,count,money,status) values (#{userId},#{productId},#{count},#{money},0);
</insert>
<update id="update">
update t_order set status =1 where user_id =#{userId} and status=#{status};
</update>
</mapper>
service:
public interface OrderService {
/**
* 创建订单
* @param order
*/
void create(Order order);
}
@Service
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private AccountService accountService;
@Resource
private StorageService storageService;
/**
* 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态
*/
@Override
///@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
public void create(Order order) {
// 1 新建订单
orderDao.create(order);
// 2 扣减库存
storageService.decrease(order.getProductId(), order.getCount());
// 3 扣减账户
accountService.decrease(order.getUserId(), order.getMoney());
// 4 修改订单状态,从0到1,1代表已完成
orderDao.update(order.getUserId(), 0);
}
}
controller:
@RestController
public class OrderController {
@Resource
private OrderService orderService;
/**
* 创建订单
*
* @param order
* @return
*/
@GetMapping("order/create")
public CommonResult create(Order order) {
orderService.create(order);
return new CommonResult(200, "订单创建成功");
}
}
feign-client:
@FeignClient(value = "seata-account-service")
public interface AccountService {
/**
* 减余额
*
* @param userId
* @param money
* @return
*/
@PostMapping(value = "account/decrease")
CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
/**
* 减库存
*
* @param productId
* @param count
* @return
*/
@PostMapping(value = "storage/decrease")
CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count);
}
启动类:
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class SeataOrderMain2001 {
public static void main(String[] args) {
SpringApplication.run(SeataOrderMain2001.class, args);
}
}
库存微服务
Storage实体:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Storage {
private Long id;
//产品id
private Long productId;
//总库存
private Integer total;
//已用库存
private Integer used;
//剩余库存
private Integer residue;
}
mapper:
@Mapper
public interface StorageDao {
/**
* 减库存
* @param productId
* @param count
* @return
*/
int decrease(@Param("productId") Long productId, @Param("count") Integer count);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.StorageDao">
<resultMap id="BaseResultMap" type="com.rhett.springcloud.alibaba.domain.Storage">
<id column="id" property="id" jdbcType="BIGINT"></id>
<result column="product_id" property="productId" jdbcType="BIGINT"></result>
<result column="total" property="total" jdbcType="INTEGER"></result>
<result column="used" property="used" jdbcType="INTEGER"></result>
<result column="residue" property="residue" jdbcType="INTEGER"></result>
</resultMap>
<!--减库存-->
<update id="decrease">
update t_storage
set used =used + #{count},residue=residue-#{count}
where product_id=#{productId};
</update>
</mapper>
service:
public interface StorageService {
/**
* 减库存
*
* @param productId
* @param count
* @return
*/
void decrease(Long productId, Integer count);
}
@Service
public class StorageServiceImpl implements StorageService {
@Resource
private StorageDao storageDao;
@Override
public void decrease(Long productId, Integer count) {
storageDao.decrease(productId, count);
}
}
controller:
@RestController
public class StorageController {
@Resource
private StorageService storageService;
//扣除库存
@PostMapping(value = "storage/decrease")
public CommonResult decrease(@RequestParam("productId") Long productId, @RequestParam("count") Integer count) {
storageService.decrease(productId, count);
return new CommonResult(200, "扣减库存成功");
}
}
启动类:
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class StorageMain2002 {
public static void main(String[] args) {
SpringApplication.run(StorageMain2002.class, args);
}
}
账户微服务
Account实体:
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
private Long id;
//用户id
private Long userId;
//总额度
private Integer total;
//已用额度
private Integer used;
//剩余额度
private Integer residue;
}
mapper:
@Mapper
public interface AccountDao {
/**
* 扣减账户余额
*
* @param userId
* @param money
* @return
*/
int decrease(@Param("userId") Long userId, @Param("money") BigDecimal money);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.rhett.springcloud.alibaba.dao.AccountDao">
<update id="decrease">
update t_account set residue = residue- #{money},used = used + #{money}
where user_id =#{userId};
</update>
</mapper>
service:
@Service
public interface AccountService {
/**
* 减库存
*
* @param userId 用户id
* @param money 金额
* @return
*/
void decrease(Long userId, BigDecimal money);
}
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Resource
private AccountDao accountDao;
@Override
public void decrease(Long userId, BigDecimal money) {
log.info("*******->account-service中扣减账户余额开始");
// 模拟超时异常,全局事务回滚
/*try {
// 暂停20秒钟
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
accountDao.decrease(userId, money);
log.info("*******->account-service中扣减账户余额结束");
}
}
controller:
@RestController
public class AccountController {
@Resource
private AccountService accountService;
@PostMapping(value = "account/decrease")
public CommonResult decrease(@RequestParam("userId") Long userId, @RequestParam("money") BigDecimal money) {
accountService.decrease(userId, money);
return new CommonResult(200, "扣减账户余额成功");
}
}
启动类:
@EnableDiscoveryClient
@EnableFeignClients
@SpringBootApplication
public class SeataAccountMain2003 {
public static void main(String[] args) {
SpringApplication.run(SeataAccountMain2003.class, args);
}
}
测试
模拟正常业务流程
我们先查看数据库中各表原始的数据:
订单表初始为空,这里不贴图上来了。
启动三个微服务,访问
http://localhost:2001/order/create?userId=1&productId=1&count=10&money=100
进行下单(使用用户id为1的用户,购买10个产品id为1的产品,总金额为100),观察各表的变化:
订单表多出一行:
账户表扣除余额:
库存表扣除了库存量:
可见,正常流程是没有问题的。
模拟某过程失败
我们将账户微服务中的service中注释的代码加上去,即模拟在扣除余额的的时候超时失败:
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Resource
private AccountDao accountDao;
@Override
public void decrease(Long userId, BigDecimal money) {
log.info("*******->account-service中扣减账户余额开始");
// 模拟超时异常,全局事务回滚
try {
// 暂停20秒钟
TimeUnit.SECONDS.sleep(20);
} catch (InterruptedException e) {
e.printStackTrace();
}
accountDao.decrease(userId, money);
log.info("*******->account-service中扣减账户余额结束");
}
}
在订单微服务中我们下单是先扣除的库存、然后扣除账户的余额,最后修改订单的状态,如果扣除账户的余额过程中发生异常,可能导致库存量白白减少。
重启账户微服务,再次调用接口下单,观察各表变化:
订单创建成功,但状态为失败:
库存扣除成功,减至80:
账户余额扣除失败,仍为900:
20秒过完后,账户金额仍会被扣除100,但订单状态总是失败的。
加上Seata事务注解
我们在订单微服务的业务方法上加上注解@GlobalTransactional(name = "fsp-create-order", rollbackFor = Exception.class)
name是为事务起的名字,rollbackFor指定需要回滚的异常
为了效果明显我先把库存和余额改回原来的数据。
重启订单微服务,再次进行下单,查看各表变化:
订单表无变化,无新增:
库存无变化:
账户余额无变化:
实际上是因为出现异常,Seata帮我们回滚了,可以查看输出日志:
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/seata%e5%88%86%e5%b8%83%e5%bc%8f%e4%ba%8b%e5%8a%a1%e6%a1%86%e6%9e%b6%e4%bd%bf%e7%94%a8/