diff --git a/crates/rkgk/src/api/wall.rs b/crates/rkgk/src/api/wall.rs index dbe2d51..7d4f25d 100644 --- a/crates/rkgk/src/api/wall.rs +++ b/crates/rkgk/src/api/wall.rs @@ -412,7 +412,7 @@ impl SessionLoop { } self.pending_images - .extend(self.chunk_images.encoded(positions).await); + .extend(self.chunk_images.encoded(positions).await.data); } while let Some(ChunkDataPair { position, data }) = self.pending_images.pop_front() { @@ -567,9 +567,10 @@ fn draw_to_chunks(wall: &Wall, render_area: RenderArea, vm: &mut Haku) { let y = f32::floor(-chunk_y as f32 * chunk_size); let chunk_ref = wall.get_or_create_chunk(ChunkPosition::new(chunk_x, chunk_y)); let mut chunk = chunk_ref.blocking_lock(); + chunk.touch(); let mut canvas = ChunkCanvas::new(&mut chunk).translated(x, y); if let Err(e) = vm.render(&mut canvas, 256) { - error!(chunk_x, chunk_y, "draw_to_chunks2 exception: {e:?}"); + info!(chunk_x, chunk_y, "drawing failed: {e}"); } } } diff --git a/crates/rkgk/src/wall.rs b/crates/rkgk/src/wall.rs index e75b374..e283cbe 100644 --- a/crates/rkgk/src/wall.rs +++ b/crates/rkgk/src/wall.rs @@ -6,6 +6,7 @@ use std::{ atomic::{self, AtomicU32}, Arc, Weak, }, + time::Instant, }; use dashmap::DashMap; @@ -111,15 +112,21 @@ impl ChunkPosition { } pub struct Chunk { + pub last_mod: Instant, pub pixmap: Pixmap, } impl Chunk { pub fn new(size: u32) -> Self { Self { + last_mod: Instant::now(), pixmap: Pixmap::new(size, size).unwrap(), } } + + pub fn touch(&mut self) { + self.last_mod = Instant::now(); + } } #[derive(Debug, Clone, Copy, Deserialize, Serialize)] @@ -151,6 +158,7 @@ impl Settings { } } +/// Represents the loaded portion of a wall. pub struct Wall { settings: Settings, diff --git a/crates/rkgk/src/wall/auto_save.rs b/crates/rkgk/src/wall/auto_save.rs index 55f26d1..5029305 100644 --- a/crates/rkgk/src/wall/auto_save.rs +++ b/crates/rkgk/src/wall/auto_save.rs @@ -1,29 +1,39 @@ -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Duration, Instant}, +}; use serde::{Deserialize, Serialize}; use tokio::{ sync::mpsc, + task::JoinSet, time::{interval, MissedTickBehavior}, }; use tracing::{info, instrument}; -use super::{chunk_images::ChunkImages, ChunkPosition}; +use super::{chunk_images::ChunkImages, ChunkPosition, Wall}; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Settings { pub interval_seconds: u64, } +/// Auto save and garbage collection service. +/// +/// These two have to live together under one service, because garbage collection has to be aware of +/// which chunks are yet to be saved. pub struct AutoSave { requests_tx: mpsc::Sender>, } impl AutoSave { - pub fn new(chunk_images: Arc, settings: Settings) -> Self { + pub fn new(wall: Arc, chunk_images: Arc, settings: Settings) -> Self { let (requests_tx, requests_rx) = mpsc::channel(8); tokio::spawn( AutoSaveLoop { + wall, chunk_images, settings, requests_rx, @@ -41,6 +51,7 @@ impl AutoSave { } struct AutoSaveLoop { + wall: Arc, chunk_images: Arc, settings: Settings, @@ -65,7 +76,7 @@ impl AutoSaveLoop { } } - _ = save_interval.tick() => self.save_chunks().await, + _ = save_interval.tick() => self.save_and_gc_chunks().await, else => break, } @@ -73,16 +84,46 @@ impl AutoSaveLoop { } #[instrument(skip(self), fields(num_chunks = self.unsaved_chunks.len()))] - async fn save_chunks(&mut self) { - // NOTE: We don't care about actually using the images here - - // the ChunkImages service writes them to the database by itself, and that's all our + async fn save_and_gc_chunks(&mut self) { + // NOTE: We don't care about actually using the images here. + // The ChunkImages service writes them to the database by itself, and that's all our // request is for. if !self.unsaved_chunks.is_empty() { info!("saving chunks"); - _ = self + let encoded = self .chunk_images .encoded(self.unsaved_chunks.drain().collect()) .await; + + info!("collecting garbage"); + let mut join_set = JoinSet::new(); + for new in encoded.new { + let wall = self.wall.clone(); + join_set.spawn(async move { + // Try GCing the chunk. If there's anyone other than the wall, or if it's been + // modified since we last touched it, don't GC. It will be collected in + // the next cycle. + // This shouldn't result in any race conditions, because remove_if locks the + // shard of the dashmap in which the chunk is stored, preventing read or write + // access by any other threads. + let removed = wall.chunks.remove_if(&new.position, |_k, chunk_ref| { + let Ok(chunk) = chunk_ref.try_lock() else { + return false; + }; + + Arc::strong_count(chunk_ref) == 1 && chunk.last_mod == new.last_mod + }); + removed.is_some() + }); + } + + let mut gc_count: usize = 0; + while let Some(was_removed) = join_set.join_next().await { + if let Ok(true) = was_removed { + gc_count += 1; + } + } + info!(gc_count, loaded = self.wall.chunks.len(), "gc done"); } } } diff --git a/crates/rkgk/src/wall/broker.rs b/crates/rkgk/src/wall/broker.rs index dae3a92..96bfe8f 100644 --- a/crates/rkgk/src/wall/broker.rs +++ b/crates/rkgk/src/wall/broker.rs @@ -70,6 +70,7 @@ impl Broker { let wall = Arc::new(Wall::new(*db.wall_settings())); let chunk_images = Arc::new(ChunkImages::new(Arc::clone(&wall), Arc::clone(&db))); let auto_save = Arc::new(AutoSave::new( + Arc::clone(&wall), Arc::clone(&chunk_images), self.settings.auto_save.clone(), )); diff --git a/crates/rkgk/src/wall/chunk_images.rs b/crates/rkgk/src/wall/chunk_images.rs index 734db9c..da8dddc 100644 --- a/crates/rkgk/src/wall/chunk_images.rs +++ b/crates/rkgk/src/wall/chunk_images.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use dashmap::DashSet; use eyre::Context; @@ -16,10 +16,22 @@ pub struct ChunkImages { commands_tx: mpsc::Sender, } +#[derive(Debug, Clone, Default)] +pub struct EncodeResult { + pub data: Vec, + pub new: Vec, +} + +#[derive(Debug, Clone, Copy)] +pub struct NewlyEncoded { + pub position: ChunkPosition, + pub last_mod: Instant, +} + enum Command { Encode { chunks: Vec, - reply: oneshot::Sender>, + reply: oneshot::Sender, }, Load { @@ -46,7 +58,7 @@ impl ChunkImages { } } - pub async fn encoded(&self, chunks: Vec) -> Vec { + pub async fn encoded(&self, chunks: Vec) -> EncodeResult { let (tx, rx) = oneshot::channel(); _ = self .commands_tx @@ -81,7 +93,7 @@ impl ChunkImageLoop { async fn encode( self: Arc, mut chunks: Vec, - reply: oneshot::Sender>, + reply: oneshot::Sender, ) { // Any chunks that are _not_ loaded, we will send back directly from the database. let mut unloaded = vec![]; @@ -93,16 +105,16 @@ impl ChunkImageLoop { // Any chunks that _are_ loaded, we will encode into WebP. let this = Arc::clone(&self); - let loaded = tokio::task::spawn_blocking(move || { + let (loaded, new): (Vec<_>, Vec<_>) = tokio::task::spawn_blocking(move || { chunks .into_par_iter() .flat_map(|position| -> Option<_> { - let pixmap = { + let (pixmap, last_mod) = { // Clone out the pixmap to avoid unnecessary chunk mutex contention while the // chunk is being encoded. let chunk_ref = this.wall.get_chunk(position)?; let chunk = chunk_ref.blocking_lock(); - chunk.pixmap.clone() + (chunk.pixmap.clone(), chunk.last_mod) }; let webp = webp::Encoder::new( @@ -112,12 +124,15 @@ impl ChunkImageLoop { pixmap.height(), ); // NOTE: There's an unnecessary copy here. Wonder if that kills performance much. - Some(ChunkDataPair { - position, - data: Arc::from(webp.encode_lossless().to_vec()), - }) + Some(( + ChunkDataPair { + position, + data: Arc::from(webp.encode_lossless().to_vec()), + }, + NewlyEncoded { position, last_mod }, + )) }) - .collect::>() + .unzip() }) .await .unwrap(); @@ -138,7 +153,7 @@ impl ChunkImageLoop { Err(err) => error!(?err, "read_chunks failed to read unloaded chunks"), } - _ = reply.send(all); + _ = reply.send(EncodeResult { data: all, new }); } async fn load_inner(self: Arc, mut chunks: Vec) -> eyre::Result<()> { diff --git a/crates/rkgk/src/wall/database.rs b/crates/rkgk/src/wall/database.rs index 6b70b93..4810d6a 100644 --- a/crates/rkgk/src/wall/database.rs +++ b/crates/rkgk/src/wall/database.rs @@ -1,4 +1,4 @@ -use std::{convert::identity, path::PathBuf, sync::Arc}; +use std::{convert::identity, path::PathBuf, sync::Arc, time::Instant}; use eyre::Context; use rusqlite::Connection;