加入收藏 | 设为首页 | 会员中心 | 我要投稿 马鞍山站长网 (https://www.0555zz.cn/)- 媒体处理、内容创作、云渲染、网络安全、业务安全!
当前位置: 首页 > 综合聚焦 > 编程要点 > 语言 > 正文

多线程事务如何回滚

发布时间:2023-10-19 12:31:32 所属栏目:语言 来源:
导读:最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚.

在spring中
最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚.

在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

下面用一个简单示例演示多线程事务.

公用的类和方法

/**
    * 平均拆分list方法.
    * @param source
    * @param n
    * @param <T>
    * @return
    */
   public static <T> List<List<T>> averageAssign(List<T> source,int n){
       List<List<T>> result=new ArrayList<List<T>>();
       int remaider=source.size()%n; 
       int number=source.size()/n; 
       int offset=0;//偏移量
       for(int i=0;i<n;i++){
           List<T> value=null;
           if(remaider>0){
               value=source.subList(i*number+offset, (i+1)*number+offset+1);
               remaider--;
               offset++;
           }else{
               value=source.subList(i*number+offset, (i+1)*number+offset);
           }
           result.add(value);
       }
       return result;
   }

/**  线程池配置
 * @version V1.0
 * @since 2021-06-08 15:39
 */
public class ExecutorConfig {
    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;
    public static ExecutorService getThreadPool() {
        if (executorService == null){
            synchronized (ExecutorConfig.class){
                if (executorService == null){
                    executorService =  newThreadPool();
                }
            }
        }
        return executorService;
    }
    private static  ExecutorService newThreadPool(){
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());
    }
    private ExecutorConfig(){}
}

/** 获取sqlSession
 * @author 86182
 * @version V1.0
 * @since 2021-06-03 15:08
 */
@Component
public class SqlContext {
    @Resource
    private SqlSessionTemplate sqlSessionTemplate;
    public SqlSession getSqlSession(){
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}
示例事务不成功操作


 /**
     * 测试多线程事务.
     * @param employeeDOList
     */
    @Override
    @Transactional
    public void saveThread(List<EmployeeDO> employeeDOList) {
        try {
            //先做删除操作,如果子线程出现异常,此操作不会回滚
            this.getBaseMapper().delete(null);
            //获取线程池
            ExecutorService service = ExecutorConfig.getThreadPool();
            //拆分数据,拆分5份
            List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);
            //执行的线程
            Thread []threadArray = new Thread[lists.size()];
            //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i =0;i<lists.size();i++){
                if (i==lists.size()-1){
                    atomicBoolean.set(false);
                }
                List<EmployeeDO> list  = lists.get(i);
                threadArray[i] =  new Thread(() -> {
                    try {
                      //最后一个线程抛出异常
                        if (!atomicBoolean.get()){
                            throw new ServiceException("001","出现异常");
                        }
                        //批量添加,mybatisPlus中自带的batch方法
                        this.saveBatch(list);
                    }finally {
                        countDownLatch.countDown();
                    }
                });
            }
            for (int i = 0; i <lists.size(); i++){
                service.execute(threadArray[i]);
            }
            //当子线程执行完毕时,主线程再往下执行
            countDownLatch.await();
            System.out.println("添加完毕");
        }catch (Exception e){
            log.info("error",e);
            throw new ServiceException("002","出现异常");
        }finally {
             connection.close();
         }
    }
 

(编辑:马鞍山站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

    推荐文章