implement async command response handling

This commit is contained in:
mykola2312 2024-04-26 05:40:50 +03:00
parent 14676023b8
commit d29129a1f3
5 changed files with 110 additions and 11 deletions

View file

@ -4,6 +4,9 @@ import com.mykola2312.mptv.config.Config;
import com.mykola2312.mptv.crawler.Crawler;
import com.mykola2312.mptv.db.DB;
import com.mykola2312.mptv.mpv.MPV;
import com.mykola2312.mptv.mpv.MPVCommandResult;
import com.mykola2312.mptv.mpv.MPVProperty;
import com.mykola2312.mptv.mpv.MPVSetProperty;
import com.mykola2312.mptv.task.ProcessService;
import com.mykola2312.mptv.task.TaskDispatcher;
import com.mykola2312.mptv.ui.MainFrame;
@ -97,6 +100,13 @@ public class Main {
logger.info("spawned mpv");
processService.registerProcess(mpv);
for (int i = 0; i < 10; i++) {
MPVCommandResult result = mpv.writeCommand(new MPVSetProperty(MPVProperty.VOLUME, 0));
if (result != null) {
logger.info(String.format("command %d status: %s", result.request_id, result.error));
}
}
} else {
logger.error("failed to spawn mpv");
}

View file

@ -2,7 +2,11 @@ package com.mykola2312.mptv.mpv;
import java.io.IOException;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.newsclub.net.unix.AFInputStream;
import org.newsclub.net.unix.AFOutputStream;
@ -27,17 +31,50 @@ public class MPV implements TaskProcess {
private AFUNIXSocket socket;
private class MPVReader implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(MPVReader.class);
private final MPV mpv;
private final AFInputStream input;
public boolean running = true;
public MPVReader(AFInputStream input) {
public MPVReader(MPV mpv, AFInputStream input) {
this.mpv = mpv;
this.input = input;
}
private static final int BUFFER_SIZE = 512;
private static final Pattern EVENT_PATTERN = Pattern.compile("^\\{\"event\"");
@Override
public void run() {
while (running && !Thread.currentThread().isInterrupted()) {
byte[] buf = new byte[BUFFER_SIZE];
try {
while (running && !Thread.currentThread().isInterrupted()) {
int len = input.read(buf, 0, buf.length);
if (len < 0) {
running = false;
return;
}
String line = new String(
Arrays.copyOfRange(buf, 0, len),
StandardCharsets.UTF_8);
Matcher eventMatch = EVENT_PATTERN.matcher(line);
if (eventMatch.find()) {
// handle event
} else {
// handle command result
try {
MPVCommandResult result = MPVCommandResult.deserialize(line);
mpv.handleCommandResult(result);
} catch (JsonProcessingException e) {
logger.warn("failed to parse: " + line);
}
}
}
} catch (IOException e) {
logger.error("failed to read. exiting reader", e);
}
}
@ -50,7 +87,7 @@ public class MPV implements TaskProcess {
private static final long WAIT_MILLIS = 250;
private static final int WAIT_ATTEMPTS = 5;
public void waitForConnection(Path socketPath) throws IOException {
private void waitForConnection(Path socketPath) throws IOException {
for (int i = 0; i < WAIT_ATTEMPTS; i++) {
try {
Thread.sleep(WAIT_MILLIS);
@ -58,7 +95,7 @@ public class MPV implements TaskProcess {
socket = AFUNIXSocket.newInstance();
socket.connect(AFUNIXSocketAddress.of(socketPath));
reader = new MPVReader(socket.getInputStream());
reader = new MPVReader(this, socket.getInputStream());
readerThread = new Thread(reader);
readerThread.start();
@ -72,7 +109,7 @@ public class MPV implements TaskProcess {
}
}
public void closeConnection() {
private void closeConnection() {
try {
socket.close();
} catch (IOException e) {
@ -88,8 +125,6 @@ public class MPV implements TaskProcess {
waitForConnection(MPV_SOCKET_PATH);
writeCommand(new MPVSetProperty(MPVProperty.VOLUME, 0));
return isAlive();
}
@ -110,16 +145,46 @@ public class MPV implements TaskProcess {
process = null;
}
public void writeCommand(MPVCommand command) {
private int requestIdCounter = 0;
final Object commandMutex = new Object();
private int commandRequestId;
private MPVCommandResult commandResult;
private static final long COMMAND_TIMEOUT = 2000L;
public MPVCommandResult writeCommand(MPVCommand command) {
try {
commandRequestId = command.setRequestId(requestIdCounter++);
AFOutputStream output = socket.getOutputStream();
output.write(command.serialize());
output.flush();
// wait for command result
synchronized (commandMutex) {
try {
commandMutex.wait(COMMAND_TIMEOUT);
} catch (InterruptedException e) {
throw new MPVCommandTimeout();
}
}
return commandResult;
} catch (JsonProcessingException e) {
logger.error("failed to serialize command", e);
throw new RuntimeException("failed to serialize command", e);
} catch (IOException e) {
logger.error("io exception", e);
throw new RuntimeException("io exception", e);
}
}
public void handleCommandResult(MPVCommandResult result) {
synchronized (commandMutex) {
if (commandRequestId == result.request_id) {
commandResult = result;
commandMutex.notifyAll();
}
}
}
}

View file

@ -9,8 +9,9 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public abstract class MPVCommand {
private int requestId;
public void setRequestId(int requestId) {
public int setRequestId(int requestId) {
this.requestId = requestId;
return this.requestId;
}
protected abstract List<String> serializeCommand();

View file

@ -0,0 +1,16 @@
package com.mykola2312.mptv.mpv;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class MPVCommandResult {
public int request_id;
public String error;
public String data;
public static MPVCommandResult deserialize(String data) throws JsonProcessingException {
return new ObjectMapper()
.readerFor(MPVCommandResult.class)
.readValue(data);
}
}

View file

@ -0,0 +1,7 @@
package com.mykola2312.mptv.mpv;
public class MPVCommandTimeout extends RuntimeException {
public MPVCommandTimeout() {
super("mpv command response timeout");
}
}