implement chunk unloading (garbage collection)

unloading is done together with the autosave process, because that's the most natural place for it: we've literally just written to the database, so we can easily check whether a chunk has been written to since.
りき萌 2025-06-28 09:38:31 +02:00
parent 1d2f98348f
commit 4bd00e92d8
6 changed files with 90 additions and 24 deletions

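Condensed, the check this commit adds looks roughly like the sketch below. It is illustrative only, with the Chunk type stripped down to what the check needs; touch(), last_mod, and the equality comparison mirror the diff that follows.

    use std::time::Instant;

    struct Chunk {
        last_mod: Instant,
        // pixmap and friends omitted
    }

    impl Chunk {
        /// Called from the drawing path whenever the chunk is written to.
        fn touch(&mut self) {
            self.last_mod = Instant::now();
        }
    }

    /// The autosave pass remembers the `last_mod` it saw while encoding a chunk;
    /// the chunk may be unloaded afterwards only if that timestamp is still
    /// current, i.e. nobody drew to it while (or after) it was being saved.
    fn safe_to_unload(chunk: &Chunk, encoded_last_mod: Instant) -> bool {
        chunk.last_mod == encoded_last_mod
    }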

@@ -412,7 +412,7 @@ impl SessionLoop {
}
self.pending_images
.extend(self.chunk_images.encoded(positions).await);
.extend(self.chunk_images.encoded(positions).await.data);
}
while let Some(ChunkDataPair { position, data }) = self.pending_images.pop_front() {
@@ -567,9 +567,10 @@ fn draw_to_chunks(wall: &Wall, render_area: RenderArea, vm: &mut Haku) {
let y = f32::floor(-chunk_y as f32 * chunk_size);
let chunk_ref = wall.get_or_create_chunk(ChunkPosition::new(chunk_x, chunk_y));
let mut chunk = chunk_ref.blocking_lock();
chunk.touch();
let mut canvas = ChunkCanvas::new(&mut chunk).translated(x, y);
if let Err(e) = vm.render(&mut canvas, 256) {
error!(chunk_x, chunk_y, "draw_to_chunks2 exception: {e:?}");
info!(chunk_x, chunk_y, "drawing failed: {e}");
}
}
}

@@ -6,6 +6,7 @@ use std::{
atomic::{self, AtomicU32},
Arc, Weak,
},
time::Instant,
};
use dashmap::DashMap;
@@ -111,15 +112,21 @@ impl ChunkPosition {
}
pub struct Chunk {
pub last_mod: Instant,
pub pixmap: Pixmap,
}
impl Chunk {
pub fn new(size: u32) -> Self {
Self {
last_mod: Instant::now(),
pixmap: Pixmap::new(size, size).unwrap(),
}
}
pub fn touch(&mut self) {
self.last_mod = Instant::now();
}
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
@@ -151,6 +158,7 @@ impl Settings {
}
}
/// Represents the loaded portion of a wall.
pub struct Wall {
settings: Settings,

@@ -1,29 +1,39 @@
use std::{collections::HashSet, sync::Arc, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::{Duration, Instant},
};
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc,
task::JoinSet,
time::{interval, MissedTickBehavior},
};
use tracing::{info, instrument};
use super::{chunk_images::ChunkImages, ChunkPosition};
use super::{chunk_images::ChunkImages, ChunkPosition, Wall};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Settings {
pub interval_seconds: u64,
}
/// Auto save and garbage collection service.
///
/// These two have to live together under one service, because garbage collection has to be aware of
/// which chunks are yet to be saved.
pub struct AutoSave {
requests_tx: mpsc::Sender<Vec<ChunkPosition>>,
}
impl AutoSave {
pub fn new(chunk_images: Arc<ChunkImages>, settings: Settings) -> Self {
pub fn new(wall: Arc<Wall>, chunk_images: Arc<ChunkImages>, settings: Settings) -> Self {
let (requests_tx, requests_rx) = mpsc::channel(8);
tokio::spawn(
AutoSaveLoop {
wall,
chunk_images,
settings,
requests_rx,
@@ -41,6 +51,7 @@ impl AutoSave {
}
struct AutoSaveLoop {
wall: Arc<Wall>,
chunk_images: Arc<ChunkImages>,
settings: Settings,
@@ -65,7 +76,7 @@ impl AutoSaveLoop {
}
}
_ = save_interval.tick() => self.save_chunks().await,
_ = save_interval.tick() => self.save_and_gc_chunks().await,
else => break,
}
@@ -73,16 +84,46 @@ impl AutoSaveLoop {
}
#[instrument(skip(self), fields(num_chunks = self.unsaved_chunks.len()))]
async fn save_chunks(&mut self) {
// NOTE: We don't care about actually using the images here -
// the ChunkImages service writes them to the database by itself, and that's all our
async fn save_and_gc_chunks(&mut self) {
// NOTE: We don't care about actually using the images here.
// The ChunkImages service writes them to the database by itself, and that's all our
// request is for.
if !self.unsaved_chunks.is_empty() {
info!("saving chunks");
_ = self
let encoded = self
.chunk_images
.encoded(self.unsaved_chunks.drain().collect())
.await;
info!("collecting garbage");
let mut join_set = JoinSet::new();
for new in encoded.new {
let wall = self.wall.clone();
join_set.spawn(async move {
// Try GCing the chunk. If anyone other than the wall is holding a reference to
// the chunk, or if it's been modified since we encoded it, don't GC it. It will
// be collected in a later cycle.
// This shouldn't result in any race conditions, because remove_if locks the
// shard of the dashmap in which the chunk is stored, preventing read or write
// access from any other thread.
let removed = wall.chunks.remove_if(&new.position, |_k, chunk_ref| {
let Ok(chunk) = chunk_ref.try_lock() else {
return false;
};
Arc::strong_count(chunk_ref) == 1 && chunk.last_mod == new.last_mod
});
removed.is_some()
});
}
let mut gc_count: usize = 0;
while let Some(was_removed) = join_set.join_next().await {
if let Ok(true) = was_removed {
gc_count += 1;
}
}
info!(gc_count, loaded = self.wall.chunks.len(), "gc done");
}
}
}

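The race-freedom argument in the comment above rests on DashMap::remove_if evaluating its predicate while the entry's shard is write-locked, so no other thread can obtain a fresh reference between the check and the removal. A minimal, self-contained sketch of that pattern (simplified key and value types, not the commit's code):

    use std::sync::Arc;

    use dashmap::DashMap;
    use tokio::sync::Mutex;

    /// Unload an entry only if the map itself is the sole owner of the Arc and
    /// nobody holds the chunk lock at this moment. Returning false keeps the
    /// entry; it will simply be reconsidered on a later GC cycle.
    fn try_unload(chunks: &DashMap<u64, Arc<Mutex<Vec<u8>>>>, key: u64) -> bool {
        chunks
            .remove_if(&key, |_key, chunk_ref| {
                let Ok(_chunk) = chunk_ref.try_lock() else {
                    return false; // someone is drawing to it right now
                };
                Arc::strong_count(chunk_ref) == 1 // no session holds a reference
            })
            .is_some()
    }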

@@ -70,6 +70,7 @@ impl Broker {
let wall = Arc::new(Wall::new(*db.wall_settings()));
let chunk_images = Arc::new(ChunkImages::new(Arc::clone(&wall), Arc::clone(&db)));
let auto_save = Arc::new(AutoSave::new(
Arc::clone(&wall),
Arc::clone(&chunk_images),
self.settings.auto_save.clone(),
));

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use dashmap::DashSet;
use eyre::Context;
@ -16,10 +16,22 @@ pub struct ChunkImages {
commands_tx: mpsc::Sender<Command>,
}
#[derive(Debug, Clone, Default)]
pub struct EncodeResult {
pub data: Vec<ChunkDataPair>,
pub new: Vec<NewlyEncoded>,
}
#[derive(Debug, Clone, Copy)]
pub struct NewlyEncoded {
pub position: ChunkPosition,
pub last_mod: Instant,
}
enum Command {
Encode {
chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<Vec<ChunkDataPair>>,
reply: oneshot::Sender<EncodeResult>,
},
Load {
@@ -46,7 +58,7 @@ impl ChunkImages {
}
}
pub async fn encoded(&self, chunks: Vec<ChunkPosition>) -> Vec<ChunkDataPair> {
pub async fn encoded(&self, chunks: Vec<ChunkPosition>) -> EncodeResult {
let (tx, rx) = oneshot::channel();
_ = self
.commands_tx
@@ -81,7 +93,7 @@ impl ChunkImageLoop {
async fn encode(
self: Arc<Self>,
mut chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<Vec<ChunkDataPair>>,
reply: oneshot::Sender<EncodeResult>,
) {
// Any chunks that are _not_ loaded, we will send back directly from the database.
let mut unloaded = vec![];
@@ -93,16 +105,16 @@ impl ChunkImageLoop {
// Any chunks that _are_ loaded, we will encode into WebP.
let this = Arc::clone(&self);
let loaded = tokio::task::spawn_blocking(move || {
let (loaded, new): (Vec<_>, Vec<_>) = tokio::task::spawn_blocking(move || {
chunks
.into_par_iter()
.flat_map(|position| -> Option<_> {
let pixmap = {
let (pixmap, last_mod) = {
// Clone out the pixmap to avoid unnecessary chunk mutex contention while the
// chunk is being encoded.
let chunk_ref = this.wall.get_chunk(position)?;
let chunk = chunk_ref.blocking_lock();
chunk.pixmap.clone()
(chunk.pixmap.clone(), chunk.last_mod)
};
let webp = webp::Encoder::new(
@@ -112,12 +124,15 @@ impl ChunkImageLoop {
pixmap.height(),
);
// NOTE: There's an unnecessary copy here. Wonder if that kills performance much.
Some(ChunkDataPair {
position,
data: Arc::from(webp.encode_lossless().to_vec()),
})
Some((
ChunkDataPair {
position,
data: Arc::from(webp.encode_lossless().to_vec()),
},
NewlyEncoded { position, last_mod },
))
})
.collect::<Vec<_>>()
.unzip()
})
.await
.unwrap();
@@ -138,7 +153,7 @@ impl ChunkImageLoop {
Err(err) => error!(?err, "read_chunks failed to read unloaded chunks"),
}
_ = reply.send(all);
_ = reply.send(EncodeResult { data: all, new });
}
async fn load_inner(self: Arc<Self>, mut chunks: Vec<ChunkPosition>) -> eyre::Result<()> {

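The "clone out the pixmap" comment in the encode path above describes a lock-scope trick: the chunk mutex is held only long enough to copy the pixels, and the expensive WebP encode then runs on the copy. A simplified sketch of that pattern (a std Mutex and a caller-supplied encode closure stand in for the real types):

    use std::sync::{Arc, Mutex};

    /// Snapshot the chunk's pixels under the lock, then encode the snapshot
    /// without holding it, so drawing threads are blocked only for a memcpy
    /// rather than for the whole encode.
    fn encode_snapshot(
        chunk: &Arc<Mutex<Vec<u8>>>,
        encode: impl Fn(&[u8]) -> Vec<u8>,
    ) -> Vec<u8> {
        let snapshot = {
            let pixels = chunk.lock().unwrap();
            pixels.clone() // short critical section
        };
        encode(&snapshot) // long-running work, lock already released
    }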

@@ -1,4 +1,4 @@
use std::{convert::identity, path::PathBuf, sync::Arc};
use std::{convert::identity, path::PathBuf, sync::Arc, time::Instant};
use eyre::Context;
use rusqlite::Connection;