Spring 异步事件监听器

基于 Spring 的 SmartApplicationListener 实现异步事件监听器。但是两个异步监听器之间无法控制执行顺序,即使像下面这样重写 getOrder():
/**
 * Defines the order value of this event listener.
 * @return the fixed order value {@code 100}
 */
@Override
public int getOrder() {
    return 100;
} 

无效:getOrder() 只能控制事件提交的先后,不能保证异步监听器的执行顺序。

在 IDEA 中运行时,需要在监听器里加上延迟(例如 Thread.sleep)才能看出异步的效果

效果图

访问地址:http://127.0.0.1:8090/hello

 

 

配置线程池
package com.event.springboot.configuration;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;

/**
 * Thread-pool configuration for asynchronous ({@code @Async}) execution.
 *
 * <p>Supplies the executor that runs asynchronous invocations and the handler
 * that receives exceptions escaping from them.
 */
@Configuration
public class ListenerAsyncConfiguration implements AsyncConfigurer {
    /**
     * Builds the executor used for asynchronous invocations.
     *
     * @return an initialized {@link ThreadPoolTaskExecutor} with core size 1,
     *         max size 10, queue capacity 25 and thread-name prefix
     *         {@code MyExecutor-}
     */
    @Override
    public Executor getAsyncExecutor() {
        // Use Spring's built-in thread-pool task executor.
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // Pool sizing and thread naming.
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setThreadNamePrefix("MyExecutor-");
        // The executor is created by hand here, so it is initialized explicitly.
        taskExecutor.initialize();
        return taskExecutor;
    }

    /**
     * Supplies the handler for exceptions that escape asynchronous methods.
     *
     * @return a new {@link CustomAsyncExceptionHandler}, so the handler declared
     *         in this class is actually installed
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }

    /**
     * Handles exceptions that were not caught inside an asynchronous method.
     */
    static class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
        /**
         * Prints the stack trace of the uncaught exception.
         *
         * @param throwable the exception that escaped the asynchronous method
         * @param method    the asynchronous method that was invoked
         * @param obj       the arguments the method was invoked with
         */
        @Override
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
            throwable.printStackTrace();
        }

    }

}
消费者事件
package com.event.springboot.events;

import com.event.springboot.pojo.ConsumerUser;
import org.springframework.context.ApplicationEvent;

/**
 * Application event that carries a {@link ConsumerUser} as its payload.
 */
public class ConsumerRegisterEvent extends ApplicationEvent {

    /** Payload handed to listeners of this event. */
    private ConsumerUser consumerUser;

    /**
     * Creates the event.
     *
     * @param source       the event source, forwarded to the superclass constructor
     * @param consumerUser the payload to be processed by listeners
     */
    public ConsumerRegisterEvent(Object source, ConsumerUser consumerUser) {
        super(source);
        this.consumerUser = consumerUser;
    }

    /**
     * Replaces the payload.
     *
     * @param consumerUser the new payload
     */
    public void setConsumerUser(ConsumerUser consumerUser) {
        this.consumerUser = consumerUser;
    }

    /**
     * @return the payload currently attached to this event
     */
    public ConsumerUser getConsumerUser() {
        return this.consumerUser;
    }

}

提供者事件
package com.event.springboot.events;

import com.event.springboot.pojo.ProvideUser;
import org.springframework.context.ApplicationEvent;

/**
 * Application event that carries a {@link ProvideUser} as its payload.
 */
public class ProvideRegisterEvent extends ApplicationEvent {


    /** Payload handed to listeners of this event. */
    private ProvideUser provideUser;

    /**
     * Creates the event.
     *
     * @param source      the event source, forwarded to the superclass constructor
     * @param provideUser the payload to be processed by listeners
     */
    public ProvideRegisterEvent(Object source, ProvideUser provideUser) {
        super(source);
        this.provideUser = provideUser;
    }

    /**
     * @return the payload currently attached to this event
     */
    public ProvideUser getProvideUser() {
        return provideUser;
    }

    /**
     * Replaces the payload.
     *
     * @param provideUser the new payload
     */
    public void setProvideUser(ProvideUser provideUser) {
        this.provideUser = provideUser;
    }

    /**
     * Misspelled setter kept only so existing callers continue to compile.
     *
     * @param user the new payload
     * @deprecated use {@link #setProvideUser(ProvideUser)} instead
     */
    @Deprecated
    public void setProvideUserr(ProvideUser user) {
        setProvideUser(user);
    }

}

消费者监听器
package com.event.springboot.listene;

import com.event.springboot.events.ConsumerRegisterEvent;
import com.event.springboot.pojo.ConsumerUser;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Listener for {@link ConsumerRegisterEvent}; its handler method is annotated
 * with {@code @Async}.
 */
@Component
public class ConsumerEventListener implements SmartApplicationListener {

    /**
     * Tells the caller whether this listener handles the given event type.
     *
     * @param eventType the event class being checked
     * @return {@code true} for {@link ConsumerRegisterEvent} and its subclasses
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        // isAssignableFrom also accepts subclasses, which the cast in
        // onApplicationEvent handles equally well.
        return ConsumerRegisterEvent.class.isAssignableFrom(eventType);
    }

    /**
     * Tells the caller whether this listener accepts events from the given source type.
     *
     * @param sourceType the class of the event source
     * @return always {@code true}; every source type is accepted
     */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return true;
    }

    /**
     * Handles a consumer registration event by printing the event and the
     * payload's code and name after a one-second delay.
     *
     * <p>Failures are printed and not rethrown. If the thread is interrupted
     * during the delay, the interrupt flag is restored before returning.
     *
     * @param event the published event; expected to be a {@link ConsumerRegisterEvent}
     */
    @Async
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        try {
            ConsumerRegisterEvent consumer = (ConsumerRegisterEvent) event;
            ConsumerUser consumerUser = consumer.getConsumerUser();
            // Artificial delay so the asynchronous return is visible when debugging
            // in the IDE. Listener order only controls submission priority.
            Thread.sleep(1000);
            System.out.println("接收到consumer事件:"+event);
            System.out.println("consumerUser.getCode() = " + consumerUser.getCode());
            System.out.println("consumerUser.getName() = " + consumerUser.getName());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning executor can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Defines the order value of this event listener.
     *
     * @return the fixed order value {@code 0}
     */
    @Override
    public int getOrder() {
        return 0;
    }
}
提供者监听器
package com.event.springboot.listene;

import com.event.springboot.events.ProvideRegisterEvent;
import com.event.springboot.pojo.ProvideUser;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Listener for {@link ProvideRegisterEvent} (the provider side); its handler
 * method is annotated with {@code @Async}.
 */
@Component
public class ProvideEventListener implements SmartApplicationListener {

    /**
     * Tells the caller whether this listener handles the given event type.
     *
     * @param eventType the event class being checked
     * @return {@code true} for {@link ProvideRegisterEvent} and its subclasses
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        // isAssignableFrom also accepts subclasses, which the cast in
        // onApplicationEvent handles equally well.
        return ProvideRegisterEvent.class.isAssignableFrom(eventType);
    }

    /**
     * Tells the caller whether this listener accepts events from the given source type.
     *
     * @param sourceType the class of the event source
     * @return always {@code true}; every source type is accepted
     */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return true;
    }

    /**
     * Handles a provider registration event by printing the event and the
     * payload's code and name. Failures are printed and not rethrown.
     *
     * @param event the published event; expected to be a {@link ProvideRegisterEvent}
     */
    @Async
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        try {
            ProvideRegisterEvent provide = (ProvideRegisterEvent) event;
            ProvideUser provideUser = provide.getProvideUser();
            System.out.println("接收到provide事件:"+event);
            System.out.println("provideUser.getCode() = " + provideUser.getCode());
            System.out.println("provideUser.getName() = " + provideUser.getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Defines the order value of this event listener.
     *
     * @return the fixed order value {@code 100}
     */
    @Override
    public int getOrder() {
        return 100;
    }
}

实体类

package com.event.springboot.pojo;

/**
 * Mutable data holder for a consumer user: a numeric code and a name.
 */
public class ConsumerUser {
    /** Name of the user; {@code null} until set. */
    private String name;
    /** Numeric code of the user; {@code 0} until set. */
    private int code;

    /**
     * @return the user's name
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the user's numeric code
     */
    public int getCode() {
        return this.code;
    }

    /**
     * @param code the numeric code to store
     */
    public void setCode(int code) {
        this.code = code;
    }
}

package com.event.springboot.pojo;

/**
 * Mutable data holder for a provider user: a numeric code and a name.
 */
public class ProvideUser {
    /** Name of the user; {@code null} until set. */
    private String name;
    /** Numeric code of the user; {@code 0} until set. */
    private int code;

    /**
     * @return the user's name
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * @return the user's numeric code
     */
    public int getCode() {
        return this.code;
    }

    /**
     * @param code the numeric code to store
     */
    public void setCode(int code) {
        this.code = code;
    }
}

事件发布接口

package com.event.springboot.publish;

/**
 *
 * Description: event publishing contract.
 *
 * @param <T> the type of event accepted by {@link #publishEvent(Object)}
 * @author
 */
public interface EventPublishService<T> {
    /**
     * Publishes (emits) an event.
     * @param event the event to publish
     */
    void publishEvent(T event);
}

事件发布实现类

package com.event.springboot.publish.impl;

import com.event.springboot.publish.EventPublishService;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;

/**
 * {@link EventPublishService} implementation that forwards events to the
 * {@link ApplicationContext} it was given.
 */
@Component
public class EventPublishServiceImpl implements EventPublishService<ApplicationEvent>, ApplicationContextAware {

    /** Context used as the publishing target; assigned via {@link #setApplicationContext}. */
    private ApplicationContext applicationContext;

    /**
     * Stores the application context for later publishing.
     *
     * @param applicationContext the context to publish through
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Publishes the event through the stored application context.
     *
     * @param event the event to publish
     */
    @Override
    public void publishEvent(ApplicationEvent event) {
        this.applicationContext.publishEvent(event);
    }
}

服务类

package com.event.springboot.service;


import com.event.springboot.events.ConsumerRegisterEvent;
import com.event.springboot.events.ProvideRegisterEvent;
import com.event.springboot.pojo.ConsumerUser;
import com.event.springboot.pojo.ProvideUser;
import com.event.springboot.publish.impl.EventPublishServiceImpl;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Service that builds sample users and publishes the matching registration events.
 */
@Service
public class UserRegisterService {
    /** Publisher used to emit the registration events. */
    @Autowired
    EventPublishServiceImpl eventPublishService;

    /**
     * Creates a provider user (code 1) and publishes a {@link ProvideRegisterEvent}
     * with this service as the event source.
     */
    public void createProvideUser() {
        ProvideUser payload = new ProvideUser();
        payload.setCode(1);
        payload.setName("我是提供者");
        // Publish the event.
        ProvideRegisterEvent registration = new ProvideRegisterEvent(this, payload);
        eventPublishService.publishEvent(registration);
    }

    /**
     * Creates a consumer user (code 2) and publishes a {@link ConsumerRegisterEvent}
     * with this service as the event source.
     */
    public void createConsumerUser() {
        ConsumerUser payload = new ConsumerUser();
        payload.setCode(2);
        payload.setName("我是消费者");
        // Publish the event.
        ConsumerRegisterEvent registration = new ConsumerRegisterEvent(this, payload);
        eventPublishService.publishEvent(registration);
    }
}
HelloController
package com.event.springboot.web;

import com.event.springboot.service.UserRegisterService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller exposing {@code /hello}, which triggers both user registrations.
 */
@RestController
public class HelloController {

    /** Service that publishes the registration events. */
    @Autowired
    UserRegisterService userRegisterService;

    /**
     * Prints a start marker, triggers the consumer and then the provider
     * registration, prints an end marker and returns a fixed greeting.
     *
     * @return the literal response body {@code "Hello Spring Boot!"}
     */
    @RequestMapping("/hello")
    public String hello() {
        System.out.println("开始执行");
        triggerRegistrations();
        System.out.println("我已经执行完了");
        final String greeting = "Hello Spring Boot!";
        return greeting;
    }

    /** Triggers the consumer registration first, then the provider registration. */
    private void triggerRegistrations() {
        userRegisterService.createConsumerUser();
        userRegisterService.createProvideUser();
    }

}

启动类

package com.event.springboot;
 
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * Spring Boot entry point; {@code @EnableAsync} is declared here.
 */
@SpringBootApplication
@EnableAsync
public class Application {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        final Class<?> primarySource = Application.class;
        SpringApplication.run(primarySource, args);
    }

}