This example demonstrate a classic synchronization problem known as the producer-consumer problem using threads, mutexes, and condition variables. The problem illustrates how two processes (or threads, in this example) share a common, fixed-size buffer. One thread (the producer) creates items and puts them into the buffer, while the other thread (the consumer) takes items out of the buffer. The key challenge is to ensure that the producer does not try to add items to the buffer if it is full, and the consumer does not try to remove items from the buffer if it is empty, to prevent race conditions and ensure data integrity.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#define BUFFER_EMPTY (-1)
int sharedBuffer = BUFFER_EMPTY;
Mutex bufferMutex;
ThreadCondition bufferNotEmpty;
ThreadCondition bufferNotFull;
int producer(void* arg) {
(void)arg;
for (int i = 0; i < 10; ++i) {
mutex_lock(&bufferMutex);
while (sharedBuffer != BUFFER_EMPTY) {
condition_wait(&bufferNotEmpty, &bufferMutex);
}
sharedBuffer = i;
fmt_printf("Produced: %d\n", i);
condition_signal(&bufferNotFull);
mutex_unlock(&bufferMutex);
}
return 0;
}
int consumer(void* arg) {
(void)arg; // To avoid unused parameter warning
for (int i = 0; i < 10; ++i) {
mutex_lock(&bufferMutex);
while (sharedBuffer == BUFFER_EMPTY) {
condition_wait(&bufferNotFull, &bufferMutex);
}
fmt_printf("Consumed: %d\n", sharedBuffer);
sharedBuffer = BUFFER_EMPTY;
condition_signal(&bufferNotEmpty);
mutex_unlock(&bufferMutex);
}
return 0;
}
int main(void) {
Thread producerThread, consumerThread;
mutex_init(&bufferMutex, MUTEX_PLAIN);
condition_init(&bufferNotEmpty);
condition_init(&bufferNotFull);
thread_create(&producerThread, (ThreadStart)producer, NULL);
thread_create(&consumerThread, (ThreadStart)consumer, NULL);
thread_join(producerThread, NULL);
thread_join(consumerThread, NULL);
mutex_destroy(&bufferMutex);
condition_destroy(&bufferNotEmpty);
condition_destroy(&bufferNotFull);
return 0;
}
This example demonstrates a multi-threaded task processing system using a dynamic queue. It uses mutexes and condition variables to safely enqueue and dequeue tasks in a shared task queue. Each task consists of a function pointer and an argument, allowing for varied task execution. Worker threads continuously dequeue and execute tasks. The program showcases thread synchronization, dynamic memory allocation for task arguments, and safe multi-threaded access to a shared resource (the task queue).
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include <stdlib.h>
#define MAX_TASKS 100
#define NUM_THREADS 5
typedef struct task {
void (*function)(void*);
void* arg;
} Task;
Task taskQueue[MAX_TASKS];
int taskCount = 0;
Mutex queueMutex;
ThreadCondition hasTasks;
void enqueueTask(void (*function)(void*), void* arg) {
mutex_lock(&queueMutex);
if (taskCount < MAX_TASKS) {
Task newTask;
newTask.function = function;
newTask.arg = arg;
taskQueue[taskCount++] = newTask;
condition_signal(&hasTasks);
}
mutex_unlock(&queueMutex);
}
Task* dequeueTask() {
Task* task = NULL;
if (taskCount > 0) {
task = &taskQueue[--taskCount];
}
return task;
}
int worker(void* arg) {
(void)arg; // Unused parameter
while (1) {
mutex_lock(&queueMutex);
while (taskCount == 0) {
condition_wait(&hasTasks, &queueMutex);
}
Task* task = dequeueTask();
mutex_unlock(&queueMutex);
if (task) {
task->function(task->arg);
}
}
return -1;
}
void exampleTask(void* arg) {
int* num = (int*)arg;
fmt_printf("Executing task: %d\n", *num);
free(arg);
}
int main(void) {
Thread threads[NUM_THREADS];
struct timespec remaining;
mutex_init(&queueMutex, MUTEX_PLAIN);
condition_init(&hasTasks);
for (int i = 0; i < NUM_THREADS; ++i) {
thread_create(&threads[i], worker, NULL);
}
for (int i = 0; i < 20; ++i) {
int* arg = (int*) malloc(sizeof(int));
*arg = i;
enqueueTask(exampleTask, arg);
}
thread_sleep(&(struct timespec){.tv_sec = 2, .tv_nsec = 0}, &remaining); // Wait for tasks to complete
for (int i = 0; i < NUM_THREADS; ++i) {
thread_detach(threads[i]); // Detach threads for cleanup
}
mutex_destroy(&queueMutex);
condition_destroy(&hasTasks);
return 0;
}
This example creates a thread pool that manages a fixed number of worker threads. Tasks can be submitted to the pool and are executed by the next available worker thread. This pattern is useful for efficiently processing tasks asynchronously and concurrently, without the overhead of creating and destroying threads for each task.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#define MAX_TASKS 100
#define THREAD_POOL_SIZE 4
typedef struct task {
void (*function)(void*);
void* arg;
} Task;
Task taskQueue[MAX_TASKS];
int queueStart = 0; // Points to the start of the queue
int queueEnd = 0; // Points to the end of the queue
Mutex queueMutex;
ThreadCondition queueNotEmpty;
ThreadCondition queueNotFull;
int threadPoolShutdown = 0;
void enqueueTask(void (*function)(void*), void* arg) {
mutex_lock(&queueMutex);
while ((queueEnd + 1) % MAX_TASKS == queueStart) { // Wait if the queue is full
condition_wait(&queueNotFull, &queueMutex);
}
taskQueue[queueEnd].function = function;
taskQueue[queueEnd].arg = arg;
queueEnd = (queueEnd + 1) % MAX_TASKS;
condition_signal(&queueNotEmpty);
mutex_unlock(&queueMutex);
}
Task dequeueTask() {
Task task;
mutex_lock(&queueMutex);
while (queueStart == queueEnd && !threadPoolShutdown) { // Wait if the queue is empty
condition_wait(&queueNotEmpty, &queueMutex);
}
if (threadPoolShutdown) {
mutex_unlock(&queueMutex);
thread_exit(0);
}
task = taskQueue[queueStart];
queueStart = (queueStart + 1) % MAX_TASKS;
condition_signal(&queueNotFull);
mutex_unlock(&queueMutex);
return task;
}
int worker(void* arg) {
(void)arg; // Unused parameter
while (1) {
Task task = dequeueTask();
if (task.function) {
task.function(task.arg);
}
}
return 0;
}
void initializeThreadPool() {
mutex_init(&queueMutex, MUTEX_PLAIN);
condition_init(&queueNotEmpty);
condition_init(&queueNotFull);
Thread threads[THREAD_POOL_SIZE];
for (int i = 0; i < THREAD_POOL_SIZE; ++i) {
thread_create(&threads[i], worker, NULL);
}
}
void shutdownThreadPool() {
mutex_lock(&queueMutex);
threadPoolShutdown = 1;
condition_broadcast(&queueNotEmpty); // Wake up all workers for shutdown
mutex_unlock(&queueMutex);
}
void exampleTask(void* arg) {
int* num = (int*)arg;
fmt_printf("Processing task: %d\n", *num);
free(arg);
}
int main(void) {
initializeThreadPool();
// Submit tasks to the pool
for (int i = 0; i < 20; ++i) {
int* arg = (int*) malloc(sizeof(int));
*arg = i;
enqueueTask(exampleTask, arg);
}
// Give some time for tasks to be processed
thread_sleep(&(struct timespec){.tv_sec = 5, .tv_nsec = 0}, NULL);
fmt_printf("After Proccess\n");
shutdownThreadPool();
return 0;
}
This example demonstrates how to implement a barrier synchronization mechanism. The barrier will ensure that all participating threads reach a certain point in their execution before any of them can proceed. We'll simulate a scenario where threads perform an initial phase of work, wait at the barrier for all to finish, and then proceed to a final phase of work.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include <stdlib.h>
#define NUM_THREADS 5
typedef struct barrier {
Mutex mutex;
ThreadCondition condition;
int count;
int waiting;
int phase;
} Barrier;
void barrier_init(Barrier* barrier, int count) {
mutex_init(&barrier->mutex, MUTEX_PLAIN);
condition_init(&barrier->condition);
barrier->count = count;
barrier->waiting = 0;
barrier->phase = 0;
}
void barrier_wait(Barrier* barrier) {
mutex_lock(&barrier->mutex);
int my_phase = barrier->phase; // Capture the current phase
if (++barrier->waiting == barrier->count) {
barrier->waiting = 0; // Reset for the next use
barrier->phase++;
condition_broadcast(&barrier->condition); // Wake up all threads
}
else {
while (my_phase == barrier->phase) {
condition_wait(&barrier->condition, &barrier->mutex);
}
}
mutex_unlock(&barrier->mutex);
}
void barrier_destroy(Barrier* barrier) {
mutex_destroy(&barrier->mutex);
condition_destroy(&barrier->condition);
}
int worker(void* arg) {
Barrier* barrier = (Barrier*)arg;
// Phase 1: Initial work before the barrier
fmt_printf("Thread %lu doing initial work\n", thread_current());
thread_sleep(&(struct timespec){.tv_sec = 1, .tv_nsec = 0}, NULL);
// Wait at the barrier for all threads
fmt_printf("Thread %lu waiting at the barrier\n", thread_current());
barrier_wait(barrier);
// Phase 2: Final work after the barrier
fmt_printf("Thread %lu doing final work\n", thread_current());
return 0;
}
int main(void) {
Barrier barrier;
barrier_init(&barrier, NUM_THREADS);
Thread threads[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; ++i) {
thread_create(&threads[i], worker, &barrier);
}
for (int i = 0; i < NUM_THREADS; ++i) {
thread_join(threads[i], NULL);
}
barrier_destroy(&barrier);
return 0;
}
This is a example that demonstrates the use of thread-specific storage (TSS), also known as thread-local storage (TLS), along with a one-time initialization pattern using call_once. This example will simulate a logging mechanism where each thread can have its own log buffer, and the initialization of the logging infrastructure is done only once in a thread-safe manner.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include <stdlib.h>
#include <string.h>
#define LOG_BUFFER_SIZE 256
#define NUM_THREADS 4
// Thread-specific key for accessing the log buffer
ThreadSpecific logBufferKey;
// OnceFlag for one-time initialization
OnceFlag logInitOnce = ONCE_FLAG_INIT;
// Function to initialize logging resources
void initLogging(void) {
// Create the thread-specific storage key for log buffers
thread_specific_create(&logBufferKey, free);
}
// Function to get or create a log buffer for the current thread
char* getLogBuffer(void) {
char* buffer = (char*)thread_specific_get(logBufferKey);
if (buffer == NULL) {
buffer = (char*)malloc(LOG_BUFFER_SIZE);
if (buffer) {
memset(buffer, 0, LOG_BUFFER_SIZE);
thread_specific_set(logBufferKey, buffer);
}
}
return buffer;
}
void logMessage(const char* message) {
// Ensure the logging system is initialized exactly once
call_once(&logInitOnce, initLogging);
// Get the current thread's log buffer
char* buffer = getLogBuffer();
if (buffer) {
strcat(buffer, message);
strcat(buffer, "\n");
fmt_printf("Thread %lu: %s", thread_current(), buffer);
}
}
int worker(void* arg) {
(void)arg;
logMessage("Starting work");
// Simulate some work
thread_sleep(&(struct timespec){.tv_sec = 1, .tv_nsec = 0}, NULL);
logMessage("Work completed");
return 0;
}
int main(void) {
Thread threads[NUM_THREADS];
// Create and start threads
for (int i = 0; i < NUM_THREADS; ++i) {
thread_create(&threads[i], worker, NULL);
}
// Wait for threads to finish
for (int i = 0; i < NUM_THREADS; ++i) {
thread_join(threads[i], NULL);
}
thread_specific_delete(logBufferKey);
return 0;
}
This example demonstrates a simple multithreaded application using a custom concurrent library. It's structured to create two threads that execute concurrently, each printing the current time at specified intervals, synchronized using a mutex to ensure that the time printing does not occur simultaneously from both threads.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include "time/time.h"
// Define a structure for thread arguments
typedef struct {
int threadID;
const char* name;
int counter;
int delay;
} ThreadArgs;
// Mutex for synchronizing thread access
Mutex threadLock;
// Thread function prototype
int run(void* arguments);
// Function for printing the current time
void print_time(const char* threadName, int delay, int counter) {
for (int i = 0; i < counter; ++i) {
time_sleep(delay);
Time* time = time_current_time();
fmt_printf("%s: %s\n", threadName, time_to_string(time));
time_deallocate(time);
}
}
// Thread function
int run(void* arguments) {
ThreadArgs* args = (ThreadArgs*)arguments;
fmt_printf("Starting %s\n", args->name);
// Acquire the lock
mutex_lock(&threadLock);
print_time(args->name, args->delay, args->counter);
// Release the lock
mutex_unlock(&threadLock);
return 0;
}
int main(void) {
Thread thread1, thread2;
// Initialize the mutex
mutex_init(&threadLock, MUTEX_PLAIN);
// Define thread arguments
ThreadArgs args1 = {1, "Thread-1", 3, 1};
ThreadArgs args2 = {2, "Thread-2", 3, 2};
// Create new threads
thread_create(&thread1, run, &args1);
thread_create(&thread2, run, &args2);
// Wait for all threads to complete
thread_join(thread1, NULL);
thread_join(thread2, NULL);
// Destroy the mutex
mutex_destroy(&threadLock);
fmt_printf("Exiting Main Thread\n");
return 0;
}
Same C Code in Python
import threading
import time
class myThread (threading.Thread):
def __init__(self, threadID, name, counter):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.counter = counter
def run(self):
print ("Starting " + self.name)
# Get lock to synchronize threads
threadLock.acquire()
print_time(self.name, self.counter, 3)
# Free lock to release next thread
threadLock.release()
def print_time(threadName, delay, counter):
while counter:
time.sleep(delay)
print ("%s: %s" % (threadName, time.ctime(time.time())))
counter -= 1
threadLock = threading.Lock()
threads = []
# Create new threads
thread1 = myThread(1, "Thread-1", 1)
thread2 = myThread(2, "Thread-2", 2)
# Start new Threads
thread1.start()
thread2.start()
# Add threads to thread list
threads.append(thread1)
threads.append(thread2)
# Wait for all threads to complete
for t in threads:
t.join()
print ("Exiting Main Thread")
This example demonstrates that how we can use vector container for saving each Thread
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include "vector/vector.h"
#include "time/time.h"
typedef struct {
int threadID;
const char* name;
int delay;
} ThreadArgs;
int doTask(void* arguments) {
ThreadArgs* args = (ThreadArgs*)arguments;
fmt_printf("Starting %s\n", args->name);
time_sleep(args->delay);
fmt_printf("%s completed\n", args->name);
return 0;
}
int main() {
Vector* threads = vector_create(sizeof(Thread));
for (int i = 0; i < 10; ++i) {
ThreadArgs* args = (ThreadArgs*)malloc(sizeof(ThreadArgs));
args->threadID = i;
args->name = "Thread";
args->delay = 1;
Thread newThread;
if (thread_create(&newThread, doTask, args) == THREAD_SUCCESS) {
vector_push_back(threads, &newThread);
}
else {
fmt_printf("Failed to create thread\n");
}
}
// Join all threads
for (size_t i = 0; i < vector_size(threads); ++i) {
Thread* t = (Thread*)vector_at(threads, i);
if (t != NULL) {
thread_join(*t, NULL);
}
}
vector_deallocate(threads);
fmt_printf("Exiting Main Thread\n");
return 0;
}
This example demonstrates a parallel implementation of the accumulate algorithm, typically used to sum or otherwise combine all elements in a collection. This particular implementation is designed to work with an array of integers. The program is structured to leverage multi-threading for improved performance, especially beneficial for large datasets and on systems with multiple cores or processors.
The program efficiently utilizes multiple threads to parallelize the work, potentially reducing the execution time on multi-core systems
The number of threads is determined at runtime based on the hardware's concurrency level, ensuring the program can adapt to different environments
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include "algorithm/algorithm.h"
#include "vector/vector.h"
#define MIN_PER_THREAD 50
#define SIZE 100
// Function to perform accumulation in a segment
void accumulate_range(int* start, int* end, int* result) {
*result = 0;
for (int* ptr = start; ptr != end; ++ptr) {
*result += *ptr;
}
}
// Thread start function
int accumulate_segment(void* args) {
Pair* range = (Pair*)args; // Using Pair to pass start and end pointers
int* start = (int*)range->first;
int* end = (int*)range->second;
int result;
accumulate_range(start, end, &result);
return result;
}
// Parallel accumulate function
int parallel_accumulate(int* array, size_t num_elements, int init) {
if (num_elements == 0) return init;
unsigned long num_threads = thread_hardware_concurrency();
num_threads = num_threads ? num_threads : 2; // Use 2 if hardware_concurrency() returns 0
unsigned long block_size = num_elements / num_threads;
if (block_size < MIN_PER_THREAD) {
block_size = MIN_PER_THREAD;
num_threads = (num_elements + MIN_PER_THREAD - 1) / MIN_PER_THREAD;
}
Vector* threads = vector_create(sizeof(Thread));
Vector* results = vector_create(sizeof(int));
for (size_t i = 0; i < (num_threads - 1); ++i) {
Pair range = {(void*)(array + i * block_size), (void*)(array + (i + 1) * block_size)};
Thread new_thread;
thread_create(&new_thread, accumulate_segment, &range);
vector_push_back(threads, &new_thread);
}
// Handle the last segment
Pair last_range = {(void*)(array + (num_threads - 1) * block_size), (void*)(array + num_elements)};
int last_result = accumulate_segment(&last_range);
vector_push_back(results, &last_result);
// Join threads and accumulate results
int final_result = init;
for (size_t i = 0; i < vector_size(threads); ++i) {
Thread* thread = (Thread*)vector_at(threads, i);
int res = 0;
thread_join(*thread, &res);
final_result += res;
}
final_result += last_result;
vector_deallocate(threads);
vector_deallocate(results);
return final_result;
}
void int_generator(void* output) {
static int counter = 1;
*(int *)output = counter++;
}
int main() {
int array[SIZE];
algorithm_generate(array, array + SIZE, sizeof(int), int_generator);
int sum = parallel_accumulate(array, SIZE, 0);
fmt_printf("Sum: %d\n", sum);
return 0;
}
This example divides the task of calculating a factorial into smaller tasks, each handled by a separate thread. The results are then combined to get the final output. It showcases thread creation, mutex locking for shared data protection, condition variables for synchronization, and the use of a dynamic array to store thread handles
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#define NUM_FACT_THREADS 4
typedef struct {
int start;
int end;
unsigned long long result;
} FactorialSegment;
Mutex resultMutex;
ThreadCondition calculationDone;
int calculationsCompleted = 0;
unsigned long long sequentialFactorial(int start, int end) {
unsigned long long result = 1;
for (int i = start; i <= end; ++i) {
result *= i;
}
return result;
}
int factorialWorker(void* arg) {
FactorialSegment* segment = (FactorialSegment*)arg;
segment->result = sequentialFactorial(segment->start, segment->end);
mutex_lock(&resultMutex);
calculationsCompleted++;
if (calculationsCompleted == NUM_FACT_THREADS) {
condition_signal(&calculationDone);
}
mutex_unlock(&resultMutex);
return 0;
}
unsigned long long parallelFactorial(int n) {
int segmentSize = n / NUM_FACT_THREADS;
Thread threads[NUM_FACT_THREADS];
FactorialSegment segments[NUM_FACT_THREADS];
mutex_init(&resultMutex, MUTEX_PLAIN);
condition_init(&calculationDone);
for (int i = 0; i < NUM_FACT_THREADS; ++i) {
segments[i].start = i * segmentSize + 1;
segments[i].end = (i == NUM_FACT_THREADS - 1) ? n : (i + 1) * segmentSize;
thread_create(&threads[i], factorialWorker, &segments[i]);
}
mutex_lock(&resultMutex);
while (calculationsCompleted < NUM_FACT_THREADS) {
condition_wait(&calculationDone, &resultMutex);
}
mutex_unlock(&resultMutex);
unsigned long long finalResult = 1;
for (int i = 0; i < NUM_FACT_THREADS; ++i) {
finalResult *= segments[i].result;
}
mutex_destroy(&resultMutex);
condition_destroy(&calculationDone);
return finalResult;
}
int main() {
int n = 20; // Example: Calculate 20!
unsigned long long result = parallelFactorial(n);
fmt_printf("Factorial of %d is %llu\n", n, result);
return 0;
}
This example demonstrates how to create and join multiple threads. Each thread will print its thread ID and a message, showcasing basic thread managemen
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
// Thread function
int printMessage(void* arg) {
const char* message = (const char*)arg;
unsigned long threadID = thread_current();
fmt_printf("Thread %lu: %s\n", threadID, message);
return 0;
}
int main(void) {
Thread thread1, thread2;
// Create threads
thread_create(&thread1, printMessage, "Hello from Thread 1");
thread_create(&thread2, printMessage, "Hello from Thread 2");
// Wait for threads to finish
thread_join(thread1, NULL);
thread_join(thread2, NULL);
fmt_printf("Main thread: All threads have finished.\n");
return 0;
}
This example shows how to use a mutex to ensure mutual exclusion when accessing a shared resource. Two threads will increment a shared counter, but access to the counter is synchronized using a mutex to prevent race conditions.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
int sharedCounter = 0;
Mutex counterMutex;
// Thread function to increment the shared counter
int incrementCounter(void* arg) {
(void)arg;
for (int i = 0; i < 100000; ++i) {
mutex_lock(&counterMutex);
sharedCounter++;
mutex_unlock(&counterMutex);
}
return 0;
}
int main(void) {
Thread thread1, thread2;
// Initialize the mutex
mutex_init(&counterMutex, MUTEX_PLAIN);
// Create and start threads
thread_create(&thread1, incrementCounter, NULL);
thread_create(&thread2, incrementCounter, NULL);
// Wait for threads to finish
thread_join(thread1, NULL);
thread_join(thread2, NULL);
// Destroy the mutex
mutex_destroy(&counterMutex);
fmt_printf("Main thread: Final counter value is %d.\n", sharedCounter);
return 0;
}
This example demonstrates how to implement a synchronized logging system using threads. Each thread generates log messages, and a dedicated logging thread writes these messages to a log file. The communication between threads is synchronized using mutexes and condition variables to ensure thread safety.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include "log/log.h"
#define LOG_QUEUE_SIZE 10
#define NUM_WORKER_THREADS 3
typedef struct {
char message[256];
} LogMessage;
typedef struct {
LogMessage logQueue[LOG_QUEUE_SIZE];
int logCount;
Mutex logMutex;
ThreadCondition logNotEmpty;
ThreadCondition logNotFull;
int stopLogging;
Log* logger;
} Logger;
// Logging thread function using the custom Log library
int loggerThreadFunc(void* arg) {
Logger* loggerStruct = (Logger*)arg;
loggerStruct->logger = log_init();
if (!loggerStruct->logger) {
fmt_fprintf(stderr, "Failed to initialize logging system.\n");
return -1;
}
while (!loggerStruct->stopLogging || loggerStruct->logCount > 0) {
mutex_lock(&loggerStruct->logMutex);
while (loggerStruct->logCount == 0 && !loggerStruct->stopLogging) {
condition_wait(&loggerStruct->logNotEmpty, &loggerStruct->logMutex);
}
if (loggerStruct->logCount > 0) {
log_message(loggerStruct->logger, LOG_LEVEL_INFO, loggerStruct->logQueue[0].message);
for (int i = 1; i < loggerStruct->logCount; ++i) {
loggerStruct->logQueue[i - 1] = loggerStruct->logQueue[i];
}
--loggerStruct->logCount;
condition_signal(&loggerStruct->logNotFull);
}
mutex_unlock(&loggerStruct->logMutex);
}
log_deallocate(loggerStruct->logger);
return 0;
}
// Adapted to use the custom logging system
void addLogMessage(Logger* loggerStruct, const char* message) {
mutex_lock(&loggerStruct->logMutex);
while (loggerStruct->logCount == LOG_QUEUE_SIZE) {
condition_wait(&loggerStruct->logNotFull, &loggerStruct->logMutex);
}
strncpy(loggerStruct->logQueue[loggerStruct->logCount++].message, message, sizeof(loggerStruct->logQueue[0].message) - 1);
condition_signal(&loggerStruct->logNotEmpty);
mutex_unlock(&loggerStruct->logMutex);
}
int main(void) {
Logger loggerStruct;
Thread loggerThread;
int creationResult;
mutex_init(&loggerStruct.logMutex, MUTEX_PLAIN);
condition_init(&loggerStruct.logNotEmpty);
condition_init(&loggerStruct.logNotFull);
loggerStruct.logCount = 0;
loggerStruct.stopLogging = 0;
creationResult = thread_create(&loggerThread, loggerThreadFunc, &loggerStruct);
if (creationResult != THREAD_SUCCESS) {
fmt_fprintf(stderr, "Failed to create logger thread.\n");
}
for (int i = 0; i < 5; ++i) {
char message[256];
sprintf(message, "Log message %d", i + 1);
addLogMessage(&loggerStruct, message);
thread_sleep(&(struct timespec){.tv_sec = 1, .tv_nsec = 0}, NULL);
}
thread_sleep(&(struct timespec){.tv_sec = 2, .tv_nsec = 0}, NULL);
if (creationResult == THREAD_SUCCESS) {
mutex_lock(&loggerStruct.logMutex);
loggerStruct.stopLogging = 1;
condition_signal(&loggerStruct.logNotEmpty);
mutex_unlock(&loggerStruct.logMutex);
thread_join(loggerThread, NULL);
}
mutex_destroy(&loggerStruct.logMutex);
condition_destroy(&loggerStruct.logNotEmpty);
condition_destroy(&loggerStruct.logNotFull);
return 0;
}
This example demonstrate a File Copying Program using Multithreading. This example will showcase how to divide the task of copying a large file into smaller chunks, which are then processed in parallel by multiple threads. This approach can potentially speed up the file copying process on systems with multiple cores.
#include "concurrent/concurrent.h"
#include "fmt/fmt.h"
#include "file_io/file_reader.h"
#include "file_io/file_writer.h"
#define MAX_THREADS 4
#define BUFFER_SIZE 1024*1024 // 1MB buffer
typedef struct {
const char* sourceFile;
const char* destFile;
size_t startOffset;
size_t endOffset;
} FileCopyTask;
Mutex fileMutex;
// Thread function to copy a file segment
int copyFileSegment(void* arg) {
FileCopyTask* task = (FileCopyTask*)arg;
FileReader* source = NULL;
FileWriter* dest = NULL;
char buffer[BUFFER_SIZE];
size_t bytesRead, totalBytesRead = 0;
// Synchronize the opening of files to prevent concurrent access issues
mutex_lock(&fileMutex);
source = file_reader_open(task->sourceFile, READ_BINARY);
dest = file_writer_open(task->destFile, WRITE_BINARY);
mutex_unlock(&fileMutex);
if (!source || !dest) {
fmt_printf("Error opening files.\n");
return -1;
}
file_reader_seek(source, task->startOffset, POS_BEGIN);
file_writer_seek(dest, task->startOffset, POS_BEGIN);
while (totalBytesRead < (task->endOffset - task->startOffset)) {
size_t bytesToRead = BUFFER_SIZE;
if ((task->startOffset + totalBytesRead + bytesToRead) > task->endOffset) {
bytesToRead = task->endOffset - task->startOffset - totalBytesRead;
}
bytesRead = file_reader_read(buffer, 1, bytesToRead, source);
file_writer_write(buffer, 1, bytesRead, dest);
totalBytesRead += bytesRead;
}
file_reader_close(source);
file_writer_close(dest);
return 0;
}
void multiThreadedFileCopy(const char* sourceFile, const char* destFile) {
FileReader* source = file_reader_open(sourceFile, READ_BINARY);
if (!source) {
fmt_printf("Error opening source file.\n");
return;
}
long fileSize = file_reader_get_size(source);
file_reader_close(source);
FileWriter* dest = file_writer_open(destFile, WRITE_BINARY);
if (!dest) {
fmt_printf("Error preparing destination file.\n");
return;
}
file_writer_close(dest); // Close immediately after open just to create or clear the file
long partSize = fileSize / MAX_THREADS;
Thread threads[MAX_THREADS];
FileCopyTask tasks[MAX_THREADS];
mutex_init(&fileMutex, MUTEX_PLAIN);
for (int i = 0; i < MAX_THREADS; ++i) {
tasks[i].sourceFile = sourceFile;
tasks[i].destFile = destFile;
tasks[i].startOffset = i * partSize;
tasks[i].endOffset = (i == MAX_THREADS - 1) ? fileSize : (i + 1) * partSize;
thread_create(&threads[i], copyFileSegment, &tasks[i]);
}
for (int i = 0; i < MAX_THREADS; ++i) {
thread_join(threads[i], NULL);
}
mutex_destroy(&fileMutex);
fmt_printf("File copy completed.\n");
}
int main(int argc, char* argv[]) {
// send source and dest over cli
if (argc != 3) {
fmt_printf("Usage: %s <source file> <destination file>\n", argv[0]);
return 1;
}
multiThreadedFileCopy(argv[1], argv[2]);
return 0;
}