Rocksdb Code Analysis Transaction 2PC
Background Introduction
Two Phase Commit
Two Phase Commit(2PC)适用于保证多Transaction 之间逻辑上的原子性。要不所有Txn都提交成功,要不所有Txn都没有提交成功。2PC常用于分布式场景下实现分布式事务,对于分布式事务来说将一个客户端的事务拆分成各个节点的小事务,通过2PC协议可以保证客户端整个Txn 的原子性。然而,对于支持2PC的引擎来说,不需要考虑2PC具体实现细节,只需要提供Prepare 和 Commit两个接口提供调用就可以了。具体的2PC 协议见Two Phase Commit Protocol. 这里只讨论Rocksdb 为上层实现2PC 提供的Prepare 和Commit 接口。
WriteBatch
WriteBatch 是在Rocksdb写入时使用的一种数据结构,负责存储要写入到WAL 和Memtable的数据。数据的存储结构举例如下:
Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);
Sequence 和NumRecords 字段是其固定字段,之后是操作的命令和操作的数据本身。
PessimisticTransaction 2PC
Rocksdb 实现的2PC接口的流程大体上思路是在之前的Txn流程中添加Prepare 调用,通过Prepare 将Txn流程写入WAL但不写入Memtable。之后通过Commit调用,在WAL中记录Commit Txn,再将本次Txn的数据部分写入Memtable。例如:
TransactionDB* db;
TransactionDB::Open(Options(), TransactionDBOptions(), DBPath, &db);
TransactionOptions txn_options;
txn_options.two_phase_commit = true
Transaction* txn = db->BeginTransaction(write_options, txn_options);
txn->Put(...);
txn->Get(...);
txn->Prepare();
txn->Commit();
为实现2PC 需要将WriteBatch内的数据结构进行改进。初始化时候,在Sequence和NumRecords字段后添加Noop占位符字段。WriteBatch 初始化如下
Sequence(0);NumRecords(0);Noop;
接下来调用Put Get 接口后,WriteBatch如下
Sequence(0);NumRecords(0);Noop;Put(a,1);Get(a);
调用Prepare 后Noop 标记会被BeginPrepare 替代,同时写入EndPrepare(XID),之后会将此WriteBatch 写入WAL,WriteBatch 结构如下
Sequence(0);NumRecords();BeginPrepare();Put(a,1);Get(a);EndPrepare(XID);
调用Commit 后重新生成一个WriteBatch 如下,然后将此WriteBatch 分别写入WAL和Memtable,写入WAL的部分只是Commit 命令本身和其Xid,因为之前已经将数据部分写入过WAL了没有必要再写一次。
Sequence(0);NumRecords();Commit(xid);BeginPrepare();Put(a,1);Get(a);EndPrepare(XID);
Code Analysis
目前只有PessimisticTransaction实现了2PC 的相关接口。对于PessimisticTransaction的原理和基本接口实现可以参考ROCKSDB_TRANSACTION。这里只介绍跟2PC 相关的Prepare 和Commit 接口。
PessimisticTransaction::Prepare
Status PessimisticTransaction::Prepare() {
s = PrepareInternal();
}
Status WriteCommittedTxn::PrepareInternal() {
WriteOptions write_options = write_options_;
write_options.disableWAL = false;
WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),name_);
Status s =
db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
/*callback*/ nullptr, &log_number_, /*log ref*/ 0,
/* disable_memtable*/ true);
return s;
}
Status WriteBatchInternal::MarkEndPrepare(WriteBatch* b, const Slice& xid) {
// a manually constructed batch can only contain one prepare section
assert(b->rep_[12] == static_cast<char>(kTypeNoop));
// rewrite noop as begin marker
b->rep_[12] = static_cast<char>(kTypeBeginPrepareXID);
b->rep_.push_back(static_cast<char>(kTypeEndPrepareXID));
PutLengthPrefixedSlice(&b->rep_, xid);
return Status::OK();
}
通过PessimisticTransaction::Prepare => WriteCommittedTxn::PrepareInternal 。
1,调用MarkEndPrepare 将kTypeBeginPrepareXID 写到原本kTypeNoop 的位置,然后放入kTypeEndPrepareXID 和本次Txn 的Xid。此时的WriteBatch中的数据可能如下所示:
Sequence(0);NumRecords();BeginPrepare();Put(a,1);Get(a);EndPrepare(XID);
2,调用WriteImpl 将WriteBatch的内容写入到 WAL 中。
PessimisticTransaction::Commit
Status PessimisticTransaction::Commit() {
s = CommitInternal();
}
Status WriteCommittedTxn::CommitInternal() {
// We take the commit-time batch and append the Commit marker.
WriteBatch* working_batch = GetCommitTimeWriteBatch();
// push back kTypeCommitXID marker and xid
WriteBatchInternal::MarkCommit(working_batch, name_);
// any operations appended to this working_batch will be ignored from WAL
working_batch->MarkWalTerminationPoint();
// append WriteBatch content into new working batch. WriteBatch content
// will not append to WAL since MarkWalTerminationPoint is invoked
WriteBatchInternal::Append(working_batch,GetWriteBatch()->GetWriteBatch());
// write
auto s = db_impl_->WriteImpl(write_options_, working_batch,
nullptr, nullptr, log_number_);
return s;
}
1,新建一个WriteBatch,在新的WriteBatch 中写入Commit 标记和Xid。
2,将准备写入WAL的位置标记在Commit之前,这样只有Commit 标记跟Xid 写入WAL。
3,将之前保存数据的WriteBatch 追加到新建的WriteBatch上。
4,调用WriteImpl 将Commit 标记跟Xid 写入WAL,将数据写入Memtable。
新建的WriteBatch可能如下所示:
Sequence(0);NumRecords();Commit(xid);BeginPrepare();Put(a,1);Get(a);EndPrepare(XID);
Recovery From WAL
恢复阶段主要是将WAL中的已经Commit的日志加载到Memtable中,之后Memtable 的恢复主要调用WriteBatchInternal::InsertInto 接口来完成。
Status WriteBatchInternal::InsertInto(
const WriteBatch* batch, ColumnFamilyMemTables* memtables,
...) {
MemTableInserter inserter(Sequence(batch), memtables,...);
Status s = batch->Iterate(&inserter);
return s;
}
Status WriteBatch::Iterate(Handler* handler) const {
while() {
switch (tag) {
case kTypeColumnFamilyValue:
handler->PutCF(column_family, key, value);
break;
case kTypeBeginPrepareXID:
handler->MarkBeginPrepare();
break;
case kTypeEndPrepareXID:
handler->MarkEndPrepare(xid);
break;
case kTypeCommitXID:
handler->MarkCommit(xid);
break;
.......
}
}
}
通过对WriteBatch::Iterate 的调用,对WriteBatch 进行命令的遍历处理,例如正常的写入命令会调用handler->PutCF接口,这里调用的是MemTableInserter 的实现。这里主要关注2PC 相关的数据恢复。
class MemTableInserter : public WriteBatch::Handler {
Status PutCFImpl(uint32_t column_family_id, const Slice& key,
const Slice& value, ValueType value_type) {
WriteBatchInternal::Put(rebuilding_trx_, column_family_id, key, value);
}
Status MarkBeginPrepare() override {
rebuilding_trx_ = new WriteBatch();
}
Status MarkEndPrepare(const Slice& name) override {
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_, rebuilding_trx_seq_);
}
Status MarkCommit(const Slice& name) override {
RecoveredTransaction* trx =
db_->GetRecoveredTransaction(name.ToString());
s = trx->batch_->Iterate(this);
}
}
1,BeginPrepare 标记处理,将新建一个用于后续Commit标记处理的WriteBatch。
2,Put 标记处理,将Put 标记压到WriteBatch 中。
3,EndPrepare 标记处理,将Txn记录到全局db 中,由于EndPrepare 跟Commit标记中间可能会有其他标记,例如单独对db进行若干次的读写操作。所以将此次WriteBatch 写入Txn,遇到Commit 标记再从全局db 中查找出来进行Commit 标记处理。
4,Commit 标记处理,从db 中找到步骤 3 写入的Txn,调用WriteBach的Iterate,对WriteBatch 的每一条读写标记进行处理。
Summary
对于Rocksdb来说只是提供2PC 所需要的接口,对于上层如何使用实现2PC的细节完全不是Rocksdb该关心的事情。这里Rocksdb 把写WAL 跟写Memtable 流程通过Prepare 跟Commit 两个命令分别实现,这样可以保证Commit写入WAL后,这个Txn一定可以持久化,即使在写入Memtable当中发生断电,也可以从WAL中进行恢复。但是,抛开Rocksdb的实现来看,2PC 协议本身有许多诟病,有兴趣的读者可以自行查找。
Reference
RocksdbWiki: Two Phase Commit Implementation