-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathutils.rs
134 lines (125 loc) · 3.98 KB
/
utils.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use anyhow::anyhow;
use resources::{
informer::{EventHandler, Informer, ListerWatcher, ResyncHandler},
models,
objects::{ingress::Ingress, service::Service},
};
use tokio::sync::mpsc::Sender;
use tokio_tungstenite::connect_async;
use crate::{Notification, CONFIG};
/// Builds an [`Informer`] for [`Service`] objects whose handlers all signal `tx`.
///
/// The informer is assembled from three parts:
///
/// * **Lister** – GETs `/api/v1/services` (resolved against
///   `CONFIG.api_server_endpoint`), decodes the body as
///   `models::Response<Vec<Service>>` and returns its `data` field. A response
///   without `data` is reported as a `"Lister failed"` error.
/// * **Watcher** – opens a WebSocket to `/api/v1/watch/services` on the same
///   endpoint. The scheme is `wss` when the endpoint is `https`, `ws` otherwise.
/// * **Handlers** – the add, update, delete and resync callbacks ignore their
///   argument and send one `Notification` through `tx`; a failed send is
///   returned as the callback's error.
///
/// # Arguments
///
/// * `tx` – channel on which a `Notification` is sent by every callback.
pub fn create_svc_informer(tx: Sender<Notification>) -> Informer<Service> {
    let lw = ListerWatcher {
        lister: Box::new(|_| {
            Box::pin(async {
                let list_url = CONFIG.api_server_endpoint.join("/api/v1/services")?;
                let response = reqwest::get(list_url)
                    .await?
                    .json::<models::Response<Vec<Service>>>()
                    .await?;
                let services = response.data.ok_or_else(|| anyhow!("Lister failed"))?;
                Ok(services)
            })
        }),
        watcher: Box::new(|_| {
            Box::pin(async {
                let mut url = CONFIG.api_server_endpoint.join("/api/v1/watch/services")?;
                // Keep TLS when the API server is reached over HTTPS: forcing `ws`
                // would drop encryption and change the implied default port.
                let ws_scheme = if url.scheme() == "https" { "wss" } else { "ws" };
                // Report a rejected scheme change here instead of letting the
                // connection attempt fail later with an unrelated-looking error.
                url.set_scheme(ws_scheme).map_err(|_| {
                    anyhow!("cannot switch watch URL {} to scheme {}", url, ws_scheme)
                })?;
                let (stream, _) = connect_async(url).await?;
                Ok(stream)
            })
        }),
    };

    // Each boxed callback owns its own sender; the resync handler takes `tx` itself.
    let add_tx = tx.clone();
    let update_tx = tx.clone();
    let delete_tx = tx.clone();
    let eh = EventHandler {
        add_cls: Box::new(move |_| {
            // Clone per call so the returned future owns its sender.
            let sender = add_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
        update_cls: Box::new(move |_| {
            let sender = update_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
        delete_cls: Box::new(move |_| {
            let sender = delete_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
    };
    let rh = ResyncHandler(Box::new(move |_| {
        let sender = tx.clone();
        Box::pin(async move {
            sender.send(Notification).await?;
            Ok(())
        })
    }));

    // Assemble the informer from the lister/watcher and the handlers.
    Informer::new(lw, eh, rh)
}
/// Builds an [`Informer`] for [`Ingress`] objects whose handlers all signal `tx`.
///
/// The informer is assembled from three parts:
///
/// * **Lister** – GETs `/api/v1/ingresses` (resolved against
///   `CONFIG.api_server_endpoint`), decodes the body as
///   `models::Response<Vec<Ingress>>` and returns its `data` field. A response
///   without `data` is reported as a `"Lister failed"` error.
/// * **Watcher** – opens a WebSocket to `/api/v1/watch/ingresses` on the same
///   endpoint. The scheme is `wss` when the endpoint is `https`, `ws` otherwise.
/// * **Handlers** – the add, update, delete and resync callbacks ignore their
///   argument and send one `Notification` through `tx`; a failed send is
///   returned as the callback's error.
///
/// # Arguments
///
/// * `tx` – channel on which a `Notification` is sent by every callback.
pub fn create_ingress_informer(tx: Sender<Notification>) -> Informer<Ingress> {
    let lw = ListerWatcher {
        lister: Box::new(|_| {
            Box::pin(async {
                let list_url = CONFIG.api_server_endpoint.join("/api/v1/ingresses")?;
                let response = reqwest::get(list_url)
                    .await?
                    .json::<models::Response<Vec<Ingress>>>()
                    .await?;
                let ingresses = response.data.ok_or_else(|| anyhow!("Lister failed"))?;
                Ok(ingresses)
            })
        }),
        watcher: Box::new(|_| {
            Box::pin(async {
                let mut url = CONFIG.api_server_endpoint.join("/api/v1/watch/ingresses")?;
                // Keep TLS when the API server is reached over HTTPS: forcing `ws`
                // would drop encryption and change the implied default port.
                let ws_scheme = if url.scheme() == "https" { "wss" } else { "ws" };
                // Report a rejected scheme change here instead of letting the
                // connection attempt fail later with an unrelated-looking error.
                url.set_scheme(ws_scheme).map_err(|_| {
                    anyhow!("cannot switch watch URL {} to scheme {}", url, ws_scheme)
                })?;
                let (stream, _) = connect_async(url).await?;
                Ok(stream)
            })
        }),
    };

    // Each boxed callback owns its own sender; the resync handler takes `tx` itself.
    let add_tx = tx.clone();
    let update_tx = tx.clone();
    let delete_tx = tx.clone();
    let eh = EventHandler {
        add_cls: Box::new(move |_| {
            // Clone per call so the returned future owns its sender.
            let sender = add_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
        update_cls: Box::new(move |_| {
            let sender = update_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
        delete_cls: Box::new(move |_| {
            let sender = delete_tx.clone();
            Box::pin(async move {
                sender.send(Notification).await?;
                Ok(())
            })
        }),
    };
    let rh = ResyncHandler(Box::new(move |_| {
        let sender = tx.clone();
        Box::pin(async move {
            sender.send(Notification).await?;
            Ok(())
        })
    }));

    // Assemble the informer from the lister/watcher and the handlers.
    Informer::new(lw, eh, rh)
}