floem/
ext_event.rs

1use std::{cell::Cell, collections::VecDeque, sync::Arc};
2
3use floem_reactive::{
4    create_effect, create_rw_signal, untrack, with_scope, ReadSignal, RwSignal, Scope, SignalGet,
5    SignalUpdate, SignalWith, WriteSignal,
6};
7use parking_lot::Mutex;
8
9use crate::{
10    app::UserEvent,
11    window_handle::{get_current_view, set_current_view},
12    Application,
13};
14
15#[cfg(feature = "crossbeam")]
16use crossbeam::channel::Receiver;
17#[cfg(not(feature = "crossbeam"))]
18use std::sync::mpsc::Receiver;
19
20/// # SAFETY
21///
22/// **DO NOT USE THIS** trigger except for when using with `create_ext_action` or when you guarantee that
23/// the signal is never used from a different thread than it was created on.
24#[derive(Debug)]
25pub struct ExtSendTrigger {
26    signal: RwSignal<()>,
27}
28
29impl Copy for ExtSendTrigger {}
30
31impl Clone for ExtSendTrigger {
32    fn clone(&self) -> Self {
33        *self
34    }
35}
36
37impl ExtSendTrigger {
38    pub fn notify(&self) {
39        self.signal.set(());
40    }
41
42    pub fn track(&self) {
43        self.signal.with(|_| {});
44    }
45
46    #[allow(clippy::new_without_default)]
47    pub fn new() -> Self {
48        create_trigger()
49    }
50}
51
52pub fn create_trigger() -> ExtSendTrigger {
53    ExtSendTrigger {
54        signal: create_rw_signal(()),
55    }
56}
57
58unsafe impl Send for ExtSendTrigger {}
59unsafe impl Sync for ExtSendTrigger {}
60
61pub(crate) static EXT_EVENT_HANDLER: ExtEventHandler = ExtEventHandler::new();
62
63pub(crate) struct ExtEventHandler {
64    pub(crate) queue: Mutex<VecDeque<ExtSendTrigger>>,
65}
66
67impl Default for ExtEventHandler {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl ExtEventHandler {
74    pub const fn new() -> Self {
75        Self {
76            queue: Mutex::new(VecDeque::new()),
77        }
78    }
79
80    pub fn add_trigger(&self, trigger: ExtSendTrigger) {
81        {
82            // Run this in a short block to prevent any deadlock if running the trigger effects
83            // causes another trigger to be registered
84            EXT_EVENT_HANDLER.queue.lock().push_back(trigger);
85        }
86        Application::send_proxy_event(UserEvent::Idle);
87    }
88}
89
90pub fn register_ext_trigger(trigger: ExtSendTrigger) {
91    EXT_EVENT_HANDLER.add_trigger(trigger);
92}
93
94pub fn create_ext_action<T: Send + 'static>(
95    cx: Scope,
96    action: impl FnOnce(T) + 'static,
97) -> impl FnOnce(T) {
98    let view = get_current_view();
99    let cx = cx.create_child();
100    let trigger = with_scope(cx, ExtSendTrigger::new);
101    let data = Arc::new(Mutex::new(None));
102
103    {
104        let data = data.clone();
105        let action = Cell::new(Some(action));
106        with_scope(cx, move || {
107            create_effect(move |_| {
108                trigger.track();
109                if let Some(event) = data.lock().take() {
110                    untrack(|| {
111                        let current_view = get_current_view();
112                        set_current_view(view);
113                        let action = action.take().unwrap();
114                        action(event);
115                        set_current_view(current_view);
116                    });
117                    cx.dispose();
118                }
119            });
120        });
121    }
122
123    move |event| {
124        *data.lock() = Some(event);
125        EXT_EVENT_HANDLER.add_trigger(trigger);
126    }
127}
128
129pub fn update_signal_from_channel<T: Send + 'static>(
130    writer: WriteSignal<Option<T>>,
131    rx: Receiver<T>,
132) {
133    let cx = Scope::new();
134    let trigger = with_scope(cx, ExtSendTrigger::new);
135
136    let channel_closed = cx.create_rw_signal(false);
137    let data = Arc::new(Mutex::new(VecDeque::new()));
138
139    {
140        let data = data.clone();
141        cx.create_effect(move |_| {
142            trigger.track();
143            while let Some(value) = data.lock().pop_front() {
144                writer.set(value);
145            }
146
147            if channel_closed.get() {
148                cx.dispose();
149            }
150        });
151    }
152
153    let send = create_ext_action(cx, move |_| {
154        channel_closed.set(true);
155    });
156
157    std::thread::spawn(move || {
158        while let Ok(event) = rx.recv() {
159            data.lock().push_back(Some(event));
160            EXT_EVENT_HANDLER.add_trigger(trigger);
161        }
162        send(());
163    });
164}
165
166pub fn create_signal_from_channel<T: Send + 'static>(rx: Receiver<T>) -> ReadSignal<Option<T>> {
167    let cx = Scope::new();
168    let trigger = with_scope(cx, ExtSendTrigger::new);
169
170    let channel_closed = cx.create_rw_signal(false);
171    let (read, write) = cx.create_signal(None);
172    let data = Arc::new(Mutex::new(VecDeque::new()));
173
174    {
175        let data = data.clone();
176        cx.create_effect(move |_| {
177            trigger.track();
178            while let Some(value) = data.lock().pop_front() {
179                write.set(value);
180            }
181
182            if channel_closed.get() {
183                cx.dispose();
184            }
185        });
186    }
187
188    let send = create_ext_action(cx, move |_| {
189        channel_closed.set(true);
190    });
191
192    std::thread::spawn(move || {
193        while let Ok(event) = rx.recv() {
194            data.lock().push_back(Some(event));
195            EXT_EVENT_HANDLER.add_trigger(trigger);
196        }
197        send(());
198    });
199
200    read
201}
202
203#[cfg(feature = "tokio")]
204pub fn create_signal_from_tokio_channel<T: Send + 'static>(
205    mut rx: tokio::sync::mpsc::UnboundedReceiver<T>,
206) -> ReadSignal<Option<T>> {
207    let cx = Scope::new();
208    let trigger = with_scope(cx, ExtSendTrigger::new);
209
210    let channel_closed = cx.create_rw_signal(false);
211    let (read, write) = cx.create_signal(None);
212    let data = std::sync::Arc::new(std::sync::Mutex::new(VecDeque::new()));
213
214    {
215        let data = data.clone();
216        cx.create_effect(move |_| {
217            trigger.track();
218            while let Some(value) = data.lock().unwrap().pop_front() {
219                write.set(value);
220            }
221
222            if channel_closed.get() {
223                cx.dispose();
224            }
225        });
226    }
227
228    let send = create_ext_action(cx, move |_| {
229        channel_closed.set(true);
230    });
231
232    tokio::spawn(async move {
233        while let Some(event) = rx.recv().await {
234            data.lock().unwrap().push_back(Some(event));
235            crate::ext_event::register_ext_trigger(trigger);
236        }
237        send(());
238    });
239
240    read
241}
242
243#[cfg(feature = "futures")]
244pub fn create_signal_from_stream<T: 'static>(
245    initial_value: T,
246    stream: impl futures::Stream<Item = T> + 'static,
247) -> ReadSignal<T> {
248    use std::{
249        cell::RefCell,
250        task::{Context, Poll},
251    };
252
253    use futures::task::{waker, ArcWake};
254
255    let cx = Scope::current().create_child();
256    let trigger = with_scope(cx, ExtSendTrigger::new);
257    let (read, write) = cx.create_signal(initial_value);
258
259    /// Waker that wakes by registering a trigger
260    // TODO: since the trigger is just a `u64`, it could theoretically be changed to be a `usize`,
261    //       Then the implementation of the std::task::RawWakerVTable could pass the `usize` as the data pointer,
262    //       avoiding any allocation/reference counting
263    struct TriggerWake(ExtSendTrigger);
264    impl ArcWake for TriggerWake {
265        fn wake_by_ref(arc_self: &Arc<Self>) {
266            EXT_EVENT_HANDLER.add_trigger(arc_self.0);
267        }
268    }
269
270    // We need a refcell because effects are `Fn` and not `FnMut`
271    let stream = RefCell::new(Box::pin(stream));
272    let arc_trigger = Arc::new(TriggerWake(trigger));
273
274    cx.create_effect(move |_| {
275        // Run the effect when the waker is called
276        trigger.track();
277        let Ok(mut stream) = stream.try_borrow_mut() else {
278            unreachable!("The waker registers events effecs to be run only at idle")
279        };
280
281        let waker = waker(arc_trigger.clone());
282        let mut context = Context::from_waker(&waker);
283
284        let mut last_value = None;
285        // Wee need to loop because if the stream returns `Poll::Ready`, it can discard the waker until
286        // `poll_next` is called again, because it assumes that the task is performing other things
287        loop {
288            let poll = stream.as_mut().poll_next(&mut context);
289            match poll {
290                Poll::Pending => break,
291                Poll::Ready(Some(v)) => last_value = Some(v),
292                Poll::Ready(None) => {
293                    // The stream is closed, the effect and the trigger will not be used anymore
294                    cx.dispose();
295                    break;
296                }
297            }
298        }
299        // Only write once to the signal
300        if let Some(v) = last_value {
301            write.set(v);
302        }
303    });
304
305    read
306}