上一个简析主要介绍了MGR的插件注册、调用流程,这篇文章会主要记录一下MGR中几个关键模块的梳理,便于后续将它们串联起来。

各种类用途

Continuation

Continuation其实是一个Condition的包装类,实现其实很简单,主要的用途就是调用者陷入睡眠来等待信号,而其余线程可以signal来通知等待者操作完成,这个signal是广播,具体的实现可以可以详见pipeline_interfaces.h中的实现,和条件变量相比,主要包装了几个返回值用于读取最终执行的结果,主要有执行的错误码error_code和该事务是否应该丢弃transaction_discarded

Data_packet

Data_packet用于存储原始的网络数据包

Pipeline_event

该类主要用于Data_packetLog_event之间的相互转化

Event_handler

事件处理器,主要用于处理各种事件,是一个接口类,处理事件的入口点在handle_event中:

  /**
    Handling of an event as defined in the handler implementation.

    As the handler can be included in a pipeline, somewhere in the
    method, the handler.next(event,continuation) method shall be
    invoked to allow the passing of the event to the next handler.

    Also, if an error occurs, the continuation object shall be used to
    propagate such error. This class can also be used to know/report
    when the transaction to whom the event belongs was discarded.

    @param[in]      event           the pipeline event to be handled
    @param[in,out]  continuation    termination notification object.
  */
  virtual int handle_event(Pipeline_event *event, Continuation *continuation)= 0;
  
  /**
    Handling of an action as defined in the handler implementation.

    As the handler can be included in a pipeline, somewhere in the
    method, the handler.next(action) method shall be invoked to allow
    the passing of the action to the next handler.

    @note Actions should not be treated asynchronously and as so, Continuations
    are not used here. Errors are returned directly or passed by in the action
    if it includes support for such

    @param[in]      action         the pipeline event to be handled
  */
  virtual int handle_action(Pipeline_action *action)= 0;

Event_handler的逻辑其实非常简单,其实每一个Event_handler都是一个链表上的节点,每一个该对象都有一个指向下一个Event_handler的指针,而每个链表上的节点,都支持在尾节点上再追加一个Event_handler对象。同时Event_handler支持role属性,可以通过role来查找想要的对象指针。

Event_handlerprotected成员函数中,有两个next函数,分别负责将Pipeline_eventPipeline_action传递向下一个节点,所以事件的传递可以一直向下进行。

before_commit 流程

有了以上大概的认识之后,我们可以着手分析一下group_replication_trans_before_commit的流程。

首先是各种合法性验证,假设无法满足提交条件,则直接返回。满足提交条件之后,会获得shared_plugin_stop_lock的读锁,则首先创建一个Group commit的cache,这个cache会存在于一个全局的链表中(io_cache_unused_list)做重用来提高性能。

后续会创建Transaction_context_log_event对象,该对象保存着事务的binlog以及用于冲突检测的gtid_executed

假设该事务是DML,则会获取当前的SessionWrite set,然后会将Transaction_context_log_event写入Group commit cache,然后会向Group commit cache中追加一个Gtid_log_event。最终Group commit cache中的内容会写入一个Transaction_message的对象中,接下来的步骤就是开始要向整个集群进行广播了。

为了进行广播,首先要注册一个ticket用于等待集群消息,然后进行流控检测。接下来会通过gcs_module进行原子广播,原子广播一旦完成,则释放shared_plugin_stop_lock读锁,可以看到在并发提交事务的时候,读与读是可以并行完成的。

释放读锁之后,会等待事务冲突的结果,由

certification_latch->waitTicket(param->thread_id)

完成,假设成功,则会重置cache log,这个cache不是Group commit的cache,是binlog的cache,然后将Group commit cache放入全局队列进行缓存。

然而我们回到提交上,提交后的代码为:

    /*
      Check whether the transaction should commit or abort given the
      plugin feedback.
    */
    if (thd->get_transaction()->get_rpl_transaction_ctx()->is_transaction_rollback() ||
        (DBUG_EVALUATE_IF("simulate_transaction_rollback_request", true, false)))
    {
      ha_rollback_low(thd, all);
      gtid_state->update_on_rollback(thd);
      thd_get_cache_mngr(thd)->reset();
      if (thd->get_stmt_da()->is_ok())
        thd->get_stmt_da()->reset_diagnostics_area();
      my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
      DBUG_RETURN(RESULT_ABORTED);
    }

所以在获得MGR的冲突验证之后,会根据验证结果,来判断当前事务是否需要回滚。这个事务是怎样被冲突验证处理的,以及本地事务和远程事务的处理流程,我们在下一篇进行简析。

共 0 条回复
暂时没有人回复哦,赶紧抢沙发
发表新回复

作者

sryan
today is a good day