Привет, Алексей! Я просматриваю патч. Где-нибудь через 2-3 часа должны быть комментарии от меня. Можно будет созвониться. И сразу — полагаю, ты сейчас усердно трудишься над тестами (которые в патче отсутствуют).
Andrei. [email protected] (Alexey Botchkov) writes: > revision-id: b428e822da09b2bc82a2447332bb980f93c80262 > (mariadb-10.4.1-103-gb428e82) > parent(s): 8aae31cf494678b6253031c627566e50bc666920 > committer: Alexey Botchkov > timestamp: 2019-02-14 02:46:57 +0400 > message: > > MDEV-7974 backport fix for mysql bug#12161 (XA and binlog). > > XA transactions now are kept persistent after prepare. > XA_prepare_log_event implamented, and XA tranasctions are logged > as XA transactions. > > --- > sql/handler.cc | 9 ++ > sql/handler.h | 10 ++ > sql/log.cc | 115 +++++++++++++--- > sql/log.h | 10 ++ > sql/log_event.cc | 397 > +++++++++++++++++++++++++++++++++++++++++++++++++++-- > sql/log_event.h | 81 +++++++++++ > sql/sql_class.cc | 18 +-- > sql/sql_class.h | 20 ++- > sql/sql_connect.cc | 1 + > sql/transaction.cc | 69 +++++++++- > sql/transaction.h | 1 + > 11 files changed, 691 insertions(+), 40 deletions(-) > > diff --git a/sql/handler.cc b/sql/handler.cc > index 001055c..3b2a3e0 100644 > --- a/sql/handler.cc > +++ b/sql/handler.cc > @@ -1214,6 +1214,9 @@ static int prepare_or_error(handlerton *ht, THD *thd, > bool all) > } > > > +/*static inline */int > +binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, > + bool all, my_xid xid); > /** > @retval > 0 ok > @@ -1225,6 +1228,7 @@ int ha_prepare(THD *thd) > int error=0, all=1; > THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; > Ha_trx_info *ha_info= trans->ha_list; > + > DBUG_ENTER("ha_prepare"); > > if (ha_info) > @@ -1250,6 +1254,11 @@ int ha_prepare(THD *thd) > > } > } > + if (unlikely(tc_log->log_prepare(thd, all))) > + { > + ha_rollback_trans(thd, all); > + error=1; > + } > } > > DBUG_RETURN(error); > diff --git a/sql/handler.h b/sql/handler.h > index fc6246c..613c1c3 100644 > --- a/sql/handler.h > +++ b/sql/handler.h > @@ -810,6 +810,16 @@ struct xid_t { > long gtrid_length; > long bqual_length; > char data[XIDDATASIZE]; // not \0-terminated ! 
> + /* > + The size of the string containing serialized Xid representation > + is computed as a sum of > + eight as the number of formatting symbols (X'',X'',) > + plus 2 x XIDDATASIZE (2 due to hex format), > + plus space for decimal digits of XID::formatID, > + plus one for 0x0. > + */ > + static const uint ser_buf_size= > + 8 + 2 * XIDDATASIZE + 4 * sizeof(long) + 1; > > xid_t() {} /* Remove gcc warning */ > bool eq(struct xid_t *xid) > diff --git a/sql/log.cc b/sql/log.cc > index a56117a..316b871 100644 > --- a/sql/log.cc > +++ b/sql/log.cc > @@ -87,6 +87,9 @@ static bool > binlog_savepoint_rollback_can_release_mdl(handlerton *hton, > static int binlog_commit(handlerton *hton, THD *thd, bool all); > static int binlog_rollback(handlerton *hton, THD *thd, bool all); > static int binlog_prepare(handlerton *hton, THD *thd, bool all); > +static int binlog_xa_recover(handlerton *hton, XID *xid_list, uint len); > +static int binlog_commit_by_xid(handlerton *hton, XID *xid); > +static int binlog_rollback_by_xid(handlerton *hton, XID *xid); > static int binlog_start_consistent_snapshot(handlerton *hton, THD *thd); > > static const LEX_CSTRING write_error_msg= > @@ -1688,6 +1691,9 @@ int binlog_init(void *p) > binlog_hton->commit= binlog_commit; > binlog_hton->rollback= binlog_rollback; > binlog_hton->prepare= binlog_prepare; > + binlog_hton->recover= binlog_xa_recover; > + binlog_hton->commit_by_xid = binlog_commit_by_xid; > + binlog_hton->rollback_by_xid = binlog_rollback_by_xid; > binlog_hton->start_consistent_snapshot= binlog_start_consistent_snapshot; > binlog_hton->flags= HTON_NOT_USER_SELECTABLE | HTON_HIDDEN; > return 0; > @@ -1883,23 +1889,16 @@ static inline int > binlog_commit_flush_xid_caches(THD *thd, binlog_cache_mngr *cache_mngr, > bool all, my_xid xid) > { > - if (xid) > - { > - Xid_log_event end_evt(thd, xid, TRUE); > - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); > - } > - else > + /* Mask XA COMMIT ... 
ONE PHASE as plain BEGIN ... COMMIT */ > + if (!xid) > { > - /* > - Empty xid occurs in XA COMMIT ... ONE PHASE. > - In this case, we do not have a MySQL xid for the transaction, and the > - external XA transaction coordinator will have to handle recovery if > - needed. So we end the transaction with a plain COMMIT query event. > - */ > - Query_log_event end_evt(thd, STRING_WITH_LEN("COMMIT"), > - TRUE, TRUE, TRUE, 0); > - return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); > + DBUG_ASSERT(thd->transaction.xid_state.xa_state == XA_IDLE && > + thd->lex->xa_opt == XA_ONE_PHASE); > + xid= thd->query_id; > } > + > + Xid_log_event end_evt(thd, xid, TRUE); > + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); > } > > /** > @@ -1961,11 +1960,77 @@ static int binlog_prepare(handlerton *hton, THD *thd, > bool all) > do nothing. > just pretend we can do 2pc, so that MySQL won't > switch to 1pc. > - real work will be done in MYSQL_BIN_LOG::log_and_order() > + real work is done in MYSQL_BIN_LOG::log_and_order() > */ > return 0; > } > > + > +static int serialize_xid(XID *xid, char *buf) > +{ > + size_t size; > + buf[0]= '\''; > + memcpy(buf+1, xid->data, xid->gtrid_length); > + size= xid->gtrid_length + 2; > + buf[size-1]= '\''; > + if (xid->bqual_length == 0 && xid->formatID == 1) > + return size; > + > + memcpy(buf+size, ", '", 3); > + memcpy(buf+size+3, xid->data+xid->gtrid_length, xid->bqual_length); > + size+= 3 + xid->bqual_length; > + buf[size]= '\''; > + size++; > + if (xid->formatID != 1) > + size+= sprintf(buf+size, ", %ld", xid->formatID); > + return size; > +} > + > + > +static int binlog_xa_recover(handlerton *hton __attribute__((unused)), > + XID *xid_list __attribute__((unused)), > + uint len __attribute__((unused))) > +{ > + /* Does nothing. 
*/ > + return 0; > +} > + > + > +static int binlog_commit_by_xid(handlerton *hton, XID *xid) > +{ > + THD *thd= current_thd; > + const size_t xc_len= sizeof("XA COMMIT ") - 1; // do not count trailing 0 > + char buf[xc_len + xid_t::ser_buf_size]; > + size_t buflen; > + binlog_cache_mngr *const cache_mngr= thd->binlog_setup_trx_data(); > + > + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_COMMIT); > + > + if (!cache_mngr) > + return 1; > + > + memcpy(buf, "XA COMMIT ", xc_len); > + buflen= xc_len + serialize_xid(xid, buf+xc_len); > + Query_log_event qe(thd, buf, buflen, TRUE, FALSE, FALSE, 0); > + return binlog_flush_cache(thd, cache_mngr, &qe, TRUE, TRUE, TRUE); > +} > + > + > +static int binlog_rollback_by_xid(handlerton *hton, XID *xid) > +{ > + THD *thd= current_thd; > + const size_t xr_len= sizeof("XA ROLLBACK ") - 1; // do not count trailing 0 > + char buf[xr_len + xid_t::ser_buf_size]; > + size_t buflen; > + > + DBUG_ASSERT(thd->lex->sql_command == SQLCOM_XA_ROLLBACK); > + memcpy(buf, "XA ROLLBACK ", xr_len); > + buflen= xr_len + serialize_xid(xid, buf+xr_len); > + Query_log_event qe(thd, buf, buflen, FALSE, TRUE, TRUE, 0); > + return mysql_bin_log.write_event(&qe); > +} > + > + > /* > We flush the cache wrapped in a beging/rollback if: > . 
aborting a single or multi-statement transaction and; > @@ -9809,6 +9874,24 @@ int TC_LOG_BINLOG::unlog(ulong cookie, my_xid xid) > DBUG_RETURN(BINLOG_COOKIE_GET_ERROR_FLAG(cookie)); > } > > + > +int TC_LOG_BINLOG::log_prepare(THD *thd, bool all) > +{ > + XID *xid= &thd->transaction.xid_state.xid; > + XA_prepare_log_event end_evt(thd, xid, FALSE); > + binlog_cache_mngr *cache_mngr= thd->binlog_setup_trx_data(); > + > + if (!cache_mngr) > + { > + WSREP_DEBUG("Skipping empty log_xid: %s", thd->query()); > + return 0; > + } > + > + cache_mngr->using_xa= FALSE; > + return (binlog_flush_cache(thd, cache_mngr, &end_evt, all, TRUE, TRUE)); > +} > + > + > void > TC_LOG_BINLOG::commit_checkpoint_notify(void *cookie) > { > diff --git a/sql/log.h b/sql/log.h > index 7dfdb36..92fdf95 100644 > --- a/sql/log.h > +++ b/sql/log.h > @@ -61,6 +61,7 @@ class TC_LOG > bool need_prepare_ordered, > bool need_commit_ordered) = 0; > virtual int unlog(ulong cookie, my_xid xid)=0; > + virtual int log_prepare(THD *thd, bool all)= 0; > virtual void commit_checkpoint_notify(void *cookie)= 0; > > protected: > @@ -115,6 +116,10 @@ class TC_LOG_DUMMY: public TC_LOG // use it to disable > the logging > return 1; > } > int unlog(ulong cookie, my_xid xid) { return 0; } > + int log_prepare(THD *thd, bool all) > + { > + return 0; > + } > void commit_checkpoint_notify(void *cookie) { DBUG_ASSERT(0); }; > }; > > @@ -198,6 +203,10 @@ class TC_LOG_MMAP: public TC_LOG > int log_and_order(THD *thd, my_xid xid, bool all, > bool need_prepare_ordered, bool need_commit_ordered); > int unlog(ulong cookie, my_xid xid); > + int log_prepare(THD *thd, bool all) > + { > + return 0; > + } > void commit_checkpoint_notify(void *cookie); > int recover(); > > @@ -698,6 +707,7 @@ class MYSQL_BIN_LOG: public TC_LOG, private MYSQL_LOG > int log_and_order(THD *thd, my_xid xid, bool all, > bool need_prepare_ordered, bool need_commit_ordered); > int unlog(ulong cookie, my_xid xid); > + int log_prepare(THD *thd, bool all); > 
void commit_checkpoint_notify(void *cookie); > int recover(LOG_INFO *linfo, const char *last_log_name, IO_CACHE > *first_log, > Format_description_log_event *fdle, bool do_xa); > diff --git a/sql/log_event.cc b/sql/log_event.cc > index 7a0d0be..354c5f3 100644 > --- a/sql/log_event.cc > +++ b/sql/log_event.cc > @@ -2139,6 +2139,9 @@ Log_event* Log_event::read_log_event(const char* buf, > uint event_len, > case XID_EVENT: > ev = new Xid_log_event(buf, fdle); > break; > + case XA_PREPARE_LOG_EVENT: > + ev = new XA_prepare_log_event(buf, fdle); > + break; > case RAND_EVENT: > ev = new Rand_log_event(buf, fdle); > break; > @@ -2190,7 +2193,6 @@ Log_event* Log_event::read_log_event(const char* buf, > uint event_len, > case PREVIOUS_GTIDS_LOG_EVENT: > case TRANSACTION_CONTEXT_EVENT: > case VIEW_CHANGE_EVENT: > - case XA_PREPARE_LOG_EVENT: > ev= new Ignorable_log_event(buf, fdle, > get_type_str((Log_event_type) event_type)); > break; > @@ -6222,6 +6224,7 @@ Format_description_log_event(uint8 binlog_ver, const > char* server_ver) > post_header_len[USER_VAR_EVENT-1]= USER_VAR_HEADER_LEN; > post_header_len[FORMAT_DESCRIPTION_EVENT-1]= > FORMAT_DESCRIPTION_HEADER_LEN; > post_header_len[XID_EVENT-1]= XID_HEADER_LEN; > + post_header_len[XA_PREPARE_LOG_EVENT-1]= XA_PREPARE_HEADER_LEN; > post_header_len[BEGIN_LOAD_QUERY_EVENT-1]= BEGIN_LOAD_QUERY_HEADER_LEN; > post_header_len[EXECUTE_LOAD_QUERY_EVENT-1]= > EXECUTE_LOAD_QUERY_HEADER_LEN; > /* > @@ -7874,7 +7877,7 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint > event_len, > buf+= 8; > domain_id= uint4korr(buf); > buf+= 4; > - flags2= *buf; > + flags2= *(buf++); > if (flags2 & FL_GROUP_COMMIT_ID) > { > if (event_len < (uint)header_size + GTID_HEADER_LEN + 2) > @@ -7882,8 +7885,22 @@ Gtid_log_event::Gtid_log_event(const char *buf, uint > event_len, > seq_no= 0; // So is_valid() returns > false > return; > } > - ++buf; > commit_id= uint8korr(buf); > + buf+= 8; > + } > + if (flags2 & FL_XA_TRANSACTION) > + { > + xid.formatID= 
(long) buf[0]; > + xid.gtrid_length= (long) buf[1]; > + xid.bqual_length= (long) buf[2]; > + > + buf+= 3; > + if (xid.formatID >= 0) > + { > + long data_length= xid.bqual_length + xid.gtrid_length; > + memcpy(xid.data, buf, data_length); > + buf+= data_length; > + } > } > } > > @@ -7914,6 +7931,12 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 > seq_no_arg, > /* Preserve any DDL or WAITED flag in the slave's binlog. */ > if (thd_arg->rgi_slave) > flags2|= (thd_arg->rgi_slave->gtid_ev_flags2 & (FL_DDL|FL_WAITED)); > + if (thd->transaction.xid_state.xa_state == XA_IDLE && > + thd->lex->xa_opt != XA_ONE_PHASE) > + { > + flags2|= FL_XA_TRANSACTION; > + xid= thd->transaction.xid_state.xid; > + } > } > > > @@ -7956,7 +7979,7 @@ Gtid_log_event::peek(const char *event_start, size_t > event_len, > bool > Gtid_log_event::write() > { > - uchar buf[GTID_HEADER_LEN+2]; > + uchar buf[GTID_HEADER_LEN+2+sizeof(XID)]; > size_t write_len; > > int8store(buf, seq_no); > @@ -7968,8 +7991,25 @@ Gtid_log_event::write() > write_len= GTID_HEADER_LEN + 2; > } > else > + write_len= 13; > + > + if (flags2 & FL_XA_TRANSACTION) > + { > + buf[write_len]= (uchar) ((char) xid.formatID); > + buf[write_len+1]= (uchar) xid.gtrid_length; > + buf[write_len+2]= (uchar) xid.bqual_length; > + write_len+= 3; > + if (xid.formatID >= 0) > + { > + long data_length= xid.bqual_length + xid.gtrid_length; > + memcpy(buf+write_len, xid.data, data_length); > + write_len+= data_length; > + } > + } > + > + if (write_len < GTID_HEADER_LEN) > { > - bzero(buf+13, GTID_HEADER_LEN-13); > + bzero(buf+write_len, GTID_HEADER_LEN-write_len); > write_len= GTID_HEADER_LEN; > } > return write_header(write_len) || > @@ -8012,7 +8052,7 @@ Gtid_log_event::make_compatible_event(String *packet, > bool *need_dummy_event, > void > Gtid_log_event::pack_info(Protocol *protocol) > { > - char buf[6+5+10+1+10+1+20+1+4+20+1]; > + char buf[6+5+10+1+10+1+20+1+4+20+1+5+128]; > char *p; > p = strmov(buf, (flags2 & FL_STANDALONE ? 
"GTID " : "BEGIN GTID ")); > p= longlong10_to_str(domain_id, p, 10); > @@ -8026,6 +8066,12 @@ Gtid_log_event::pack_info(Protocol *protocol) > p= longlong10_to_str(commit_id, p, 10); > } > > + if (flags2 & FL_XA_TRANSACTION) > + { > + p= strmov(p, " XID :"); > + p= strnmov(p, xid.data, xid.bqual_length + xid.gtrid_length); > + } > + > protocol->store(buf, p-buf, &my_charset_bin); > } > > @@ -8079,11 +8125,25 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) > thd->lex->sql_command= SQLCOM_BEGIN; > thd->is_slave_error= 0; > status_var_increment(thd->status_var.com_stat[thd->lex->sql_command]); > - if (trans_begin(thd, 0)) > + if (flags2 & FL_XA_TRANSACTION) > { > - DBUG_PRINT("error", ("trans_begin() failed")); > - thd->is_slave_error= 1; > + thd->lex->xid= &xid; > + thd->lex->xa_opt= XA_NONE; > + if (trans_xa_start(thd)) > + { > + DBUG_PRINT("error", ("trans_xa_start() failed")); > + thd->is_slave_error= 1; > + } > + } > + else > + { > + if (trans_begin(thd, 0)) > + { > + DBUG_PRINT("error", ("trans_begin() failed")); > + thd->is_slave_error= 1; > + } > } > + > thd->update_stats(); > > if (likely(!thd->is_slave_error)) > @@ -8202,9 +8262,29 @@ Gtid_log_event::print(FILE *file, PRINT_EVENT_INFO > *print_event_info) > buf, print_event_info->delimiter)) > goto err; > } > - if (!(flags2 & FL_STANDALONE)) > - if (my_b_printf(&cache, is_flashback ? 
"COMMIT\n%s\n" : "BEGIN\n%s\n", > print_event_info->delimiter)) > + if ((flags2 & FL_XA_TRANSACTION) && !is_flashback) > + { > + my_b_write_string(&cache, "XA START '"); > + my_b_write(&cache, (uchar *) xid.data, xid.gtrid_length); > + my_b_write_string(&cache, "'"); > + if (xid.bqual_length > 0 || xid.formatID != 1) > + { > + my_b_write_string(&cache, ", '"); > + my_b_write(&cache, (uchar *) xid.data+xid.gtrid_length, > xid.bqual_length); > + my_b_write_string(&cache, "'"); > + if (xid.formatID != 1) > + if (my_b_printf(&cache, ", %d", xid.formatID)) > + goto err; > + } > + if (my_b_printf(&cache, "%s\n", print_event_info->delimiter)) > + goto err; > + } > + else if (!(flags2 & FL_STANDALONE)) > + { > + if (my_b_printf(&cache, is_flashback ? "COMMIT\n%s\n" : "BEGIN\n%s\n", > + print_event_info->delimiter)) > goto err; > + } > > return cache.flush_data(); > err: > @@ -9003,6 +9083,300 @@ Xid_log_event::do_shall_skip(rpl_group_info *rgi) > #endif /* !MYSQL_CLIENT */ > > > +/** > + Function serializes XID which is characterized by by four last arguments > + of the function. > + Serialized XID is presented in valid hex format and is returned to > + the caller in a buffer pointed by the first argument. > + The buffer size provived by the caller must be not less than > + 8 + 2 * XIDDATASIZE + 4 * sizeof(XID::formatID) + 1, see > + XID::serialize_xid() that is a caller and plugin.h for XID declaration. > + > + @param buf pointer to a buffer allocated for storing serialized data > + > + @return the value of the buffer pointer > +*/ > + > +char *XA_prepare_log_event::event_xid_t::serialize(char *buf) const > +{ > + int i; > + char *c= buf; > + /* > + Build a string like following pattern: > + X'hex11hex12...hex1m',X'hex21hex22...hex2n',11 > + and store it into buf. > + Here hex1i and hex2k are hexadecimals representing XID's internal > + raw bytes (1 <= i <= m, 1 <= k <= n), and `m' and `n' even numbers > + half of which corresponding to the lengths of XID's components. 
> + */ > + c[0]= 'X'; > + c[1]= '\''; > + c+= 2; > + for (i= 0; i < gtrid_length; i++) > + { > + c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; > + c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; > + c+= 2; > + } > + c[0]= '\''; > + c[1]= ','; > + c[2]= 'X'; > + c[3]= '\''; > + c+= 4; > + > + for (; i < gtrid_length + bqual_length; i++) > + { > + c[0]=_dig_vec_lower[((uchar*) data)[i] >> 4]; > + c[1]=_dig_vec_lower[((uchar*) data)[i] & 0x0f]; > + c+= 2; > + } > + c[0]= '\''; > + sprintf(c+1, ",%lu", formatID); > + > + return buf; > +} > + > + > +/************************************************************************** > + XA_prepare_log_event methods > +**************************************************************************/ > +/** > + @note > + It's ok not to use int8store here, > + as long as xid_t::set(ulonglong) and > + xid_t::get_n_xid doesn't do it either. > + We don't care about actual values of xids as long as > + identical numbers compare identically > +*/ > + > +XA_prepare_log_event:: > +XA_prepare_log_event(const char* buf, > + const Format_description_log_event *description_event) > + :Log_event(buf, description_event) > +{ > + uint32 temp= 0; > + uint8 temp_byte; > + > + buf+= description_event->common_header_len + > + description_event->post_header_len[XA_PREPARE_LOG_EVENT-1]; > + memcpy(&temp_byte, buf, 1); > + one_phase= (bool) temp_byte; > + buf += sizeof(temp_byte); > + memcpy(&temp, buf, sizeof(temp)); > + m_xid.formatID= le32toh(temp); > + buf += sizeof(temp); > + memcpy(&temp, buf, sizeof(temp)); > + m_xid.gtrid_length= le32toh(temp); > + buf += sizeof(temp); > + memcpy(&temp, buf, sizeof(temp)); > + m_xid.bqual_length= le32toh(temp); > + buf += sizeof(temp); > + memcpy(m_xid.data, buf, m_xid.gtrid_length + m_xid.bqual_length); > + > + xid= NULL; > +} > + > + > +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) > +void XA_prepare_log_event::pack_info(Protocol *protocol) > +{ > + char buf[ser_buf_size]; > + char query[sizeof("XA COMMIT 
ONE PHASE") + 1 + sizeof(buf)]; > + > + /* RHS of the following assert is unknown to client sources */ > + compile_time_assert(ser_buf_size == XID::ser_buf_size); > + m_xid.serialize(buf); > + sprintf(query, > + (one_phase ? "XA COMMIT %s ONE PHASE" : "XA PREPARE %s"), > + buf); > + > + protocol->store(query, strlen(query), &my_charset_bin); > +} > +#endif > + > + > +#ifndef MYSQL_CLIENT > +bool XA_prepare_log_event::write() > +{ > + uchar data[1 + 4 + 4 + 4]; > + uint8 one_phase_byte= one_phase; > + > + data[0]= one_phase; > + int4store(data+1, static_cast<XID*>(xid)->formatID); > + int4store(data+(1+4), static_cast<XID*>(xid)->gtrid_length); > + int4store(data+(1+4+4), static_cast<XID*>(xid)->bqual_length); > + > + DBUG_ASSERT(xid_bufs_size == sizeof(data) - 1); > + > + return write_header(sizeof(one_phase_byte) + xid_bufs_size + > + static_cast<XID*>(xid)->gtrid_length + > + static_cast<XID*>(xid)->bqual_length) || > + write_data(data, sizeof(data)) || > + write_data((uchar*) static_cast<XID*>(xid)->data, > + static_cast<XID*>(xid)->gtrid_length + > + static_cast<XID*>(xid)->bqual_length) || > + write_footer(); > +} > +#endif > + > + > +#ifdef MYSQL_CLIENT > +bool XA_prepare_log_event::print(FILE* file, PRINT_EVENT_INFO* > print_event_info) > +{ > + Write_on_release_cache cache(&print_event_info->head_cache, file, > + Write_on_release_cache::FLUSH_F, this); > + char buf[ser_buf_size]; > + > + m_xid.serialize(buf); > + > + if (!print_event_info->short_form) > + { > + print_header(&cache, print_event_info, FALSE); > + if (my_b_printf(&cache, "\tXID = %s\n", buf)) > + goto error; > + } > + > + if (my_b_printf(&cache, "XA END %s\n%s\n", > + buf, print_event_info->delimiter) || > + my_b_printf(&cache, "XA PREPARE %s\n%s\n", > + buf, print_event_info->delimiter)) > + goto error; > + > + return cache.flush_data(); > +error: > + return TRUE; > +} > +#endif /* MYSQL_CLIENT */ > + > + > +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT) > +int 
XA_prepare_log_event::do_apply_event(rpl_group_info *rgi) > +{ > + bool res; > + int err; > + rpl_gtid gtid; > + uint64 sub_id= 0; > + Relay_log_info const *rli= rgi->rli; > + xid_t xid; > + void *hton= NULL; > + > + /* > + XID_EVENT works like a COMMIT statement. And it also updates the > + mysql.gtid_slave_pos table with the GTID of the current transaction. > + > + Therefore, it acts much like a normal SQL statement, so we need to do > + THD::reset_for_next_command() as if starting a new statement. > + */ > + thd->reset_for_next_command(); > + /* > + Record any GTID in the same transaction, so slave state is > transactionally > + consistent. > + */ > +#ifdef WITH_WSREP > + thd->wsrep_affected_rows= 0; > +#endif > + > + if (rgi->gtid_pending) > + { > + xa_states c_state= thd->transaction.xid_state.xa_state; > + sub_id= rgi->gtid_sub_id; > + rgi->gtid_pending= false; > + > + gtid= rgi->current_gtid; > + > + thd->transaction.xid_state.xa_state= XA_ACTIVE; > + err= rpl_global_gtid_slave_state->record_gtid(thd, &gtid, sub_id, true, > + false, &hton); > + thd->transaction.xid_state.xa_state= c_state; > + if (err) > + { > + int ec= thd->get_stmt_da()->sql_errno(); > + /* > + Do not report an error if this is really a kill due to a deadlock. > + In this case, the transaction will be re-tried instead. 
> + */ > + if (!is_parallel_retry_error(rgi, ec)) > + rli->report(ERROR_LEVEL, ER_CANNOT_UPDATE_GTID_STATE, > rgi->gtid_info(), > + "Error during XID COMMIT: failed to update GTID state in > " > + "%s.%s: %d: %s", > + "mysql", rpl_gtid_slave_state_table_name.str, ec, > + thd->get_stmt_da()->message()); > + thd->is_slave_error= 1; > + return err; > + } > + > + DBUG_EXECUTE_IF("gtid_fail_after_record_gtid", > + { my_error(ER_ERROR_DURING_COMMIT, MYF(0), HA_ERR_WRONG_COMMAND); > + thd->is_slave_error= 1; > + return 1; > + }); > + } > + /* For a slave XA_prepare_log_event is COMMIT */ > + general_log_print(thd, COM_QUERY, > + "COMMIT /* implicit, from Xid_log_event */"); > + thd->variables.option_bits&= ~OPTION_GTID_BEGIN; > + > + xid.set(m_xid.formatID, > + m_xid.data, m_xid.gtrid_length, > + m_xid.data + m_xid.gtrid_length, m_xid.bqual_length); > + > + thd->lex->xid= &xid; > + if (trans_xa_end(thd)) > + return 1; > + > + if (!one_phase) > + { > + res= trans_xa_prepare(thd); > + } > + else > + { > + res= trans_xa_commit(thd); > + thd->mdl_context.release_transactional_locks(); > + } > + > + > + if (!res && sub_id) > + rpl_global_gtid_slave_state->update_state_hash(sub_id, &gtid, hton, rgi); > + > + /* > + Increment the global status commit count variable > + */ > + status_var_increment(thd->status_var.com_stat[SQLCOM_COMMIT]); > + > + return res; > +} > + > + > +Log_event::enum_skip_reason > +XA_prepare_log_event::do_shall_skip(rpl_group_info *rgi) > +{ > + DBUG_ENTER("Xid_log_event::do_shall_skip"); > + if (rgi->rli->slave_skip_counter > 0) > + { > + DBUG_ASSERT(!rgi->rli->get_flag(Relay_log_info::IN_TRANSACTION)); > + thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_GTID_BEGIN); > + DBUG_RETURN(Log_event::EVENT_SKIP_COUNT); > + } > +#ifdef WITH_WSREP > + else if (wsrep_mysql_replication_bundle && WSREP_ON && > + opt_slave_domain_parallel_threads == 0) > + { > + if (++thd->wsrep_mysql_replicated < (int)wsrep_mysql_replication_bundle) > + { > + WSREP_DEBUG("skipping 
wsrep commit %d", thd->wsrep_mysql_replicated); > + DBUG_RETURN(Log_event::EVENT_SKIP_IGNORE); > + } > + else > + { > + thd->wsrep_mysql_replicated = 0; > + } > + } > +#endif > + DBUG_RETURN(Log_event::do_shall_skip(rgi)); > +} > +#endif /* !MYSQL_CLIENT */ > + > + > /************************************************************************** > User_var_log_event methods > **************************************************************************/ > @@ -14789,7 +15163,6 @@ bool event_that_should_be_ignored(const char *buf) > event_type == PREVIOUS_GTIDS_LOG_EVENT || > event_type == TRANSACTION_CONTEXT_EVENT || > event_type == VIEW_CHANGE_EVENT || > - event_type == XA_PREPARE_LOG_EVENT || > (uint2korr(buf + FLAGS_OFFSET) & LOG_EVENT_IGNORABLE_F)) > return 1; > return 0; > diff --git a/sql/log_event.h b/sql/log_event.h > index 38a40c9..b5c48c9 100644 > --- a/sql/log_event.h > +++ b/sql/log_event.h > @@ -217,6 +217,7 @@ class String; > #define GTID_HEADER_LEN 19 > #define GTID_LIST_HEADER_LEN 4 > #define START_ENCRYPTION_HEADER_LEN 0 > +#define XA_PREPARE_HEADER_LEN 0 > > /* > Max number of possible extra bytes in a replication event compared to a > @@ -3064,6 +3065,79 @@ class Xid_log_event: public Log_event > #endif > }; > > + > +/** > + @class XA_prepare_log_event > + > + Similar to Xid_log_event except that > + - it is specific to XA transaction > + - it carries out the prepare logics rather than the final committing > + when @c one_phase member is off. > + From the groupping perspective the event finalizes the current "prepare" > group > + started with XA START Query-log-event. > + When @c one_phase is false Commit of Rollback for XA transaction are > + logged separately to the prepare-group events so being a groups of > + their own. 
> +*/ > + > +class XA_prepare_log_event: public Log_event > +{ > +protected: > + /* The event_xid_t members were copied from handler.h */ > + struct event_xid_t > + { > + long formatID; > + long gtrid_length; > + long bqual_length; > + char data[MYSQL_XIDDATASIZE]; // not \0-terminated ! > + char *serialize(char *buf) const; > + }; > + > + /* size of serialization buffer is explained in $MYSQL/sql/xa.h. */ > + static const uint ser_buf_size= > + 8 + 2 * MYSQL_XIDDATASIZE + 4 * sizeof(long) + 1; > + > + /* Total size of buffers to hold serialized members of XID struct */ > + static const int xid_bufs_size= 12; > + event_xid_t m_xid; > + void *xid; > + bool one_phase; > + > +public: > +#ifdef MYSQL_SERVER > + XA_prepare_log_event(THD* thd_arg, XID *xid_arg, bool one_phase_arg): > + Log_event(thd_arg, 0, TRUE), xid(xid_arg), one_phase(one_phase_arg) > + { > + cache_type= Log_event::EVENT_NO_CACHE; > + } > +#ifdef HAVE_REPLICATION > + void pack_info(Protocol* protocol); > +#endif /* HAVE_REPLICATION */ > +#else > + bool print(FILE* file, PRINT_EVENT_INFO* print_event_info); > +#endif > + XA_prepare_log_event(const char* buf, > + const Format_description_log_event > *description_event); > + ~XA_prepare_log_event() {} > + Log_event_type get_type_code() { return XA_PREPARE_LOG_EVENT; } > + int get_data_size() > + { > + return xid_bufs_size + m_xid.gtrid_length + m_xid.bqual_length; > + } > + > +#ifdef MYSQL_SERVER > + bool write(); > +#endif > + bool is_valid() const { return 1; } > + > +private: > +#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) > + virtual int do_apply_event(rpl_group_info *rgi); > + enum_skip_reason do_shall_skip(rpl_group_info *rgi); > +#endif > +}; > + > + > /** > @class User_var_log_event > > @@ -3376,6 +3450,11 @@ class Gtid_log_event: public Log_event > uint64 seq_no; > uint64 commit_id; > uint32 domain_id; > +#ifdef MYSQL_SERVER > + XID xid; > +#else > + struct st_mysql_xid xid; > +#endif > uchar flags2; > > /* Flags2. 
*/ > @@ -3404,6 +3483,8 @@ class Gtid_log_event: public Log_event > static const uchar FL_WAITED= 16; > /* FL_DDL is set for event group containing DDL. */ > static const uchar FL_DDL= 32; > + /* FL_XA_TRANSACTION is set for XA transaction. */ > + static const uchar FL_XA_TRANSACTION= 64; > > #ifdef MYSQL_SERVER > Gtid_log_event(THD *thd_arg, uint64 seq_no, uint32 domain_id, bool > standalone, > diff --git a/sql/sql_class.cc b/sql/sql_class.cc > index fa2f866..cc75da9 100644 > --- a/sql/sql_class.cc > +++ b/sql/sql_class.cc > @@ -1461,12 +1461,19 @@ void THD::cleanup(void) > DBUG_ASSERT(cleanup_done == 0); > > set_killed(KILL_CONNECTION); > -#ifdef ENABLE_WHEN_BINLOG_WILL_BE_ABLE_TO_PREPARE > if (transaction.xid_state.xa_state == XA_PREPARED) > { > -#error xid_state in the cache should be replaced by the allocated value > + trans_detach(this); > + transaction.xid_state.xa_state= XA_NOTR; > + transaction.xid_state.rm_error= 0; > + } > + else > + { > + transaction.xid_state.xa_state= XA_NOTR; > + transaction.xid_state.rm_error= 0; > + trans_rollback(this); > + xid_cache_delete(this, &transaction.xid_state); > } > -#endif > > mysql_ha_cleanup(this); > locked_tables_list.unlock_locked_tables(this); > @@ -1474,11 +1481,6 @@ void THD::cleanup(void) > delete_dynamic(&user_var_events); > close_temporary_tables(); > > - transaction.xid_state.xa_state= XA_NOTR; > - transaction.xid_state.rm_error= 0; > - trans_rollback(this); > - xid_cache_delete(this, &transaction.xid_state); > - > DBUG_ASSERT(open_tables == NULL); > /* > If the thread was in the middle of an ongoing transaction (rolled > diff --git a/sql/sql_class.h b/sql/sql_class.h > index 69fabee..76befcb 100644 > --- a/sql/sql_class.h > +++ b/sql/sql_class.h > @@ -1255,6 +1255,18 @@ typedef struct st_xid_state { > /* Error reported by the Resource Manager (RM) to the Transaction Manager. > */ > uint rm_error; > XID_cache_element *xid_cache_element; > + /* > + Binary logging status. 
> + It is set to TRUE at XA PREPARE if the transaction was written > + to the binlog. > + Naturally FALSE means the transaction was not written to > + the binlog. Happens if the trnasaction did not modify anything > + or binlogging was turned off. In that case we shouldn't binlog > + the consequent XA COMMIT/ROLLBACK. > + The recovered transaction after server restart sets it to TRUE always. > + That can cause inconsistencies (shoud be fixed?). > + */ > + bool is_binlogged; > > /** > Check that XA transaction has an uncommitted work. Report an error > @@ -1278,6 +1290,12 @@ typedef struct st_xid_state { > } > return false; > } > + > + void reset() > + { > + xid.null(); > + is_binlogged= FALSE; > + } > } XID_STATE; > > void xid_cache_init(void); > @@ -2603,7 +2621,7 @@ class THD :public Statement, > then. > */ > if (!xid_state.rm_error) > - xid_state.xid.null(); > + xid_state.reset(); > free_root(&mem_root,MYF(MY_KEEP_PREALLOC)); > DBUG_VOID_RETURN; > } > diff --git a/sql/sql_connect.cc b/sql/sql_connect.cc > index b48070b..3e4a067 100644 > --- a/sql/sql_connect.cc > +++ b/sql/sql_connect.cc > @@ -1414,6 +1414,7 @@ void do_handle_one_connection(CONNECT *connect) > #endif > end_thread: > close_connection(thd); > + thd->get_stmt_da()->reset_diagnostics_area(); > > if (thd->userstat_running) > update_global_user_stats(thd, create_user, time(NULL)); > diff --git a/sql/transaction.cc b/sql/transaction.cc > index 13614d3..64533d7 100644 > --- a/sql/transaction.cc > +++ b/sql/transaction.cc > @@ -790,6 +790,44 @@ bool trans_release_savepoint(THD *thd, LEX_CSTRING name) > > > /** > + Detach the current XA transaction; > + > + @param thd Current thread > + > + @retval FALSE Success > + @retval TRUE Failure > +*/ > + > +bool trans_detach(THD *thd) > +{ > + XID_STATE *xid_s= &thd->transaction.xid_state; > + Ha_trx_info *ha_info, *ha_info_next; > + > + DBUG_ENTER("trans_detach"); > + > +// DBUG_ASSERT(xid_s->xa_state == XA_PREPARED && > +// xid_cache_search(thd, &xid_s->xid)); 
> + > + xid_cache_delete(thd, xid_s); > + if (xid_cache_insert(&xid_s->xid, XA_PREPARED)) > + DBUG_RETURN(TRUE); > + > + for (ha_info= thd->transaction.all.ha_list; > + ha_info; > + ha_info= ha_info_next) > + { > + ha_info_next= ha_info->next(); > + ha_info->reset(); /* keep it conveniently zero-filled */ > + } > + > + thd->transaction.all.ha_list= 0; > + thd->transaction.all.no_2pc= 0; > + > + DBUG_RETURN(FALSE); > +} > + > + > +/** > Starts an XA transaction with the given xid value. > > @param thd Current thread > @@ -928,6 +966,12 @@ bool trans_xa_commit(THD *thd) > res= !xs; > if (res) > my_error(ER_XAER_NOTA, MYF(0)); > + else if (thd->in_multi_stmt_transaction_mode()) > + { > + my_error(ER_XAER_RMFAIL, MYF(0), > + xa_state_names[thd->transaction.xid_state.xa_state]); > + res= TRUE; > + } > else > { > res= xa_trans_rolled_back(xs); > @@ -978,8 +1022,16 @@ bool trans_xa_commit(THD *thd) > { > DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); > > - res= MY_TEST(ha_commit_one_phase(thd, 1)); > - if (res) > + if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) > + { > + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, > + thd->query(), thd->query_length(), > + FALSE, FALSE, FALSE, 0); > + } > + else > + res= 0; > + > + if (res || (res= MY_TEST(ha_commit_one_phase(thd, 1)))) > my_error(ER_XAER_RMERR, MYF(0)); > } > } > @@ -1044,7 +1096,18 @@ bool trans_xa_rollback(THD *thd) > DBUG_RETURN(TRUE); > } > > - res= xa_trans_force_rollback(thd); > + if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) > + { > + res= thd->binlog_query(THD::THD::STMT_QUERY_TYPE, > + thd->query(), thd->query_length(), > + FALSE, FALSE, FALSE, 0); > + } > + else > + res= 0; > + > + res= res || xa_trans_force_rollback(thd); > + if (res || (res= MY_TEST(xa_trans_force_rollback(thd)))) > + my_error(ER_XAER_RMERR, MYF(0)); > > thd->variables.option_bits&= ~(OPTION_BEGIN | OPTION_KEEP_LOG); > thd->transaction.all.reset(); > diff --git a/sql/transaction.h b/sql/transaction.h 
> index 7e34693..f228cc6 100644 > --- a/sql/transaction.h > +++ b/sql/transaction.h > @@ -29,6 +29,7 @@ bool trans_commit(THD *thd); > bool trans_commit_implicit(THD *thd); > bool trans_rollback(THD *thd); > bool trans_rollback_implicit(THD *thd); > +bool trans_detach(THD *thd); > > bool trans_commit_stmt(THD *thd); > bool trans_rollback_stmt(THD *thd); > _______________________________________________ > commits mailing list > [email protected] > https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits _______________________________________________ Mailing list: https://launchpad.net/~maria-developers Post to : [email protected] Unsubscribe : https://launchpad.net/~maria-developers More help : https://help.launchpad.net/ListHelp

