#include "one_to_one_pipe_scheduler.hpp"

#include <iostream>
#include <vector>
#include <algorithm>
#include <cerrno>
#include <cstring>

#include <unistd.h>
#include <sys/wait.h>
#include <sched.h>

using namespace std;

// Restricts the calling process to the single CPU identified by cpuId.
//
// Builds an affinity mask containing only cpuId and applies it with
// sched_setaffinity (pid 0 selects the caller).
//
// Returns true when sched_setaffinity reports success, false otherwise.
static bool bind_process_to_cpu_pipe(int cpuId)
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpuId, &mask);

    const bool pinned = (sched_setaffinity(0, sizeof(mask), &mask) == 0);
    return pinned;
}

// Writes a single int to fd, retrying when the call is interrupted by a
// signal. Returns true only when all sizeof(int) bytes were written.
static bool write_int_pipe(int fd, int value)
{
    ssize_t written = -1;

    do
    {
        written = write(fd, &value, sizeof(int));
    } while (written < 0 && errno == EINTR);

    return written == static_cast<ssize_t>(sizeof(int));
}

// Closes both ends of the first `count` pipes stored in fds, where pipe i
// occupies fds[2*i] (read end) and fds[2*i + 1] (write end).
static void close_pipes_pipe(const std::vector<int>& fds, int count)
{
    for (int i = 0; i < count; i++)
    {
        (void)close(fds[static_cast<std::size_t>(i) * 2]);
        (void)close(fds[static_cast<std::size_t>(i) * 2 + 1]);
    }
}

// Waits for exactly the given worker PIDs (never for unrelated children of
// this process), retrying on EINTR. Returns false if waitpid failed for a
// reason other than ECHILD.
static bool reap_workers_pipe(const std::vector<pid_t>& workers)
{
    bool ok = true;

    for (pid_t worker : workers)
    {
        int status = 0;
        pid_t rc = -1;

        do
        {
            rc = waitpid(worker, &status, 0);
        } while (rc < 0 && errno == EINTR);

        // ECHILD means the worker is already gone; nothing left to reap.
        if (rc < 0 && errno != ECHILD)
        {
            perror("[One-to-One Pipe Scheduler] waitpid failed");
            ok = false;
        }
    }

    return ok;
}

// Runs totalTasks tasks on forked worker processes, one dedicated pipe per
// worker.
//
// The parent forks workerCount children and sends task ids 0..totalTasks-1
// round-robin (task t goes to worker t % workerCount) over each worker's
// pipe, followed by a -1 stop sentinel. Each worker calls
// taskFn(taskId, workerId, workerCount, context) for every id it receives
// and then exits with _exit(0).
//
// Parameters:
//   totalTasks     - number of tasks; values <= 0 return 0 immediately.
//   workerCount    - requested worker processes; clamped to [1, totalTasks].
//   taskFn         - callback executed inside the worker process; must not
//                    be null.
//   context        - opaque pointer forwarded to taskFn unchanged.
//   enableAffinity - when true, worker w is bound (best effort) to CPU
//                    w % (number of online CPUs).
//
// Returns 0 on success, -1 if taskFn is null, if pipe()/fork()/waitpid()
// fails, or if any task id could not be written to its worker.
//
// NOTE: the parent closes its read ends before dispatching, so writing to a
// worker that has already exited raises SIGPIPE in the parent unless the
// caller ignores or handles that signal.
int run_one_to_one_pipe_scheduler(
    int totalTasks,
    int workerCount,
    scheduler_task_fn taskFn,
    void* context,
    bool enableAffinity
)
{
    if (totalTasks <= 0)
        return 0;

    if (workerCount <= 0)
        workerCount = 1;

    if (workerCount > totalTasks)
        workerCount = totalTasks;

    if (taskFn == nullptr)
    {
        std::cerr << "[One-to-One Pipe Scheduler] taskFn is null." << std::endl;
        return -1;
    }

    long numCPUs = sysconf(_SC_NPROCESSORS_ONLN);

    if (numCPUs <= 0)
        numCPUs = 1;

    std::vector<pid_t> children;
    children.reserve(static_cast<std::size_t>(workerCount));

    // Heap-backed storage instead of a variable-length array: standard C++
    // and no stack growth proportional to workerCount.
    // Layout: [read0, write0, read1, write1, ...].
    std::vector<int> pipeFds(static_cast<std::size_t>(workerCount) * 2, -1);

    auto readFd = [&pipeFds](int worker) -> int
    {
        return pipeFds[static_cast<std::size_t>(worker) * 2];
    };
    auto writeFd = [&pipeFds](int worker) -> int
    {
        return pipeFds[static_cast<std::size_t>(worker) * 2 + 1];
    };

    for (int i = 0; i < workerCount; i++)
    {
        if (pipe(&pipeFds[static_cast<std::size_t>(i) * 2]) == -1)
        {
            perror("[One-to-One Pipe Scheduler] pipe failed");

            // Release the pipes created so far so no descriptors leak.
            close_pipes_pipe(pipeFds, i);
            return -1;
        }
    }

    for (int workerId = 0; workerId < workerCount; workerId++)
    {
        pid_t pid = fork();

        if (pid < 0)
        {
            perror("[One-to-One Pipe Scheduler] fork failed");

            // Tell the workers that were already started to stop; workers
            // with index >= workerId were never created.
            for (int i = 0; i < workerId; i++)
            {
                if (!write_int_pipe(writeFd(i), -1))
                {
                    perror("[One-to-One Pipe Scheduler] write stop failed");
                }
            }

            close_pipes_pipe(pipeFds, workerCount);
            (void)reap_workers_pipe(children);

            return -1;
        }

        if (pid == 0)
        {
            // Worker: keep only the read end of its own pipe.
            for (int j = 0; j < workerCount; j++)
            {
                if (j != workerId)
                    (void)close(readFd(j));

                (void)close(writeFd(j));
            }

            if (enableAffinity)
            {
                int cpuId = workerId % static_cast<int>(numCPUs);

                // Best effort: the worker still runs if pinning fails.
                (void)bind_process_to_cpu_pipe(cpuId);
            }

            const int inFd = readFd(workerId);

            while (true)
            {
                int taskId = -1;

                ssize_t n = read(inFd, &taskId, sizeof(int));

                // A signal interrupted the read; no data was lost, retry.
                if (n < 0 && errno == EINTR)
                    continue;

                // EOF, read error, or a truncated record: stop.
                if (n != static_cast<ssize_t>(sizeof(int)))
                    break;

                if (taskId == -1)
                    break;

                taskFn(taskId, workerId, workerCount, context);
            }

            (void)close(inFd);

            // _exit avoids running the parent's atexit handlers and
            // flushing stdio buffers inherited across fork.
            _exit(0);
        }

        children.push_back(pid);
    }

    // The parent only writes; drop its copies of the read ends.
    for (int i = 0; i < workerCount; i++)
    {
        (void)close(readFd(i));
    }

    bool dispatchOk = true;

    for (int taskId = 0; taskId < totalTasks; taskId++)
    {
        int worker = taskId % workerCount;

        if (!write_int_pipe(writeFd(worker), taskId))
        {
            perror("[One-to-One Pipe Scheduler] write failed");

            // Keep dispatching the remaining tasks, but report the loss.
            dispatchOk = false;
        }
    }

    for (int i = 0; i < workerCount; i++)
    {
        // A failed stop write is harmless: closing the write end below
        // makes the worker see EOF and exit its loop anyway.
        if (!write_int_pipe(writeFd(i), -1))
        {
            perror("[One-to-One Pipe Scheduler] write stop failed");
        }

        (void)close(writeFd(i));
    }

    const bool reaped = reap_workers_pipe(children);

    return (dispatchOk && reaped) ? 0 : -1;
}