implement TaskDispatcher to plan tasks like cron does

This commit is contained in:
mykola2312 2024-04-24 04:19:32 +03:00
parent 8b3005bac9
commit b03f34ffeb
8 changed files with 152 additions and 4 deletions

View file

@ -25,5 +25,12 @@
"path": "test.m3u8",
"rootCategory": "test"
}
],
"tasks": [
{
"name": "crawler",
"interval": 60
}
]
}

View file

@ -3,6 +3,7 @@ package com.mykola2312.mptv;
import com.mykola2312.mptv.config.Config;
import com.mykola2312.mptv.crawler.Crawler;
import com.mykola2312.mptv.db.DB;
import com.mykola2312.mptv.task.TaskDispatcher;
import com.mykola2312.mptv.ui.MainFrame;
import org.apache.commons.cli.*;
import org.apache.log4j.Logger;
@ -73,6 +74,13 @@ public class Main {
crawler.updateSources(config.sources);
crawler.crawl();
// task dispatcher
TaskDispatcher dispatcher = new TaskDispatcher();
dispatcher.updateTaskConfig(config.tasks);
dispatcher.registerTask(crawler);
new Thread(dispatcher).start();
// initialize ui
MainFrame frame = new MainFrame();
frame.create(config.frame);

View file

@ -10,6 +10,7 @@ public class Config {
public FrameConfig frame;
public DBConfig db;
public List<SourceItem> sources;
public List<TaskItem> tasks;
public static Config loadConfig(String path) throws IOException {
return new ObjectMapper().readerFor(Config.class).readValue(new File(path));

View file

@ -0,0 +1,11 @@
package com.mykola2312.mptv.config;
import org.checkerframework.checker.nullness.qual.*;
public class TaskItem {
@NonNull
public String name;
@NonNull
public int interval; // in seconds
}

View file

@ -28,8 +28,9 @@ import com.mykola2312.mptv.db.DB;
import com.mykola2312.mptv.db.pojo.Source;
import com.mykola2312.mptv.tables.records.ChannelRecord;
import com.mykola2312.mptv.tables.records.SourceRecord;
import com.mykola2312.mptv.task.Task;
public class Crawler {
public class Crawler implements Task {
private static final Logger logger = Logger.getLogger(Crawler.class);
private Integer crawlId;
@ -38,7 +39,7 @@ public class Crawler {
}
public void updateSources(List<SourceItem> sourceItems) {
ArrayList<UpdatableRecord<SourceRecord>> sources = new ArrayList<>();
ArrayList<UpdatableRecord<SourceRecord>> sources = new ArrayList<>(sourceItems.size());
for (SourceItem item : sourceItems) {
UpdatableRecord<SourceRecord> source = new UpdatableRecordImpl<>(SOURCE);
source.set(SOURCE.TYPE, item.type.getSqlName());
@ -111,7 +112,7 @@ public class Crawler {
}
// upsert all channels
ArrayList<UpdatableRecord<ChannelRecord>> channels = new ArrayList<>();
ArrayList<UpdatableRecord<ChannelRecord>> channels = new ArrayList<>(items.size());
for (M3U item : items) {
UpdatableRecord<ChannelRecord> channel = new UpdatableRecordImpl<>(CHANNEL);
Integer categoryId = item.groupTitle != null
@ -170,4 +171,16 @@ public class Crawler {
}
}
}
private static final String TASK_NAME = "crawler";
@Override
public String getTaskName() {
return TASK_NAME;
}
@Override
public void dispatch() {
crawl();
}
}

View file

@ -0,0 +1,6 @@
package com.mykola2312.mptv.task;
public interface Task {
public String getTaskName();
public void dispatch();
}

View file

@ -0,0 +1,101 @@
package com.mykola2312.mptv.task;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.log4j.Logger;
import org.jooq.*;
import org.jooq.exception.NoDataFoundException;
import org.jooq.impl.*;
import static com.mykola2312.mptv.tables.Task.*;
import com.mykola2312.mptv.config.TaskItem;
import com.mykola2312.mptv.db.DB;
import com.mykola2312.mptv.tables.records.TaskRecord;
public class TaskDispatcher implements Runnable {
private static final Logger logger = Logger.getLogger(TaskDispatcher.class);
private final HashMap<String, Task> taskHandles = new HashMap<>();
private boolean isRunning = false;
private Integer smallestInterval = null;
public void updateTaskConfig(List<TaskItem> items) {
ArrayList<UpdatableRecord<TaskRecord>> tasks = new ArrayList<>();
for (var item : items) {
UpdatableRecord<TaskRecord> task = new UpdatableRecordImpl<>(TASK);
task.set(TASK.NAME, item.name);
task.set(TASK.INTERVAL, item.interval);
tasks.add(task);
}
DSL.using(DB.CONFIG)
.batchMerge(tasks)
.execute();
}
public void registerTask(Task task) {
taskHandles.put(task.getTaskName(), task);
// find smallest interval
try {
smallestInterval = DSL.using(DB.CONFIG)
.select(TASK.INTERVAL)
.from(TASK)
.orderBy(TASK.INTERVAL.asc())
.limit(1)
.fetchSingleInto(Integer.class);
} catch (NoDataFoundException e) {
logger.error("no tasks present in database!");
}
}
private void dispatchTask(Task task) {
logger.info(String.format("dispatching task %s", task.getTaskName()));
try {
task.dispatch();
} catch (Exception e) {
logger.error(e);
}
DSL.using(DB.CONFIG)
.update(TASK)
.set(TASK.LAST_TIME, Instant.now().toEpochMilli() / 1000L)
.where(TASK.NAME.eq(task.getTaskName()))
.execute();
}
@Override
public void run() {
// task dispatching loop
isRunning = true;
while (isRunning) {
try {
Thread.sleep(smallestInterval != null
? smallestInterval * 1000 : 1000);
} catch (InterruptedException e) {
logger.info("interrupted. exiting");
isRunning = false;
break;
}
long thisTime = Instant.now().toEpochMilli() / 1000L;
List<String> taskNames = DSL.using(DB.CONFIG)
.select(TASK.NAME)
.from(TASK)
.where(String.format("%d > task.last_time + task.interval", thisTime))
.fetchInto(String.class);
for (String name: taskNames) {
Task task = taskHandles.get(name);
if (task == null) {
logger.error(String.format("task %s is not registered!", name));
continue;
}
dispatchTask(task);
}
}
}
}

View file

@ -37,7 +37,8 @@ CREATE UNIQUE INDEX idx_channel_category_title ON channel(category,title);
CREATE TABLE task (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
last_time INTEGER NOT NULL
interval INTEGER NOT NULL,
last_time INTEGER NOT NULL DEFAULT 0
);
CREATE UNIQUE INDEX idx_task_name ON task(name);