This is an assignment that I did for my parallelism class. I implemented a lock-free linked list using atomic operations. A standard linked list would require using a mutex, or a lock, to prevent race conditions when multiple threads access the list concurrently. Using the atomic compare and swap (CAS) operation verifies that the value to be changed is still the value it was when the operation began. If it has changed, then another thread has accessed it and the operation must be retried. This is faster than waiting for a lock to be released.
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdint>
#include <cstdlib>
#include <forward_list>
#include <functional>
#include <mutex>
#include <random>
#include <shared_mutex>
#include <string.h>
#include <thread>
using std::atomic;
using std::atomic_compare_exchange_strong;
using std::forward_list;
using std::mutex;
using std::shared_lock;
using std::shared_mutex;
using std::thread;
using std::unique_lock;
shared_mutex mutex;
uint8_t numThreads{255};
template <typename T>
class Node {
public:
T key;
Node<T> *next{nullptr};
Node (T key) {
this->key = key;
}
Node (T key, Node<T> *next) {
this->key = key;
this->next = next;
}
};
class List {
public:
Node<int> *head;
Node<int> *tail;
List(){
this->head = new Node<int> (-99999);
this->tail = new Node<int> ( 99999);
head->next = tail;
}
~List(){
while (this->head != nullptr){
auto temp = this->head;
this->head = this->head->next;
delete temp;
}
}
Node<int>* prevFind(int data, bool *found){
std::shared_lock lock(mutex);
Node<int> *curr = nullptr;
for (Node<int> *j = this->head; (*j).key < data; j = (*j).next){
curr = j;
}
if (curr->next->key == data){
*found = true;
}
return curr;
}
void insert_after(int data){
bool found{false};
Node<int>* before = this->prevFind(data,&found);
if (found){
std::unique_lock lock(mutex);
Node<int>* after = before->next;
before->next = new Node<int>(data, after);
}
}
void erase(int data){
bool found{false};
Node<int>* before = this->prevFind(data, &found);
if (before->next->key != 99999 && found){
std::unique_lock lock(mutex);
Node<int>* after = before->next->next;
delete before->next;
before->next = after;
}
}
};
List mainList;
void linkMutex(int threadID){
int temp{0};
for (int i = 0; i < 100'000; i++) {
int data = rand() % 100;
int choice = rand() % 10;
switch (choice)
{
case(0):
{
mainList.insert_after(data);
break;
}
case(1):
{
mainList.erase(data);
break;
}
default:
bool found{false};
auto curr = mainList.prevFind(data, &found);
temp += curr->key;
break;
}
if (i == 50'000){
printf("Thread %d has reached 50%%\n",threadID);
}
}
printf("Thread %d has reached 100%%\n",threadID);
}
// ATOMIC
template <typename T>
class NodeA {
public:
T key;
atomic<NodeA<T>*> next{nullptr};
NodeA (T key) {
this->key = key;
}
NodeA (T key, NodeA<T> *next) {
this->key = key;
this->next = next;
}
NodeA<T>* mark(){
uint64_t temp = reinterpret_cast<std::uintptr_t>(this->next.load());
if (temp % 2 == 0){
temp++;
}
return reinterpret_cast<NodeA<T>*>(temp);
}
NodeA<T>* markNT(){
uint64_t temp = reinterpret_cast<std::uintptr_t>(this->next.load());
temp -= temp%2;
return reinterpret_cast<NodeA<T>*>(temp);
}
bool marked(){
if (this != nullptr){
uint64_t temp = reinterpret_cast<std::uintptr_t>(this->next.load());
return (temp % 2 == 1);
} else {
return false;
}
}
};
// CAS
// Cases:
// base: insert atomically, remove atomically
class ListA {
public:
NodeA<int>* head;
NodeA<int>* tail;
ListA(){
this->head = new NodeA<int> (-99999);
this->tail = new NodeA<int> ( 99999);
head->next = tail;
}
~ListA(){
while (this->head != nullptr){
auto temp = head;
this->head = this->head->next;
delete temp;
}
}
NodeA<int>* prevFindA(int data, bool *found){
NodeA<int> *curr = this->head;
while (curr->next != nullptr && (*curr->next).key < data){
curr = curr->markNT();
}
if (curr->next!=nullptr && (curr->next).load()->key == data){
*found = true;
}
return curr;
}
void insert_afterA(NodeA<int>* before, int data){
bool found{false};
NodeA<int>* after = this->prevFindA(data, &found);
NodeA<int>* temp = new NodeA<int>(data,before->next.load());
while (!after->marked() && !atomic_compare_exchange_strong(&before->next,&after,temp)){
before = this->prevFindA(data, &found);
after = before->next;
temp->next = after;
}
}
void mark_afterA(int data){
bool found{false};
NodeA<int>* before = this->prevFindA(data,&found);
if (found && !(*before->next).marked()){
before->mark();
this->erase_afterA(before, data);
}
}
void erase_afterA(NodeA<int>* before, int data){ // SEGFAULT?
NodeA<int>* after2 = (*before->next.load()).markNT();//->markNT()
//printf("just broken %p\n",after2);
NodeA<int>* expected = before->next.load();
if (before->next.load()->marked()){
if(!atomic_compare_exchange_strong(&before->next,&expected,after2)){
printf("erase_failed");
return;
}
//printf("erased %d at %p, replaced with %p\n", data,expected, after2);
}
}
};
ListA mainListA;
void linkAtomic(int threadID){
int temp{0};
for (int i = 0; i < 100'000; i++) {
int data = rand() % 100;
int choice = rand() % 10;
bool found{false};
switch (choice)
{
case(0):
{//insert
auto curr = mainListA.prevFindA(data,&found);
mainListA.insert_afterA(curr,data);
break;
}
case(1):
{//delete
mainListA.mark_afterA(data);
break;
}
default:
mainListA.prevFindA(data, &found);
temp += found;
break;
}
if (i % 10'000 == 0){
printf("Thread %d has reached %d%%\n",threadID,(i/1'000));
}
}
printf("Thread %d has reached 100%%\n",threadID);
printf("Temp reached a value of %d\n",temp);
}
// MAIN
int main(int numberOfArguments, char* arguments[]) {
bool found = false;
numThreads = atoi(arguments[1]);
thread* threads = new thread[numThreads];
auto t1 = std::chrono::high_resolution_clock::now();
for (int i = 0; i < numThreads; i++){
if (numberOfArguments >= 3 && strcmp(arguments[2],"atomic") != 0){
threads[i] = thread(linkMutex,i);
} else {
threads[i] = thread(linkAtomic,i);
}
}
for (int i = 0; i < numThreads; i++){
threads[i].join();
}
auto t2 = std::chrono::high_resolution_clock::now();
std::chrono::duration<float, std::milli> fp_ms = t2 - t1;
printf("Time is: %f ms\n",fp_ms.count());
return 0;
}