1#![allow(clippy::type_complexity)]
10
11use std::{
12 collections::{BTreeMap, btree_map},
13 sync::{
14 Arc,
15 atomic::{AtomicU32, Ordering},
16 },
17};
18
19use futures::{FutureExt, pin_mut};
20use pinnacle_api_defs::pinnacle::signal::v1::{SignalRequest, StreamControl};
21use tokio::sync::{
22 mpsc::{UnboundedSender, unbounded_channel},
23 oneshot,
24};
25use tokio_stream::{StreamExt, wrappers::UnboundedReceiverStream};
26use tonic::Streaming;
27
28use crate::{
29 BlockOnTokio,
30 input::libinput::DeviceHandle,
31 output::OutputHandle,
32 tag::TagHandle,
33 window::{LayoutMode, WindowHandle},
34};
35
36pub(crate) trait Signal {
37 type Callback;
38}
39
40macro_rules! signals {
41 ( $(
42 $( #[$cfg_enum:meta] )* $enum:ident => {
43 $(
44 $( #[$cfg:meta] )* $name:ident = {
45 enum_name = $renamed:ident,
46 callback_type = $cb:ty,
47 client_request = $req:ident,
48 on_response = $on_resp:expr,
49 }
50 )*
51 }
52 )* ) => {$(
53 $(
54 $( #[$cfg] )*
55 pub(crate) struct $name;
56
57 impl $crate::signal::Signal for $name {
58 type Callback = $cb;
59 }
60
61 impl SignalData<$name> {
62 pub(crate) fn add_callback(&mut self, callback: <$name as Signal>::Callback) -> SignalHandle {
63 if self.callback_count.load(::std::sync::atomic::Ordering::SeqCst) == 0 {
64 self.connect()
65 }
66
67 let Some(callback_sender) = self.callback_sender.as_ref() else {
68 unreachable!("signal should already be connected here");
69 };
70
71 let Some(remove_callback_sender) = self.remove_callback_sender.clone() else {
72 unreachable!("signal should already be connected here");
73 };
74
75 callback_sender
76 .send((self.current_id, callback))
77 .expect("failed to send callback");
78
79 let handle = SignalHandle::new(self.current_id, remove_callback_sender);
80
81 self.current_id.0 += 1;
82
83 handle
84 }
85
86 fn reset(&mut self) {
87 self.callback_sender.take();
88 self.dc_pinger.take();
89 self.remove_callback_sender.take();
90 self.callback_count = Default::default();
91 self.current_id = SignalConnId::default();
92 }
93
94 fn connect(&mut self) {
95 self.reset();
96
97 let channels = connect_signal::<_, _, <$name as Signal>::Callback, _, _>(
98 self.callback_count.clone(),
99 |out| {
100 $crate::client::Client::signal().$req(out)
101 .block_on_tokio()
102 .expect("failed to request signal connection")
103 .into_inner()
104 },
105 $on_resp,
106 );
107
108 self.callback_sender.replace(channels.callback_sender);
109 self.dc_pinger.replace(channels.dc_pinger);
110 self.remove_callback_sender
111 .replace(channels.remove_callback_sender);
112 }
113 }
114 )*
115
116 $( #[$cfg_enum] )*
117 pub enum $enum {
118 $( $( #[$cfg] )* $renamed($cb),)*
119 }
120 )*};
121}
122
123signals! {
124 OutputSignal => {
126 OutputConnect = {
133 enum_name = Connect,
134 callback_type = SingleOutputFn,
135 client_request = output_connect,
136 on_response = |response, callbacks| {
137 let handle = OutputHandle { name: response.output_name };
138
139 for callback in callbacks {
140 callback(&handle);
141 }
142 },
143 }
144 OutputDisconnect = {
148 enum_name = Disconnect,
149 callback_type = SingleOutputFn,
150 client_request = output_disconnect,
151 on_response = |response, callbacks| {
152 let handle = OutputHandle { name: response.output_name };
153
154 for callback in callbacks {
155 callback(&handle);
156 }
157 },
158 }
159 OutputResize = {
163 enum_name = Resize,
164 callback_type = Box<dyn FnMut(&OutputHandle, u32, u32) + Send + 'static>,
165 client_request = output_resize,
166 on_response = |response, callbacks| {
167 let handle = OutputHandle { name: response.output_name };
168
169 for callback in callbacks {
170 callback(&handle, response.logical_width, response.logical_height)
171 }
172 },
173 }
174 OutputMove = {
178 enum_name = Move,
179 callback_type = Box<dyn FnMut(&OutputHandle, i32, i32) + Send + 'static>,
180 client_request = output_move,
181 on_response = |response, callbacks| {
182 let handle = OutputHandle { name: response.output_name };
183
184 for callback in callbacks {
185 callback(&handle, response.x, response.y)
186 }
187 },
188 }
189 OutputPointerEnter = {
193 enum_name = PointerEnter,
194 callback_type = SingleOutputFn,
195 client_request = output_pointer_enter,
196 on_response = |response, callbacks| {
197 let handle = OutputHandle { name: response.output_name };
198
199 for callback in callbacks {
200 callback(&handle);
201 }
202 },
203 }
204 OutputPointerLeave = {
208 enum_name = PointerLeave,
209 callback_type = SingleOutputFn,
210 client_request = output_pointer_leave,
211 on_response = |response, callbacks| {
212 let handle = OutputHandle { name: response.output_name };
213
214 for callback in callbacks {
215 callback(&handle);
216 }
217 },
218 }
219 OutputFocused = {
223 enum_name = Focused,
224 callback_type = SingleOutputFn,
225 client_request = output_focused,
226 on_response = |response, callbacks| {
227 let handle = OutputHandle { name: response.output_name };
228
229 for callback in callbacks {
230 callback(&handle);
231 }
232 },
233 }
234 }
235 WindowSignal => {
237 WindowPointerEnter = {
241 enum_name = PointerEnter,
242 callback_type = SingleWindowFn,
243 client_request = window_pointer_enter,
244 on_response = |response, callbacks| {
245 let handle = WindowHandle { id: response.window_id };
246
247 for callback in callbacks {
248 callback(&handle);
249 }
250 },
251 }
252 WindowPointerLeave = {
256 enum_name = PointerLeave,
257 callback_type = SingleWindowFn,
258 client_request = window_pointer_leave,
259 on_response = |response, callbacks| {
260 let handle = WindowHandle { id: response.window_id };
261
262 for callback in callbacks {
263 callback(&handle);
264 }
265 },
266 }
267 WindowFocused = {
271 enum_name = Focused,
272 callback_type = SingleWindowFn,
273 client_request = window_focused,
274 on_response = |response, callbacks| {
275 let handle = WindowHandle { id: response.window_id };
276
277 for callback in callbacks {
278 callback(&handle);
279 }
280 },
281 }
282 WindowTitleChanged = {
286 enum_name = TitleChanged,
287 callback_type = Box<dyn FnMut(&WindowHandle, &str) + Send + 'static>,
288 client_request = window_title_changed,
289 on_response = |response, callbacks| {
290 let handle = WindowHandle { id: response.window_id };
291 let title = response.title;
292
293 for callback in callbacks {
294 callback(&handle, &title);
295 }
296 },
297 }
298
299 WindowLayoutModeChanged = {
303 enum_name = LayoutModeChanged,
304 callback_type = Box<dyn FnMut(&WindowHandle, LayoutMode) + Send + 'static>,
305 client_request = window_layout_mode_changed,
306 on_response = |response, callbacks| {
307 let handle = WindowHandle { id: response.window_id };
308
309 if let Ok(layout_mode) = response.layout_mode().try_into() {
310 for callback in callbacks {
311 callback(&handle, layout_mode);
312 }
313 }
314 },
315 }
316
317
318 WindowCreated = {
322 enum_name = Created,
323 callback_type = SingleWindowFn,
324 client_request = window_created,
325 on_response = |response, callbacks| {
326 let handle = WindowHandle { id: response.window_id };
327 for callback in callbacks {
328 callback(&handle);
329 }
330 },
331 }
332
333 WindowDestroyed = {
339 enum_name = Destroyed,
340 callback_type = Box<dyn FnMut(&WindowHandle, &str, &str) + Send + 'static>,
341 client_request = window_destroyed,
342 on_response = |response, callbacks| {
343 let handle = WindowHandle { id: response.window_id };
344 let title = response.title;
345 let app_id = response.app_id;
346
347 for callback in callbacks {
348 callback(&handle, &title, &app_id);
349 }
350 },
351 }
352 }
353 TagSignal => {
355 TagActive = {
357 enum_name = Active,
358 callback_type = Box<dyn FnMut(&TagHandle, bool) + Send + 'static>,
359 client_request = tag_active,
360 on_response = |response, callbacks| {
361 let handle = TagHandle { id: response.tag_id };
362
363 for callback in callbacks {
364 callback(&handle, response.active);
365 }
366 },
367 }
368 TagCreated = {
370 enum_name = Created,
371 callback_type = Box<dyn FnMut(&TagHandle) + Send + 'static>,
372 client_request = tag_created,
373 on_response = |response, callbacks| {
374 let handle = TagHandle { id: response.tag_id };
375
376 for callback in callbacks {
377 callback(&handle);
378 }
379 },
380 }
381 TagRemoved = {
383 enum_name = Removed,
384 callback_type = Box<dyn FnMut(&TagHandle) + Send + 'static>,
385 client_request = tag_removed,
386 on_response = |response, callbacks| {
387 let handle = TagHandle { id: response.tag_id };
388
389 for callback in callbacks {
390 callback(&handle);
391 }
392 },
393 }
394 }
395 InputSignal => {
397 InputDeviceAdded = {
399 enum_name = DeviceAdded,
400 callback_type = Box<dyn FnMut(&DeviceHandle) + Send + 'static>,
401 client_request = input_device_added,
402 on_response = |response, callbacks| {
403 let handle = DeviceHandle { sysname: response.device_sysname };
404
405 for callback in callbacks {
406 callback(&handle);
407 }
408 },
409 }
410 }
411}
412
413pub(crate) type SingleOutputFn = Box<dyn FnMut(&OutputHandle) + Send + 'static>;
414pub(crate) type SingleWindowFn = Box<dyn FnMut(&WindowHandle) + Send + 'static>;
415
416pub(crate) struct SignalState {
417 pub(crate) output_connect: SignalData<OutputConnect>,
418 pub(crate) output_disconnect: SignalData<OutputDisconnect>,
419 pub(crate) output_resize: SignalData<OutputResize>,
420 pub(crate) output_move: SignalData<OutputMove>,
421 pub(crate) output_pointer_enter: SignalData<OutputPointerEnter>,
422 pub(crate) output_pointer_leave: SignalData<OutputPointerLeave>,
423 pub(crate) output_focused: SignalData<OutputFocused>,
424
425 pub(crate) window_pointer_enter: SignalData<WindowPointerEnter>,
426 pub(crate) window_pointer_leave: SignalData<WindowPointerLeave>,
427 pub(crate) window_focused: SignalData<WindowFocused>,
428 pub(crate) window_title_changed: SignalData<WindowTitleChanged>,
429 pub(crate) window_layout_mode_changed: SignalData<WindowLayoutModeChanged>,
430 pub(crate) window_created: SignalData<WindowCreated>,
431 pub(crate) window_destroyed: SignalData<WindowDestroyed>,
432
433 pub(crate) tag_active: SignalData<TagActive>,
434 pub(crate) tag_created: SignalData<TagCreated>,
435 pub(crate) tag_removed: SignalData<TagRemoved>,
436
437 pub(crate) input_device_added: SignalData<InputDeviceAdded>,
438}
439
440impl std::fmt::Debug for SignalState {
441 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
442 f.debug_struct("SignalState").finish()
443 }
444}
445
446impl SignalState {
447 pub(crate) fn new() -> Self {
448 Self {
449 output_connect: SignalData::new(),
450 output_disconnect: SignalData::new(),
451 output_resize: SignalData::new(),
452 output_move: SignalData::new(),
453 output_pointer_enter: SignalData::new(),
454 output_pointer_leave: SignalData::new(),
455 output_focused: SignalData::new(),
456
457 window_pointer_enter: SignalData::new(),
458 window_pointer_leave: SignalData::new(),
459 window_focused: SignalData::new(),
460 window_title_changed: SignalData::new(),
461 window_layout_mode_changed: SignalData::new(),
462 window_created: SignalData::new(),
463 window_destroyed: SignalData::new(),
464
465 tag_active: SignalData::new(),
466 tag_created: SignalData::new(),
467 tag_removed: SignalData::new(),
468
469 input_device_added: SignalData::new(),
470 }
471 }
472
473 pub(crate) fn shutdown(&mut self) {
474 self.output_connect.reset();
475 self.output_disconnect.reset();
476 self.output_resize.reset();
477 self.output_move.reset();
478 self.output_pointer_enter.reset();
479 self.output_pointer_leave.reset();
480 self.output_focused.reset();
481
482 self.window_pointer_enter.reset();
483 self.window_pointer_leave.reset();
484 self.window_focused.reset();
485 self.window_title_changed.reset();
486 self.window_layout_mode_changed.reset();
487 self.window_created.reset();
488 self.window_destroyed.reset();
489
490 self.tag_active.reset();
491 self.tag_created.reset();
492 self.tag_removed.reset();
493
494 self.input_device_added.reset();
495 }
496}
497
498#[derive(Default, Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
499pub(crate) struct SignalConnId(pub(crate) u32);
500
501pub(crate) struct SignalData<S: Signal> {
502 callback_sender: Option<UnboundedSender<(SignalConnId, S::Callback)>>,
503 remove_callback_sender: Option<UnboundedSender<SignalConnId>>,
504 dc_pinger: Option<oneshot::Sender<()>>,
505 current_id: SignalConnId,
506 callback_count: Arc<AtomicU32>,
507}
508
509impl<S: Signal> SignalData<S> {
510 fn new() -> Self {
511 Self {
512 callback_sender: Default::default(),
513 remove_callback_sender: Default::default(),
514 dc_pinger: Default::default(),
515 current_id: Default::default(),
516 callback_count: Default::default(),
517 }
518 }
519}
520
521struct ConnectSignalChannels<F> {
522 callback_sender: UnboundedSender<(SignalConnId, F)>,
523 dc_pinger: oneshot::Sender<()>,
524 remove_callback_sender: UnboundedSender<SignalConnId>,
525}
526
527fn connect_signal<Req, Resp, F, T, O>(
528 callback_count: Arc<AtomicU32>,
529 to_in_stream: T,
530 mut on_response: O,
531) -> ConnectSignalChannels<F>
532where
533 Req: SignalRequest + Send + 'static,
534 Resp: Send + 'static,
535 F: Send + 'static,
536 T: FnOnce(UnboundedReceiverStream<Req>) -> Streaming<Resp>,
537 O: FnMut(Resp, btree_map::ValuesMut<'_, SignalConnId, F>) + Send + 'static,
538{
539 let (control_sender, recv) = unbounded_channel::<Req>();
540 let out_stream = UnboundedReceiverStream::new(recv);
541
542 let mut in_stream = to_in_stream(out_stream);
543
544 let (callback_sender, mut callback_recv) = unbounded_channel::<(SignalConnId, F)>();
545 let (remove_callback_sender, mut remove_callback_recv) = unbounded_channel::<SignalConnId>();
546 let (dc_pinger, mut dc_ping_recv) = oneshot::channel::<()>();
547
548 let signal_future = async move {
549 let mut callbacks = BTreeMap::<SignalConnId, F>::new();
550
551 control_sender
552 .send(Req::from_control(StreamControl::Ready))
553 .map_err(|err| {
554 println!("{err}");
555 err
556 })
557 .expect("send failed");
558
559 loop {
560 let in_stream_next = in_stream.next().fuse();
561 pin_mut!(in_stream_next);
562 let callback_recv_recv = callback_recv.recv().fuse();
563 pin_mut!(callback_recv_recv);
564 let remove_callback_recv_recv = remove_callback_recv.recv().fuse();
565 pin_mut!(remove_callback_recv_recv);
566 let mut dc_ping_recv_fuse = (&mut dc_ping_recv).fuse();
567
568 futures::select! {
569 response = in_stream_next => {
570 let Some(response) = response else { continue };
571
572 match response {
573 Ok(response) => {
574 on_response(response, callbacks.values_mut());
575
576 control_sender
577 .send(Req::from_control(StreamControl::Ready))
578 .expect("send failed");
579
580 tokio::task::yield_now().await;
581 }
582 Err(status) => eprintln!("Error in recv: {status}"),
583 }
584 }
585 callback = callback_recv_recv => {
586 if let Some((id, callback)) = callback {
587 callbacks.insert(id, callback);
588 callback_count.fetch_add(1, Ordering::SeqCst);
589 }
590 }
591 remove = remove_callback_recv_recv => {
592 if let Some(id) = remove {
593 if callbacks.remove(&id).is_some() {
594 assert!(callback_count.fetch_sub(1, Ordering::SeqCst) > 0);
595 }
596 if callbacks.is_empty() {
597 assert!(callback_count.load(Ordering::SeqCst) == 0);
598 control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed");
599 break;
600 }
601 }
602 }
603 _dc = dc_ping_recv_fuse => {
604 let _ = control_sender.send(Req::from_control(StreamControl::Disconnect));
605 break;
606 }
607 }
608 }
609 };
610
611 tokio::spawn(signal_future);
612
613 ConnectSignalChannels {
614 callback_sender,
615 dc_pinger,
616 remove_callback_sender,
617 }
618}
619
620#[derive(Debug, Clone)]
624pub struct SignalHandle {
625 id: SignalConnId,
626 remove_callback_sender: UnboundedSender<SignalConnId>,
627}
628
629impl SignalHandle {
630 pub(crate) fn new(
631 id: SignalConnId,
632 remove_callback_sender: UnboundedSender<SignalConnId>,
633 ) -> Self {
634 Self {
635 id,
636 remove_callback_sender,
637 }
638 }
639
640 pub fn disconnect(&self) {
642 self.remove_callback_sender
643 .send(self.id)
644 .expect("failed to disconnect signal");
645 }
646}