JAVA和Nginx 教程大全

网站首页 > 精选教程 正文

parseq-让java异步并发编程更简单

wys521 2024-11-27 12:14:01 精选教程 23 ℃ 0 评论

一、parseq简介

ParSeq 是一个框架,它使用 Java 编写异步代码变得更加容易。它的特点有异步操作的并行化、非阻塞计算的序列化执行、通过任务组合重用代码、常见的错误传播和恢复、执行跟踪和可视化、异步批量的批处理、具有重试策略的任务。

可用版本:https://mvnrepository.com/artifact/com.linkedin.parseq/parseq。

change log:https://github.com/linkedin/parseq/blob/master/CHANGELOG.md

二、关键概念

概念

含义

Task

是ParSeq系统中一系列工作的基础,类似于Java的Callable,但是Task可以异步的获取结果。Task不能被用户直接执行,必须通过Engine执行。Task实现了类似于Java Future的Promise接口。Task可以被转换和组合并最终执行得到预期的结果。

Promise

Promise和Future有着相同的概念,在ParSeq中,用户可以使用CompletableFuture创建一个任务。

Plan

Plan是一系列Task的集合,作为一个运行根Task的结果。

Engine

Task的执行者,它跟Event Loop相似,但是一个Plan中只有一个循环。

三、API简介

API

简介

转型方法

.map()

.flatMap()

.transform()

另外创建一个任务,使用前一个任务的返回值

串行

.andThen()

Tasks.seq()

后续任务在前一任务完成后开始

并行

Task.par()

任务并行

Task.value()

创建一个值解析Task,这是一个包装值的方法。

Task.failure()

创建一个抛异常就会失败的Task,这是一个用于包装异常的方法。

Task.callable()

将代码包装到Task中;这个方法包装callable,并将返回值返回到Task中;注意,此方法不应用于阻塞需要长时间运行的执行。此方法创建的任务将阻止ParSeq引擎在该阻止代码完成之前拾取同一计划中的其他任务。

Task.action()

跟Task.callable()方法类似,但是没有返回值。

Task.blocking()

将阻塞代码逻辑包装到Task中;将callable包装到Task,但可调用对象将在另一个执行器服务中运行。它比Task使用更多的资源开销。Task.callable(),但它不会阻止当前计划中的其他任务。建议用户使用此方法运行长时间运行的阻塞代码。

Task.async(Callable<Promise>)

传入一个返回Promise的callable。例如,当将其他异步库与ParSeq集成时,此方法非常有用

四、使用方法

4.1 快速开始

第1-9行,声明了一个Engine,第10-21行,将一个任务进行异步执行。当然,这样做毫无意义,只是展示了API的基本用法。

final int numCores = Runtime.getRuntime().availableProcessors();
final ExecutorService taskScheduler = Executors.newFixedThreadPool(numCores + 1);
final ScheduledExecutorService timerScheduler = Executors.newSingleThreadScheduledExecutor();

final Engine engine = new EngineBuilder()
    .setTaskExecutor(taskScheduler)
    .setTimerScheduler(timerScheduler)
    .build();

public List<Blog> findBlogList(String userId) {
	Task<List<Blog>> blogListFuture = Task.callable("findLogList", 
                                () -> blogMapper.findBlogList(userId));
    engine.run(blogListFuture);
    
    try {
        blogListFuture.await();
    } catch (InterruptedException e) {
        // 处理异常
    }
    
    return blogListFuture.get();
}

4.2 任务并联

下面代码是一个查询博客列表的例子,查询博客列表和查询博客数目可以并发执行。

第3行查询博客列表,第7行查询共有多少条博客,第11行将两个任务进行并联执行,并将结果进行封装。

public PageResult queryBlogPage(PageQueryUtil pageUtil) {

    Task<List<Blog>> blogListFuture = Task.callable("findLogList", () ->
        blogMapper.findBlogList(pageUtil)
    );

    Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
            blogMapper.getTotalBlogs(pageUtil)
    );

    Task<PageResult> result = Task.par(blogListFuture,blogTotalFuture).map("mergeResult", (blogList,blogTotal) ->
        new PageResult(blogList, blogTotal, pageUtil.getLimit(), pageUtil.getPage())
    );

    engine.run(result);

    try {
        result.await();
    } catch (InterruptedException e) {
        // 处理异常
    }

    return result.get();
}


4.3 任务串联

假如后一个任务对前一个任务的返回结果有依赖,代码又该如何写呢。

下面代码中,步骤[2]查询博客的分类需要依赖步骤[1]查出来的博客列表,步骤[2]利用flatMap方法取出步骤[1]的返回结果,再创建一个新的Task来执行步骤[2],达到串行的目的。

步骤[2]和步骤[3]又可以并发执行,使用Task.par可以实现任务的并发编排。

public PageResult getBlogsForIndexPage(int page) {
    Map params = new HashMap<String, Object>();
    params.put("page", page);
    params.put("limit", 10);
    params.put("blogStatus", 1);

    PageQueryUtil pageUtil = new PageQueryUtil(params);

    // [1] 查询博客列表
    Task<List<Blog>> blogListFuture = Task.callable("findLogList", () ->
            blogMapper.findBlogList(pageUtil)
    );

    // [2] 根据博客列表查询博客的分类,依赖步骤[1]
    Task<List<BlogListVO>> blogListVOSListFuture =
            blogListFuture.flatMap("flatMap", blogList ->
                    Task.callable("queryBlogCategory", () -> queryBlogCategory(blogList))
            );

    // [3] 查询博客总条数
    Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
            blogMapper.getTotalBlogs(pageUtil)
    );

    // [2][3]并发执行
    Task<PageResult> result = Task.par(blogListVOSListFuture, blogTotalFuture)
            .map("mergeResult", (blogListVOSList, blogTotal) -> new PageResult(blogListVOSList, blogTotal, pageUtil.getLimit(), pageUtil.getPage()));

    engine.run(result);

    try {
        result.await();
    } catch (Exception e) {
        e.printStackTrace();
    }

    return result.get();
}

4.4 重试机制

带重试策略的异步任务,RetryPolicy.attempts(3, 500) 表示重试3次,每次间隔500ms。

 Task<List<Blog>> blogListFuture =
                Task.withRetryPolicy("retry",RetryPolicy.attempts(3, 500),
                () -> Task.callable("findBlogList",() -> blogMapper.findBlogList(pageUtil))
        );

4.5 异常恢复

使用默认值:使用recover方法,可以在getTotalBlogs的方法发生异常时,使用默认值兜底。

Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
                blogMapper.getTotalBlogs(pageUtil)
        ).recover("dealException",e -> 0);

降级到具体的降级方法:使用recoverWith方法,可以在getTotalBlogs的方法发生异常时,使用降级方法兜底。

 Task<Integer> blogTotalFuture = Task.callable("getTotalBlogs", () ->
                blogMapper.getTotalBlogs(pageUtil)
        ).recoverWith("dealException",
      e -> Task.callable("degradationMethod",() -> degradationMethod(pageUtil)));

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表