Skip to content

Commit

Permalink
Merge pull request #6425 from dignow/fix/query_onlines_block_thread
Browse files Browse the repository at this point in the history
Fix/query onlines block thread
  • Loading branch information
rustdesk authored Nov 16, 2023
2 parents 512c7df + 02bc5e3 commit 2aa5e68
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 23 deletions.
7 changes: 3 additions & 4 deletions flutter/lib/common/widgets/peers_view.dart
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {
final _curPeers = <String>{};
var _lastChangeTime = DateTime.now();
var _lastQueryPeers = <String>{};
var _lastQueryTime = DateTime.now().subtract(const Duration(hours: 1));
var _lastQueryTime = DateTime.now().add(const Duration(seconds: 30));
var _queryCount = 0;
var _exit = false;

Expand Down Expand Up @@ -272,8 +272,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {
if (_queryCount < _maxQueryCount) {
if (now.difference(_lastQueryTime) >= _queryInterval) {
if (_curPeers.isNotEmpty) {
platformFFI.ffiBind
.queryOnlines(ids: _curPeers.toList(growable: false));
bind.queryOnlines(ids: _curPeers.toList(growable: false));
_lastQueryTime = DateTime.now();
_queryCount += 1;
}
Expand All @@ -287,7 +286,7 @@ class _PeersViewState extends State<_PeersView> with WindowListener {

_queryOnlines(bool isLoadEvent) {
if (_curPeers.isNotEmpty) {
platformFFI.ffiBind.queryOnlines(ids: _curPeers.toList(growable: false));
bind.queryOnlines(ids: _curPeers.toList(growable: false));
_lastQueryPeers = {..._curPeers};
if (isLoadEvent) {
_lastChangeTime = DateTime.now();
Expand Down
73 changes: 73 additions & 0 deletions src/flutter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1597,3 +1597,76 @@ pub mod sessions {
.unwrap_or(false)
}
}

pub(super) mod async_tasks {
use hbb_common::{
bail,
tokio::{
self, select,
sync::mpsc::{unbounded_channel, UnboundedSender},
},
ResultType,
};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

type TxQueryOnlines = UnboundedSender<Vec<String>>;
lazy_static::lazy_static! {
static ref TX_QUERY_ONLINES: Arc<Mutex<Option<TxQueryOnlines>>> = Default::default();
}

#[inline]
pub fn start_flutter_async_runner() {
std::thread::spawn(start_flutter_async_runner_);
}

#[allow(dead_code)]
pub fn stop_flutter_async_runner() {
let _ = TX_QUERY_ONLINES.lock().unwrap().take();
}

#[tokio::main(flavor = "current_thread")]
async fn start_flutter_async_runner_() {
let (tx_onlines, mut rx_onlines) = unbounded_channel::<Vec<String>>();
TX_QUERY_ONLINES.lock().unwrap().replace(tx_onlines);

loop {
select! {
ids = rx_onlines.recv() => {
match ids {
Some(_ids) => {
#[cfg(not(any(target_os = "ios")))]
crate::rendezvous_mediator::query_online_states(_ids, handle_query_onlines).await
}
None => {
break;
}
}
}
}
}
}

pub fn query_onlines(ids: Vec<String>) -> ResultType<()> {
if let Some(tx) = TX_QUERY_ONLINES.lock().unwrap().as_ref() {
let _ = tx.send(ids)?;
} else {
bail!("No tx_query_onlines");
}
Ok(())
}

fn handle_query_onlines(onlines: Vec<String>, offlines: Vec<String>) {
let data = HashMap::from([
("name", "callback_query_onlines".to_owned()),
("onlines", onlines.join(",")),
("offlines", offlines.join(",")),
]);
let _res = super::push_global_event(
super::APP_TYPE_MAIN,
serde_json::ser::to_string(&data).unwrap_or("".to_owned()),
);
}
}
16 changes: 2 additions & 14 deletions src/flutter_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ lazy_static::lazy_static! {
}

fn initialize(app_dir: &str) {
flutter::async_tasks::start_flutter_async_runner();
*config::APP_DIR.write().unwrap() = app_dir.to_owned();
#[cfg(target_os = "android")]
{
Expand Down Expand Up @@ -1554,18 +1555,6 @@ pub fn main_get_build_date() -> String {
crate::BUILD_DATE.to_string()
}

fn handle_query_onlines(onlines: Vec<String>, offlines: Vec<String>) {
let data = HashMap::from([
("name", "callback_query_onlines".to_owned()),
("onlines", onlines.join(",")),
("offlines", offlines.join(",")),
]);
let _res = flutter::push_global_event(
flutter::APP_TYPE_MAIN,
serde_json::ser::to_string(&data).unwrap_or("".to_owned()),
);
}

pub fn translate(name: String, locale: String) -> SyncReturn<String> {
SyncReturn(crate::client::translate_locale(name, &locale))
}
Expand All @@ -1589,8 +1578,7 @@ pub fn session_register_texture(
}

pub fn query_onlines(ids: Vec<String>) {
#[cfg(not(any(target_os = "ios")))]
crate::rendezvous_mediator::query_online_states(ids, handle_query_onlines)
let _ = flutter::async_tasks::query_onlines(ids);
}

pub fn version_to_number(v: String) -> SyncReturn<i64> {
Expand Down
16 changes: 11 additions & 5 deletions src/rendezvous_mediator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ async fn direct_server(server: ServerPtr) {
}
}

#[tokio::main(flavor = "current_thread")]
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false;
if test {
Expand All @@ -598,7 +597,11 @@ pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<S
}

if query_begin.elapsed() > query_timeout {
log::debug!("query onlines timeout {:?}", query_timeout);
log::debug!(
"query onlines timeout {:?} ({:?})",
query_begin.elapsed(),
query_timeout
);
break;
}

Expand Down Expand Up @@ -679,8 +682,10 @@ async fn query_online_states_(

#[cfg(test)]
mod tests {
#[test]
fn test_query_onlines() {
use hbb_common::tokio;

#[tokio::test]
async fn test_query_onlines() {
super::query_online_states(
vec![
"152183996".to_owned(),
Expand All @@ -691,6 +696,7 @@ mod tests {
|onlines: Vec<String>, offlines: Vec<String>| {
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
},
);
)
.await;
}
}

0 comments on commit 2aa5e68

Please sign in to comment.