原文 2015-01-24 01:07:54 发表于 CSDN,这里对以前写的文章做下收录。
erlang send是一个很基础的消息发送函数,用于进程把一个消息发给另外一个进程。这个函数可以同时用于本地节点进程通信,或者和远程节点进程之间的通信。
最近有同事遇到erlang:send导致消息堆积问题,这个引起了我的强烈关注。我也看了这块的代码,这里简单做个分享。
函数原型:
erlang:send(Dest, Msg, Options) -> Res
Options可以是以下2个:
nosuspend
If the sender would have to be suspended to do the send, nosuspend is returned instead.
noconnect
If the destination node would have to be auto-connected before doing the send, noconnect is returned instead.
字面上意思是说:
nosuspend 遇到会挂起进程的场合不挂起进程,直接返回nosuspend
noconnect 遇到远程节点没有连接时不会自动连接发送消息,直接返回noconnect
但是,erlang 文档对 nosuspend 直接给出了警告(Warning),提醒要慎用:
As with erlang:send_nosuspend/2,3: Use with extreme care!
到底为什么这么说,而且返回nosuspend 时这个消息是发送出去了,还是没发送出去?
源码剖析
看了erlang代码,erlang:send是bif实现的,这里以R16B02做说明:
/**
 * bif.c: send_3() implements the erlang:send/3 BIF.
 *
 * Parses the option list (noconnect / nosuspend), delegates the actual
 * delivery to do_send(), and maps do_send()'s result onto the value
 * returned to the Erlang caller (ok, noconnect, nosuspend), a trap, a
 * yield, or an exception.
 */
BIF_RETTYPE send_3(BIF_ALIST_3)
{
    Eterm ref;
    Process *p = BIF_P;
    Eterm to = BIF_ARG_1;
    Eterm msg = BIF_ARG_2;
    Eterm opts = BIF_ARG_3;
    int connect = !0;   // starts as 1 (true); cleared by the noconnect option
    int suspend = !0;   // starts as 1 (true); cleared by the nosuspend option
    Eterm l = opts;
    Sint result;

    while (is_list(l)) {    // walk the option list
        if (CAR(list_val(l)) == am_noconnect) {
            connect = 0;    // noconnect given
        } else if (CAR(list_val(l)) == am_nosuspend) {
            suspend = 0;    // nosuspend given
        } else {
            BIF_ERROR(p, BADARG);   // any other option is a badarg
        }
        l = CDR(list_val(l));
    }
    if(!is_nil(l)) {
        BIF_ERROR(p, BADARG);   // improper list
    }

    // Do the send. A positive result is the number of reductions to charge
    // the sender; 0 means done with nothing to charge; any other value is
    // one of the SEND_* codes handled by the switch below.
    result = do_send(p, to, msg, suspend, &ref);
    if (result > 0) {
        ERTS_VBUMP_REDS(p, result);     // charge the sender for this send
        if (ERTS_IS_PROC_OUT_OF_REDS(p))
            goto yield_return;
        BIF_RET(am_ok);
    }
    switch (result) {
    case 0:
        /* May need to yield even though we do not bump reds here... */
        if (ERTS_IS_PROC_OUT_OF_REDS(p))
            goto yield_return;
        BIF_RET(am_ok);
        break;
    // SEND_TRAP: either trap to dsend3_trap, or report noconnect
    case SEND_TRAP:
        if (connect) {  // noconnect not given: trap with the original arguments
            BIF_TRAP3(dsend3_trap, p, to, msg, opts);
        } else {        // noconnect given: return noconnect to the caller
            BIF_RET(am_noconnect);
        }
        break;
    // SEND_YIELD: either yield and retry send/3, or report nosuspend
    case SEND_YIELD:
        if (suspend) {  // nosuspend not given: yield, passing the send/3 export and the same arguments
            ERTS_BIF_YIELD3(bif_export[BIF_send_3], p, to, msg, opts);
        } else {        // nosuspend given: return nosuspend to the caller
            BIF_RET(am_nosuspend);
        }
        break;
    // SEND_YIELD_RETURN: yield, with ok as the eventual return value
    case SEND_YIELD_RETURN:
        // nosuspend given: return nosuspend instead of yielding
        if (!suspend)
            BIF_RET(am_nosuspend);
    yield_return:   // also reached by goto when the sender is out of reductions
        ERTS_BIF_YIELD_RETURN(p, am_ok);
    case SEND_AWAIT_RESULT:
        // do_send() handed back a reference in ref; trap to wait for the port's result
        ASSERT(is_internal_ref(ref));
        BIF_TRAP3(await_port_send_result_trap, p, ref, am_nosuspend, am_ok);
    case SEND_BADARG:
        BIF_ERROR(p, BADARG);
        break;
    case SEND_USER_ERROR:
        BIF_ERROR(p, EXC_ERROR);
        break;
    case SEND_INTERNAL_ERROR:
        BIF_ERROR(p, EXC_INTERNAL_ERROR);
        break;
    default:
        ASSERT(! "Illegal send result");
        break;
    }
    ASSERT(! "Can not arrive here");
    BIF_ERROR(p, BADARG);
}
再来看看 do_send() 函数:
/*
 * bif.c: do_send() delivers msg to a local process, a local port, a
 * registered name, or a process on a remote node.
 *
 * Returns a non-negative value on completion (a reduction cost, or 0),
 * or one of the negative SEND_* codes below.
 */
// Status codes that do_send() may return instead of a reduction count
#define SEND_TRAP (-1)
#define SEND_YIELD (-2)
#define SEND_YIELD_RETURN (-3)
#define SEND_BADARG (-4)
#define SEND_USER_ERROR (-5)
#define SEND_INTERNAL_ERROR (-6)
#define SEND_AWAIT_RESULT (-7)

Sint do_send(Process *p, Eterm to, Eterm msg, int suspend, Eterm *refp)
{
    Eterm portid;
    Port *pt;
    Process* rp;
    DistEntry *dep;
    Eterm* tp;

    // Destination is a pid on the local node
    if (is_internal_pid(to)) {
        if (IS_TRACED(p))
            trace_send(p, to, msg);
        if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
            save_calls(p, &exp_send);
        rp = erts_proc_lookup_raw(to);
        if (!rp)
            return 0;   // no such process: nothing to do, report success
        // Process found: control continues at send_message at the end of the function
    } else if (is_external_pid(to)) {   // Destination is a pid with an external (remote-style) representation
        dep = external_pid_dist_entry(to);
        if(dep == erts_this_dist_entry) {
            // The pid names this node but was not recognised as internal:
            // log that the message is discarded and report success.
            erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
            erts_dsprintf(dsbufp,
                          "Discarding message %T from %T to %T in an old "
                          "incarnation (%d) of this node (%d)\n",
                          msg,
                          p->common.id,
                          to,
                          external_pid_creation(to),
                          erts_this_node->creation);
            erts_send_error_to_logger(p->group_leader, dsbufp);
            return 0;
        }
        // Hand remote messages to remote_send()
        return remote_send(p, dep, to, to, msg, suspend);
    } else if (is_atom(to)) {
        // Destination is an atom: resolve the registered name, try it as a process first
        Eterm id = erts_whereis_name_to_id(p, to);
        rp = erts_proc_lookup(id);
        if (rp)
            goto send_message;
        // Not a process: check whether the name refers to a port
        pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP);
        if (pt) {
            portid = id;
            goto port_common;
        }
        // Neither a process nor a port: trace the attempt, then badarg
        if (IS_TRACED(p))
            trace_send(p, to, msg);
        if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
            save_calls(p, &exp_send);
        return SEND_BADARG;
    } else if (is_external_port(to)
               && (external_port_dist_entry(to) == erts_this_dist_entry)) {
        // External-format port id that names this node: log and discard
        erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
        erts_dsprintf(dsbufp,
                      "Discarding message %T from %T to %T in an old "
                      "incarnation (%d) of this node (%d)\n",
                      msg,
                      p->common.id,
                      to,
                      external_port_creation(to),
                      erts_this_node->creation);
        erts_send_error_to_logger(p->group_leader, dsbufp);
        return 0;
    } else if (is_internal_port(to)) {  // Destination is a local port
        int ret_val;
        portid = to;
        pt = erts_port_lookup(portid, ERTS_PORT_SFLGS_INVALID_LOOKUP);
    port_common:    // also entered by goto from the registered-name branches
        ret_val = 0;
        if (pt) {
            // nosuspend is passed on to the port layer as a signal flag
            int ps_flags = suspend ? 0 : ERTS_PORT_SIG_FLG_NOSUSPEND;
            *refp = NIL;
            // Issue the port command and translate its result
            switch (erts_port_command(p, ps_flags, pt, msg, refp)) {
            case ERTS_PORT_OP_CALLER_EXIT:
                /* We are exiting... */
                return SEND_USER_ERROR;
            case ERTS_PORT_OP_BUSY:
                /* Nothing has been sent */
                if (suspend)
                    erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
                // SEND_YIELD is returned with or without nosuspend; the message has not been sent
                return SEND_YIELD;
            case ERTS_PORT_OP_BUSY_SCHEDULED:
                /* Message was sent */
                if (suspend) {
                    erts_suspend(p, ERTS_PROC_LOCK_MAIN, pt);
                    ret_val = SEND_YIELD_RETURN;
                    break;
                }
                // No break on purpose: with nosuspend the message has been sent
                // and handling continues in the next case
                /* Fall through */
            case ERTS_PORT_OP_SCHEDULED:
                if (is_not_nil(*refp)) {
                    // A reference was handed back: the caller must await the result
                    ASSERT(is_internal_ref(*refp));
                    ret_val = SEND_AWAIT_RESULT;
                }
                break;
            case ERTS_PORT_OP_DROPPED:
            case ERTS_PORT_OP_BADARG:
            case ERTS_PORT_OP_DONE:
                break;  // ret_val stays 0
            default:
                ERTS_INTERNAL_ERROR("Unexpected erts_port_command() result");
                break;
            }
        }

        if (IS_TRACED(p))   /* trace once only !! */
            trace_send(p, portid, msg);
        if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
            save_calls(p, &exp_send);

        if (SEQ_TRACE_TOKEN(p) != NIL
#ifdef USE_VM_PROBES
            && SEQ_TRACE_TOKEN(p) != am_have_dt_utag
#endif
            ) {
            seq_trace_update_send(p);
            seq_trace_output(SEQ_TRACE_TOKEN(p), msg,
                             SEQ_TRACE_SEND, portid, p);
        }

        if (ERTS_PROC_IS_EXITING(p)) {
            KILL_CATCHES(p);    /* Must exit */
            return SEND_USER_ERROR;
        }
        return ret_val;
    } else if (is_tuple(to)) {  /* Remote send */
        // Destination is a tuple; the checks below require {RegName, Node},
        // i.e. exactly two atoms
        int ret;
        tp = tuple_val(to);
        if (*tp != make_arityval(2))
            return SEND_BADARG;
        if (is_not_atom(tp[1]) || is_not_atom(tp[2]))
            return SEND_BADARG;

        /* sysname_to_connected_dist_entry will return NULL if there
           is no dist_entry or the dist_entry has no port,
           but remote_send() will handle that. */

        dep = erts_sysname_to_connected_dist_entry(tp[2]);

        // Node name resolves to this node: deliver locally, to a process or a port
        if (dep == erts_this_dist_entry) {
            Eterm id;
            erts_deref_dist_entry(dep);
            if (IS_TRACED(p))
                trace_send(p, to, msg);
            if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
                save_calls(p, &exp_send);

            id = erts_whereis_name_to_id(p, tp[1]);

            rp = erts_proc_lookup_raw(id);
            if (rp)
                goto send_message;
            pt = erts_port_lookup(id, ERTS_PORT_SFLGS_INVALID_LOOKUP);
            if (pt) {
                portid = id;
                goto port_common;
            }
            return 0;   // name not registered locally: report success
        }

        // Any other node (dep may be NULL): let remote_send() deal with it
        ret = remote_send(p, dep, tp[1], to, msg, suspend);
        if (dep)
            erts_deref_dist_entry(dep);
        return ret;
    } else {
        // Unsupported destination term
        if (IS_TRACED(p))   /* XXX Is this really necessary ??? */
            trace_send(p, to, msg);
        if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
            save_calls(p, &exp_send);
        return SEND_BADARG;
    }

    // Delivery to a process on this node (rp was set by one of the branches above)
 send_message: {
        ErtsProcLocks rp_locks = 0;
        Sint res;
#ifdef ERTS_SMP
        // Sending to ourselves: the main lock is recorded as already held
        if (p == rp)
            rp_locks |= ERTS_PROC_LOCK_MAIN;
#endif
        /* send to local process */
        res = erts_send_message(p, rp, &rp_locks, msg, 0);
        // With sender punishment enabled the returned value is scaled by 4
        // and used as the reduction cost; otherwise the cost is 0
        if (erts_use_sender_punish)
            res *= 4;
        else
            res = 0;
        // Release the receiver locks, but never our own main lock
        erts_smp_proc_unlock(rp,
                             p == rp
                             ? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
                             : rp_locks);
        return res;
    }
}
再看下本地进程消息处理 erts_send_message() 函数:
/*
 * erl_message.c: erts_send_message() delivers a message to a process on
 * the local node.
 *
 * Returns res: in the branches shown here it is set from the receiver's
 * message queue length (receiver->msg.len) or from queue_message(); it
 * stays 0 if no message was queued.
 */
Sint
erts_send_message(Process* sender,
                  Process* receiver,
                  ErtsProcLocks *receiver_locks,
                  Eterm message,
                  unsigned flags)
{
    Uint msize;
    ErlHeapFragment* bp = NULL;
    Eterm token = NIL;
    Sint res = 0;
#ifdef USE_VM_PROBES
    DTRACE_CHARBUF(sender_name, 64);
    DTRACE_CHARBUF(receiver_name, 64);
    Sint tok_label = 0;
    Sint tok_lastcnt = 0;
    Sint tok_serial = 0;
#endif

    BM_STOP_TIMER(system);
    BM_MESSAGE(message,sender,receiver);
    BM_START_TIMER(send);

#ifdef USE_VM_PROBES
    *sender_name = *receiver_name = '\0';
    if (DTRACE_ENABLED(message_send)) {
        erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
                      "%T", sender->common.id);
        erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
                      "%T", receiver->common.id);
    }
#endif
    if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
        // The sender carries a sequential trace token; this branch handles
        // sequential tracing.
        // See http://www.erlang.org/doc/man/erlang.html#process_flag-3
        /*
         * NOTE: the body of this branch was omitted by the article's author
         * for brevity; it is not shown here.
         */
    } else if (sender == receiver) {
        // The process is sending a message to itself
        /* Drop the message if the process has a pending exit */
#ifdef ERTS_SMP
        // Work out which of the MSGQ/STATUS locks are not yet held
        ErtsProcLocks need_locks = (~(*receiver_locks)
                                    & (ERTS_PROC_LOCK_MSGQ
                                       | ERTS_PROC_LOCK_STATUS));
        if (need_locks) {
            *receiver_locks |= need_locks;
            if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
                // Could not take them without blocking. If only MSGQ was
                // missing, release STATUS first and then take both together.
                if (need_locks == ERTS_PROC_LOCK_MSGQ) {
                    erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
                    need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
                }
                erts_smp_proc_lock(receiver, need_locks);
            }
        }
        if (!ERTS_PROC_PENDING_EXIT(receiver))
#endif
        {
            ErlMessage* mp = message_alloc();

            DTRACE6(message_send, sender_name, receiver_name,
                    size_object(message), tok_label, tok_lastcnt, tok_serial);
            // The term is stored as-is: no copy_struct() call in this branch
            mp->data.attached = NULL;
            ERL_MESSAGE_TERM(mp) = message;
            ERL_MESSAGE_TOKEN(mp) = NIL;
#ifdef USE_VM_PROBES
            ERL_MESSAGE_DT_UTAG(mp) = NIL;
#endif
            mp->next = NULL;
            // NOTE(review): judging by its name, this moves messages from the
            // incoming queue to the private queue first - confirm in erl_message.h
            ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
            // Link the new message into the private queue
            LINK_MESSAGE_PRIVQ(receiver, mp);

            // res is the receiver's message queue length
            res = receiver->msg.len;

            if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
                trace_receive(receiver, message);
            }
        }
        BM_SWAP_TIMER(send,system);
    } else {
        // The process is sending a message to another process
#ifdef ERTS_SMP
        ErlOffHeap *ohp;
        Eterm *hp;
        erts_aint32_t state;

        BM_SWAP_TIMER(send,size);
        msize = size_object(message);
        BM_SWAP_TIMER(size,send);
        // Obtain msize words of space for the message on behalf of the receiver
        hp = erts_alloc_message_heap_state(msize,
                                           &bp,
                                           &ohp,
                                           receiver,
                                           receiver_locks,
                                           &state);
        BM_SWAP_TIMER(send,copy);
        // Copy the term into that space; message now refers to the copy
        message = copy_struct(message, msize, &hp, ohp);
        BM_MESSAGE_COPIED(msz);
        BM_SWAP_TIMER(copy,send);
        DTRACE6(message_send, sender_name, receiver_name,
                msize, tok_label, tok_lastcnt, tok_serial);
        // NOTE(review): the original article states that queue_message()
        // returns the receiver's queue length - confirm against its definition
        res = queue_message(sender,
                            receiver,
                            receiver_locks,
                            &state,
                            bp,
                            message,
                            token
#ifdef USE_VM_PROBES
                            , NIL
#endif
                            );
        BM_SWAP_TIMER(send,system);
#else
        ErlMessage* mp = message_alloc();
        Eterm *hp;
        BM_SWAP_TIMER(send,size);
        msize = size_object(message);
        BM_SWAP_TIMER(size,send);

        // Not enough free heap on the receiver for msize words: garbage collect first
        if (receiver->stop - receiver->htop <= msize) {
            BM_SWAP_TIMER(send,system);
            erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity);
            BM_SWAP_TIMER(system,send);
        }
        // Reserve msize words at the top of the receiver's heap
        hp = receiver->htop;
        receiver->htop = hp + msize;
        BM_SWAP_TIMER(send,copy);
        // Copy the term onto the receiver's heap, using the receiver's off-heap list
        message = copy_struct(message, msize, &hp, &receiver->off_heap);
        BM_MESSAGE_COPIED(msize);
        BM_SWAP_TIMER(copy,send);
        DTRACE6(message_send, sender_name, receiver_name,
                (uint32_t)msize, tok_label, tok_lastcnt, tok_serial);
        ERL_MESSAGE_TERM(mp) = message;
        ERL_MESSAGE_TOKEN(mp) = NIL;
#ifdef USE_VM_PROBES
        ERL_MESSAGE_DT_UTAG(mp) = NIL;
#endif
        mp->next = NULL;
        mp->data.attached = NULL;
        LINK_MESSAGE(receiver, mp);
        // res is the receiver's message queue length
        res = receiver->msg.len;
        /*
         * Notify the receiver that a new message has arrived.
         * NOTE(review): per the original article, a receiver blocked in
         * receive may be off the run queue and must be made runnable again.
         */
        erts_proc_notify_new_message(receiver);

        if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
            trace_receive(receiver, message);
        }
        BM_SWAP_TIMER(send,system);
#endif /* #ifndef ERTS_SMP */
    }
    return res;
}
再来看看 remote_send() 函数:
/**
 * bif.c: remote_send() sends msg to a destination on another node through
 * the distribution entry dep.
 *
 * to is either a registered name (atom) or an external pid; full_to is
 * the destination term used for tracing. Returns 0 when the message was
 * handed to the distribution layer, or one of SEND_TRAP, SEND_YIELD,
 * SEND_YIELD_RETURN, SEND_INTERNAL_ERROR.
 */
static Sint remote_send(Process *p, DistEntry *dep,
                        Eterm to, Eterm full_to, Eterm msg, int suspend)
{
    Sint res;
    int code;
    ErtsDSigData dsd;

    ASSERT(is_atom(to) || is_external_pid(to));

    // Check the state of the distribution entry; the last argument is the
    // no-suspend flag (!suspend)
    code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, !suspend);
    switch (code) {
    case ERTS_DSIG_PREP_NOT_ALIVE:
    case ERTS_DSIG_PREP_NOT_CONNECTED:
        // No usable connection: return SEND_TRAP; nothing has been sent
        res = SEND_TRAP;
        break;
    // Reached only with nosuspend (see the ASSERT): sending would have
    // suspended the caller, so return SEND_YIELD without sending anything
    case ERTS_DSIG_PREP_WOULD_SUSPEND:
        ASSERT(!suspend);
        res = SEND_YIELD;
        break;
    // Connection is usable: hand the message to the distribution layer
    case ERTS_DSIG_PREP_CONNECTED: {

        if (is_atom(to))
            code = erts_dsig_send_reg_msg(&dsd, to, msg);
        else
            code = erts_dsig_send_msg(&dsd, to, msg);
        /*
         * Note that reductions have been bumped on calling
         * process by erts_dsig_send_reg_msg() or
         * erts_dsig_send_msg().
         */
        // A yield request from the send call becomes SEND_YIELD_RETURN;
        // every other outcome is reported as 0
        if (code == ERTS_DSIG_SEND_YIELD)
            res = SEND_YIELD_RETURN;
        else
            res = 0;
        break;
    }
    default:
        ASSERT(! "Invalid dsig prepare result");
        res = SEND_INTERNAL_ERROR;
    }

    // Trace and save-calls bookkeeping only for non-negative results
    if (res >= 0) {
        if (IS_TRACED(p))
            trace_send(p, full_to, msg);
        if (ERTS_PROC_GET_SAVED_CALLS_BUF(p))
            save_calls(p, &exp_send);
    }

    return res;
}
其中,erts_dsig_send_msg() 底层调用了 dsig_send,把消息放到发送队列,再由 port_task 工作线程负责把消息投递到其他节点。这里面篇幅较大,主要是dist.c, erl_port_task.c, erl_process.c, erl_node_tables.c, io.c 这几个模块,以后找时间再讲。
问题讨论
1、erlang:send 不带 nosuspend / noconnect, 会导致消息堆积?
接收进程被挂起时(也就是进程状态是{status,suspended}),发给它的消息仍然会进入它的消息队列,但得不到处理,于是这个进程的消息就会堆积。
30> Pid = spawn(fun() -> receive M -> M end end). <0.68.0> 31> process_info(Pid,messages). {messages,[]} 32> erlang:suspend_process(Pid). true 33> Pid ! hello. hello 34> process_info(Pid,status). {status,suspended} 35> process_info(Pid,messages). {messages,[hello]}
2、为什么erlang文档说慎用 nosuspend ?
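因为,当消息暂时无法发送(端口或分布式连接繁忙)时,原本会挂起发送者进程,让它失去调度权,从而起到流量控制的作用;而使用 nosuspend 后,只会以返回值 nosuspend 通知调用者端口忙,不会抑制发送者进程。从上面的代码可以看到,在分布式环境下,remote_send 遇到 ERTS_DSIG_PREP_WOULD_SUSPEND 时直接返回 SEND_YIELD,并没有调用 erts_dsig_send_msg,随后 send_3 返回 nosuspend,也就是说这条消息并没有进入发送队列。如果调用者忽略这个返回值,消息就悄悄丢失了;如果调用者自己缓存消息并不停重试,而连接又一直处于繁忙或不稳定的状态,消息就会在调用者这一侧不停堆积,撑爆内存。所以,使用 nosuspend 时一定要检查返回值并做好限流和异常处理;另外还是建议配合使用 noconnect,当节点未连接时就会返回 noconnect ,程序这边再做异常处理。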
3、erlang:send 返回nosuspend 时这个消息是发送出去了,还是没发送出去?
当返回nosuspend 时这个消息肯定是没发送出去。(文章这里最开始写错了,所以网上搜到这篇文章都是错的,建议还是少上那些复制网站)
扩展延伸
reductions
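可以理解为 Erlang 的基本调度计量单位,Erlang VM 基于 reduction 来进行调度,用来保证调度实现的准实时性。进程每次被调度时会分到一定数量的 reductions 配额,函数调用、BIF、发送消息等操作都会消耗 reductions(例如上面 send_3 里的 ERTS_VBUMP_REDS),配额用完后进程就要让出 CPU,等待下一次调度。进程累计的 reductions 值越高,说明它执行的工作越多,而不是得到的调度机会越多。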
trap_send
前面代码提到了SEND_TRAP 错误,也就是这里
case SEND_TRAP: if (connect) { BIF_TRAP3(dsend3_trap, p, to, msg, opts); } else { BIF_RET(am_noconnect); } break;
看下BIF_TRAP3的代码,实际是个宏,修改当前进程的'寄存器'信息,关键是设置 freason 为 TRAP
/*
 * BIF_TRAP3: leave the current BIF by trapping to Trap_ with three arguments.
 *
 * Stores A0..A2 in the first three slots of the scheduler's x_reg_array,
 * sets the process arity to 3, points (p)->i at Trap_'s address for the
 * active code index, marks (p)->freason as TRAP, and returns THE_NON_VALUE
 * from the enclosing function. Because it contains a return statement it
 * can only be used inside a function returning Eterm.
 */
#define BIF_TRAP3(Trap_, p, A0, A1, A2) do {			\
      Eterm* reg = ERTS_PROC_GET_SCHDATA((p))->x_reg_array;	\
      (p)->arity = 3;						\
      reg[0] = (A0);						\
      reg[1] = (A1);						\
      reg[2] = (A2);						\
      (p)->i = (BeamInstr*) ((Trap_)->addressv[erts_active_code_ix()]); \
      (p)->freason = TRAP;					\
      return THE_NON_VALUE;					\
 } while(0)
现在看下这个过程是怎么工作的:
/* * beam_emu.c process_main() 线程入口函数,实现VM调度 * 以下截取 bif 处理过程 */ OpCase(call_bif_e): { Eterm (*bf)(Process*, Eterm*, BeamInstr*) = GET_BIF_ADDRESS(Arg(0)); // 根据参数获取bif实际执行函数 Eterm result; BeamInstr *next; PRE_BIF_SWAPOUT(c_p); c_p->fcalls = FCALLS - 1; if (FCALLS <= 0) { save_calls(c_p, (Export *) Arg(0)); } PreFetch(1, next); ASSERT(!ERTS_PROC_IS_EXITING(c_p)); reg[0] = r(0); result = (*bf)(c_p, reg, I); // 执行bif函数 ASSERT(!ERTS_PROC_IS_EXITING(c_p) || is_non_value(result)); ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); ERTS_HOLE_CHECK(c_p); ERTS_SMP_REQ_PROC_MAIN_LOCK(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); if (c_p->mbuf || MSO(c_p).overhead >= BIN_VHEAP_SZ(c_p)) { Uint arity = ((Export *)Arg(0))->code[2]; result = erts_gc_after_bif_call(c_p, result, reg, arity); E = c_p->stop; } HTOP = HEAP_TOP(c_p); FCALLS = c_p->fcalls; if (is_value(result)) { r(0) = result; CHECK_TERM(r(0)); NextPF(1, next); } else if (c_p->freason == TRAP) { // 当 freason 设置为 TRAP 时 SET_CP(c_p, I+2); SET_I(c_p->i); SWAPIN; r(0) = reg[0]; Dispatch(); // 到这一步之后就会调用 dsend3_trap 指向的函数,涉及VM实现,这里不多讲了
这里也说下 dsend3_trap 指向哪个函数:
/* dist.c: upper layer of the Erlang distribution implementation */

/*
 * Return the export entry for erlang:func/arity, obtained from
 * erts_export_put().
 */
static Export*
trap_function(Eterm func, int arity)
{
    return erts_export_put(am_erlang, func, arity);    // export entry for erlang:func/arity
}

/*
 * Initialise the distribution layer: node monitors, the nodedown record,
 * the no_nodes / no_caches counters, and the trap function references
 * used by the distribution BIFs.
 */
void init_dist(void)
{
    init_nodes_monitors();

    nodedown.reason = NIL;
    nodedown.bp = NULL;

    erts_smp_atomic_init_nob(&no_nodes, 0);
    erts_smp_atomic_init_nob(&no_caches, 0);

    /* Lookup/Install all references to trap functions */
    dsend2_trap = trap_function(am_dsend,2);
    dsend3_trap = trap_function(am_dsend,3);    // dsend3_trap refers to erlang:dsend/3
    /*    dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/
    dlink_trap = trap_function(am_dlink,1);
    dunlink_trap = trap_function(am_dunlink,1);
    dmonitor_node_trap = trap_function(am_dmonitor_node,3);
    dgroup_leader_trap = trap_function(am_dgroup_leader,2);
    dexit_trap = trap_function(am_dexit, 2);
    dmonitor_p_trap = trap_function(am_dmonitor_p, 2);
}
为什么erlang要大费周章搞trap?erlang:send() bif化后又执行非bif函数(erlang:dsend/3)
erlang:send/3 只有在目标节点尚未建立连接(do_send 返回 SEND_TRAP)并且没有带 noconnect 选项的情况下才会执行erlang:dsend/3,这个时候,当前进程就会失去CPU调度权,放到了下一次调度去执行这个函数。而且,这一来一回就扣除了不少reductions
2015/1/25 修正 erlang:send 返回nosuspend时消息没发送出去
2015/5/22 修正 erts_proc_notify_new_message 的注释说明