floem_reactive/
runtime.rs

1use std::{
2    any::{Any, TypeId},
3    cell::{Cell, RefCell},
4    cmp::Reverse,
5    collections::{HashMap, HashSet},
6    rc::Rc,
7    sync::{
8        atomic::{AtomicBool, Ordering},
9        OnceLock,
10    },
11    thread::{self, ThreadId},
12};
13
14use smallvec::SmallVec;
15
16use crate::{
17    effect::{run_effect, EffectPriority, EffectTrait},
18    id::Id,
19    signal::SignalState,
20    sync_runtime::SYNC_RUNTIME,
21};
22
23/// Type alias for context storage within a scope.
24pub(crate) type ScopeContexts = HashMap<TypeId, Box<dyn Any>>;
25
26thread_local! {
27pub(crate) static RUNTIME: Runtime = Runtime::new();
28}
29
30static UI_THREAD_ID: OnceLock<ThreadId> = OnceLock::new();
31#[cfg(debug_assertions)]
32static UI_THREAD_SET_LOCATION: OnceLock<&'static std::panic::Location<'static>> = OnceLock::new();
33static ENFORCE_UI_THREAD: AtomicBool = AtomicBool::new(false);
34
35/// The internal reactive Runtime which stores all the reactive system states in a
36/// thread local.
37pub struct Runtime {
38    pub(crate) current_effect: RefCell<Option<Rc<dyn EffectTrait>>>,
39    pub(crate) current_scope: RefCell<Id>,
40    pub(crate) children: RefCell<HashMap<Id, HashSet<Id>>>,
41    pub(crate) parents: RefCell<HashMap<Id, Id>>,
42    pub(crate) signals: RefCell<HashMap<Id, SignalState>>,
43    pub(crate) effects: RefCell<HashMap<Id, Rc<dyn EffectTrait>>>,
44    pub(crate) scope_contexts: RefCell<HashMap<Id, ScopeContexts>>,
45    pub(crate) batching: Cell<bool>,
46    pub(crate) pending_effects: RefCell<SmallVec<[Id; 10]>>,
47    pub(crate) pending_effects_set: RefCell<HashSet<Id>>,
48}
49
50impl Default for Runtime {
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
56impl Runtime {
57    pub(crate) fn new() -> Self {
58        Self {
59            current_effect: RefCell::new(None),
60            current_scope: RefCell::new(Id::next()),
61            children: RefCell::new(HashMap::new()),
62            parents: RefCell::new(HashMap::new()),
63            signals: Default::default(),
64            effects: Default::default(),
65            scope_contexts: Default::default(),
66            batching: Cell::new(false),
67            pending_effects: RefCell::new(SmallVec::new()),
68            pending_effects_set: RefCell::new(HashSet::new()),
69        }
70    }
71
72    /// Call this once on the UI thread during initialization.
73    #[cfg_attr(debug_assertions, track_caller)]
74    pub fn init_on_ui_thread() {
75        let current = thread::current().id();
76        match UI_THREAD_ID.set(current) {
77            Ok(_) => {}
78            Err(_) => {
79                assert_eq!(
80                    UI_THREAD_ID.get(),
81                    Some(&current),
82                    "UI thread id already set to a different thread"
83                );
84            }
85        }
86        #[cfg(debug_assertions)]
87        {
88            let caller = std::panic::Location::caller();
89            let _ = UI_THREAD_SET_LOCATION.set(caller);
90        }
91        ENFORCE_UI_THREAD.store(true, Ordering::Relaxed);
92    }
93
94    #[cfg_attr(debug_assertions, track_caller)]
95    pub fn assert_ui_thread() {
96        if !ENFORCE_UI_THREAD.load(Ordering::Relaxed) {
97            return;
98        }
99
100        let current = thread::current().id();
101        match UI_THREAD_ID.get() {
102            Some(ui_id) => {
103                if *ui_id != current {
104                    #[cfg(debug_assertions)]
105                    {
106                        let caller = std::panic::Location::caller();
107                        let set_at = UI_THREAD_SET_LOCATION.get();
108                        let set_info = set_at
109                            .map(|loc| format!(" (set at {}:{})", loc.file(), loc.line()))
110                            .unwrap_or_default();
111                        panic!(
112                            "Unsync runtime access from non-UI thread\n  expected UI thread: {:?}{}\n  current thread: {:?}\n  caller: {}:{}",
113                            ui_id,
114                            set_info,
115                            current,
116                            caller.file(),
117                            caller.line(),
118                        );
119                    }
120                    #[cfg(not(debug_assertions))]
121                    {
122                        assert_eq!(
123                            *ui_id, current,
124                            "Unsync runtime access from non-UI thread: expected {:?}, got {:?}",
125                            ui_id, current
126                        );
127                    }
128                }
129            }
130            None => {
131                // Once enforcement is on, first access defines the UI thread.
132                let _ = UI_THREAD_ID.set(current);
133            }
134        }
135    }
136
137    pub fn is_ui_thread() -> bool {
138        if !ENFORCE_UI_THREAD.load(Ordering::Relaxed) {
139            true
140        } else {
141            UI_THREAD_ID
142                .get()
143                .map(|id| *id == thread::current().id())
144                .unwrap_or(false)
145        }
146    }
147
148    pub(crate) fn register_effect(&self, effect: &Rc<dyn EffectTrait>) {
149        self.effects
150            .borrow_mut()
151            .insert(effect.id(), effect.clone());
152    }
153
154    pub(crate) fn remove_effect(&self, id: Id) {
155        self.effects.borrow_mut().remove(&id);
156    }
157
158    pub(crate) fn get_effect(&self, id: Id) -> Option<Rc<dyn EffectTrait>> {
159        self.effects.borrow().get(&id).cloned()
160    }
161
162    pub(crate) fn add_pending_effect(&self, effect_id: Id) {
163        let mut set = self.pending_effects_set.borrow_mut();
164        if set.insert(effect_id) {
165            self.pending_effects.borrow_mut().push(effect_id);
166        }
167    }
168
169    pub(crate) fn run_pending_effects(&self) {
170        loop {
171            let mut pending_effects = self.pending_effects.take();
172            if pending_effects.is_empty() {
173                break;
174            }
175            pending_effects.sort_by_key(|id| {
176                let priority = self
177                    .get_effect(*id)
178                    .map(|effect| effect.priority())
179                    .unwrap_or(EffectPriority::Normal);
180                (Reverse(priority), *id)
181            });
182            for effect_id in pending_effects {
183                self.pending_effects_set.borrow_mut().remove(&effect_id);
184                if let Some(effect) = self.get_effect(effect_id) {
185                    run_effect(effect);
186                }
187            }
188        }
189    }
190
191    /// Drain any queued work from the sync runtime and run pending UI effects.
192    pub fn drain_pending_work() {
193        Runtime::assert_ui_thread();
194        let pending_effects = SYNC_RUNTIME.take_pending_effects();
195        let pending_disposals = SYNC_RUNTIME.take_pending_disposals();
196        RUNTIME.with(|runtime| {
197            for id in pending_effects {
198                runtime.add_pending_effect(id);
199            }
200            for id in pending_disposals {
201                id.dispose();
202            }
203            runtime.run_pending_effects();
204        });
205    }
206
207    /// Returns true if there is queued work for this runtime or the sync runtime.
208    pub fn has_pending_work() -> bool {
209        RUNTIME.with(|runtime| !runtime.pending_effects.borrow().is_empty())
210            || SYNC_RUNTIME.has_pending_effects()
211            || SYNC_RUNTIME.has_pending_disposals()
212    }
213
214    /// Set a waker that will be called when a sync signal is updated off the UI thread.
215    /// The waker should nudge the UI event loop (e.g., by sending a proxy event).
216    pub fn set_sync_effect_waker(waker: impl Fn() + Send + Sync + 'static) {
217        SYNC_RUNTIME.set_waker(waker);
218    }
219}