基于锁的数据结构

上篇笔记介绍了锁的实现大概是怎样的,这篇笔记我们讲解一些基于锁的数据结构。

给某些数据结构加锁,可以使它们支持在多线程环境下使用,也就是让这些数据结构变得线程安全(thread-safe)。

当然文中也不会记录所有的数据结构,这得要成千上万篇论文的篇幅去讲述,这里还是抛砖引玉。

Concurrent Counter

A Counter Without Locks

// Minimal counter that performs no locking at all.
typedef struct __counter_t {
    int value; // current count
} counter_t;

// Reset the counter to zero.
void init(counter_t *c)
{
    c->value = 0;
}

// Add one to the counter (plain, unguarded read-modify-write).
void increment(counter_t *c)
{
    c->value += 1;
}

// Return the current count.
int get(counter_t *c)
{
    return c->value;
}

A Counter With Locks But Not Scalable

// Counter whose value is paired with a single mutex.
typedef struct __counter_t {
    int             value; // current count
    pthread_mutex_t lock;  // taken around every access to 'value'
} counter_t;

// Prepare a counter for use: set up its mutex and zero the count.
// NOTE(review): Pthread_mutex_init is not defined in this snippet; presumably
// a checked wrapper around pthread_mutex_init -- confirm.
void init(counter_t *c)
{
    Pthread_mutex_init(&c->lock, NULL);
    c->value = 0;
}

// Add one to the counter between a lock/unlock pair on the counter's mutex.
// NOTE(review): Pthread_mutex_lock/unlock are not defined in this snippet;
// presumably checked wrappers around the pthread calls -- confirm.
void increment(counter_t *c)
{
    Pthread_mutex_lock(&c->lock);
    c->value += 1;
    Pthread_mutex_unlock(&c->lock);
}

// Subtract one from the counter between a lock/unlock pair on its mutex.
// NOTE(review): Pthread_mutex_lock/unlock are not defined in this snippet;
// presumably checked wrappers around the pthread calls -- confirm.
void decrement(counter_t *c)
{
    Pthread_mutex_lock(&c->lock);
    c->value -= 1;
    Pthread_mutex_unlock(&c->lock);
}

// Return the count, copied out between a lock/unlock pair on the mutex so
// the value is read while the lock is held and returned after it is released.
int get(counter_t *c)
{
    int snapshot;

    Pthread_mutex_lock(&c->lock);
    snapshot = c->value;
    Pthread_mutex_unlock(&c->lock);

    return snapshot;
}

A Counter With Locks and Scalable, Sloppy Counter Implementation

The basic idea of sloppy counting is as follows. When a thread running on a given core wishes to increment the counter, it increments its local counter; access to this local counter is synchronized via the corresponding local lock. Because each CPU has its own local counter, threads across CPUs can update local counters without contention, and thus counter updates are scalable.
 
However, to keep the global counter up to date (in case a thread wishes to read its value), the local values are periodically transferred to the global counter, by acquiring the global lock and incrementing it by the local counter’s value; the local counter is then reset to zero.

// Sloppy (approximate) counter: one global count plus NUMCPUS local counts,
// each count guarded by its own mutex.
typedef struct __counter_t {
    int             global;          // global count
    pthread_mutex_t glock;           // lock for 'global'
    int             local[NUMCPUS];  // local counts, one per slot
    pthread_mutex_t llock[NUMCPUS];  // lock for the matching local count
    int             threshold;       // local value at which a slot is flushed
} counter_t;

// init: store the flush threshold, zero the global count and every local
// count, and initialise the global lock and each local lock.
void init(counter_t *c, int threshold)
{
    c->threshold = threshold;

    c->global = 0;
    pthread_mutex_init(&c->glock, NULL);

    for (int slot = 0; slot < NUMCPUS; slot++) {
        c->local[slot] = 0;
        pthread_mutex_init(&c->llock[slot], NULL);
    }
}

// update: usually just take one local lock and add to that local count.
// Once the local count has reached 'threshold', also take the global lock,
// move the local count into the global count, and reset the local count.
//
//   c        - counter to update
//   threadID - caller's id; folded into the range of existing slots, so an
//              id outside [0, NUMCPUS) no longer indexes past the arrays
//   amt      - amount to add (assumed to be > 0)
void update(counter_t *c, int threadID, int amt)
{
    // Slot count is taken from the array itself, so the index below always
    // stays inside local[] and llock[] (both are declared with the same size).
    unsigned int nslots = (unsigned int)(sizeof c->local / sizeof c->local[0]);
    // Unsigned arithmetic keeps the result in [0, nslots) even for a
    // negative threadID; ids already in range map to themselves.
    unsigned int slot = (unsigned int)threadID % nslots;

    pthread_mutex_lock(&c->llock[slot]);
    c->local[slot] += amt; // assumes amt > 0
    if (c->local[slot] >= c->threshold) { // transfer to global
        // Lock order is always local -> global.
        pthread_mutex_lock(&c->glock);
        c->global += c->local[slot];
        pthread_mutex_unlock(&c->glock);

        c->local[slot] = 0;
    }
    pthread_mutex_unlock(&c->llock[slot]);
}

// get: read the global count under the global lock. Amounts still held in
// the local counts are not included, so the result is only approximate.
int get(counter_t *c)
{
    int snapshot;

    pthread_mutex_lock(&c->glock);
    snapshot = c->global;
    pthread_mutex_unlock(&c->glock);

    return snapshot; // only approximate!
}

这个数据结构的设计其实是有缺陷的:global count 并不总是准确的(尚未转移的 local count 不会被计入),所以使用它的场景需要斟酌。

If S is low, performance is poor (but the global count is always quite accurate); if S is high, performance is excellent, but the global count lags (by the number of CPUs multiplied by S). This accuracy/performance trade-off is what sloppy counters enable.

Sloppy Counter 的性能
01

Concurrent Linked Lists

// One element of a singly linked list.
typedef struct __node_t {
    int              key;  // value stored in this node
    struct __node_t *next; // following node, or NULL at the end of the list
} node_t;

// A linked list (one of these per list): its first node plus one mutex.
typedef struct __list_t {
    node_t         *head; // first node, or NULL when the list is empty
    pthread_mutex_t lock; // taken around every access to the list
} list_t;

// Make L an empty list and initialise its mutex with default attributes.
void List_Init(list_t *L)
{
    pthread_mutex_init(&L->lock, NULL);
    L->head = NULL;
}

// Insert 'key' at the head of list L.
// Returns 0 on success. Returns -1 (after reporting through perror) when the
// node cannot be allocated; the list is not modified in that case.
int List_Insert(list_t *L, int key)
{
    // Allocate and fill the node before taking the lock: neither step touches
    // the shared list, so the critical section shrinks to the two pointer
    // updates and the failure path needs no unlock of its own.
    node_t *node = malloc(sizeof *node);
    if (node == NULL) {
        perror("malloc");
        return -1; // fail
    }
    node->key = key;

    pthread_mutex_lock(&L->lock);
    node->next = L->head;
    L->head = node;
    pthread_mutex_unlock(&L->lock);
    return 0; // success
}

// Search list L for 'key' while holding the list lock.
// Returns 0 if some node holds 'key', -1 otherwise.
int List_Lookup(list_t *L, int key)
{
    int rv = -1; // failure unless a match is found

    pthread_mutex_lock(&L->lock);
    for (node_t *it = L->head; it != NULL; it = it->next) {
        if (it->key == key) {
            rv = 0; // success
            break;
        }
    }
    pthread_mutex_unlock(&L->lock); // single unlock for both outcomes
    return rv;
}

Concurrent Queues

// One element of the queue's singly linked chain.
typedef struct __node_t {
    int              value; // payload carried by this node
    struct __node_t *next;  // node enqueued after this one, or NULL
} node_t;

// Queue with separate locks for its two ends.
typedef struct __queue_t {
    node_t         *head;     // node in front of the first element
    node_t         *tail;     // last node in the chain
    pthread_mutex_t headLock; // held by Queue_Dequeue while it uses 'head'
    pthread_mutex_t tailLock; // held by Queue_Enqueue while it uses 'tail'
} queue_t;

// Initialise q as an empty queue: allocate one placeholder node that both
// head and tail point at, then initialise the head and tail locks.
void Queue_Init(queue_t *q)
{
    node_t *dummy = malloc(sizeof *dummy);
    // Same allocation policy as Queue_Enqueue: stop here instead of
    // dereferencing a NULL pointer on the next line.
    assert(dummy != NULL);
    dummy->next = NULL;
    q->head = q->tail = dummy;
    pthread_mutex_init(&q->headLock, NULL);
    pthread_mutex_init(&q->tailLock, NULL);
}

// Append 'value' to the back of q.
// The node is built before the tail lock is taken, so only the two pointer
// updates run while the lock is held. Allocation failure trips the assert.
void Queue_Enqueue(queue_t *q, int value)
{
    node_t *node = malloc(sizeof(node_t));
    assert(node != NULL);
    node->next = NULL;
    node->value = value;

    pthread_mutex_lock(&q->tailLock);
    q->tail->next = node;
    q->tail = node;
    pthread_mutex_unlock(&q->tailLock);
}

// Remove the front element of q and store its value in *value.
// Returns 0 on success, or -1 (leaving *value untouched) if the queue is empty.
int Queue_Dequeue(queue_t *q, int *value)
{
    pthread_mutex_lock(&q->headLock);
    node_t *oldHead = q->head;
    node_t *first = oldHead->next;

    if (first != NULL) {
        *value = first->value;
        q->head = first; // 'first' becomes the new head node
        pthread_mutex_unlock(&q->headLock);

        free(oldHead); // released after the lock is dropped
        return 0;
    }

    pthread_mutex_unlock(&q->headLock);
    return -1; // queue was empty
}

Concurrent Hash Tables

// Number of buckets in the hash table.
#define BUCKETS (101)

// Hash table: a fixed array of BUCKETS lists, one list per bucket.
typedef struct __hash_t {
    list_t lists[BUCKETS]; // lists[b] holds the keys that map to bucket b
} hash_t;

// Initialise every bucket's list with List_Init.
void Hash_Init(hash_t *H)
{
    for (int b = 0; b < BUCKETS; b++) {
        List_Init(&H->lists[b]);
    }
}

// Insert 'key' into the list of the bucket it maps to.
// Returns whatever List_Insert returns for that bucket's list.
int Hash_Insert(hash_t * H, int key) {
    int bucket = key % BUCKETS;
    // C's % keeps the sign of the dividend, so a negative key gives a
    // negative remainder; shift it into [0, BUCKETS) before indexing lists[].
    if (bucket < 0) {
        bucket += BUCKETS;
    }
    return List_Insert(&H->lists[bucket], key);
}

// Look 'key' up in the list of the bucket it maps to.
// Returns whatever List_Lookup returns for that bucket's list.
int Hash_Lookup(hash_t * H, int key) {
    int bucket = key % BUCKETS;
    // C's % keeps the sign of the dividend, so a negative key gives a
    // negative remainder; shift it into [0, BUCKETS) before indexing lists[].
    if (bucket < 0) {
        bucket += BUCKETS;
    }
    return List_Lookup(&H->lists[bucket], key);
}

Summary

总的来说,这节介绍的并发数据结构的实现略简单,更复杂的并发数据结构实现可以去看 Java 的 concurrency 包里的数据结构,也可以参考《C++ Concurrency in Action》。

引用

《Lock-based Concurrent Data Structures》

Comments
Write a Comment