Erlang消息选择性接收特性

原文 2015-03-06 01:44:46 发表于 CSDN,这里对以前写的文章做下收录。

从 rabbitMQ 代码中找到 gen_server2 , 对gen_server进行了一些优化。看到前辈写的博文也提到这个,引发了我的思考。见 gen_server2 - OTP gen_server优化版 。

gen_server2 引发的思考

正如 litaocheng 所说的:
gen_server 和 gen_server2 最大的不同是:
gen_server2 收到任何一条消息放到外部的队列中,当VM内部消息队列为空后,才进行消息处理,继续循环
gen_server 收到任何一条消息后,立即进行处理,处理完成后继续循环

其次,还有一个很重要的不同点:
gen_server2 使用的外部队列是带优先级排序的,功能模块本身可以定制消息优先级,甚至直接抛弃消息。(导出 prioritise_call/prioritise_cast/prioritise_info 几个函数实现定制,返回的数值越大优先级越高,返回drop就抛弃消息)
最高优先级是 infinity, 在处理 {system, _From, _Req} 和 {'EXIT', Parent, _R} 使用了这个优先级。

但在博文也引用了Joe' Bog 在merle 所做的测试,看了merle 的代码,没有用到 prioritise_XXX 函数,说明没有明显利用到 gen_server2 优先级控制的好处,那为什么能获得不错的效果?(见下图)

 

讨论 gen_server2 的测试

通过阅读 Joe 在Github写的 merle 代码,很快发现问题:

可以看出,merle 在 handle_call 时都会调用 send_generic_cmd 、send_get_cmd 等类似函数。这些函数实现上都会阻塞进程直到接收到某些特定消息。
下面以 send_generic_cmd 为例做说明:

send_generic_cmd(Socket, Cmd) ->
    gen_tcp:send(Socket, <<Cmd/binary, "\r\n">>),
    Reply = recv_simple_reply(),
    Reply.


recv_simple_reply() ->
   receive
      {tcp,_,Data} ->
           string:tokens(binary_to_list(Data), "\r\n");
      {error, closed} ->
           connection_closed
      after ?TIMEOUT -> timeout
   end.

另外,gen_tcp:send 在实现上也 receive 等待某个特定信息,见 prim_inet:send(Socket, Packet, Opts)

send(S, Data, OptList) when is_port(S), is_list(OptList) ->
    try erlang:port_command(S, Data, OptList) of
    false -> % Port busy and nosuspend option passed
       {error,busy};
    true ->
       receive
          {inet_reply,S,Status} ->
             Status
      end
   catch
      error:_Error ->
         {error,einval}
   end.

或许可能有读者不明白,这里说的等待某个特定消息是指选择性接收。具体例子如下:

%选择性接收:
receive
  {ok, Result} ->
    Result
end.

%非选择性接收:
receive
  Info ->
    Info
end.

选择性接收只针对某类信件,会一直阻塞住直到找到该信件为止,或者超时。非选择性接收的结果是每条消息都会被消费掉,方式为先进先出,不存在扫描信箱的问题。

前面提到,merle 在 handle_call 时都会 receive 住,等待某个特定消息。这个的代价就是每次receive住,erlang VM都要扫描进程整个信箱队列。特别像 Joe 在做此类测试时,消息处理速度远远低于消息投递速度,换句话说,gen_server进程信箱前面所有大部分的信件都是作者自己发的 gen_server:call 请求消息,然后每次 receive 住都要匹配这些消息。
比如, Joe 测试的是 merle:getkey 操作,那么信箱大部分消息就是 gen_server:call 投递的 getkey 消息,而 handle_call 在处理时就要扫描完前面的getkey消息,才能得到想要的 {tcp,_,Data} 消息。进程信箱消息队列如下所示:

getkey getkey getkey ... getkey {tcp,_,Data} ...

换成 gen_server2 的方式,gen_server2 会清空消息队列,那么进程信箱消息队列如下所示:

getkey getkey {tcp,_,Data} ... getkey getkey ...

前面还有2个getkey表示 gen_server2清空后在 handle_call 处理过程中 gen_server:call 又投递了新的 getkey 消息,数据量对比 gen_server来说可以说是极少了,所以,消息匹配的次数就少了很多,这就会出现 Joe 测试的结果。

讨论erlang消息选择性接收

在讨论这个问题之前,先引用 learnyousomeerlang 对消息选择性接收的介绍(原文),很生动具体。

When there is no way to match a given message, it is put in a save queue and the next message is tried. If the second message matches, the first message is put back on top of the mailbox to be retried later.

就是说,erlang消息匹配不上,就会把消息放到 Save Queue 的队列中,当匹配到了后再把消息放回进程信箱。以上是形象化的说法,如果是这样就必然存在消息入列出列开销,那VM到底是不是这样实现呢?

所以,对于选择性接收,这里取3个问题出来讲:
1、上面提到的,消息 Save Queue 是否存在入列出列开销
2、当选择性接收时,新消息到来时会不会重复扫描信箱前面匹配不上的消息
3、假设第2点不存在重复扫描,那么如果消息已经匹配到了,再匹配多一次这个消息,会不会重复扫描前面的消息

带着上面的疑问,下面以一个简单的例子做说明

-module(test).
-compile(export_all).
t() ->
   receive
      ok ->
          ok
   end.

保存为test.erl,然后编译,生成opcode

1> c(test).
{ok,test}
2> erts_debug:df(test).
ok

在目录下找到生成的 test.dis,t() 函数opcode如下:
04BE84B0: i_func_info_IaaI 0 test t 0
04BE84C4: i_loop_rec_fr f(04BE84EC) x(0)
04BE84CC: i_is_eq_exact_immed_frc f(04BE84E4) x(0) ok
04BE84D8: remove_message
04BE84DC: move_return_cr ok x(0)
04BE84E4: loop_rec_end_f test:t/0
04BE84EC: wait_locked_f test:t/0
逐行解释这段代码:

i_loop_rec_fr receive接收信息,如果有信息放到 x(0) 寄存器,继续下一条指令;没有消息就跳到地址 04BE84EC,即 wait_locked_f
i_is_eq_exact_immed_frc 匹配 x(0)寄存器的值和ok是否相等,如果相等继续下一条指令;否则跳到04BE84E4,即 loop_rec_end_f
remove_message 移除进程消息队列中“当前”的信息(也就是上一行匹配到的信息)
move_return_cr 将 ok 送到 x(0)寄存器并返回结果
loop_rec_end_f 将“当前消息”指针指向下一个位置,如果指向位置有消息,则跳到test:t/0第一段代码地址继续执行,即 04BE84C4;否则继续执行下一条指令 04BE84EC,即 wait_locked_f
wait_locked_f 阻塞当前进程,等待下一次调度,再检查是否有新的消息到达

以上, i_is_eq_exact_immed_frc 和 move_return_cr 在 beam_hot.h实现,其他在 beam_emu.c 实现,都可以找相关代码。

/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 i_loop_rec_fr 处理过程
 * 作用是从信箱取出一条消息放到 x(0) 寄存器;没消息则跳到 wait或者 wait_timeout指令
 */ 
 OpCase(i_loop_rec_fr):
 {
     BeamInstr *next;
     ErlMessage* msgp;
 
 loop_rec__:
 
     PROCESS_MAIN_CHK_LOCKS(c_p);
 
     // 取出“当前位置”的消息
     msgp = PEEK_MESSAGE(c_p);
 
     if (!msgp) { //如果消息不存在,尝试从SMP下public queue获取消息
#ifdef ERTS_SMP
	 erts_smp_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	 if (ERTS_PROC_PENDING_EXIT(c_p)) {
             // 如果进程准备退出,则不处理消息了
	     erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	     SWAPOUT;
	     goto do_schedule;  // 等待下一次调度
	 }
	 // SMP下把消息移到进程私有堆尾部(纯指针操作)
	 ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
	 
	 // 再尝试取出“当前位置”的消息
	 msgp = PEEK_MESSAGE(c_p);
	 if (msgp)
	     erts_smp_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
	 else
#endif
	 {
             // 信箱没消息则跳到 wait或者 wait_timeout指令(实际上就是执行下一条执行)
	     SET_I((BeamInstr *) Arg(0));
	     Goto(*I);
	 }
     }
     // 解析分布式消息,把消息附加的数据复制到进程私有堆
     ErtsMoveMsgAttachmentIntoProc(msgp, c_p, E, HTOP, FCALLS,
				   {
				       SWAPOUT;
				       reg[0] = r(0);
				       PROCESS_MAIN_CHK_LOCKS(c_p);
				   },
				   {
				       ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
				       PROCESS_MAIN_CHK_LOCKS(c_p);
				       r(0) = reg[0];
				       SWAPIN;
				   });
     if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
	 /*
	  * 如果消息损坏就移除(出现这种情况是分布式消息解码出现错误)
	  */
	 ASSERT(!msgp->data.attached);
	 UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重将“当前”位置指向下一条消息
	 free_message(msgp); // 销毁消息
	 goto loop_rec__; // 跳到上面继续
     }
     PreFetch(1, next); // 标记下一条指令位置
     r(0) = ERL_MESSAGE_TERM(msgp);
     NextPF(1, next);   // 执行下一条指令
 }

来看下这两个宏定义:

/* Get "current" message */
#define PEEK_MESSAGE(p)  (*(p)->msg.save)

从字面上就知道这个宏是取"当前的"消息,取了 msg.save 的值

#define UNLINK_MESSAGE(p,msgp) do { \
     ErlMessage* __mp = (msgp)->next; \
     *(p)->msg.save = __mp; \
     (p)->msg.len--; \
     if (__mp == NULL) \
         (p)->msg.last = (p)->msg.save; \
     (p)->msg.mark = 0; \
} while(0)

这个宏就是移除消息操作,消息队列长度-1,把 msg.save 指向了 msgp的下一条消息;如果 msgp->next 为 NULL,表示这是最后一条消息,就把 msg.last 等于了 msg.save


/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 remove_message  处理过程(已删除不必要的代码)
 * 作用是将消息从信箱队列中移除
 */ 
 OpCase(remove_message): {
     BeamInstr *next;
     ErlMessage* msgp;
 
     PROCESS_MAIN_CHK_LOCKS(c_p);
 
     PreFetch(0, next);
     msgp = PEEK_MESSAGE(c_p); // 取出当前的消息
 
     if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
	 save_calls(c_p, &exp_receive);
     }
     if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
	     SEQ_TRACE_TOKEN(c_p) = NIL;
     } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
	     // 追踪调试内容,可以忽略
	     Eterm msg;
	     SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
	     c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
	     if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) { c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
	     }
	     msg = ERL_MESSAGE_TERM(msgp);
	     seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, 
			      c_p->common.id, c_p);
     }
     UNLINK_MESSAGE(c_p, msgp); // 移除消息,侧重队列长度-1
     JOIN_MESSAGE(c_p); // 重置“当前”位置,指向了队列第一条消息
     CANCEL_TIMER(c_p);
     free_message(msgp); // 销毁消息
 
     ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
     PROCESS_MAIN_CHK_LOCKS(c_p);
 
     NextPF(0, next); // 执行下一条指令
 }

所以,当消息匹配时,就会重新指向了信箱第一条消息,这样,第3个问题就有了答案,会重新扫描信箱。
再来看看这个宏:

/* Reset message save point (after receive match) */
#define JOIN_MESSAGE(p) \
     (p)->msg.save = &(p)->msg.first

这个宏就是讲 msg.save 指向了 msg.first ,就是第一个消息
下面看下消息不匹配的情况就是 loop_rec_end_f 

/* 
 * beam_emu.c process_main() 线程入口函数,实现VM调度 
 * 以下截取 loop_rec_end_f   处理过程
 * 作用是继续取出最新的消息匹配
 */ 
 /*
 * Advance the save pointer to the next message (the current
 * message didn't match), then jump to the loop_rec instruction.
 */
 OpCase(loop_rec_end_f): {
     SET_I((BeamInstr *) Arg(0));
     SAVE_MESSAGE(c_p);  // “当前”位置指向下一个位置
     goto loop_rec__;  // 继续取出消息匹配
 }

这个opcode实现了不断取消息出来匹配的过程,直到失去调度机会,等待下一次调度。
也看下这个宏:

/* Save current message */
#define SAVE_MESSAGE(p) \
     (p)->msg.save = &(*(p)->msg.save)->next

这个宏就是将 msg.save 指向了下一个位置。
到这里第1个问题和第2个问题都有答案了,前面说到的 Save Queue 只是“形象化”的队列,实际不存在,这里,VM利用一个Save指针,记录当前匹配消息的位置,所以不存在消息入列出列的开销问题。然后第二个问题,消息选择性接收,当消息匹配不上,有新消息到来时不会重复扫描信箱前面匹配不上的消息。

总结

针对erlang选择性接收的问题,gen_server2给我们一个方向,通过外部队列减少了消息的匹配,而且控制优先级来控制消息的处理。

这里也说说 gen_server2 的副作用:
gen_server2会带来一种问题,erlang原来会利用进程信箱长度来抑制发送者进程(通过减少消息发送者进程的调度机会 Reduction,可以参考这篇文章《erlang send剖析及参数意义》)。但是,gen_server2 每次都会清空进程信箱的消息队列,无法利用到 VM 提供的抑制消息队列过快暴涨的保护机制。
针对这个问题,gen_server2 通过 prioritise_XXX 函数向外部模块暴露消息队列长度,使调用者可以根据消息队列长度控制是否丢弃消息,以实现对消息的抑制。

实际上,gen_server 在我们的开发中就够用了,很少需要去考虑erlang选择性接收的问题。rabbitMQ是针对消息队列的处理,必然有成千上万的消息量,那才正好需要 gen_server2 的作用。如果也到了这种消息量,那就建议使用 gen_server2

《Erlang消息选择性接收特性》上有1条评论

发表评论

邮箱地址不会被公开。 必填项已用*标注