【Pulsar】二次开发过程中遇到线程阻塞问题
摘要公司有用到pulsar,原有的功能不满足需求,所以进行了二次开发。增加的功能就是简单的消息查询,通过时间段、消息ID等参数查询消息内容,使用pulsar-admin的api提供了查询接口。在测试的过程中发现查询接口会出现长时间阻塞,而且没办法恢复。分析增加的流程校验topic是否存在获取topic的所有订阅每个订阅开启一个readonly cursor通过cursor读取消息封装后返回给客户端
摘要
公司有用到pulsar,原有的功能不满足需求,所以进行了二次开发。增加的功能就是简单的消息查询,通过时间段、消息ID等参数查询消息内容,使用pulsar-admin的api提供了查询接口。
在测试的过程中发现查询接口会出现长时间阻塞,而且没办法恢复。
分析
增加的流程
- 校验topic是否存在
- 获取topic的所有订阅
- 每个订阅开启一个readonly cursor
- 通过cursor读取消息封装后返回给客户端
pulsar的所有模块基本都是异步的,通过CompletableFuture
实现,因此就怀疑是这个原因导致线程出现了死锁。
通过jstack获取堆栈信息:
jstack pid
从上述图中可以看到zk的查询操作和业务操作出现了阻塞,相互等待。
而且执行业务的线程是metadata-store的线程,这就很奇怪。因为只有zk的操作才通过这个线程执行。在AbstractMetadataStore
中可以看到,zk里初始化了只有一个线程的线程池负责zk的所有后续操作,执行查询后的回调函数。
上述execute方法就是通过初始化的线程池执行业务逻辑传递的回调函数。
通过上述的分析,基本可以确定到是zk执行的回调函数出现了阻塞,导致zk的线程池一直不能用,也就不能继续执行后续的zk查询等操作。
定位
知道了原因,后边定位就比较简单了。消息查询的时候有一个判断topic是否存在的逻辑,在这个逻辑中也有查询zk的行为。
之前的代码在判断topic是否存在后执行了消息查询的逻辑,而消息查询也涉及到zk的查询,消息查询使用了zk的线程,因此出现了阻塞。
解决
解决办法就是把topic的判断拆分出来。
后续
pulsar中zk的操作基本都是单线程的,而且全部都是异步操作,很容易出现这种阻塞的问题。在二次开发过程中,要注意:
- 不在zk的查询回调函数中执行耗时长的逻辑。
- 不在zk的查询回调里执行zk的查询逻辑。
- 做好异常判断,及时释放线程资源。
更多推荐
所有评论(0)