From cf3c12a36a8aee4b41ca195744ef1d99a51d0574 Mon Sep 17 00:00:00 2001 From: Vladimir Borisov Date: Mon, 19 Jul 2021 18:07:30 +0300 Subject: [PATCH] Working most of the player IPC; Better error handling --- src/stremio_app/stremio_app.rs | 99 +++++++---- .../stremio_player/stremio_player.rs | 160 +++++++++++++++++- 2 files changed, 214 insertions(+), 45 deletions(-) diff --git a/src/stremio_app/stremio_app.rs b/src/stremio_app/stremio_app.rs index ec07abc..54960fb 100644 --- a/src/stremio_app/stremio_app.rs +++ b/src/stremio_app/stremio_app.rs @@ -28,13 +28,16 @@ struct RPCResponseData { transport: RPCResponseDataTransport, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Default, Serialize, Deserialize, Debug, Clone)] struct RPCResponse { id: u64, object: String, #[serde(rename = "type")] response_type: u32, - data: RPCResponseData, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, + #[serde(skip_serializing_if = "Option::is_none")] + args: Option, } ////////////////////////////////////////// @@ -61,12 +64,16 @@ impl MainWindow { self.window.set_position(x, y); let player_channel = self.player.channel.borrow(); - let (player_tx, player_rx) = player_channel.as_ref().unwrap(); + let (player_tx, player_rx) = player_channel + .as_ref() + .expect("Cannont obtain communication channel for the Player"); let player_tx = player_tx.clone(); let player_rx = Arc::clone(player_rx); let web_channel = self.webview.channel.borrow(); - let (web_tx, web_rx) = web_channel.as_ref().unwrap(); + let (web_tx, web_rx) = web_channel + .as_ref() + .expect("Cannont obtain communication channel for the Web UI"); let web_tx = web_tx.clone(); let web_rx = Arc::clone(web_rx); thread::spawn(move || { @@ -75,8 +82,16 @@ impl MainWindow { { let rx = player_rx.lock().unwrap(); if let Ok(msg) = rx.try_recv() { - println!("APP GOT FROM PLAYER {}", msg); - // web_tx.send(msg).ok(); + let resp = RPCResponse { + id: 1, + object: "transport".to_string(), + response_type: 1, + args: serde_json::from_str(&msg).ok(), + ..Default::default() + }; + let resp_json = + serde_json::to_string(&resp).expect("Cannot serialize the response"); + web_tx.send(resp_json).ok(); } }; @@ -84,39 +99,51 @@ impl MainWindow { { let rx = web_rx.lock().unwrap(); if let Ok(msg) = rx.try_recv() { - let msg: RPCRequest = serde_json::from_str(&msg).unwrap(); - if msg.id == 0 { - let resp: RPCResponse = RPCResponse { - id: 0, - object: "transport".to_string(), - response_type: 3, - data: RPCResponseData { - transport: RPCResponseDataTransport { - properties: vec![ - vec![], - vec![ - "".to_string(), - "shellVersion".to_string(), - "".to_string(), - "5.0.0".to_string(), + if let Ok(msg) = serde_json::from_str::(&msg) { + // The handshake. Here we send some useful data to the WEB UI + if msg.id == 0 { + let resp = RPCResponse { + id: 0, + object: "transport".to_string(), + response_type: 3, + data: Some(RPCResponseData { + transport: RPCResponseDataTransport { + properties: vec![ + vec![], + vec![ + "".to_string(), + "shellVersion".to_string(), + "".to_string(), + "5.0.0".to_string(), + ], ], - ], - signals: vec![], - methods: vec![vec!["onEvent".to_string(), "".to_string()]], - }, - }, - }; - let resp_json = serde_json::to_string(&resp).unwrap(); - web_tx.send(resp_json).ok(); - } else if let Some(args) = msg.args { - // TODO: this can panic - let method = serde_json::from_value::(args[0].clone()).unwrap(); - if method.starts_with("mpv-") { - let resp_json = serde_json::to_string(&args).unwrap(); - player_tx.send(resp_json).ok(); + signals: vec![], + methods: vec![vec![ + "onEvent".to_string(), + "".to_string(), + ]], + }, + }), + ..Default::default() + }; + let resp_json = serde_json::to_string(&resp).unwrap(); + web_tx.send(resp_json).ok(); + } else if let Some(args) = msg.args { + // TODO: this can panic + if let Some(method) = args.first() { + let method = method.as_str().unwrap(); + if method.starts_with("mpv-") { + let resp_json = serde_json::to_string(&args).unwrap(); + player_tx.send(resp_json).ok(); + } else { + eprintln!("Unsupported command {:?}", args) + } + } } + } else { + eprintln!("Web UI sent invalid JSON: {:?}", msg); } - } + } // try_recv }; } }); diff --git a/src/stremio_app/stremio_player/stremio_player.rs b/src/stremio_app/stremio_player/stremio_player.rs index a8111b1..a7b0990 100644 --- a/src/stremio_app/stremio_player/stremio_player.rs +++ b/src/stremio_app/stremio_player/stremio_player.rs @@ -1,9 +1,51 @@ use native_windows_gui::{self as nwg, PartialUi}; +use serde::{Deserialize, Serialize}; use std::cell::RefCell; use std::sync::mpsc; use std::sync::{Arc, Mutex}; use std::thread; +#[derive(Default, Serialize, Deserialize, Debug, Clone)] +pub struct MpvEvent { + #[serde(skip_serializing_if = "Option::is_none")] + error: Option, + #[serde(skip_serializing_if = "Option::is_none")] + name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + data: Option, + #[serde(skip_serializing_if = "Option::is_none")] + reason: Option, +} + +impl MpvEvent { + fn value_from_format(data: mpv::Format, as_json: bool) -> serde_json::Value { + match data { + mpv::Format::Flag(d) => serde_json::Value::Bool(d), + mpv::Format::Int(d) => serde_json::Value::Number( + serde_json::Number::from_f64(d as f64).expect("MPV returned invalid number"), + ), + mpv::Format::Double(d) => serde_json::Value::Number( + serde_json::Number::from_f64(d).expect("MPV returned invalid number"), + ), + mpv::Format::OsdStr(s) => serde_json::Value::String(s.to_string()), + mpv::Format::Str(s) => { + if as_json { + serde_json::from_str(s).expect("MPV returned invalid JSON data") + } else { + serde_json::Value::String(s.to_string()) + } + } + } + } + fn string_from_end_reason(data: mpv::EndFileReason) -> String { + match data { + mpv::EndFileReason::MPV_END_FILE_REASON_ERROR => "error".to_string(), + mpv::EndFileReason::MPV_END_FILE_REASON_QUIT => "quit".to_string(), + _ => "other".to_string(), + } + } +} + #[derive(Default)] pub struct Player { pub channel: RefCell, Arc>>)>>, @@ -48,24 +90,124 @@ impl PartialUi for Player { .set_option("msg-level", "all=v") .expect("failed setting msg-level"); //mpv_builder.set_option("quiet", "yes").expect("failed setting msg-level"); - let mut mpv = mpv_builder.build().unwrap(); + let mut mpv = mpv_builder.build().expect("Cannot build MPV"); 'main: loop { // wait up to 0.0 seconds for an event. while let Some(event) = mpv.wait_event(0.0) { // even if you don't do anything with the events, it is still necessary to empty // the event loop - // TODO: Parse and format the Event in proper JSON format - tx1.send(format!("{:?}", event)).ok(); - println!("RECEIVED EVENT : {:?}", event); - match event { - mpv::Event::Shutdown | mpv::Event::EndFile(_) => { + + let json_responses = ["track-list", "video-params", "metadata"]; + let resp_event = match event { + mpv::Event::PropertyChange { + name, + change, + reply_userdata: _, + } => Some(( + "mpv-prop-change", + MpvEvent { + name: Some(name.to_string()), + data: Some(MpvEvent::value_from_format( + change, + json_responses.contains(&name), + )), + ..Default::default() + }, + )), + mpv::Event::EndFile(Ok(reason)) => Some(( + "mpv-event-ended", + MpvEvent { + reason: Some(MpvEvent::string_from_end_reason(reason)), + ..Default::default() + }, + )), + mpv::Event::Shutdown => { break 'main; } + _ => None, + }; + if let Some(resp) = resp_event { + tx1.send( + serde_json::to_string(&resp).expect("Cannot generate MPV event JSON"), + ) + .ok(); + } + } + if let Ok(msg) = rx.try_recv() { + let (message, data): (String, serde_json::Value) = + serde_json::from_str(&msg).unwrap(); + match message.as_str() { + "mpv-observe-prop" => { + if let Some(property) = data.as_str() { + match property { + "pause" | "paused-for-cache" | "seeking" | "eof-reached" => { + mpv.observe_property::(property, 0).ok(); + } + "aid" | "vid" | "sid" => { + mpv.observe_property::(property, 0).ok(); + } + "time-pos" + | "volume" + | "duration" + | "sub-scale" + | "cache-buffering-state" + | "sub-pos" => { + mpv.observe_property::(property, 0).ok(); + } + "path" | "mpv-version" | "ffmpeg-version" | "track-list" + | "video-params" | "metadata" => { + mpv.observe_property::<&str>(property, 0).ok(); + } + other => { + eprintln!( + "mpv-observe-prop: not implemented for `{}`", + other + ); + } + }; + } + } + "mpv-set-prop" => { + match serde_json::from_value::>(data.clone()) { + Ok(prop_vector) => { + if let [prop, val] = &prop_vector[..] { + let prop = prop.as_str().expect("Property is not a string"); + // If we change vo MPV panics + if prop != "vo" { + match val { + serde_json::Value::Bool(v) => { + mpv.set_property(prop, *v).ok(); + } + serde_json::Value::Number(v) => { + mpv.set_property(prop, v.as_f64().unwrap()) + .ok(); + } + serde_json::Value::String(v) => { + mpv.set_property(prop, v.as_str()).ok(); + } + _ => {} + }; + }; + } + } + Err(e) => { + eprintln!("mpv-set-prop Error: {:?} for data {}", e, data) + } + }; + } + "mpv-command" => { + match serde_json::from_value::>(data.clone()) { + Ok(data) => { + let data: Vec<_> = data.iter().map(|s| s.as_str()).collect(); + mpv.command(&data).ok(); + } + Err(e) => { + eprintln!("mpv-command Error: {:?} for data {}", e, data) + } + } + } _ => {} }; - } - if let Ok(msg) = rx.try_recv() { - println!("PLAYER RECEIVED MESSAGE: {}", msg); // let video_path = "http://distribution.bbb3d.renderfarming.net/video/mp4/bbb_sunflower_1080p_30fps_normal.mp4"; // mpv.command(&["loadfile", video_path]).ok(); // mpv.command(&["stop"]).ok();