27.02.2021: worker initial commit.

This commit is contained in:
mykola2312 2021-02-27 17:44:33 +02:00
commit 01be3f95e2
5 changed files with 605 additions and 0 deletions

17
CMakeLists.txt Normal file
View file

@ -0,0 +1,17 @@
cmake_minimum_required(VERSION 3.18)
project(worker)
set(SOURCES
worker.cpp
scheduler.cpp
)
set(HEADERS
worker.h
scheduler.h
)
add_library(worker SHARED ${SOURCES} ${HEADERS})
target_include_directories(worker PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
)

134
scheduler.cpp Normal file
View file

@ -0,0 +1,134 @@
#include "scheduler.h"
Scheduler::Scheduler(CreateFunc func, unsigned uMaxWorkers)
{
m_Create = func;
m_uMaxWorkers = uMaxWorkers;
}
Scheduler::~Scheduler()
{
for (auto& worker : m_Workers)
WorkerDestroy(worker.m_pWorker);
}
void Scheduler::DoControl()
{
if(GetState() == workstate::INIT)
Continue(0);
else Wait(WaitObject(SCHEDULER_DELAY));
}
void Scheduler::DoWork()
{
for (auto it = m_Workers.begin(); it != m_Workers.end(); ++it)
{
Worker* wrk = it->m_pWorker;
printf("wrk %p state %d timer %d\n", wrk,
wrk->GetState(), it->m_iTimer);
try {
if (wrk->GetState() == workstate::INIT)
wrk->SetState(workstate::INIT, true);
if (wrk->IsRunning())
{
if (!it->m_iTimer)
wrk->DoControl();
}
else continue;
if (wrk->GetState() == workstate::WAIT)
{
WaitObject& obj = wrk->GetWait();
if (!it->m_iTimer)
{
switch(obj.m_Type)
{
case WaitObject::NoWait:
wrk->SetState(workstate::RUNNING, true);
break;
case WaitObject::WaitInterval:
it->m_iTimer = (int)obj.m_lInterval;
break;
case WaitObject::WaitFunction:
printf("WaitFunction\n");
if (obj.m_Func(obj.m_pData))
{
printf("OK WaitFunction\n");
wrk->SetState(workstate::RUNNING, true);
it->m_iTimer = 0;
}
else it->m_iTimer = (int)obj.m_lInterval;
break;
}
}
else
{
it->m_iTimer -= SCHEDULER_DELAY;
if (it->m_iTimer <= 0)
{
if (obj.m_Type != WaitObject::WaitFunction)
wrk->SetState(workstate::RUNNING, true);
it->m_iTimer = 0;
}
}
}
if (wrk->IsRunning() && wrk->GetState() != workstate::WAIT)
wrk->DoWork();
} catch (const std::exception& exc) {
wrk->OnException(exc);
}
}
for (auto it = m_Workers.begin(); it != m_Workers.end();)
{
Worker* wrk = it->m_pWorker;
if (!wrk->IsRunning())
{
WorkerDestroy(wrk);
m_Workers.erase(it);
}
else ++it;
}
if (GetStage() == 0)
{
while(m_Workers.size() < m_uMaxWorkers)
{
Worker* wrk = m_Create(GetData());
if (wrk)
WorkerStart(wrk);
else
{
Continue(1);
break;
}
}
}
else if (GetStage() == 1 && m_Workers.empty())
Finish();
}
void Scheduler::WorkerStart(Worker* wrk)
{
printf("WorkerStart %p\n", wrk);
wrk->EnableThink(false);
schedworker_t sched = { wrk, 0 };
m_Workers.push_back(sched);
}
void Scheduler::WorkerUpdate(Worker* wrk)
{
printf("WorkerUpdate %p\n", wrk);
wrk->Update();
}
void Scheduler::WorkerDestroy(Worker* wrk)
{
printf("WorkerDestroy %p %s\n", wrk, wrk->GetExitMessage().c_str());
delete wrk;
}

36
scheduler.h Normal file
View file

@ -0,0 +1,36 @@
#ifndef __SCHEDULER_H
#define __SCHEDULER_H
#include <Windows.h>
#include <functional>
#include <vector>
#include "worker.h"
#define SCHEDULER_DELAY 100
typedef struct {
Worker* m_pWorker;
int m_iTimer;
} schedworker_t;
typedef std::function<Worker*(void*)> CreateFunc;
class Scheduler : public ThreadWorker
{
public:
Scheduler(CreateFunc func, unsigned uMaxWorkers);
~Scheduler();
void DoControl() override;
void DoWork() override;
protected:
void WorkerStart(Worker* wrk);
void WorkerUpdate(Worker* wrk);
void WorkerDestroy(Worker* wrk);
private:
CreateFunc m_Create;
unsigned m_uMaxWorkers;
std::vector<schedworker_t> m_Workers;
};
#endif

272
worker.cpp Normal file
View file

@ -0,0 +1,272 @@
#include "worker.h"
/* WaitObject */
WaitObject::WaitObject()
: m_Type(WaitObject::NoWait)
{
m_lInterval = 0;
m_Func = NULL;
m_pData = NULL;
}
WaitObject::WaitObject(long lInterval)
: m_Type(WaitObject::WaitInterval)
{
m_lInterval = lInterval > 0 ? lInterval : 0;
m_Func = NULL;
m_pData = NULL;
}
WaitObject::WaitObject(WaitFunc func, void* data, long delay)
: m_Type(WaitObject::WaitFunction)
{
m_lInterval = delay > 0 ? delay : 0;
m_Func = func;
m_pData = data;
}
const WaitObject NoWait = WaitObject();
const WaitObject DefWait = WaitObject(100);
/* Worker */
Worker::Worker()
{
m_pData = NULL;
m_bThink = false;
m_iStage = 0;
m_State = workstate::INIT;
m_bRunning = false;
}
Worker::~Worker()
{
}
void Worker::SetName(const std::wstring& name)
{
m_Name = name;
}
std::wstring Worker::GetName() const
{
return m_Name;
}
void Worker::SetData(void* data)
{
m_pData = data;
}
void* Worker::GetData() const
{
return m_pData;
}
void Worker::SetControlFunction(WorkerFunc func)
{
m_Control = func;
}
void Worker::SetWorkerFunction(WorkerFunc func)
{
m_Work = func;
}
void Worker::SetUpdateFunction(WorkerFunc func)
{
m_Update = func;
}
void Worker::SetState(workstate state, bool bRunning)
{
if (m_State == workstate::WAIT && state != m_State)
m_Wait = NoWait;
m_State = state;
m_bRunning = bRunning;
}
workstate Worker::GetState() const
{
return m_State;
}
bool Worker::IsRunning() const
{
return m_bRunning;
}
std::string& Worker::GetExitMessage()
{
return m_Msg;
}
WaitObject& Worker::GetWait()
{
return m_Wait;
}
void Worker::SetStage(int stage)
{
m_iStage = stage;
}
int Worker::GetStage() const
{
return m_iStage;
}
int Worker::GetProgress() const
{
return -1;
}
void Worker::EnableThink(bool bThink)
{
m_bThink = bThink;
}
void Worker::Start()
{
SetState(workstate::INIT, true);
Update();
}
void Worker::Fail(const std::string& msg)
{
SetState(workstate::FAIL, false);
m_Msg = msg;
}
void Worker::Wait(const WaitObject& obj)
{
SetState(workstate::WAIT, true);
m_Wait = obj;
}
void Worker::Continue(int nextStage)
{
SetState(workstate::RUNNING, true);
if (nextStage != -1)
SetStage(nextStage);
}
void Worker::Finish()
{
SetState(workstate::FINISH, false);
}
void Worker::OnException(const std::exception& exc)
{
fprintf(stderr, "%s\n", exc.what());
Fail(exc.what());
}
void Worker::DoControl()
{
if (m_Control)
m_Control(this);
}
void Worker::DoWork()
{
if (m_Work)
m_Work(this);
}
void Worker::Update()
{
try {
if (m_Update)
m_Update(this);
DoControl();
if (m_bThink)
Think();
if (IsRunning() && GetState() != workstate::WAIT)
DoWork();
} catch (const std::exception& exc) {
OnException(exc);
}
}
void Worker::Think()
{
switch(GetState())
{
case workstate::WAIT: DoWait(); break;
default: return;
}
}
void Worker::DoWait()
{
WaitObject& obj = GetWait();
switch(obj.m_Type)
{
case WaitObject::NoWait:
SetState(workstate::RUNNING, true);
break;
case WaitObject::WaitInterval:
Sleep(obj.m_lInterval);
SetState(workstate::RUNNING, true);
break;
case WaitObject::WaitFunction:
if (obj.m_Func(obj.m_pData))
SetState(workstate::RUNNING, true);
else Sleep(obj.m_lInterval);
break;
}
}
void Worker::Run()
{
do {
Update();
} while (IsRunning());
}
/* ThreadWorker */
ThreadWorker::ThreadWorker()
{
m_hThread = INVALID_HANDLE_VALUE;
m_ulThreadId = 0;
}
ThreadWorker::~ThreadWorker()
{
}
unsigned long ThreadWorker::GetThreadId() const
{
return m_ulThreadId;
}
DWORD WINAPI ThreadWorker::_ThreadWorker(LPVOID lpArg)
{
ThreadWorker* wrk = (ThreadWorker*)lpArg;
wrk->SetState(workstate::INIT, false);
wrk->EnableThink(true);
wrk->Update();
wrk->Run();
return !(wrk->GetState() == workstate::FINISH);
}
void ThreadWorker::Start()
{
DWORD dwTid;
m_hThread = CreateThread(NULL, 0, _ThreadWorker, this, 0, &dwTid);
m_ulThreadId = dwTid;
}
void ThreadWorker::JoinThread()
{
WaitForSingleObject(m_hThread, INFINITE);
}

146
worker.h Normal file
View file

@ -0,0 +1,146 @@
#ifndef __WORKER_H
#define __WORKER_H
#include <Windows.h>
#include <functional>
#include <exception>
#include <string>
/* WaitObject */
typedef std::function<bool(void*)> WaitFunc;
class WaitObject
{
public:
enum : int { NoWait, WaitInterval, WaitFunction };
WaitObject();
WaitObject(long lInterval);
WaitObject(WaitFunc func, void* data, long delay);
int m_Type;
long m_lInterval;
WaitFunc m_Func;
void* m_pData;
};
extern const WaitObject NoWait;
extern const WaitObject DefWait;
/* Worker */
class Worker;
/*typedef enum class {
STATE_FAIL = -1,
STATE_INIT,
STATE_WAIT,
STATE_RUNNING,
STATE_FINISH,
} workstate;*/
/*enum class workstate {
STATE_FAIL = -1,
STATE_INIT,
STATE_WAIT,
STATE_RUNNING,
STATE_FINISH
};*/
enum class workstate {
FAIL = -1,
INIT,
WAIT,
RUNNING,
FINISH
};
typedef std::function<void(Worker*)> WorkerFunc;
class Worker
{
public:
Worker();
virtual ~Worker();
virtual void SetName(const std::wstring& name);
virtual std::wstring GetName() const;
virtual void SetData(void* pData);
virtual void* GetData() const;
virtual void SetControlFunction(WorkerFunc func);
virtual void SetWorkerFunction(WorkerFunc func);
virtual void SetUpdateFunction(WorkerFunc func);
virtual void SetState(workstate state, bool bRunning);
virtual workstate GetState() const;
virtual bool IsRunning() const;
virtual std::string& GetExitMessage();
virtual WaitObject& GetWait();
virtual void SetStage(int stage);
virtual int GetStage() const;
virtual int GetProgress() const;
virtual void EnableThink(bool bThink);
virtual void Start();
virtual void Fail(const std::string& msg);
virtual void Wait(const WaitObject& obj);
virtual void Continue(int nextStage = -1);
virtual void Finish();
virtual void OnException(const std::exception& exc);
virtual void DoControl();
virtual void DoWork();
protected:
virtual void Think();
void DoWait();
public:
virtual void Update();
virtual void Run();
private:
std::wstring m_Name;
void* m_pData;
WorkerFunc m_Control;
WorkerFunc m_Work;
WorkerFunc m_Update;
workstate m_State;
bool m_bRunning;
int m_iStage;
std::string m_Msg;
WaitObject m_Wait;
bool m_bThink;
};
/* ThreadWorker */
class ThreadWorker : public Worker
{
public:
ThreadWorker();
virtual ~ThreadWorker();
virtual unsigned long GetThreadId() const;
virtual void Start();
void JoinThread();
private:
static DWORD WINAPI _ThreadWorker(LPVOID);
unsigned long m_ulThreadId;
HANDLE m_hThread;
};
#endif