1use std::{cell::Cell, collections::VecDeque, sync::Arc};
2
3use floem_reactive::{
4 create_effect, create_rw_signal, untrack, with_scope, ReadSignal, RwSignal, Scope, SignalGet,
5 SignalUpdate, SignalWith, WriteSignal,
6};
7use parking_lot::Mutex;
8
9use crate::{
10 app::UserEvent,
11 window_handle::{get_current_view, set_current_view},
12 Application,
13};
14
15#[cfg(feature = "crossbeam")]
16use crossbeam::channel::Receiver;
17#[cfg(not(feature = "crossbeam"))]
18use std::sync::mpsc::Receiver;
19
20#[derive(Debug)]
25pub struct ExtSendTrigger {
26 signal: RwSignal<()>,
27}
28
29impl Copy for ExtSendTrigger {}
30
31impl Clone for ExtSendTrigger {
32 fn clone(&self) -> Self {
33 *self
34 }
35}
36
37impl ExtSendTrigger {
38 pub fn notify(&self) {
39 self.signal.set(());
40 }
41
42 pub fn track(&self) {
43 self.signal.with(|_| {});
44 }
45
46 #[allow(clippy::new_without_default)]
47 pub fn new() -> Self {
48 create_trigger()
49 }
50}
51
52pub fn create_trigger() -> ExtSendTrigger {
53 ExtSendTrigger {
54 signal: create_rw_signal(()),
55 }
56}
57
58unsafe impl Send for ExtSendTrigger {}
59unsafe impl Sync for ExtSendTrigger {}
60
61pub(crate) static EXT_EVENT_HANDLER: ExtEventHandler = ExtEventHandler::new();
62
63pub(crate) struct ExtEventHandler {
64 pub(crate) queue: Mutex<VecDeque<ExtSendTrigger>>,
65}
66
67impl Default for ExtEventHandler {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl ExtEventHandler {
74 pub const fn new() -> Self {
75 Self {
76 queue: Mutex::new(VecDeque::new()),
77 }
78 }
79
80 pub fn add_trigger(&self, trigger: ExtSendTrigger) {
81 {
82 EXT_EVENT_HANDLER.queue.lock().push_back(trigger);
85 }
86 Application::send_proxy_event(UserEvent::Idle);
87 }
88}
89
90pub fn register_ext_trigger(trigger: ExtSendTrigger) {
91 EXT_EVENT_HANDLER.add_trigger(trigger);
92}
93
94pub fn create_ext_action<T: Send + 'static>(
95 cx: Scope,
96 action: impl FnOnce(T) + 'static,
97) -> impl FnOnce(T) {
98 let view = get_current_view();
99 let cx = cx.create_child();
100 let trigger = with_scope(cx, ExtSendTrigger::new);
101 let data = Arc::new(Mutex::new(None));
102
103 {
104 let data = data.clone();
105 let action = Cell::new(Some(action));
106 with_scope(cx, move || {
107 create_effect(move |_| {
108 trigger.track();
109 if let Some(event) = data.lock().take() {
110 untrack(|| {
111 let current_view = get_current_view();
112 set_current_view(view);
113 let action = action.take().unwrap();
114 action(event);
115 set_current_view(current_view);
116 });
117 cx.dispose();
118 }
119 });
120 });
121 }
122
123 move |event| {
124 *data.lock() = Some(event);
125 EXT_EVENT_HANDLER.add_trigger(trigger);
126 }
127}
128
129pub fn update_signal_from_channel<T: Send + 'static>(
130 writer: WriteSignal<Option<T>>,
131 rx: Receiver<T>,
132) {
133 let cx = Scope::new();
134 let trigger = with_scope(cx, ExtSendTrigger::new);
135
136 let channel_closed = cx.create_rw_signal(false);
137 let data = Arc::new(Mutex::new(VecDeque::new()));
138
139 {
140 let data = data.clone();
141 cx.create_effect(move |_| {
142 trigger.track();
143 while let Some(value) = data.lock().pop_front() {
144 writer.set(value);
145 }
146
147 if channel_closed.get() {
148 cx.dispose();
149 }
150 });
151 }
152
153 let send = create_ext_action(cx, move |_| {
154 channel_closed.set(true);
155 });
156
157 std::thread::spawn(move || {
158 while let Ok(event) = rx.recv() {
159 data.lock().push_back(Some(event));
160 EXT_EVENT_HANDLER.add_trigger(trigger);
161 }
162 send(());
163 });
164}
165
166pub fn create_signal_from_channel<T: Send + 'static>(rx: Receiver<T>) -> ReadSignal<Option<T>> {
167 let cx = Scope::new();
168 let trigger = with_scope(cx, ExtSendTrigger::new);
169
170 let channel_closed = cx.create_rw_signal(false);
171 let (read, write) = cx.create_signal(None);
172 let data = Arc::new(Mutex::new(VecDeque::new()));
173
174 {
175 let data = data.clone();
176 cx.create_effect(move |_| {
177 trigger.track();
178 while let Some(value) = data.lock().pop_front() {
179 write.set(value);
180 }
181
182 if channel_closed.get() {
183 cx.dispose();
184 }
185 });
186 }
187
188 let send = create_ext_action(cx, move |_| {
189 channel_closed.set(true);
190 });
191
192 std::thread::spawn(move || {
193 while let Ok(event) = rx.recv() {
194 data.lock().push_back(Some(event));
195 EXT_EVENT_HANDLER.add_trigger(trigger);
196 }
197 send(());
198 });
199
200 read
201}
202
/// Returns a signal that mirrors the values received on a tokio unbounded channel.
///
/// The signal starts as `None` and is set to `Some(value)` for each received value, in arrival
/// order. A task spawned with `tokio::spawn` awaits `rx.recv()`, queues each value and registers
/// a trigger with [`EXT_EVENT_HANDLER`]; an effect tracking that trigger drains the queue into
/// the signal.
///
/// When `recv` yields `None`, an ext action sets a `channel_closed` flag and the effect disposes
/// the scope created here.
///
/// NOTE(review): the returned signal is created in that same scope, so it is disposed together
/// with it once the channel closes — confirm that readers tolerate this.
#[cfg(feature = "tokio")]
pub fn create_signal_from_tokio_channel<T: Send + 'static>(
    mut rx: tokio::sync::mpsc::UnboundedReceiver<T>,
) -> ReadSignal<Option<T>> {
    let cx = Scope::new();
    let trigger = with_scope(cx, ExtSendTrigger::new);

    let channel_closed = cx.create_rw_signal(false);
    let (read, write) = cx.create_signal(None);
    let data: Arc<std::sync::Mutex<VecDeque<Option<T>>>> =
        Arc::new(std::sync::Mutex::new(VecDeque::new()));

    {
        let data = Arc::clone(&data);
        cx.create_effect(move |_| {
            trigger.track();

            // Take the whole backlog in one short critical section. Popping inside a
            // `while let` scrutinee would keep the guard alive across every `write.set`:
            // the tokio task would block a worker thread on the lock while subscribers run,
            // and a panicking subscriber would poison the mutex the task unwraps.
            let pending = std::mem::take(&mut *data.lock().unwrap());
            for value in pending {
                write.set(value);
            }

            // Values pushed after the take are picked up by the next run: the producer
            // registers the trigger again after every push.
            if channel_closed.get() {
                cx.dispose();
            }
        });
    }

    let send = create_ext_action(cx, move |_| {
        channel_closed.set(true);
    });

    tokio::spawn(async move {
        while let Some(event) = rx.recv().await {
            // The guard is a temporary dropped at the end of this statement, so it is never
            // held across an `.await`.
            data.lock().unwrap().push_back(Some(event));
            EXT_EVENT_HANDLER.add_trigger(trigger);
        }
        // `recv` yields `None` once the channel is closed; report that to the effect.
        send(());
    });

    read
}
242
/// Returns a signal that holds the most recent item produced by `stream`.
///
/// The signal starts as `initial_value`. An effect created in a child of the current scope
/// polls the stream until it is pending or finished and writes only the last item obtained in
/// that run. The waker passed to the stream re-registers the effect's trigger with
/// [`EXT_EVENT_HANDLER`], so the effect is polled again when the stream signals readiness.
///
/// When the stream finishes, the last item (if any) is written and then the child scope is
/// disposed.
///
/// # Panics
///
/// Panics if the effect is re-entered while the stream is already mutably borrowed.
#[cfg(feature = "futures")]
pub fn create_signal_from_stream<T: 'static>(
    initial_value: T,
    stream: impl futures::Stream<Item = T> + 'static,
) -> ReadSignal<T> {
    use std::{
        cell::RefCell,
        task::{Context, Poll},
    };

    use futures::task::{waker, ArcWake};

    let cx = Scope::current().create_child();
    let trigger = with_scope(cx, ExtSendTrigger::new);
    let (read, write) = cx.create_signal(initial_value);

    /// Waker payload: waking registers the wrapped trigger with the global handler.
    struct TriggerWake(ExtSendTrigger);
    impl ArcWake for TriggerWake {
        fn wake_by_ref(arc_self: &Arc<Self>) {
            EXT_EVENT_HANDLER.add_trigger(arc_self.0);
        }
    }

    // The stream is pinned on the heap; `RefCell` gives the `Fn` effect mutable access to it.
    let stream = RefCell::new(Box::pin(stream));
    let arc_trigger = Arc::new(TriggerWake(trigger));

    cx.create_effect(move |_| {
        trigger.track();
        let Ok(mut stream) = stream.try_borrow_mut() else {
            unreachable!("The waker registers events effects to be run only at idle")
        };

        let waker = waker(Arc::clone(&arc_trigger));
        let mut context = Context::from_waker(&waker);

        let mut last_value = None;
        let mut stream_finished = false;
        loop {
            match stream.as_mut().poll_next(&mut context) {
                Poll::Pending => break,
                Poll::Ready(Some(v)) => last_value = Some(v),
                Poll::Ready(None) => {
                    stream_finished = true;
                    break;
                }
            }
        }

        // Only write once per run, with the newest item.
        if let Some(v) = last_value {
            write.set(v);
        }

        // Dispose only after the final write: the signal was created in `cx`, so a write issued
        // after disposal would presumably be dropped (confirm against floem_reactive's
        // `Scope::dispose`). The effect and the trigger are not used once the stream has ended.
        if stream_finished {
            cx.dispose();
        }
    });

    read
}