Java 如何在20秒内完成两个库各100万数据的比对操作?

以下是针对20秒内完成200万数据比对的几种优化方案:

1. 基础方案 – 批量查询 + 哈希优化

/**
 * Compares {@code table1} (DB1) with {@code table2} (DB2) using paged reads and a hash map.
 *
 * <p>All DB1 rows are first loaded into a concurrent map keyed by id. DB2 is then read page by
 * page; each page is handed to a worker thread that removes the matching DB1 entry and records
 * ids that exist only in DB2 or whose data differs. Once every worker has finished, the ids
 * still left in the map are the ones that exist only in DB1.
 *
 * <p>The actual query execution is left as a placeholder: each loop iteration is expected to
 * run the page query with {@code (pageSize, offset)} and expose the rows as {@code data1} /
 * {@code data2}.
 *
 * @throws IllegalStateException if the workers do not finish within one minute or the calling
 *     thread is interrupted while waiting for them
 */
public void compareData() {
    int pageSize = 50000; // rows fetched per page

    // Load DB1 into memory, keyed by id.
    Map<Integer, String> db1Data = new ConcurrentHashMap<>();
    int offset = 0;
    while (true) {
        // ORDER BY makes LIMIT/OFFSET paging deterministic; without it pages may overlap or skip rows.
        String sql1 = "SELECT id, data FROM table1 ORDER BY id LIMIT ? OFFSET ?";
        // Execute the query with (pageSize, offset) and bind the page to data1 ...
        if (data1.isEmpty()) {
            break;
        }
        data1.forEach(row -> db1Data.put(row.getId(), row.getData()));
        offset += pageSize;
    }

    // Read DB2 page by page and compare each page on a worker thread.
    offset = 0;
    List<String> diffInDB2 = Collections.synchronizedList(new ArrayList<>());

    ExecutorService executor = Executors.newFixedThreadPool(8);

    while (true) {
        String sql2 = "SELECT id, data FROM table2 ORDER BY id LIMIT ? OFFSET ?";
        // Execute the query with (pageSize, offset) and bind the page to data2 ...
        if (data2.isEmpty()) {
            break;
        }

        List<DataRow> batch = data2;
        executor.submit(() -> {
            for (DataRow row : batch) {
                // remove() both looks up the DB1 value and marks the id as matched.
                String db1Value = db1Data.remove(row.getId());
                if (db1Value == null) {
                    diffInDB2.add("DB2独有: " + row.getId());
                } else if (!db1Value.equals(row.getData())) {
                    diffInDB2.add("数据不一致: " + row.getId());
                }
            }
        });
        offset += pageSize;
    }

    executor.shutdown();
    try {
        // The leftover-key computation below is only valid once every worker has finished.
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            executor.shutdownNow();
            throw new IllegalStateException("comparison workers did not finish within 1 minute");
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for comparison workers", e);
    }

    // Every id seen in DB2 was removed from db1Data, so the remaining keys exist only in DB1.
    // (A snapshot of the key set taken before the comparison would contain every DB1 id.)
    List<String> diffInDB1 = new ArrayList<>(db1Data.size());
    for (Integer id : db1Data.keySet()) {
        diffInDB1.add("DB1独有: " + id);
    }
}

2. 内存优化方案 – 布隆过滤器 + 外部存储

/**
 * Compares {@code table2} (DB2) against {@code table1} (DB1) using a Bloom filter built from
 * the DB1 ids, so that most DB2-only ids are detected without holding DB1 in memory.
 *
 * <p>An id the filter reports as absent is definitely not in DB1 and is recorded as DB2-only.
 * An id the filter reports as possibly present still needs an exact check against DB1; that
 * step is left as a placeholder.
 *
 * <p>DB2 is read in pages of {@code batchSize} rows. Worker {@code i} handles pages
 * {@code i, i + threadCount, i + 2 * threadCount, ...} and stops at the first short page, so
 * tables larger than {@code threadCount * batchSize} rows are still covered in full.
 *
 * @throws Exception if a query fails or a worker task fails
 */
public void compareWithBloomFilter() throws Exception {
    final int threadCount = 8;
    final int batchSize = 125000; // rows per page

    BloomFilter<String> bloomFilter = BloomFilter.create(
        Funnels.stringFunnel(Charset.defaultCharset()),
        1000000,  // expected number of insertions
        0.01      // 1% false-positive rate
    );

    // 1. Build the Bloom filter from the DB1 ids.
    String sql1 = "SELECT id FROM table1";
    try (PreparedStatement ps1 = conn1.prepareStatement(sql1);
         ResultSet rs1 = ps1.executeQuery()) {
        while (rs1.next()) {
            bloomFilter.put(rs1.getString("id"));
        }
    }

    // 2. Compare DB2 against the filter on several threads.
    // NOTE(review): all workers share conn2; whether concurrent statements on one connection
    // actually run in parallel depends on the driver — consider one connection per worker.
    ExecutorService executor = Executors.newFixedThreadPool(threadCount);
    List<String> allDiffs = new ArrayList<>();
    try {
        List<Future<List<String>>> futures = new ArrayList<>();

        for (int i = 0; i < threadCount; i++) {
            int threadId = i;
            futures.add(executor.submit(() -> {
                List<String> diffs = new ArrayList<>();
                // ORDER BY keeps the pages disjoint and stable across the concurrent queries.
                String sql2 = "SELECT id, data FROM table2 ORDER BY id LIMIT ? OFFSET ?";

                try (PreparedStatement ps2 = conn2.prepareStatement(sql2)) {
                    for (long page = threadId; ; page += threadCount) {
                        ps2.setInt(1, batchSize);
                        ps2.setLong(2, page * batchSize);

                        int rowsInPage = 0;
                        try (ResultSet rs2 = ps2.executeQuery()) {
                            while (rs2.next()) {
                                rowsInPage++;
                                String id = rs2.getString("id");
                                if (!bloomFilter.mightContain(id)) {
                                    diffs.add("DB2独有: " + id);
                                } else {
                                    // Possible match (or false positive): an exact comparison
                                    // against DB1 is required here.
                                }
                            }
                        }

                        // A short page means the end of the table was reached.
                        if (rowsInPage < batchSize) {
                            break;
                        }
                    }
                }
                return diffs;
            }));
        }

        // Collect the per-worker results.
        for (Future<List<String>> future : futures) {
            allDiffs.addAll(future.get());
        }
    } finally {
        // Always release the worker threads, including when a task failed.
        executor.shutdownNow();
    }
}

3. 高性能方案 – Redis中间缓存

/**
 * Compares {@code table2} (DB2) against {@code table1} (DB1) using Redis as an intermediate
 * key/value cache.
 *
 * <p>DB1 rows are written to Redis as {@code id -> data} through a pipeline. DB2 rows are then
 * streamed and looked up in Redis in pipelined batches; an id with no Redis value is recorded
 * as DB2-only and an id with a different value as a mismatch.
 *
 * <p>NOTE(review): keys are the raw ids, so keys left over from an earlier run or written by
 * another application would be treated as DB1 data — consider a key prefix or a dedicated
 * Redis database.
 *
 * @throws IllegalStateException if a database query fails
 */
public void compareWithRedis() {
    final int batchSize = 10000; // commands per pipeline round trip

    try (JedisPool jedisPool = new JedisPool("localhost", 6379)) {
        // 1. Bulk-load DB1 into Redis.
        String sql1 = "SELECT id, data FROM table1";
        try (Jedis jedis = jedisPool.getResource();
             PreparedStatement ps1 = conn1.prepareStatement(sql1);
             ResultSet rs1 = ps1.executeQuery()) {

            Pipeline pipeline = jedis.pipelined();
            int count = 0;
            while (rs1.next()) {
                pipeline.set(rs1.getString("id"), rs1.getString("data"));
                if (++count % batchSize == 0) {
                    pipeline.sync();
                }
            }
            pipeline.sync(); // flush the final partial batch
        }

        // 2. Stream DB2 and compare it against Redis in batches.
        String sql2 = "SELECT id, data FROM table2";
        List<String> diffs = new ArrayList<>();
        try (Jedis jedis = jedisPool.getResource();
             PreparedStatement ps2 = conn2.prepareStatement(sql2,
                 ResultSet.TYPE_FORWARD_ONLY,
                 ResultSet.CONCUR_READ_ONLY)) {

            // The fetch size must be set on the statement before it is executed.
            ps2.setFetchSize(50000);

            try (ResultSet rs2 = ps2.executeQuery()) {
                Pipeline pipeline = jedis.pipelined();
                // Parallel lists: element i of each list belongs to the same DB2 row.
                List<String> ids = new ArrayList<>(batchSize);
                List<String> db2Values = new ArrayList<>(batchSize);
                List<Response<String>> responses = new ArrayList<>(batchSize);

                while (rs2.next()) {
                    String id = rs2.getString("id");
                    ids.add(id);
                    db2Values.add(rs2.getString("data"));
                    responses.add(pipeline.get(id));

                    if (responses.size() == batchSize) {
                        pipeline.sync();
                        collectDiffs(ids, db2Values, responses, diffs);
                    }
                }

                // Compare the rows of the final partial batch as well.
                if (!responses.isEmpty()) {
                    pipeline.sync();
                    collectDiffs(ids, db2Values, responses, diffs);
                }
            }
        }
    } catch (java.sql.SQLException e) {
        throw new IllegalStateException("Redis-based comparison failed while querying the databases", e);
    }
}

/**
 * Compares one synced batch of Redis responses with the DB2 rows that produced them, appends
 * the differences to {@code diffs}, and clears the three batch lists for reuse.
 */
private void collectDiffs(List<String> ids, List<String> db2Values,
                          List<Response<String>> responses, List<String> diffs) {
    for (int i = 0; i < responses.size(); i++) {
        String db1Data = responses.get(i).get();
        if (db1Data == null) {
            diffs.add("DB2独有: " + ids.get(i));
        } else if (!db1Data.equals(db2Values.get(i))) {
            diffs.add("数据不一致: " + ids.get(i));
        }
    }
    ids.clear();
    db2Values.clear();
    responses.clear();
}

4. 终极方案 – 数据库直接比对

/**
 * Builds a single SQL statement that lets the database compute the differences itself, for the
 * case where both tables are reachable from one connection.
 *
 * <p>The statement returns one row per differing id, tagged {@code DB1_ONLY},
 * {@code DB2_ONLY} or {@code DIFFERENT}. The "different" branch also covers rows where exactly
 * one side is NULL, which a plain {@code <>} comparison would silently skip because it
 * evaluates to UNKNOWN when either operand is NULL.
 *
 * <p>Executing the statement is left as a placeholder.
 */
public void directDatabaseCompare() {
    // Use the database's set operations.
    String sql = """
        -- DB1独有的
        SELECT id, 'DB1_ONLY' as source FROM db1.table1
        EXCEPT
        SELECT id, 'DB1_ONLY' FROM db2.table2
        
        UNION ALL
        
        -- DB2独有的
        SELECT id, 'DB2_ONLY' as source FROM db2.table2
        EXCEPT
        SELECT id, 'DB2_ONLY' FROM db1.table1
        
        UNION ALL
        
        -- 数据不一致的
        SELECT a.id, 'DIFFERENT' as source 
        FROM db1.table1 a
        INNER JOIN db2.table2 b ON a.id = b.id
        WHERE a.data <> b.data
           OR (a.data IS NULL AND b.data IS NOT NULL)
           OR (a.data IS NOT NULL AND b.data IS NULL)
        """;

    // Execute the query; the database plans and optimizes the comparison.
}

5. 优化建议总结

| 优化方向 | 具体措施 | 预期效果 |
| --- | --- | --- |
| IO优化 | 批量读取、设置fetchSize、使用游标 | 减少网络往返 |
| 内存优化 | 分块处理、布隆过滤器 | 降低内存占用 |
| CPU优化 | 多线程并行处理 | 充分利用多核 |
| 算法优化 | 使用哈希、位图、排序合并 | 减少比较次数 |
| 存储优化 | Redis缓存、数据库链接 | 加速数据访问 |

关键配置参数

// JDBC tuning parameters (MySQL Connector/J).
String url = "jdbc:mysql://host/db?useUnicode=true&characterEncoding=utf8"
    + "&rewriteBatchedStatements=true"    // rewrite batched inserts into multi-row statements
    + "&useServerPrepStmts=true"          // cursor fetch is only available with server-side prepared statements
    + "&useCursorFetch=true"              // stream rows through a server-side cursor
    + "&defaultFetchSize=50000";          // fetch 50,000 rows per round trip

// Connection pool settings (HikariCP). The three timeout values below are spelled out as
// arithmetic so their magnitude is readable; they evaluate to the same numbers as before.
final long connectionTimeoutValue = 30 * 1000;   // 30000
final long idleTimeoutValue = 10 * 60 * 1000;    // 600000
final long maxLifetimeValue = 30 * 60 * 1000;    // 1800000

HikariConfig config = new HikariConfig();
config.setMaxLifetime(maxLifetimeValue);
config.setIdleTimeout(idleTimeoutValue);
config.setConnectionTimeout(connectionTimeoutValue);
config.setMaximumPoolSize(20);                   // enlarge the connection pool

选择哪种方案取决于具体场景:

  • 如果允许数据库直连,方案4最快
  • 如果内存充足,方案1最简单
  • 如果需要分布式处理,方案3最合适

作 者:南烛
链 接:https://www.itnotes.top/archives/935
来 源:IT笔记
文章版权归作者所有,转载请注明出处!


上一篇
下一篇