//ANASTASIA_PAPOUDA

#include <opencv2/opencv.hpp>
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <chrono>
#include <algorithm>
#include <numeric>
#include <future>
#include <functional>

using namespace std;
using namespace cv;


//use of quick sort algorithm
// Hoare partition over arr[low..high]: pivot is the middle element.
// Returns split index j such that arr[low..j] <= pivot <= arr[j+1..high].
int partition(vector<uchar>& arr, int low, int high) {
    const uchar pivot = arr[low + (high - low) / 2];
    int left = low - 1;
    int right = high + 1;
    for (;;) {
        while (arr[++left] < pivot) {}
        while (arr[--right] > pivot) {}
        if (left >= right) {
            return right;
        }
        swap(arr[left], arr[right]);
    }
}

//recursive quicksort
// Recursive quicksort over arr[low..high] (inclusive bounds).
// With Hoare partitioning, the split index belongs to the left recursion.
void quickSort(vector<uchar>& arr, int low, int high) {
    if (low >= high) return;
    const int split = partition(arr, low, high);
    quickSort(arr, low, split);
    quickSort(arr, split + 1, high);
}


// Sorts one image row in place.
// FIX: for an empty row the original `row.size() - 1` wrapped to SIZE_MAX
// (size_t) before an implementation-defined narrowing to int; guard the
// empty case explicitly and cast deliberately.
void sortRow(vector<uchar>& row) {
    if (row.empty()) return;
    quickSort(row, 0, static_cast<int>(row.size()) - 1);
}


//process a single frame (sort every row)
// Returns a copy of `frame` in which every row is sorted ascending.
// NOTE(review): reads `cols` bytes per row, so this assumes a
// single-channel 8-bit image — callers convert to grayscale first;
// confirm no multi-channel frames reach this function.
Mat processFrame(const Mat& frame) {
    Mat out = frame.clone();  // keep the input untouched
    const int width = out.cols;
    for (int r = 0; r < out.rows; ++r) {
        uchar* data = out.ptr<uchar>(r);
        vector<uchar> buffer(data, data + width);  // pull row into a vector
        sortRow(buffer);
        copy(buffer.begin(), buffer.end(), data);  // write sorted bytes back
    }
    return out;
}


// Fixed-size worker thread pool.
// FIX: the original source was missing the `class ThreadPool {` declaration
// line above `public:`, so the file did not compile.
class ThreadPool {
public:
    /// Spawns numThreads workers that loop, pulling tasks off the shared
    /// queue until the pool is stopped AND the queue has drained.
    explicit ThreadPool(size_t numThreads) : stop(false) {
        for (size_t i = 0; i < numThreads; ++i) {
            workers.emplace_back([this] {
                while (true) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(queueMutex);
                        condition.wait(lock, [this] { return stop || !tasks.empty(); });
                        // Exit only once stopped and no work remains, so tasks
                        // enqueued before destruction still run to completion.
                        if (stop && tasks.empty()) return;
                        task = std::move(tasks.front());
                        tasks.pop();
                    }
                    task();  // run outside the lock
                }
            });
        }
    }

    // The pool owns threads and a mutex; copying makes no sense.
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Adds a task to the queue and wakes one worker.
    void enqueue(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            tasks.emplace(std::move(task));
        }
        condition.notify_one();
    }

    /// Signals shutdown and joins every worker; pending tasks finish first.
    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queueMutex);
            stop = true;
        }
        condition.notify_all();
        for (std::thread& worker : workers) {
            worker.join();
        }
    }

private:
    std::vector<std::thread> workers;         // worker threads
    std::queue<std::function<void()>> tasks;  // pending work, guarded by queueMutex
    std::mutex queueMutex;
    std::condition_variable condition;
    bool stop;                                // written under queueMutex
};


//sequential processing
// Baseline: processes every frame on the calling thread and prints the
// elapsed wall-clock time.
vector<Mat> processSequential(const vector<Mat>& frames) {
    vector<Mat> sorted;
    sorted.reserve(frames.size());
    const auto t0 = chrono::high_resolution_clock::now();
    for (const Mat& f : frames) {
        sorted.push_back(processFrame(f));
    }
    const auto t1 = chrono::high_resolution_clock::now();
    cout << "Sequential time: " << chrono::duration<double>(t1 - t0).count()
         << " seconds" << endl;
    return sorted;
}


//Bounded Prolific
// Bounded Prolific: statically splits the frame range into one contiguous
// chunk per worker (the first `extra` workers take one additional frame),
// then joins all workers.
vector<Mat> schedulerProlific(const vector<Mat>& frames, int numWorkers) {
    vector<Mat> result(frames.size());
    vector<thread> pool;
    const auto t0 = chrono::high_resolution_clock::now();

    const int base = frames.size() / numWorkers;   // frames per worker
    const int extra = frames.size() % numWorkers;  // leftovers, spread one each

    int begin = 0;
    for (int w = 0; w < numWorkers; ++w) {
        const int end = begin + base + (w < extra ? 1 : 0);
        pool.emplace_back([&, begin, end] {
            for (int i = begin; i < end; ++i) {
                result[i] = processFrame(frames[i]);
            }
        });
        begin = end;
    }

    for (auto& t : pool) t.join();

    const auto t1 = chrono::high_resolution_clock::now();
    cout << "Prolific time: " << chrono::duration<double>(t1 - t0).count()
         << " seconds" << endl;
    return result;
}


//Bounded Collective
// Bounded Collective: workers dynamically claim frame indices from a shared
// atomic counter, self-balancing the load.
vector<Mat> schedulerCollective(const vector<Mat>& frames, int numWorkers) {
    vector<Mat> result(frames.size());
    atomic<int> cursor(0);
    vector<thread> pool;
    const int total = (int)frames.size();
    const auto t0 = chrono::high_resolution_clock::now();

    for (int w = 0; w < numWorkers; ++w) {
        pool.emplace_back([&] {
            // Claim indices until the counter runs past the last frame.
            for (int idx = cursor.fetch_add(1); idx < total; idx = cursor.fetch_add(1)) {
                result[idx] = processFrame(frames[idx]);
            }
        });
    }

    for (auto& t : pool) t.join();

    const auto t1 = chrono::high_resolution_clock::now();
    cout << "Collective time: " << chrono::duration<double>(t1 - t0).count()
         << " seconds" << endl;
    return result;
}


//Bounded Sko (Prolific + Collective)
// Bounded Sko (Prolific + Collective): static round-robin assignment —
// worker w handles frames w, w+numWorkers, w+2*numWorkers, ...
vector<Mat> schedulerSko(const vector<Mat>& frames, int numWorkers) {
    vector<Mat> result(frames.size());
    vector<thread> pool;
    const int total = (int)frames.size();
    const auto t0 = chrono::high_resolution_clock::now();

    for (int w = 0; w < numWorkers; ++w) {
        pool.emplace_back([&, w] {
            for (int i = w; i < total; i += numWorkers) {
                result[i] = processFrame(frames[i]);
            }
        });
    }

    for (auto& t : pool) t.join();

    const auto t1 = chrono::high_resolution_clock::now();
    cout << "Sko time: " << chrono::duration<double>(t1 - t0).count()
         << " seconds" << endl;
    return result;
}


//Bounded Subreaper Prolific
// Bounded Subreaper Prolific: submits every frame as an independent task to
// a ThreadPool, then spin-yields on an atomic completion counter.
vector<Mat> schedulerSubreaper(const vector<Mat>& frames, int numWorkers) {
    const int total = (int)frames.size();
    vector<Mat> result(total);
    ThreadPool pool(numWorkers);
    atomic<int> done(0);

    const auto t0 = chrono::high_resolution_clock::now();

    // One task per frame.
    for (int i = 0; i < total; ++i) {
        pool.enqueue([&, i] {
            result[i] = processFrame(frames[i]);
            done.fetch_add(1);
        });
    }

    // Yielding busy-wait until every task has reported completion.
    while (done.load() < total) {
        this_thread::yield();
    }

    const auto t1 = chrono::high_resolution_clock::now();
    cout << "Subreaper Prolific time: " << chrono::duration<double>(t1 - t0).count()
         << " seconds" << endl;
    return result;
}


//Mmap bounded Subreaper Collective
// Mmap bounded Subreaper Collective: same strategy as schedulerSubreaper —
// one pool task per frame, completion tracked by an atomic counter —
// reported under a different benchmark label.
vector<Mat> schedulerMmapSubreaper(const vector<Mat>& frames, int numWorkers) {
    vector<Mat> result(frames.size());
    ThreadPool pool(numWorkers);
    atomic<int> finished(0);
    const int pending = (int)frames.size();

    const auto begin = chrono::high_resolution_clock::now();

    for (int i = 0; i < pending; ++i) {
        pool.enqueue([&, i] {
            result[i] = processFrame(frames[i]);
            finished.fetch_add(1);
        });
    }

    while (finished.load() < pending) {
        this_thread::yield();  // wait for the pool to drain
    }

    const auto finish = chrono::high_resolution_clock::now();
    cout << "Mmap Subreaper Collective time: "
         << chrono::duration<double>(finish - begin).count() << " seconds" << endl;
    return result;
}


//Mmap bounded supreaper collective with group barrier and affinity
//Mmap bounded supreaper collective with group barrier and affinity
vector<Mat> schedulerMmapSupreaper(const vector<Mat>& frames, int numWorkers) {
    vector<Mat> result(frames.size());
    atomic<int> nextIdx(0);
    vector<thread> workers;

    // Reusable (generation-counted) barrier state.
    // FIX: the original barrier was one-shot — after the first release,
    // `barrierReady` stayed true forever, so the second barrierWait() fell
    // straight through without actually synchronizing the group.
    mutex barrierMutex;
    condition_variable barrierCV;
    int barrierCount = 0;
    int generation = 0;
    const int targetCount = numWorkers;

    auto barrierWait = [&] {
        unique_lock<mutex> lock(barrierMutex);
        const int myGeneration = generation;
        if (++barrierCount == targetCount) {
            // Last arrival: reset the count and open the next generation.
            barrierCount = 0;
            ++generation;
            barrierCV.notify_all();
        } else {
            // Wait until the generation advances, i.e. everyone arrived.
            barrierCV.wait(lock, [&] { return generation != myGeneration; });
        }
    };

    auto start = chrono::high_resolution_clock::now();

    for (int w = 0; w < numWorkers; ++w) {
        workers.emplace_back([&, w] {
            // Pin each worker to a CPU (Linux only).
#ifdef __linux__
            cpu_set_t cpuset;
            CPU_ZERO(&cpuset);
            CPU_SET(w % std::thread::hardware_concurrency(), &cpuset);
            pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset);
#endif

            // Rendezvous: no worker starts until all are pinned and ready.
            barrierWait();

            // Collective work pull via a shared atomic index.
            while (true) {
                int idx = nextIdx.fetch_add(1);
                if (idx >= (int)frames.size()) break;
                result[idx] = processFrame(frames[idx]);
            }

            // Second rendezvous so the group finishes together (now an
            // actual barrier thanks to the generation counter).
            barrierWait();
        });
    }

    for (auto& t : workers) t.join();

    auto end = chrono::high_resolution_clock::now();
    double elapsed = chrono::duration<double>(end - start).count();
    cout << "Mmap Supreaper (barrier+affinity) time: " << elapsed << " seconds" << endl;
    return result;
}


// Performance metrics calculation
// Prints the timing report for one scheduler run:
// speedup = T_sequential / T_parallel, efficiency = speedup / numWorkers,
// and "SEfficiency" which the original design keeps equal to efficiency.
void printMetrics(const std::string& name, double parallelTime, double sequentialTime, int numWorkers) {
    const double speedup = sequentialTime / parallelTime;
    const double efficiency = speedup / numWorkers;
    std::cout << "\n--- " << name << " ---" << std::endl;
    std::cout << "  Time: " << parallelTime << " s" << std::endl;
    std::cout << "  Speedup: " << speedup << std::endl;
    std::cout << "  Efficiency: " << efficiency << std::endl;
    std::cout << "  SEfficiency: " << efficiency << std::endl;
}


int main() {

    //load video frames
    string videoPath = "2a.mp4";
    VideoCapture cap(videoPath);
    if (!cap.isOpened()) {
        cerr << "Error: Could not open video file " << videoPath << endl;
        return -1;
    }

    vector<Mat> frames;
    Mat frame;
    while (cap.read(frame)) {
        Mat gray;
        cvtColor(frame, gray, COLOR_BGR2GRAY);
        frames.push_back(gray.clone());
    }
    cap.release();
    cout << "Loaded " << frames.size() << " frames, each of size " << frames[0].rows << "x" << frames[0].cols << endl;

    cout << "\nRunning sequential baseline..." << endl;
    vector<Mat> sortedFramesSeq = processSequential(frames);
    //saves the sorted frames as a new video
    VideoWriter outVideo("sorted_output.mp4", VideoWriter::fourcc('m', 'p', '4', 'v'), 30.0,
                         Size(frames[0].cols, frames[0].rows), false);
    if (!outVideo.isOpened()) {
        cerr << "Error: Could not create output video." << endl;
    } else {
        for (const auto& sf : sortedFramesSeq) {
            outVideo.write(sf);
        }
        outVideo.release();
        cout << "Sorted video saved as sorted_output.mp4" << endl;
    }

    //get sequential time for metrics
    auto startSeq = chrono::high_resolution_clock::now();
    vector<Mat> dummy = processSequential(frames); // just for time
    auto endSeq = chrono::high_resolution_clock::now();
    double seqTime = chrono::duration<double>(endSeq - startSeq).count();

    //number of workers = number of hardware threads
    int numWorkers = thread::hardware_concurrency();
    if (numWorkers == 0) numWorkers = 4; // fallback
    cout << "\nUsing " << numWorkers << " worker threads." << endl;

    //tests each scheduler
    auto runScheduler = [&](function<vector<Mat>(const vector<Mat>&, int)> scheduler, const string& name) {
        auto start = chrono::high_resolution_clock::now();
        vector<Mat> result = scheduler(frames, numWorkers);
        auto end = chrono::high_resolution_clock::now();
        double parallelTime = chrono::duration<double>(end - start).count();
        printMetrics(name, parallelTime, seqTime, numWorkers);
        return result;
    };

    runScheduler(schedulerProlific, "Bounded Prolific");
    runScheduler(schedulerCollective, "Bounded Collective");
    runScheduler(schedulerSko, "Bounded Sko (Prolific & Collective)");
    runScheduler(schedulerSubreaper, "Bounded Subreaper Prolific");
    runScheduler(schedulerMmapSubreaper, "Mmap bounded Subreaper Collective");
    runScheduler(schedulerMmapSupreaper, "Mmap bounded supreaper collective with group barrier & affinity");

    return 0;
}
