commit 01be3f95e260bc9485e54de1c739117bf3145bb0 Author: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Sat Feb 27 17:44:33 2021 +0200 27.02.2021: worker initial commit. diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..18a22bb --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 3.18) +project(worker) + +set(SOURCES + worker.cpp + scheduler.cpp +) + +set(HEADERS + worker.h + scheduler.h +) + +add_library(worker SHARED ${SOURCES} ${HEADERS}) +target_include_directories(worker PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} +) diff --git a/scheduler.cpp b/scheduler.cpp new file mode 100644 index 0000000..1b3f6f6 --- /dev/null +++ b/scheduler.cpp @@ -0,0 +1,134 @@ +#include "scheduler.h" + +Scheduler::Scheduler(CreateFunc func, unsigned uMaxWorkers) +{ + m_Create = func; + m_uMaxWorkers = uMaxWorkers; +} + +Scheduler::~Scheduler() +{ + for (auto& worker : m_Workers) + WorkerDestroy(worker.m_pWorker); +} + +void Scheduler::DoControl() +{ + if(GetState() == workstate::INIT) + Continue(0); + else Wait(WaitObject(SCHEDULER_DELAY)); +} + +void Scheduler::DoWork() +{ + for (auto it = m_Workers.begin(); it != m_Workers.end(); ++it) + { + Worker* wrk = it->m_pWorker; + printf("wrk %p state %d timer %d\n", wrk, + wrk->GetState(), it->m_iTimer); + + try { + if (wrk->GetState() == workstate::INIT) + wrk->SetState(workstate::INIT, true); + + if (wrk->IsRunning()) + { + if (!it->m_iTimer) + wrk->DoControl(); + } + else continue; + + if (wrk->GetState() == workstate::WAIT) + { + WaitObject& obj = wrk->GetWait(); + if (!it->m_iTimer) + { + switch(obj.m_Type) + { + case WaitObject::NoWait: + wrk->SetState(workstate::RUNNING, true); + break; + case WaitObject::WaitInterval: + it->m_iTimer = (int)obj.m_lInterval; + break; + case WaitObject::WaitFunction: + printf("WaitFunction\n"); + if (obj.m_Func(obj.m_pData)) + { + printf("OK WaitFunction\n"); + wrk->SetState(workstate::RUNNING, true); + it->m_iTimer = 0; + } + else it->m_iTimer = (int)obj.m_lInterval; + break; + } + } + else + { + it->m_iTimer -= SCHEDULER_DELAY; + if (it->m_iTimer <= 0) + { + if (obj.m_Type != WaitObject::WaitFunction) + wrk->SetState(workstate::RUNNING, true); + it->m_iTimer = 0; + } + } + } + + if (wrk->IsRunning() && wrk->GetState() != workstate::WAIT) + wrk->DoWork(); + } catch (const std::exception& exc) { + wrk->OnException(exc); + } + } + + for (auto it = m_Workers.begin(); it != m_Workers.end();) + { + Worker* wrk = it->m_pWorker; + if (!wrk->IsRunning()) + { + WorkerDestroy(wrk); + m_Workers.erase(it); + } + else ++it; + } + + if (GetStage() == 0) + { + while(m_Workers.size() < m_uMaxWorkers) + { + Worker* wrk = m_Create(GetData()); + if (wrk) + WorkerStart(wrk); + else + { + Continue(1); + break; + } + } + } + else if (GetStage() == 1 && m_Workers.empty()) + Finish(); +} + +void Scheduler::WorkerStart(Worker* wrk) +{ + printf("WorkerStart %p\n", wrk); + wrk->EnableThink(false); + + schedworker_t sched = { wrk, 0 }; + m_Workers.push_back(sched); +} + +void Scheduler::WorkerUpdate(Worker* wrk) +{ + printf("WorkerUpdate %p\n", wrk); + wrk->Update(); + +} + +void Scheduler::WorkerDestroy(Worker* wrk) +{ + printf("WorkerDestroy %p %s\n", wrk, wrk->GetExitMessage().c_str()); + delete wrk; +} diff --git a/scheduler.h b/scheduler.h new file mode 100644 index 0000000..18c3ec8 --- /dev/null +++ b/scheduler.h @@ -0,0 +1,36 @@ +#ifndef __SCHEDULER_H +#define __SCHEDULER_H + +#include +#include +#include +#include "worker.h" + +#define SCHEDULER_DELAY 100 + +typedef struct { + Worker* m_pWorker; + int m_iTimer; +} schedworker_t; + +typedef std::function CreateFunc; + +class Scheduler : public ThreadWorker +{ +public: + Scheduler(CreateFunc func, unsigned uMaxWorkers); + ~Scheduler(); + + void DoControl() override; + void DoWork() override; +protected: + void WorkerStart(Worker* wrk); + void WorkerUpdate(Worker* wrk); + void WorkerDestroy(Worker* wrk); +private: + CreateFunc m_Create; + unsigned m_uMaxWorkers; + std::vector m_Workers; +}; + +#endif diff --git a/worker.cpp b/worker.cpp new file mode 100644 index 0000000..219faad --- /dev/null +++ b/worker.cpp @@ -0,0 +1,272 @@ +#include "worker.h" + +/* WaitObject */ + +WaitObject::WaitObject() + : m_Type(WaitObject::NoWait) +{ + m_lInterval = 0; + m_Func = NULL; + m_pData = NULL; +} + +WaitObject::WaitObject(long lInterval) + : m_Type(WaitObject::WaitInterval) +{ + m_lInterval = lInterval > 0 ? lInterval : 0; + m_Func = NULL; + m_pData = NULL; +} + +WaitObject::WaitObject(WaitFunc func, void* data, long delay) + : m_Type(WaitObject::WaitFunction) +{ + m_lInterval = delay > 0 ? delay : 0; + m_Func = func; + m_pData = data; +} + +const WaitObject NoWait = WaitObject(); +const WaitObject DefWait = WaitObject(100); + +/* Worker */ + +Worker::Worker() +{ + m_pData = NULL; + m_bThink = false; + m_iStage = 0; + m_State = workstate::INIT; + m_bRunning = false; +} + +Worker::~Worker() +{ +} + +void Worker::SetName(const std::wstring& name) +{ + m_Name = name; +} + +std::wstring Worker::GetName() const +{ + return m_Name; +} + +void Worker::SetData(void* data) +{ + m_pData = data; +} + +void* Worker::GetData() const +{ + return m_pData; +} + +void Worker::SetControlFunction(WorkerFunc func) +{ + m_Control = func; +} + +void Worker::SetWorkerFunction(WorkerFunc func) +{ + m_Work = func; +} + +void Worker::SetUpdateFunction(WorkerFunc func) +{ + m_Update = func; +} + +void Worker::SetState(workstate state, bool bRunning) +{ + if (m_State == workstate::WAIT && state != m_State) + m_Wait = NoWait; + + m_State = state; + m_bRunning = bRunning; +} + +workstate Worker::GetState() const +{ + return m_State; +} + +bool Worker::IsRunning() const +{ + return m_bRunning; +} + +std::string& Worker::GetExitMessage() +{ + return m_Msg; +} + +WaitObject& Worker::GetWait() +{ + return m_Wait; +} + +void Worker::SetStage(int stage) +{ + m_iStage = stage; +} + +int Worker::GetStage() const +{ + return m_iStage; +} + +int Worker::GetProgress() const +{ + return -1; +} + +void Worker::EnableThink(bool bThink) +{ + m_bThink = bThink; +} + +void Worker::Start() +{ + SetState(workstate::INIT, true); + + Update(); +} + +void Worker::Fail(const std::string& msg) +{ + SetState(workstate::FAIL, false); + m_Msg = msg; +} + +void Worker::Wait(const WaitObject& obj) +{ + SetState(workstate::WAIT, true); + m_Wait = obj; +} + +void Worker::Continue(int nextStage) +{ + SetState(workstate::RUNNING, true); + if (nextStage != -1) + SetStage(nextStage); +} + +void Worker::Finish() +{ + SetState(workstate::FINISH, false); +} + +void Worker::OnException(const std::exception& exc) +{ + fprintf(stderr, "%s\n", exc.what()); + Fail(exc.what()); +} + +void Worker::DoControl() +{ + if (m_Control) + m_Control(this); +} + +void Worker::DoWork() +{ + if (m_Work) + m_Work(this); +} + +void Worker::Update() +{ + try { + if (m_Update) + m_Update(this); + + DoControl(); + if (m_bThink) + Think(); + + if (IsRunning() && GetState() != workstate::WAIT) + DoWork(); + } catch (const std::exception& exc) { + OnException(exc); + } +} + +void Worker::Think() +{ + switch(GetState()) + { + case workstate::WAIT: DoWait(); break; + default: return; + } +} + +void Worker::DoWait() +{ + WaitObject& obj = GetWait(); + switch(obj.m_Type) + { + case WaitObject::NoWait: + SetState(workstate::RUNNING, true); + break; + case WaitObject::WaitInterval: + Sleep(obj.m_lInterval); + SetState(workstate::RUNNING, true); + break; + case WaitObject::WaitFunction: + if (obj.m_Func(obj.m_pData)) + SetState(workstate::RUNNING, true); + else Sleep(obj.m_lInterval); + break; + } +} + +void Worker::Run() +{ + do { + Update(); + } while (IsRunning()); +} + +/* ThreadWorker */ + +ThreadWorker::ThreadWorker() +{ + m_hThread = INVALID_HANDLE_VALUE; + m_ulThreadId = 0; +} + +ThreadWorker::~ThreadWorker() +{ +} + +unsigned long ThreadWorker::GetThreadId() const +{ + return m_ulThreadId; +} + +DWORD WINAPI ThreadWorker::_ThreadWorker(LPVOID lpArg) +{ + ThreadWorker* wrk = (ThreadWorker*)lpArg; + + wrk->SetState(workstate::INIT, false); + wrk->EnableThink(true); + wrk->Update(); + + wrk->Run(); + return !(wrk->GetState() == workstate::FINISH); +} + +void ThreadWorker::Start() +{ + DWORD dwTid; + m_hThread = CreateThread(NULL, 0, _ThreadWorker, this, 0, &dwTid); + m_ulThreadId = dwTid; +} + +void ThreadWorker::JoinThread() +{ + WaitForSingleObject(m_hThread, INFINITE); +} diff --git a/worker.h b/worker.h new file mode 100644 index 0000000..28a5b9e --- /dev/null +++ b/worker.h @@ -0,0 +1,146 @@ +#ifndef __WORKER_H +#define __WORKER_H + +#include +#include +#include +#include + +/* WaitObject */ + +typedef std::function WaitFunc; + +class WaitObject +{ +public: + enum : int { NoWait, WaitInterval, WaitFunction }; + + WaitObject(); + WaitObject(long lInterval); + WaitObject(WaitFunc func, void* data, long delay); + + int m_Type; + long m_lInterval; + WaitFunc m_Func; + void* m_pData; +}; + +extern const WaitObject NoWait; +extern const WaitObject DefWait; + +/* Worker */ + +class Worker; + +/*typedef enum class { + STATE_FAIL = -1, + STATE_INIT, + STATE_WAIT, + STATE_RUNNING, + STATE_FINISH, +} workstate;*/ +/*enum class workstate { + STATE_FAIL = -1, + STATE_INIT, + STATE_WAIT, + STATE_RUNNING, + STATE_FINISH +};*/ + +enum class workstate { + FAIL = -1, + INIT, + WAIT, + RUNNING, + FINISH +}; + +typedef std::function WorkerFunc; + +class Worker +{ +public: + Worker(); + virtual ~Worker(); + + virtual void SetName(const std::wstring& name); + virtual std::wstring GetName() const; + + virtual void SetData(void* pData); + virtual void* GetData() const; + + virtual void SetControlFunction(WorkerFunc func); + virtual void SetWorkerFunction(WorkerFunc func); + virtual void SetUpdateFunction(WorkerFunc func); + + virtual void SetState(workstate state, bool bRunning); + + virtual workstate GetState() const; + virtual bool IsRunning() const; + + virtual std::string& GetExitMessage(); + virtual WaitObject& GetWait(); + + virtual void SetStage(int stage); + virtual int GetStage() const; + + virtual int GetProgress() const; + + virtual void EnableThink(bool bThink); + virtual void Start(); + + virtual void Fail(const std::string& msg); + virtual void Wait(const WaitObject& obj); + virtual void Continue(int nextStage = -1); + + virtual void Finish(); + + virtual void OnException(const std::exception& exc); + + virtual void DoControl(); + virtual void DoWork(); +protected: + virtual void Think(); + void DoWait(); +public: + virtual void Update(); + virtual void Run(); +private: + std::wstring m_Name; + + void* m_pData; + + WorkerFunc m_Control; + WorkerFunc m_Work; + WorkerFunc m_Update; + + workstate m_State; + bool m_bRunning; + int m_iStage; + std::string m_Msg; + + WaitObject m_Wait; + + bool m_bThink; +}; + +/* ThreadWorker */ + +class ThreadWorker : public Worker +{ +public: + ThreadWorker(); + virtual ~ThreadWorker(); + + virtual unsigned long GetThreadId() const; + + virtual void Start(); + void JoinThread(); +private: + static DWORD WINAPI _ThreadWorker(LPVOID); + + unsigned long m_ulThreadId; + HANDLE m_hThread; +}; + +#endif