From d29129a1f3015236c63f74b83a60ef9832677dec Mon Sep 17 00:00:00 2001 From: mykola2312 <49044616+mykola2312@users.noreply.github.com> Date: Fri, 26 Apr 2024 05:40:50 +0300 Subject: [PATCH] implement async command response handling --- src/main/java/com/mykola2312/mptv/Main.java | 10 +++ .../java/com/mykola2312/mptv/mpv/MPV.java | 85 ++++++++++++++++--- .../com/mykola2312/mptv/mpv/MPVCommand.java | 3 +- .../mykola2312/mptv/mpv/MPVCommandResult.java | 16 ++++ .../mptv/mpv/MPVCommandTimeout.java | 7 ++ 5 files changed, 110 insertions(+), 11 deletions(-) create mode 100644 src/main/java/com/mykola2312/mptv/mpv/MPVCommandResult.java create mode 100644 src/main/java/com/mykola2312/mptv/mpv/MPVCommandTimeout.java diff --git a/src/main/java/com/mykola2312/mptv/Main.java b/src/main/java/com/mykola2312/mptv/Main.java index 5a749b2..3affbd4 100644 --- a/src/main/java/com/mykola2312/mptv/Main.java +++ b/src/main/java/com/mykola2312/mptv/Main.java @@ -4,6 +4,9 @@ import com.mykola2312.mptv.config.Config; import com.mykola2312.mptv.crawler.Crawler; import com.mykola2312.mptv.db.DB; import com.mykola2312.mptv.mpv.MPV; +import com.mykola2312.mptv.mpv.MPVCommandResult; +import com.mykola2312.mptv.mpv.MPVProperty; +import com.mykola2312.mptv.mpv.MPVSetProperty; import com.mykola2312.mptv.task.ProcessService; import com.mykola2312.mptv.task.TaskDispatcher; import com.mykola2312.mptv.ui.MainFrame; @@ -97,6 +100,13 @@ public class Main { logger.info("spawned mpv"); processService.registerProcess(mpv); + + for (int i = 0; i < 10; i++) { + MPVCommandResult result = mpv.writeCommand(new MPVSetProperty(MPVProperty.VOLUME, 0)); + if (result != null) { + logger.info(String.format("command %d status: %s", result.request_id, result.error)); + } + } } else { logger.error("failed to spawn mpv"); } diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPV.java b/src/main/java/com/mykola2312/mptv/mpv/MPV.java index deb1d78..6829bb8 100644 --- a/src/main/java/com/mykola2312/mptv/mpv/MPV.java +++ b/src/main/java/com/mykola2312/mptv/mpv/MPV.java @@ -2,7 +2,11 @@ package com.mykola2312.mptv.mpv; import java.io.IOException; import java.net.SocketException; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; +import java.util.Arrays; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.newsclub.net.unix.AFInputStream; import org.newsclub.net.unix.AFOutputStream; @@ -27,17 +31,50 @@ public class MPV implements TaskProcess { private AFUNIXSocket socket; private class MPVReader implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(MPVReader.class); + + private final MPV mpv; private final AFInputStream input; public boolean running = true; - public MPVReader(AFInputStream input) { + public MPVReader(MPV mpv, AFInputStream input) { + this.mpv = mpv; this.input = input; } + private static final int BUFFER_SIZE = 512; + private static final Pattern EVENT_PATTERN = Pattern.compile("^\\{\"event\""); + @Override public void run() { - while (running && !Thread.currentThread().isInterrupted()) { + byte[] buf = new byte[BUFFER_SIZE]; + try { + while (running && !Thread.currentThread().isInterrupted()) { + int len = input.read(buf, 0, buf.length); + if (len < 0) { + running = false; + return; + } + String line = new String( + Arrays.copyOfRange(buf, 0, len), + StandardCharsets.UTF_8); + + Matcher eventMatch = EVENT_PATTERN.matcher(line); + if (eventMatch.find()) { + // handle event + } else { + // handle command result + try { + MPVCommandResult result = MPVCommandResult.deserialize(line); + mpv.handleCommandResult(result); + } catch (JsonProcessingException e) { + logger.warn("failed to parse: " + line); + } + } + } + } catch (IOException e) { + logger.error("failed to read. exiting reader", e); } } @@ -50,7 +87,7 @@ public class MPV implements TaskProcess { private static final long WAIT_MILLIS = 250; private static final int WAIT_ATTEMPTS = 5; - public void waitForConnection(Path socketPath) throws IOException { + private void waitForConnection(Path socketPath) throws IOException { for (int i = 0; i < WAIT_ATTEMPTS; i++) { try { Thread.sleep(WAIT_MILLIS); @@ -58,7 +95,7 @@ public class MPV implements TaskProcess { socket = AFUNIXSocket.newInstance(); socket.connect(AFUNIXSocketAddress.of(socketPath)); - reader = new MPVReader(socket.getInputStream()); + reader = new MPVReader(this, socket.getInputStream()); readerThread = new Thread(reader); readerThread.start(); @@ -72,7 +109,7 @@ public class MPV implements TaskProcess { } } - public void closeConnection() { + private void closeConnection() { try { socket.close(); } catch (IOException e) { @@ -88,8 +125,6 @@ public class MPV implements TaskProcess { waitForConnection(MPV_SOCKET_PATH); - writeCommand(new MPVSetProperty(MPVProperty.VOLUME, 0)); - return isAlive(); } @@ -110,16 +145,46 @@ public class MPV implements TaskProcess { process = null; } - public void writeCommand(MPVCommand command) { + private int requestIdCounter = 0; + + final Object commandMutex = new Object(); + private int commandRequestId; + private MPVCommandResult commandResult; + + private static final long COMMAND_TIMEOUT = 2000L; + + public MPVCommandResult writeCommand(MPVCommand command) { try { + commandRequestId = command.setRequestId(requestIdCounter++); + AFOutputStream output = socket.getOutputStream(); output.write(command.serialize()); output.flush(); + + // wait for command result + synchronized (commandMutex) { + try { + commandMutex.wait(COMMAND_TIMEOUT); + } catch (InterruptedException e) { + throw new MPVCommandTimeout(); + } + } + + return commandResult; } catch (JsonProcessingException e) { - logger.error("failed to serialize command", e); + throw new RuntimeException("failed to serialize command", e); } catch (IOException e) { - logger.error("io exception", e); + throw new RuntimeException("io exception", e); + } + } + + public void handleCommandResult(MPVCommandResult result) { + synchronized (commandMutex) { + if (commandRequestId == result.request_id) { + commandResult = result; + commandMutex.notifyAll(); + } } } } diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPVCommand.java b/src/main/java/com/mykola2312/mptv/mpv/MPVCommand.java index 9ff613f..9b1f5fc 100644 --- a/src/main/java/com/mykola2312/mptv/mpv/MPVCommand.java +++ b/src/main/java/com/mykola2312/mptv/mpv/MPVCommand.java @@ -9,8 +9,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; public abstract class MPVCommand { private int requestId; - public void setRequestId(int requestId) { + public int setRequestId(int requestId) { this.requestId = requestId; + return this.requestId; } protected abstract List serializeCommand(); diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPVCommandResult.java b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandResult.java new file mode 100644 index 0000000..0335dbc --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandResult.java @@ -0,0 +1,16 @@ +package com.mykola2312.mptv.mpv; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class MPVCommandResult { + public int request_id; + public String error; + public String data; + + public static MPVCommandResult deserialize(String data) throws JsonProcessingException { + return new ObjectMapper() + .readerFor(MPVCommandResult.class) + .readValue(data); + } +} diff --git a/src/main/java/com/mykola2312/mptv/mpv/MPVCommandTimeout.java b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandTimeout.java new file mode 100644 index 0000000..2a7358f --- /dev/null +++ b/src/main/java/com/mykola2312/mptv/mpv/MPVCommandTimeout.java @@ -0,0 +1,7 @@ +package com.mykola2312.mptv.mpv; + +public class MPVCommandTimeout extends RuntimeException { + public MPVCommandTimeout() { + super("mpv command response timeout"); + } +}