Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
veluca93 committed Sep 27, 2024
1 parent fdd3de6 commit 83dae42
Show file tree
Hide file tree
Showing 7 changed files with 2,090 additions and 339 deletions.
62 changes: 62 additions & 0 deletions pixie-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pixie-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ postcard = { version = "1.0.8", default-features = false, features = ["alloc"] }
serde = "1.0.193"
serde_derive = "1.0.193"
serde_yaml = "0.9"
tokio = { version = "1.34.0", features = ["macros", "fs", "rt-multi-thread"] }
tokio = { version = "1.34.0", features = ["macros", "fs", "rt-multi-thread", "sync"] }
serde_json = "1.0.108"
hostfile = "0.2.0"
hex = "0.4.3"
axum = { version = "0.7.2", features = ["ws"] }
tower-http = { version = "0.5.0", features = ["fs", "compression-gzip", "auth", "trace"] }
bytes = "1.5.0"
http-body-util = "0.1.0"
futures = "0.3.30"
tokio-stream = { version = "0.1.16", features = ["sync"] }

[dependencies.pixie-shared]
path = "../pixie-shared"
Expand Down
86 changes: 26 additions & 60 deletions pixie-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@ use std::{net::Ipv4Addr, sync::Arc};

use anyhow::Result;
use axum::{
extract::{
self,
ws::{self, Message},
Path,
},
body::Body,
extract::{self, Path},
http::StatusCode,
response::IntoResponse,
routing::get,
Router,
};
use futures::StreamExt;
use macaddr::MacAddr6;

use pixie_shared::{HttpConfig, WsUpdate};
use pixie_shared::{HttpConfig, StatusUpdate};
use tokio::net::TcpListener;
use tokio_stream::wrappers::WatchStream;
use tower_http::{
compression::CompressionLayer, services::ServeDir, trace::TraceLayer,
validate_request::ValidateRequestHeaderLayer,
Expand Down Expand Up @@ -146,58 +145,26 @@ async fn gc(extract::State(state): extract::State<Arc<State>>) -> String {
"".to_owned()
}

async fn ws(
extract::State(state): extract::State<Arc<State>>,
ws: extract::ws::WebSocketUpgrade,
) -> axum::response::Response {
ws.on_upgrade(move |mut socket| async move {
let msg = WsUpdate::Config(state.config.clone());
let msg = serde_json::to_string(&msg).unwrap();
let msg = Message::Text(msg);
socket.send(msg).await.unwrap();

let msg = WsUpdate::HostMap(state.hostmap.clone());
let msg = serde_json::to_string(&msg).unwrap();
let msg = Message::Text(msg);
socket.send(msg).await.unwrap();

let mut units_rx = state.units.subscribe();
units_rx.mark_changed();

let mut image_rx = state.image_stats.subscribe();
image_rx.mark_changed();

'main_loop: loop {
tokio::select! {
ret = units_rx.changed() => {
ret.unwrap();
let msg = {
let units = units_rx.borrow_and_update();
let msg = WsUpdate::Units(units.clone());
let msg = serde_json::to_string(&msg).unwrap();
ws::Message::Text(msg)
};
socket.send(msg).await.unwrap();
}
ret = image_rx.changed() => {
ret.unwrap();
let msg = {
let image_stats = image_rx.borrow_and_update();
let msg = WsUpdate::ImageStats(image_stats.clone());
let msg = serde_json::to_string(&msg).unwrap();
ws::Message::Text(msg)
};
socket.send(msg).await.unwrap();
}
packet = socket.recv() => {
let packet = packet.unwrap().unwrap();
if let Message::Close(_) = packet {
break 'main_loop;
}
}
};
}
})
async fn status(extract::State(state): extract::State<Arc<State>>) -> impl IntoResponse {
let initial_messages = vec![
StatusUpdate::Config(state.config.clone()),
StatusUpdate::HostMap(state.hostmap.clone()),
];
let mut units_rx = state.units.subscribe();
units_rx.mark_changed();
let units_rx = WatchStream::new(units_rx);

let mut image_rx = state.image_stats.subscribe();
image_rx.mark_changed();
let image_rx = WatchStream::new(image_rx);

let messages =
futures::stream::iter(initial_messages.into_iter()).chain(futures::stream::select(
image_rx.map(|x| StatusUpdate::ImageStats(x)),
units_rx.map(|x| StatusUpdate::Units(x)),
));
let lines = messages.map(|msg| serde_json::to_string(&msg).map(|x| x + "\n"));
Body::from_stream(lines)
}

pub async fn main(state: Arc<State>) -> Result<()> {
Expand All @@ -209,15 +176,14 @@ pub async fn main(state: Arc<State>) -> Result<()> {
let admin_path = state.storage_dir.join("admin");

let router = Router::new()
.route("/admin/ws", get(ws))
.route("/admin/status", get(status))
.route("/admin/gc", get(gc))
.route("/admin/action/:unit/:action", get(action))
.route("/admin/image/:unit/:image", get(image))
.nest_service(
"/",
ServeDir::new(&admin_path).append_index_html_on_directories(true),
)
.layer(CompressionLayer::new())
.layer(ValidateRequestHeaderLayer::basic("admin", password))
.layer(TraceLayer::new_for_http())
.with_state(state);
Expand Down
2 changes: 1 addition & 1 deletion pixie-shared/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ pub enum TcpRequest {

#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg(feature = "std")]
pub enum WsUpdate {
pub enum StatusUpdate {
Config(config::Config),
HostMap(HashMap<Ipv4Addr, String>),
Units(Vec<Unit>),
Expand Down
Loading

0 comments on commit 83dae42

Please sign in to comment.