persistence in database

only maybe a little bit shitty but it works very well
still needs chunk unloading
This commit is contained in:
liquidex 2024-08-18 12:28:57 +02:00
parent 2594afcc1b
commit 70e979057c
16 changed files with 970 additions and 273 deletions

View file

@ -1,4 +1,7 @@
use std::{collections::HashSet, sync::Arc};
use std::{
collections::{HashSet, VecDeque},
sync::Arc,
};
use axum::{
extract::{
@ -13,16 +16,20 @@ use schema::{
ChunkInfo, Error, LoginRequest, LoginResponse, Notify, Online, Request, Version, WallInfo,
};
use serde::{Deserialize, Serialize};
use tokio::{select, sync::mpsc, time::Instant};
use tracing::{error, info};
use tokio::{
select,
sync::{self, mpsc, oneshot},
time::Instant,
};
use tracing::{error, info, instrument};
use crate::{
haku::{Haku, Limits},
login::database::LoginStatus,
schema::Vec2,
wall::{
self, chunk_encoder::ChunkEncoder, chunk_iterator::ChunkIterator, ChunkPosition, JoinError,
SessionHandle, UserInit, Wall,
self, auto_save::AutoSave, chunk_images::ChunkImages, chunk_iterator::ChunkIterator,
database::ChunkDataPair, ChunkPosition, JoinError, SessionHandle, UserInit, Wall,
},
};
@ -110,7 +117,7 @@ async fn fallible_websocket(api: Arc<Api>, ws: &mut WebSocket) -> eyre::Result<(
Some(wall) => wall,
None => api.dbs.wall_broker.generate_id().await,
};
let open_wall = api.dbs.wall_broker.open(wall_id);
let open_wall = api.dbs.wall_broker.open(wall_id).await?;
let session_handle = match open_wall
.wall
@ -178,7 +185,8 @@ async fn fallible_websocket(api: Arc<Api>, ws: &mut WebSocket) -> eyre::Result<(
SessionLoop::start(
open_wall.wall,
open_wall.chunk_encoder,
open_wall.chunk_images,
open_wall.auto_save,
session_handle,
api.config.haku.clone(),
login_request.init.brush,
@ -192,24 +200,33 @@ async fn fallible_websocket(api: Arc<Api>, ws: &mut WebSocket) -> eyre::Result<(
struct SessionLoop {
wall: Arc<Wall>,
chunk_encoder: Arc<ChunkEncoder>,
chunk_images: Arc<ChunkImages>,
auto_save: Arc<AutoSave>,
handle: SessionHandle,
render_commands_tx: mpsc::Sender<RenderCommand>,
viewport_chunks: ChunkIterator,
sent_chunks: HashSet<ChunkPosition>,
pending_images: VecDeque<ChunkDataPair>,
}
enum RenderCommand {
SetBrush { brush: String },
Plot { points: Vec<Vec2> },
SetBrush {
brush: String,
},
Plot {
points: Vec<Vec2>,
done: oneshot::Sender<()>,
},
}
impl SessionLoop {
async fn start(
wall: Arc<Wall>,
chunk_encoder: Arc<ChunkEncoder>,
chunk_images: Arc<ChunkImages>,
auto_save: Arc<AutoSave>,
handle: SessionHandle,
limits: Limits,
brush: String,
@ -230,18 +247,20 @@ impl SessionLoop {
.name(String::from("haku render thread"))
.spawn({
let wall = Arc::clone(&wall);
let chunk_encoder = Arc::clone(&chunk_encoder);
move || Self::render_thread(wall, chunk_encoder, limits, render_commands_rx)
let chunk_images = Arc::clone(&chunk_images);
move || Self::render_thread(wall, chunk_images, limits, render_commands_rx)
})
.context("could not spawn render thread")?;
Ok(Self {
wall,
chunk_encoder,
chunk_images,
auto_save,
handle,
render_commands_tx,
viewport_chunks: ChunkIterator::new(ChunkPosition::new(0, 0), ChunkPosition::new(0, 0)),
sent_chunks: HashSet::new(),
pending_images: VecDeque::new(),
})
}
@ -284,13 +303,28 @@ impl SessionLoop {
.await;
}
wall::EventKind::Plot { points } => {
let chunks_to_modify: Vec<_> =
chunks_to_modify(&self.wall, points).into_iter().collect();
match self.chunk_images.load(chunks_to_modify.clone()).await {
Ok(_) => {
// We drop commands if we take too long to render instead of lagging
// the WebSocket thread.
// Theoretically this will yield much better responsiveness, but it _will_
// result in some visual glitches if we're getting bottlenecked.
let (done_tx, done_rx) = oneshot::channel();
_ = self.render_commands_tx.try_send(RenderCommand::Plot {
points: points.clone(),
})
done: done_tx,
});
let auto_save = Arc::clone(&self.auto_save);
tokio::spawn(async move {
_ = done_rx.await;
auto_save.request(chunks_to_modify).await;
});
}
Err(err) => error!(?err, "while loading chunks for render command"),
}
}
}
@ -320,47 +354,53 @@ impl SessionLoop {
let mut chunk_infos = vec![];
let mut packet = vec![];
if self.pending_images.is_empty() {
let mut positions = vec![];
// Number of chunks iterated is limited per packet, so as not to let the client
// stall the server by sending in a huge viewport.
let start = Instant::now();
let mut iterated = 0;
for i in 0..12000 {
for _ in 0..9000 {
if let Some(position) = self.viewport_chunks.next() {
if self.sent_chunks.contains(&position) {
if !self.sent_chunks.insert(position)
|| !self.chunk_images.chunk_exists(position)
{
continue;
}
positions.push(position);
} else {
break;
}
}
if let Some(encoded) = self.chunk_encoder.encoded(position).await {
self.pending_images
.extend(self.chunk_images.encoded(positions).await);
}
while let Some(ChunkDataPair { position, data }) = self.pending_images.pop_front() {
let offset = packet.len();
packet.extend_from_slice(&encoded);
packet.extend_from_slice(&data);
chunk_infos.push(ChunkInfo {
position,
offset: u32::try_from(offset).context("packet too big")?,
length: u32::try_from(encoded.len()).context("chunk image too big")?,
length: u32::try_from(data.len()).context("chunk image too big")?,
});
// The actual number of chunks per packet is limited by the packet's size, which
// The final number of chunks per packet is limited by the packet's size, which
// we don't want to be too big, to maintain responsiveness - the client will
// only request more chunks once per frame, so interactions still have time to
// execute. We cap it to 256KiB in hopes that noone has Internet slow enough for
// this to cause a disconnect.
//
// Note that after this there _may_ be more chunks pending in the queue.
if packet.len() >= 256 * 1024 {
iterated = i;
break;
}
}
self.sent_chunks.insert(position);
} else {
iterated = i;
break;
}
}
info!(elapsed = ?start.elapsed(), iterated, "send_chunks");
ws.send(to_message(&Notify::Chunks {
chunks: chunk_infos,
has_more: self.viewport_chunks.clone().next().is_some(),
has_more: !self.pending_images.is_empty()
|| self.viewport_chunks.clone().next().is_some(),
}))
.await?;
ws.send(Message::Binary(packet)).await?;
@ -370,7 +410,7 @@ impl SessionLoop {
fn render_thread(
wall: Arc<Wall>,
chunk_encoder: Arc<ChunkEncoder>,
chunk_images: Arc<ChunkImages>,
limits: Limits,
mut commands: mpsc::Receiver<RenderCommand>,
) {
@ -382,29 +422,51 @@ impl SessionLoop {
RenderCommand::SetBrush { brush } => {
brush_ok = haku.set_brush(&brush).is_ok();
}
RenderCommand::Plot { points } => {
RenderCommand::Plot { points, done } => {
if brush_ok {
if let Ok(value) = haku.eval_brush() {
for point in points {
// Ignore the result. It's better if we render _something_ rather
// than nothing.
_ = draw_to_chunks(&haku, value, point, &wall, &chunk_encoder);
_ = draw_to_chunks(&wall, &chunk_images, &haku, value, point);
}
haku.reset_vm();
}
}
_ = done.send(());
}
}
}
}
}
fn chunks_to_modify(wall: &Wall, points: &[Vec2]) -> HashSet<ChunkPosition> {
let mut chunks = HashSet::new();
for point in points {
let paint_area = wall.settings().paint_area as f32;
let left = point.x - paint_area / 2.0;
let top = point.y - paint_area / 2.0;
let top_left_chunk = wall.settings().chunk_at(Vec2::new(left, top));
let bottom_right_chunk = wall
.settings()
.chunk_at(Vec2::new(left + paint_area, top + paint_area));
for chunk_y in top_left_chunk.y..bottom_right_chunk.y {
for chunk_x in top_left_chunk.x..bottom_right_chunk.x {
chunks.insert(ChunkPosition::new(chunk_x, chunk_y));
}
}
}
chunks
}
#[instrument(skip(wall, chunk_images, haku, value))]
fn draw_to_chunks(
wall: &Wall,
chunk_images: &ChunkImages,
haku: &Haku,
value: Value,
center: Vec2,
wall: &Wall,
chunk_encoder: &ChunkEncoder,
) -> eyre::Result<()> {
let settings = wall.settings();
@ -414,10 +476,10 @@ fn draw_to_chunks(
let left = center.x - paint_area / 2.0;
let top = center.y - paint_area / 2.0;
let left_chunk = f32::floor(left / chunk_size) as i32;
let top_chunk = f32::floor(top / chunk_size) as i32;
let right_chunk = f32::ceil((left + paint_area) / chunk_size) as i32;
let bottom_chunk = f32::ceil((top + paint_area) / chunk_size) as i32;
let left_chunk = settings.chunk_at_1d(left);
let top_chunk = settings.chunk_at_1d(top);
let right_chunk = settings.chunk_at_1d(left + paint_area);
let bottom_chunk = settings.chunk_at_1d(top + paint_area);
for chunk_y in top_chunk..bottom_chunk {
for chunk_x in left_chunk..right_chunk {
let x = f32::floor(-chunk_x as f32 * chunk_size + center.x);
@ -428,11 +490,16 @@ fn draw_to_chunks(
}
}
for chunk_y in top_chunk..bottom_chunk {
for chunk_x in left_chunk..right_chunk {
chunk_encoder.invalidate_blocking(ChunkPosition::new(chunk_x, chunk_y))
}
}
// NOTE: Maybe sending in an iterator would be more efficient?
// If there were many chunks modified, (which there probably weren't,) this could allocate
// a lot of memory.
chunk_images.mark_modified_blocking(
(top_chunk..bottom_chunk)
.flat_map(|chunk_y| {
(left_chunk..right_chunk).map(move |chunk_x| ChunkPosition::new(chunk_x, chunk_y))
})
.collect(),
);
Ok(())
}

View file

@ -4,6 +4,6 @@ use crate::wall;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
pub wall: wall::Settings,
pub wall_broker: wall::broker::Settings,
pub haku: crate::haku::Limits,
}

View file

@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize};
use crate::{id, serialization::DeserializeFromStr};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UserId([u8; 32]);
pub struct UserId(pub [u8; 32]);
impl UserId {
pub fn new(rng: &mut dyn RngCore) -> Self {

View file

@ -18,6 +18,7 @@ pub struct Database {
command_tx: mpsc::Sender<Command>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoginStatus {
ValidUser,
UserDoesNotExist,
@ -33,10 +34,12 @@ enum Command {
nickname: String,
reply: oneshot::Sender<eyre::Result<UserId>>,
},
LogIn {
user_id: UserId,
reply: oneshot::Sender<LoginStatus>,
},
UserInfo {
user_id: UserId,
reply: oneshot::Sender<eyre::Result<Option<UserInfo>>>,
@ -79,8 +82,10 @@ impl Database {
pub fn start(settings: &Settings) -> eyre::Result<Database> {
let db = Connection::open(&settings.path).context("cannot open login database")?;
db.execute(
db.execute_batch(
r#"
PRAGMA application_id = 0x726B674C; -- rkgL
CREATE TABLE IF NOT EXISTS
t_users (
user_index INTEGER PRIMARY KEY,
@ -89,14 +94,15 @@ pub fn start(settings: &Settings) -> eyre::Result<Database> {
last_login_timestamp INTEGER NOT NULL
);
"#,
(),
)?;
let (command_tx, mut command_rx) = mpsc::channel(8);
let mut user_id_rng = rand_chacha::ChaCha20Rng::from_entropy();
tokio::task::spawn_blocking(move || {
std::thread::Builder::new()
.name("login database thread".into())
.spawn(move || {
let mut s_insert_user = db
.prepare(
r#"
@ -140,7 +146,8 @@ pub fn start(settings: &Settings) -> eyre::Result<Database> {
Command::LogIn { user_id, reply } => {
// TODO: User expiration.
let login_status = match s_log_in.execute((Utc::now().timestamp(), user_id.0)) {
let login_status =
match s_log_in.execute((Utc::now().timestamp(), user_id.0)) {
Ok(_) => LoginStatus::ValidUser,
Err(_) => LoginStatus::UserDoesNotExist,
};
@ -160,7 +167,8 @@ pub fn start(settings: &Settings) -> eyre::Result<Database> {
}
}
}
});
})
.context("cannot spawn thread")?;
Ok(Database { command_tx })
}

View file

@ -67,7 +67,9 @@ fn database(config: &Config, paths: &Paths<'_>) -> eyre::Result<Databases> {
})
.context("cannot start up login database")?;
let wall_broker = wall::Broker::new(config.wall);
create_dir_all(paths.database_dir.join("wall"))?;
let wall_broker =
wall::Broker::new(paths.database_dir.join("wall"), config.wall_broker.clone());
Ok(Databases { login, wall_broker })
}
@ -89,7 +91,6 @@ async fn fallible_main() -> eyre::Result<()> {
let dbs = Arc::new(database(&config, &paths)?);
let api = Arc::new(Api { config, dbs });
let app = Router::new()
.route_service(
"/",
@ -116,9 +117,7 @@ async fn main() {
let _client = tracy_client::Client::start();
color_eyre::install().unwrap();
tracing_subscriber::fmt()
.with_span_events(FmtSpan::ACTIVE)
.init();
tracing_subscriber::fmt().init();
match fallible_main().await {
Ok(_) => (),

View file

@ -5,3 +5,9 @@ pub struct Vec2 {
pub x: f32,
pub y: f32,
}
impl Vec2 {
pub fn new(x: f32, y: f32) -> Self {
Self { x, y }
}
}

View file

@ -17,11 +17,14 @@ use tracing::info;
use crate::{id, login::UserId, schema::Vec2, serialization::DeserializeFromStr};
pub mod auto_save;
pub mod broker;
pub mod chunk_encoder;
pub mod chunk_images;
pub mod chunk_iterator;
pub mod database;
pub use broker::Broker;
pub use database::Database;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WallId([u8; 32]);
@ -82,12 +85,18 @@ impl fmt::Display for InvalidWallId {
impl Error for InvalidWallId {}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct ChunkPosition {
pub x: i32,
pub y: i32,
}
impl fmt::Debug for ChunkPosition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "({}, {})", self.x, self.y)
}
}
impl ChunkPosition {
pub fn new(x: i32, y: i32) -> Self {
Self { x, y }
@ -115,11 +124,12 @@ pub struct Settings {
}
impl Settings {
pub fn chunk_at_1d(&self, x: f32) -> i32 {
f32::floor(x / self.chunk_size as f32) as i32
}
pub fn chunk_at(&self, position: Vec2) -> ChunkPosition {
ChunkPosition::new(
f32::floor(position.x / self.chunk_size as f32) as i32,
f32::floor(position.y / self.chunk_size as f32) as i32,
)
ChunkPosition::new(self.chunk_at_1d(position.x), self.chunk_at_1d(position.y))
}
}
@ -200,17 +210,19 @@ impl Wall {
&self.settings
}
pub fn has_chunk(&self, at: ChunkPosition) -> bool {
self.chunks.contains_key(&at)
}
pub fn get_chunk(&self, at: ChunkPosition) -> Option<Arc<Mutex<Chunk>>> {
self.chunks.get(&at).map(|chunk| Arc::clone(&chunk))
}
pub fn get_or_create_chunk(&self, at: ChunkPosition) -> Arc<Mutex<Chunk>> {
Arc::clone(
&self
.chunks
.entry(at)
.or_insert_with(|| Arc::new(Mutex::new(Chunk::new(self.settings.chunk_size)))),
)
Arc::clone(&self.chunks.entry(at).or_insert_with(|| {
info!(?at, "chunk created");
Arc::new(Mutex::new(Chunk::new(self.settings.chunk_size)))
}))
}
pub fn join(self: &Arc<Self>, session: Session) -> Result<SessionHandle, JoinError> {

View file

@ -0,0 +1,86 @@
use std::{backtrace::Backtrace, collections::HashSet, sync::Arc, time::Duration};
use dashmap::DashSet;
use serde::{Deserialize, Serialize};
use tokio::{
sync::mpsc,
time::{interval, MissedTickBehavior},
};
use tracing::{info, instrument};
use super::{chunk_images::ChunkImages, ChunkPosition, Database, Wall};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Settings {
pub interval_seconds: u64,
}
pub struct AutoSave {
requests_tx: mpsc::Sender<Vec<ChunkPosition>>,
}
impl AutoSave {
pub fn new(chunk_images: Arc<ChunkImages>, settings: Settings) -> Self {
let (requests_tx, requests_rx) = mpsc::channel(8);
tokio::spawn(
AutoSaveLoop {
chunk_images,
settings,
requests_rx,
unsaved_chunks: HashSet::new(),
}
.enter(),
);
Self { requests_tx }
}
pub async fn request(&self, chunks: Vec<ChunkPosition>) {
_ = self.requests_tx.send(chunks).await;
}
}
struct AutoSaveLoop {
chunk_images: Arc<ChunkImages>,
settings: Settings,
requests_rx: mpsc::Receiver<Vec<ChunkPosition>>,
unsaved_chunks: HashSet<ChunkPosition>,
}
impl AutoSaveLoop {
async fn enter(mut self) {
let mut save_interval = interval(Duration::from_secs(self.settings.interval_seconds));
save_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
request = self.requests_rx.recv() => {
if let Some(positions) = request {
for position in positions {
self.unsaved_chunks.insert(position);
}
} else {
break;
}
}
_ = save_interval.tick() => self.save_chunks().await,
else => break,
}
}
}
#[instrument(skip(self), fields(num_chunks = self.unsaved_chunks.len()))]
async fn save_chunks(&mut self) {
// NOTE: We don't care about actually using the images here -
// the ChunkImages service writes them to the database by itself, and that's all our
// request is for.
_ = self
.chunk_images
.encoded(self.unsaved_chunks.drain().collect())
.await;
}
}

View file

@ -1,19 +1,31 @@
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};
use dashmap::DashMap;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tracing::info;
use tracing::{info, instrument};
use super::{chunk_encoder::ChunkEncoder, Settings, Wall, WallId};
use super::{
auto_save::{self, AutoSave},
chunk_images::ChunkImages,
database, Database, Wall, WallId,
};
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Settings {
pub default_wall_settings: super::Settings,
pub auto_save: auto_save::Settings,
}
/// The broker is the main way to access wall data.
///
/// It handles dynamically loading and unloading walls as they're needed.
/// It also handles database threads for each wall.
pub struct Broker {
wall_settings: Settings,
databases_dir: PathBuf,
settings: Settings,
open_walls: DashMap<WallId, OpenWall>,
rng: Mutex<ChaCha20Rng>,
}
@ -21,14 +33,17 @@ pub struct Broker {
#[derive(Clone)]
pub struct OpenWall {
pub wall: Arc<Wall>,
pub chunk_encoder: Arc<ChunkEncoder>,
pub chunk_images: Arc<ChunkImages>,
pub db: Arc<Database>,
pub auto_save: Arc<AutoSave>,
}
impl Broker {
pub fn new(wall_settings: Settings) -> Self {
info!(?wall_settings, "Broker::new");
pub fn new(databases_dir: PathBuf, settings: Settings) -> Self {
info!(?settings, "Broker::new");
Self {
wall_settings,
databases_dir,
settings,
open_walls: DashMap::new(),
rng: Mutex::new(ChaCha20Rng::from_entropy()),
}
@ -41,14 +56,35 @@ impl Broker {
WallId::new(&mut *rng)
}
pub fn open(&self, wall_id: WallId) -> OpenWall {
let wall = Arc::new(Wall::new(self.wall_settings));
self.open_walls
.entry(wall_id)
.or_insert_with(|| OpenWall {
chunk_encoder: Arc::new(ChunkEncoder::start(Arc::clone(&wall))),
#[instrument(skip(self), fields(%wall_id))]
pub async fn open(&self, wall_id: WallId) -> eyre::Result<OpenWall> {
let open_wall = self.open_walls.entry(wall_id);
match open_wall {
dashmap::Entry::Vacant(entry) => {
let db = Arc::new(database::start(database::Settings {
path: self.databases_dir.join(format!("{wall_id}.db")),
wall_id,
default_wall_settings: self.settings.default_wall_settings,
})?);
let wall = Arc::new(Wall::new(*db.wall_settings()));
let chunk_images = Arc::new(ChunkImages::new(Arc::clone(&wall), Arc::clone(&db)));
let auto_save = Arc::new(AutoSave::new(
Arc::clone(&chunk_images),
self.settings.auto_save.clone(),
));
let open_wall = OpenWall {
wall,
})
.clone()
chunk_images,
db,
auto_save,
};
entry.insert(open_wall.clone());
Ok(open_wall)
}
dashmap::Entry::Occupied(entry) => Ok(entry.get().clone()),
}
}
}

View file

@ -1,104 +0,0 @@
use std::sync::Arc;
use indexmap::IndexMap;
use tokio::sync::{mpsc, oneshot};
use super::{ChunkPosition, Wall};
/// Service which encodes chunks to WebP images and caches them in an LRU fashion.
pub struct ChunkEncoder {
commands_tx: mpsc::Sender<Command>,
}
enum Command {
GetEncoded {
chunk: ChunkPosition,
reply: oneshot::Sender<Option<Arc<[u8]>>>,
},
Invalidate {
chunk: ChunkPosition,
},
}
impl ChunkEncoder {
pub fn start(wall: Arc<Wall>) -> Self {
let (commands_tx, commands_rx) = mpsc::channel(32);
tokio::spawn(Self::service(wall, commands_rx));
Self { commands_tx }
}
pub async fn encoded(&self, chunk: ChunkPosition) -> Option<Arc<[u8]>> {
let (tx, rx) = oneshot::channel();
self.commands_tx
.send(Command::GetEncoded { chunk, reply: tx })
.await
.ok()?;
rx.await.ok().flatten()
}
pub async fn invalidate(&self, chunk: ChunkPosition) {
_ = self.commands_tx.send(Command::Invalidate { chunk }).await;
}
pub fn invalidate_blocking(&self, chunk: ChunkPosition) {
_ = self
.commands_tx
.blocking_send(Command::Invalidate { chunk });
}
async fn encode(wall: &Wall, chunk: ChunkPosition) -> Option<Arc<[u8]>> {
let pixmap = {
// Clone out the pixmap to avoid unnecessary chunk mutex contention while the
// chunk is being encoded.
let chunk_ref = wall.get_chunk(chunk)?;
let chunk = chunk_ref.lock().await;
chunk.pixmap.clone()
};
let image = tokio::task::spawn_blocking(move || {
let webp = webp::Encoder::new(
pixmap.data(),
webp::PixelLayout::Rgba,
pixmap.width(),
pixmap.height(),
);
// NOTE: There's an unnecessary copy here. Wonder if that kills performance much.
webp.encode_lossless().to_vec()
})
.await
.ok()?;
Some(Arc::from(image))
}
async fn service(wall: Arc<Wall>, mut commands_rx: mpsc::Receiver<Command>) {
let mut encoded_lru: IndexMap<ChunkPosition, Option<Arc<[u8]>>> = IndexMap::new();
while let Some(command) = commands_rx.recv().await {
match command {
Command::GetEncoded { chunk, reply } => {
if let Some(encoded) = encoded_lru.get(&chunk) {
_ = reply.send(encoded.clone())
} else {
let encoded = Self::encode(&wall, chunk).await;
// TODO: Make this capacity configurable.
// 598 is chosen because under the default configuration, it would
// correspond to roughly two 3840x2160 displays.
if encoded_lru.len() >= 598 {
encoded_lru.shift_remove_index(0);
}
encoded_lru.insert(chunk, encoded.clone());
_ = reply.send(encoded);
}
}
Command::Invalidate { chunk } => {
encoded_lru.shift_remove(&chunk);
}
}
}
}
}

View file

@ -0,0 +1,238 @@
use std::sync::Arc;
use dashmap::DashSet;
use eyre::Context;
use haku::render::tiny_skia::{IntSize, Pixmap};
use rayon::iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, instrument};
use super::{database::ChunkDataPair, ChunkPosition, Database, Wall};
/// Chunk image encoding, caching, and storage service.
pub struct ChunkImages {
wall: Arc<Wall>,
async_loop: Arc<ChunkImageLoop>,
commands_tx: mpsc::Sender<Command>,
}
enum Command {
Encode {
chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<Vec<ChunkDataPair>>,
},
Load {
chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<eyre::Result<()>>,
},
MarkModified {
chunks: Vec<ChunkPosition>,
},
}
impl ChunkImages {
pub fn new(wall: Arc<Wall>, db: Arc<Database>) -> Self {
let (commands_tx, commands_rx) = mpsc::channel(32);
let async_loop = Arc::new(ChunkImageLoop {
wall: Arc::clone(&wall),
db,
chunks_in_db: DashSet::new(),
});
tokio::spawn(Arc::clone(&async_loop).enter(commands_rx));
Self {
wall,
async_loop,
commands_tx,
}
}
pub async fn encoded(&self, chunks: Vec<ChunkPosition>) -> Vec<ChunkDataPair> {
let (tx, rx) = oneshot::channel();
_ = self
.commands_tx
.send(Command::Encode { chunks, reply: tx })
.await
.ok();
rx.await.ok().unwrap_or_default()
}
pub async fn load(&self, chunks: Vec<ChunkPosition>) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.commands_tx
.send(Command::Load { chunks, reply: tx })
.await
.context("database is offline")?;
rx.await.context("failed to load chunks")?
}
pub fn mark_modified_blocking(&self, chunks: Vec<ChunkPosition>) {
_ = self
.commands_tx
.blocking_send(Command::MarkModified { chunks });
}
pub fn chunk_exists(&self, position: ChunkPosition) -> bool {
self.wall.has_chunk(position) || self.async_loop.chunks_in_db.contains(&position)
}
}
struct ChunkImageLoop {
wall: Arc<Wall>,
db: Arc<Database>,
chunks_in_db: DashSet<ChunkPosition>,
}
impl ChunkImageLoop {
#[instrument(skip(self, reply))]
async fn encode(
self: Arc<Self>,
mut chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<Vec<ChunkDataPair>>,
) {
// Any chunks that are _not_ loaded, we will send back directly from the database.
let mut unloaded = vec![];
chunks.retain(|&position| {
let is_loaded = self.wall.has_chunk(position);
unloaded.push(position);
is_loaded
});
// Any chunks that _are_ loaded, we will encode into WebP.
let this = Arc::clone(&self);
let loaded = tokio::task::spawn_blocking(move || {
chunks
.into_par_iter()
.flat_map(|position| -> Option<_> {
let pixmap = {
// Clone out the pixmap to avoid unnecessary chunk mutex contention while the
// chunk is being encoded.
let chunk_ref = this.wall.get_chunk(position)?;
let chunk = chunk_ref.blocking_lock();
chunk.pixmap.clone()
};
let webp = webp::Encoder::new(
pixmap.data(),
webp::PixelLayout::Rgba,
pixmap.width(),
pixmap.height(),
);
// NOTE: There's an unnecessary copy here. Wonder if that kills performance much.
Some(ChunkDataPair {
position,
data: Arc::from(webp.encode_lossless().to_vec()),
})
})
.collect::<Vec<_>>()
})
.await
.unwrap();
// We'll also write the loaded chunks back to the database while we have the images.
// No point wasting the encoding time.
if !loaded.is_empty() {
info!(num = loaded.len(), "writing loaded chunks to database");
}
_ = self.db.write_chunks(loaded.clone()).await;
for &ChunkDataPair { position, .. } in &loaded {
self.chunks_in_db.insert(position);
}
let mut all = loaded;
match self.db.read_chunks(unloaded).await {
Ok(mut chunks) => all.append(&mut chunks),
Err(err) => error!(?err, "read_chunks failed to read unloaded chunks"),
}
_ = reply.send(all);
}
async fn load_inner(self: Arc<Self>, mut chunks: Vec<ChunkPosition>) -> eyre::Result<()> {
// Skip already loaded chunks.
chunks.retain(|&position| !self.wall.has_chunk(position));
if chunks.is_empty() {
return Ok(());
}
info!(?chunks, "to load");
let chunks = self.db.read_chunks(chunks.clone()).await?;
let chunks2 = chunks.clone();
let decoded = tokio::task::spawn_blocking(move || {
chunks2
.par_iter()
.flat_map(|ChunkDataPair { position, data }| {
webp::Decoder::new(data)
.decode()
.and_then(|image| {
info!(?position, "decoded");
let size = IntSize::from_wh(image.width(), image.height())?;
Pixmap::from_vec(image.to_vec(), size)
})
.map(|pixmap| (*position, pixmap))
})
.collect::<Vec<(ChunkPosition, Pixmap)>>()
})
.await
.context("failed to decode chunks from the database")?;
// I don't know yet if locking all the chunks is a good idea at this point.
// I can imagine contended chunks having some trouble loading.
let chunk_arcs: Vec<_> = chunks
.iter()
.map(|ChunkDataPair { position, .. }| self.wall.get_or_create_chunk(*position))
.collect();
let mut chunk_refs = Vec::with_capacity(chunk_arcs.len());
for arc in &chunk_arcs {
chunk_refs.push(arc.lock().await);
}
for ((_, pixmap), mut chunk) in decoded.into_iter().zip(chunk_refs) {
chunk.pixmap = pixmap;
}
Ok(())
}
#[instrument(skip(self, reply))]
async fn load(
self: Arc<Self>,
chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<eyre::Result<()>>,
) {
_ = reply.send(self.load_inner(chunks).await);
}
async fn enter(self: Arc<Self>, mut commands_rx: mpsc::Receiver<Command>) {
let all_chunks = self
.db
.get_all_chunks()
.await
.expect("could not list chunks in the database");
for position in all_chunks {
self.chunks_in_db.insert(position);
}
while let Some(command) = commands_rx.recv().await {
match command {
Command::Encode { chunks, reply } => {
// TODO: This should have a caching layer.
tokio::spawn(Arc::clone(&self).encode(chunks, reply));
}
Command::Load { chunks, reply } => {
tokio::spawn(Arc::clone(&self).load(chunks, reply));
}
Command::MarkModified { chunks } => {
// TODO: This should invalidate data from the caching layer.
}
}
}
}
}

View file

@ -1,3 +1,5 @@
use std::iter::Take;
use super::ChunkPosition;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -8,13 +10,35 @@ pub struct ChunkIterator {
}
impl ChunkIterator {
pub fn new(top_left: ChunkPosition, bottom_right: ChunkPosition) -> Self {
pub fn new(start: ChunkPosition, end: ChunkPosition) -> Self {
let top_left = ChunkPosition::new(start.x.min(end.x), start.y.min(end.y));
let bottom_right = ChunkPosition::new(start.x.max(end.x), start.y.max(end.y));
Self {
cursor: top_left,
left: top_left.x,
bottom_right,
}
}
pub fn take_next(&mut self, n: i32) -> Take<Self> {
assert!(n > 0);
let take = (*self).take(n as usize);
let x = self.cursor.x - self.left;
let width = self.bottom_right.x - self.left;
if width != 0 {
self.cursor.x = self.left + (x + n) % width;
self.cursor.y += n / width;
} else {
// In a width = 0 configuration, we iterate vertically.
// This is probably not the right thing to do, but we're just doing this to guard
// against malicious clients.
self.cursor.y += n;
}
take
}
}
impl Iterator for ChunkIterator {

View file

@ -0,0 +1,324 @@
use std::{
convert::identity,
path::{Path, PathBuf},
sync::Arc,
};
use chrono::Utc;
use eyre::Context;
use rusqlite::Connection;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info, instrument};
use crate::login::UserId;
use super::{ChunkPosition, WallId};
pub struct Settings {
pub path: PathBuf,
pub wall_id: WallId,
pub default_wall_settings: super::Settings,
}
pub struct Database {
wall_settings: super::Settings,
command_tx: mpsc::Sender<Command>,
}
#[derive(Debug, Clone)]
pub struct ChunkDataPair {
pub position: ChunkPosition,
pub data: Arc<[u8]>,
}
enum Command {
SetWallInfo {
created_by: UserId,
title: String,
reply: oneshot::Sender<eyre::Result<()>>,
},
WriteChunks {
chunks: Vec<ChunkDataPair>,
reply: oneshot::Sender<eyre::Result<()>>,
},
ReadChunks {
chunks: Vec<ChunkPosition>,
reply: oneshot::Sender<Vec<ChunkDataPair>>,
},
GetAllChunks {
reply: oneshot::Sender<eyre::Result<Vec<ChunkPosition>>>,
},
}
impl Database {
pub fn wall_settings(&self) -> &super::Settings {
&self.wall_settings
}
#[instrument(skip(self), err(Debug))]
pub async fn write_chunks(&self, chunks: Vec<ChunkDataPair>) -> eyre::Result<()> {
let (tx, rx) = oneshot::channel();
self.command_tx
.send(Command::WriteChunks { chunks, reply: tx })
.await
.context("database is offline")?;
rx.await.context("database returned an error")?
}
pub async fn read_chunks(
&self,
chunks: Vec<ChunkPosition>,
) -> eyre::Result<Vec<ChunkDataPair>> {
let (tx, rx) = oneshot::channel();
self.command_tx
.send(Command::ReadChunks { chunks, reply: tx })
.await
.context("database is offline")?;
rx.await.context("database did not return anything")
}
pub async fn get_all_chunks(&self) -> eyre::Result<Vec<ChunkPosition>> {
let (tx, rx) = oneshot::channel();
self.command_tx
.send(Command::GetAllChunks { reply: tx })
.await
.context("database is offline")?;
rx.await.context("database did not return anything")?
}
}
#[instrument(name = "wall::database::start", skip(settings), fields(wall_id = %settings.wall_id))]
pub fn start(settings: Settings) -> eyre::Result<Database> {
let db = Connection::open(settings.path).context("cannot open wall database")?;
let major: u32 = env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap();
let minor: u32 = env!("CARGO_PKG_VERSION_MINOR").parse().unwrap();
let patch: u32 = env!("CARGO_PKG_VERSION_PATCH").parse().unwrap();
let version = major * 1_000_000 + minor * 1_000 + patch;
info!("initial setup");
db.execute_batch(
r#"
PRAGMA application_id = 0x726B6757; -- rkgW
CREATE TABLE IF NOT EXISTS
t_file_info (
id INTEGER PRIMARY KEY CHECK (id = 1),
version INTEGER NOT NULL,
wall_id BLOB NOT NULL,
mtime INTEGER NOT NULL DEFAULT (unixepoch())
);
CREATE TABLE IF NOT EXISTS
t_wall_settings (
id INTEGER PRIMARY KEY CHECK (id = 1),
max_chunks INTEGER NOT NULL,
max_sessions INTEGER NOT NULL,
paint_area INTEGER NOT NULL,
chunk_size INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS
t_wall_info (
id INTEGER PRIMARY KEY CHECK (id = 1),
created_by BLOB NOT NULL,
title TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS
t_chunks_v0 (
chunk_index INTEGER PRIMARY KEY,
chunk_x INTEGER NOT NULL,
chunk_y INTEGER NOT NULL,
mtime INTEGER NOT NULL DEFAULT (unixepoch()),
webp_data BLOB NOT NULL,
UNIQUE(chunk_x, chunk_y) ON CONFLICT REPLACE
);
CREATE INDEX IF NOT EXISTS
t_chunks_v0_index_xy ON t_chunks_v0
(chunk_x, chunk_y);
"#,
)?;
info!("set file version and wall ID");
db.execute(
r#"
INSERT OR IGNORE
INTO t_file_info
(version, wall_id, mtime)
VALUES (?, ?, unixepoch());
"#,
(version, settings.wall_id.0),
)?;
info!("set wall mtime");
db.execute(
r#"
UPDATE t_file_info
SET mtime = unixepoch();
"#,
(),
)?;
info!("initialize/get wall settings");
db.execute(
r#"
INSERT OR IGNORE
INTO t_wall_settings
(max_chunks, max_sessions, paint_area, chunk_size)
VALUES (?, ?, ?, ?);
"#,
(
settings.default_wall_settings.max_chunks,
settings.default_wall_settings.max_sessions,
settings.default_wall_settings.paint_area,
settings.default_wall_settings.chunk_size,
),
)?;
let wall_settings = db.query_row(
r#"
SELECT
max_chunks, max_sessions, paint_area, chunk_size
FROM t_wall_settings;
"#,
(),
|row| {
Ok(super::Settings {
max_chunks: row.get(0)?,
max_sessions: row.get(1)?,
paint_area: row.get(2)?,
chunk_size: row.get(3)?,
})
},
)?;
let (command_tx, mut command_rx) = mpsc::channel(8);
std::thread::Builder::new()
.name(format!("database thread {}", settings.wall_id))
.spawn(move || {
let mut s_set_wall_info = db
.prepare(
r#"
INSERT OR REPLACE
INTO t_wall_info
(created_by, title)
VALUES (?, ?);
"#,
)
.unwrap();
let mut s_write_chunk = db
.prepare(
r#"
INSERT
INTO t_chunks_v0
(chunk_x, chunk_y, webp_data, mtime)
VALUES (?, ?, ?, unixepoch());
"#,
)
.unwrap();
let mut s_read_chunk = db
.prepare(
r#"
SELECT webp_data
FROM t_chunks_v0
WHERE chunk_x = ? AND chunk_y = ?;
"#,
)
.unwrap();
let mut s_get_all_chunks = db
.prepare(
r#"
SELECT chunk_x, chunk_y
FROM t_chunks_v0;
"#,
)
.unwrap();
while let Some(command) = command_rx.blocking_recv() {
match command {
Command::SetWallInfo {
created_by,
title,
reply,
} => {
_ = reply.send(
s_set_wall_info
.execute((created_by.0, title))
.map(|_| ())
.context("failed to set wall info"),
);
}
Command::WriteChunks { chunks, reply } => {
let mut result = Ok(());
for ChunkDataPair { position, data } in chunks {
if let Err(error) =
s_write_chunk.execute((position.x, position.y, &data[..]))
{
result = Err(error).with_context(|| {
format!("failed to update chunk at {position:?}")
});
}
}
_ = reply.send(result.context(
"failed to update one or more chunks; see context for last error",
));
}
Command::ReadChunks { chunks, reply } => {
let result = chunks
.into_iter()
.flat_map(|position| {
s_read_chunk
.query_row((position.x, position.y), |row| {
Ok(ChunkDataPair {
position,
data: Arc::from(row.get::<_, Vec<u8>>(0)?),
})
})
.inspect_err(|err| {
if err != &rusqlite::Error::QueryReturnedNoRows {
error!(?err, ?position, "while reading chunk");
}
})
.ok()
})
.collect();
_ = reply.send(result);
}
Command::GetAllChunks { reply } => {
_ = reply.send(
s_get_all_chunks
.query_map((), |row| {
Ok(ChunkPosition::new(row.get(0)?, row.get(1)?))
})
.map(|chunks| chunks.collect::<Result<_, _>>())
.and_then(identity)
.context("failed to query all chunks"),
)
}
}
}
})
.context("cannot spawn thread")?;
Ok(Database {
command_tx,
wall_settings,
})
}

View file

@ -1,4 +1,4 @@
[wall]
[wall_broker.default_wall_settings]
# The settings below control the creation of new walls.
@ -13,7 +13,8 @@ max_sessions = 128
# The size of chunks.
# Choosing an appropriate size for chunks is a tradeoff between performance and disk space - 168 is
# chosen as a reasonable default
# chosen as a reasonable default which is just small enough to perform operations on fairly quickly
# and responsively.
chunk_size = 168
# The size of the area that can be drawn over by a brush, in pixels.
@ -21,6 +22,11 @@ chunk_size = 168
# can produce.
paint_area = 504
[wall_broker.auto_save]
# How often should modified chunks be saved to the database.
interval_seconds = 10
[haku]
# The settings below control the Haku runtime on the server side.

View file

@ -90,10 +90,6 @@ class CanvasRenderer extends HTMLElement {
this.ctx.globalCompositeOperation = "source-over";
this.ctx.drawImage(chunk.canvas, x, y);
}
this.ctx.globalCompositeOperation = "difference";
this.ctx.fillStyle = "white";
this.ctx.fillText(`${chunkX}, ${chunkY}`, x, y + 12);
}
}

View file

@ -86,7 +86,6 @@ reticleRenderer.connectViewport(canvasRenderer.viewport);
let sendViewportUpdate = debounce(updateInterval, () => {
let visibleRect = canvasRenderer.getVisibleChunkRect();
session.sendViewport(visibleRect);
console.log("visibleRect", visibleRect);
});
canvasRenderer.addEventListener(".viewportUpdate", sendViewportUpdate);
sendViewportUpdate();