diff --git a/src/main/java/com/mykola2312/mptv/Main.java b/src/main/java/com/mykola2312/mptv/Main.java index 0218478..5a749b2 100644 --- a/src/main/java/com/mykola2312/mptv/Main.java +++ b/src/main/java/com/mykola2312/mptv/Main.java @@ -73,14 +73,14 @@ public class Main { flyway.migrate(); // load sources, start crawlers - Crawler crawler = new Crawler(); - crawler.updateSources(config.sources); - crawler.crawl(); + // Crawler crawler = new Crawler(); + // crawler.updateSources(config.sources); + // crawler.crawl(); // task dispatcher TaskDispatcher dispatcher = new TaskDispatcher(); dispatcher.updateTaskConfig(config.tasks); - dispatcher.registerTask(crawler); + //dispatcher.registerTask(crawler); // TODO: enable ProcessService processService = new ProcessService(); dispatcher.registerTask(processService); @@ -93,9 +93,13 @@ public class Main { try { MPV mpv = new MPV("test.mp4"); - logger.info("spawned mpv"); + if (mpv.spawn()) { + logger.info("spawned mpv"); - processService.registerProcess(mpv); + processService.registerProcess(mpv); + } else { + logger.error("failed to spawn mpv"); + } } catch (IOException e) { logger.error("failed to start mpv", e); } diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPV.java b/src/main/java/com/mykola2312/mptv/mpv/MPV.java index cdb5faf..8764ce9 100644 --- a/src/main/java/com/mykola2312/mptv/mpv/MPV.java +++ b/src/main/java/com/mykola2312/mptv/mpv/MPV.java @@ -1,23 +1,40 @@ package com.mykola2312.mptv.mpv; import java.io.IOException; +import java.nio.file.Path; import com.mykola2312.mptv.task.TaskProcess; public class MPV implements TaskProcess { private final String url; private Process process; + private MPVSocket socket = null; public MPV(String url) throws IOException { this.url = url; - spawn(); } + private static final Path MPV_SOCKET_PATH = Path.of("/tmp/mptv-mpv.sock"); + @Override - public void spawn() throws IOException { + public boolean spawn() throws IOException { + // to prevent possible file descriptor leaks + if (socket != null) { + socket.close(); + socket = null; + } + process = Runtime.getRuntime().exec(new String[] { - "mpv", url + "mpv", url, "--input-ipc-server=" + MPV_SOCKET_PATH }); + + socket = new MPVSocket(); + socket.waitForConnection(MPV_SOCKET_PATH); + + // TODO: remove test code + socket.writeCommandRaw(new MPVCommandRaw("set_property", "volume", "0")); + + return isAlive(); } @Override @@ -27,6 +44,9 @@ public class MPV implements TaskProcess { @Override public void stop() { + socket.close(); + socket = null; + process.destroyForcibly(); } } diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPVCommandRaw.java b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandRaw.java new file mode 100644 index 0000000..489ca9e --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandRaw.java @@ -0,0 +1,24 @@ +package com.mykola2312.mptv.mpv; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class MPVCommandRaw { + public ArrayList command; + + public MPVCommandRaw(String name, String... args) { + command = new ArrayList<>(); + command.add(name); + command.addAll(Arrays.asList(args)); + } + + public byte[] serialize() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + String jsonCommand = mapper.writeValueAsString(this); + return jsonCommand.getBytes(StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPVSocket.java b/src/main/java/com/mykola2312/mptv/mpv/MPVSocket.java new file mode 100644 index 0000000..825f61e --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/mpv/MPVSocket.java @@ -0,0 +1,62 @@ +package com.mykola2312.mptv.mpv; + +import java.io.IOException; +import java.net.SocketException; +import java.nio.file.Path; + +import org.newsclub.net.unix.AFOutputStream; +import org.newsclub.net.unix.AFUNIXSocket; +import org.newsclub.net.unix.AFUNIXSocketAddress; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; + +public class MPVSocket { + private static final Logger logger = LoggerFactory.getLogger(MPVSocket.class); + + private AFUNIXSocket socket; + + private static final long WAIT_MILLIS = 250; + private static final int WAIT_ATTEMPTS = 5; + + public void waitForConnection(Path socketPath) throws IOException { + for (int i = 0; i < WAIT_ATTEMPTS; i++) { + try { + Thread.sleep(WAIT_MILLIS); + + socket = AFUNIXSocket.newInstance(); + socket.connect(AFUNIXSocketAddress.of(socketPath)); + + logger.info(String.format("connected to socket %s", socket.toString())); + } catch (SocketException e) { + logger.error("SocketException", e); + close(); + } catch (InterruptedException e) { + break; + } + } + } + + public void close() { + try { + socket.close(); + } catch (IOException e) { + logger.error("failed to close socket", e); + } + } + + public void writeCommandRaw(MPVCommandRaw command) { + try { + AFOutputStream output = socket.getOutputStream(); + + output.write(command.serialize()); + output.write('\n'); + output.flush(); + } catch (JsonProcessingException e) { + logger.error("failed to serialize command", e); + } catch (IOException e) { + logger.error("io exception", e); + } + } +} diff --git a/src/main/java/com/mykola2312/mptv/task/TaskProcess.java b/src/main/java/com/mykola2312/mptv/task/TaskProcess.java index 99da6cf..b492c36 100644 --- a/src/main/java/com/mykola2312/mptv/task/TaskProcess.java +++ b/src/main/java/com/mykola2312/mptv/task/TaskProcess.java @@ -3,7 +3,7 @@ package com.mykola2312.mptv.task; import java.io.IOException; public interface TaskProcess { - public void spawn() throws IOException; + public boolean spawn() throws IOException; public boolean isAlive(); public void stop(); }