1use std::io::SeekFrom;
4
5use anyhow::Context;
6use axum::{
7 body::Body,
8 extract::{Path, State},
9 response::{IntoResponse, Response},
10 Json,
11};
12use axum_extra::{extract::WithRejection, typed_header::TypedHeaderRejection, TypedHeader};
13use bobashare::{generate_randomized_id, storage::file::CreateUploadError};
14use chrono::{DateTime, TimeDelta, Utc};
15use displaydoc::Display;
16use futures_util::TryStreamExt;
17use headers::{ContentLength, ContentType};
18use hyper::{header, HeaderMap, StatusCode};
19use serde::Serialize;
20use thiserror::Error;
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
22use tracing::{event, instrument, Instrument, Level};
23
24use super::ApiErrorExt;
25use crate::{clamp_expiry, str_to_duration, AppState};
26
27#[derive(Debug, Clone, Serialize)]
29pub struct UploadResponse {
30 pub id: String,
32 pub url: String,
34 pub direct_url: String,
36 pub filename: String,
38 pub mimetype: String,
40 pub expiry_date: Option<DateTime<Utc>>,
42 pub delete_key: String,
44}
45
46#[derive(Debug, Error, Display)]
48pub enum UploadError {
49 AlreadyExists,
51 ParseHeader { name: String, source: anyhow::Error },
53 TooLarge { size: u64, max: u64 },
55
56 Cancelled(#[source] anyhow::Error),
58
59 InternalServer(#[from] anyhow::Error),
61}
62impl From<TypedHeaderRejection> for UploadError {
63 fn from(rej: TypedHeaderRejection) -> Self {
64 Self::ParseHeader {
65 name: rej.name().to_string(),
66 source: rej.into(),
67 }
68 }
69}
70impl IntoResponse for UploadError {
71 fn into_response(self) -> Response {
72 let code = match self {
73 Self::AlreadyExists => StatusCode::CONFLICT,
74 Self::ParseHeader { name: _, source: _ } => StatusCode::BAD_REQUEST,
75 Self::TooLarge { size: _, max: _ } => StatusCode::PAYLOAD_TOO_LARGE,
76 Self::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::InternalServer(_) => StatusCode::INTERNAL_SERVER_ERROR,
78 };
79
80 if let Self::Cancelled(_) = self {
81 let error = anyhow::Error::new(self);
82 event!(
83 Level::INFO,
84 error = format!("{error:#}"),
85 "returning empty response to cancelled upload"
86 );
87 ().into_response()
88 } else {
89 self.into_response_with_code(code)
90 }
91 }
92}
93
94#[instrument(skip(state, filename, headers, body), fields(id))]
130pub async fn put(
131 state: State<&'static AppState>,
132 filename: Path<String>,
133 WithRejection(TypedHeader(mimetype), _): WithRejection<TypedHeader<ContentType>, UploadError>,
134 WithRejection(TypedHeader(content_length), _): WithRejection<
135 TypedHeader<ContentLength>,
136 UploadError,
137 >,
138 headers: HeaderMap,
139 body: Body,
140) -> Result<impl IntoResponse, UploadError> {
141 if content_length.0 > state.max_file_size {
147 event!(
148 Level::INFO,
149 size = content_length.0,
150 max = state.max_file_size,
151 "file is too large"
152 );
153 return Err(UploadError::TooLarge {
154 size: content_length.0,
155 max: state.max_file_size,
156 });
157 }
158 let id = generate_randomized_id(state.id_length);
159 tracing::Span::current().record("id", &id);
160 event!(Level::DEBUG, "generated random ID for upload");
161
162 let mimetype = mimetype.into();
163
164 let expiry = match headers.get("Bobashare-Expiry") {
165 None => {
167 event!(
168 Level::DEBUG,
169 "`Bobashare-Expiry` header not provided, using default"
170 );
171 Some(state.default_expiry)
172 }
173 Some(e) => {
175 let expiry = e.to_str().map_err(|e| UploadError::ParseHeader {
176 name: String::from("Bobashare-Expiry"),
177 source: anyhow::Error::new(e).context("error converting to string"),
178 })?;
179
180 event!(Level::DEBUG, "`Bobashare-Expiry` header says {}", expiry);
181
182 let expiry = if expiry == "never" {
183 None
184 } else {
185 Some(
186 TimeDelta::from_std(str_to_duration(expiry).map_err(|e| {
187 UploadError::ParseHeader {
188 name: String::from("Bobashare-Expiry"),
189 source: anyhow::Error::new(e).context("error parsing duration string"),
190 }
191 })?)
192 .map_err(|e| UploadError::ParseHeader {
193 name: String::from("Bobashare-Expiry"),
194 source: anyhow::Error::new(e).context("error converting duration"),
195 })?,
196 )
197 };
198
199 clamp_expiry(state.max_expiry, expiry)
201 }
202 };
203 event!(Level::DEBUG, expiry = %expiry.map_or_else(|| String::from("never"), |e| e.to_string()));
204
205 let delete_key = headers
206 .get("Bobashare-Delete-Key")
207 .map(|k| {
208 k.to_str().map_err(|e| UploadError::ParseHeader {
209 name: String::from("Bobashare-Delete-Key"),
210 source: anyhow::Error::new(e).context("error converting to string"),
211 })
212 })
213 .transpose()?
214 .map(ToString::to_string);
215 if delete_key.is_some() {
216 event!(Level::DEBUG, delete_key, "custom delete key was provided");
217 } else {
218 event!(Level::DEBUG, "delete_key will be randomly generated");
219 }
220
221 let mut upload = state
222 .backend
223 .create_upload(&id, &filename, mimetype, expiry, delete_key)
224 .await
225 .map_err(|e| {
226 if let CreateUploadError::AlreadyExists = e {
227 UploadError::AlreadyExists
228 } else {
229 UploadError::InternalServer(
230 anyhow::Error::new(e).context("error while initializing upload"),
231 )
232 }
233 })?;
234 event!(
235 Level::TRACE,
236 upload = format!("{upload:?}"),
237 "created upload handle"
238 );
239
240 let mut file_writer = BufWriter::new(&mut upload.file);
241 event!(Level::DEBUG, "streaming file to disk");
242 let stream_file_task = async {
243 let mut body = body.into_data_stream();
244 loop {
245 let chunk = body.try_next().await.context("error reading body");
246 match chunk {
247 Ok(ch) => match ch {
248 Some(c) => {
249 event!(
250 Level::TRACE,
251 "writing chunk of {} bytes to file buffer",
252 c.len()
253 );
254 file_writer
255 .write_all(&c)
256 .await
257 .context("error writing to file")?;
258 }
259 None => break,
260 },
261 Err(e) => {
262 return Err(UploadError::Cancelled(e));
263 }
264 }
265 }
266 Ok(())
267 };
268
269 let mut shutdown_rx = state.shutdown_tx.subscribe();
270 tokio::select! {
271 res = stream_file_task => {
272 if let Err(e) = res {
273 event!(Level::INFO, "upload was cancelled; it will be deleted");
274 upload
275 .flush()
276 .await
277 .context("error flushing cancelled upload before deletion")?;
278 state
279 .backend
280 .delete_upload(id)
281 .await
282 .context("error deleting cancelled upload")?;
283 event!(Level::INFO, "upload was deleted successfully");
284 return Err(e);
285 }
286 },
287 _ = shutdown_rx.recv() => {
288 event!(Level::INFO, "server is shutting down; deleting lock");
289 upload.drop_lock().await.context("error deleting lock of cancelled upload")?;
290 return Err(UploadError::InternalServer(anyhow::anyhow!("server is shutting down")));
291 }
292 };
293
294 file_writer
295 .flush()
296 .await
297 .context("error flushing file buffer")?;
298
299 let detect_plaintext_span = tracing::span!(Level::INFO, "detect_plaintext");
300 async {
301 tracing::event!(Level::INFO, "detecting whether the upload is plaintext");
302 let upload = &mut upload;
303 if let Err(err) = upload.file.seek(SeekFrom::Start(0)).await {
304 tracing::event!(Level::ERROR, ?err, "error seeking to beginning of file");
305 return;
306 };
307 let mut buf = [0; 1024];
308 if let Err(err) = upload.file.read(&mut buf).await {
309 tracing::event!(Level::ERROR, ?err, "error reading first 1024 bytes of file");
310 return;
311 };
312
313 if std::str::from_utf8(&buf).is_ok() {
315 tracing::event!(Level::INFO, "upload is plaintext");
316 upload.metadata.mimetype = mime::TEXT_PLAIN_UTF_8;
317 } else {
318 tracing::event!(Level::INFO, "upload is not plaintext");
319 }
320 }
321 .instrument(detect_plaintext_span)
322 .await;
323
324 let metadata = upload
325 .flush()
326 .await
327 .context("error flushing upload metadata to disk")?;
328 event!(Level::DEBUG, "flushed upload metadata to disk");
329
330 let url = state.base_url.join(&metadata.id).unwrap().to_string();
332 let direct_url = state.raw_url.join(&metadata.id).unwrap().to_string();
333 event!(
334 Level::INFO,
335 url,
336 filename = metadata.filename,
337 mimetype = %metadata.mimetype,
338 expiry = %metadata
339 .expiry_date
340 .map_or_else(|| String::from("never"), |e| e.to_string()),
341 "successfully created upload"
342 );
343 Ok((
344 StatusCode::CREATED,
345 [
346 (header::CONTENT_LOCATION, direct_url.clone()),
347 (header::LOCATION, url.clone()),
348 ],
349 Json(UploadResponse {
350 id,
351 url,
352 direct_url,
353 filename: metadata.filename,
354 mimetype: metadata.mimetype.to_string(),
355 expiry_date: metadata.expiry_date,
356 delete_key: metadata.delete_key,
357 }),
358 ))
359}