大数跨境
0
0

SpringBoot整合Atomikos实现分布式事务一种不错的实现方案

SpringBoot整合Atomikos实现分布式事务一种不错的实现方案 Spring全家桶实战案例
2023-09-22
0
导读:SpringBoot整合Atomikos多数据源分布式事务

环境:JDK8 + SpringBoot 2.4.12 + Atomikos + JPA

依赖管理

  • pom.xml

<properties>
  <java.version>1.8</java.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
  </dependency>
  <!-- fastjson releases before 1.2.83 carry known autoType deserialization
       vulnerabilities; 1.2.83 is the patched 1.x release. -->
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
  </dependency>
  <dependency>
    <groupId>com.github.pagehelper</groupId>
    <artifactId>pagehelper</artifactId>
    <version>5.2.0</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <!-- Atomikos JTA transaction manager and XA connection pooling. -->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  </dependency>
  <!-- Lombok generates the accessors requested by @Data on BaseDataSourceProperties.
       The version is assumed to be managed by the Spring Boot parent/BOM, like the
       other version-less dependencies here. -->
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    <exclusions>
      <exclusion>
        <groupId>org.junit.vintage</groupId>
        <artifactId>junit-vintage-engine</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
  </dependency>
</dependencies>
  • application.yml

# HTTP port of the demo application.
server:
  port: 9101
---
spring:
  application:
    name: atomikos
---
# JTA switch plus the two XA data sources (account and storage databases).
spring:
  jta:
    enabled: true
  datasource:
    account:
      resourceName: accountResource
      dsClassName: com.mysql.cj.jdbc.MysqlXADataSource
      url: jdbc:mysql://localhost:3306/account?serverTimezone=GMT%2B8&useSSL=false
      username: root
      password: 123123
    storage:
      resourceName: storageResource
      dsClassName: com.mysql.cj.jdbc.MysqlXADataSource
      url: jdbc:mysql://localhost:3306/storage?serverTimezone=GMT%2B8&useSSL=false
      username: root
      password: 123123
---
spring:
  jpa:
    generateDdl: false
    hibernate:
      ddlAuto: update
    openInView: true
    show-sql: true
    properties:
      hibernate:
        physical_naming_strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy
        dialect: org.hibernate.dialect.MySQL5Dialect
        transaction:
          jta:
            # Class that handles transactions. Hibernate does not provide one, so it
            # has to be written by hand and must extend AbstractJtaPlatform.
            # Note: the value is the fully-qualified name of that class.
            platform: com.pack.jpa.config.AtomikosJtaPlatform
      javax:
        persistence:
          # Declares the transaction type.
          transactionType: JTA
---
debug: false

这里如果不配置physical_naming_strategy,Hibernate不会把实体中的驼峰属性名(如userId)自动映射为下划线列名(如user_id)。

这里配置了两个数据源,account库和storage库

  • 数据源配置

数据源属性文件:

/**
 * Connection settings shared by the XA data sources of this example.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
public class BaseDataSourceProperties {

  /** Name handed to the Atomikos pool as its unique resource name. */
  private String resourceName;

  /** Class name of the XA data source implementation. */
  private String dsClassName;

  /** Driver setting; not read by the data source configuration shown in this example. */
  private String driver;

  /** JDBC URL, forwarded as the {@code url} XA property. */
  private String url;

  /** Database user, forwarded as the {@code user} XA property. */
  private String username;

  /** Database password, forwarded as the {@code password} XA property. */
  private String password;
}

Account库属性文件:

/**
 * Settings of the account database, bound from the
 * {@code spring.datasource.account} configuration prefix.
 */
@Component
@ConfigurationProperties("spring.datasource.account")
public class AccountDataSourceProperties extends BaseDataSourceProperties {
}

Storage库属性文件:

/**
 * Settings of the storage database, bound from the
 * {@code spring.datasource.storage} configuration prefix.
 */
@Component
@ConfigurationProperties("spring.datasource.storage")
public class StorageDataSourceProperties extends BaseDataSourceProperties {
}

数据源配置:

/**
 * Declares one Atomikos-managed XA data source per database.
 *
 * <p>Both beans are built the same way and differ only in the property set they read.
 */
@Configuration
public class DataSourceConfig {

  /**
   * XA data source for the account database; marked as the primary {@link DataSource}.
   *
   * @param props settings bound from {@code spring.datasource.account}
   * @return the configured Atomikos data source
   */
  @Bean(name = "accountDataSource", initMethod = "init", destroyMethod = "close")
  @Primary
  public DataSource accountDataSource(AccountDataSourceProperties props) {
    return newXaDataSource(
        props.getResourceName(),
        props.getDsClassName(),
        props.getUrl(),
        props.getUsername(),
        props.getPassword());
  }

  /**
   * XA data source for the storage database.
   *
   * @param props settings bound from {@code spring.datasource.storage}
   * @return the configured Atomikos data source
   */
  @Bean(name = "storageDataSource", initMethod = "init", destroyMethod = "close")
  public DataSource storageDataSource(StorageDataSourceProperties props) {
    return newXaDataSource(
        props.getResourceName(),
        props.getDsClassName(),
        props.getUrl(),
        props.getUsername(),
        props.getPassword());
  }

  /** Builds an Atomikos pool with the fixed sizing and timeout values used by both databases. */
  private static AtomikosDataSourceBean newXaDataSource(
      String resourceName, String xaClassName, String url, String user, String password) {
    AtomikosDataSourceBean pool = new AtomikosDataSourceBean();
    pool.setUniqueResourceName(resourceName);
    pool.setXaDataSourceClassName(xaClassName);

    Properties driverSettings = new Properties();
    driverSettings.setProperty("url", url);
    driverSettings.setProperty("user", user);
    driverSettings.setProperty("password", password);
    pool.setXaProperties(driverSettings);

    // Minimum and maximum are equal, so the pool keeps a fixed size of 10.
    pool.setMinPoolSize(10);
    pool.setMaxPoolSize(10);
    pool.setBorrowConnectionTimeout(30);
    pool.setMaxLifetime(60);
    pool.setMaintenanceInterval(60);
    return pool;
  }
}
  • JPA配置

/**
 * Holder for the two JPA configurations, one per database.
 *
 * <p>Each nested configuration builds its own entity manager factory and points its
 * repositories at the shared {@code transactionManager} bean.
 */
public class EntityManagerFactoryConfig {

  /** JPA setup for the account database. */
  @Configuration
  @EnableJpaRepositories(
      basePackages = {"com.pack.account.repository"},
      entityManagerFactoryRef = "accountEntityManagerFactory",
      transactionManagerRef = "transactionManager")
  @DependsOn("transactionManager") // the transaction manager bean must be created first
  static class AccountEntityManagerFactory {

    /** Package scanned for account entities. */
    private static final String ACCOUNT_DOMAIN_PACKAGE = "com.pack.account.domain";

    @Resource(name = "accountDataSource")
    private DataSource accountDataSource;

    @Resource
    private JpaProperties props;

    /**
     * Builds the primary entity manager factory (persistence unit {@code account}).
     *
     * @param builder factory builder supplied by Spring Boot
     * @return the factory bean bound to the account data source
     */
    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean accountEntityManagerFactory(
        EntityManagerFactoryBuilder builder) {
      return builder
          .dataSource(accountDataSource)
          .packages(ACCOUNT_DOMAIN_PACKAGE)
          .persistenceUnit("account")
          .properties(props.getProperties())
          .build();
    }
  }

  /** JPA setup for the storage database. */
  @Configuration
  @EnableJpaRepositories(
      basePackages = {"com.pack.storage.repository"},
      entityManagerFactoryRef = "storageEntityManagerFactory",
      transactionManagerRef = "transactionManager")
  @DependsOn("transactionManager") // the transaction manager bean must be created first
  static class StorageEntityManagerFactory {

    /** Package scanned for storage entities. */
    private static final String STORAGE_DOMAIN_PACKAGE = "com.pack.storage.domain";

    @Resource(name = "storageDataSource")
    private DataSource storageDataSource;

    @Resource
    private JpaProperties props;

    /**
     * Builds the entity manager factory for persistence unit {@code storage}.
     *
     * @param builder factory builder supplied by Spring Boot
     * @return the factory bean bound to the storage data source
     */
    @Bean
    public LocalContainerEntityManagerFactoryBean storageEntityManagerFactory(
        EntityManagerFactoryBuilder builder) {
      return builder
          .dataSource(storageDataSource)
          .packages(STORAGE_DOMAIN_PACKAGE)
          .persistenceUnit("storage")
          .properties(props.getProperties())
          .build();
    }
  }
}

事务配置:

/**
 * Hibernate JTA platform that hands Hibernate the Atomikos transaction objects.
 *
 * <p>Hibernate instantiates this class itself from the
 * {@code hibernate.transaction.jta.platform} property, so the Spring-managed
 * transaction objects are passed in through static fields. {@link TxConfig}
 * populates those fields when it creates the corresponding beans.
 */
public class AtomikosJtaPlatform extends AbstractJtaPlatform {

  private static final long serialVersionUID = 1L;

  /** Atomikos transaction manager; assigned by {@link TxConfig#atomikosTransactionManager()}. */
  public static TransactionManager transactionManager;

  /** Atomikos user transaction; assigned by {@link TxConfig#userTransaction()}. */
  public static UserTransaction transaction;

  /** @return the transaction manager stored in {@link #transactionManager} */
  @Override
  protected TransactionManager locateTransactionManager() {
    return transactionManager;
  }

  /** @return the user transaction stored in {@link #transaction} */
  @Override
  protected UserTransaction locateUserTransaction() {
    return transaction;
  }
}

/**
 * Declares the Atomikos JTA beans and the Spring transaction manager built on them.
 */
@Configuration
public class TxConfig {

  /**
   * Creates the Atomikos user transaction and publishes it to {@link AtomikosJtaPlatform}.
   *
   * @return the user transaction bean
   * @throws Throwable if the timeout cannot be applied
   */
  @Bean(name = "userTransaction")
  public UserTransaction userTransaction() throws Throwable {
    UserTransactionImp userTransactionImp = new UserTransactionImp();
    // NOTE(review): JTA defines this timeout in seconds, so 10000 is close to
    // three hours - confirm that this is the intended value.
    userTransactionImp.setTransactionTimeout(10000);
    // Without this assignment Hibernate's platform lookup would return null.
    AtomikosJtaPlatform.transaction = userTransactionImp;
    return userTransactionImp;
  }

  /**
   * Creates the Atomikos transaction manager and publishes it to {@link AtomikosJtaPlatform}.
   *
   * @return the JTA transaction manager bean
   * @throws Throwable declared for compatibility with existing callers
   */
  @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
  public TransactionManager atomikosTransactionManager() throws Throwable {
    UserTransactionManager userTransactionManager = new UserTransactionManager();
    userTransactionManager.setForceShutdown(false);
    // Without this assignment Hibernate's platform lookup would return null.
    AtomikosJtaPlatform.transactionManager = userTransactionManager;
    return userTransactionManager;
  }

  /**
   * Wraps the two Atomikos beans in Spring's {@link JtaTransactionManager}.
   *
   * @return the platform transaction manager referenced by the repositories
   * @throws Throwable if either underlying bean cannot be created
   */
  @Bean(name = "transactionManager")
  @DependsOn({ "userTransaction", "atomikosTransactionManager" })
  public PlatformTransactionManager transactionManager() throws Throwable {
    UserTransaction userTransaction = userTransaction();
    TransactionManager atomikosTransactionManager = atomikosTransactionManager();
    return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
  }
}

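注意:AtomikosJtaPlatform类就是我们的事务处理类,对应application.yml中的如下配置:spring.jpa.properties.hibernate.transaction.jta.platform: com.pack.jpa.config.AtomikosJtaPlatform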


  • 实体类

/**
 * JPA entity mapped to table {@code t_account}.
 *
 * <p>Accessors are provided so that the entity can be populated from a request
 * body and read by callers; the fields themselves stay private.
 */
@Entity
@Table(name = "t_account")
public class Account {

  /** Primary key. */
  @Id
  private Long id;

  private String userId;

  private BigDecimal money;

  /** @return the primary key */
  public Long getId() {
    return id;
  }

  /** @param id the primary key */
  public void setId(Long id) {
    this.id = id;
  }

  /** @return the user id */
  public String getUserId() {
    return userId;
  }

  /** @param userId the user id */
  public void setUserId(String userId) {
    this.userId = userId;
  }

  /** @return the money amount */
  public BigDecimal getMoney() {
    return money;
  }

  /** @param money the money amount */
  public void setMoney(BigDecimal money) {
    this.money = money;
  }
}
/**
 * JPA entity mapped to table {@code t_storage}.
 *
 * <p>Accessors are provided because callers read the count through
 * {@code getCount()} and the entity is populated from a request body.
 */
@Entity
@Table(name = "t_storage")
public class Storage {

  /** Primary key. */
  @Id
  private Long id;

  private String commodityCode;

  private Integer count;

  /** @return the primary key */
  public Long getId() {
    return id;
  }

  /** @param id the primary key */
  public void setId(Long id) {
    this.id = id;
  }

  /** @return the commodity code */
  public String getCommodityCode() {
    return commodityCode;
  }

  /** @param commodityCode the commodity code */
  public void setCommodityCode(String commodityCode) {
    this.commodityCode = commodityCode;
  }

  /** @return the count, or {@code null} if it has not been set */
  public Integer getCount() {
    return count;
  }

  /** @param count the count */
  public void setCount(Integer count) {
    this.count = count;
  }
}
  • DAO及Service

/** Spring Data repository for {@link Account} entities, keyed by {@code Long}. */
public interface AccountRepository
    extends JpaRepository<Account, Long>, JpaSpecificationExecutor<Account> {
}

/** Spring Data repository for {@link Storage} entities, keyed by {@code Long}. */
public interface StorageRepository
    extends JpaRepository<Storage, Long>, JpaSpecificationExecutor<Storage> {
}
/** Service that persists {@link Account} entities. */
@Service
public class AccountService {

  @Resource
  private AccountRepository accountRepository;

  /**
   * Saves the given account inside a transaction.
   *
   * @param account the account to save
   */
  @Transactional
  public void saveAccount(Account account) {
    accountRepository.save(account);
  }
}
/** Service that persists {@link Storage} entities. */
@Service
public class StorageService {

  @Resource
  private StorageRepository storageRepository;

  /**
   * Saves the given storage record inside a transaction.
   *
   * @param storage the storage record to save
   */
  @Transactional
  public void saveStorage(Storage storage) {
    storageRepository.save(storage);
  }
}
/**
 * Saves an account and a storage record in one transaction that spans both services.
 */
@Service
public class OperatorService {

  @Resource
  private AccountService accountService;

  @Resource
  private StorageService storageService;

  /**
   * Saves the account, then the storage record.
   *
   * <p>A count of {@code 0} fails after the account has been saved; a count of
   * {@code -1} fails after both have been saved.
   *
   * @param account the account to save
   * @param storage the storage record to save
   * @throws RuntimeException if the storage count is {@code 0} or {@code -1}
   */
  @Transactional
  public void save(Account account, Storage storage) {
    accountService.saveAccount(account);
    failIfCountEquals(storage, 0, "库存数量错误0");

    storageService.saveStorage(storage);
    failIfCountEquals(storage, -1, "库存数量错误-1");
  }

  /** Throws a {@link RuntimeException} carrying {@code message} when the count equals {@code forbidden}. */
  private static void failIfCountEquals(Storage storage, int forbidden, String message) {
    if (storage.getCount() == forbidden) {
      throw new RuntimeException(message);
    }
  }
}

OperatorService分别调用两个服务类。这里的@Transactional必须有;如果没有,两个服务方法会各自在独立的事务中提交,即便该方法随后抛出了异常,已保存的数据也不会回滚。


测试:

/** REST entry point under {@code /oc} used to exercise the distributed transaction. */
@RestController
@RequestMapping("/oc")
public class OperatorController {

  @Resource
  private OperatorService os;

  /**
   * Handles {@code POST /oc/save}: saves the account and storage carried by the request body.
   *
   * @param dto request payload holding the account and the storage record
   * @return the string {@code "1"} once the save call has returned
   */
  @PostMapping("/save")
  public Object save(@RequestBody OperatorDTO dto) {
    os.save(dto.getAccount(), dto.getStorage());
    return "1";
  }
}

成功示例:



失败示例:

将count设置为0,或者-1


数据都没有成功地插入到数据库。

完毕!!!

关注+转发


【声明】内容源于网络
0
0
Spring全家桶实战案例
Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
内容 832
粉丝 0
Spring全家桶实战案例 Java全栈开发,前端Vue2/3全家桶;Spring, SpringBoot 2/3, Spring Cloud各种实战案例及源码解读
总阅读195
粉丝0
内容832