floem_reactive/
runtime.rs

1use std::{
2    any::{Any, TypeId},
3    cell::{Cell, RefCell},
4    cmp::Reverse,
5    collections::{HashMap, HashSet},
6    rc::Rc,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        OnceLock,
10    },
11    thread::{self, ThreadId},
12};
13
14use smallvec::SmallVec;
15
16use crate::{
17    effect::{run_effect, EffectPriority, EffectTrait},
18    id::Id,
19    signal::SignalState,
20    sync_runtime::SYNC_RUNTIME,
21};
22
23thread_local! {
24pub(crate) static RUNTIME: Runtime = Runtime::new();
25}
26
27static UI_THREAD_ID: OnceLock<ThreadId> = OnceLock::new();
28#[cfg(debug_assertions)]
29static UI_THREAD_SET_LOCATION: OnceLock<&'static std::panic::Location<'static>> = OnceLock::new();
30static ENFORCE_UI_THREAD: AtomicBool = AtomicBool::new(false);
31
32/// The internal reactive Runtime which stores all the reactive system states in a
33/// thread local.
34pub struct Runtime {
35    pub(crate) current_effect: RefCell<Option<Rc<dyn EffectTrait>>>,
36    pub(crate) current_scope: RefCell<Id>,
37    pub(crate) children: RefCell<HashMap<Id, HashSet<Id>>>,
38    pub(crate) signals: RefCell<HashMap<Id, SignalState>>,
39    pub(crate) effects: RefCell<HashMap<Id, Rc<dyn EffectTrait>>>,
40    pub(crate) contexts: RefCell<HashMap<TypeId, Box<dyn Any>>>,
41    pub(crate) batching: Cell<bool>,
42    pub(crate) pending_effects: RefCell<SmallVec<[Id; 10]>>,
43    pub(crate) pending_effects_set: RefCell<HashSet<Id>>,
44}
45
46impl Default for Runtime {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl Runtime {
53    pub(crate) fn new() -> Self {
54        Self {
55            current_effect: RefCell::new(None),
56            current_scope: RefCell::new(Id::next()),
57            children: RefCell::new(HashMap::new()),
58            signals: Default::default(),
59            effects: Default::default(),
60            contexts: Default::default(),
61            batching: Cell::new(false),
62            pending_effects: RefCell::new(SmallVec::new()),
63            pending_effects_set: RefCell::new(HashSet::new()),
64        }
65    }
66
67    /// Call this once on the UI thread during initialization.
68    #[cfg_attr(debug_assertions, track_caller)]
69    pub fn init_on_ui_thread() {
70        let current = thread::current().id();
71        match UI_THREAD_ID.set(current) {
72            Ok(_) => {}
73            Err(_) => {
74                assert_eq!(
75                    UI_THREAD_ID.get(),
76                    Some(&current),
77                    "UI thread id already set to a different thread"
78                );
79            }
80        }
81        #[cfg(debug_assertions)]
82        {
83            let caller = std::panic::Location::caller();
84            let _ = UI_THREAD_SET_LOCATION.set(caller);
85        }
86        ENFORCE_UI_THREAD.store(true, Ordering::Relaxed);
87    }
88
89    #[cfg_attr(debug_assertions, track_caller)]
90    pub fn assert_ui_thread() {
91        if !ENFORCE_UI_THREAD.load(Ordering::Relaxed) {
92            return;
93        }
94
95        let current = thread::current().id();
96        match UI_THREAD_ID.get() {
97            Some(ui_id) => {
98                if *ui_id != current {
99                    #[cfg(debug_assertions)]
100                    {
101                        let caller = std::panic::Location::caller();
102                        let set_at = UI_THREAD_SET_LOCATION.get();
103                        let set_info = set_at
104                            .map(|loc| format!(" (set at {}:{})", loc.file(), loc.line()))
105                            .unwrap_or_default();
106                        panic!(
107                            "Unsync runtime access from non-UI thread\n  expected UI thread: {:?}{}\n  current thread: {:?}\n  caller: {}:{}",
108                            ui_id,
109                            set_info,
110                            current,
111                            caller.file(),
112                            caller.line(),
113                        );
114                    }
115                    #[cfg(not(debug_assertions))]
116                    {
117                        assert_eq!(
118                            *ui_id, current,
119                            "Unsync runtime access from non-UI thread: expected {:?}, got {:?}",
120                            ui_id, current
121                        );
122                    }
123                }
124            }
125            None => {
126                // Once enforcement is on, first access defines the UI thread.
127                let _ = UI_THREAD_ID.set(current);
128            }
129        }
130    }
131
132    pub fn is_ui_thread() -> bool {
133        if !ENFORCE_UI_THREAD.load(Ordering::Relaxed) {
134            true
135        } else {
136            UI_THREAD_ID
137                .get()
138                .map(|id| *id == thread::current().id())
139                .unwrap_or(false)
140        }
141    }
142
143    pub(crate) fn register_effect(&self, effect: &Rc<dyn EffectTrait>) {
144        self.effects
145            .borrow_mut()
146            .insert(effect.id(), effect.clone());
147    }
148
149    pub(crate) fn remove_effect(&self, id: Id) {
150        self.effects.borrow_mut().remove(&id);
151    }
152
153    pub(crate) fn get_effect(&self, id: Id) -> Option<Rc<dyn EffectTrait>> {
154        self.effects.borrow().get(&id).cloned()
155    }
156
157    pub(crate) fn add_pending_effect(&self, effect_id: Id) {
158        let mut set = self.pending_effects_set.borrow_mut();
159        if set.insert(effect_id) {
160            self.pending_effects.borrow_mut().push(effect_id);
161        }
162    }
163
164    pub(crate) fn run_pending_effects(&self) {
165        loop {
166            let mut pending_effects = self.pending_effects.take();
167            if pending_effects.is_empty() {
168                break;
169            }
170            pending_effects.sort_by_key(|id| {
171                let priority = self
172                    .get_effect(*id)
173                    .map(|effect| effect.priority())
174                    .unwrap_or(EffectPriority::Normal);
175                (Reverse(priority), *id)
176            });
177            for effect_id in pending_effects {
178                self.pending_effects_set.borrow_mut().remove(&effect_id);
179                if let Some(effect) = self.get_effect(effect_id) {
180                    run_effect(effect);
181                }
182            }
183        }
184    }
185
186    /// Drain any queued work from the sync runtime and run pending UI effects.
187    pub fn drain_pending_work() {
188        Runtime::assert_ui_thread();
189        let pending_effects = SYNC_RUNTIME.take_pending_effects();
190        let pending_disposals = SYNC_RUNTIME.take_pending_disposals();
191        RUNTIME.with(|runtime| {
192            for id in pending_effects {
193                runtime.add_pending_effect(id);
194            }
195            for id in pending_disposals {
196                id.dispose();
197            }
198            runtime.run_pending_effects();
199        });
200    }
201
202    /// Returns true if there is queued work for this runtime or the sync runtime.
203    pub fn has_pending_work() -> bool {
204        RUNTIME.with(|runtime| !runtime.pending_effects.borrow().is_empty())
205            || SYNC_RUNTIME.has_pending_effects()
206            || SYNC_RUNTIME.has_pending_disposals()
207    }
208
209    /// Set a waker that will be called when a sync signal is updated off the UI thread.
210    /// The waker should nudge the UI event loop (e.g., by sending a proxy event).
211    pub fn set_sync_effect_waker(waker: impl Fn() + Send + Sync + 'static) {
212        SYNC_RUNTIME.set_waker(waker);
213    }
214}