分布式多机扫库脚本

需求背景:需要扫描全库的商品数据,然后根据不同的条件执行业务。比较麻烦的就是如何去扫库。现在的商品表比较少,大概四百万条数据。但是分了八个库,1024张表。每张表大概有4k的数据。

解决思路 有八台机器,所以八机并行,每个处理一部分的数据,那么每个机器需要获得一个标示,然后通过标示去处理数据。那么现在的问题就是如何让机器获取标示。这个时候就想到了分布式任务调度。可以分片, 这样每台机器就可以获得到不同的分片。可以通过分片区处理一部分的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static List<String> getShardingTables(List<String> tables, int shardingCount, Map<Integer, String> shardingMap) { 
// 如果分片数1个,或者分片map信息为空,直接返回整个table
if (shardingCount <= 1 || MapUtils.isEmpty(shardingMap)){
return tables;
}
// 在设定分片数的时候会特意把分片数设为可以整除表数,如果不能整除说明设置的有问题,也返回整张表
if (tables.size() % shardingCount != 0) {
return tables;
}
// 每个分片要处理的表数量
int index = tables.size() / shardingCount;
// key为分片号,value为该分片要处理的表名称列表
Map<Integer, List<String>> map = new HashMap<>();
for (int i = 0; i < shardingCount; i++) {
List<String> subList = tables.subList(i * index, (i + 1) * index);
map.put(i, subList); }
List<String> result = new ArrayList<>();
// 从当前机子分到的分片中找到对应要处理的表名称列表合集
for (Integer key : shardingMap.keySet()) {
if (map.get(key) != null) {
result.addAll(map.get(key));
}
}
return result;
}

这样就可以多机并行处理。 那这样的效率不是最高,我们单机还可以并发。开一个线程池去多线程处理。我们在单机也需要将数据再次分区,分成每个线程去执行一部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void doTotalDump(int shardingCount, Map<Integer, String> shardingMap) {
// 拿到所有物理表名
String s = itemDao.queryTablePartitions("Item");
List<String> tablesAll = Hints.parseRouteCustom2List(s);
List<String> tables = getShardingTables(tablesAll, shardingCount, shardingMap);
if (null != tables && !tables.isEmpty()) {
// 遍历物理表,找到符合当前分片的,进行处理
logger.info("total dump...tables:{}", tables.toString());
// 分线程操作
int total = tables.size();
// 每个线程操作的表数目
// 如果无法除尽,四舍五入,然后最后一个线程操作剩下的表
if (threadCount > total) {
// 如果设置了超过总表数的线程,只使用总表数个
threadCount = total;
}
int eachCount = Math.round(total / (float) threadCount);
for (int i = 0; i < threadCount; i++) {
List<String> tablei;
if (i == threadCount - 1) {
// 最后一个特殊处理
// 前面几个线程已经处理过的数量
int start = eachCount * (threadCount - 1);
tablei = tables.subList(start, total);
} else {
tablei = tables.subList(i * eachCount, (i + 1) * eachCount);
}
List<String> finalTablei = tablei;
threadPoolExecutor.execute(() -> operateTable(finalTablei));
}
}
}

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×