diff --git a/crates/rkgk/src/api/wall.rs b/crates/rkgk/src/api/wall.rs index d38c0b7..1fd8218 100644 --- a/crates/rkgk/src/api/wall.rs +++ b/crates/rkgk/src/api/wall.rs @@ -1,4 +1,7 @@ -use std::{collections::HashSet, sync::Arc}; +use std::{ + collections::{HashSet, VecDeque}, + sync::Arc, +}; use axum::{ extract::{ @@ -13,16 +16,20 @@ use schema::{ ChunkInfo, Error, LoginRequest, LoginResponse, Notify, Online, Request, Version, WallInfo, }; use serde::{Deserialize, Serialize}; -use tokio::{select, sync::mpsc, time::Instant}; -use tracing::{error, info}; +use tokio::{ + select, + sync::{self, mpsc, oneshot}, + time::Instant, +}; +use tracing::{error, info, instrument}; use crate::{ haku::{Haku, Limits}, login::database::LoginStatus, schema::Vec2, wall::{ - self, chunk_encoder::ChunkEncoder, chunk_iterator::ChunkIterator, ChunkPosition, JoinError, - SessionHandle, UserInit, Wall, + self, auto_save::AutoSave, chunk_images::ChunkImages, chunk_iterator::ChunkIterator, + database::ChunkDataPair, ChunkPosition, JoinError, SessionHandle, UserInit, Wall, }, }; @@ -110,7 +117,7 @@ async fn fallible_websocket(api: Arc, ws: &mut WebSocket) -> eyre::Result<( Some(wall) => wall, None => api.dbs.wall_broker.generate_id().await, }; - let open_wall = api.dbs.wall_broker.open(wall_id); + let open_wall = api.dbs.wall_broker.open(wall_id).await?; let session_handle = match open_wall .wall @@ -178,7 +185,8 @@ async fn fallible_websocket(api: Arc, ws: &mut WebSocket) -> eyre::Result<( SessionLoop::start( open_wall.wall, - open_wall.chunk_encoder, + open_wall.chunk_images, + open_wall.auto_save, session_handle, api.config.haku.clone(), login_request.init.brush, @@ -192,24 +200,33 @@ async fn fallible_websocket(api: Arc, ws: &mut WebSocket) -> eyre::Result<( struct SessionLoop { wall: Arc, - chunk_encoder: Arc, + chunk_images: Arc, + auto_save: Arc, handle: SessionHandle, render_commands_tx: mpsc::Sender, viewport_chunks: ChunkIterator, sent_chunks: HashSet, + pending_images: VecDeque, } enum RenderCommand { - SetBrush { brush: String }, - Plot { points: Vec }, + SetBrush { + brush: String, + }, + + Plot { + points: Vec, + done: oneshot::Sender<()>, + }, } impl SessionLoop { async fn start( wall: Arc, - chunk_encoder: Arc, + chunk_images: Arc, + auto_save: Arc, handle: SessionHandle, limits: Limits, brush: String, @@ -230,18 +247,20 @@ impl SessionLoop { .name(String::from("haku render thread")) .spawn({ let wall = Arc::clone(&wall); - let chunk_encoder = Arc::clone(&chunk_encoder); - move || Self::render_thread(wall, chunk_encoder, limits, render_commands_rx) + let chunk_images = Arc::clone(&chunk_images); + move || Self::render_thread(wall, chunk_images, limits, render_commands_rx) }) .context("could not spawn render thread")?; Ok(Self { wall, - chunk_encoder, + chunk_images, + auto_save, handle, render_commands_tx, viewport_chunks: ChunkIterator::new(ChunkPosition::new(0, 0), ChunkPosition::new(0, 0)), sent_chunks: HashSet::new(), + pending_images: VecDeque::new(), }) } @@ -284,13 +303,28 @@ impl SessionLoop { .await; } wall::EventKind::Plot { points } => { - // We drop commands if we take too long to render instead of lagging - // the WebSocket thread. - // Theoretically this will yield much better responsiveness, but it _will_ - // result in some visual glitches if we're getting bottlenecked. 
-                        _ = self.render_commands_tx.try_send(RenderCommand::Plot {
-                            points: points.clone(),
-                        })
+                        let chunks_to_modify: Vec<_> =
+                            chunks_to_modify(&self.wall, points).into_iter().collect();
+                        match self.chunk_images.load(chunks_to_modify.clone()).await {
+                            Ok(_) => {
+                                // We drop commands if we take too long to render instead of lagging
+                                // the WebSocket thread.
+                                // Theoretically this will yield much better responsiveness, but it _will_
+                                // result in some visual glitches if we're getting bottlenecked.
+                                let (done_tx, done_rx) = oneshot::channel();
+                                _ = self.render_commands_tx.try_send(RenderCommand::Plot {
+                                    points: points.clone(),
+                                    done: done_tx,
+                                });
+
+                                let auto_save = Arc::clone(&self.auto_save);
+                                tokio::spawn(async move {
+                                    _ = done_rx.await;
+                                    auto_save.request(chunks_to_modify).await;
+                                });
+                            }
+                            Err(err) => error!(?err, "while loading chunks for render command"),
+                        }
                     }
                 }
 
@@ -320,47 +354,53 @@ impl SessionLoop {
         let mut chunk_infos = vec![];
         let mut packet = vec![];
 
-        // Number of chunks iterated is limited per packet, so as not to let the client
-        // stall the server by sending in a huge viewport.
-        let start = Instant::now();
-        let mut iterated = 0;
-        for i in 0..12000 {
-            if let Some(position) = self.viewport_chunks.next() {
-                if self.sent_chunks.contains(&position) {
-                    continue;
-                }
-
-                if let Some(encoded) = self.chunk_encoder.encoded(position).await {
-                    let offset = packet.len();
-                    packet.extend_from_slice(&encoded);
-                    chunk_infos.push(ChunkInfo {
-                        position,
-                        offset: u32::try_from(offset).context("packet too big")?,
-                        length: u32::try_from(encoded.len()).context("chunk image too big")?,
-                    });
-
-                    // The actual number of chunks per packet is limited by the packet's size, which
-                    // we don't want to be too big, to maintain responsiveness - the client will
-                    // only request more chunks once per frame, so interactions still have time to
-                    // execute. We cap it to 256KiB in hopes that noone has Internet slow enough for
-                    // this to cause a disconnect.
-                    if packet.len() >= 256 * 1024 {
-                        iterated = i;
-                        break;
-                    }
-                }
-
-                self.sent_chunks.insert(position);
-            } else {
-                iterated = i;
-                break;
-            }
-        }
+        if self.pending_images.is_empty() {
+            let mut positions = vec![];
+
+            // Number of chunks iterated is limited per packet, so as not to let the client
+            // stall the server by sending in a huge viewport.
+            for _ in 0..9000 {
+                if let Some(position) = self.viewport_chunks.next() {
+                    if !self.sent_chunks.insert(position)
+                        || !self.chunk_images.chunk_exists(position)
+                    {
+                        continue;
+                    }
+                    positions.push(position);
+                } else {
+                    break;
+                }
+            }
+
+            self.pending_images
+                .extend(self.chunk_images.encoded(positions).await);
+        }
+
+        while let Some(ChunkDataPair { position, data }) = self.pending_images.pop_front() {
+            let offset = packet.len();
+            packet.extend_from_slice(&data);
+            chunk_infos.push(ChunkInfo {
+                position,
+                offset: u32::try_from(offset).context("packet too big")?,
+                length: u32::try_from(data.len()).context("chunk image too big")?,
+            });
+
+            // The final number of chunks per packet is limited by the packet's size, which
+            // we don't want to be too big, to maintain responsiveness - the client will
+            // only request more chunks once per frame, so interactions still have time to
+            // execute. We cap it to 256KiB in hopes that no one has Internet slow enough for
+            // this to cause a disconnect.
+            //
+            // Note that after this there _may_ be more chunks pending in the queue.
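+            // They will be flushed by subsequent send_chunks calls, and has_more below
+            // tells the client to keep requesting.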
+            if packet.len() >= 256 * 1024 {
+                break;
+            }
+        }
 
-        info!(elapsed = ?start.elapsed(), iterated, "send_chunks");
         ws.send(to_message(&Notify::Chunks {
             chunks: chunk_infos,
-            has_more: self.viewport_chunks.clone().next().is_some(),
+            has_more: !self.pending_images.is_empty()
+                || self.viewport_chunks.clone().next().is_some(),
         }))
         .await?;
         ws.send(Message::Binary(packet)).await?;
@@ -370,7 +410,7 @@ impl SessionLoop {
 
     fn render_thread(
         wall: Arc<Wall>,
-        chunk_encoder: Arc<ChunkEncoder>,
+        chunk_images: Arc<ChunkImages>,
         limits: Limits,
         mut commands: mpsc::Receiver<RenderCommand>,
     ) {
@@ -382,29 +422,51 @@ impl SessionLoop {
             RenderCommand::SetBrush { brush } => {
                 brush_ok = haku.set_brush(&brush).is_ok();
             }
-            RenderCommand::Plot { points } => {
+
+            RenderCommand::Plot { points, done } => {
                 if brush_ok {
                     if let Ok(value) = haku.eval_brush() {
                         for point in points {
                             // Ignore the result. It's better if we render _something_ rather
                             // than nothing.
-                            _ = draw_to_chunks(&haku, value, point, &wall, &chunk_encoder);
+                            _ = draw_to_chunks(&wall, &chunk_images, &haku, value, point);
                         }
                         haku.reset_vm();
                     }
                 }
+                _ = done.send(());
             }
         }
     }
 }
 
+fn chunks_to_modify(wall: &Wall, points: &[Vec2]) -> HashSet<ChunkPosition> {
+    let mut chunks = HashSet::new();
+    for point in points {
+        let paint_area = wall.settings().paint_area as f32;
+        let left = point.x - paint_area / 2.0;
+        let top = point.y - paint_area / 2.0;
+        let top_left_chunk = wall.settings().chunk_at(Vec2::new(left, top));
+        let bottom_right_chunk = wall
+            .settings()
+            .chunk_at(Vec2::new(left + paint_area, top + paint_area));
+        // Inclusive ranges, since the paint area usually ends partway into the
+        // bottom-right chunk.
+        for chunk_y in top_left_chunk.y..=bottom_right_chunk.y {
+            for chunk_x in top_left_chunk.x..=bottom_right_chunk.x {
+                chunks.insert(ChunkPosition::new(chunk_x, chunk_y));
+            }
+        }
+    }
+    chunks
+}
+
+#[instrument(skip(wall, chunk_images, haku, value))]
 fn draw_to_chunks(
+    wall: &Wall,
+    chunk_images: &ChunkImages,
     haku: &Haku,
     value: Value,
     center: Vec2,
-    wall: &Wall,
-    chunk_encoder: &ChunkEncoder,
 ) -> eyre::Result<()> {
     let settings = wall.settings();
 
@@ -414,10 +476,10 @@ fn draw_to_chunks(
     let left = center.x - paint_area / 2.0;
     let top = center.y - paint_area / 2.0;
 
-    let left_chunk = f32::floor(left / chunk_size) as i32;
-    let top_chunk = f32::floor(top / chunk_size) as i32;
-    let right_chunk = f32::ceil((left + paint_area) / chunk_size) as i32;
-    let bottom_chunk = f32::ceil((top + paint_area) / chunk_size) as i32;
-    for chunk_y in top_chunk..bottom_chunk {
-        for chunk_x in left_chunk..right_chunk {
+    let left_chunk = settings.chunk_at_1d(left);
+    let top_chunk = settings.chunk_at_1d(top);
+    let right_chunk = settings.chunk_at_1d(left + paint_area);
+    let bottom_chunk = settings.chunk_at_1d(top + paint_area);
+    // chunk_at_1d floors, where the old code used ceil for the right/bottom edge, so
+    // the ranges have to be inclusive to keep covering the last partial chunk.
+    for chunk_y in top_chunk..=bottom_chunk {
+        for chunk_x in left_chunk..=right_chunk {
             let x = f32::floor(-chunk_x as f32 * chunk_size + center.x);
@@ -428,11 +490,16 @@ fn draw_to_chunks(
         }
     }
 
-    for chunk_y in top_chunk..bottom_chunk {
-        for chunk_x in left_chunk..right_chunk {
-            chunk_encoder.invalidate_blocking(ChunkPosition::new(chunk_x, chunk_y))
-        }
-    }
+    // NOTE: Maybe sending in an iterator would be more efficient?
+    // If there were many chunks modified (which there probably weren't), this could
+    // allocate a lot of memory.
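+    // (mark_modified_blocking takes an owned Vec because the positions are sent over
+    // a channel to the ChunkImages service task.)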
+    chunk_images.mark_modified_blocking(
+        (top_chunk..=bottom_chunk)
+            .flat_map(|chunk_y| {
+                (left_chunk..=right_chunk).map(move |chunk_x| ChunkPosition::new(chunk_x, chunk_y))
+            })
+            .collect(),
+    );
 
     Ok(())
 }
diff --git a/crates/rkgk/src/config.rs b/crates/rkgk/src/config.rs
index 93d202c..92bda2b 100644
--- a/crates/rkgk/src/config.rs
+++ b/crates/rkgk/src/config.rs
@@ -4,6 +4,6 @@ use crate::wall;
 
 #[derive(Debug, Clone, Deserialize, Serialize)]
 pub struct Config {
-    pub wall: wall::Settings,
+    pub wall_broker: wall::broker::Settings,
     pub haku: crate::haku::Limits,
 }
diff --git a/crates/rkgk/src/login.rs b/crates/rkgk/src/login.rs
index efece57..63221ad 100644
--- a/crates/rkgk/src/login.rs
+++ b/crates/rkgk/src/login.rs
@@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
 use crate::{id, serialization::DeserializeFromStr};
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct UserId([u8; 32]);
+pub struct UserId(pub [u8; 32]);
 
 impl UserId {
     pub fn new(rng: &mut dyn RngCore) -> Self {
diff --git a/crates/rkgk/src/login/database.rs b/crates/rkgk/src/login/database.rs
index f3ed062..39734ac 100644
--- a/crates/rkgk/src/login/database.rs
+++ b/crates/rkgk/src/login/database.rs
@@ -18,6 +18,7 @@ pub struct Database {
     command_tx: mpsc::Sender<Command>,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum LoginStatus {
     ValidUser,
     UserDoesNotExist,
@@ -33,10 +34,12 @@ enum Command {
         nickname: String,
         reply: oneshot::Sender<eyre::Result<UserId>>,
     },
+
     LogIn {
         user_id: UserId,
         reply: oneshot::Sender<LoginStatus>,
     },
+
     UserInfo {
         user_id: UserId,
         reply: oneshot::Sender<eyre::Result<Option<UserInfo>>>,
@@ -79,8 +82,10 @@ impl Database {
 pub fn start(settings: &Settings) -> eyre::Result<Database> {
     let db = Connection::open(&settings.path).context("cannot open login database")?;
 
-    db.execute(
+    db.execute_batch(
         r#"
+    PRAGMA application_id = 0x726B674C; -- rkgL
+
     CREATE TABLE IF NOT EXISTS
         t_users (
             user_index INTEGER PRIMARY KEY,
@@ -89,78 +94,81 @@ pub fn start(settings: &Settings) -> eyre::Result<Database> {
             last_login_timestamp INTEGER NOT NULL
         );
     "#,
-        (),
     )?;
 
     let (command_tx, mut command_rx) = mpsc::channel(8);
 
     let mut user_id_rng = rand_chacha::ChaCha20Rng::from_entropy();
 
-    tokio::task::spawn_blocking(move || {
-        let mut s_insert_user = db
-            .prepare(
-                r#"
-                INSERT INTO t_users
-                    (long_user_id, nickname, last_login_timestamp)
-                    VALUES (?, ?, ?);
-                "#,
-            )
-            .unwrap();
+    std::thread::Builder::new()
+        .name("login database thread".into())
+        .spawn(move || {
+            let mut s_insert_user = db
+                .prepare(
+                    r#"
+                    INSERT INTO t_users
+                        (long_user_id, nickname, last_login_timestamp)
+                        VALUES (?, ?, ?);
+                    "#,
+                )
+                .unwrap();
 
-        let mut s_log_in = db
-            .prepare(
-                r#"
-                UPDATE OR ABORT t_users
-                SET last_login_timestamp = ?
-                WHERE long_user_id = ?;
-                "#,
-            )
-            .unwrap();
+            let mut s_log_in = db
+                .prepare(
+                    r#"
+                    UPDATE OR ABORT t_users
+                    SET last_login_timestamp = ?
+                    WHERE long_user_id = ?;
+                    "#,
+                )
+                .unwrap();
 
-        let mut s_user_info = db
-            .prepare(
-                r#"
-                SELECT nickname
-                FROM t_users
-                WHERE long_user_id = ?
-                LIMIT 1;
-                "#,
-            )
-            .unwrap();
+            let mut s_user_info = db
+                .prepare(
+                    r#"
+                    SELECT nickname
+                    FROM t_users
+                    WHERE long_user_id = ?
+ LIMIT 1; + "#, + ) + .unwrap(); - while let Some(command) = command_rx.blocking_recv() { - match command { - Command::NewUser { nickname, reply } => { - let user_id = UserId::new(&mut user_id_rng); - let result = s_insert_user - .execute((user_id.0, nickname, Utc::now().timestamp())) - .context("could not execute query"); - _ = reply.send(result.map(|_| user_id)); - } + while let Some(command) = command_rx.blocking_recv() { + match command { + Command::NewUser { nickname, reply } => { + let user_id = UserId::new(&mut user_id_rng); + let result = s_insert_user + .execute((user_id.0, nickname, Utc::now().timestamp())) + .context("could not execute query"); + _ = reply.send(result.map(|_| user_id)); + } - Command::LogIn { user_id, reply } => { - // TODO: User expiration. - let login_status = match s_log_in.execute((Utc::now().timestamp(), user_id.0)) { - Ok(_) => LoginStatus::ValidUser, - Err(_) => LoginStatus::UserDoesNotExist, - }; - _ = reply.send(login_status); - } + Command::LogIn { user_id, reply } => { + // TODO: User expiration. + let login_status = + match s_log_in.execute((Utc::now().timestamp(), user_id.0)) { + Ok(_) => LoginStatus::ValidUser, + Err(_) => LoginStatus::UserDoesNotExist, + }; + _ = reply.send(login_status); + } - Command::UserInfo { user_id, reply } => { - let result = s_user_info - .query_row((user_id.0,), |row| { - Ok(UserInfo { - nickname: row.get(0)?, + Command::UserInfo { user_id, reply } => { + let result = s_user_info + .query_row((user_id.0,), |row| { + Ok(UserInfo { + nickname: row.get(0)?, + }) }) - }) - .optional() - .context("could not execute query"); - _ = reply.send(result); + .optional() + .context("could not execute query"); + _ = reply.send(result); + } } } - } - }); + }) + .context("cannot spawn thread")?; Ok(Database { command_tx }) } diff --git a/crates/rkgk/src/main.rs b/crates/rkgk/src/main.rs index 6da9d86..7962b85 100644 --- a/crates/rkgk/src/main.rs +++ b/crates/rkgk/src/main.rs @@ -67,7 +67,9 @@ fn database(config: &Config, paths: &Paths<'_>) -> eyre::Result { }) .context("cannot start up login database")?; - let wall_broker = wall::Broker::new(config.wall); + create_dir_all(paths.database_dir.join("wall"))?; + let wall_broker = + wall::Broker::new(paths.database_dir.join("wall"), config.wall_broker.clone()); Ok(Databases { login, wall_broker }) } @@ -89,7 +91,6 @@ async fn fallible_main() -> eyre::Result<()> { let dbs = Arc::new(database(&config, &paths)?); let api = Arc::new(Api { config, dbs }); - let app = Router::new() .route_service( "/", @@ -116,9 +117,7 @@ async fn main() { let _client = tracy_client::Client::start(); color_eyre::install().unwrap(); - tracing_subscriber::fmt() - .with_span_events(FmtSpan::ACTIVE) - .init(); + tracing_subscriber::fmt().init(); match fallible_main().await { Ok(_) => (), diff --git a/crates/rkgk/src/schema.rs b/crates/rkgk/src/schema.rs index 282dc0d..af22e75 100644 --- a/crates/rkgk/src/schema.rs +++ b/crates/rkgk/src/schema.rs @@ -5,3 +5,9 @@ pub struct Vec2 { pub x: f32, pub y: f32, } + +impl Vec2 { + pub fn new(x: f32, y: f32) -> Self { + Self { x, y } + } +} diff --git a/crates/rkgk/src/wall.rs b/crates/rkgk/src/wall.rs index 9a0a6b9..f678907 100644 --- a/crates/rkgk/src/wall.rs +++ b/crates/rkgk/src/wall.rs @@ -17,11 +17,14 @@ use tracing::info; use crate::{id, login::UserId, schema::Vec2, serialization::DeserializeFromStr}; +pub mod auto_save; pub mod broker; -pub mod chunk_encoder; +pub mod chunk_images; pub mod chunk_iterator; +pub mod database; pub use broker::Broker; +pub use 
database::Database; #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct WallId([u8; 32]); @@ -82,12 +85,18 @@ impl fmt::Display for InvalidWallId { impl Error for InvalidWallId {} -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] +#[derive(Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)] pub struct ChunkPosition { pub x: i32, pub y: i32, } +impl fmt::Debug for ChunkPosition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "({}, {})", self.x, self.y) + } +} + impl ChunkPosition { pub fn new(x: i32, y: i32) -> Self { Self { x, y } @@ -115,11 +124,12 @@ pub struct Settings { } impl Settings { + pub fn chunk_at_1d(&self, x: f32) -> i32 { + f32::floor(x / self.chunk_size as f32) as i32 + } + pub fn chunk_at(&self, position: Vec2) -> ChunkPosition { - ChunkPosition::new( - f32::floor(position.x / self.chunk_size as f32) as i32, - f32::floor(position.y / self.chunk_size as f32) as i32, - ) + ChunkPosition::new(self.chunk_at_1d(position.x), self.chunk_at_1d(position.y)) } } @@ -200,17 +210,19 @@ impl Wall { &self.settings } + pub fn has_chunk(&self, at: ChunkPosition) -> bool { + self.chunks.contains_key(&at) + } + pub fn get_chunk(&self, at: ChunkPosition) -> Option>> { self.chunks.get(&at).map(|chunk| Arc::clone(&chunk)) } pub fn get_or_create_chunk(&self, at: ChunkPosition) -> Arc> { - Arc::clone( - &self - .chunks - .entry(at) - .or_insert_with(|| Arc::new(Mutex::new(Chunk::new(self.settings.chunk_size)))), - ) + Arc::clone(&self.chunks.entry(at).or_insert_with(|| { + info!(?at, "chunk created"); + Arc::new(Mutex::new(Chunk::new(self.settings.chunk_size))) + })) } pub fn join(self: &Arc, session: Session) -> Result { diff --git a/crates/rkgk/src/wall/auto_save.rs b/crates/rkgk/src/wall/auto_save.rs new file mode 100644 index 0000000..ebda9b4 --- /dev/null +++ b/crates/rkgk/src/wall/auto_save.rs @@ -0,0 +1,86 @@ +use std::{backtrace::Backtrace, collections::HashSet, sync::Arc, time::Duration}; + +use dashmap::DashSet; +use serde::{Deserialize, Serialize}; +use tokio::{ + sync::mpsc, + time::{interval, MissedTickBehavior}, +}; +use tracing::{info, instrument}; + +use super::{chunk_images::ChunkImages, ChunkPosition, Database, Wall}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Settings { + pub interval_seconds: u64, +} + +pub struct AutoSave { + requests_tx: mpsc::Sender>, +} + +impl AutoSave { + pub fn new(chunk_images: Arc, settings: Settings) -> Self { + let (requests_tx, requests_rx) = mpsc::channel(8); + + tokio::spawn( + AutoSaveLoop { + chunk_images, + settings, + requests_rx, + unsaved_chunks: HashSet::new(), + } + .enter(), + ); + + Self { requests_tx } + } + + pub async fn request(&self, chunks: Vec) { + _ = self.requests_tx.send(chunks).await; + } +} + +struct AutoSaveLoop { + chunk_images: Arc, + settings: Settings, + + requests_rx: mpsc::Receiver>, + unsaved_chunks: HashSet, +} + +impl AutoSaveLoop { + async fn enter(mut self) { + let mut save_interval = interval(Duration::from_secs(self.settings.interval_seconds)); + save_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { + tokio::select! 
{ + request = self.requests_rx.recv() => { + if let Some(positions) = request { + for position in positions { + self.unsaved_chunks.insert(position); + } + } else { + break; + } + } + + _ = save_interval.tick() => self.save_chunks().await, + + else => break, + } + } + } + + #[instrument(skip(self), fields(num_chunks = self.unsaved_chunks.len()))] + async fn save_chunks(&mut self) { + // NOTE: We don't care about actually using the images here - + // the ChunkImages service writes them to the database by itself, and that's all our + // request is for. + _ = self + .chunk_images + .encoded(self.unsaved_chunks.drain().collect()) + .await; + } +} diff --git a/crates/rkgk/src/wall/broker.rs b/crates/rkgk/src/wall/broker.rs index 01df98e..23c5ddc 100644 --- a/crates/rkgk/src/wall/broker.rs +++ b/crates/rkgk/src/wall/broker.rs @@ -1,19 +1,31 @@ -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; use dashmap::DashMap; use rand::SeedableRng; use rand_chacha::ChaCha20Rng; +use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; -use tracing::info; +use tracing::{info, instrument}; -use super::{chunk_encoder::ChunkEncoder, Settings, Wall, WallId}; +use super::{ + auto_save::{self, AutoSave}, + chunk_images::ChunkImages, + database, Database, Wall, WallId, +}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Settings { + pub default_wall_settings: super::Settings, + pub auto_save: auto_save::Settings, +} /// The broker is the main way to access wall data. /// /// It handles dynamically loading and unloading walls as they're needed. /// It also handles database threads for each wall. pub struct Broker { - wall_settings: Settings, + databases_dir: PathBuf, + settings: Settings, open_walls: DashMap, rng: Mutex, } @@ -21,14 +33,17 @@ pub struct Broker { #[derive(Clone)] pub struct OpenWall { pub wall: Arc, - pub chunk_encoder: Arc, + pub chunk_images: Arc, + pub db: Arc, + pub auto_save: Arc, } impl Broker { - pub fn new(wall_settings: Settings) -> Self { - info!(?wall_settings, "Broker::new"); + pub fn new(databases_dir: PathBuf, settings: Settings) -> Self { + info!(?settings, "Broker::new"); Self { - wall_settings, + databases_dir, + settings, open_walls: DashMap::new(), rng: Mutex::new(ChaCha20Rng::from_entropy()), } @@ -41,14 +56,35 @@ impl Broker { WallId::new(&mut *rng) } - pub fn open(&self, wall_id: WallId) -> OpenWall { - let wall = Arc::new(Wall::new(self.wall_settings)); - self.open_walls - .entry(wall_id) - .or_insert_with(|| OpenWall { - chunk_encoder: Arc::new(ChunkEncoder::start(Arc::clone(&wall))), - wall, - }) - .clone() + #[instrument(skip(self), fields(%wall_id))] + pub async fn open(&self, wall_id: WallId) -> eyre::Result { + let open_wall = self.open_walls.entry(wall_id); + + match open_wall { + dashmap::Entry::Vacant(entry) => { + let db = Arc::new(database::start(database::Settings { + path: self.databases_dir.join(format!("{wall_id}.db")), + wall_id, + default_wall_settings: self.settings.default_wall_settings, + })?); + let wall = Arc::new(Wall::new(*db.wall_settings())); + let chunk_images = Arc::new(ChunkImages::new(Arc::clone(&wall), Arc::clone(&db))); + let auto_save = Arc::new(AutoSave::new( + Arc::clone(&chunk_images), + self.settings.auto_save.clone(), + )); + let open_wall = OpenWall { + wall, + chunk_images, + db, + auto_save, + }; + + entry.insert(open_wall.clone()); + + Ok(open_wall) + } + dashmap::Entry::Occupied(entry) => Ok(entry.get().clone()), + } } } diff --git a/crates/rkgk/src/wall/chunk_encoder.rs 
b/crates/rkgk/src/wall/chunk_encoder.rs deleted file mode 100644 index dfb7d71..0000000 --- a/crates/rkgk/src/wall/chunk_encoder.rs +++ /dev/null @@ -1,104 +0,0 @@ -use std::sync::Arc; - -use indexmap::IndexMap; -use tokio::sync::{mpsc, oneshot}; - -use super::{ChunkPosition, Wall}; - -/// Service which encodes chunks to WebP images and caches them in an LRU fashion. -pub struct ChunkEncoder { - commands_tx: mpsc::Sender, -} - -enum Command { - GetEncoded { - chunk: ChunkPosition, - reply: oneshot::Sender>>, - }, - - Invalidate { - chunk: ChunkPosition, - }, -} - -impl ChunkEncoder { - pub fn start(wall: Arc) -> Self { - let (commands_tx, commands_rx) = mpsc::channel(32); - - tokio::spawn(Self::service(wall, commands_rx)); - - Self { commands_tx } - } - - pub async fn encoded(&self, chunk: ChunkPosition) -> Option> { - let (tx, rx) = oneshot::channel(); - self.commands_tx - .send(Command::GetEncoded { chunk, reply: tx }) - .await - .ok()?; - rx.await.ok().flatten() - } - - pub async fn invalidate(&self, chunk: ChunkPosition) { - _ = self.commands_tx.send(Command::Invalidate { chunk }).await; - } - - pub fn invalidate_blocking(&self, chunk: ChunkPosition) { - _ = self - .commands_tx - .blocking_send(Command::Invalidate { chunk }); - } - - async fn encode(wall: &Wall, chunk: ChunkPosition) -> Option> { - let pixmap = { - // Clone out the pixmap to avoid unnecessary chunk mutex contention while the - // chunk is being encoded. - let chunk_ref = wall.get_chunk(chunk)?; - let chunk = chunk_ref.lock().await; - chunk.pixmap.clone() - }; - - let image = tokio::task::spawn_blocking(move || { - let webp = webp::Encoder::new( - pixmap.data(), - webp::PixelLayout::Rgba, - pixmap.width(), - pixmap.height(), - ); - // NOTE: There's an unnecessary copy here. Wonder if that kills performance much. - webp.encode_lossless().to_vec() - }) - .await - .ok()?; - - Some(Arc::from(image)) - } - - async fn service(wall: Arc, mut commands_rx: mpsc::Receiver) { - let mut encoded_lru: IndexMap>> = IndexMap::new(); - - while let Some(command) = commands_rx.recv().await { - match command { - Command::GetEncoded { chunk, reply } => { - if let Some(encoded) = encoded_lru.get(&chunk) { - _ = reply.send(encoded.clone()) - } else { - let encoded = Self::encode(&wall, chunk).await; - // TODO: Make this capacity configurable. - // 598 is chosen because under the default configuration, it would - // correspond to roughly two 3840x2160 displays. - if encoded_lru.len() >= 598 { - encoded_lru.shift_remove_index(0); - } - encoded_lru.insert(chunk, encoded.clone()); - _ = reply.send(encoded); - } - } - - Command::Invalidate { chunk } => { - encoded_lru.shift_remove(&chunk); - } - } - } - } -} diff --git a/crates/rkgk/src/wall/chunk_images.rs b/crates/rkgk/src/wall/chunk_images.rs new file mode 100644 index 0000000..40f1210 --- /dev/null +++ b/crates/rkgk/src/wall/chunk_images.rs @@ -0,0 +1,238 @@ +use std::sync::Arc; + +use dashmap::DashSet; +use eyre::Context; +use haku::render::tiny_skia::{IntSize, Pixmap}; +use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, info, instrument}; + +use super::{database::ChunkDataPair, ChunkPosition, Database, Wall}; + +/// Chunk image encoding, caching, and storage service. 
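+///
+/// Most operations are forwarded over a channel to a background service task,
+/// so the methods here are cheap to call from the WebSocket session loop.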
+pub struct ChunkImages {
+    wall: Arc<Wall>,
+    async_loop: Arc<ChunkImageLoop>,
+    commands_tx: mpsc::Sender<Command>,
+}
+
+enum Command {
+    Encode {
+        chunks: Vec<ChunkPosition>,
+        reply: oneshot::Sender<Vec<ChunkDataPair>>,
+    },
+
+    Load {
+        chunks: Vec<ChunkPosition>,
+        reply: oneshot::Sender<eyre::Result<()>>,
+    },
+
+    MarkModified {
+        chunks: Vec<ChunkPosition>,
+    },
+}
+
+impl ChunkImages {
+    pub fn new(wall: Arc<Wall>, db: Arc<Database>) -> Self {
+        let (commands_tx, commands_rx) = mpsc::channel(32);
+
+        let async_loop = Arc::new(ChunkImageLoop {
+            wall: Arc::clone(&wall),
+            db,
+            chunks_in_db: DashSet::new(),
+        });
+        tokio::spawn(Arc::clone(&async_loop).enter(commands_rx));
+
+        Self {
+            wall,
+            async_loop,
+            commands_tx,
+        }
+    }
+
+    pub async fn encoded(&self, chunks: Vec<ChunkPosition>) -> Vec<ChunkDataPair> {
+        let (tx, rx) = oneshot::channel();
+        _ = self
+            .commands_tx
+            .send(Command::Encode { chunks, reply: tx })
+            .await
+            .ok();
+        rx.await.ok().unwrap_or_default()
+    }
+
+    pub async fn load(&self, chunks: Vec<ChunkPosition>) -> eyre::Result<()> {
+        let (tx, rx) = oneshot::channel();
+        self.commands_tx
+            .send(Command::Load { chunks, reply: tx })
+            .await
+            .context("database is offline")?;
+        rx.await.context("failed to load chunks")?
+    }
+
+    pub fn mark_modified_blocking(&self, chunks: Vec<ChunkPosition>) {
+        _ = self
+            .commands_tx
+            .blocking_send(Command::MarkModified { chunks });
+    }
+
+    pub fn chunk_exists(&self, position: ChunkPosition) -> bool {
+        self.wall.has_chunk(position) || self.async_loop.chunks_in_db.contains(&position)
+    }
+}
+
+struct ChunkImageLoop {
+    wall: Arc<Wall>,
+    db: Arc<Database>,
+    chunks_in_db: DashSet<ChunkPosition>,
+}
+
+impl ChunkImageLoop {
+    #[instrument(skip(self, reply))]
+    async fn encode(
+        self: Arc<Self>,
+        mut chunks: Vec<ChunkPosition>,
+        reply: oneshot::Sender<Vec<ChunkDataPair>>,
+    ) {
+        // Any chunks that are _not_ loaded, we will send back directly from the database.
+        let mut unloaded = vec![];
+        chunks.retain(|&position| {
+            let is_loaded = self.wall.has_chunk(position);
+            if !is_loaded {
+                unloaded.push(position);
+            }
+            is_loaded
+        });
+
+        // Any chunks that _are_ loaded, we will encode into WebP.
+        let this = Arc::clone(&self);
+        let loaded = tokio::task::spawn_blocking(move || {
+            chunks
+                .into_par_iter()
+                .flat_map(|position| -> Option<_> {
+                    let pixmap = {
+                        // Clone out the pixmap to avoid unnecessary chunk mutex contention while the
+                        // chunk is being encoded.
+                        let chunk_ref = this.wall.get_chunk(position)?;
+                        let chunk = chunk_ref.blocking_lock();
+                        chunk.pixmap.clone()
+                    };
+
+                    let webp = webp::Encoder::new(
+                        pixmap.data(),
+                        webp::PixelLayout::Rgba,
+                        pixmap.width(),
+                        pixmap.height(),
+                    );
+                    // NOTE: There's an unnecessary copy here. Wonder if that kills performance much.
+                    Some(ChunkDataPair {
+                        position,
+                        data: Arc::from(webp.encode_lossless().to_vec()),
+                    })
+                })
+                .collect::<Vec<_>>()
+        })
+        .await
+        .unwrap();
+
+        // We'll also write the loaded chunks back to the database while we have the images.
+        // No point wasting the encoding time.
+        if !loaded.is_empty() {
+            info!(num = loaded.len(), "writing loaded chunks to database");
+        }
+        _ = self.db.write_chunks(loaded.clone()).await;
+        for &ChunkDataPair { position, .. } in &loaded {
+            self.chunks_in_db.insert(position);
+        }
+
+        let mut all = loaded;
+        match self.db.read_chunks(unloaded).await {
+            Ok(mut chunks) => all.append(&mut chunks),
+            Err(err) => error!(?err, "read_chunks failed to read unloaded chunks"),
+        }
+
+        _ = reply.send(all);
+    }
+
+    async fn load_inner(self: Arc<Self>, mut chunks: Vec<ChunkPosition>) -> eyre::Result<()> {
+        // Skip already loaded chunks.
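+        // A loaded chunk is at least as fresh as its database copy, and reloading it
+        // would clobber any unsaved changes.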
+        chunks.retain(|&position| !self.wall.has_chunk(position));
+        if chunks.is_empty() {
+            return Ok(());
+        }
+
+        info!(?chunks, "to load");
+
+        let chunks = self.db.read_chunks(chunks).await?;
+
+        let decoded = tokio::task::spawn_blocking(move || {
+            chunks
+                .par_iter()
+                .flat_map(|ChunkDataPair { position, data }| {
+                    webp::Decoder::new(data)
+                        .decode()
+                        .and_then(|image| {
+                            info!(?position, "decoded");
+                            let size = IntSize::from_wh(image.width(), image.height())?;
+                            Pixmap::from_vec(image.to_vec(), size)
+                        })
+                        .map(|pixmap| (*position, pixmap))
+                })
+                .collect::<Vec<_>>()
+        })
+        .await
+        .context("failed to decode chunks from the database")?;
+
+        // I don't know yet if locking all the chunks is a good idea at this point.
+        // I can imagine contended chunks having some trouble loading.
+        //
+        // Decoding may have dropped some pairs, so take the positions from `decoded`
+        // to keep the pixmaps and the locked chunks aligned.
+        let chunk_arcs: Vec<_> = decoded
+            .iter()
+            .map(|(position, _)| self.wall.get_or_create_chunk(*position))
+            .collect();
+        let mut chunk_refs = Vec::with_capacity(chunk_arcs.len());
+        for arc in &chunk_arcs {
+            chunk_refs.push(arc.lock().await);
+        }
+
+        for ((_, pixmap), mut chunk) in decoded.into_iter().zip(chunk_refs) {
+            chunk.pixmap = pixmap;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip(self, reply))]
+    async fn load(
+        self: Arc<Self>,
+        chunks: Vec<ChunkPosition>,
+        reply: oneshot::Sender<eyre::Result<()>>,
+    ) {
+        _ = reply.send(self.load_inner(chunks).await);
+    }
+
+    async fn enter(self: Arc<Self>, mut commands_rx: mpsc::Receiver<Command>) {
+        let all_chunks = self
+            .db
+            .get_all_chunks()
+            .await
+            .expect("could not list chunks in the database");
+        for position in all_chunks {
+            self.chunks_in_db.insert(position);
+        }
+
+        while let Some(command) = commands_rx.recv().await {
+            match command {
+                Command::Encode { chunks, reply } => {
+                    // TODO: This should have a caching layer.
+                    tokio::spawn(Arc::clone(&self).encode(chunks, reply));
+                }
+
+                Command::Load { chunks, reply } => {
+                    tokio::spawn(Arc::clone(&self).load(chunks, reply));
+                }
+
+                Command::MarkModified { chunks: _ } => {
+                    // TODO: This should invalidate data from the caching layer.
+                }
+            }
+        }
+    }
+}
diff --git a/crates/rkgk/src/wall/chunk_iterator.rs b/crates/rkgk/src/wall/chunk_iterator.rs
index 9a77005..9f4144e 100644
--- a/crates/rkgk/src/wall/chunk_iterator.rs
+++ b/crates/rkgk/src/wall/chunk_iterator.rs
@@ -1,3 +1,5 @@
+use std::iter::Take;
+
 use super::ChunkPosition;
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -8,13 +10,35 @@ pub struct ChunkIterator {
 }
 
 impl ChunkIterator {
-    pub fn new(top_left: ChunkPosition, bottom_right: ChunkPosition) -> Self {
+    pub fn new(start: ChunkPosition, end: ChunkPosition) -> Self {
+        let top_left = ChunkPosition::new(start.x.min(end.x), start.y.min(end.y));
+        let bottom_right = ChunkPosition::new(start.x.max(end.x), start.y.max(end.y));
         Self {
             cursor: top_left,
             left: top_left.x,
             bottom_right,
         }
     }
+
+    pub fn take_next(&mut self, n: i32) -> Take<Self> {
+        assert!(n > 0);
+
+        let take = (*self).take(n as usize);
+
+        let x = self.cursor.x - self.left;
+        let width = self.bottom_right.x - self.left;
+        if width != 0 {
+            self.cursor.x = self.left + (x + n) % width;
+            self.cursor.y += n / width;
+        } else {
+            // In a width = 0 configuration, we iterate vertically.
+            // This is probably not the right thing to do, but we're just doing this to guard
+            // against malicious clients.
+ self.cursor.y += n; + } + + take + } } impl Iterator for ChunkIterator { diff --git a/crates/rkgk/src/wall/database.rs b/crates/rkgk/src/wall/database.rs new file mode 100644 index 0000000..56a168c --- /dev/null +++ b/crates/rkgk/src/wall/database.rs @@ -0,0 +1,324 @@ +use std::{ + convert::identity, + path::{Path, PathBuf}, + sync::Arc, +}; + +use chrono::Utc; +use eyre::Context; +use rusqlite::Connection; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, info, instrument}; + +use crate::login::UserId; + +use super::{ChunkPosition, WallId}; + +pub struct Settings { + pub path: PathBuf, + pub wall_id: WallId, + pub default_wall_settings: super::Settings, +} + +pub struct Database { + wall_settings: super::Settings, + command_tx: mpsc::Sender, +} + +#[derive(Debug, Clone)] +pub struct ChunkDataPair { + pub position: ChunkPosition, + pub data: Arc<[u8]>, +} + +enum Command { + SetWallInfo { + created_by: UserId, + title: String, + reply: oneshot::Sender>, + }, + + WriteChunks { + chunks: Vec, + reply: oneshot::Sender>, + }, + + ReadChunks { + chunks: Vec, + reply: oneshot::Sender>, + }, + + GetAllChunks { + reply: oneshot::Sender>>, + }, +} + +impl Database { + pub fn wall_settings(&self) -> &super::Settings { + &self.wall_settings + } + + #[instrument(skip(self), err(Debug))] + pub async fn write_chunks(&self, chunks: Vec) -> eyre::Result<()> { + let (tx, rx) = oneshot::channel(); + self.command_tx + .send(Command::WriteChunks { chunks, reply: tx }) + .await + .context("database is offline")?; + rx.await.context("database returned an error")? + } + + pub async fn read_chunks( + &self, + chunks: Vec, + ) -> eyre::Result> { + let (tx, rx) = oneshot::channel(); + self.command_tx + .send(Command::ReadChunks { chunks, reply: tx }) + .await + .context("database is offline")?; + rx.await.context("database did not return anything") + } + + pub async fn get_all_chunks(&self) -> eyre::Result> { + let (tx, rx) = oneshot::channel(); + self.command_tx + .send(Command::GetAllChunks { reply: tx }) + .await + .context("database is offline")?; + rx.await.context("database did not return anything")? 
+ } +} + +#[instrument(name = "wall::database::start", skip(settings), fields(wall_id = %settings.wall_id))] +pub fn start(settings: Settings) -> eyre::Result { + let db = Connection::open(settings.path).context("cannot open wall database")?; + + let major: u32 = env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(); + let minor: u32 = env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(); + let patch: u32 = env!("CARGO_PKG_VERSION_PATCH").parse().unwrap(); + let version = major * 1_000_000 + minor * 1_000 + patch; + + info!("initial setup"); + + db.execute_batch( + r#" + PRAGMA application_id = 0x726B6757; -- rkgW + + CREATE TABLE IF NOT EXISTS + t_file_info ( + id INTEGER PRIMARY KEY CHECK (id = 1), + version INTEGER NOT NULL, + wall_id BLOB NOT NULL, + mtime INTEGER NOT NULL DEFAULT (unixepoch()) + ); + + CREATE TABLE IF NOT EXISTS + t_wall_settings ( + id INTEGER PRIMARY KEY CHECK (id = 1), + max_chunks INTEGER NOT NULL, + max_sessions INTEGER NOT NULL, + paint_area INTEGER NOT NULL, + chunk_size INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS + t_wall_info ( + id INTEGER PRIMARY KEY CHECK (id = 1), + created_by BLOB NOT NULL, + title TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS + t_chunks_v0 ( + chunk_index INTEGER PRIMARY KEY, + chunk_x INTEGER NOT NULL, + chunk_y INTEGER NOT NULL, + mtime INTEGER NOT NULL DEFAULT (unixepoch()), + webp_data BLOB NOT NULL, + + UNIQUE(chunk_x, chunk_y) ON CONFLICT REPLACE + ); + + CREATE INDEX IF NOT EXISTS + t_chunks_v0_index_xy ON t_chunks_v0 + (chunk_x, chunk_y); + "#, + )?; + + info!("set file version and wall ID"); + + db.execute( + r#" + INSERT OR IGNORE + INTO t_file_info + (version, wall_id, mtime) + VALUES (?, ?, unixepoch()); + "#, + (version, settings.wall_id.0), + )?; + + info!("set wall mtime"); + + db.execute( + r#" + UPDATE t_file_info + SET mtime = unixepoch(); + "#, + (), + )?; + + info!("initialize/get wall settings"); + + db.execute( + r#" + INSERT OR IGNORE + INTO t_wall_settings + (max_chunks, max_sessions, paint_area, chunk_size) + VALUES (?, ?, ?, ?); + "#, + ( + settings.default_wall_settings.max_chunks, + settings.default_wall_settings.max_sessions, + settings.default_wall_settings.paint_area, + settings.default_wall_settings.chunk_size, + ), + )?; + + let wall_settings = db.query_row( + r#" + SELECT + max_chunks, max_sessions, paint_area, chunk_size + FROM t_wall_settings; + "#, + (), + |row| { + Ok(super::Settings { + max_chunks: row.get(0)?, + max_sessions: row.get(1)?, + paint_area: row.get(2)?, + chunk_size: row.get(3)?, + }) + }, + )?; + + let (command_tx, mut command_rx) = mpsc::channel(8); + + std::thread::Builder::new() + .name(format!("database thread {}", settings.wall_id)) + .spawn(move || { + let mut s_set_wall_info = db + .prepare( + r#" + INSERT OR REPLACE + INTO t_wall_info + (created_by, title) + VALUES (?, ?); + "#, + ) + .unwrap(); + + let mut s_write_chunk = db + .prepare( + r#" + INSERT + INTO t_chunks_v0 + (chunk_x, chunk_y, webp_data, mtime) + VALUES (?, ?, ?, unixepoch()); + "#, + ) + .unwrap(); + + let mut s_read_chunk = db + .prepare( + r#" + SELECT webp_data + FROM t_chunks_v0 + WHERE chunk_x = ? 
AND chunk_y = ?; + "#, + ) + .unwrap(); + + let mut s_get_all_chunks = db + .prepare( + r#" + SELECT chunk_x, chunk_y + FROM t_chunks_v0; + "#, + ) + .unwrap(); + + while let Some(command) = command_rx.blocking_recv() { + match command { + Command::SetWallInfo { + created_by, + title, + reply, + } => { + _ = reply.send( + s_set_wall_info + .execute((created_by.0, title)) + .map(|_| ()) + .context("failed to set wall info"), + ); + } + + Command::WriteChunks { chunks, reply } => { + let mut result = Ok(()); + for ChunkDataPair { position, data } in chunks { + if let Err(error) = + s_write_chunk.execute((position.x, position.y, &data[..])) + { + result = Err(error).with_context(|| { + format!("failed to update chunk at {position:?}") + }); + } + } + _ = reply.send(result.context( + "failed to update one or more chunks; see context for last error", + )); + } + + Command::ReadChunks { chunks, reply } => { + let result = chunks + .into_iter() + .flat_map(|position| { + s_read_chunk + .query_row((position.x, position.y), |row| { + Ok(ChunkDataPair { + position, + data: Arc::from(row.get::<_, Vec>(0)?), + }) + }) + .inspect_err(|err| { + if err != &rusqlite::Error::QueryReturnedNoRows { + error!(?err, ?position, "while reading chunk"); + } + }) + .ok() + }) + .collect(); + _ = reply.send(result); + } + + Command::GetAllChunks { reply } => { + _ = reply.send( + s_get_all_chunks + .query_map((), |row| { + Ok(ChunkPosition::new(row.get(0)?, row.get(1)?)) + }) + .map(|chunks| chunks.collect::>()) + .and_then(identity) + .context("failed to query all chunks"), + ) + } + } + } + }) + .context("cannot spawn thread")?; + + Ok(Database { + command_tx, + wall_settings, + }) +} diff --git a/rkgk.toml b/rkgk.toml index 66d450a..4a2202b 100644 --- a/rkgk.toml +++ b/rkgk.toml @@ -1,4 +1,4 @@ -[wall] +[wall_broker.default_wall_settings] # The settings below control the creation of new walls. @@ -13,7 +13,8 @@ max_sessions = 128 # The size of chunks. # Choosing an appropriate size for chunks is a tradeoff between performance and disk space - 168 is -# chosen as a reasonable default +# chosen as a reasonable default which is just small enough to perform operations on fairly quickly +# and responsively. chunk_size = 168 # The size of the area that can be drawn over by a brush, in pixels. @@ -21,6 +22,11 @@ chunk_size = 168 # can produce. paint_area = 504 +[wall_broker.auto_save] + +# How often should modified chunks be saved to the database. +interval_seconds = 10 + [haku] # The settings below control the Haku runtime on the server side. diff --git a/static/canvas-renderer.js b/static/canvas-renderer.js index ca93b35..ac76512 100644 --- a/static/canvas-renderer.js +++ b/static/canvas-renderer.js @@ -90,10 +90,6 @@ class CanvasRenderer extends HTMLElement { this.ctx.globalCompositeOperation = "source-over"; this.ctx.drawImage(chunk.canvas, x, y); } - - this.ctx.globalCompositeOperation = "difference"; - this.ctx.fillStyle = "white"; - this.ctx.fillText(`${chunkX}, ${chunkY}`, x, y + 12); } } diff --git a/static/index.js b/static/index.js index 131eec6..a0e61ab 100644 --- a/static/index.js +++ b/static/index.js @@ -86,7 +86,6 @@ reticleRenderer.connectViewport(canvasRenderer.viewport); let sendViewportUpdate = debounce(updateInterval, () => { let visibleRect = canvasRenderer.getVisibleChunkRect(); session.sendViewport(visibleRect); - console.log("visibleRect", visibleRect); }); canvasRenderer.addEventListener(".viewportUpdate", sendViewportUpdate); sendViewportUpdate();