
RabbitMQ流量控制机制分析.docx
2页RabbitMQ 流量控制机制简单分析在 RabbitMQ 中,消息可能被存储在多个不同的队列,消息越早被消费,那么消息经过的队列层次 越少,则平均每个消息处理的开销就越小但若接收消息的速率过快,MQ来不及处理,这些消息 就可能进入很深层次的队列,大大增加平均每个消息的处理开销,进一步使得处理新消息和发送旧 消息的能力减弱,更多的消息会进入很深的队列,循环往复,整个系统的性能就会极大的降低另 外若接收消息的速率过快还会实现某些进程的 mailbox 过大,可能会产生很严重的后果为此, RabbitMQ设计了一套流控机制,本文从以下三个方面去阐述该流控机制是如何工作的1. 如何开关闸门RabbitMQ 使用TCP长连接进行通讯,接收数据的起点进程为rabbit_reader首先分析它的接收loop recvloop(Deb, State = #v1{connection_state = blocked})->mainloop(Deb, State); Qrecvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) when BufLen < RecvLen ->ok = rabbit_net:setopts(Sock, [{active, once}]), Q mainloop(Deb, State#v1{pending_recv = true});recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen})->{Data, Rest} = split_binary(case Buf of[B] -> B;_ -> list_to_binary(lists:reverse(Buf))end, RecvLen),recvloop(Deb, handle_input(State#v1.callback, Data,State#v1{buf = [Rest],buf_len = BufLen - RecvLen})).从上面代码可以看出,rabbit_reader每接收到一个包,就设置套接字属性为{active, onece},若当前 连接被blocked时则不设置{active,once},这个接收进程就阻塞在receive方法上。
通过这种方式来实 现闸门的开关2. 何时关闭闸门RabbitMQ 是用 erlang/OTP 开发的,一个消息从被接收到被发送给订阅者,必然要在多个进程间的 转发,从接收到被消费,一个消息所走过的所有进程自然形成一条消息链,RabbitMQ通过监控这条 链上每个节点“mailbox”中未被接收的消息数量,决定何时关闭闸门实现机制如下所述:• {{credit_from,B}, value}• {{credit_to, pid}, value}• {{credit_from,C}, value}• {{credit_to,A}, value}• {{credit_from,pid}, value}• {{credit_to,B}, value}如图所示,进程A、B、C连成一条消息链,每个进程字典中有一对关于收发消息的credit值,以进 程B为例,{{credit_from, C} , Value},表示能发多少条消息给C,每发一条消息该值减1,当为0 时,本进程阻塞住不再往下游进程发消息也不再接收上游的消息;{{credit_to, A} , Value}表示再接 收多少个消息就向上游进程发增加credit值的消息{bump_credit, { self(), Quantity} },在上游进程接 收到该消息后,就增加{credit_from, pid}值,这样上游进程就能持续发消息。
但当上游发送速率 高于下游接收速率,credit值会逐渐被耗光这时进程就会被阻塞,阻塞的情况会一直传递到最上游 Rabbit_reader,这时 rabbit_reader 就关闭闸门3. 何时开启闸门当上游进程收到来自下游进程的bump_credit消息时,若此时上游进程处于block状态则解除block 状态,开始接收更上游进程的消息,一个个的传导最终能够解除rabbit_reader的block状态。
