介绍

在本实验中,您将在 SimpleDB 中实现一个简单的基于锁的事务系统。您需要在代码的适当位置添加锁和解锁调用,并添加代码来跟踪每个事务持有的锁,并在需要时向事务授予锁。

本文档的其余部分将介绍添加事务支持所涉及的内容,并提供如何在数据库中添加该支持的基本概要。

与前一个实验室一样,我们建议您尽早开始。锁定和事务的调试可能相当棘手!

概念

在开始之前,您应该确保了解什么是事务,以及严格的两阶段锁(您将使用它来确保事务的隔离性和原子性)是如何工作的。

在本节的其余部分,我们将简要概述这些概念,并讨论它们与 SimpleDB 的关系。

Transactions

事务是一组以原子方式执行的数据库操作(如插入、删除和读取),也就是说,要么所有操作都完成,要么一个操作都没完成,而且数据库外部观察者不会发现这些操作不是作为一个不可分割的单一操作的一部分完成的。

ACID

为了帮助您了解事务管理在 SimpleDB 中的工作原理,我们将简要回顾它是如何确保满足 ACID 属性的:

  • 原子性:严格的两阶段锁定和谨慎的缓冲区管理可确保原子性。
  • 一致性:由于原子性,数据库具有事务一致性。其他一致性问题(如键约束)在 SimpleDB 中未涉及。
  • 隔离性:严格的两阶段锁提供了隔离性。
  • 持久性:FORCE 缓冲区管理策略可确保持久性(参见下文第 2.3 节)。

Recovery and Buffer Management

为了简化您的工作,我们建议您执行NO STEAL/FORCE缓冲区管理政策。

正如我们在课堂上所讨论的,这意味着:

  • 你不应该将缓冲池中的脏(已更新)的页面驱逐,如果他们被未提交的事务锁定。(这是NO STEAL)
  • 在事务提交时,你应当强制将脏页面转到磁盘,比如将页面写出(这是FORCE)

为了进一步简化你的工作,你可以认为SimpleDB将不会崩溃当处理transactionComplete命令时。请注意这三点意味着你不需要实现基于日志的恢复机制在本Lab中,既然你永远不需要撤销任何工作(你不会驱逐脏页面)并且你永远不需要重做任何工作(你会在提交时强制更新而且并不会在提交过程中崩溃)

任务

Granting Locks

您需要向SimpleDB添加调用(例如,在BufferPool中),允许调用者代表特定事务请求或释放对特定对象的(共享或排他)锁。

我们建议以page为粒度进行锁定;出于测试简化的原因,请不要实现表级锁定(尽管这是可能的)。本文档的其余部分和我们的单元测试假定是page级锁定。

您将需要创建数据结构来跟踪每个事务持有的锁,并在请求时检查是否应授予事务锁。

您将需要实现共享锁和互斥锁;请注意,它们的工作方式如下:

  • 在事务读取对象之前,必须对其具有共享锁。
  • 在事务写入对象之前,必须对其具有互斥锁。
  • 多个事务可以对一个对象持有共享锁。
  • 只有一个事务可以对一个对象持有互斥锁。
  • 如果事务t是唯一持有对象o上的共享锁的事务,则t可以将其在o上的锁升级为互斥锁。

如果事务请求一个不能立即授予的锁,您的代码应该阻塞,等待该锁变得可用(即由在不同线程中运行的另一个事务释放)。在锁的实现中小心竞态条件 — 考虑并发调用锁可能如何影响行为。(您可能希望阅读有关Java中的同步的资料)。


Exercise 1

编写在BufferPool中获取和释放锁的方法。假设使用的是页面级锁定,则需要完成以下工作:

  • 修改 getPage() 方法,在返回页面之前阻塞并获取所需的锁。
  • 实现 unsafeReleasePage() 方法。该方法主要用于测试和事务结束时。
  • 实现 holdsLock() 方法,以便 Exercise 2 中的逻辑可以确定页面是否已被事务锁定。

您可能会发现定义一个 LockManager 类有助于维护有关事务和锁的状态,但设计决策由您决定。 在您的代码通过 LockingTest 单元测试之前,您可能需要实现下一个练习。


Lock Lifetime

你需要实现严格的两阶段(Strick 2PL)锁定。这意味着在访问对象之前,事务应该在该对象上获取适当类型的锁,并且在事务提交之前不应该释放任何锁。

幸运的是,SimpleDB 的设计使得在 BufferPool.getPage() 中可以在读取或修改页面之前获取页面上的锁定。因此,我们建议在 getPage() 中获取锁,而不是在每个操作符中添加锁定例程的调用。根据你的实现,可能你不必在其他地方获取锁。这取决于你来验证!

在读取任何页面(或元组)之前,你需要获取共享锁,并在写入任何页面(或元组)之前,你需要获取独占锁。你会注意到在 BufferPool 中我们已经传递了 Permissions 对象;这些对象指示调用方希望对所访问的对象获得的锁的类型(我们已经为 Permissions 类提供了代码)。

注意,HeapFile.insertTuple()HeapFile.deleteTuple() 的实现,以及由 HeapFile.iterator() 返回的迭代器的实现应该使用 BufferPool.getPage() 访问页面。仔细检查getPage() 的这些不同用法是否传递了正确的权限对象(例如,Permissions.READ_WRITEPermissions.READ_ONLY)。你可能还希望仔细检查 BufferPool.insertTuple() 和 BufferPool.deleteTupe() 的实现,确保它们在访问的任何页面上调用 markDirty()(在实现此代码时,你应该已经这样做了,但我们没有测试这种情况)。

在获取了锁之后,你需要考虑何时释放它们。显然,你应该在事务提交或中止后释放与事务关联的所有锁,以确保严格的两阶段锁定。然而,在事务结束之前释放锁可能会在其他情况下也是有用的。例如,你可能在扫描页面以查找空槽后释放对页面的共享锁(如下所述)。


Exercise 2

确保在整个SimpleDB中获取和释放锁。有一些(但不一定是全部)你应该验证是否正常工作的操作包括:

  • 在 SeqScan 过程中从页面上读取元组(如果你在 BufferPool.getPage() 中实现了锁定,只要你的 HeapFile.iterator() 使用 BufferPool.getPage(),这应该可以正确工作)。
  • 通过 BufferPool 和 HeapFile 方法插入和删除元组(如果你在 BufferPool.getPage() 中实现了锁定,只要 HeapFile.insertTuple()HeapFile.deleteTuple() 使用 BufferPool.getPage(),这应该可以正确工作)。

你还需要特别考虑在以下情况下获取和释放锁:

  • HeapFile 添加新页面。何时将页面实际写入磁盘?是否存在与其他事务(在其他线程上)的竞争条件,可能需要在 HeapFile 层面上特别关注,而不考虑页面级别的锁定?
  • 寻找可以插入元组的空槽。大多数实现会扫描页面以查找空槽,并且将需要使用 READ_ONLY 锁来执行此操作。然而,令人惊讶的是,如果事务 t 在页面 p 上找不到空槽,t 可能会立即释放对 p 的锁。尽管这显然违反了两阶段锁定的规则,但这是可以接受的,因为 t 没有使用页面的任何数据,因此并发事务 t’ 更新 p 不能影响 t 的答案或结果。

到目前为止,你的代码应该能够通过LockingTest中的单元测试。


Implementing NO STEAL

事务的修改只有在提交后才会被写入磁盘。这意味着我们可以通过丢弃脏页并从磁盘重新读取它们来中止事务。因此,我们不能淘汰脏页。这种策略称为“NO STEAL”。

你需要修改BufferPool中的evictPage方法。特别是,它绝不能淘汰脏页。如果你的淘汰策略更喜欢淘汰脏页,你将不得不找到一种淘汰备用页面的方法。在缓冲池中的所有页面都是脏的情况下,你应该抛出一个DbException。如果你的淘汰策略淘汰了一个干净的页面,请注意事务可能已经持有对被淘汰页面的任何锁,并在实现中适当地处理它们。

Exercise3

BufferPoolevictPage 方法中实现必要的页面驱逐逻辑,而不驱逐脏页面。


Transactions

在SimpleDB中,每个查询开始时都会创建一个TransactionId对象。这个对象被传递给参与查询的每个操作符。当查询完成时,会调用BufferPool方法transactionComplete

调用这个方法将根据参数flag commit提交或中止事务。在执行过程中,运算符可以在任何时候抛出TransactionAbortedException异常,表示发生了内部错误或死锁。我们为您提供的测试用例会创建适当的TransactionId对象,以适当的方式将它们传递给您的运算符,并在查询完成时调用transactionComplete。我们还实现了TransactionId


Exercise 4

BufferPool中实现transactionComplete()方法。请注意,有两个版本的transactionComplete,一个接受额外的布尔型参数commit,另一个不接受。没有额外参数的版本应该始终提交,因此可以通过调用transactionComplete(tid, true)来简单实现。

  • 在提交时,你应该将与事务关联的脏页刷新到磁盘。
  • 在中止时,你应该通过将页面恢复到其磁盘状态来撤销事务所做的任何更改。

无论事务是提交还是中止,你还应该释放BufferPool保留的有关事务的任何状态,包括释放事务持有的任何锁。

在此时,你的代码应该能够通过TransactionTest单元测试和AbortEvictionTest系统测试。你可能会发现TransactionTest系统测试很有启发性,但在完成下一个练习之前,它可能会失败。


Deadlocks and Aborts

在SimpleDB中,事务发生死锁是可能的(如果你不了解为什么,我们建议阅读Ramakrishnan&Gehrke中关于死锁的内容)。你需要检测这种情况并抛出TransactionAbortedException异常。

有许多可能的方法来检测死锁。一个初步的例子是实现一个简单的超时策略,如果事务在一定时间内没有完成,则中止它。对于一个真实的解决方案,你可以在依赖图数据结构中实现循环检测,就像在讲座中所示。在这种方案中,你会定期或每当尝试授予新锁时检查依赖图中是否存在循环,并在存在循环时中止某些事务。在检测到死锁存在后,你必须决定如何改善情况。假设在事务t等待锁时检测到死锁。如果你愿意放弃一切,你可能会中止t正在等待的所有事务;这可能导致大量工作被撤销,但你可以保证t将取得进展。或者,你可能决定中止t,以便让其他事务有机会取得进展。这意味着最终用户将不得不重试事务t。

另一种方法是使用事务的全局顺序来避免构建等待图。出于性能原因,有时这种方法更受欢迎,但在该方案下,可能会错误地中止本可以成功的事务。其中一些例子包括WAIT-DIEWOUND-WAIT方案。


Exercise 5

src/simpledb/BufferPool.java中实现死锁检测或预防。关于死锁处理系统,你有许多设计决策的选择,但不一定需要做得非常复杂。我们期望你的设计比每个事务的简单超时要好。一个良好的起点是在每个锁请求之前实现在等待图中的循环检测,对于这样的实现,你将获得全部学分。请在实验报告中描述你的选择,并列出与其他选择相比的优缺点。

你的代码应该能够在发生死锁时通过抛出 TransactionAbortedException 来正确中止事务。这个异常会被执行事务的代码捕获(例如,TransactionTest.java),该代码应该调用 transactionComplete() 来在事务中进行清理。你不需要自动重新启动由于死锁而失败的事务 - 你可以假设更高级别的代码将处理这个问题。

我们在 test/simpledb/DeadlockTest.java 中提供了一些(不太单元的)测试。它们实际上有点复杂,所以可能需要一些时间来运行(取决于你的策略)。如果它们似乎无限期地挂起,那么你可能有一个未解决的死锁。这些测试构建了简单的死锁情况,你的代码应该能够成功解除死锁。

请注意,DeadLockTest.java 文件顶部有两个时间参数;它们确定测试检查锁是否被获取的频率以及中止事务重新启动之前的等待时间。如果你使用基于超时的检测方法,可以通过调整这些参数来观察不同的性能特征。测试将在控制台上输出与已解决的死锁相对应的 TransactionAbortedException

你的代码现在应该能够通过 TransactionTest 系统测试(根据你的实现可能会运行相当长的时间)。

在这一点上,你应该拥有一个可恢复的数据库,即如果数据库系统崩溃(除了 transactionComplete() 外的任何时候)或者用户明确中止一个事务,任何正在运行的事务的效果在系统重新启动后(或事务中止后)将不可见。你可以通过运行一些事务并显式关闭数据库服务器来验证这一点。

实现

本实验需要实现数据库的并发控制。
锁的类型:实验只需要实现共享锁X和排他锁S
锁的粒度:实验测试是基于page级别的,因此实现基于page的锁即可。

为了解耦,我们创建一个LockManager类来负责管理和分配所有的锁。需要注意LockManager应该设计成数据库级别还是数据库服务器级别。

在MySQL中,不同的数据库共享同一个BufferPool,因此BufferPool是数据库服务器级别的。而LockManager则是每个数据库单独的,这是因为可能不同数据库的隔离级别有差异。在本实验中,只涉及一个数据库,因此LockManager应当被设计为单例的,理应把LockManager实例放在Database类中,通过类似获取BufferPool的方式Database.getBufferPool()来获取LockManager实例。但由于测试代码调用resetBufferPool来重置BufferPool的时候不会重置LockManager,因此会导致构建测试数据@Before代码的锁无法释放,在后续测试中无法获得锁,导致无法通过测试。因此我们这里把LockManager作为BufferPool的成员变量,这样在重置BufferPool时,也能够通过构造函数来重置LockManager来释放@Before时获取的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// forget about locks associated to tid, so they don't conflict with  
// test cases
bp.getPage(tid, p0, Permissions.READ_WRITE).markDirty(true, tid);
bp.getPage(tid, p1, Permissions.READ_WRITE).markDirty(true, tid);
bp.getPage(tid, p2, Permissions.READ_WRITE).markDirty(true, tid);
bp.flushAllPages();
bp = Database.resetBufferPool(BufferPool.DEFAULT_PAGES);


public static BufferPool resetBufferPool(int pages) {
java.lang.reflect.Field bufferPoolF=null;
try {
bufferPoolF = Database.class.getDeclaredField("_bufferpool");
bufferPoolF.setAccessible(true);
bufferPoolF.set(_instance.get(), new BufferPool(pages));
} catch (NoSuchFieldException | IllegalAccessException | IllegalArgumentException | SecurityException e) {
e.printStackTrace();
}
// _instance._bufferpool = new BufferPool(pages);
return _instance.get()._bufferpool;
}

PageLock实现

PageLock只被用在LockManager内部,因此可以设计成内部私有类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
private class PageLock{  
private LockType type;
private PageId pageId;
private TransactionId tid;

public PageLock(LockType type, PageId pageId, TransactionId tid) {
this.type = type;
this.pageId = pageId;
this.tid = tid;
}

public void updateToXLock() {
this.type = LockType.XLOCK;
}

public LockType getType() {
return type;
}

public PageId getPageId() {
return pageId;
}

public TransactionId getTid() {
return tid;
}

@Override
public int hashCode() {
return Objects.hash(tid, pageId);
}

@Override
public boolean equals(Object obj) {
if (this == obj){
return true;
}
if (!(obj instanceof PageLock)){
return false;
}
PageLock p1 = (PageLock) obj;
return this.pageId.equals(p1.pageId) && this.tid.equals(p1.tid);
}
}

注意PageLock重写hashCode()equals()时,不可以涉及到LockType属性。因为共享锁可能会升级为排他锁导致LockType发生改变,因此它的hashCode也会随之改变,和它被最初addhashSet时的hashCode不同,导致之后无法通过set.remove()方法删除该锁,从而释放锁。具体原理请参照Set的源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** Constants used for LockType */  
public enum LockType implements Serializable {
XLOCK,SLOCK;


@Override
public String toString() {
if (this == XLOCK)
return "XLOCK";
if (this == SLOCK)
return "SLOCK";
throw new IllegalStateException("impossible to reach here");
}


}

LockManager实现

首先需要明确锁-页面-事务之间的关系。一个事务可以持有多个锁,一个页面也可以被多个锁锁定(共享锁)。因此事务和锁是一对多关系。页面和锁也是一对多关系。那么维护两个映射:

  • TransactionId->Set<PageLock>:事务和事务持有的锁集合。
  • PageId->Set<PageLock>:页面和作用于该页面上的锁集合。

主要需要实现如下方法:

  • 授予锁:某事务需要获取某页面上的锁。
    1. 先判断该事务是否已经有该页面上的锁
      1. 有,则判断锁类型是否符合
        1. 不符合,尝试升级为排他锁
        2. 符合,返回true
      2. 无,则尝试获取锁,判断该页面上是否被其他锁锁定。
        1. 被排他锁锁定,返回false
        2. 被共享锁锁定,判断获取锁类型
        3. 无锁,返回true
  • 释放特定事务持有的锁:当事务提交后,需要释放该事务持有的所有锁。通过TransactionId->Set<PageLock>的映射,获取所有该事务申请的锁,从而释放。这就是2PL阶段的第二阶段。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    public class LockManager {  
    /**
    * PageId -> set of pageLocks */private final ConcurrentHashMap<PageId, HashSet<PageLock>> pid2Locks;
    /**
    * TransactionId -> set of pageLocks,i.e All locks hold by one transaction */private final ConcurrentHashMap<TransactionId,HashSet<PageLock>> tid2Locks;

    public LockManager() {
    pid2Locks = new ConcurrentHashMap<>();
    tid2Locks = new ConcurrentHashMap<>();
    }

    /**
    * Release lock on the specific page * @param pageId pid of page
    */ public synchronized void releaseLock(PageId pageId,TransactionId tid){
    PageLock lock = getLockByPidAndTid(tid, pageId);
    if (lock!=null){
    HashSet<PageLock> pageLocks = pid2Locks.get(pageId);
    HashSet<PageLock> tidLocks = tid2Locks.get(tid);
    pageLocks.remove(lock);
    tidLocks.remove(lock);
    if (pageLocks.isEmpty()){
    pid2Locks.remove(pageId);
    }
    if (tidLocks.isEmpty()){
    tid2Locks.remove(tid);
    }
    this.notifyAll();
    }
    }

    /**
    * Release all lock hold by the transaction * @param tid tid of transaction
    */ public synchronized void releaseByTid(TransactionId tid){
    if (tid2Locks.containsKey(tid)){
    HashSet<PageLock> tidLocks = tid2Locks.get(tid);
    for (PageLock lock : tidLocks) {
    PageId pageId = lock.getPageId();
    if (pid2Locks.containsKey(pageId)){
    HashSet<PageLock> pageLocks = pid2Locks.get(pageId);
    pageLocks.remove(lock);
    if (pageLocks.isEmpty()){
    pid2Locks.remove(pageId);
    }
    }
    }
    this.notifyAll();
    tid2Locks.remove(tid);
    }
    }

    public synchronized boolean grantLock(TransactionId tid, PageId pageId, Permissions perm, int retry) throws InterruptedException{
    if (retry == 3){
    return false;
    }
    //holds the lock originally
    PageLock lock = getLockByPidAndTid(tid, pageId);
    if (lock!=null){
    if (perm == Permissions.READ_ONLY){
    return true;
    }else if (perm == Permissions.READ_WRITE){
    if (lock.getType().equals(LockType.XLOCK)){
    return true;
    }else if (lock.getType().equals(LockType.SLOCK)){
    // try to update lock
    HashSet<PageLock> locksByPid = pid2Locks.get(pageId);
    if (locksByPid!=null&&locksByPid.size()==1){
    // update to XLock
    lock.updateToXLock();
    return true; }
    wait(50);
    return grantLock(tid,pageId,perm,retry+1);
    }
    }
    }
    //try to acquire a lock
    else{
    if (canLockPage(pageId,perm)){
    PageLock pageLock = new PageLock(perm == Permissions.READ_ONLY ? LockType.SLOCK : LockType.XLOCK, pageId, tid);
    HashSet<PageLock> pageLockSet = pid2Locks.getOrDefault(pageId, new HashSet<>());
    HashSet<PageLock> tidLockSet = tid2Locks.getOrDefault(tid, new HashSet<>());
    pageLockSet.add(pageLock);
    tidLockSet.add(pageLock);
    pid2Locks.put(pageId,pageLockSet);
    tid2Locks.put(tid,tidLockSet);
    return true; }else{
    wait(50);
    return grantLock(tid,pageId,perm,retry+1);
    }
    }
    return false;
    }

    private synchronized PageLock getLockByPidAndTid(TransactionId tid,PageId pageId){
    if (this.tid2Locks.containsKey(tid)){
    HashSet<PageLock> locks = this.tid2Locks.get(tid);
    for (PageLock pageLock : locks) {
    if (pageLock.getPageId().equals(pageId)){
    return pageLock;
    }
    }
    }
    return null;
    }

    private synchronized boolean canLockPage(PageId pageId,Permissions perm){
    if (this.pid2Locks.containsKey(pageId)){
    HashSet<PageLock> pageLocks = this.pid2Locks.get(pageId);
    if (!pageLocks.isEmpty() && perm == Permissions.READ_WRITE){
    return false;
    }
    for (PageLock pageLock : pageLocks) {
    if (pageLock.type.equals(LockType.XLOCK)){
    return false;
    }
    }
    if (perm == Permissions.READ_ONLY){
    return true;
    }
    }
    return true;
    }
    public synchronized boolean holdsLock(TransactionId tid,PageId pageId){
    return getLockByPidAndTid(tid,pageId) != null;
    }
    }