网站首页 > 精选教程 正文
一、parseq简介
ParSeq 是一个框架,它使用 Java 编写异步代码变得更加容易。它的特点有异步操作的并行化、非阻塞计算的序列化执行、通过任务组合重用代码、常见的错误传播和恢复、执行跟踪和可视化、异步批量的批处理、具有重试策略的任务。
可用版本:https://mvnrepository.com/artifact/com.linkedin.parseq/parseq。
change log:https://github.com/linkedin/parseq/blob/master/CHANGELOG.md
二、关键概念
概念 | 含义 |
Task | 是ParSeq系统中一系列工作的基础,类似于Java的Callable,但是Task可以异步的获取结果。Task不能被用户直接执行,必须通过Engine执行。Task实现了类似于Java Future的Promise接口。Task可以被转换和组合并最终执行得到预期的结果。 |
Promise | Promise和Future有着相同的概念,在ParSeq中,用户可以使用CompletableFuture创建一个任务。 |
Plan | Plan是一系列Task的集合,作为一个运行根Task的结果。 |
Engine | Task的执行者,它跟Event Loop相似,但是一个Plan中只有一个循环。 |
三、API简介
API | 简介 |
转型方法 .map() .flatMap() .transform() | 另外创建一个任务,使用前一个任务的返回值 |
串行 .andThen() Tasks.seq() | 后续任务在前一任务完成后开始 |
并行 Task.par() | 任务并行 |
Task.value() | 创建一个值解析Task,这是一个包装值的方法。 |
Task.failure() | 创建一个抛异常就会失败的Task,这是一个用于包装异常的方法。 |
Task.callable() | 将代码包装到Task中;这个方法包装callable,并将返回值返回到Task中;注意,此方法不应用于阻塞需要长时间运行的执行。此方法创建的任务将阻止ParSeq引擎在该阻止代码完成之前拾取同一计划中的其他任务。 |
Task.action() | 跟Task.callable()方法类似,但是没有返回值。 |
Task.blocking() | 将阻塞代码逻辑包装到Task中;将callable包装到Task,但可调用对象将在另一个执行器服务中运行。它比Task使用更多的资源开销。Task.callable(),但它不会阻止当前计划中的其他任务。建议用户使用此方法运行长时间运行的阻塞代码。 |
Task.async(Callable<Promise>) | 传入一个返回Promise的callable。例如,当将其他异步库与ParSeq集成时,此方法非常有用 |
四、使用方法
4.1 快速开始
第1-9行,声明了一个Engine,第10-21行,将一个任务进行异步执行。当然,这样做毫无意义,只是展示了API的基本用法。
final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();
final Engine engine = new EngineBuilder()
.setTaskExecutor(taskScheduler)
.setTimerScheduler(timerScheduler)
.build();
public List<Blog> findBlogList(String userId) {
Task<List<Blog>> blogListFuture = Task.callable("findLogList",
() -> blogMapper.findBlogList(userId));
engine.run(blogListFuture);
try {
blogListFuture.await();
} catch (InterruptedException e) {
// 处理异常
}
return blogListFuture.get();
}
4.2 任务并联
下面代码是一个查询博客列表的例子,查询博客列表和查询博客数目可以并发执行。
第3行查询博客列表,第7行查询共有多少条博客,第11行将两个任务进行并联执行,并将结果进行封装。
public PageResult queryBlogPage(PageQueryUtil pageUtil) {
Task<List<Blog>> blogListFuture = Task.callable("findLogList", () ->
blogMapper.findBlogList(pageUtil)
);
Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
blogMapper.getTotalBlogs(pageUtil)
);
Task<PageResult> result = Task.par(blogListFuture,blogTotalFuture).map("mergeResult", (blogList,blogTotal) ->
new PageResult(blogList, blogTotal, pageUtil.getLimit(), pageUtil.getPage())
);
engine.run(result);
try {
result.await();
} catch (InterruptedException e) {
// 处理异常
}
return result.get();
}
4.3 任务串联
假如后一个任务对前一个任务的返回结果有依赖,代码又该如何写呢。
下面代码中,步骤[2]查询博客的分类需要依赖步骤[1]查出来的博客列表,步骤[2]利用flatMap方法取出步骤[1]的返回结果,再创建一个新的Task来执行步骤[2],达到串行的目的。
步骤[2]和步骤[3]又可以并发执行,使用Task.par可以实现任务的并发编排。
public PageResult getBlogsForIndexPage(int page) {
Map params = new HashMap<String, Object>();
params.put("page", page);
params.put("limit", 10);
params.put("blogStatus", 1);
PageQueryUtil pageUtil = new PageQueryUtil(params);
// [1] 查询博客列表
Task<List<Blog>> blogListFuture = Task.callable("findLogList", () ->
blogMapper.findBlogList(pageUtil)
);
// [2] 根据博客列表查询博客的分类,依赖步骤[1]
Task<List<BlogListVO>> blogListVOSListFuture =
blogListFuture.flatMap("flatMap", blogList ->
Task.callable("queryBlogCategory", () -> queryBlogCategory(blogList))
);
// [3] 查询博客总条数
Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
blogMapper.getTotalBlogs(pageUtil)
);
// [2][3]并发执行
Task<PageResult> result = Task.par(blogListVOSListFuture, blogTotalFuture)
.map("mergeResult", (blogListVOSList, blogTotal) -> new PageResult(blogListVOSList, blogTotal, pageUtil.getLimit(), pageUtil.getPage()));
engine.run(result);
try {
result.await();
} catch (Exception e) {
e.printStackTrace();
}
return result.get();
}
4.4 重试机制
带重试策略的异步任务,RetryPolicy.attempts(3, 500) 表示重试3次,每次间隔500ms。
Task<List<Blog>> blogListFuture =
Task.withRetryPolicy("retry",RetryPolicy.attempts(3, 500),
() -> Task.callable("findBlogList",() -> blogMapper.findBlogList(pageUtil))
);
4.5 异常恢复
使用默认值:使用recover方法,可以在getTotalBlogs的方法发生异常时,使用默认值兜底。
Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
blogMapper.getTotalBlogs(pageUtil)
).recover("dealException",e -> 0);
降级到具体的降级方法:使用recoverWith方法,可以在getTotalBlogs的方法发生异常时,使用降级方法兜底。
Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
blogMapper.getTotalBlogs(pageUtil)
).recoverWith("dealException",
e -> Task.callable("degradationMethod",() -> degradationMethod(pageUtil)));
- 上一篇: Java异步记录日志-2022新项目
- 下一篇: Java面试常见问题:阻塞与非阻塞,同步与异步
猜你喜欢
- 2024-11-27 多线程技术(同步异步,并发并行)守护线程(java垃圾回收机制)
- 2024-11-27 Java 爬虫遇上数据异步加载,试试这两种办法
- 2024-11-27 Java 17 多线程的同步和异步synchronized关键字
- 2024-11-27 java异步编程产生的原因
- 2024-11-27 Java:了解Java中的异步套接字通道
- 2024-11-27 Java面试常见问题:阻塞与非阻塞,同步与异步
- 2024-11-27 Java异步记录日志-2022新项目
- 2024-11-27 Java异步编程实战:基于JDK中的Future实现异步编程
- 2024-11-27 Java8异步编程就是拽的不行
- 2024-11-27 Java异步任务优化CompletionService
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- nginx反向代理 (57)
- nginx日志 (56)
- nginx限制ip访问 (62)
- mac安装nginx (55)
- java和mysql (59)
- java中final (62)
- win10安装java (72)
- java启动参数 (64)
- java链表反转 (64)
- 字符串反转java (72)
- java逻辑运算符 (59)
- java 请求url (65)
- java信号量 (57)
- java定义枚举 (59)
- java字符串压缩 (56)
- java中的反射 (59)
- java 三维数组 (55)
- java插入排序 (68)
- java线程的状态 (62)
- java异步调用 (55)
- java中的异常处理 (62)
- java锁机制 (54)
- java静态内部类 (55)
- java怎么添加图片 (60)
- java 权限框架 (55)
本文暂时没有评论,来添加一个吧(●'◡'●)