Skip to main content

floem/
ext_event.rs

1use std::{cell::Cell, collections::VecDeque, sync::Arc};
2
3use floem_reactive::{
4    Effect, ReadSignal, RwSignal, Scope, SignalGet, SignalUpdate, SignalWith, WriteSignal,
5};
6use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
7
8use crate::{
9    Application,
10    app::UserEvent,
11    window::handle::{get_current_view, set_current_view},
12};
13
14#[cfg(feature = "crossbeam")]
15use crossbeam::channel::Receiver;
16#[cfg(not(feature = "crossbeam"))]
17use std::sync::mpsc::Receiver;
18
19/// # SAFETY
20///
21/// **DO NOT USE THIS** trigger except for when using with `create_ext_action` or when you guarantee that
22/// the signal is never used from a different thread than it was created on.
23#[derive(Debug)]
24pub struct ExtSendTrigger {
25    signal: RwSignal<()>,
26}
27
28impl Copy for ExtSendTrigger {}
29
30impl Clone for ExtSendTrigger {
31    fn clone(&self) -> Self {
32        *self
33    }
34}
35
36impl ExtSendTrigger {
37    pub fn notify(&self) {
38        self.signal.set(());
39    }
40
41    pub fn track(&self) {
42        self.signal.with(|_| {});
43    }
44
45    #[allow(clippy::new_without_default)]
46    pub fn new() -> Self {
47        create_trigger()
48    }
49}
50
51pub fn create_trigger() -> ExtSendTrigger {
52    ExtSendTrigger {
53        signal: RwSignal::new(()),
54    }
55}
56
57unsafe impl Send for ExtSendTrigger {}
58unsafe impl Sync for ExtSendTrigger {}
59
60pub(crate) static EXT_EVENT_HANDLER: ExtEventHandler = ExtEventHandler::new();
61
62pub(crate) struct ExtEventHandler {
63    pub(crate) queue: Mutex<VecDeque<ExtSendTrigger>>,
64}
65
66impl Default for ExtEventHandler {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72impl ExtEventHandler {
73    pub const fn new() -> Self {
74        Self {
75            queue: Mutex::new(VecDeque::new()),
76        }
77    }
78
79    pub fn add_trigger(&self, trigger: ExtSendTrigger) {
80        {
81            // Run this in a short block to prevent any deadlock if running the trigger effects
82            // causes another trigger to be registered
83            EXT_EVENT_HANDLER.queue.lock().push_back(trigger);
84        }
85        Application::send_proxy_event(UserEvent::Idle);
86    }
87}
88
89pub fn register_ext_trigger(trigger: ExtSendTrigger) {
90    EXT_EVENT_HANDLER.add_trigger(trigger);
91}
92
93pub fn create_ext_action<T: Send + 'static>(
94    cx: Scope,
95    action: impl FnOnce(T) + 'static,
96) -> impl FnOnce(T) {
97    let view = get_current_view();
98    let cx = cx.create_child();
99    let trigger = cx.enter(ExtSendTrigger::new);
100    let data = Arc::new(Mutex::new(None));
101
102    {
103        let data = data.clone();
104        let action = Cell::new(Some(action));
105        cx.enter(move || {
106            Effect::new(move |_| {
107                trigger.track();
108                if let Some(event) = data.lock().take() {
109                    Effect::untrack(|| {
110                        let current_view = get_current_view();
111                        set_current_view(view.root());
112                        let action = action.take().unwrap();
113                        action(event);
114                        set_current_view(current_view);
115                    });
116                    cx.dispose();
117                }
118            });
119        });
120    }
121
122    move |event| {
123        *data.lock() = Some(event);
124        EXT_EVENT_HANDLER.add_trigger(trigger);
125    }
126}
127
128// Deprecated functions removed due to type incompatibility with new API
129// Use StreamSignal::on_executor_with_default or ChannelSignal::on_executor_with_default instead
130
131pub fn update_signal_from_channel<T: Send + 'static>(
132    writer: WriteSignal<Option<T>>,
133    rx: Receiver<T>,
134) {
135    let cx = Scope::new();
136    let trigger = cx.enter(ExtSendTrigger::new);
137
138    let channel_closed = cx.create_rw_signal(false);
139    let data = Arc::new(Mutex::new(VecDeque::new()));
140
141    {
142        let data = data.clone();
143        cx.create_effect(move |_| {
144            trigger.track();
145            while let Some(value) = data.lock().pop_front() {
146                writer.set(value);
147            }
148
149            if channel_closed.get() {
150                cx.dispose();
151            }
152        });
153    }
154
155    let send = create_ext_action(cx, move |_| {
156        channel_closed.set(true);
157    });
158
159    std::thread::spawn(move || {
160        while let Ok(event) = rx.recv() {
161            data.lock().push_back(Some(event));
162            EXT_EVENT_HANDLER.add_trigger(trigger);
163        }
164        send(());
165    });
166}
167
168#[derive(Clone)]
169pub struct ArcRwSignal<T> {
170    inner: Arc<ArcRwSignalInner<T>>,
171}
172
173struct ArcRwSignalInner<T> {
174    // The actual data, protected by a RwLock for thread safety
175    data: RwLock<T>,
176    // Trigger for notifying the reactive system of changes
177    trigger: ExtSendTrigger,
178    // Main-thread signal for reactive integration (created lazily)
179    main_signal: parking_lot::Mutex<Option<RwSignal<()>>>,
180}
181
182impl<T> ArcRwSignal<T> {
183    /// Create a new ArcRwSignal with the given initial value
184    pub fn new(value: T) -> Self {
185        Self {
186            inner: Arc::new(ArcRwSignalInner {
187                data: RwLock::new(value),
188                trigger: ExtSendTrigger::new(),
189                main_signal: parking_lot::Mutex::new(None),
190            }),
191        }
192    }
193
194    /// Get a read guard to the data (like Arc<RwLock<T>>::read())
195    /// This does NOT subscribe to reactive effects.
196    pub fn read(&self) -> RwLockReadGuard<'_, T> {
197        self.inner.data.read()
198    }
199
200    /// Get a write guard to the data (like Arc<RwLock<T>>::write())
201    /// This will notify reactive effects when the guard is dropped.
202    pub fn write(&self) -> ArcRwSignalWriteGuard<'_, T> {
203        let guard = self.inner.data.write();
204        ArcRwSignalWriteGuard {
205            guard,
206            trigger: self.inner.trigger,
207        }
208    }
209
210    /// Try to get a read guard without blocking
211    pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
212        self.inner.data.try_read()
213    }
214
215    /// Try to get a write guard without blocking
216    pub fn try_write(&self) -> Option<ArcRwSignalWriteGuard<'_, T>> {
217        self.inner
218            .data
219            .try_write()
220            .map(|guard| ArcRwSignalWriteGuard {
221                guard,
222                trigger: self.inner.trigger,
223            })
224    }
225}
226
227impl<T: Clone> ArcRwSignal<T> {
228    /// Get a clone of the current value and subscribe to changes in reactive contexts
229    pub fn get(&self) -> T {
230        self.track();
231        self.inner.data.read().clone()
232    }
233
234    /// Get a clone of the current value without subscribing to changes
235    pub fn get_untracked(&self) -> T {
236        self.inner.data.read().clone()
237    }
238
239    /// Set the value, notifying all reactive subscribers
240    pub fn set(&self, value: T) {
241        *self.inner.data.write() = value;
242        self.notify();
243    }
244
245    /// Update the value using a closure, notifying all reactive subscribers
246    pub fn update<R>(&self, f: impl FnOnce(&mut T) -> R) -> R {
247        let result = f(&mut *self.inner.data.write());
248        self.notify();
249        result
250    }
251
252    /// Apply a closure to the current value, subscribing to changes in reactive contexts
253    pub fn with<R>(&self, f: impl FnOnce(&T) -> R) -> R {
254        self.track();
255        f(&*self.inner.data.read())
256    }
257
258    /// Apply a closure to the current value without subscribing to changes
259    pub fn with_untracked<R>(&self, f: impl FnOnce(&T) -> R) -> R {
260        f(&*self.inner.data.read())
261    }
262}
263
264impl<T> ArcRwSignal<T> {
265    /// Subscribe to changes in reactive contexts (like signal.track())
266    pub fn track(&self) {
267        self.ensure_main_signal();
268        self.inner.trigger.track();
269    }
270
271    /// Manually notify reactive subscribers of changes
272    pub fn notify(&self) {
273        EXT_EVENT_HANDLER.add_trigger(self.inner.trigger);
274    }
275
276    /// Ensure the main-thread signal exists for reactive integration
277    fn ensure_main_signal(&self) {
278        let mut main_signal = self.inner.main_signal.lock();
279        if main_signal.is_none() {
280            *main_signal = Some(RwSignal::new(()));
281        }
282    }
283
284    /// Get a regular ReadSignal that updates when this ArcRwSignal changes.
285    /// The returned signal's value is always `()` - it's just for tracking changes.
286    pub fn to_read_signal(&self) -> ReadSignal<()> {
287        self.ensure_main_signal();
288        let main_signal = self.inner.main_signal.lock();
289        main_signal.as_ref().unwrap().read_only()
290    }
291
292    /// Get a regular WriteSignal that can trigger updates.
293    /// Writing to this signal will notify subscribers but won't change the actual data.
294    pub fn to_write_signal(&self) -> WriteSignal<()> {
295        self.ensure_main_signal();
296        let main_signal = self.inner.main_signal.lock();
297        main_signal.as_ref().unwrap().write_only()
298    }
299}
300
301/// A write guard that notifies reactive subscribers when dropped
302pub struct ArcRwSignalWriteGuard<'a, T> {
303    guard: RwLockWriteGuard<'a, T>,
304    trigger: ExtSendTrigger,
305}
306
307impl<T> Drop for ArcRwSignalWriteGuard<'_, T> {
308    fn drop(&mut self) {
309        EXT_EVENT_HANDLER.add_trigger(self.trigger);
310    }
311}
312
313impl<T> std::ops::Deref for ArcRwSignalWriteGuard<'_, T> {
314    type Target = T;
315
316    fn deref(&self) -> &Self::Target {
317        &self.guard
318    }
319}
320
321impl<T> std::ops::DerefMut for ArcRwSignalWriteGuard<'_, T> {
322    fn deref_mut(&mut self) -> &mut Self::Target {
323        &mut self.guard
324    }
325}
326
327// Make ArcRwSignal thread-safe
328unsafe impl<T: Send> Send for ArcRwSignal<T> {}
329unsafe impl<T: Send + Sync> Sync for ArcRwSignal<T> {}
330
331/// Convenience function to create an ArcRwSignal
332pub fn create_arc_rw_signal<T>(value: T) -> ArcRwSignal<T> {
333    ArcRwSignal::new(value)
334}