Skip to content

Commit

Permalink
WIP: Support for SIGTERM
Browse files Browse the repository at this point in the history
  • Loading branch information
Virv12 committed Sep 30, 2024
1 parent 01596ee commit f065ce3
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 24 deletions.
10 changes: 10 additions & 0 deletions pixie-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pixie-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ postcard = { version = "1.0.8", default-features = false, features = ["alloc"] }
serde = "1.0.193"
serde_derive = "1.0.193"
serde_yaml = "0.9"
tokio = { version = "1.34.0", features = ["macros", "fs", "rt-multi-thread", "sync"] }
tokio = { version = "1.34.0", features = ["macros", "fs", "rt-multi-thread", "sync", "signal"] }
serde_json = "1.0.108"
hostfile = "0.2.0"
hex = "0.4.3"
Expand Down
12 changes: 12 additions & 0 deletions pixie-server/src/dnsmasq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ pub async fn main(state: Arc<State>) -> Result<()> {
.context("Failed to start dnsmasq")?,
};

log::info!("Starting");

let signal_kind = tokio::signal::unix::SignalKind::terminate();
let mut signal = tokio::signal::unix::signal(signal_kind)?;

loop {
tokio::select! {
_ = units_rx.changed() => {
Expand All @@ -140,6 +145,13 @@ pub async fn main(state: Arc<State>) -> Result<()> {
dnsmasq.reload()?;
}
}

_ = signal.recv() => {
break;
}
}
}

log::info!("Shutting down");
Ok(())
}
11 changes: 9 additions & 2 deletions pixie-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async fn image(
}

async fn gc(extract::State(state): extract::State<Arc<State>>) -> String {
state.gc_chunks().unwrap();
state.gc_chunks();
"".to_owned()
}

Expand Down Expand Up @@ -191,8 +191,15 @@ pub async fn main(state: Arc<State>) -> Result<()> {
.layer(TraceLayer::new_for_http())
.with_state(state);

let signal_kind = tokio::signal::unix::SignalKind::terminate();
let mut signal = tokio::signal::unix::signal(signal_kind)?;

let listener = TcpListener::bind(listen_on).await?;
axum::serve(listener, router).await?;
axum::serve(listener, router)
.with_graceful_shutdown(async move {
signal.recv().await;
})
.await?;

Ok(())
}
43 changes: 28 additions & 15 deletions pixie-server/src/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,37 @@ pub async fn main(state: Arc<State>) -> Result<()> {
let socket = UdpSocket::bind((state.config.hosts.listen_on, PING_PORT)).await?;
log::info!("Listening on {}", socket.local_addr()?);

let signal_kind = tokio::signal::unix::SignalKind::terminate();
let mut signal = tokio::signal::unix::signal(signal_kind)?;

let mut buf = [0; PACKET_LEN];
loop {
let (len, peer_addr) = socket.recv_from(&mut buf).await?;
let IpAddr::V4(peer_ip) = peer_addr.ip() else {
bail!("IPv6 is not supported")
};
let peer_mac = match find_mac(peer_ip) {
Ok(peer_mac) => peer_mac,
Err(err) => {
log::error!("Error handling ping packet: {}", err);
continue;
tokio::select! {
res = socket.recv_from(&mut buf) => {
let (len, peer_addr) = res?;
let IpAddr::V4(peer_ip) = peer_addr.ip() else {
bail!("IPv6 is not supported")
};
let peer_mac = match find_mac(peer_ip) {
Ok(peer_mac) => peer_mac,
Err(err) => {
log::error!("Error handling ping packet: {}", err);
continue;
}
};
let time = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
state.set_ping(peer_mac, time, buf[..len].to_owned());
}
};

let time = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
state.set_ping(peer_mac, time, buf[..len].to_owned());
_ = signal.recv() => {
break;
}
};
}

log::info!("Shutting down");
Ok(())
}
7 changes: 2 additions & 5 deletions pixie-server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl State {
})
}

pub fn gc_chunks(&self) -> Result<()> {
pub fn gc_chunks(&self) {
self.image_stats.send_modify(|image_stats| {
let mut chunk_stats = self.chunk_stats.lock().unwrap();
let mut cnt = 0;
Expand All @@ -169,10 +169,9 @@ impl State {
}
});
});
Ok(())
}

pub fn add_chunk(&self, hash: ChunkHash, data: &[u8]) -> Result<()> {
pub fn add_chunk(&self, hash: ChunkHash, data: &[u8]) {
let path = self.storage_dir.join("chunks").join(hex::encode(hash));

self.image_stats.send_modify(|image_stats| {
Expand All @@ -189,8 +188,6 @@ impl State {
image_stats.reclaimable += data.len() as u64;
}
});

Ok(())
}

pub fn add_image(&self, name: String, image: Image) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion pixie-server/src/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn handle_request(state: &State, req: TcpRequest, peer_mac: MacAddr6) -> R
Vec::new()
}
TcpRequest::UploadChunk(hash, data) => {
state.add_chunk(hash, &data)?;
state.add_chunk(hash, &data);
Vec::new()
}
TcpRequest::UploadImage(name, image) => {
Expand Down

0 comments on commit f065ce3

Please sign in to comment.