需求背景:需要扫描全库的商品数据,然后根据不同的条件执行业务。比较麻烦的就是如何去扫库。现在的商品表比较少,大概四百万条数据。但是分了八个库,1024张表。每张表大概有4k的数据。
解决思路 有八台机器,所以八机并行,每个处理一部分的数据,那么每个机器需要获得一个标示,然后通过标示去处理数据。那么现在的问题就是如何让机器获取标示。这个时候就想到了分布式任务调度。可以分片, 这样每台机器就可以获得到不同的分片。可以通过分片区处理一部分的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| private static List<String> getShardingTables(List<String> tables, int shardingCount, Map<Integer, String> shardingMap) { if (shardingCount <= 1 || MapUtils.isEmpty(shardingMap)){ return tables; } if (tables.size() % shardingCount != 0) { return tables; } int index = tables.size() / shardingCount; Map<Integer, List<String>> map = new HashMap<>(); for (int i = 0; i < shardingCount; i++) { List<String> subList = tables.subList(i * index, (i + 1) * index); map.put(i, subList); } List<String> result = new ArrayList<>(); for (Integer key : shardingMap.keySet()) { if (map.get(key) != null) { result.addAll(map.get(key)); } } return result; }
|
这样就可以多机并行处理。 那这样的效率不是最高,我们单机还可以并发。开一个线程池去多线程处理。我们在单机也需要将数据再次分区,分成每个线程去执行一部分。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private void doTotalDump(int shardingCount, Map<Integer, String> shardingMap) { String s = itemDao.queryTablePartitions("Item"); List<String> tablesAll = Hints.parseRouteCustom2List(s); List<String> tables = getShardingTables(tablesAll, shardingCount, shardingMap); if (null != tables && !tables.isEmpty()) { logger.info("total dump...tables:{}", tables.toString()); int total = tables.size(); if (threadCount > total) { threadCount = total; } int eachCount = Math.round(total / (float) threadCount); for (int i = 0; i < threadCount; i++) { List<String> tablei; if (i == threadCount - 1) { int start = eachCount * (threadCount - 1); tablei = tables.subList(start, total); } else { tablei = tables.subList(i * eachCount, (i + 1) * eachCount); } List<String> finalTablei = tablei; threadPoolExecutor.execute(() -> operateTable(finalTablei)); } } }
|