Concurrent Linked List Introduction and Implementation

Fork me on GitHub


在具体实现之前, 现解释一下在Concurrent Programming 中需要用到的一些概念。

RMW 是指在一个操作中读取memory 的值,然后写入新的值的原子操作(Atomic)。这些指令能防止在多线程编程中出现竞争。

CAS 是上面的RMW指令中的一种,具体就是要修改一个memory 地址的值时,先用当前的值和之前写入的值进行比较,如果一致,则写入新值。否则写入失败。由于 CAS 是原子操作,所以在多线程中可以保证同时只被一个线程修改。


bool cas(int *ptr, int oldVal, int newVal) {
    if( *ptr == oldVal ) {
        *ptr = newVal;
        return true;
    return false;

对于x86gcc, 提供了以下builtin的原子调用:

  • Sequential Consistency

Sequential Consistency 是指对memory 的读写操作的顺序在所有线程中都是和程序代码中一致的。具体的理解就是在所有线程都运行在单核,并且程序是在关掉编译器优化功能后编译的。这样对于CPU,所有的memory 读写访问操作都是确定的。

  • Memory Order

Memory Order 是指执行指令时LoadStore的顺序。影响memory 的读写指令顺序因素有两个 :

  1. compiler reordering : 指compiler 在编译优化过程中不改变程序正确性前提下对LoadStore指令执行顺序进行的调整。


inline void MemoryBarrier() {
  // prevent compiler reordering
  __asm__ __volatile__("" : : : "memory");
  1. processor reordering : 指CPU本身的Out-of-Order功能在执行过程中对Load/Store指令的调整。


inline void MemoryBarrier() {
  // prevent compiler && cpu processor reordering
  __asm__ __volatile__("mfence" : : : "memory");
  • Memory Model

Memory Model 是指能够执行Out-of-OrderCPU能够对LoadStore指令进行Reorder的方式。不同的CPU对应各自的Memory Order, 根据能够Reorder的总类, 一般可以分为Strong Memory OrderRelaxed Memory Order。 一般的, X86系列的是Strong Memory Order, 而ARM系列的则属于Relaxed Memory Order。 具体可见下表:

Reordering Activity x86 and x64 ARM
Reads moving ahead of reads No Yes
Writes moving ahead of writes No Yes
Writes moving ahead of reads No Yes
Reads moving ahead of writes Yes Yes

上面提到的 CAS 中我们读取一个值并与其之前值进行比较, 当两者一致时,我们认为在这之间状态没有变化。但是,其他线程可能已经对数据进行一系列操作,致使最后退出时将该值修改为之前的值。 这种出现在 Lock-Free 结构里的现象,就被称为 ABA Problem

具体可以看下面 linklist head->NodeA->NodeB->NodeC的例子:

  1. Thread 1memory 里读取 NodeA值, 然后 Thread 1 被调度出,Thread 2 开始执行
  2. Thread 2 删除NodeANodeB, 然后插入NodeA, 变成head->NodeA->NodeC
  3. Thread 1 继续执行, 删除NodeA, 将head->next = NodeB, 这是就会出现访问未分配的地址

ABA 问题可以对使用指针地址加上tag bit来避免。 对于x86来说, 指针是4 byte, 所以后2 bit可以用来作为tag bit

  • Read AcquireWrite Release Barrier

Read AcquireWrite Release Barrier是 Lock-Free 里经常提到的概念, 具体的:

  • Read Acquire Barrier : 指该Barrier 之后的同一个线程的Load/Store 指令都在之后执行

不管是在lock-based 还是lock-free 代码中, 当我们需要对memory 进行操作时, 一般的都会去测试一个lock或者一个pointer来判断能否进行后面的操作。所以后面操作能否进行依赖于这个测试操作,即必须保证测试操作在之前完成, 而不能被reorder。所以称之为 Read Acquire

  • Write Release Barrier : 指该Barrier 之前的同一个线程的Load/Store 指令都在之前执行

同样的, 当我们对memory 执行操作结束之后,通常都要对memory 进行写操作来通知其他线程当前memory 可用。 因此只能在当前成功完成对memory 操作之后才能执行该写操作。所以称之为 Write Release

上面 Memory Order 提到的MemoryBarrier()可以用来作为阻止Load/Store Reordering 的指令。

Generic Linked List

为了更清楚区别于支持并发的编程思想, 我们先不考虑多线程, 先实现一个基本的linklist数据结构, 要求基本接口和C++里的list一致:

  • bool empty()可以用来判断list是否为空
  • size_t size()可以返回list的大小
  • T& front()可以得到list第一个值
  • T& back()可以返回list最后一个值
  • void pop_front()删除list第一个值
  • void push_back(const T& val)list尾部插入一个值
  • void remove(const T& val)删掉list里所有val


// generic linklist without multi-thread support
template<typename T>
class linklist {
    // empty-argument constructor
    explicit linklist() : head_(nullptr), tail_(nullptr) { }

    // return whether the list is empty
    bool empty() const {
        return size() == 0;
    // return the size of list
    std::size_t size() const {
        std::size_t size = 0;
        node* p = head_;
        while( p ) {
            p = p->next_;
        return size;

    // return the front element
    T& front() {
        assert( head_ != nullptr );
        return head_->val_;

    T& back() {
        assert( tail_ != nullptr );
        assert( tail_->next_ == nullptr );
        return tail_->val_;

    // pop the front element
    void pop_front() {
        assert( head_ );
        head_ = head_->next_;
        if( head_ == nullptr )
            tail_ = head_;

    // push back element
    void push_back(const T& val) {
        node* n = new node(val, nullptr);
        if( !tail_ ) {
            assert( !head_ );
            head_ = tail_ = n;
        } else {
            tail_->next_ = n;
            tail_ = tail_->next_;

    void remove(const T& val) {
        node* prev = nullptr;
        node* p = head_;
        node** pp = &head_;
        while( p ) {
            if( p->val_ == val ) {
                *pp = p->next_;
                p = *pp;
                if( !*pp )
                    tail_ = prev;
            } else {
                prev = p;
                pp = &p->next_;
                p = p->next_;
    // node class definition
    struct node;
    struct node {
        T val_;
        node *next_;
        // constructor
        node() : val_(), next_(nullptr) {}
        node(const T& value, node* next)
            : val_(value), next_(next) {}
        // non-copyable
        node(const node&) = delete;
        node(node &&) = delete;
        node &operator=(const node&) = delete;
    node* head_;
    node* tail_;

Linked List using Lock

上面我们实现了一个基本结构的linklist,但是上面结构不支持多线。多线程编程中常见的方式是对操作进行加锁。对于linklist  ,我们有两种加锁方式:

  • linklist整体加锁, 在进行任何操作之前加锁, 操作结束释放锁  
  • node进行加锁,只针对操作的node, 可以更好控制锁的细粒度  


global locked linked list

实现比较简单,我们使用C++11mutexunique_lock来对list进行加锁操作,具体就是在每一个操作之前, 获得全局锁, 阻止其他线程对list进行操作。

// global lock linklist implementation
// for this linklist, before every operation
// should obtain a unique_lock
// with c++11 unique_lock, we needn't manage the lock
// by ourselfves
template<typename T>
class linklist_glock {
    // empty-argument constructor
    explicit linklist_glock() : head_(nullptr), tail_(nullptr), mutex_() { }

    // return whether the list is empty
    bool empty() const {
        return size() == 0;
    // return the size of list
    std::size_t size() const {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        std::size_t size = 0;
        node* p = head_;
        while( p ) {
            p = p->next_;
        return size;

    // return the front element
    T& front() {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        assert( head_ != nullptr );
        return head_->val_;

    T& back() {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        assert( tail_ != nullptr );
        assert( tail_->next_ == nullptr );
        return tail_->val_;

    // pop the front element
    void pop_front() {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        assert( head_ );
        head_ = head_->next_;
        if( head_ == nullptr )
            tail_ = head_;

    // pop the front element and return the pair
    std::pair<bool,T> try_pop_front() {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        //assert( head_ );
        if( head_ == nullptr )
            return std::make_pair(false,T());
        T v = head_->val_;
        head_ = head_->next_;
        if( head_ == nullptr )
            tail_ = head_;
        return std::make_pair(true, v);

    // push back element
    void push_back(const T& val) {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        node* n = new node(val, nullptr);
        if( !tail_ ) {
            assert( !head_ );
            head_ = tail_ = n;
        } else {
            tail_->next_ = n;
            tail_ = tail_->next_;

    void remove(const T& val) {
        // first obtain the unique_lock
        unique_lock lock(mutex_);
        node* prev = nullptr;
        node* p = head_;
        node** pp = &head_;
        while( p ) {
            if( p->val_ == val ) {
                *pp = p->next_;
                p = *pp;
                if( !*pp )
                    tail_ = prev;
            } else {
                prev = p;
                pp = &p->next_;
                p = p->next_;
    // typedef unique_lock
    typedef std::unique_lock<std::mutex> unique_lock;
    // mutex definition
    mutable std::mutex mutex_;
    // node class definition
    struct node;
    struct node {
        T val_;
        node *next_;
        // constructor
        node() : val_(), next_(nullptr) {}
        node(const T& value, node* next)
            : val_(value), next_(next) {}
        // non-copyable
        node(const node&) = delete;
        node(node &&) = delete;
        node &operator=(const node&) = delete;
    node* head_;
    node* tail_;

node locked linked list

和上面的全局锁不同, list里每一个node都有一个锁, 只针对操作的node进行加锁。

// per-node lock linklist implementation
// this implementation add lock for the node which is different
// from the glock lock
template<typename T>
class linklist_nodelock {
    // empty-argument constructor
    explicit linklist_nodelock() : head_(new node()), tail_(nullptr) { }

    // return whether the list is empty
    bool empty() const {
        return size() == 0;
    // return the size of list
    std::size_t size() const {
        std::size_t size = 0;
        // first obtain the unique_lock of head_
        node* prev = head_;
        node* p = head_->next_;
        while( p ) {

            // if we don't lock current node
            // it may be deleted after we unlock prev
            // node

            prev = p;
            p = p->next_;
        return size;

    // return the front element
    T& front() {
        // first obtain the unique_lock of head_
        unique_lock lock(head_->mutex_);
        node* p = head_->next_;
        assert( p != nullptr );
        return p->val_;

    T& back() {
        // first obtain the unique_lock of tail_
        unique_lock lock(tail_->mutex_);
        assert( tail_ != nullptr );
        assert( tail_->next_ == nullptr );
        return tail_->val_;

    // pop the front element
    void pop_front() {
        // first obtain the unique_lock of head_
        unique_lock lock(head_->mutex_);
        node* p = head_->next_;
        assert( p );

        // lock current node
        unique_lock lock0(p->mutex_);
        bool isTail = !p->next_;

        if( isTail ) {

            assert( head_->next_ == p );
            if( p->next_ ) {
                // no longer tail, retry
                return pop_front();
            assert( tail_ == p );
        head_->next_ = p->next_;
        if( isTail ) {
            tail_ = head_->next_;

    // pop the front element and return the pair
    std::pair<bool,T> try_pop_front() {
        // first obtain the unique_lock
        unique_lock lock(head_->mutex_);
        node* p = head_->next_;
        //assert( head_ );
        if( p == nullptr )
            return std::make_pair(false,T());
        unique_lock lock0(p->mutex_);

        T v = p->val_;
        bool isTail = !p->next_;
        if( isTail ) {
            assert(head_->next_ == p);
            if( p->next_ ) {
                // no longer tail, retry
                return try_pop_front();
            assert( tail_ == p );

        head_->next_ = p->next_;
        if( isTail )
            tail_ = head_;
        return std::make_pair(true, v);

    // push back element
    void push_back(const T& val) {
        // first obtain the unique_lock
        unique_lock lock(head_->mutex_);
        node* n = new node(val, nullptr);
        if( !tail_ ) {
            assert( !head_->next_ );
            head_->next_ = tail_ = n;
        } else {
            unique_lock lock(tail_->mutex_);
            tail_->next_ = n;
            tail_ = tail_->next_;

    void remove(const T& val) {
        // first obtain the unique_lock

        node* prev = head_;
        node* p = head_->next_;

        while( p ) {
            if( p->val_ == val ) {
                bool isTail = !p->next_;
                if( isTail ) {
                    assert(tail_ == p);
                prev->next_ = p->next_;
                if( isTail ) {
                    tail_ = prev;
                p = prev->next_;
            } else {
                prev = p;
                p = p->next_;
    // typedef unique_lock
    typedef std::unique_lock<std::mutex> unique_lock;
    // mutex definition
    mutable std::mutex mutex_;
    // node class definition
    struct node;
    struct node {
        // mutex definition
        mutable std::mutex mutex_;
        T val_;
        node *next_;
        // constructor
        node() : val_(), next_(nullptr) {}
        node(const T& value, node* next)
            : val_(value), next_(next) {}
        // non-copyable
        node(const node&) = delete;
        node(node &&) = delete;
        node &operator=(const node&) = delete;
    node* head_; // head is a sentinel node
    node* tail_;

Lock-free Linked List

LOck-free 区别于上述加锁实现的区别是在操作之前不需要加锁操作,同时对于node的修改操作使用下面的原子操作完成:

#define CAS(addr, oldVal, newVal) __sync_bool_compare_and_swap(addr, oldVal, newVal)

为了避免上面提到的 ABA Problem , 下面实现中使用指针最后一位来作为mark bit,当mark bit为1时, 表示当前node要被其他线程删除, 我们需要重新开始当前操作。


template<typename T>
class linklist_lockfree {
    // empty-argument constructor
    explicit linklist_lockfree() : head_(new node()), tail_(nullptr) { }

    // return whether the list is empty
    bool empty() const {
        return size() == 0;
    // return the size of list
    std::size_t size() const {
        std::size_t size = 0;
        node* p = head_->next();
        while( p ) {
            if( !p->isMarked() )
            p = p->next();
        return size;

    // return the front element
    T& front() {
        assert( !head_->isMarked() );
        node* p = head_->next();
        assert( p != nullptr );
        if( p->isMarked() )
            return front();
        T& val = p->val_;
        if( p->isMarked() )
            return front();

        if( !p->next() && tail_ != p )
            tail_ = p;
        return val;

    T& back() {
        assert( !head_->isMarked() );
        assert( tail_ != nullptr );
        if( tail_->next() )
            return back();
        if( tail_->isMarked() )
            return back();
        return tail_->val_;

    // pop the front element
    void pop_front() {

        assert( !head_->isMarked() );
        node* p = head_->next();
        //assert( p );

        if( p == nullptr )
        if( p->isMarked() )
            return pop_front();
        // mark current node
        if( !p->isMarked() )
            return pop_front();

        //node* pp = head_->next();
        if( !CAS(&head_->next_ , p, p->next()) )
            return pop_front();

        CAS(&tail_, p, head_->next());

    // pop the front element and return the pair
    std::pair<bool,T> try_pop_front() {

        assert( !head_->isMarked() );
        node* p = head_->next();
        //assert( p );
        if( p == nullptr )
            return std::make_pair(false,T());
        if( p->isMarked() )
            return try_pop_front();

        if( !p->isMarked() )
            return try_pop_front();
        //node* pp = head_->next();
        if( !CAS(&head_->next_, p, p->next()) )
            return try_pop_front();
        CAS(&tail_, p, head_->next());

        return std::make_pair(true, p->val_);

    // push back element
    void push_back(const T& val) {

        //assert( !head_->isMarked() );

        node* n = new node(val, nullptr);
        if( !tail_ ) {
            if( !head_->next() ) {
                if( !CAS(&head_->next_, tail_, n) )
                    return push_back(val);
                tail_ = head_->next_;
            } else
                return push_back(val);
        } else {
            if( tail_->next() || tail_->isMarked() )
                return push_back(val);
            tail_ = tail_->next();

    void remove(const T& val) {

        node* prev = head_;
        node* p = head_->next();

        while( p ) {
            if( p->val_ == val ) {
                if( !CAS(&prev->next_, p, p->next()) )
                    return remove(val);
                CAS(&tail_, p, head_->next());
                p = prev->next();
            } else {
                prev = p;
                p = p->next();
    // node class definition
    struct node;
    struct node {
        T val_;
        node* next_;
        // constructor
        node() : val_() , next_(nullptr) {}
        node(const T& value,node* p) : val_(value), next_(p) {}
        // return if current node is marked
        inline bool isMarked() {
            return intptr_t(next_) & 0x1;
        inline void mark() {
           next_ = (node*)(intptr_t(next_) | 0x1);
        inline node* next() {
            return (node*)(intptr_t(next_) & ~0x1);
        inline void setNext(node* ptr) {
            next_ = ptr;
        // non-copyable
        node(const node&) = delete;
        node(node &&) = delete;
        node &operator=(const node&) = delete;
    node* head_; // head is a sentinel node
    node* tail_;


  • 针对单线程,分别测试了上述实现的接口函数。
  • 针对多线程,分别使用8个线程同时去测试接口函数


#include "linklist.hpp"
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <functional>
#include <algorithm>

using namespace std;

template<typename list>
void single_threaded_test() {
    list l;

    assert( l.front() == 1 );
    assert( l.back() == 1 );
    assert( l.size() == 1 );

    assert( l.front() == 1 );
    assert( l.back() == 2 );
    assert( l.size() == 2 );

    assert( l.front() == 2 );
    assert( l.back() == 2 );
    assert( l.size() == 1 );

    assert( l.empty() );

    assert( l.front() == 10 );
    assert( l.back() == 10 );
    assert( l.size() == 5 );

    while( !l.empty() ) {
        assert( l.front() != 10 );

template<typename list, typename T>
void getElemsOfList(list &l, vector<T> &res) {
    while( !l.empty() ) {

template<typename T>
void rangeCompare(vector<T>& res, int start, int end) {
    assert( res.size() == end - start );
    for(int i = start; i < end; ++i)
        assert(res[i-start] == i);

inline void nop_pause() {
    __asm__ volatile ("pause" ::);

template<typename list, typename T>
void list_insert(list &l, atomic<bool> &f, T start, T end) {
    while( !f.load() )
    for(T i = start; i < end; ++i)

template<typename list, typename T>
void list_pop_front(list &l, atomic<bool> &f, atomic<bool> &stop, vector<T> &res) {
    while( !f.load() )
    while( true ) {
        auto val = l.try_pop_front();
        if( val.first == false && stop.load() )
        if( val.first )

template<typename list, typename T>
void list_remove(list &l, atomic<bool> &f, T start, T end) {
    while( !f.load() )
    for(T i = start; i < end; ++i)

template<typename list>
void multiple_threaded_test() {
    // try a bunch of concurrent inserts
    // make sure no value lost
        list l;
        const int NUM_ELEMENTS_PER_THREAD = 2000;
        const int NUM_THREADS = 8;
        vector<thread> threads;
        atomic<bool> start_flag(false);
        for(int i = 0; i < NUM_THREADS; ++i) {
            thread t(list_insert<list, int>, ref(l), ref(start_flag),

        for(auto &t : threads)
        vector<int> list_elems;
        getElemsOfList(l, list_elems);
        sort(list_elems.begin(), list_elems.end());
        rangeCompare(list_elems, 0, NUM_ELEMENTS_PER_THREAD * NUM_THREADS);

    // try a bunch of concurrent pop_front
        list l;
        const int NUM_ELEMENTS_PER_THREAD = 2000;
        const int NUM_THREADS = 8;
        for(int i = 0; i < NUM_ELEMENTS_PER_THREAD; ++i)
        vector<thread> threads;
        vector<vector<int> > res;
        atomic<bool> start_flag(false);
        atomic<bool> stop(true);
        for(int i = 0; i < NUM_THREADS; ++i) {
            thread t(list_pop_front<list, int>, ref(l), ref(stop), ref(start_flag),

        for(auto &t : threads)
        assert( l.empty() );
        vector<int> list_elems;
        for(auto &r : res)
            list_elems.insert(list_elems.end(), r.begin(), r.end());
        sort(list_elems.begin(), list_elems.end());
        rangeCompare(list_elems, 0, NUM_ELEMENTS_PER_THREAD);

    // try a bunch of concurrent remove
        list l;
        const int NUM_ELEMENTS_PER_THREAD = 2000;
        const int NUM_THREADS = 8;
        for(int i = 0; i < NUM_THREADS * NUM_ELEMENTS_PER_THREAD; ++i)
        assert( l.size() == NUM_ELEMENTS_PER_THREAD * NUM_THREADS );

        vector<thread> threads;
        atomic<bool> start_flag(false);

        for(int i = 0; i < NUM_THREADS; ++i) {
            thread t(list_remove<list, int>, ref(l), ref(start_flag),
                i * NUM_ELEMENTS_PER_THREAD, (i + 1) * NUM_ELEMENTS_PER_THREAD);

        for(auto &t : threads)
        assert( l.empty() );

    // try remove with push_back
        list l;
        const int NUM_ELEMENTS_PER_THREAD = 2000;
        const int NUM_THREADS = 8;

        for(int i = 0; i < NUM_THREADS * NUM_ELEMENTS_PER_THREAD; ++i)
        assert( l.size() == NUM_ELEMENTS_PER_THREAD * NUM_THREADS );

        vector<thread> threads;
        atomic<bool> start_flag(false);
        // remove first
        for(int i = 0; i < NUM_THREADS; ++i) {
            thread t(list_remove<list, int>, ref(l), ref(start_flag),

        // then push_back
        for(int i = 0; i < NUM_THREADS; ++i) {
            thread t(list_insert<list, int>, ref(l), ref(start_flag),

        for(auto &t : threads)
        vector<int> list_elems;
        getElemsOfList(l, list_elems);
        sort(list_elems.begin(), list_elems.end());

    // try a producer/consumer queue
        list l;

        atomic<bool> start_flag(false);
        atomic<bool> stop(false);

        thread producer(list_insert<list, int>, ref(l), ref(start_flag),0,10000);

        vector<int> res;
        thread consumer(list_pop_front<list, int>, ref(l), ref(start_flag),ref(stop), ref(res));;



        rangeCompare(res, 0, 10000);
        assert( l.empty() );


template<typename Function>
void Test(Function &&f, const string &name) {
    cout << "Test -- " << name << " passed" << endl;

int main(int argc, char **argv) {
    // generic linklist which doesn't support multithread
    Test(single_threaded_test<linklist<int> >, "single-thread-generic-linklist");
    //Test(multiple_threaded_test<linklist<int> >, "multiple-thread-generic-linklist");

    // glock lock linklist
    Test(single_threaded_test<linklist_glock<int> >, "single-thread-linklist-glock");
    Test(multiple_threaded_test<linklist_glock<int> >, "multiple-thread-linklist-glock");
    // per-node lock
    Test(single_threaded_test<linklist_nodelock<int> >, "single-thread-linklist-nodelock");
    Test(multiple_threaded_test<linklist_nodelock<int> >, "multiple-thread-linklist-nodelock");

    // lock free linklist
    Test(single_threaded_test<linklist_lockfree<int> >, "single-thread-linklist-lockfree");
    Test(multiple_threaded_test<linklist_lockfree<int> >, "multiple-thread-linklist-lockfree");
    return 0;


志飞 /
Published under (CC) BY-NC-SA