以下是针对20秒内完成200万数据比对的几种优化方案:
1. 基础方案 – 批量查询 + 哈希优化
// 使用分页批量查询 + ConcurrentHashMap(以 id 为键做哈希查找)
/**
 * Compares table1 (DB1) against table2 (DB2) by loading DB1 into an in-memory
 * map and checking DB2 pages against it on a worker pool.
 *
 * <p>Each DB2 row removes its id from the DB1 map. A miss is recorded as
 * DB2-only, a value mismatch as inconsistent, and whatever is still in the map
 * once all workers have finished is DB1-only.
 *
 * <p>NOTE(review): the query execution that binds {@code data1}/{@code data2}
 * is elided in this snippet, and the collected diff lists are not returned or
 * reported anywhere visible here.
 *
 * @throws IllegalStateException if the comparison tasks do not finish within
 *         one minute, or the calling thread is interrupted while waiting
 */
public void compareData() {
    int pageSize = 50000; // rows fetched per page

    // Phase 1: page through DB1 and index every row by id.
    Map<Integer, String> db1Data = new ConcurrentHashMap<>();
    int offset = 0;
    while (true) {
        // ORDER BY keeps LIMIT/OFFSET paging deterministic; without it the
        // database may return overlapping or missing rows across pages.
        String sql1 = "SELECT id, data FROM table1 ORDER BY id LIMIT ? OFFSET ?";
        // Execute sql1 with (pageSize, offset) and bind the page to data1 (elided).
        if (data1.isEmpty()) break;
        data1.forEach(row -> db1Data.put(row.getId(), row.getData()));
        offset += pageSize;
    }

    // Phase 2: page through DB2 and compare each page on the worker pool.
    List<String> diffInDB2 = Collections.synchronizedList(new ArrayList<>());
    ExecutorService executor = Executors.newFixedThreadPool(8);
    try {
        offset = 0;
        while (true) {
            String sql2 = "SELECT id, data FROM table2 ORDER BY id LIMIT ? OFFSET ?";
            // Execute sql2 with (pageSize, offset) and bind the page to data2 (elided).
            if (data2.isEmpty()) break;
            List<DataRow> batch = data2;
            executor.submit(() -> {
                for (DataRow row : batch) {
                    // remove() both looks up and consumes the DB1 entry, so
                    // matched ids disappear from the map as they are seen.
                    String db1Value = db1Data.remove(row.getId());
                    if (db1Value == null) {
                        diffInDB2.add("DB2独有: " + row.getId());
                    } else if (!db1Value.equals(row.getData())) {
                        diffInDB2.add("数据不一致: " + row.getId());
                    }
                }
            });
            offset += pageSize;
        }
        executor.shutdown();
        // The leftover map is only meaningful once every task has finished.
        if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("comparison tasks did not finish within 1 minute");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for comparison tasks", e);
    } finally {
        // No-op after a clean shutdown; cancels outstanding work on failure.
        executor.shutdownNow();
    }

    // Entries never removed by a DB2 row exist only in DB1.
    List<String> diffInDB1 = new ArrayList<>(db1Data.size());
    for (Integer id : db1Data.keySet()) {
        diffInDB1.add("DB1独有: " + id);
    }
}
2. 内存优化方案 – 布隆过滤器 + 外部存储
// 使用布隆过滤器快速排除大部分数据
/**
 * Compares table2 against table1 using a Bloom filter built from table1 ids.
 *
 * <p>Ids the filter reports as absent are recorded as DB2-only. Ids it reports
 * as possibly present still need an exact comparison against DB1, which this
 * snippet leaves unimplemented.
 *
 * <p>NOTE(review): the eight fixed slices of 125000 rows cover only the first
 * 1,000,000 rows of table2, and the filter is sized for 1,000,000 ids; confirm
 * both against the real table sizes. All workers share {@code conn2}; confirm
 * the driver tolerates concurrent use of one connection.
 *
 * @throws Exception if a query fails or a worker task fails
 */
public void compareWithBloomFilter() throws Exception {
    BloomFilter<String> bloomFilter = BloomFilter.create(
            Funnels.stringFunnel(Charset.defaultCharset()),
            1000000, // expected number of elements
            0.01     // target false-positive rate of 1%
    );

    // 1. Build the Bloom filter from every id in table1.
    String sql1 = "SELECT id FROM table1";
    try (PreparedStatement ps1 = conn1.prepareStatement(sql1);
         ResultSet rs1 = ps1.executeQuery()) {
        while (rs1.next()) {
            bloomFilter.put(rs1.getString("id"));
        }
    }

    // 2. Compare table2 in parallel, one fixed slice per worker.
    ExecutorService executor = Executors.newFixedThreadPool(8);
    try {
        List<Future<List<String>>> futures = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            int threadId = i;
            futures.add(executor.submit(() -> {
                List<String> diffs = new ArrayList<>();
                int batchSize = 125000; // rows handled by each worker
                // ORDER BY makes the LIMIT/OFFSET slices disjoint and stable;
                // without it the slices may overlap or skip rows.
                String sql2 = "SELECT id, data FROM table2 ORDER BY id LIMIT ? OFFSET ?";
                try (PreparedStatement ps2 = conn2.prepareStatement(sql2)) {
                    ps2.setInt(1, batchSize);
                    ps2.setInt(2, threadId * batchSize);
                    try (ResultSet rs2 = ps2.executeQuery()) {
                        while (rs2.next()) {
                            String id = rs2.getString("id");
                            if (!bloomFilter.mightContain(id)) {
                                diffs.add("DB2独有: " + id);
                            } else {
                                // Possibly present in DB1: an exact comparison
                                // against DB1 is required here (not implemented).
                            }
                        }
                    }
                }
                return diffs;
            }));
        }

        // Collect per-worker results; get() rethrows any worker failure.
        List<String> allDiffs = new ArrayList<>();
        for (Future<List<String>> future : futures) {
            allDiffs.addAll(future.get());
        }
    } finally {
        // Always release the worker threads, including when a task failed.
        executor.shutdownNow();
    }
}
3. 高性能方案 – Redis中间缓存
// 使用Redis作为中间缓存
/**
 * Compares table2 against table1 using Redis as an intermediate key/value store.
 *
 * <p>Phase 1 writes every table1 row to Redis as {@code id -> data}. Phase 2
 * streams table2, looks each id up through a pipeline in batches of 10000, and
 * records ids missing from Redis as DB2-only and ids whose values differ as
 * inconsistent.
 *
 * <p>NOTE(review): rows that exist only in table1 are not detected here, and
 * the collected diffs are not returned or reported anywhere in this snippet.
 */
public void compareWithRedis() {
    // The pool is closed on exit so its connections are not leaked.
    try (JedisPool jedisPool = new JedisPool("localhost", 6379)) {
        // 1. Bulk-load DB1 rows into Redis.
        String sql1 = "SELECT id, data FROM table1";
        try (Jedis jedis = jedisPool.getResource();
             PreparedStatement ps1 = conn1.prepareStatement(sql1);
             ResultSet rs1 = ps1.executeQuery()) {
            Pipeline pipeline = jedis.pipelined();
            int count = 0;
            while (rs1.next()) {
                pipeline.set(rs1.getString("id"), rs1.getString("data"));
                // Flush periodically so the pipeline buffer stays bounded.
                if (++count % 10000 == 0) {
                    pipeline.sync();
                }
            }
            pipeline.sync();
        }

        // 2. Stream DB2 rows and compare them against Redis in batches.
        String sql2 = "SELECT id, data FROM table2";
        try (Jedis jedis = jedisPool.getResource();
             PreparedStatement ps2 = conn2.prepareStatement(sql2,
                     ResultSet.TYPE_FORWARD_ONLY,
                     ResultSet.CONCUR_READ_ONLY)) {
            // The fetch size is set on the statement before it executes so it
            // applies to the query itself.
            ps2.setFetchSize(50000);
            try (ResultSet rs2 = ps2.executeQuery()) {
                Pipeline pipeline = jedis.pipelined();
                // Parallel lists: index i holds the id, the DB2 value and the
                // pending Redis response for the same row.
                List<String> pendingIds = new ArrayList<>();
                List<String> pendingData = new ArrayList<>();
                List<Response<String>> responses = new ArrayList<>();
                List<String> diffs = new ArrayList<>();

                boolean hasRow = rs2.next();
                while (hasRow) {
                    String rowId = rs2.getString("id");
                    pendingIds.add(rowId);
                    pendingData.add(rs2.getString("data"));
                    responses.add(pipeline.get(rowId));
                    hasRow = rs2.next();

                    // Flush on a full batch, and also after the last row so the
                    // trailing partial batch is compared as well.
                    if (responses.size() == 10000 || !hasRow) {
                        pipeline.sync();
                        for (int i = 0; i < responses.size(); i++) {
                            String id = pendingIds.get(i);
                            String db1Data = responses.get(i).get();
                            if (db1Data == null) {
                                diffs.add("DB2独有: " + id);
                            } else if (!db1Data.equals(pendingData.get(i))) {
                                diffs.add("数据不一致: " + id);
                            }
                        }
                        pendingIds.clear();
                        pendingData.clear();
                        responses.clear();
                    }
                }
            }
        }
    }
}
4. 终极方案 – 数据库直接比对
// 如果两个数据库可以连接,使用数据库的UNION/EXCEPT
/**
 * Builds a single SQL statement that reports DB1-only ids, DB2-only ids and
 * ids whose data differs, so that the database performs the comparison itself.
 *
 * <p>The statement relies on EXCEPT and UNION ALL being evaluated left to
 * right. The second EXCEPT only removes rows tagged 'DB2_ONLY', so the
 * 'DB1_ONLY' rows produced by the first EXCEPT pass through it unchanged.
 *
 * <p>NOTE(review): the snippet only defines the statement; executing it and
 * consuming the result set are not shown.
 */
public void directDatabaseCompare() {
    // Set operations are pushed down to the database.
    // The final branch treats "NULL on exactly one side" as a difference;
    // a plain <> evaluates to NULL for such rows and would drop them silently.
    String sql = """
        -- DB1独有的
        SELECT id, 'DB1_ONLY' as source FROM db1.table1
        EXCEPT
        SELECT id, 'DB1_ONLY' FROM db2.table2
        UNION ALL
        -- DB2独有的
        SELECT id, 'DB2_ONLY' as source FROM db2.table2
        EXCEPT
        SELECT id, 'DB2_ONLY' FROM db1.table1
        UNION ALL
        -- 数据不一致的
        SELECT a.id, 'DIFFERENT' as source
        FROM db1.table1 a
        INNER JOIN db2.table2 b ON a.id = b.id
        WHERE a.data <> b.data
           OR (a.data IS NULL AND b.data IS NOT NULL)
           OR (a.data IS NOT NULL AND b.data IS NULL)
        """;
    // Execute the query; the database plans and optimizes the comparison.
}
5. 优化建议总结
| 优化方向 | 具体措施 | 预期效果 |
|---|---|---|
| IO优化 | 批量读取、设置fetchSize、使用游标 | 减少网络往返 |
| 内存优化 | 分块处理、布隆过滤器 | 降低内存占用 |
| CPU优化 | 多线程并行处理 | 充分利用多核 |
| 算法优化 | 使用哈希、位图、排序合并 | 减少比较次数 |
| 存储优化 | Redis缓存、数据库链接 | 加速数据访问 |
关键配置参数
// JDBC tuning parameters (MySQL Connector/J URL)
String url = "jdbc:mysql://host/db?useUnicode=true&characterEncoding=utf8"
+ "&rewriteBatchedStatements=true" // rewrite batched inserts into multi-row statements
+ "&useServerPrepStmts=true" // required by useCursorFetch: cursors only work with server-side prepared statements
+ "&useCursorFetch=true" // fetch rows through a server-side cursor
+ "&defaultFetchSize=50000"; // rows fetched per round trip
// Connection pool settings (timeouts are in milliseconds per HikariCP's API)
HikariConfig config = new HikariConfig();
config.setMaximumPoolSize(20); // enlarge the pool
config.setConnectionTimeout(30000);
config.setIdleTimeout(600000);
config.setMaxLifetime(1800000);
选择哪种方案取决于具体场景:
- 如果两个数据库可以在同一条SQL中互相访问(同一实例,或已建立数据库链接),方案4最快
- 如果内存充足,方案1最简单
- 如果需要分布式处理,方案3最合适