摘要

公司有用到pulsar,原有的功能不满足需求,所以进行了二次开发。增加的功能就是简单的消息查询,通过时间段、消息ID等参数查询消息内容,使用pulsar-admin的api提供了查询接口。

在测试的过程中发现查询接口会出现长时间阻塞,而且没办法恢复。

分析

增加的流程

  • 校验topic是否存在
  • 获取topic的所有订阅
  • 每个订阅开启一个readonly cursor
  • 通过cursor读取消息封装后返回给客户端

pulsar的所有模块基本都是异步的,通过CompletableFuture实现,因此就怀疑是这个原因导致线程出现了死锁。
通过jstack获取堆栈信息:
jstack pid
在这里插入图片描述
从上述图中可以看到zk的查询操作和业务操作出现了阻塞,相互等待。
而且执行业务的线程是metadata-store的线程,这就很奇怪。因为只有zk的操作才通过这个线程执行。在AbstractMetadataStore中可以看到,zk里初始化了只有一个线程的线程池负责zk的所有后续操作,执行查询后的回调函数。
在这里插入图片描述

在这里插入图片描述
上述execute方法就是通过初始化的线程池执行业务逻辑传递的回调函数。

通过上述的分析,基本可以确定到是zk执行的回调函数出现了阻塞,导致zk的线程池一直不能用,也就不能继续执行后续的zk查询等操作。

定位

知道了原因,后边定位就比较简单了。消息查询的时候有一个判断topic是否存在的逻辑,在这个逻辑中也有查询zk的行为。
之前的代码在判断topic是否存在后执行了消息查询的逻辑,而消息查询也涉及到zk的查询,消息查询使用了zk的线程,因此出现了阻塞。

解决

解决办法就是把topic的判断拆分出来。
在这里插入图片描述

后续

pulsar中zk的操作基本都是单线程的,而且全部都是异步操作,很容易出现这种阻塞的问题。在二次开发过程中,要注意:

  • 不在zk的查询回调函数中执行耗时长的逻辑。
  • 不在zk的查询回调里执行zk的查询逻辑。
  • 做好异常判断,及时释放线程资源。
Logo

汇聚全球AI编程工具,助力开发者即刻编程。

更多推荐