From b03f34ffebed8ca74de3e5e7bfac462586c973b2 Mon Sep 17 00:00:00 2001 From: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Wed, 24 Apr 2024 04:19:32 +0300 Subject: [PATCH] implement TaskDispatcher to plan tasks like cron does --- config.json.example | 7 ++ src/main/java/com/mykola2312/mptv/Main.java | 8 ++ .../com/mykola2312/mptv/config/Config.java | 1 + .../com/mykola2312/mptv/config/TaskItem.java | 11 ++ .../com/mykola2312/mptv/crawler/Crawler.java | 19 +++- .../java/com/mykola2312/mptv/task/Task.java | 6 ++ .../mykola2312/mptv/task/TaskDispatcher.java | 101 ++++++++++++++++++ .../resources/db/migration/V001.01__init.sql | 3 +- 8 files changed, 152 insertions(+), 4 deletions(-) create mode 100644 src/main/java/com/mykola2312/mptv/config/TaskItem.java create mode 100644 src/main/java/com/mykola2312/mptv/task/Task.java create mode 100644 src/main/java/com/mykola2312/mptv/task/TaskDispatcher.java diff --git a/config.json.example b/config.json.example index 99688cc..eba00d4 100644 --- a/config.json.example +++ b/config.json.example @@ -25,5 +25,12 @@ "path": "test.m3u8", "rootCategory": "test" } + ], + + "tasks": [ + { + "name": "crawler", + "interval": 60 + } ] } \ No newline at end of file diff --git a/src/main/java/com/mykola2312/mptv/Main.java b/src/main/java/com/mykola2312/mptv/Main.java index 4c9fd6b..56cf352 100644 --- a/src/main/java/com/mykola2312/mptv/Main.java +++ b/src/main/java/com/mykola2312/mptv/Main.java @@ -3,6 +3,7 @@ package com.mykola2312.mptv; import com.mykola2312.mptv.config.Config; import com.mykola2312.mptv.crawler.Crawler; import com.mykola2312.mptv.db.DB; +import com.mykola2312.mptv.task.TaskDispatcher; import com.mykola2312.mptv.ui.MainFrame; import org.apache.commons.cli.*; import org.apache.log4j.Logger; @@ -73,6 +74,13 @@ public class Main { crawler.updateSources(config.sources); crawler.crawl(); + // task dispatcher + TaskDispatcher dispatcher = new TaskDispatcher(); + dispatcher.updateTaskConfig(config.tasks); + dispatcher.registerTask(crawler); + + new Thread(dispatcher).start(); + // initialize ui MainFrame frame = new MainFrame(); frame.create(config.frame); diff --git a/src/main/java/com/mykola2312/mptv/config/Config.java b/src/main/java/com/mykola2312/mptv/config/Config.java index c55b9f8..5b4746b 100644 --- a/src/main/java/com/mykola2312/mptv/config/Config.java +++ b/src/main/java/com/mykola2312/mptv/config/Config.java @@ -10,6 +10,7 @@ public class Config { public FrameConfig frame; public DBConfig db; public List sources; + public List tasks; public static Config loadConfig(String path) throws IOException { return new ObjectMapper().readerFor(Config.class).readValue(new File(path)); diff --git a/src/main/java/com/mykola2312/mptv/config/TaskItem.java b/src/main/java/com/mykola2312/mptv/config/TaskItem.java new file mode 100644 index 0000000..81f4e7d --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/config/TaskItem.java @@ -0,0 +1,11 @@ +package com.mykola2312.mptv.config; + +import org.checkerframework.checker.nullness.qual.*; + +public class TaskItem { + @NonNull + public String name; + + @NonNull + public int interval; // in seconds +} diff --git a/src/main/java/com/mykola2312/mptv/crawler/Crawler.java b/src/main/java/com/mykola2312/mptv/crawler/Crawler.java index d09f8ae..65e028a 100644 --- a/src/main/java/com/mykola2312/mptv/crawler/Crawler.java +++ b/src/main/java/com/mykola2312/mptv/crawler/Crawler.java @@ -28,8 +28,9 @@ import com.mykola2312.mptv.db.DB; import com.mykola2312.mptv.db.pojo.Source; import com.mykola2312.mptv.tables.records.ChannelRecord; import com.mykola2312.mptv.tables.records.SourceRecord; +import com.mykola2312.mptv.task.Task; -public class Crawler { +public class Crawler implements Task { private static final Logger logger = Logger.getLogger(Crawler.class); private Integer crawlId; @@ -38,7 +39,7 @@ public class Crawler { } public void updateSources(List sourceItems) { - ArrayList> sources = new ArrayList<>(); + ArrayList> sources = new ArrayList<>(sourceItems.size()); for (SourceItem item : sourceItems) { UpdatableRecord source = new UpdatableRecordImpl<>(SOURCE); source.set(SOURCE.TYPE, item.type.getSqlName()); @@ -111,7 +112,7 @@ public class Crawler { } // upsert all channels - ArrayList> channels = new ArrayList<>(); + ArrayList> channels = new ArrayList<>(items.size()); for (M3U item : items) { UpdatableRecord channel = new UpdatableRecordImpl<>(CHANNEL); Integer categoryId = item.groupTitle != null @@ -170,4 +171,16 @@ public class Crawler { } } } + + private static final String TASK_NAME = "crawler"; + + @Override + public String getTaskName() { + return TASK_NAME; + } + + @Override + public void dispatch() { + crawl(); + } } diff --git a/src/main/java/com/mykola2312/mptv/task/Task.java b/src/main/java/com/mykola2312/mptv/task/Task.java new file mode 100644 index 0000000..e0afa90 --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/task/Task.java @@ -0,0 +1,6 @@ +package com.mykola2312.mptv.task; + +public interface Task { + public String getTaskName(); + public void dispatch(); +} diff --git a/src/main/java/com/mykola2312/mptv/task/TaskDispatcher.java b/src/main/java/com/mykola2312/mptv/task/TaskDispatcher.java new file mode 100644 index 0000000..f2a12e2 --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/task/TaskDispatcher.java @@ -0,0 +1,101 @@ +package com.mykola2312.mptv.task; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import org.apache.log4j.Logger; +import org.jooq.*; +import org.jooq.exception.NoDataFoundException; +import org.jooq.impl.*; +import static com.mykola2312.mptv.tables.Task.*; + +import com.mykola2312.mptv.config.TaskItem; +import com.mykola2312.mptv.db.DB; +import com.mykola2312.mptv.tables.records.TaskRecord; + +public class TaskDispatcher implements Runnable { + private static final Logger logger = Logger.getLogger(TaskDispatcher.class); + + private final HashMap taskHandles = new HashMap<>(); + private boolean isRunning = false; + private Integer smallestInterval = null; + + public void updateTaskConfig(List items) { + ArrayList> tasks = new ArrayList<>(); + for (var item : items) { + UpdatableRecord task = new UpdatableRecordImpl<>(TASK); + task.set(TASK.NAME, item.name); + task.set(TASK.INTERVAL, item.interval); + + tasks.add(task); + } + + DSL.using(DB.CONFIG) + .batchMerge(tasks) + .execute(); + } + + public void registerTask(Task task) { + taskHandles.put(task.getTaskName(), task); + // find smallest interval + try { + smallestInterval = DSL.using(DB.CONFIG) + .select(TASK.INTERVAL) + .from(TASK) + .orderBy(TASK.INTERVAL.asc()) + .limit(1) + .fetchSingleInto(Integer.class); + } catch (NoDataFoundException e) { + logger.error("no tasks present in database!"); + } + } + + private void dispatchTask(Task task) { + logger.info(String.format("dispatching task %s", task.getTaskName())); + try { + task.dispatch(); + } catch (Exception e) { + logger.error(e); + } + + DSL.using(DB.CONFIG) + .update(TASK) + .set(TASK.LAST_TIME, Instant.now().toEpochMilli() / 1000L) + .where(TASK.NAME.eq(task.getTaskName())) + .execute(); + } + + @Override + public void run() { + // task dispatching loop + isRunning = true; + while (isRunning) { + try { + Thread.sleep(smallestInterval != null + ? smallestInterval * 1000 : 1000); + } catch (InterruptedException e) { + logger.info("interrupted. exiting"); + isRunning = false; + break; + } + + long thisTime = Instant.now().toEpochMilli() / 1000L; + List taskNames = DSL.using(DB.CONFIG) + .select(TASK.NAME) + .from(TASK) + .where(String.format("%d > task.last_time + task.interval", thisTime)) + .fetchInto(String.class); + for (String name: taskNames) { + Task task = taskHandles.get(name); + if (task == null) { + logger.error(String.format("task %s is not registered!", name)); + continue; + } + + dispatchTask(task); + } + } + } +} diff --git a/src/main/resources/db/migration/V001.01__init.sql b/src/main/resources/db/migration/V001.01__init.sql index 8022030..8191dbc 100644 --- a/src/main/resources/db/migration/V001.01__init.sql +++ b/src/main/resources/db/migration/V001.01__init.sql @@ -37,7 +37,8 @@ CREATE UNIQUE INDEX idx_channel_category_title ON channel(category,title); CREATE TABLE task ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, - last_time INTEGER NOT NULL + interval INTEGER NOT NULL, + last_time INTEGER NOT NULL DEFAULT 0 ); CREATE UNIQUE INDEX idx_task_name ON task(name); \ No newline at end of file