业务场景
我们公司做的是加油业务,用户可以在app上通过当前位置和目标位置来查询地图路线以及路线途径的所有加油站,路线查询会调用高德地图提供的接口,途径油站则根据返回的路线信息进行查询,所以当用户输入起始位置和目标位置点击查询后会做以下几步操作:
调用高德地图接口获取路线根据高德地图返回的路线信息去查询途径的所有油站
问题点
公司发展很快,入驻平台的油站很多,当用户输入的起始地和目标地距离很远时,那么途径油站的数量会很大,单独采取普通查询库的方式会很耗时,并且途径油站的数据必须是实时的,所以无法使用缓存来提高接口响应效率。第一步是调用高德地图API,也会存在一定的延迟。那么我们应该怎么优化呢?
为了降低接口耗时,提高用户体验,我们需要对接口实现进行优化,调用高德API我们无法优化,所以只能优化查询途径油站这部分。
优化思路
当油站过多时,一次查询会很耗时,所以我们可以考虑分批多线程并发的去查询,将一段很长的路线按照路径长度分成若干个条件,比如一段路径长达800km,我们可以将这800km的查询参数拆分成若干个距离较小的参数集合(ps:举例方便大家理解,实际路径规划查询都是根据经纬度、距离等多重参数进行查询的)。比如,{[0,50],[50,100],[100,150]…..[750,800]},这时我们开启多个线程去并发的根据新的查询条件去查询,最后将结果拼接封装返回,从而达到降低查询时间的目的。
虽然思路很容易理解,但是实现起来有两个需要注意的地方,我列出来看看大家有没有考虑到。
根据业务场景,这里不是单纯的异步查询就可以的,而是需要所有的线程都执行完后并且组合查询结果后进行返回,所以这里需要进行同步控制。这里我们使用jdk提供的CountDownLatch同步组件实现。线程内操作需要有返回值,使用Callable接口以及FutureTask搭配实现。
具体实现
1.通常来说,我们定义线程需要实现Runnable接口,但是对于需要返回值的线程,就需要线程实现Callable接口了。
@Component@Slf4j@Scope("protoType") // 这里需要注意Spring默认注入的Bean都是单例的,当前业务场景下肯定需要多个线程去执行查询操作,所以这里声明组件为protoType模式public class PathPlanTask implements Callable<List<Object>> {// 查询参数 private PathPlanPartQuery pathPlanPartQuery; private CountDownLatch countDownLatch; @Override public List<Object> call() throws Exception { try { //TODO 业务查询 List<Object> result = queryList(pathPlanPartQuery); // 返回结果 return result; }catch (Exception e){ // 错误日志打印 log.error("query PathByGasstation error!"); }finally{ // 类似 i-- 的操作,当减到0的时候,countDownLatch.await()就会放行,否则会一直阻塞。 countDownLatch.countDown(); } } public void setPathPlanPartQuery(PathPlanPartQuery pathPlanPartQuery){ this.pathPlanPartQuery = pathPlanPartQuery; } public void setCountDownLatch(CountDownLatch countDownLatch){ this.countDownLatch = countDownLatch; } private List<Object> queryList(PathPlanPartQuery pathPlanPartQuery) { // TODO 具体查询逻辑,这里省略 return Lists.newArrayList(); }}复制代码
2.Callable通常和FutureTask搭配使用,通过FutureTask的get方法获取到线程的返回值。
// 通常定义为工具类进行获取private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(8, 20, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), new ThreadPoolExecutor.AbortPolicy());// 业务代码 private List<Object> queryGasInfoBaseDtoList(List<PathPlanQueryParam> queryParamList) { long stMills = System.currentTimeMillis(); // 定义线程池来进行多线程的管理,通过Util获取静态的线程池 // 定义countDownLatch,构造函数传递参数集合的size,该集合具体参数可以参考 // 上面举的例子,{[0,50],[50,100],[100,150]...[750,800]} CountDownLatch countDownLatch = new CountDownLatch(queryParamList.size());// 批量查询,定义FutureTask集合 List<FutureTask<List<GasInfoBaseResponseDto>>> futureTaskList = Lists.newArrayList(); try { // 遍历查询参数集合 for (PathPlanQueryParam queryParam : queryParamList) { // 这里使用getBean方式获取。 PathPlanTask pathPlanTask = ApplicationContextProvider.getBean("pathPlanTask", PathPlanTask.class); // 设置countDownLatch pathPlanTask.setCountDown(countDownLatch); // 获取查询参数 PathPlanPartQuery pathPlanPartQuery = getPathPlanPartQuery(queryParam); pathPlanTask.setPathPlanPartQuery(pathPlanPartQuery); // 定义FutureTask,将定义好的Callable实现类作为构造参数 FutureTask<List<GasInfoBaseResponseDto>> futureTask = new FutureTask<>(pathPlanTask); // 交给线程池去执行 poolExecutor.submit(futureTask); // 添加futureTask集合 futureTaskList.add(futureTask); } // 这里会一直进行阻塞,直到countDownLatch.countDown()方法将创建时传递的size参数减为0后放行。 // 这块可以保证多个线程全部执行完后进行最终返回。 countDownLatch.await(); // 多个线程执行完后我们拼接最终结果 List<Object> gasInfoDtoList = Lists.newArrayList(); for (FutureTask<List<Object>> futureTask : futureTaskList) { // 通过futrueTask的get方法获取返回值,当线程还在执行未返回时执行futureTask.get()会被阻塞 List<Object> baseResponseDtoList = futureTask.get(); if (CollectionUtils.isNotEmpty(baseResponseDtoList)) { gasInfoDtoList.addAll(baseResponseDtoList); } } return gasInfoDtoList; } catch (Exception e) { log.error("queryGasInfoBaseDtoList_err", e); } finally { log.info("queryGasInfoBaseDtoList_requestId:{},batchTimes:{},cost:{}", pointRequestOld.getRequestId(), pointRequestOld.getBatchTimes(), System.currentTimeMillis() - stMills); } return null; }复制代码
总结
以上是我在工作中遇到的多线程实际应用的场景,总结下来就是通过线程池开启多个Callable线程去分批查询数据, 引入CountDownLatch组件来控制查询结束时机,而后利用FutureTask的get方法获取最终结果拼装返回。