0

0

Java如何实现多线程大批量同步数据

WBOY

WBOY

发布时间:2023-04-29 18:16:07

|

3806人浏览过

|

来源于亿速云

转载

背景

最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。

最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。

一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!

最后,改造了一番,2小时,就成功同步了300w+数据。

以下是主要逻辑。

立即学习Java免费学习笔记(深入)”;

网趣网上购物系统旗舰版
网趣网上购物系统旗舰版

网趣网上购物系统支持PC电脑版+手机版+APP,数据一站式更新,支持微信支付与支付宝支付接口,是专业的网上商城系统,网趣商城系统支持淘宝数据包导入,实现与淘宝同步更新!支持上传图片水印设置、图片批量上传功能,同时支持订单二次编辑以及多级分类隐藏等实用功能,新版增加商品大图浏览与列表显示功能,使分类浏览更方便,支持最新的支付宝即时到帐接口。

下载

线程的个数请根据你自己的服务器性能酌情设置。

思路

先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。

代码实现

package com.github.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * 多线程同步历史数据
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * 多线程同步通话记录历史数据
     * @param params
     * @return
     * @throws Exception
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
                }
            }
        });
        return Response.success("请求成功");
    }

    /**
     * 处理数据逻辑
     * @param params
     * @throws Exception
     */
    private void logicHandler(Map params) throws Exception {
        /******返回结果:多线程处理完的最终数据******/
        List result = new ArrayList<>();

        /******查询数据库总的数据条数******/
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

//        int count = 2620266;
        /******限制每次查询的条数******/
        int num = 1000;

        /******计算需要查询的次数******/
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        /******每个线程开始查询的行数******/
        int offset = 0;

        /******添加任务******/
        List>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
        List>>> smallList = ListUtils.partition(tasks, 10);
        for (List>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
//                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List>> futures = executor.invokeAll(callableList);
                    /******处理线程返回结果******/
                    if (futures != null && futures.size() > 0) {
                        for (Future> future : futures) {
                            List duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        /******异步存储******/
                                        log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任务拆分执行异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
                }
            }
        }
    }

    /**
     * 数据存储MongoDB
     * @param duyanCallRecordDetailList
     */
    private void saveMongoDB(List duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            /******重复数据不同步MongoDB******/
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            /******关联填写的记录******/
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //补充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //补充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}


class ThredQuery implements Callable> {
    /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
    private DuyanCallRecordDetailService myService;
    /******查询条件 根据条件来定义该类的属性******/
    private Map params;

    /******分页index******/
    private int offset;
    /******数量******/
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List call() throws Exception {
        /******通过service查询得到对应结果******/
        List duyanCallRecordDetailList = myService.selectList(new EntityWrapper()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils工具

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * 描述:List工具类
 * @author songfayuan
 * 2018年7月22日下午2:23:22
 */
@Slf4j
public class ListUtils {
    
    /**
     * 描述:list集合深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年7月22日下午2:35:23
     */
    public static  List deepCopy(List src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List dest = (List) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 描述:对象深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年12月14日
     */
    public static  T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    /**
     * 将一个list均分成n个list,主要通过偏移量来实现的
     * @author songfayuan
     * 2018年12月14日
     */
    public static  List> averageAssign(List source, int n) {
        List> result = new ArrayList>();
        int remaider = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * List按指定长度分割
     * @param list the list to return consecutive sublists of (需要分隔的list)
     * @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
     * @author songfayuan
     * @date 2019-07-07 21:37
     */
    public static  List> partition(List list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        List bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList长度为:{}", bigList.size());
        log.info("bigList为:{}", bigList);
        List> smallists = partition(bigList, 20);
        log.info("smallists长度为:{}", smallists.size());
        for (List smallist : smallists) {
            log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
        }
    }

}

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
mysql修改数据表名
mysql修改数据表名

MySQL修改数据表:1、首先查看数据库中所有的表,代码为:‘SHOW TABLES;’;2、修改表名,代码为:‘ALTER TABLE 旧表名 RENAME [TO] 新表名;’。php中文网还提供MySQL的相关下载、相关课程等内容,供大家免费下载使用。

668

2023.06.20

MySQL创建存储过程
MySQL创建存储过程

存储程序可以分为存储过程和函数,MySQL中创建存储过程和函数使用的语句分别为CREATE PROCEDURE和CREATE FUNCTION。使用CALL语句调用存储过程智能用输出变量返回值。函数可以从语句外调用(通过引用函数名),也能返回标量值。存储过程也可以调用其他存储过程。php中文网还提供MySQL创建存储过程的相关下载、相关课程等内容,供大家免费下载使用。

247

2023.06.21

mongodb和mysql的区别
mongodb和mysql的区别

mongodb和mysql的区别:1、数据模型;2、查询语言;3、扩展性和性能;4、可靠性。本专题为大家提供mongodb和mysql的区别的相关的文章、下载、课程内容,供大家免费下载体验。

281

2023.07.18

mysql密码忘了怎么查看
mysql密码忘了怎么查看

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql密码忘了怎么办呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

515

2023.07.19

mysql创建数据库
mysql创建数据库

MySQL是一个关系型数据库管理系统,由瑞典MySQL AB 公司开发,属于 Oracle 旗下产品。MySQL 是最流行的关系型数据库管理系统之一,在 WEB 应用方面,MySQL是最好的 RDBMS 应用软件之一。那么mysql怎么创建数据库呢?php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

256

2023.07.25

mysql默认事务隔离级别
mysql默认事务隔离级别

MySQL是一种广泛使用的关系型数据库管理系统,它支持事务处理。事务是一组数据库操作,它们作为一个逻辑单元被一起执行。为了保证事务的一致性和隔离性,MySQL提供了不同的事务隔离级别。php中文网给大家带来了相关的教程以及文章欢迎大家前来学习阅读。

386

2023.08.08

sqlserver和mysql区别
sqlserver和mysql区别

SQL Server和MySQL是两种广泛使用的关系型数据库管理系统。它们具有相似的功能和用途,但在某些方面存在一些显著的区别。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

532

2023.08.11

mysql忘记密码
mysql忘记密码

MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。那么忘记mysql密码我们该怎么解决呢?php中文网给大家带来了相关的教程以及其他关于mysql的文章,欢迎大家前来学习阅读。

601

2023.08.14

Python 自然语言处理(NLP)基础与实战
Python 自然语言处理(NLP)基础与实战

本专题系统讲解 Python 在自然语言处理(NLP)领域的基础方法与实战应用,涵盖文本预处理(分词、去停用词)、词性标注、命名实体识别、关键词提取、情感分析,以及常用 NLP 库(NLTK、spaCy)的核心用法。通过真实文本案例,帮助学习者掌握 使用 Python 进行文本分析与语言数据处理的完整流程,适用于内容分析、舆情监测与智能文本应用场景。

10

2026.01.27

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.9万人学习

C# 教程
C# 教程

共94课时 | 7.7万人学习

Java 教程
Java 教程

共578课时 | 51.9万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号