每日技术干货,第一时间送达!
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
//1.查询出当前资源模块下所有资源,查询出来后进行删除
deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
//2.查询出当前资源模块下所有子模块,递归查询,当删除完所有子模块下的资源后,再删除所有子模块,最终删除当前资源模块
deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
//3.删除当前资源模块
removeById(authorityModuleId);
}
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
CompletableFuture.runAsync(()->{
//两个并行执行的任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
//等待两个并行任务执行完后,再执行最后一个步骤
CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
},executor);
}
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
CompletableFuture.runAsync(()->{
//两个并行执行的任务
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService),executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
//等待两个并行任务执行完后,再执行最后一个步骤
CompletableFuture.allOf(future1,future2).thenRun(()->removeById(authorityModuleId));
},executor);
}
-
事务的隔离级别
-
事务的传播行为
-
事务的超时时间
-
是否为只读事务
-
…
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
-
当前事务是否是新事务
-
当前事务是否结束
-
当前事务是否需要回滚(通过标记来判断,因此我也可以在业务流程中手动设置标记为true,来让事务在没有发生异常的情况下进行回滚)
-
当前事务是否设置了回滚点(savePoint)
-
通过自动代理创建器依次尝试为每个放入容器中的bean尝试进行代理 -
尝试进行代理的过程对于事务管理来说,就是利用事务管理涉及到的增强器advisor,即TransactionAttributeSourceAdvisor -
判断当前增强器是否能够应用与当前bean上,怎么判断呢? —> advisor内部的pointCut喽 ! -
如果能够应用,那么好,为当前bean创建代理对象返回,并且往代理对象内部添加一个TransactionInterceptor拦截器。 -
此时我们再从容器中获取,拿到的就是代理对象了,当我们调用代理对象的方法时,首先要经过代理对象内部拦截器链的处理,处理完后,最终才会调用被代理对象的方法。(这里其实就是责任链模式的应用)
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
//TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源
//可以看做是一个存放TransactionAttribute与method方法映射的池子
TransactionAttributeSource tas = getTransactionAttributeSource();
//获取当前事务方法对应的TransactionAttribute
final TransactionAttribute txAttr = (tas !=null ? tas.getTransactionAttribute(method, targetClass) :null);
//定位TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
.....
//类型转换为局部事务管理器
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr ==null || !(ptminstanceof CallbackPreferringPlatformTransactionManager)) {
//TransactionManager根据TransactionAttribute创建事务后返回
//TransactionInfo封装了当前事务的信息--包括TransactionStatus
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
//继续执行过滤器链---过滤链最终会调用目标方法
//因此可以理解为这里是调用目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//目标方法抛出异常则进行判断是否需要回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
//清除当前事务信息
cleanupTransactionInfo(txInfo);
}
...
//正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)
commitTransactionAfterReturning(txInfo);
return retVal;
}
...
public class TransactionMain {
public static void main(String[] args) throws ClassNotFoundException, SQLException {
test();
}
private static void test() {
DataSource dataSource = getDS();
JdbcTransactionManager jtm =new JdbcTransactionManager(dataSource);
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef =new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
TransactionStatus ts = jtm.getTransaction(transactionDef);
//进行业务逻辑操作
try {
update(dataSource);
jtm.commit(ts);
}catch (Exception e){
jtm.rollback(ts);
System.out.println("发生异常,我已回滚");
}
}
private static void update(DataSource dataSource) throws Exception {
JdbcTemplate jt =new JdbcTemplate();
jt.setDataSource(dataSource);
jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
thrownew Exception("我是来捣乱的");
}
}
/**
* 多线程事务一致性管理 <br>
* 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行
* @author 大忽悠
* @create 2022/10/19 21:34
*/
@Component
@RequiredArgsConstructor
publicclass MultiplyThreadTransactionManager {
/**
* 如果是多数据源的情况下,需要指定具体是哪一个数据源
*/
privatefinal DataSource dataSource;
/**
* 执行的是无返回值的任务
* @param tasks 异步执行的任务列表
* @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
*/
public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
if(executor==null){
thrownew IllegalArgumentException("线程池不能为空");
}
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否发生了异常
AtomicBoolean ex=new AtomicBoolean();
List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
tasks.forEach(task->{
taskFutureList.add(CompletableFuture.runAsync(
() -> {
try{
//1.开启新事务
transactionStatusList.add(openNewTransaction(transactionManager));
//2.异步任务执行
task.run();
}catch (Throwable throwable){
//打印异常
throwable.printStackTrace();
//其中某个异步任务执行出现了异常,进行标记
ex.set(Boolean.TRUE);
//其他任务还没执行的不需要执行了
taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
}
}
, executor)
);
});
try {
//阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//发生了异常则进行回滚操作,否则提交
if(ex.get()){
System.out.println("发生异常,全部事务回滚");
transactionStatusList.forEach(transactionManager::rollback);
}else {
System.out.println("全部事务正常提交");
transactionStatusList.forEach(transactionManager::commit);
}
}
private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef =new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
return transactionManager.getTransaction(transactionDef);
}
private DataSourceTransactionManager getTransactionManager() {
returnnew DataSourceTransactionManager(dataSource);
}
}
public void test(){
List<Runnable> tasks=new ArrayList<>();
tasks.add(()->{
userMapper.deleteById(26);
});
tasks.add(()->{
signMapper.deleteById(10);
});
multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}
No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]
解释: 无法在当前线程绑定的threadLocal中寻找到HikariDataSource作为key,对应关联的资源对象ConnectionHolder
//保存当前事务关联的资源--默认只会在新建事务的时候保存当前获取到的DataSource和当前事务对应Connection的映射关系--当然这里Connection被包装为了ConnectionHolder
privatestaticfinal ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
//事务监听者--在事务执行到某个阶段的过程中,会去回调监听者对应的回调接口(典型观察者模式的应用)---默认为空集合
privatestaticfinal ThreadLocal<Set<TransactionSynchronization>> synchronizations =
new NamedThreadLocal<>("Transaction synchronizations");
//见名知意: 存放当前事务名字
privatestaticfinal ThreadLocal<String> currentTransactionName =
new NamedThreadLocal<>("Current transaction name");
//见名知意: 存放当前事务是否是只读事务
privatestaticfinal ThreadLocal<Boolean> currentTransactionReadOnly =
new NamedThreadLocal<>("Current transaction read-only status");
//见名知意: 存放当前事务的隔离级别
privatestaticfinal ThreadLocal<Integer> currentTransactionIsolationLevel =
new NamedThreadLocal<>("Current transaction isolation level");
//见名知意: 存放当前事务是否处于激活状态
privatestaticfinal ThreadLocal<Boolean> actualTransactionActive =
new NamedThreadLocal<>("Actual transaction active");
/**
* 多线程事务一致性管理 <br>
* 声明式事务管理无法完成,此时我们只能采用初期的编程式事务管理才行
* @author 大忽悠
* @create 2022/10/19 21:34
*/
@Component
@RequiredArgsConstructor
publicclass MultiplyThreadTransactionManager {
/**
* 如果是多数据源的情况下,需要指定具体是哪一个数据源
*/
privatefinal DataSource dataSource;
/**
* 执行的是无返回值的任务
* @param tasks 异步执行的任务列表
* @param executor 异步执行任务需要用到的线程池,考虑到线程池需要隔离,这里强制要求传
*/
public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
if(executor==null){
thrownew IllegalArgumentException("线程池不能为空");
}
DataSourceTransactionManager transactionManager = getTransactionManager();
//是否发生了异常
AtomicBoolean ex=new AtomicBoolean();
List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
List<TransactionResource> transactionResources=new ArrayList<>(tasks.size());
tasks.forEach(task->{
taskFutureList.add(CompletableFuture.runAsync(
() -> {
try{
//1.开启新事务
transactionStatusList.add(openNewTransaction(transactionManager));
//2.copy事务资源
transactionResources.add(TransactionResource.copyTransactionResource());
//3.异步任务执行
task.run();
}catch (Throwable throwable){
//打印异常
throwable.printStackTrace();
//其中某个异步任务执行出现了异常,进行标记
ex.set(Boolean.TRUE);
//其他任务还没执行的不需要执行了
taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
}
}
, executor)
);
});
try {
//阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
//发生了异常则进行回滚操作,否则提交
if(ex.get()){
System.out.println("发生异常,全部事务回滚");
for (int i =0; i < tasks.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
transactionManager.rollback(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
}else {
System.out.println("全部事务正常提交");
for (int i =0; i < tasks.size(); i++) {
transactionResources.get(i).autoWiredTransactionResource();
transactionManager.commit(transactionStatusList.get(i));
transactionResources.get(i).removeTransactionResource();
}
}
}
private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
//JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
//包括隔离级别和传播行为等
DefaultTransactionDefinition transactionDef =new DefaultTransactionDefinition();
//开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
return transactionManager.getTransaction(transactionDef);
}
private DataSourceTransactionManager getTransactionManager() {
returnnew DataSourceTransactionManager(dataSource);
}
/**
* 保存当前事务资源,用于线程间的事务资源COPY操作
*/
@Builder
privatestaticclass TransactionResource{
//事务结束后默认会移除集合中的DataSource作为key关联的资源记录
private Map<Object, Object> resources =new HashMap<>();
//下面五个属性会在事务结束后被自动清理,无需我们手动清理
private Set<TransactionSynchronization> synchronizations =new HashSet<>();
private String currentTransactionName;
private Boolean currentTransactionReadOnly;
private Integer currentTransactionIsolationLevel;
private Boolean actualTransactionActive;
publicstatic TransactionResource copyTransactionResource(){
return TransactionResource.builder()
//返回的是不可变集合
.resources(TransactionSynchronizationManager.getResourceMap())
//如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
.synchronizations(new LinkedHashSet<>())
.currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
.currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
.currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
.actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
.build();
}
public void autoWiredTransactionResource(){
resources.forEach(TransactionSynchronizationManager::bindResource);
//如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
TransactionSynchronizationManager.initSynchronization();
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
}
public void removeTransactionResource() {
//事务结束后默认会移除集合中的DataSource作为key关联的资源记录
//DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
resources.keySet().forEach(key->{
if(!(keyinstanceof DataSource)){
TransactionSynchronizationManager.unbindResource(key);
}
});
}
}
}
@SpringBootTest(classes = UserMain.class)
publicclass Test {
@Resource
private UserMapper userMapper;
@Resource
private SignMapper signMapper;
@Resource
private MultiplyThreadTransactionManager multiplyThreadTransactionManager;
@SneakyThrows
@org.junit.jupiter.api.Test
public void test(){
List<Runnable> tasks=new ArrayList<>();
tasks.add(()->{
userMapper.deleteById(26);
thrownew RuntimeException("我就要抛出异常!");
});
tasks.add(()->{
signMapper.deleteById(10);
});
multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
}
}
往期推荐
蚂蚁又开源了一个顶级 Java 项目!
网易二面:阿里为何建议MVC+Manager层混合架构?
从一个程序员的角度告诉你:“12306”有多牛逼!
Arthas全面使用指南:离线安装+Docker/K8s集成+集中管理
如何优雅实现多账号统一登录?so easy!
HTTPS 行为大赏:三分钟了解加密过程

