更新时间:
#Java 使用 Disruptor 并发框架
#一、什么是 Disruptor
Disruptor 是一个高性能的异步处理框架,或者可以认为是最快的消息框架(轻量的 JMS),也可以认为是一个观察者模式的实现,或者事件监听模式的实现。能够在无锁的情况下实现网络的 Queue 并发操作。Disruptor 使用观察者模式, 主动将消息发送给消费者, 而不是等消费者从队列中取; 在无锁的情况下, 实现 queue(环形, RingBuffer) 的并发操作, 性能远高于 BlockingQueue。
#二、Disruptor 的优化策略
- 环形数组结构
为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。 - 元素位置定位
数组长度 2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心 index 溢出的问题。index 是 long 类型,即使 100 万 QPS 的处理速度,也需要 30 万年才能用完。 - 无锁设计
每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。
参考 : https://xie.infoq.cn/article/fe04e42cd473982d7d901b2c8
cas代替锁
独占缓存行 缓存行填充
环形队列 2的n次方 ,就是将取模转变为取与运算。 m % 2^n = m & ( 2^n - 1 )
预分配内存, 重用
#二、在 java 中使用
- pom 引入依赖
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.2.1</version>
</dependency>
- 为了方便操作 bean,引入 lombok 框架
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
- 定义消息 bean
@Data
public class Msg {
private int code;
private String msg;
}
- 定义消费者
public class ConsumerOne implements EventHandler<Msg> {
public void onEvent(Msg msg, long l, boolean b) throws Exception {
System.out.println("ConsumerOne - 消费者1 :="+msg);
}
}
public class ConsumerTwo implements EventHandler<Msg> {
public void onEvent(Msg msg, long l, boolean b) throws Exception {
System.out.println("ConsumerTwo - 消费者2 :="+msg);
}
}
- 定义工厂
public class MsgEventFactory implements EventFactory<Msg> {
public Msg newInstance() {
return new Msg();
}
}
- 提供者
public class MsgProducer {
private RingBuffer<Msg> ringBuffer;
private ExecutorService executor;
private Disruptor<Msg> disruptor;
public MsgProducer() {
init();
}
private void init() {
executor = Executors.newCachedThreadPool();
EventFactory<Msg> factory = new MsgEventFactory();
// 2的N次方。
int ringbuffer = 1024 * 1024;
//创建disruptor对象
disruptor = new Disruptor<Msg>(factory, ringbuffer, executor, ProducerType.MULTI, new YieldingWaitStrategy());
//注册消费者
disruptor.handleEventsWith(new ConsumerOne());
disruptor.handleEventsWith(new ConsumerTwo());
//启动
disruptor.start();
RingBuffer<Msg> ringBuffer = disruptor.getRingBuffer();
this.ringBuffer = ringBuffer;
}
public boolean Public(int code,String message){
long sequence =ringBuffer.next();
try {
Msg msg = ringBuffer.get(sequence);
msg.setMsg(message);
msg.setCode(code);
ringBuffer.publish(sequence);
return true;
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
public void Close(){
executor.shutdown();
disruptor.shutdown();
}
}
- 测试
public class Test {
public static void main(String[] args) {
MsgProducer producer = new MsgProducer();
producer.Public(200,"哈哈");
producer.Public(200,"哈哈22");
producer.Public(200,"哈哈22哈哈");
producer.Close();
}
}
原文地址 blog.csdn.net