sync
This commit is contained in:
parent
26ba098183
commit
2f7bcbb14e
30 changed files with 1691 additions and 315 deletions
|
@ -11,4 +11,5 @@ arrayvec = { version = "0.7.4", default-features = false }
|
|||
dlmalloc = { version = "0.2.6", features = ["global"] }
|
||||
haku.workspace = true
|
||||
log.workspace = true
|
||||
paste = "1.0.15"
|
||||
|
||||
|
|
|
@ -14,10 +14,10 @@ use haku::{
|
|||
},
|
||||
sexp::{parse_toplevel, Ast, Parser},
|
||||
system::{ChunkId, System, SystemImage},
|
||||
value::{BytecodeLoc, Closure, FunctionName, Ref},
|
||||
value::{BytecodeLoc, Closure, FunctionName, Ref, Value},
|
||||
vm::{Exception, Vm, VmImage, VmLimits},
|
||||
};
|
||||
use log::info;
|
||||
use log::{debug, info};
|
||||
|
||||
pub mod logging;
|
||||
mod panicking;
|
||||
|
@ -66,6 +66,41 @@ impl Default for Limits {
|
|||
}
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
extern "C" fn haku_limits_new() -> *mut Limits {
|
||||
Box::leak(Box::new(Limits::default()))
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_limits_destroy(limits: *mut Limits) {
|
||||
drop(Box::from_raw(limits))
|
||||
}
|
||||
|
||||
macro_rules! limit_setter {
|
||||
($name:tt) => {
|
||||
paste::paste! {
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn [<haku_limits_set_ $name>](limits: *mut Limits, value: usize) {
|
||||
debug!("set limit {} = {value}", stringify!($name));
|
||||
|
||||
let limits = &mut *limits;
|
||||
limits.$name = value;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
limit_setter!(max_chunks);
|
||||
limit_setter!(max_defs);
|
||||
limit_setter!(ast_capacity);
|
||||
limit_setter!(chunk_capacity);
|
||||
limit_setter!(stack_capacity);
|
||||
limit_setter!(call_stack_capacity);
|
||||
limit_setter!(ref_capacity);
|
||||
limit_setter!(fuel);
|
||||
limit_setter!(pixmap_stack_capacity);
|
||||
limit_setter!(transform_stack_capacity);
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct Instance {
|
||||
limits: Limits,
|
||||
|
@ -76,13 +111,16 @@ struct Instance {
|
|||
defs_image: DefsImage,
|
||||
vm: Vm,
|
||||
vm_image: VmImage,
|
||||
|
||||
value: Value,
|
||||
exception: Option<Exception>,
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_instance_new() -> *mut Instance {
|
||||
// TODO: This should be a parameter.
|
||||
let limits = Limits::default();
|
||||
unsafe extern "C" fn haku_instance_new(limits: *const Limits) -> *mut Instance {
|
||||
let limits = *limits;
|
||||
debug!("creating new instance with limits: {limits:?}");
|
||||
|
||||
let system = System::new(limits.max_chunks);
|
||||
|
||||
let defs = Defs::new(limits.max_defs);
|
||||
|
@ -108,6 +146,7 @@ unsafe extern "C" fn haku_instance_new() -> *mut Instance {
|
|||
defs_image,
|
||||
vm,
|
||||
vm_image,
|
||||
value: Value::Nil,
|
||||
exception: None,
|
||||
});
|
||||
Box::leak(instance)
|
||||
|
@ -125,6 +164,12 @@ unsafe extern "C" fn haku_reset(instance: *mut Instance) {
|
|||
instance.defs.restore_image(&instance.defs_image);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_reset_vm(instance: *mut Instance) {
|
||||
let instance = &mut *instance;
|
||||
instance.vm.restore_image(&instance.vm_image);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_has_exception(instance: *mut Instance) -> bool {
|
||||
(*instance).exception.is_some()
|
||||
|
@ -285,13 +330,13 @@ unsafe extern "C" fn haku_compile_brush(
|
|||
}
|
||||
|
||||
struct PixmapLock {
|
||||
pixmap: Option<Pixmap>,
|
||||
pixmap: Pixmap,
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
extern "C" fn haku_pixmap_new(width: u32, height: u32) -> *mut PixmapLock {
|
||||
Box::leak(Box::new(PixmapLock {
|
||||
pixmap: Some(Pixmap::new(width, height).expect("invalid pixmap size")),
|
||||
pixmap: Pixmap::new(width, height).expect("invalid pixmap size"),
|
||||
}))
|
||||
}
|
||||
|
||||
|
@ -302,32 +347,18 @@ unsafe extern "C" fn haku_pixmap_destroy(pixmap: *mut PixmapLock) {
|
|||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_pixmap_data(pixmap: *mut PixmapLock) -> *mut u8 {
|
||||
let pixmap = (*pixmap)
|
||||
.pixmap
|
||||
.as_mut()
|
||||
.expect("pixmap is already being rendered to");
|
||||
|
||||
let pixmap = &mut (*pixmap).pixmap;
|
||||
pixmap.pixels_mut().as_mut_ptr() as *mut u8
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_pixmap_clear(pixmap: *mut PixmapLock) {
|
||||
let pixmap = (*pixmap)
|
||||
.pixmap
|
||||
.as_mut()
|
||||
.expect("pixmap is already being rendered to");
|
||||
let pixmap = &mut (*pixmap).pixmap;
|
||||
pixmap.pixels_mut().fill(PremultipliedColorU8::TRANSPARENT);
|
||||
}
|
||||
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_render_brush(
|
||||
instance: *mut Instance,
|
||||
brush: *const Brush,
|
||||
pixmap_a: *mut PixmapLock,
|
||||
pixmap_b: *mut PixmapLock,
|
||||
translation_x: f32,
|
||||
translation_y: f32,
|
||||
) -> StatusCode {
|
||||
unsafe extern "C" fn haku_eval_brush(instance: *mut Instance, brush: *const Brush) -> StatusCode {
|
||||
let instance = &mut *instance;
|
||||
let brush = &*brush;
|
||||
|
||||
|
@ -347,7 +378,7 @@ unsafe extern "C" fn haku_render_brush(
|
|||
return StatusCode::OutOfRefSlots;
|
||||
};
|
||||
|
||||
let scribble = match instance.vm.run(&instance.system, closure_id) {
|
||||
instance.value = match instance.vm.run(&instance.system, closure_id) {
|
||||
Ok(value) => value,
|
||||
Err(exn) => {
|
||||
instance.exception = Some(exn);
|
||||
|
@ -355,47 +386,36 @@ unsafe extern "C" fn haku_render_brush(
|
|||
}
|
||||
};
|
||||
|
||||
let mut render = |pixmap: *mut PixmapLock| {
|
||||
let pixmap_locked = (*pixmap)
|
||||
.pixmap
|
||||
.take()
|
||||
.expect("pixmap is already being rendered to");
|
||||
StatusCode::Ok
|
||||
}
|
||||
|
||||
let mut renderer = Renderer::new(
|
||||
pixmap_locked,
|
||||
&RendererLimits {
|
||||
pixmap_stack_capacity: instance.limits.pixmap_stack_capacity,
|
||||
transform_stack_capacity: instance.limits.transform_stack_capacity,
|
||||
},
|
||||
);
|
||||
renderer.translate(translation_x, translation_y);
|
||||
match renderer.render(&instance.vm, scribble) {
|
||||
Ok(()) => (),
|
||||
Err(exn) => {
|
||||
instance.exception = Some(exn);
|
||||
return StatusCode::RenderException;
|
||||
}
|
||||
}
|
||||
#[no_mangle]
|
||||
unsafe extern "C" fn haku_render_value(
|
||||
instance: *mut Instance,
|
||||
pixmap: *mut PixmapLock,
|
||||
translation_x: f32,
|
||||
translation_y: f32,
|
||||
) -> StatusCode {
|
||||
let instance = &mut *instance;
|
||||
|
||||
let pixmap_locked = renderer.finish();
|
||||
let pixmap_locked = &mut (*pixmap).pixmap;
|
||||
|
||||
(*pixmap).pixmap = Some(pixmap_locked);
|
||||
|
||||
StatusCode::Ok
|
||||
};
|
||||
|
||||
match render(pixmap_a) {
|
||||
StatusCode::Ok => (),
|
||||
other => return other,
|
||||
}
|
||||
if !pixmap_b.is_null() {
|
||||
match render(pixmap_b) {
|
||||
StatusCode::Ok => (),
|
||||
other => return other,
|
||||
let mut renderer = Renderer::new(
|
||||
pixmap_locked,
|
||||
&RendererLimits {
|
||||
pixmap_stack_capacity: instance.limits.pixmap_stack_capacity,
|
||||
transform_stack_capacity: instance.limits.transform_stack_capacity,
|
||||
},
|
||||
);
|
||||
renderer.translate(translation_x, translation_y);
|
||||
match renderer.render(&instance.vm, instance.value) {
|
||||
Ok(()) => (),
|
||||
Err(exn) => {
|
||||
instance.exception = Some(exn);
|
||||
instance.vm.restore_image(&instance.vm_image);
|
||||
return StatusCode::RenderException;
|
||||
}
|
||||
}
|
||||
|
||||
instance.vm.restore_image(&instance.vm_image);
|
||||
|
||||
StatusCode::Ok
|
||||
}
|
||||
|
|
|
@ -15,18 +15,23 @@ pub struct RendererLimits {
|
|||
pub transform_stack_capacity: usize,
|
||||
}
|
||||
|
||||
pub struct Renderer {
|
||||
pixmap_stack: Vec<Pixmap>,
|
||||
pub enum RenderTarget<'a> {
|
||||
Borrowed(&'a mut Pixmap),
|
||||
Owned(Pixmap),
|
||||
}
|
||||
|
||||
pub struct Renderer<'a> {
|
||||
pixmap_stack: Vec<RenderTarget<'a>>,
|
||||
transform_stack: Vec<Transform>,
|
||||
}
|
||||
|
||||
impl Renderer {
|
||||
pub fn new(pixmap: Pixmap, limits: &RendererLimits) -> Self {
|
||||
impl<'a> Renderer<'a> {
|
||||
pub fn new(pixmap: &'a mut Pixmap, limits: &RendererLimits) -> Self {
|
||||
assert!(limits.pixmap_stack_capacity > 0);
|
||||
assert!(limits.transform_stack_capacity > 0);
|
||||
|
||||
let mut blend_stack = Vec::with_capacity(limits.pixmap_stack_capacity);
|
||||
blend_stack.push(pixmap);
|
||||
blend_stack.push(RenderTarget::Borrowed(pixmap));
|
||||
|
||||
let mut transform_stack = Vec::with_capacity(limits.transform_stack_capacity);
|
||||
transform_stack.push(Transform::identity());
|
||||
|
@ -55,7 +60,10 @@ impl Renderer {
|
|||
}
|
||||
|
||||
fn pixmap_mut(&mut self) -> &mut Pixmap {
|
||||
self.pixmap_stack.last_mut().unwrap()
|
||||
match self.pixmap_stack.last_mut().unwrap() {
|
||||
RenderTarget::Borrowed(pixmap) => pixmap,
|
||||
RenderTarget::Owned(pixmap) => pixmap,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn render(&mut self, vm: &Vm, value: Value) -> Result<(), Exception> {
|
||||
|
@ -123,10 +131,6 @@ impl Renderer {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn finish(mut self) -> Pixmap {
|
||||
self.pixmap_stack.drain(..).next().unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
fn default_paint() -> Paint<'static> {
|
||||
|
|
6
crates/rkgk-image-ops/Cargo.toml
Normal file
6
crates/rkgk-image-ops/Cargo.toml
Normal file
|
@ -0,0 +1,6 @@
|
|||
[package]
|
||||
name = "rkgk-image-ops"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
14
crates/rkgk-image-ops/src/lib.rs
Normal file
14
crates/rkgk-image-ops/src/lib.rs
Normal file
|
@ -0,0 +1,14 @@
|
|||
pub fn add(left: u64, right: u64) -> u64 {
|
||||
left + right
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn it_works() {
|
||||
let result = add(2, 2);
|
||||
assert_eq!(result, 4);
|
||||
}
|
||||
}
|
|
@ -13,8 +13,10 @@ dashmap = "6.0.1"
|
|||
derive_more = { version = "1.0.0", features = ["try_from"] }
|
||||
eyre = "0.6.12"
|
||||
haku.workspace = true
|
||||
indexmap = "2.4.0"
|
||||
rand = "0.8.5"
|
||||
rand_chacha = "0.3.1"
|
||||
rayon = "1.10.0"
|
||||
rusqlite = { version = "0.32.1", features = ["bundled"] }
|
||||
serde = { version = "1.0.206", features = ["derive"] }
|
||||
serde_json = "1.0.124"
|
||||
|
@ -23,3 +25,9 @@ toml = "0.8.19"
|
|||
tower-http = { version = "0.5.2", features = ["fs"] }
|
||||
tracing = "0.1.40"
|
||||
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }
|
||||
tracy-client = { version = "0.17.1", optional = true}
|
||||
webp = "0.3.0"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
memory-profiling = ["dep:tracy-client"]
|
||||
|
|
|
@ -9,15 +9,20 @@ use axum::{
|
|||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::Databases;
|
||||
use crate::{config::Config, Databases};
|
||||
|
||||
mod wall;
|
||||
|
||||
pub fn router<S>(dbs: Arc<Databases>) -> Router<S> {
|
||||
pub struct Api {
|
||||
pub config: Config,
|
||||
pub dbs: Arc<Databases>,
|
||||
}
|
||||
|
||||
pub fn router<S>(api: Arc<Api>) -> Router<S> {
|
||||
Router::new()
|
||||
.route("/login", post(login_new))
|
||||
.route("/wall", get(wall::wall))
|
||||
.with_state(dbs)
|
||||
.with_state(api)
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
|
@ -35,7 +40,7 @@ enum NewUserResponse {
|
|||
Error { message: String },
|
||||
}
|
||||
|
||||
async fn login_new(dbs: State<Arc<Databases>>, params: Json<NewUserParams>) -> impl IntoResponse {
|
||||
async fn login_new(api: State<Arc<Api>>, params: Json<NewUserParams>) -> impl IntoResponse {
|
||||
if !(1..=32).contains(¶ms.nickname.len()) {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
|
@ -45,7 +50,7 @@ async fn login_new(dbs: State<Arc<Databases>>, params: Json<NewUserParams>) -> i
|
|||
);
|
||||
}
|
||||
|
||||
match dbs.login.new_user(params.0.nickname).await {
|
||||
match api.dbs.login.new_user(params.0.nickname).await {
|
||||
Ok(user_id) => (
|
||||
StatusCode::OK,
|
||||
Json(NewUserResponse::Ok {
|
||||
|
|
|
@ -8,21 +8,30 @@ use axum::{
|
|||
response::Response,
|
||||
};
|
||||
use eyre::{bail, Context, OptionExt};
|
||||
use schema::{Error, LoginRequest, LoginResponse, Online, Version, WallInfo};
|
||||
use haku::value::Value;
|
||||
use schema::{
|
||||
ChunkInfo, Error, LoginRequest, LoginResponse, Notify, Online, Request, Version, WallInfo,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::select;
|
||||
use tokio::{select, sync::mpsc, time::Instant};
|
||||
use tracing::{error, info};
|
||||
|
||||
use crate::{
|
||||
haku::{Haku, Limits},
|
||||
login::database::LoginStatus,
|
||||
wall::{Event, JoinError, Session},
|
||||
Databases,
|
||||
schema::Vec2,
|
||||
wall::{
|
||||
self, chunk_encoder::ChunkEncoder, chunk_iterator::ChunkIterator, ChunkPosition, JoinError,
|
||||
SessionHandle, UserInit, Wall,
|
||||
},
|
||||
};
|
||||
|
||||
use super::Api;
|
||||
|
||||
mod schema;
|
||||
|
||||
pub async fn wall(State(dbs): State<Arc<Databases>>, ws: WebSocketUpgrade) -> Response {
|
||||
ws.on_upgrade(|ws| websocket(dbs, ws))
|
||||
pub async fn wall(State(api): State<Arc<Api>>, ws: WebSocketUpgrade) -> Response {
|
||||
ws.on_upgrade(|ws| websocket(api, ws))
|
||||
}
|
||||
|
||||
fn to_message<T>(value: &T) -> Message
|
||||
|
@ -51,8 +60,8 @@ async fn recv_expect(ws: &mut WebSocket) -> eyre::Result<Message> {
|
|||
.ok_or_eyre("connection closed unexpectedly")??)
|
||||
}
|
||||
|
||||
async fn websocket(dbs: Arc<Databases>, mut ws: WebSocket) {
|
||||
match fallible_websocket(dbs, &mut ws).await {
|
||||
async fn websocket(api: Arc<Api>, mut ws: WebSocket) {
|
||||
match fallible_websocket(api, &mut ws).await {
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
_ = ws
|
||||
|
@ -64,7 +73,7 @@ async fn websocket(dbs: Arc<Databases>, mut ws: WebSocket) {
|
|||
}
|
||||
}
|
||||
|
||||
async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Result<()> {
|
||||
async fn fallible_websocket(api: Arc<Api>, ws: &mut WebSocket) -> eyre::Result<()> {
|
||||
#[cfg(debug_assertions)]
|
||||
let version = format!("{}-dev", env!("CARGO_PKG_VERSION"));
|
||||
#[cfg(not(debug_assertions))]
|
||||
|
@ -73,9 +82,10 @@ async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Re
|
|||
ws.send(to_message(&Version { version })).await?;
|
||||
|
||||
let login_request: LoginRequest = from_message(&recv_expect(ws).await?)?;
|
||||
let user_id = *login_request.user_id();
|
||||
let user_id = login_request.user;
|
||||
|
||||
match dbs
|
||||
match api
|
||||
.dbs
|
||||
.login
|
||||
.log_in(user_id)
|
||||
.await
|
||||
|
@ -88,14 +98,24 @@ async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Re
|
|||
return Ok(());
|
||||
}
|
||||
}
|
||||
let user_info = api
|
||||
.dbs
|
||||
.login
|
||||
.user_info(user_id)
|
||||
.await
|
||||
.context("cannot get user info")?
|
||||
.ok_or_eyre("user seems to have vanished")?;
|
||||
|
||||
let wall_id = match login_request {
|
||||
LoginRequest::New { .. } => dbs.wall_broker.generate_id().await,
|
||||
LoginRequest::Join { wall, .. } => wall,
|
||||
let wall_id = match login_request.wall {
|
||||
Some(wall) => wall,
|
||||
None => api.dbs.wall_broker.generate_id().await,
|
||||
};
|
||||
let wall = dbs.wall_broker.open(wall_id);
|
||||
let open_wall = api.dbs.wall_broker.open(wall_id);
|
||||
|
||||
let mut session_handle = match wall.join(Session::new(user_id)) {
|
||||
let session_handle = match open_wall
|
||||
.wall
|
||||
.join(wall::Session::new(user_id, login_request.init.clone()))
|
||||
{
|
||||
Ok(handle) => handle,
|
||||
Err(error) => {
|
||||
ws.send(to_message(&match error {
|
||||
|
@ -110,8 +130,8 @@ async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Re
|
|||
};
|
||||
|
||||
let mut users_online = vec![];
|
||||
for online in wall.online() {
|
||||
let user_info = match dbs.login.user_info(online.user_id).await {
|
||||
for online in open_wall.wall.online() {
|
||||
let user_info = match api.dbs.login.user_info(online.user_id).await {
|
||||
Ok(Some(user_info)) => user_info,
|
||||
Ok(None) | Err(_) => {
|
||||
error!(?online, "could not get info about online user");
|
||||
|
@ -122,6 +142,9 @@ async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Re
|
|||
session_id: online.session_id,
|
||||
nickname: user_info.nickname,
|
||||
cursor: online.cursor,
|
||||
init: UserInit {
|
||||
brush: online.brush,
|
||||
},
|
||||
})
|
||||
}
|
||||
let users_online = users_online;
|
||||
|
@ -129,25 +152,278 @@ async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Re
|
|||
ws.send(to_message(&LoginResponse::LoggedIn {
|
||||
wall: wall_id,
|
||||
wall_info: WallInfo {
|
||||
chunk_size: wall.settings().chunk_size,
|
||||
chunk_size: open_wall.wall.settings().chunk_size,
|
||||
paint_area: open_wall.wall.settings().paint_area,
|
||||
online: users_online,
|
||||
haku_limits: api.config.haku.clone(),
|
||||
},
|
||||
session_id: session_handle.session_id,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
loop {
|
||||
select! {
|
||||
Some(message) = ws.recv() => {
|
||||
let kind = from_message(&message?)?;
|
||||
wall.event(Event { session_id: session_handle.session_id, kind });
|
||||
open_wall.wall.event(wall::Event {
|
||||
session_id: session_handle.session_id,
|
||||
kind: wall::EventKind::Join {
|
||||
nickname: user_info.nickname,
|
||||
init: login_request.init.clone(),
|
||||
},
|
||||
});
|
||||
// Leave event is sent in SessionHandle's Drop implementation.
|
||||
// This technically means that inbetween the user getting logged in and sending the Join event,
|
||||
// we may end up losing the user and sending a Leave event, but Leave events are idempotent -
|
||||
// they're only used to clean up the state of an existing user, but the user is not _required_
|
||||
// to exist on clients already.
|
||||
// ...Well, we'll see how much havoc that wreaks in practice. Especially without TypeScript
|
||||
// to remind us about unexpected nulls.
|
||||
|
||||
SessionLoop::start(
|
||||
open_wall.wall,
|
||||
open_wall.chunk_encoder,
|
||||
session_handle,
|
||||
api.config.haku.clone(),
|
||||
login_request.init.brush,
|
||||
)
|
||||
.await?
|
||||
.event_loop(ws)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct SessionLoop {
|
||||
wall: Arc<Wall>,
|
||||
chunk_encoder: Arc<ChunkEncoder>,
|
||||
handle: SessionHandle,
|
||||
render_commands_tx: mpsc::Sender<RenderCommand>,
|
||||
viewport_chunks: ChunkIterator,
|
||||
}
|
||||
|
||||
enum RenderCommand {
|
||||
SetBrush { brush: String },
|
||||
Plot { points: Vec<Vec2> },
|
||||
}
|
||||
|
||||
impl SessionLoop {
|
||||
async fn start(
|
||||
wall: Arc<Wall>,
|
||||
chunk_encoder: Arc<ChunkEncoder>,
|
||||
handle: SessionHandle,
|
||||
limits: Limits,
|
||||
brush: String,
|
||||
) -> eyre::Result<Self> {
|
||||
// Limit how many commands may come in _pretty darn hard_ because these can be really
|
||||
// CPU-intensive.
|
||||
// If this ends up dropping commands - it's your fault for trying to DoS my server!
|
||||
let (render_commands_tx, render_commands_rx) = mpsc::channel(1);
|
||||
|
||||
render_commands_tx
|
||||
.send(RenderCommand::SetBrush { brush })
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// We spawn our own thread so as not to clog the tokio blocking thread pool with our
|
||||
// rendering shenanigans.
|
||||
std::thread::Builder::new()
|
||||
.name(String::from("haku render thread"))
|
||||
.spawn({
|
||||
let wall = Arc::clone(&wall);
|
||||
let chunk_encoder = Arc::clone(&chunk_encoder);
|
||||
move || Self::render_thread(wall, chunk_encoder, limits, render_commands_rx)
|
||||
})
|
||||
.context("could not spawn render thread")?;
|
||||
|
||||
Ok(Self {
|
||||
wall,
|
||||
chunk_encoder,
|
||||
handle,
|
||||
render_commands_tx,
|
||||
viewport_chunks: ChunkIterator::new(ChunkPosition::new(0, 0), ChunkPosition::new(0, 0)),
|
||||
})
|
||||
}
|
||||
|
||||
async fn event_loop(&mut self, ws: &mut WebSocket) -> eyre::Result<()> {
|
||||
loop {
|
||||
select! {
|
||||
Some(message) = ws.recv() => {
|
||||
let request = from_message(&message?)?;
|
||||
self.process_request(ws, request).await?;
|
||||
}
|
||||
|
||||
Ok(wall_event) = self.handle.event_receiver.recv() => {
|
||||
ws.send(to_message(&Notify::Wall { wall_event })).await?;
|
||||
}
|
||||
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process_request(&mut self, ws: &mut WebSocket, request: Request) -> eyre::Result<()> {
|
||||
match request {
|
||||
Request::Wall { wall_event } => {
|
||||
match &wall_event {
|
||||
// This match only concerns itself with drawing-related events to offload
|
||||
// all the evaluation and drawing work to this session's drawing thread.
|
||||
wall::EventKind::Join { .. }
|
||||
| wall::EventKind::Leave
|
||||
| wall::EventKind::Cursor { .. } => (),
|
||||
|
||||
wall::EventKind::SetBrush { brush } => {
|
||||
// SetBrush is not dropped because it is a very important event.
|
||||
_ = self
|
||||
.render_commands_tx
|
||||
.send(RenderCommand::SetBrush {
|
||||
brush: brush.clone(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
wall::EventKind::Plot { points } => {
|
||||
// We drop commands if we take too long to render instead of lagging
|
||||
// the WebSocket thread.
|
||||
// Theoretically this will yield much better responsiveness, but it _will_
|
||||
// result in some visual glitches if we're getting bottlenecked.
|
||||
_ = self.render_commands_tx.try_send(RenderCommand::Plot {
|
||||
points: points.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
self.wall.event(wall::Event {
|
||||
session_id: self.handle.session_id,
|
||||
kind: wall_event,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(event) = session_handle.event_receiver.recv() => {
|
||||
ws.send(to_message(&event)).await?;
|
||||
Request::Viewport {
|
||||
top_left,
|
||||
bottom_right,
|
||||
} => {
|
||||
self.viewport_chunks = ChunkIterator::new(top_left, bottom_right);
|
||||
self.send_chunks(ws).await?;
|
||||
}
|
||||
|
||||
else => break,
|
||||
Request::MoreChunks => {
|
||||
self.send_chunks(ws).await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_chunks(&mut self, ws: &mut WebSocket) -> eyre::Result<()> {
|
||||
let mut chunk_infos = vec![];
|
||||
let mut packet = vec![];
|
||||
|
||||
// Number of chunks iterated is limited to 300 per packet, so as not to let the client
|
||||
// stall the server by sending in a huge viewport.
|
||||
for _ in 0..300 {
|
||||
if let Some(position) = self.viewport_chunks.next() {
|
||||
if let Some(encoded) = self.chunk_encoder.encoded(position).await {
|
||||
let offset = packet.len();
|
||||
packet.extend_from_slice(&encoded);
|
||||
chunk_infos.push(ChunkInfo {
|
||||
position,
|
||||
offset: u32::try_from(offset).context("packet too big")?,
|
||||
length: u32::try_from(encoded.len()).context("chunk image too big")?,
|
||||
});
|
||||
|
||||
// The actual number of chunks per packet is limited by the packet's size, which
|
||||
// we don't want to be too big, to maintain responsiveness - the client will
|
||||
// only request more chunks once per frame, so interactions still have time to
|
||||
// execute. We cap it to 256KiB in hopes that noone has Internet slow enough for
|
||||
// this to cause a disconnect.
|
||||
if packet.len() >= 256 * 1024 {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// Length 0 indicates the server acknowledged the chunk, but it has no
|
||||
// image data.
|
||||
// This is used by clients to know that the chunk doesn't need downloading.
|
||||
chunk_infos.push(ChunkInfo {
|
||||
position,
|
||||
offset: 0,
|
||||
length: 0,
|
||||
});
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ws.send(to_message(&Notify::Chunks {
|
||||
chunks: chunk_infos,
|
||||
}))
|
||||
.await?;
|
||||
ws.send(Message::Binary(packet)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn render_thread(
|
||||
wall: Arc<Wall>,
|
||||
chunk_encoder: Arc<ChunkEncoder>,
|
||||
limits: Limits,
|
||||
mut commands: mpsc::Receiver<RenderCommand>,
|
||||
) {
|
||||
let mut haku = Haku::new(limits);
|
||||
let mut brush_ok = false;
|
||||
|
||||
while let Some(command) = commands.blocking_recv() {
|
||||
match command {
|
||||
RenderCommand::SetBrush { brush } => {
|
||||
brush_ok = haku.set_brush(&brush).is_ok();
|
||||
}
|
||||
RenderCommand::Plot { points } => {
|
||||
if brush_ok {
|
||||
if let Ok(value) = haku.eval_brush() {
|
||||
for point in points {
|
||||
// Ignore the result. It's better if we render _something_ rather
|
||||
// than nothing.
|
||||
_ = draw_to_chunks(&haku, value, point, &wall, &chunk_encoder);
|
||||
}
|
||||
haku.reset_vm();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn draw_to_chunks(
|
||||
haku: &Haku,
|
||||
value: Value,
|
||||
center: Vec2,
|
||||
wall: &Wall,
|
||||
chunk_encoder: &ChunkEncoder,
|
||||
) -> eyre::Result<()> {
|
||||
let settings = wall.settings();
|
||||
|
||||
let chunk_size = settings.chunk_size as f32;
|
||||
let paint_area = settings.paint_area as f32;
|
||||
|
||||
let left = center.x - paint_area / 2.0;
|
||||
let top = center.y - paint_area / 2.0;
|
||||
|
||||
let left_chunk = f32::floor(left / chunk_size) as i32;
|
||||
let top_chunk = f32::floor(top / chunk_size) as i32;
|
||||
let right_chunk = f32::ceil((left + paint_area) / chunk_size) as i32;
|
||||
let bottom_chunk = f32::ceil((top + paint_area) / chunk_size) as i32;
|
||||
for chunk_y in top_chunk..bottom_chunk {
|
||||
for chunk_x in left_chunk..right_chunk {
|
||||
let x = f32::floor(-chunk_x as f32 * chunk_size + center.x);
|
||||
let y = f32::floor(-chunk_y as f32 * chunk_size + center.y);
|
||||
let chunk_ref = wall.get_or_create_chunk(ChunkPosition::new(chunk_x, chunk_y));
|
||||
let mut chunk = chunk_ref.blocking_lock();
|
||||
haku.render_value(&mut chunk.pixmap, value, Vec2 { x, y })?;
|
||||
}
|
||||
}
|
||||
|
||||
for chunk_y in top_chunk..bottom_chunk {
|
||||
for chunk_x in left_chunk..right_chunk {
|
||||
chunk_encoder.invalidate_blocking(ChunkPosition::new(chunk_x, chunk_y))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
|
|||
use crate::{
|
||||
login::UserId,
|
||||
schema::Vec2,
|
||||
wall::{self, SessionId, WallId},
|
||||
wall::{self, ChunkPosition, SessionId, UserInit, WallId},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||
|
@ -19,23 +19,12 @@ pub struct Error {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
|
||||
#[serde(
|
||||
tag = "login",
|
||||
rename_all = "camelCase",
|
||||
rename_all_fields = "camelCase"
|
||||
)]
|
||||
pub enum LoginRequest {
|
||||
New { user: UserId },
|
||||
Join { user: UserId, wall: WallId },
|
||||
}
|
||||
|
||||
impl LoginRequest {
|
||||
pub fn user_id(&self) -> &UserId {
|
||||
match self {
|
||||
LoginRequest::New { user } => user,
|
||||
LoginRequest::Join { user, .. } => user,
|
||||
}
|
||||
}
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LoginRequest {
|
||||
pub user: UserId,
|
||||
/// If null, a new wall is created.
|
||||
pub wall: Option<WallId>,
|
||||
pub init: UserInit,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
@ -44,12 +33,15 @@ pub struct Online {
|
|||
pub session_id: SessionId,
|
||||
pub nickname: String,
|
||||
pub cursor: Option<Vec2>,
|
||||
pub init: UserInit,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WallInfo {
|
||||
pub chunk_size: u32,
|
||||
pub paint_area: u32,
|
||||
pub haku_limits: crate::haku::Limits,
|
||||
pub online: Vec<Online>,
|
||||
}
|
||||
|
||||
|
@ -68,3 +60,40 @@ pub enum LoginResponse {
|
|||
UserDoesNotExist,
|
||||
TooManySessions,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(
|
||||
tag = "request",
|
||||
rename_all = "camelCase",
|
||||
rename_all_fields = "camelCase"
|
||||
)]
|
||||
pub enum Request {
|
||||
Wall {
|
||||
wall_event: wall::EventKind,
|
||||
},
|
||||
|
||||
Viewport {
|
||||
top_left: ChunkPosition,
|
||||
bottom_right: ChunkPosition,
|
||||
},
|
||||
|
||||
MoreChunks,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct ChunkInfo {
|
||||
pub position: ChunkPosition,
|
||||
pub offset: u32,
|
||||
pub length: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
#[serde(
|
||||
tag = "notify",
|
||||
rename_all = "camelCase",
|
||||
rename_all_fields = "camelCase"
|
||||
)]
|
||||
pub enum Notify {
|
||||
Wall { wall_event: wall::Event },
|
||||
Chunks { chunks: Vec<ChunkInfo> },
|
||||
}
|
||||
|
|
|
@ -5,4 +5,5 @@ use crate::wall;
|
|||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct Config {
|
||||
pub wall: wall::Settings,
|
||||
pub haku: crate::haku::Limits,
|
||||
}
|
||||
|
|
163
crates/rkgk/src/haku.rs
Normal file
163
crates/rkgk/src/haku.rs
Normal file
|
@ -0,0 +1,163 @@
|
|||
//! High-level wrapper for Haku.
|
||||
|
||||
// TODO: This should be used as the basis for haku-wasm as well as haku tests in the future to
|
||||
// avoid duplicating code.
|
||||
|
||||
use eyre::{bail, Context, OptionExt};
|
||||
use haku::{
|
||||
bytecode::{Chunk, Defs, DefsImage},
|
||||
compiler::{Compiler, Source},
|
||||
render::{tiny_skia::Pixmap, Renderer, RendererLimits},
|
||||
sexp::{Ast, Parser},
|
||||
system::{ChunkId, System, SystemImage},
|
||||
value::{BytecodeLoc, Closure, FunctionName, Ref, Value},
|
||||
vm::{Vm, VmImage, VmLimits},
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::schema::Vec2;
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
// NOTE: For serialization, this struct does _not_ have serde(rename_all = "camelCase") on it,
|
||||
// because we do some dynamic typing magic over on the JavaScript side to automatically call all
|
||||
// the appropriate functions for setting these limits on the client side.
|
||||
pub struct Limits {
|
||||
pub max_chunks: usize,
|
||||
pub max_defs: usize,
|
||||
pub ast_capacity: usize,
|
||||
pub chunk_capacity: usize,
|
||||
pub stack_capacity: usize,
|
||||
pub call_stack_capacity: usize,
|
||||
pub ref_capacity: usize,
|
||||
pub fuel: usize,
|
||||
pub pixmap_stack_capacity: usize,
|
||||
pub transform_stack_capacity: usize,
|
||||
}
|
||||
|
||||
pub struct Haku {
|
||||
limits: Limits,
|
||||
|
||||
system: System,
|
||||
system_image: SystemImage,
|
||||
defs: Defs,
|
||||
defs_image: DefsImage,
|
||||
vm: Vm,
|
||||
vm_image: VmImage,
|
||||
|
||||
brush: Option<ChunkId>,
|
||||
}
|
||||
|
||||
impl Haku {
|
||||
pub fn new(limits: Limits) -> Self {
|
||||
let system = System::new(limits.max_chunks);
|
||||
let defs = Defs::new(limits.max_defs);
|
||||
let vm = Vm::new(
|
||||
&defs,
|
||||
&VmLimits {
|
||||
stack_capacity: limits.stack_capacity,
|
||||
call_stack_capacity: limits.call_stack_capacity,
|
||||
ref_capacity: limits.ref_capacity,
|
||||
fuel: limits.fuel,
|
||||
},
|
||||
);
|
||||
|
||||
let system_image = system.image();
|
||||
let defs_image = defs.image();
|
||||
let vm_image = vm.image();
|
||||
|
||||
Self {
|
||||
limits,
|
||||
system,
|
||||
system_image,
|
||||
defs,
|
||||
defs_image,
|
||||
vm,
|
||||
vm_image,
|
||||
brush: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn reset(&mut self) {
|
||||
self.system.restore_image(&self.system_image);
|
||||
self.defs.restore_image(&self.defs_image);
|
||||
}
|
||||
|
||||
pub fn set_brush(&mut self, code: &str) -> eyre::Result<()> {
|
||||
self.reset();
|
||||
|
||||
let ast = Ast::new(self.limits.ast_capacity);
|
||||
let mut parser = Parser::new(ast, code);
|
||||
let root = haku::sexp::parse_toplevel(&mut parser);
|
||||
let ast = parser.ast;
|
||||
|
||||
let src = Source {
|
||||
code,
|
||||
ast: &ast,
|
||||
system: &self.system,
|
||||
};
|
||||
|
||||
let mut chunk = Chunk::new(self.limits.chunk_capacity)
|
||||
.expect("chunk capacity must be representable as a 16-bit number");
|
||||
let mut compiler = Compiler::new(&mut self.defs, &mut chunk);
|
||||
haku::compiler::compile_expr(&mut compiler, &src, root)
|
||||
.context("failed to compile the chunk")?;
|
||||
|
||||
if !compiler.diagnostics.is_empty() {
|
||||
bail!("diagnostics were emitted");
|
||||
}
|
||||
|
||||
let chunk_id = self.system.add_chunk(chunk).context("too many chunks")?;
|
||||
self.brush = Some(chunk_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn eval_brush(&mut self) -> eyre::Result<Value> {
|
||||
let brush = self
|
||||
.brush
|
||||
.ok_or_eyre("brush is not compiled and ready to be used")?;
|
||||
|
||||
let closure_id = self
|
||||
.vm
|
||||
.create_ref(Ref::Closure(Closure {
|
||||
start: BytecodeLoc {
|
||||
chunk_id: brush,
|
||||
offset: 0,
|
||||
},
|
||||
name: FunctionName::Anonymous,
|
||||
param_count: 0,
|
||||
captures: vec![],
|
||||
}))
|
||||
.context("not enough ref slots to create initial closure")?;
|
||||
|
||||
let scribble = self
|
||||
.vm
|
||||
.run(&self.system, closure_id)
|
||||
.context("an exception occurred while evaluating the scribble")?;
|
||||
|
||||
Ok(scribble)
|
||||
}
|
||||
|
||||
pub fn render_value(
|
||||
&self,
|
||||
pixmap: &mut Pixmap,
|
||||
value: Value,
|
||||
translation: Vec2,
|
||||
) -> eyre::Result<()> {
|
||||
let mut renderer = Renderer::new(
|
||||
pixmap,
|
||||
&RendererLimits {
|
||||
pixmap_stack_capacity: self.limits.pixmap_stack_capacity,
|
||||
transform_stack_capacity: self.limits.transform_stack_capacity,
|
||||
},
|
||||
);
|
||||
renderer.translate(translation.x, translation.y);
|
||||
let result = renderer.render(&self.vm, value);
|
||||
|
||||
result.context("an exception occurred while rendering the scribble")
|
||||
}
|
||||
|
||||
pub fn reset_vm(&mut self) {
|
||||
self.vm.restore_image(&self.vm_image);
|
||||
}
|
||||
}
|
|
@ -5,7 +5,7 @@ use base64::Engine;
|
|||
pub fn serialize(f: &mut fmt::Formatter<'_>, prefix: &str, bytes: &[u8; 32]) -> fmt::Result {
|
||||
f.write_str(prefix)?;
|
||||
let mut buffer = [b'0'; 43];
|
||||
base64::engine::general_purpose::STANDARD_NO_PAD
|
||||
base64::engine::general_purpose::URL_SAFE_NO_PAD
|
||||
.encode_slice(bytes, &mut buffer)
|
||||
.unwrap();
|
||||
f.write_str(std::str::from_utf8(&buffer).unwrap())?;
|
||||
|
@ -17,7 +17,7 @@ pub struct InvalidId;
|
|||
pub fn deserialize(s: &str, prefix: &str) -> Result<[u8; 32], InvalidId> {
|
||||
let mut bytes = [0; 32];
|
||||
let b64 = s.strip_prefix(prefix).ok_or(InvalidId)?;
|
||||
let decoded = base64::engine::general_purpose::STANDARD_NO_PAD
|
||||
let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
|
||||
.decode_slice(b64, &mut bytes)
|
||||
.map_err(|_| InvalidId)?;
|
||||
if decoded != bytes.len() {
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::{
|
|||
sync::Arc,
|
||||
};
|
||||
|
||||
use api::Api;
|
||||
use axum::Router;
|
||||
use config::Config;
|
||||
use copy_dir::copy_dir;
|
||||
|
@ -16,6 +17,7 @@ use tracing_subscriber::fmt::format::FmtSpan;
|
|||
mod api;
|
||||
mod binary;
|
||||
mod config;
|
||||
mod haku;
|
||||
mod id;
|
||||
#[cfg(debug_assertions)]
|
||||
mod live_reload;
|
||||
|
@ -24,6 +26,11 @@ pub mod schema;
|
|||
mod serialization;
|
||||
mod wall;
|
||||
|
||||
#[cfg(feature = "memory-profiling")]
|
||||
#[global_allocator]
|
||||
static GLOBAL_ALLOCATOR: tracy_client::ProfiledAllocator<std::alloc::System> =
|
||||
tracy_client::ProfiledAllocator::new(std::alloc::System, 100);
|
||||
|
||||
struct Paths<'a> {
|
||||
target_dir: &'a Path,
|
||||
database_dir: &'a Path,
|
||||
|
@ -81,13 +88,15 @@ async fn fallible_main() -> eyre::Result<()> {
|
|||
build(&paths)?;
|
||||
let dbs = Arc::new(database(&config, &paths)?);
|
||||
|
||||
let api = Arc::new(Api { config, dbs });
|
||||
|
||||
let app = Router::new()
|
||||
.route_service(
|
||||
"/",
|
||||
ServeFile::new(paths.target_dir.join("static/index.html")),
|
||||
)
|
||||
.nest_service("/static", ServeDir::new(paths.target_dir.join("static")))
|
||||
.nest("/api", api::router(dbs.clone()));
|
||||
.nest("/api", api::router(api));
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
let app = app.nest("/dev/live-reload", live_reload::router());
|
||||
|
@ -103,6 +112,9 @@ async fn fallible_main() -> eyre::Result<()> {
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
#[cfg(feature = "memory-profiling")]
|
||||
let _client = tracy_client::Client::start();
|
||||
|
||||
color_eyre::install().unwrap();
|
||||
tracing_subscriber::fmt()
|
||||
.with_span_events(FmtSpan::ACTIVE)
|
||||
|
|
|
@ -13,10 +13,13 @@ use haku::render::tiny_skia::Pixmap;
|
|||
use rand::RngCore;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::{broadcast, Mutex};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{id, login::UserId, schema::Vec2, serialization::DeserializeFromStr};
|
||||
|
||||
pub mod broker;
|
||||
pub mod chunk_encoder;
|
||||
pub mod chunk_iterator;
|
||||
|
||||
pub use broker::Broker;
|
||||
|
||||
|
@ -79,8 +82,20 @@ impl fmt::Display for InvalidWallId {
|
|||
|
||||
impl Error for InvalidWallId {}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
|
||||
pub struct ChunkPosition {
|
||||
pub x: i32,
|
||||
pub y: i32,
|
||||
}
|
||||
|
||||
impl ChunkPosition {
|
||||
pub fn new(x: i32, y: i32) -> Self {
|
||||
Self { x, y }
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Chunk {
|
||||
pixmap: Pixmap,
|
||||
pub pixmap: Pixmap,
|
||||
}
|
||||
|
||||
impl Chunk {
|
||||
|
@ -95,13 +110,23 @@ impl Chunk {
|
|||
pub struct Settings {
|
||||
pub max_chunks: usize,
|
||||
pub max_sessions: usize,
|
||||
pub paint_area: u32,
|
||||
pub chunk_size: u32,
|
||||
}
|
||||
|
||||
impl Settings {
|
||||
pub fn chunk_at(&self, position: Vec2) -> ChunkPosition {
|
||||
ChunkPosition::new(
|
||||
f32::floor(position.x / self.chunk_size as f32) as i32,
|
||||
f32::floor(position.y / self.chunk_size as f32) as i32,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Wall {
|
||||
settings: Settings,
|
||||
|
||||
chunks: DashMap<(i32, i32), Arc<Mutex<Chunk>>>,
|
||||
chunks: DashMap<ChunkPosition, Arc<Mutex<Chunk>>>,
|
||||
|
||||
sessions: DashMap<SessionId, Session>,
|
||||
session_id_counter: AtomicU32,
|
||||
|
@ -109,9 +134,17 @@ pub struct Wall {
|
|||
event_sender: broadcast::Sender<Event>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UserInit {
|
||||
// Provide a brush upon initialization, so that the user always has a valid brush set.
|
||||
pub brush: String,
|
||||
}
|
||||
|
||||
pub struct Session {
|
||||
pub user_id: UserId,
|
||||
pub cursor: Option<Vec2>,
|
||||
pub brush: String,
|
||||
}
|
||||
|
||||
pub struct SessionHandle {
|
||||
|
@ -134,6 +167,9 @@ pub struct Event {
|
|||
rename_all_fields = "camelCase"
|
||||
)]
|
||||
pub enum EventKind {
|
||||
Join { nickname: String, init: UserInit },
|
||||
Leave,
|
||||
|
||||
Cursor { position: Vec2 },
|
||||
|
||||
SetBrush { brush: String },
|
||||
|
@ -146,6 +182,7 @@ pub struct Online {
|
|||
pub session_id: SessionId,
|
||||
pub user_id: UserId,
|
||||
pub cursor: Option<Vec2>,
|
||||
pub brush: String,
|
||||
}
|
||||
|
||||
impl Wall {
|
||||
|
@ -163,11 +200,11 @@ impl Wall {
|
|||
&self.settings
|
||||
}
|
||||
|
||||
pub fn get_chunk(&self, at: (i32, i32)) -> Option<Arc<Mutex<Chunk>>> {
|
||||
pub fn get_chunk(&self, at: ChunkPosition) -> Option<Arc<Mutex<Chunk>>> {
|
||||
self.chunks.get(&at).map(|chunk| Arc::clone(&chunk))
|
||||
}
|
||||
|
||||
pub fn get_or_create_chunk(&self, at: (i32, i32)) -> Arc<Mutex<Chunk>> {
|
||||
pub fn get_or_create_chunk(&self, at: ChunkPosition) -> Arc<Mutex<Chunk>> {
|
||||
Arc::clone(
|
||||
&self
|
||||
.chunks
|
||||
|
@ -198,6 +235,7 @@ impl Wall {
|
|||
session_id: *r.key(),
|
||||
user_id: r.user_id,
|
||||
cursor: r.value().cursor,
|
||||
brush: r.value().brush.clone(),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
@ -205,12 +243,17 @@ impl Wall {
|
|||
pub fn event(&self, event: Event) {
|
||||
if let Some(mut session) = self.sessions.get_mut(&event.session_id) {
|
||||
match &event.kind {
|
||||
EventKind::SetBrush { brush } => {}
|
||||
// Join and Leave are events that only get broadcasted through the wall such that
|
||||
// all users get them. We don't need to react to them in any way.
|
||||
EventKind::Join { .. } | EventKind::Leave => (),
|
||||
|
||||
EventKind::Cursor { position } => {
|
||||
session.cursor = Some(*position);
|
||||
}
|
||||
EventKind::Plot { points } => {}
|
||||
|
||||
// Drawing events are handled by the owner session's thread to make drawing as
|
||||
// parallel as possible.
|
||||
EventKind::SetBrush { .. } | EventKind::Plot { .. } => (),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -219,10 +262,11 @@ impl Wall {
|
|||
}
|
||||
|
||||
impl Session {
|
||||
pub fn new(user_id: UserId) -> Self {
|
||||
pub fn new(user_id: UserId, user_init: UserInit) -> Self {
|
||||
Self {
|
||||
user_id,
|
||||
cursor: None,
|
||||
brush: user_init.brush,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -231,6 +275,10 @@ impl Drop for SessionHandle {
|
|||
fn drop(&mut self) {
|
||||
if let Some(wall) = self.wall.upgrade() {
|
||||
wall.sessions.remove(&self.session_id);
|
||||
wall.event(Event {
|
||||
session_id: self.session_id,
|
||||
kind: EventKind::Leave,
|
||||
});
|
||||
// After the session is removed, the wall will be garbage collected later.
|
||||
}
|
||||
}
|
||||
|
@ -240,7 +288,3 @@ pub enum JoinError {
|
|||
TooManyCurrentSessions,
|
||||
IdsExhausted,
|
||||
}
|
||||
|
||||
pub enum EventError {
|
||||
DeadSession,
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ use rand_chacha::ChaCha20Rng;
|
|||
use tokio::sync::Mutex;
|
||||
use tracing::info;
|
||||
|
||||
use super::{Settings, Wall, WallId};
|
||||
use super::{chunk_encoder::ChunkEncoder, Settings, Wall, WallId};
|
||||
|
||||
/// The broker is the main way to access wall data.
|
||||
///
|
||||
|
@ -18,8 +18,10 @@ pub struct Broker {
|
|||
rng: Mutex<ChaCha20Rng>,
|
||||
}
|
||||
|
||||
struct OpenWall {
|
||||
wall: Arc<Wall>,
|
||||
#[derive(Clone)]
|
||||
pub struct OpenWall {
|
||||
pub wall: Arc<Wall>,
|
||||
pub chunk_encoder: Arc<ChunkEncoder>,
|
||||
}
|
||||
|
||||
impl Broker {
|
||||
|
@ -39,15 +41,14 @@ impl Broker {
|
|||
WallId::new(&mut *rng)
|
||||
}
|
||||
|
||||
pub fn open(&self, wall_id: WallId) -> Arc<Wall> {
|
||||
Arc::clone(
|
||||
&self
|
||||
.open_walls
|
||||
.entry(wall_id)
|
||||
.or_insert_with(|| OpenWall {
|
||||
wall: Arc::new(Wall::new(self.wall_settings)),
|
||||
})
|
||||
.wall,
|
||||
)
|
||||
pub fn open(&self, wall_id: WallId) -> OpenWall {
|
||||
let wall = Arc::new(Wall::new(self.wall_settings));
|
||||
self.open_walls
|
||||
.entry(wall_id)
|
||||
.or_insert_with(|| OpenWall {
|
||||
chunk_encoder: Arc::new(ChunkEncoder::start(Arc::clone(&wall))),
|
||||
wall,
|
||||
})
|
||||
.clone()
|
||||
}
|
||||
}
|
||||
|
|
104
crates/rkgk/src/wall/chunk_encoder.rs
Normal file
104
crates/rkgk/src/wall/chunk_encoder.rs
Normal file
|
@ -0,0 +1,104 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use indexmap::IndexMap;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
use super::{ChunkPosition, Wall};
|
||||
|
||||
/// Service which encodes chunks to WebP images and caches them in an LRU fashion.
|
||||
pub struct ChunkEncoder {
|
||||
commands_tx: mpsc::Sender<Command>,
|
||||
}
|
||||
|
||||
enum Command {
|
||||
GetEncoded {
|
||||
chunk: ChunkPosition,
|
||||
reply: oneshot::Sender<Option<Arc<[u8]>>>,
|
||||
},
|
||||
|
||||
Invalidate {
|
||||
chunk: ChunkPosition,
|
||||
},
|
||||
}
|
||||
|
||||
impl ChunkEncoder {
|
||||
pub fn start(wall: Arc<Wall>) -> Self {
|
||||
let (commands_tx, commands_rx) = mpsc::channel(32);
|
||||
|
||||
tokio::spawn(Self::service(wall, commands_rx));
|
||||
|
||||
Self { commands_tx }
|
||||
}
|
||||
|
||||
pub async fn encoded(&self, chunk: ChunkPosition) -> Option<Arc<[u8]>> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.commands_tx
|
||||
.send(Command::GetEncoded { chunk, reply: tx })
|
||||
.await
|
||||
.ok()?;
|
||||
rx.await.ok().flatten()
|
||||
}
|
||||
|
||||
pub async fn invalidate(&self, chunk: ChunkPosition) {
|
||||
_ = self.commands_tx.send(Command::Invalidate { chunk }).await;
|
||||
}
|
||||
|
||||
pub fn invalidate_blocking(&self, chunk: ChunkPosition) {
|
||||
_ = self
|
||||
.commands_tx
|
||||
.blocking_send(Command::Invalidate { chunk });
|
||||
}
|
||||
|
||||
async fn encode(wall: &Wall, chunk: ChunkPosition) -> Option<Arc<[u8]>> {
|
||||
let pixmap = {
|
||||
// Clone out the pixmap to avoid unnecessary chunk mutex contention while the
|
||||
// chunk is being encoded.
|
||||
let chunk_ref = wall.get_chunk(chunk)?;
|
||||
let chunk = chunk_ref.lock().await;
|
||||
chunk.pixmap.clone()
|
||||
};
|
||||
|
||||
let image = tokio::task::spawn_blocking(move || {
|
||||
let webp = webp::Encoder::new(
|
||||
pixmap.data(),
|
||||
webp::PixelLayout::Rgba,
|
||||
pixmap.width(),
|
||||
pixmap.height(),
|
||||
);
|
||||
// NOTE: There's an unnecessary copy here. Wonder if that kills performance much.
|
||||
webp.encode_lossless().to_vec()
|
||||
})
|
||||
.await
|
||||
.ok()?;
|
||||
|
||||
Some(Arc::from(image))
|
||||
}
|
||||
|
||||
async fn service(wall: Arc<Wall>, mut commands_rx: mpsc::Receiver<Command>) {
|
||||
let mut encoded_lru: IndexMap<ChunkPosition, Option<Arc<[u8]>>> = IndexMap::new();
|
||||
|
||||
while let Some(command) = commands_rx.recv().await {
|
||||
match command {
|
||||
Command::GetEncoded { chunk, reply } => {
|
||||
if let Some(encoded) = encoded_lru.get(&chunk) {
|
||||
_ = reply.send(encoded.clone())
|
||||
} else {
|
||||
let encoded = Self::encode(&wall, chunk).await;
|
||||
// TODO: Make this capacity configurable.
|
||||
// 598 is chosen because under the default configuration, it would
|
||||
// correspond to roughly two 3840x2160 displays.
|
||||
if encoded_lru.len() >= 598 {
|
||||
encoded_lru.shift_remove_index(0);
|
||||
}
|
||||
encoded_lru.insert(chunk, encoded.clone());
|
||||
_ = reply.send(encoded);
|
||||
}
|
||||
}
|
||||
|
||||
Command::Invalidate { chunk } => {
|
||||
encoded_lru.shift_remove(&chunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
37
crates/rkgk/src/wall/chunk_iterator.rs
Normal file
37
crates/rkgk/src/wall/chunk_iterator.rs
Normal file
|
@ -0,0 +1,37 @@
|
|||
use super::ChunkPosition;
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct ChunkIterator {
|
||||
cursor: ChunkPosition,
|
||||
left: i32,
|
||||
bottom_right: ChunkPosition,
|
||||
}
|
||||
|
||||
impl ChunkIterator {
|
||||
pub fn new(top_left: ChunkPosition, bottom_right: ChunkPosition) -> Self {
|
||||
Self {
|
||||
cursor: top_left,
|
||||
left: top_left.x,
|
||||
bottom_right,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for ChunkIterator {
|
||||
type Item = ChunkPosition;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let position = self.cursor;
|
||||
|
||||
self.cursor.x += 1;
|
||||
if self.cursor.y > self.bottom_right.y {
|
||||
return None;
|
||||
}
|
||||
if self.cursor.x > self.bottom_right.x {
|
||||
self.cursor.x = self.left;
|
||||
self.cursor.y += 1;
|
||||
}
|
||||
|
||||
Some(position)
|
||||
}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue