a whole load of work in progress

This commit is contained in:
りき萌 2024-08-10 23:13:20 +02:00
parent caec0b8ac9
commit 26ba098183
63 changed files with 3234 additions and 321 deletions

25
crates/rkgk/Cargo.toml Normal file
View file

@ -0,0 +1,25 @@
# Manifest for the rkgk server binary.
[package]
name = "rkgk"
version = "0.1.0"
edition = "2021"

[dependencies]
# HTTP server framework; "ws" enables WebSocket support, "macros" the derive helpers.
axum = { version = "0.7.5", features = ["macros", "ws"] }
base64 = "0.22.1"
chrono = "0.4.38"
color-eyre = "0.6.3"
copy_dir = "0.1.3"
dashmap = "6.0.1"
derive_more = { version = "1.0.0", features = ["try_from"] }
eyre = "0.6.12"
# Workspace-local crate (brush language/rendering; see crates/haku).
haku.workspace = true
rand = "0.8.5"
rand_chacha = "0.3.1"
# SQLite bindings; "bundled" compiles SQLite from source instead of linking a system library.
rusqlite = { version = "0.32.1", features = ["bundled"] }
serde = { version = "1.0.206", features = ["derive"] }
serde_json = "1.0.124"
tokio = { version = "1.39.2", features = ["full"] }
toml = "0.8.19"
tower-http = { version = "0.5.2", features = ["fs"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["fmt"] }

62
crates/rkgk/src/api.rs Normal file
View file

@ -0,0 +1,62 @@
use std::sync::Arc;
use axum::{
extract::State,
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use crate::Databases;
mod wall;
/// Builds the `/api` router, with the shared databases as its state.
pub fn router<S>(dbs: Arc<Databases>) -> Router<S> {
    let routes = Router::new()
        .route("/wall", get(wall::wall))
        .route("/login", post(login_new));
    routes.with_state(dbs)
}
/// Request body for `POST /api/login`.
#[derive(Deserialize)]
struct NewUserParams {
    nickname: String,
}

/// Response body for `POST /api/login`, tagged on the wire by a `status` field.
#[derive(Serialize)]
#[serde(tag = "status", rename_all = "camelCase")]
enum NewUserResponse {
    #[serde(rename_all = "camelCase")]
    Ok { user_id: String },
    #[serde(rename_all = "camelCase")]
    Error { message: String },
}
/// Creates a new user with the given nickname and returns the user's ID.
///
/// Responds with 400 when the nickname length is out of range, and 500 when
/// the login database fails to create the user.
async fn login_new(dbs: State<Arc<Databases>>, params: Json<NewUserParams>) -> impl IntoResponse {
    // Count characters rather than bytes, so that multi-byte (e.g. CJK)
    // nicknames get the full 32-character budget the error message promises.
    let nickname_len = params.nickname.chars().count();
    if !(1..=32).contains(&nickname_len) {
        return (
            StatusCode::BAD_REQUEST,
            Json(NewUserResponse::Error {
                message: "nickname must be 1..=32 characters long".into(),
            }),
        );
    }

    match dbs.login.new_user(params.0.nickname).await {
        Ok(user_id) => (
            StatusCode::OK,
            Json(NewUserResponse::Ok {
                user_id: user_id.to_string(),
            }),
        ),
        Err(error) => (
            StatusCode::INTERNAL_SERVER_ERROR,
            Json(NewUserResponse::Error {
                message: error.to_string(),
            }),
        ),
    }
}

155
crates/rkgk/src/api/wall.rs Normal file
View file

@ -0,0 +1,155 @@
use std::sync::Arc;
use axum::{
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
response::Response,
};
use eyre::{bail, Context, OptionExt};
use schema::{Error, LoginRequest, LoginResponse, Online, Version, WallInfo};
use serde::{Deserialize, Serialize};
use tokio::select;
use tracing::{error, info};
use crate::{
login::database::LoginStatus,
wall::{Event, JoinError, Session},
Databases,
};
mod schema;
/// `GET /api/wall`: upgrades the connection to a WebSocket and hands it over
/// to [`websocket`] for the wall protocol.
pub async fn wall(State(dbs): State<Arc<Databases>>, ws: WebSocketUpgrade) -> Response {
    ws.on_upgrade(|ws| websocket(dbs, ws))
}
/// Serializes a value into a JSON text WebSocket message.
///
/// Panics if serialization fails, which indicates a bug in a response type.
fn to_message<T>(value: &T) -> Message
where
    T: Serialize,
{
    let json = serde_json::to_string(value).expect("cannot serialize response to JSON");
    Message::Text(json)
}
/// Deserializes a value out of a JSON text WebSocket message.
///
/// Non-text messages (binary, ping/pong, close) are rejected with an error.
fn from_message<'de, T>(message: &'de Message) -> eyre::Result<T>
where
    T: Deserialize<'de>,
{
    let Message::Text(json) = message else {
        bail!("expected a text message")
    };
    serde_json::from_str(json).context("could not deserialize JSON text message")
}
/// Receives the next message, treating both a closed connection and a
/// transport error as failures.
async fn recv_expect(ws: &mut WebSocket) -> eyre::Result<Message> {
    let received = ws.recv().await.ok_or_eyre("connection closed unexpectedly")?;
    let message = received?;
    Ok(message)
}
async fn websocket(dbs: Arc<Databases>, mut ws: WebSocket) {
match fallible_websocket(dbs, &mut ws).await {
Ok(()) => (),
Err(e) => {
_ = ws
.send(to_message(&Error {
error: format!("{e:?}"),
}))
.await
}
}
}
/// The wall protocol proper; any returned error is reported to the client by
/// [`websocket`].
///
/// Protocol order: server sends `Version` → client sends `LoginRequest` →
/// server answers with a `LoginResponse` → event relay loop.
async fn fallible_websocket(dbs: Arc<Databases>, ws: &mut WebSocket) -> eyre::Result<()> {
    // Advertise the server version first; debug builds get a "-dev" suffix so
    // clients can tell them apart.
    #[cfg(debug_assertions)]
    let version = format!("{}-dev", env!("CARGO_PKG_VERSION"));
    #[cfg(not(debug_assertions))]
    let version = format!("{}", env!("CARGO_PKG_VERSION"));

    ws.send(to_message(&Version { version })).await?;

    // The first client message must be a login request.
    let login_request: LoginRequest = from_message(&recv_expect(ws).await?)?;
    let user_id = *login_request.user_id();

    match dbs
        .login
        .log_in(user_id)
        .await
        .context("error while logging in")?
    {
        LoginStatus::ValidUser => (),
        LoginStatus::UserDoesNotExist => {
            // Not an error for us: tell the client and close cleanly.
            ws.send(to_message(&LoginResponse::UserDoesNotExist))
                .await?;
            return Ok(());
        }
    }

    // `New` creates a wall under a freshly generated ID; `Join` attaches to an
    // existing wall by ID. Either way, `open` loads it on demand.
    let wall_id = match login_request {
        LoginRequest::New { .. } => dbs.wall_broker.generate_id().await,
        LoginRequest::Join { wall, .. } => wall,
    };
    let wall = dbs.wall_broker.open(wall_id);

    let mut session_handle = match wall.join(Session::new(user_id)) {
        Ok(handle) => handle,
        Err(error) => {
            ws.send(to_message(&match error {
                // NOTE: Respond with the same error code, because it doesn't matter to the user -
                // either way the room is way too contended for them to join.
                JoinError::TooManyCurrentSessions => LoginResponse::TooManySessions,
                JoinError::IdsExhausted => LoginResponse::TooManySessions,
            }))
            .await?;
            return Ok(());
        }
    };

    // Snapshot who is currently online. Users whose info cannot be fetched are
    // logged and skipped rather than failing the whole login.
    let mut users_online = vec![];
    for online in wall.online() {
        let user_info = match dbs.login.user_info(online.user_id).await {
            Ok(Some(user_info)) => user_info,
            Ok(None) | Err(_) => {
                error!(?online, "could not get info about online user");
                continue;
            }
        };
        users_online.push(Online {
            session_id: online.session_id,
            nickname: user_info.nickname,
            cursor: online.cursor,
        })
    }
    // Freeze the list; it is not mutated past this point.
    let users_online = users_online;

    ws.send(to_message(&LoginResponse::LoggedIn {
        wall: wall_id,
        wall_info: WallInfo {
            chunk_size: wall.settings().chunk_size,
            online: users_online,
        },
        session_id: session_handle.session_id,
    }))
    .await?;

    // Main relay loop: client events go to the wall, wall broadcasts go back
    // to the client, until either side disconnects.
    loop {
        select! {
            Some(message) = ws.recv() => {
                let kind = from_message(&message?)?;
                wall.event(Event { session_id: session_handle.session_id, kind });
            }
            Ok(event) = session_handle.event_receiver.recv() => {
                ws.send(to_message(&event)).await?;
            }
            else => break,
        }
    }

    Ok(())
}

View file

@ -0,0 +1,70 @@
use serde::{Deserialize, Serialize};
use crate::{
login::UserId,
schema::Vec2,
wall::{self, SessionId, WallId},
};
/// Server version announcement; the first message sent after connecting.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Version {
    pub version: String,
}

/// Generic error payload, sent when the protocol fails irrecoverably.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Error {
    pub error: String,
}

/// The client's login request, tagged on the wire by a `login` field.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(
    tag = "login",
    rename_all = "camelCase",
    rename_all_fields = "camelCase"
)]
pub enum LoginRequest {
    /// Create a brand new wall.
    New { user: UserId },
    /// Join an existing wall by its ID.
    Join { user: UserId, wall: WallId },
}

impl LoginRequest {
    /// Returns the user ID attached to the request, regardless of variant.
    pub fn user_id(&self) -> &UserId {
        match self {
            LoginRequest::New { user } => user,
            LoginRequest::Join { user, .. } => user,
        }
    }
}

/// One currently-online session, as reported to a freshly logged-in client.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Online {
    pub session_id: SessionId,
    pub nickname: String,
    pub cursor: Option<Vec2>,
}

/// Static information about the wall being joined.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WallInfo {
    pub chunk_size: u32,
    pub online: Vec<Online>,
}

/// The server's answer to a [`LoginRequest`], tagged by a `response` field.
#[derive(Debug, Clone, Serialize)]
#[serde(
    tag = "response",
    rename_all = "camelCase",
    rename_all_fields = "camelCase"
)]
pub enum LoginResponse {
    LoggedIn {
        wall: WallId,
        wall_info: WallInfo,
        session_id: SessionId,
    },
    UserDoesNotExist,
    TooManySessions,
}

51
crates/rkgk/src/binary.rs Normal file
View file

@ -0,0 +1,51 @@
use std::{error::Error, fmt};
/// A little-endian binary reader over a borrowed byte slice.
///
/// Each `read_*` call consumes bytes from the front of the slice, so
/// successive reads walk through the data.
pub struct Reader<'a> {
    slice: &'a [u8],
}

impl<'a> Reader<'a> {
    /// Creates a reader positioned at the start of `slice`.
    pub fn new(slice: &'a [u8]) -> Self {
        Self { slice }
    }

    /// Consumes and returns the next `N` bytes, or `Err(OutOfData)` if fewer
    /// than `N` bytes remain (in which case nothing is consumed).
    fn read_array<const N: usize>(&mut self) -> Result<[u8; N], OutOfData> {
        if self.slice.len() >= N {
            let (head, rest) = self.slice.split_at(N);
            // Advance past the consumed bytes. The previous implementation
            // never did this, so every read returned the same leading bytes.
            self.slice = rest;
            Ok(head.try_into().expect("split_at(N) yields exactly N bytes"))
        } else {
            Err(OutOfData)
        }
    }

    /// Reads a single byte.
    pub fn read_u8(&mut self) -> Result<u8, OutOfData> {
        self.read_array::<1>().map(|[b]| b)
    }

    /// Reads a little-endian `u16`.
    pub fn read_u16(&mut self) -> Result<u16, OutOfData> {
        self.read_array().map(u16::from_le_bytes)
    }

    /// Reads a little-endian `u32`.
    pub fn read_u32(&mut self) -> Result<u32, OutOfData> {
        self.read_array().map(u32::from_le_bytes)
    }
}

/// Error returned when a [`Reader`] does not have enough bytes left.
#[derive(Debug, Clone, Copy)]
pub struct OutOfData;

impl fmt::Display for OutOfData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("reader ran out of data")
    }
}

impl Error for OutOfData {}

View file

@ -0,0 +1,8 @@
use serde::{Deserialize, Serialize};
use crate::wall;
/// Top-level server configuration, read from `rkgk.toml` at startup.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
    // Settings handed to the wall broker, applied to every opened wall.
    pub wall: wall::Settings,
}

27
crates/rkgk/src/id.rs Normal file
View file

@ -0,0 +1,27 @@
use std::fmt;
use base64::Engine;
/// Writes a 32-byte ID as `prefix` followed by its unpadded base64 encoding.
pub fn serialize(f: &mut fmt::Formatter<'_>, prefix: &str, bytes: &[u8; 32]) -> fmt::Result {
    f.write_str(prefix)?;
    // 32 bytes encode to exactly ceil(32 * 4 / 3) = 43 base64 characters
    // without padding, so the buffer is always filled completely.
    let mut buffer = [b'0'; 43];
    base64::engine::general_purpose::STANDARD_NO_PAD
        .encode_slice(bytes, &mut buffer)
        .unwrap();
    f.write_str(std::str::from_utf8(&buffer).unwrap())?;
    Ok(())
}
/// Error returned when a string is not a valid serialized ID.
pub struct InvalidId;

/// Parses an ID produced by [`serialize`]: `prefix` followed by 43 characters
/// of unpadded base64 encoding exactly 32 bytes.
pub fn deserialize(s: &str, prefix: &str) -> Result<[u8; 32], InvalidId> {
    let b64 = s.strip_prefix(prefix).ok_or(InvalidId)?;
    // NOTE(review): `decode_slice` rejects output buffers smaller than its
    // *conservative* decoded-length estimate, which rounds up to whole 3-byte
    // groups (33 bytes for a 43-character input). Decoding straight into the
    // 32-byte array can therefore fail even for valid IDs, so decode into a
    // slightly larger scratch buffer and copy the exact bytes out.
    let mut buffer = [0; 33];
    let decoded = base64::engine::general_purpose::STANDARD_NO_PAD
        .decode_slice(b64, &mut buffer)
        .map_err(|_| InvalidId)?;
    if decoded != 32 {
        return Err(InvalidId);
    }
    let mut bytes = [0; 32];
    bytes.copy_from_slice(&buffer[..32]);
    Ok(bytes)
}

View file

@ -0,0 +1,23 @@
use std::time::Duration;
use axum::{routing::get, Router};
use tokio::time::sleep;
/// Builds the router for the development live-reload endpoints.
pub fn router<S>() -> Router<S> {
    let routes = Router::new()
        .route("/back-up", get(back_up))
        .route("/stall", get(stall));
    routes.with_state(())
}
/// Never responds; the request hangs until the server process goes away.
/// NOTE(review): presumably the client uses this hang-then-drop to detect a
/// server restart — confirm against the frontend's live-reload code.
async fn stall() -> String {
    loop {
        // Sleep for a day, I guess. Just to uphold the connection forever without really using any
        // significant resources.
        sleep(Duration::from_secs(60 * 60 * 24)).await;
    }
}
/// Responds immediately with an empty body, signalling the server is up.
async fn back_up() -> String {
    String::new()
}

70
crates/rkgk/src/login.rs Normal file
View file

@ -0,0 +1,70 @@
use std::{
error::Error,
fmt::{self},
str::FromStr,
};
use rand::RngCore;
pub mod database;
pub use database::Database;
use serde::{Deserialize, Serialize};
use crate::{id, serialization::DeserializeFromStr};
/// A user's unique 256-bit identifier.
///
/// NOTE(review): the ID also acts as the login credential (see
/// `database::Database::log_in`), so it must not be exposed to other users.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct UserId([u8; 32]);

impl UserId {
    /// Generates a random user ID from the given RNG.
    pub fn new(rng: &mut dyn RngCore) -> Self {
        let mut bytes = [0; 32];
        rng.fill_bytes(&mut bytes[..]);
        Self(bytes)
    }
}

impl fmt::Display for UserId {
    // Formats as "user_" followed by unpadded base64 of the raw bytes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        id::serialize(f, "user_", &self.0)
    }
}

impl FromStr for UserId {
    type Err = InvalidUserId;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        id::deserialize(s, "user_")
            .map(Self)
            .map_err(|_| InvalidUserId)
    }
}

impl Serialize for UserId {
    // Serialized as the `Display` string rather than as raw bytes.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(&self.to_string())
    }
}

impl<'de> Deserialize<'de> for UserId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        deserializer.deserialize_str(DeserializeFromStr::new("user ID"))
    }
}
/// Error returned when a string does not parse as a `UserId`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidUserId;

impl fmt::Display for InvalidUserId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "invalid user ID")
    }
}

impl Error for InvalidUserId {}

View file

@ -0,0 +1,166 @@
use std::path::PathBuf;
use chrono::Utc;
use eyre::{eyre, Context};
use rand::SeedableRng;
use rusqlite::{Connection, OptionalExtension};
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;
use super::UserId;
/// Settings for opening the login database.
pub struct Settings {
    // Path to the SQLite database file.
    pub path: PathBuf,
}

/// Handle to the login database.
///
/// Cheap to clone; all clones send commands to the same background thread.
#[derive(Debug, Clone)]
pub struct Database {
    command_tx: mpsc::Sender<Command>,
}

/// Outcome of a login attempt.
pub enum LoginStatus {
    ValidUser,
    UserDoesNotExist,
}

/// Information about a user that may be shown to others.
#[derive(Debug, Clone)]
pub struct UserInfo {
    pub nickname: String,
}

// Commands processed by the database thread; each carries a oneshot sender
// over which the reply is delivered.
enum Command {
    NewUser {
        nickname: String,
        reply: oneshot::Sender<eyre::Result<UserId>>,
    },
    LogIn {
        user_id: UserId,
        reply: oneshot::Sender<LoginStatus>,
    },
    UserInfo {
        user_id: UserId,
        reply: oneshot::Sender<eyre::Result<Option<UserInfo>>>,
    },
}
impl Database {
    /// Creates a new user with the given nickname and returns its fresh ID.
    ///
    /// Fails if the database thread's queue is full or the thread has exited.
    pub async fn new_user(&self, nickname: String) -> eyre::Result<UserId> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(Command::NewUser {
                nickname,
                reply: tx,
            })
            .await
            .map_err(|_| eyre!("database is too contended"))?;
        // The inner result comes from the database thread; propagate it too.
        rx.await.map_err(|_| eyre!("database is not available"))?
    }

    /// Logs the user in, updating their last-login timestamp.
    pub async fn log_in(&self, user_id: UserId) -> eyre::Result<LoginStatus> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(Command::LogIn { user_id, reply: tx })
            .await
            .map_err(|_| eyre!("database is too contended"))?;
        // Unlike the other commands, the reply is a plain status, not a Result.
        rx.await.map_err(|_| eyre!("database is not available"))
    }

    /// Looks up a user's public info; `Ok(None)` means the user does not exist.
    pub async fn user_info(&self, user_id: UserId) -> eyre::Result<Option<UserInfo>> {
        let (tx, rx) = oneshot::channel();
        self.command_tx
            .send(Command::UserInfo { user_id, reply: tx })
            .await
            .map_err(|_| eyre!("database is too contended"))?;
        rx.await.map_err(|_| eyre!("database is not available"))?
    }
}
/// Opens the login database and spawns its worker thread.
///
/// All SQLite access happens on a single blocking thread; the returned
/// [`Database`] handle talks to it over a command channel.
#[instrument(name = "login::database::start", skip(settings))]
pub fn start(settings: &Settings) -> eyre::Result<Database> {
    let db = Connection::open(&settings.path).context("cannot open login database")?;

    db.execute(
        r#"
            CREATE TABLE IF NOT EXISTS
            t_users (
                user_index INTEGER PRIMARY KEY,
                long_user_id BLOB NOT NULL,
                nickname TEXT NOT NULL,
                last_login_timestamp INTEGER NOT NULL
            );
        "#,
        (),
    )?;

    let (command_tx, mut command_rx) = mpsc::channel(8);

    // Seeded once at startup; user IDs double as credentials, so use a CSPRNG.
    let mut user_id_rng = rand_chacha::ChaCha20Rng::from_entropy();

    tokio::task::spawn_blocking(move || {
        let mut s_insert_user = db
            .prepare(
                r#"
                    INSERT INTO t_users
                    (long_user_id, nickname, last_login_timestamp)
                    VALUES (?, ?, ?);
                "#,
            )
            .unwrap();
        let mut s_log_in = db
            .prepare(
                r#"
                    UPDATE OR ABORT t_users
                    SET last_login_timestamp = ?
                    WHERE long_user_id = ?;
                "#,
            )
            .unwrap();
        let mut s_user_info = db
            .prepare(
                r#"
                    SELECT nickname
                    FROM t_users
                    WHERE long_user_id = ?
                    LIMIT 1;
                "#,
            )
            .unwrap();

        while let Some(command) = command_rx.blocking_recv() {
            match command {
                Command::NewUser { nickname, reply } => {
                    let user_id = UserId::new(&mut user_id_rng);
                    let result = s_insert_user
                        .execute((user_id.0, nickname, Utc::now().timestamp()))
                        .context("could not execute query");
                    // Replies are best-effort; the requester may have gone away.
                    _ = reply.send(result.map(|_| user_id));
                }
                Command::LogIn { user_id, reply } => {
                    // TODO: User expiration.
                    // `execute` returns Ok(number of rows updated). An unknown
                    // user ID matches zero rows but still yields Ok, so the
                    // affected-row count must be checked — otherwise *any* ID
                    // would appear to log in successfully.
                    let login_status = match s_log_in.execute((Utc::now().timestamp(), user_id.0)) {
                        Ok(0) | Err(_) => LoginStatus::UserDoesNotExist,
                        Ok(_) => LoginStatus::ValidUser,
                    };
                    _ = reply.send(login_status);
                }
                Command::UserInfo { user_id, reply } => {
                    let result = s_user_info
                        .query_row((user_id.0,), |row| {
                            Ok(UserInfo {
                                nickname: row.get(0)?,
                            })
                        })
                        // `.optional()` turns "no rows" into Ok(None).
                        .optional()
                        .context("could not execute query");
                    _ = reply.send(result);
                }
            }
        }
    });

    Ok(Database { command_tx })
}

115
crates/rkgk/src/main.rs Normal file
View file

@ -0,0 +1,115 @@
use std::{
fs::{copy, create_dir_all, remove_dir_all},
path::Path,
sync::Arc,
};
use axum::Router;
use config::Config;
use copy_dir::copy_dir;
use eyre::Context;
use tokio::{fs, net::TcpListener};
use tower_http::services::{ServeDir, ServeFile};
use tracing::{info, info_span};
use tracing_subscriber::fmt::format::FmtSpan;
mod api;
mod binary;
mod config;
mod id;
#[cfg(debug_assertions)]
mod live_reload;
mod login;
pub mod schema;
mod serialization;
mod wall;
/// Filesystem layout used by the server.
struct Paths<'a> {
    // Directory the site is built into and served from.
    target_dir: &'a Path,
    // Directory holding all persistent databases.
    database_dir: &'a Path,
}

/// Builds the static site into `paths.target_dir`, wiping whatever was there.
fn build(paths: &Paths<'_>) -> eyre::Result<()> {
    let _span = info_span!("build").entered();

    // Removal failure is ignored: the directory may simply not exist yet.
    _ = remove_dir_all(paths.target_dir);
    create_dir_all(paths.target_dir).context("cannot create target directory")?;
    copy_dir("static", paths.target_dir.join("static")).context("cannot copy static directory")?;
    create_dir_all(paths.target_dir.join("static/wasm"))
        .context("cannot create static/wasm directory")?;
    // NOTE(review): assumes the haku wasm module was already built with the
    // wasm-dev profile before the server starts — confirm the build script.
    copy(
        "target/wasm32-unknown-unknown/wasm-dev/haku_wasm.wasm",
        paths.target_dir.join("static/wasm/haku.wasm"),
    )
    .context("cannot copy haku.wasm file")?;
    Ok(())
}

/// All databases the request handlers need, shared behind one `Arc`.
pub struct Databases {
    pub login: login::Database,
    pub wall_broker: wall::Broker,
}

/// Starts up every database under `paths.database_dir`.
fn database(config: &Config, paths: &Paths<'_>) -> eyre::Result<Databases> {
    create_dir_all(paths.database_dir).context("cannot create directory for databases")?;
    let login = login::database::start(&login::database::Settings {
        path: paths.database_dir.join("login.db"),
    })
    .context("cannot start up login database")?;
    let wall_broker = wall::Broker::new(config.wall);
    Ok(Databases { login, wall_broker })
}
/// Server entry point; returns errors to `main` instead of panicking where possible.
async fn fallible_main() -> eyre::Result<()> {
    let paths = Paths {
        target_dir: Path::new("target/site"),
        database_dir: Path::new("database"),
    };

    let config: Config = toml::from_str(
        &fs::read_to_string("rkgk.toml")
            .await
            .context("cannot read config file")?,
    )
    .context("cannot deserialize config file")?;

    // Build the static site before binding the listener, so a broken build
    // aborts startup instead of serving stale files.
    build(&paths)?;
    let dbs = Arc::new(database(&config, &paths)?);

    let app = Router::new()
        .route_service(
            "/",
            ServeFile::new(paths.target_dir.join("static/index.html")),
        )
        .nest_service("/static", ServeDir::new(paths.target_dir.join("static")))
        .nest("/api", api::router(dbs.clone()));
    // Live-reload endpoints exist only in debug builds.
    #[cfg(debug_assertions)]
    let app = app.nest("/dev/live-reload", live_reload::router());

    let listener = TcpListener::bind("0.0.0.0:8080")
        .await
        .expect("cannot bind to port");
    info!("listening on port 8080");
    axum::serve(listener, app).await.expect("cannot serve app");
    Ok(())
}
#[tokio::main]
async fn main() {
color_eyre::install().unwrap();
tracing_subscriber::fmt()
.with_span_events(FmtSpan::ACTIVE)
.init();
match fallible_main().await {
Ok(_) => (),
Err(error) => println!("{error:?}"),
}
}

View file

@ -0,0 +1,7 @@
use serde::{Deserialize, Serialize};
/// A 2D vector with `f32` components, shared by the wall model and the wire protocol.
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
pub struct Vec2 {
    pub x: f32,
    pub y: f32,
}

View file

@ -0,0 +1,36 @@
use std::{fmt::Display, marker::PhantomData, str::FromStr};
use serde::de::{Error, Visitor};
pub struct DeserializeFromStr<T> {
expecting: &'static str,
_phantom: PhantomData<T>,
}
impl<T> DeserializeFromStr<T> {
pub fn new(expecting: &'static str) -> Self {
Self {
expecting,
_phantom: PhantomData,
}
}
}
impl<'de, T> Visitor<'de> for DeserializeFromStr<T>
where
T: FromStr,
T::Err: Display,
{
type Value = T;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str(self.expecting)
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
T::from_str(v).map_err(|e| Error::custom(e))
}
}

246
crates/rkgk/src/wall.rs Normal file
View file

@ -0,0 +1,246 @@
use std::{
error::Error,
fmt,
str::FromStr,
sync::{
atomic::{self, AtomicU32},
Arc, Weak,
},
};
use dashmap::DashMap;
use haku::render::tiny_skia::Pixmap;
use rand::RngCore;
use serde::{Deserialize, Serialize};
use tokio::sync::{broadcast, Mutex};
use crate::{id, login::UserId, schema::Vec2, serialization::DeserializeFromStr};
pub mod broker;
pub use broker::Broker;
/// A wall's unique 256-bit identifier.
///
/// NOTE(review): knowing the ID is what grants access to a wall (see the login
/// flow in api/wall.rs), so IDs should be treated as capabilities.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct WallId([u8; 32]);

impl WallId {
    /// Generates a random wall ID from the given RNG.
    pub fn new(rng: &mut dyn RngCore) -> Self {
        let mut bytes = [0; 32];
        rng.fill_bytes(&mut bytes);
        Self(bytes)
    }
}

impl fmt::Display for WallId {
    // Formats as "wall_" followed by unpadded base64 of the raw bytes.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        id::serialize(f, "wall_", &self.0)
    }
}

impl FromStr for WallId {
    type Err = InvalidWallId;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        id::deserialize(s, "wall_")
            .map(WallId)
            .map_err(|_| InvalidWallId)
    }
}

impl Serialize for WallId {
    // Serialized as the `Display` string rather than as raw bytes.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.serialize_str(&self.to_string())
    }
}

impl<'de> Deserialize<'de> for WallId {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        deserializer.deserialize_str(DeserializeFromStr::new("wall ID"))
    }
}
/// Identifies a session within one wall; unique only per wall, per process run.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct SessionId(u32);

/// Error returned when a string does not parse as a [`WallId`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InvalidWallId;

impl fmt::Display for InvalidWallId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("invalid wall ID")
    }
}

impl Error for InvalidWallId {}
/// One square tile of a wall's canvas.
pub struct Chunk {
    pixmap: Pixmap,
}

impl Chunk {
    /// Creates a blank chunk of `size` × `size` pixels.
    ///
    /// Panics if `Pixmap::new` fails (e.g. for a zero size).
    pub fn new(size: u32) -> Self {
        Self {
            pixmap: Pixmap::new(size, size).unwrap(),
        }
    }
}
/// Per-wall settings, supplied by the server configuration.
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
pub struct Settings {
    // Intended cap on the number of chunks per wall.
    // NOTE(review): not enforced anywhere yet — confirm intent.
    pub max_chunks: usize,
    // Cap on concurrent sessions per wall.
    pub max_sessions: usize,
    // Side length of each square chunk, in pixels.
    pub chunk_size: u32,
}

/// A drawing surface shared by many concurrent sessions.
pub struct Wall {
    settings: Settings,
    // Chunks are created lazily on first access.
    chunks: DashMap<(i32, i32), Arc<Mutex<Chunk>>>,
    sessions: DashMap<SessionId, Session>,
    // Source of fresh session IDs; incremented atomically per join.
    session_id_counter: AtomicU32,
    // Fan-out channel for events; every session handle holds a receiver.
    event_sender: broadcast::Sender<Event>,
}

/// One user's presence on a wall.
pub struct Session {
    pub user_id: UserId,
    // Last cursor position reported via `EventKind::Cursor`, if any.
    pub cursor: Option<Vec2>,
}

/// RAII handle to a session; dropping it removes the session from the wall.
pub struct SessionHandle {
    pub wall: Weak<Wall>,
    pub event_receiver: broadcast::Receiver<Event>,
    pub session_id: SessionId,
}

/// Something that happened in a session, broadcast to all sessions on the wall.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Event {
    pub session_id: SessionId,
    pub kind: EventKind,
}

/// The payload of an [`Event`], tagged on the wire by an `event` field.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(
    tag = "event",
    rename_all = "camelCase",
    rename_all_fields = "camelCase"
)]
pub enum EventKind {
    Cursor { position: Vec2 },
    SetBrush { brush: String },
    Plot { points: Vec<Vec2> },
}

/// Snapshot of one online session, as returned by `Wall::online`.
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Online {
    pub session_id: SessionId,
    pub user_id: UserId,
    pub cursor: Option<Vec2>,
}
impl Wall {
    /// Creates an empty wall with the given settings.
    pub fn new(settings: Settings) -> Self {
        Self {
            settings,
            chunks: DashMap::new(),
            sessions: DashMap::new(),
            session_id_counter: AtomicU32::new(0),
            event_sender: broadcast::channel(16).0,
        }
    }

    /// Returns the wall's settings.
    pub fn settings(&self) -> &Settings {
        &self.settings
    }

    /// Returns the chunk at `at`, or `None` if it has not been created yet.
    pub fn get_chunk(&self, at: (i32, i32)) -> Option<Arc<Mutex<Chunk>>> {
        self.chunks.get(&at).map(|chunk| Arc::clone(&chunk))
    }

    /// Returns the chunk at `at`, creating a blank one if it does not exist.
    // NOTE(review): `settings.max_chunks` is not enforced here yet.
    pub fn get_or_create_chunk(&self, at: (i32, i32)) -> Arc<Mutex<Chunk>> {
        Arc::clone(
            &self
                .chunks
                .entry(at)
                .or_insert_with(|| Arc::new(Mutex::new(Chunk::new(self.settings.chunk_size)))),
        )
    }

    /// Registers a new session on the wall and returns a handle to it.
    ///
    /// The session is removed again when the returned handle is dropped.
    pub fn join(self: &Arc<Self>, session: Session) -> Result<SessionHandle, JoinError> {
        // Enforce the configured session cap. Previously this was never
        // checked, so `JoinError::TooManyCurrentSessions` could never occur.
        if self.sessions.len() >= self.settings.max_sessions {
            return Err(JoinError::TooManyCurrentSessions);
        }

        let session_id = SessionId(
            self.session_id_counter
                .fetch_add(1, atomic::Ordering::Relaxed),
        );
        self.sessions.insert(session_id, session);

        Ok(SessionHandle {
            wall: Arc::downgrade(self),
            event_receiver: self.event_sender.subscribe(),
            session_id,
        })
    }

    /// Returns a snapshot of all sessions currently on the wall.
    pub fn online(&self) -> Vec<Online> {
        self.sessions
            .iter()
            .map(|r| Online {
                session_id: *r.key(),
                user_id: r.user_id,
                cursor: r.value().cursor,
            })
            .collect()
    }

    /// Applies an event to the wall state and broadcasts it to all sessions.
    ///
    /// Events from unknown sessions update no state but are still broadcast.
    pub fn event(&self, event: Event) {
        if let Some(mut session) = self.sessions.get_mut(&event.session_id) {
            match &event.kind {
                EventKind::Cursor { position } => {
                    session.cursor = Some(*position);
                }
                // Not reflected in server-side state (yet); relayed to clients only.
                EventKind::SetBrush { .. } => {}
                EventKind::Plot { .. } => {}
            }
        }
        _ = self.event_sender.send(event);
    }
}
impl Session {
    /// Creates a session for the given user, with no cursor reported yet.
    pub fn new(user_id: UserId) -> Self {
        Self {
            user_id,
            cursor: None,
        }
    }
}

impl Drop for SessionHandle {
    // Removes the session from its wall when the handle is dropped.
    fn drop(&mut self) {
        if let Some(wall) = self.wall.upgrade() {
            wall.sessions.remove(&self.session_id);
            // After the session is removed, the wall will be garbage collected later.
        }
    }
}

/// Reasons a session cannot join a wall.
pub enum JoinError {
    TooManyCurrentSessions,
    IdsExhausted,
}

/// Errors that can occur while processing a session's event.
pub enum EventError {
    DeadSession,
}

View file

@ -0,0 +1,53 @@
use std::sync::Arc;
use dashmap::DashMap;
use rand::SeedableRng;
use rand_chacha::ChaCha20Rng;
use tokio::sync::Mutex;
use tracing::info;
use super::{Settings, Wall, WallId};
/// The broker is the main way to access wall data.
///
/// It handles dynamically loading and unloading walls as they're needed.
/// It also handles database threads for each wall.
pub struct Broker {
    // Settings every newly opened wall starts with.
    wall_settings: Settings,
    open_walls: DashMap<WallId, OpenWall>,
    // RNG for generating wall IDs; see `generate_id`.
    rng: Mutex<ChaCha20Rng>,
}

// State kept for each wall that is currently open.
struct OpenWall {
    wall: Arc<Wall>,
}
impl Broker {
    /// Creates a broker with no walls loaded.
    pub fn new(wall_settings: Settings) -> Self {
        info!(?wall_settings, "Broker::new");
        Self {
            wall_settings,
            open_walls: DashMap::new(),
            rng: Mutex::new(ChaCha20Rng::from_entropy()),
        }
    }

    /// Generates a fresh, random wall ID.
    pub async fn generate_id(&self) -> WallId {
        // TODO: Will lock contention be an issue with generating wall IDs?
        // We only have one of these RNGs per rkgk instance.
        let mut rng = self.rng.lock().await;
        WallId::new(&mut *rng)
    }

    /// Returns the wall with the given ID, opening a new one if it isn't loaded.
    pub fn open(&self, wall_id: WallId) -> Arc<Wall> {
        Arc::clone(
            &self
                .open_walls
                .entry(wall_id)
                .or_insert_with(|| OpenWall {
                    wall: Arc::new(Wall::new(self.wall_settings)),
                })
                .wall,
        )
    }
}
}