上一篇主要介绍了事务提交的流程,得知在事务提交前会通过waitTicket
来等待冲突检测的结果,本篇主要来理一下冲突检测如何通知到提交事务的线程,也就是一个条件变量。相似地,我们还是先来理一下几个会提到的类,简单的梳理一下它的功能与用途。
从xcom读取到的各种通知,或者是从用户线程传入的本地通知消息。核心是直接可以对各种notification当成函数来使用,但是由于无法使用c++11,所以无法绑定成员函数。作为抽象类,定义了notification接口。
主要实现了Gcs_xcom_notification定义的()操作符,用于直接执行do_execute方法来实现调用预设的回调函数。
继承了Parameterized_notification,最主要的xcom消息之一。
这里主要定义了三个Packet类型,均继承了在pipeline_interface中定义的Packet基类,也就是简单的一个定义、获取包类型的基类。
//Define the applier packet types
#define ACTION_PACKET_TYPE 2
#define VIEW_CHANGE_PACKET_TYPE 3
#define SINGLE_PRIMARY_PACKET_TYPE 4
下面看一下各种包的用途:
主要用来驱动applier模块,主要定义了以下三个枚举:
/* Types of action packets used in the context of the applier module */
enum enum_packet_action
{
TERMINATION_PACKET=0, //Packet for a termination action
SUSPENSION_PACKET, //Packet to signal something to suspend
ACTION_NUMBER= 2 //The number of actions
};
用于传递视图改变的消息,包中主要的成员是gtid set和视图id。
用于传递新的主节点选举信息。
用于定义Applier_module的接口,是一个纯虚类。
继承并实现Applier_module_interface
的接口。该类主要是管理着一个incoming队列,并管理pipeline。
在上一篇的提交流程中已经提到,事务提交的相关信息(binlog、write_set、pk)等已经被广播了,然后这个广播怎么被处理,如何最终走入冲突检测模块并最终唤醒提交事务的线程呢?当然这里假设了该事务是一个本地事务,非远程事务。
在MGR插件的初始化函数中,会初始化xcom模块,具体可见plugin.cc:
int plugin_group_replication_start()
{
...
// GR delayed initialization.
if (!server_engine_initialized())
{
wait_on_engine_initialization= true;
plugin_is_auto_starting= false;
delayed_initialization_thread= new Delayed_initialization_thread();
if (delayed_initialization_thread->launch_initialization_thread())
{
/* purecov: begin inspected */
log_message(MY_ERROR_LEVEL,
"It was not possible to guarantee the initialization of plugin"
" structures on server start");
delete delayed_initialization_thread;
delayed_initialization_thread= NULL;
DBUG_RETURN(GROUP_REPLICATION_CONFIGURATION_ERROR);
/* purecov: end */
}
DBUG_RETURN(0); //leave the decision for later
}
初始化xcom主要是在延迟初始化的线程中处理,所以我们可以看一下Delayed_initialization_thread
主要做了什么工作。
在该线程的initialization_thread_handler
方法中,会初始化xcom组件,并且把节点加入到集群中:
if (server_engine_initialized())
{
//Protect this delayed start against other start/stop requests
Mutex_autolock auto_lock_mutex(get_plugin_running_lock());
error= initialize_plugin_and_join(PSESSION_INIT_THREAD, this);
}
else
{
error= 1;
log_message(MY_ERROR_LEVEL,
"Unable to start Group Replication. Replication applier "
"infrastructure is not initialized since the server was "
"started with --initialize or --initialize-insecure.");
}
在initialize_plugin_and_join
的初始化流程里,在start_group_communication
中会初始化xcom组件,并且设置一个事件处理对象来处理xcom底层传递来的事件,加入集群的相关代码:
int start_group_communication()
{
DBUG_ENTER("start_group_communication");
if (auto_increment_handler != NULL)
{
auto_increment_handler->
set_auto_increment_variables(auto_increment_increment_var,
get_server_id());
}
events_handler= new Plugin_gcs_events_handler(applier_module,
recovery_module,
view_change_notifier,
compatibility_mgr,
components_stop_timeout_var);
view_change_notifier->start_view_modification();
if (gcs_module->join(*events_handler, *events_handler))
DBUG_RETURN(GROUP_REPLICATION_COMMUNICATION_LAYER_JOIN_ERROR);
DBUG_RETURN(0);
}
加入集群的逻辑主要在gcs_operations.cc中的
enum enum_gcs_error
Gcs_operations::join(const Gcs_communication_event_listener& communication_event_listener,
const Gcs_control_event_listener& control_event_listener)
中,最终会到Gcs_xcom_control
中进行join,最终join的逻辑主要在retry_do_join
函数内,会创建一个回调为xcom_taskmain_startup
的线程,该线程会做一些初始化操作,然后将执行逻辑交给xcom_taskmain2
,该函数内有一个任务循环用来处理任务,收到终止信号后,做一些清理工作。
int xcom_taskmain2(xcom_port listen_port)
{
// Initialize and listen
...
task_loop();
#if defined(XCOM_HAVE_OPENSSL)
xcom_cleanup_ssl();
#endif
MAY_DBG(FN; STRLIT(" exit"));
xcom_thread_deinit();
return 1;
}
所以主要的工作流程主要在task_loop
中,位于task.c。在该循环中,会不断的获取task来执行:
task_env * t = first_runnable();
/* While runnable tasks */
while (runnable_tasks()) {
task_env * next = next_task(t);
if (!is_task_head(t)) {
/* DBGOUT(FN; PTREXP(t); STRLIT(t->name ? t->name : "TASK WITH NO NAME")); */
stack = t;
assert(stack);
assert(t->terminate != TERMINATED);
if (stack->debug)
/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
assert(ash_nazg_gimbatul.type == type_hash("task_env"));
{
/* double when = seconds(); */
int val = 0;
assert(t->func);
assert(stack == t);
val = t->func(t->arg);
/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
assert(ash_nazg_gimbatul.type == type_hash("task_env"));
if (!val) { /* Is task finished? */
deactivate(t);
t->terminate = TERMINATED;
task_unref(t);
stack = NULL;
}
}
}
t = next;
}
外部的执行回调在t->func(t->arg)
中,具体task的处理函数在xcom_base.c中被注册:
void start_run_tasks()
{
force_recover = 0;
client_boot_done = 1;
netboot_ok = 1;
booting = 0;
set_proposer_startpoint();
create_proposers();
set_task(&executor, task_new(executor_task, null_arg, "executor_task", XCOM_THREAD_DEBUG));
set_task(&sweeper, task_new(sweeper_task, null_arg, "sweeper_task", XCOM_THREAD_DEBUG));
set_task(&detector, task_new(detector_task, null_arg, "detector_task", XCOM_THREAD_DEBUG));
set_task(&alive_t, task_new(alive_task, null_arg, "alive_task", XCOM_THREAD_DEBUG));
}
也就是假设task被xcom组件收到,最终会调用executor_task函数。这里逻辑比较多,也比较复杂,暂时略过,处理消息的函数为execute_msg,最终会通过deliver_to_app来调用设置的xcom_receive_data的回调,也就是cb_xcom_receive_data。
在cb_xcom_receive_data的处理中,主要是消息打包成Gcs_xcom_notification 对象,最主要的是会设置一个回调函数do_cb_xcom_receive_data ,然后压入m_notification_queue队列中,至此xcom传递的消息被缓存入队列中。
至此,xcom_taskmain_startup
线程的逻辑主要是读取xcom消息,并且打包消息,设置处理的回调函数,并且将其压入队列,等待另一个线程对其进行进一步的处理。
下面就是process_notification_thread
线程的逻辑了,简而言之就是读取队列中的消息,进行分发,然后push到对应handler的队列中,等待相应的handler的工作线程进行下一步处理。
而m_notification_queue中消息的处理,是由另一个线程process_notification_thread 处理的,这些实现在gcs_xcom_notification.cc中。Gcs_xcom_engine::process 是一个堵塞队列,假设有新消息被入队了,条件标量会进行signal,唤醒process来处理新的消息。
// 等待条件变量被唤醒
m_wait_for_notification_mutex.lock();
while (m_notification_queue.empty())
{
m_wait_for_notification_cond.wait(
m_wait_for_notification_mutex.get_native_mutex()
);
}
notification= m_notification_queue.front();
m_notification_queue.pop();
m_wait_for_notification_mutex.unlock();
// 入队线程,唤醒条件变量
m_wait_for_notification_mutex.lock();
if (m_schedule)
{
m_notification_queue.push(request);
m_wait_for_notification_cond.broadcast();
scheduled= true;
}
m_wait_for_notification_mutex.unlock();
每次取出一个消息,都会通过()操作符来进行对象调用,在上述的类用途中已经介绍过了,每个notification构造的时候传入的回调函数,这个时候都会被调用,于是取出消息后,又跳到了之前设置的do_cb_xcom_receive_data 函数中进行具体的消息处理。
do_cb_xcom_receive_data 的实现在gcs_xcom_interface.cc中,经过一些处理转换为Gcs_message之后,会调用Gcs_xcom_communication::xcom_receive_data 来进行处理,大致处理的逻辑就是会回调一批监听的回调函数来通知它们消息收到了。
这个监听回调函数的注册在gcs在进行join的时候进行注册,具体可见gcs_oeprations.cc中的Gcs_operations::join :
gcs_control->add_event_listener(control_event_listener);
gcs_communication->add_event_listener(communication_event_listener);
我们来看一下communication的监听函数的信息,这个communication_event_listener是在start_group_communication 的时候被传入的,上面也谈到了,最终注册的事件处理函数对象是一个Plugin_gcs_events_handler 对象,事件处理相关的逻辑都被封装在了这个类的实现中。
我们来看一下它的on_message_received 的实现。它会根据Gcs_message 的类型来进行一次分发,我们这里只关心CT_TRANSACTION_MESSAGE 消息,所以主要看一下handle_transactional_message 的实现。
这个实现非常简短,只要是将消息提交给Applier_module对象来进行处理:
const unsigned char* payload_data= NULL;
uint64 payload_size= 0;
Plugin_gcs_message::get_first_payload_item_raw_data(
message.get_message_data().get_payload(),
&payload_data, &payload_size);
this->applier_module->handle(payload_data, static_cast<ulong>(payload_size));
我们继续看一下Applier_module的handle的具体逻辑:
/**
Queues the packet coming from the reader for future application.
@param[in] data the packet data
@param[in] len the packet length
@return the operation status
@retval 0 OK
@retval !=0 Error on queue
*/
int handle(const uchar *data, ulong len)
{
this->incoming->push(new Data_packet(data, len));
return 0;
}
在进行分发之后,也只是放入一个incoming的队列之中,于是我们也想到了,这个incoming队列的处理,应该又是由一个新的线程进行处理的。
处理对应消息的线程是applier_thread_handle
,我们来看一下它的逻辑。和普通的线程逻辑一样,也是一个死循环,主要用去从incoming队列中取出消息,然后根据消息的类型进行分发处理。我们这里只关心apply_data_packet 的处理,它主要生成Pipeline_event并丢入pipeline进行处理。
我们在一开始讲到了,Applier_module
管理着pipeline,用于处理消息,我们来看看:
int
Applier_module::inject_event_into_pipeline(Pipeline_event* pevent,
Continuation* cont)
{
int error= 0;
pipeline->handle_event(pevent, cont);
if ((error= cont->wait()))
log_message(MY_ERROR_LEVEL, "Error at event handling! Got error: %d", error);
return error;
}
很显然,Applier_module从队列取出消息后,将消息交由它的pipeline进行执行,我们可以看到,这里传入了一个Continuation对象,在pipeline处理返回后进行等待,很明显这里的处理可能是异步、并行化的,因为在之前的分析中我们提到了,pipeline其实是一个Event_handler,它是一个链式的处理机制。
我们来看一下pipeline在何时被初始化,又被初始化为什么。pipeline的初始化,主要在Applier_module::setup_applier_module 函数中,调用在plugin.cc中的configure_and_start_applier_module 中,我们来看一下初始化的代码:
//For now, only defined pipelines are accepted.
error=
applier_module->setup_applier_module(STANDARD_GROUP_REPLICATION_PIPELINE,
known_server_reset,
components_stop_timeout_var,
group_sidno,
gtid_assignment_block_size_var,
shared_plugin_stop_lock);
这里指定了pipeline的类型,而在setup_applier_module 中会根据pipeline的类型来获取相应的pipeline,这段实现主要在get_pipeline内:
int get_pipeline(Handler_pipeline_type pipeline_type,
Event_handler** pipeline)
{
DBUG_ENTER("get_pipeline(pipeline_type, pipeline)");
Handler_id* handler_list= NULL;
int num_handlers= get_pipeline_configuration(pipeline_type, &handler_list);
int error= configure_pipeline(pipeline, handler_list, num_handlers);
if(handler_list != NULL)
{
delete[] handler_list;
}
//when there are no handlers, the pipeline is not valid
DBUG_RETURN(error || num_handlers == 0);
}
主要根据pipeline的类型来获取pipeline的数量,然后获取,从get_pipeline_configuration中我们可以得知,STANDARD_GROUP_REPLICATION_PIPELINE类型的handler,有3个,分别是:
(*pipeline_conf)= new Handler_id[3];
(*pipeline_conf)[0]= CATALOGING_HANDLER;
(*pipeline_conf)[1]= CERTIFICATION_HANDLER;
(*pipeline_conf)[2]= SQL_THREAD_APPLICATION_HANDLER;
有了这些配置,我们就可以从configure_pipeline中得知最终的handler到底是什么:
switch (handler_list[i])
{
case CATALOGING_HANDLER:
handler= new Event_cataloger();
break;
case CERTIFICATION_HANDLER:
handler= new Certification_handler();
break;
case SQL_THREAD_APPLICATION_HANDLER:
handler= new Applier_handler();
break;
default:
error= 1; /* purecov: inspected */
log_message(MY_ERROR_LEVEL,
"Unable to bootstrap group replication event handling "
"infrastructure. Unknown handler type: %d",
handler_list[i]); /* purecov: inspected */
}
目前三个handler分别就是以上的三个handler,于是我们每个incoming中的事件,都可能会被以上的三个handler进行处理。
我们可以看出,event首先会经由Event_cataloger来进行处理,主要进行一次过滤,假设当前的事务已经被丢弃了,那么不会交由后续的handler继续进行处理,假设当前的事务刚开始,则取消丢弃标记,继续交由下一个handler进行处理。那么我们看看下一个handler,也就是Certification_handler做了哪些处理。
首先,Certification_handler会对event进行一次分发,假设不处理任何的event,则直接丢给最后一个handler进行处理。我们这里只关心事务的提交,所以主要看一下handle_transaction_id 的处理。
这些处理主要在certification_handler.cc中的Certification_handler::handle_transaction_id 函数。这里会将本地事务和远程事务做不同的处理,区分远程事务还是本地事务,主要是检测该事件的事务上下文中的server_id是否与本服务的server_id一致:
local_transaction= !strncmp(tcle->get_server_uuid(),
local_member_info->get_uuid().c_str(),
UUID_LENGTH);
// 事务冲突检测
/*
Certify transaction.
*/
seq_number= cert_module->certify(tcle->get_snapshot_version(),
tcle->get_write_set(),
!tcle->is_gtid_specified(),
tcle->get_server_uuid(),
gle, local_transaction);
然后会递交给冲突验证模块进行冲突检测,这里冲突检测不在本篇文章中进行梳理,我们假设已经获得了冲突的结果,我们继续往下走,先看一下本地事务的提交流程,假设我们在冲突检测中失败了,需要回滚,则会给事务上下文设置rollback_transaction的标记,然后会存入THD连接信息的事务上下文信息中。
最终,会释放之前提过的ticket,让对应等待提交结果的THD继续执行,并由上述提到的事务上下文来决定是否回滚事务,释放ticket的代码如下:
if (certification_latch->releaseTicket(tcle->get_thread_id()))
{
/* purecov: begin inspected */
log_message(MY_ERROR_LEVEL, "Failed to notify certification outcome");
cont->signal(1,true);
error= 1;
goto end;
/* purecov: end */
}
//The pipeline ended for this transaction
cont->signal(0,true);
值得注意的是,本地事务处理过后,不会再交由Applier_handler继续进行处理。
至此,本地事务的处理逻辑已经完全梳理完。
那么本节点接收到了远程事务,该如何处理呢?远程事务主要根据事务的冲突检测结果来处理,假设事务冲突,则直接丢弃,假设不冲突,则会提交给Applier_handler继续处理。所以可以看出,Applier_handler是处理远程事务的逻辑类。
Applier_handler的处理也相当的简单,仅仅只是将event压入队列,看到队列我们又可以知道,又会由另一条线程处理远程事务的应用,相关代码如下:
/*
There is no need to queue Transaction_context_log_event to
server applier, this event is only need for certification,
performed on the previous handler.
*/
if (event->get_event_type() != binary_log::TRANSACTION_CONTEXT_EVENT)
{
error= channel_interface.queue_packet((const char*)p->payload, p->len);
if (event->get_event_type() == binary_log::GTID_LOG_EVENT &&
local_member_info->get_recovery_status() == Group_member_info::MEMBER_ONLINE)
{
applier_module->get_pipeline_stats_member_collector()
->increment_transactions_waiting_apply();
}
}
好了,又要跳到另一条线程了,咋们来进行一次线程切换。该线程的创建与销毁主要由Applier_module来控制,我们来看一下,Applier针对上面的event,主要做了什么处理。
详细见下一篇文章吧。