bobashare_web/api/v1/
upload.rs

1//! API to create an upload
2
3use std::io::SeekFrom;
4
5use anyhow::Context;
6use axum::{
7    body::Body,
8    extract::{Path, State},
9    response::{IntoResponse, Response},
10    Json,
11};
12use axum_extra::{extract::WithRejection, typed_header::TypedHeaderRejection, TypedHeader};
13use bobashare::{generate_randomized_id, storage::file::CreateUploadError};
14use chrono::{DateTime, TimeDelta, Utc};
15use displaydoc::Display;
16use futures_util::TryStreamExt;
17use headers::{ContentLength, ContentType};
18use hyper::{header, HeaderMap, StatusCode};
19use serde::Serialize;
20use thiserror::Error;
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
22use tracing::{event, instrument, Instrument, Level};
23
24use super::ApiErrorExt;
25use crate::{clamp_expiry, str_to_duration, AppState};
26
27/// The JSON API response after uploading a file
28#[derive(Debug, Clone, Serialize)]
29pub struct UploadResponse {
30    /// ID of the upload (used in URL)
31    pub id: String,
32    /// url to the upload
33    pub url: String,
34    /// direct url to download the raw uploaded file
35    pub direct_url: String,
36    /// the name of the file
37    pub filename: String,
38    /// the MIME type of the uploaded file
39    pub mimetype: String,
40    /// expiration date in RFC 3339 format, null if the upload never expires
41    pub expiry_date: Option<DateTime<Utc>>,
42    /// key to delete the upload later before it's expired
43    pub delete_key: String,
44}
45
46/// Errors that could occur during upload
47#[derive(Debug, Error, Display)]
48pub enum UploadError {
49    /// an upload already exists with the same id
50    AlreadyExists,
51    /// error parsing `{name}` header
52    ParseHeader { name: String, source: anyhow::Error },
53    /// file is too large ({size} > {max})
54    TooLarge { size: u64, max: u64 },
55
56    /// upload was cancelled
57    Cancelled(#[source] anyhow::Error),
58
59    /// internal server error
60    InternalServer(#[from] anyhow::Error),
61}
62impl From<TypedHeaderRejection> for UploadError {
63    fn from(rej: TypedHeaderRejection) -> Self {
64        Self::ParseHeader {
65            name: rej.name().to_string(),
66            source: rej.into(),
67        }
68    }
69}
70impl IntoResponse for UploadError {
71    fn into_response(self) -> Response {
72        let code = match self {
73            Self::AlreadyExists => StatusCode::CONFLICT,
74            Self::ParseHeader { name: _, source: _ } => StatusCode::BAD_REQUEST,
75            Self::TooLarge { size: _, max: _ } => StatusCode::PAYLOAD_TOO_LARGE,
76            Self::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, // unused
77            Self::InternalServer(_) => StatusCode::INTERNAL_SERVER_ERROR,
78        };
79
80        if let Self::Cancelled(_) = self {
81            let error = anyhow::Error::new(self);
82            event!(
83                Level::INFO,
84                error = format!("{error:#}"),
85                "returning empty response to cancelled upload"
86            );
87            ().into_response()
88        } else {
89            self.into_response_with_code(code)
90        }
91    }
92}
93
94/// Create an upload
95///
96/// # Request
97///
98/// `PUT /api/v1/upload/:filename`
99///
100/// NOTE: The first URL will use the randomized upload ID as the filename.
101///
102/// ## Headers
103///
104/// - `Content-Type` (required) -- mimetype -- the mime type (file format) of
105///   the file. Note that it will be ignored if the file is plaintext.
106/// - `Bobashare-Expiry` (optional) -- number -- duration until the upload
107///   should expire
108///   - specify `0` for no expiry
109///   - examples (see [`str_to_duration`] for more information):
110///     - `1d` -- 1 day
111///     - `1h` -- 1 hour
112///     - `1m` -- 1 minute
113///     - `1s` -- 1 second
114///
115/// - `Bobashare-Delete-Key` (optional) -- string -- custom key to use for
116///   deleting the file later; if not provided, one will be randomly generated
117///
118/// ## Body
119///
120/// Should contain the contents of the file to upload
121///
122/// # Response
123///
124/// ## Success
125///
126/// - 201 Created
127/// - `Location` header containing the URL of the upload
128/// - JSON body created from [`UploadResponse`]
129#[instrument(skip(state, filename, headers, body), fields(id))]
130pub async fn put(
131    state: State<&'static AppState>,
132    filename: Path<String>,
133    WithRejection(TypedHeader(mimetype), _): WithRejection<TypedHeader<ContentType>, UploadError>,
134    WithRejection(TypedHeader(content_length), _): WithRejection<
135        TypedHeader<ContentLength>,
136        UploadError,
137    >,
138    headers: HeaderMap,
139    body: Body,
140) -> Result<impl IntoResponse, UploadError> {
141    // hyper will automatically make sure the body is <= the content-length, so we
142    // can rely on it here
143    //
144    // also note that hyper seems to intercept the Content-Length header and return
145    // its own empty response instead of using WithRejection here
146    if content_length.0 > state.max_file_size {
147        event!(
148            Level::INFO,
149            size = content_length.0,
150            max = state.max_file_size,
151            "file is too large"
152        );
153        return Err(UploadError::TooLarge {
154            size: content_length.0,
155            max: state.max_file_size,
156        });
157    }
158    let id = generate_randomized_id(state.id_length);
159    tracing::Span::current().record("id", &id);
160    event!(Level::DEBUG, "generated random ID for upload");
161
162    let mimetype = mimetype.into();
163
164    let expiry = match headers.get("Bobashare-Expiry") {
165        // if header not found, use default expiry
166        None => {
167            event!(
168                Level::DEBUG,
169                "`Bobashare-Expiry` header not provided, using default"
170            );
171            Some(state.default_expiry)
172        }
173        // otherwise, clamp the requested expiry to the max
174        Some(e) => {
175            let expiry = e.to_str().map_err(|e| UploadError::ParseHeader {
176                name: String::from("Bobashare-Expiry"),
177                source: anyhow::Error::new(e).context("error converting to string"),
178            })?;
179
180            event!(Level::DEBUG, "`Bobashare-Expiry` header says {}", expiry);
181
182            let expiry = if expiry == "never" {
183                None
184            } else {
185                Some(
186                    TimeDelta::from_std(str_to_duration(expiry).map_err(|e| {
187                        UploadError::ParseHeader {
188                            name: String::from("Bobashare-Expiry"),
189                            source: anyhow::Error::new(e).context("error parsing duration string"),
190                        }
191                    })?)
192                    .map_err(|e| UploadError::ParseHeader {
193                        name: String::from("Bobashare-Expiry"),
194                        source: anyhow::Error::new(e).context("error converting duration"),
195                    })?,
196                )
197            };
198
199            // TODO: should we return an error if expiry is too large instead?
200            clamp_expiry(state.max_expiry, expiry)
201        }
202    };
203    event!(Level::DEBUG, expiry = %expiry.map_or_else(|| String::from("never"), |e| e.to_string()));
204
205    let delete_key = headers
206        .get("Bobashare-Delete-Key")
207        .map(|k| {
208            k.to_str().map_err(|e| UploadError::ParseHeader {
209                name: String::from("Bobashare-Delete-Key"),
210                source: anyhow::Error::new(e).context("error converting to string"),
211            })
212        })
213        .transpose()?
214        .map(ToString::to_string);
215    if delete_key.is_some() {
216        event!(Level::DEBUG, delete_key, "custom delete key was provided");
217    } else {
218        event!(Level::DEBUG, "delete_key will be randomly generated");
219    }
220
221    let mut upload = state
222        .backend
223        .create_upload(&id, &filename, mimetype, expiry, delete_key)
224        .await
225        .map_err(|e| {
226            if let CreateUploadError::AlreadyExists = e {
227                UploadError::AlreadyExists
228            } else {
229                UploadError::InternalServer(
230                    anyhow::Error::new(e).context("error while initializing upload"),
231                )
232            }
233        })?;
234    event!(
235        Level::TRACE,
236        upload = format!("{upload:?}"),
237        "created upload handle"
238    );
239
240    let mut file_writer = BufWriter::new(&mut upload.file);
241    event!(Level::DEBUG, "streaming file to disk");
242    let stream_file_task = async {
243        let mut body = body.into_data_stream();
244        loop {
245            let chunk = body.try_next().await.context("error reading body");
246            match chunk {
247                Ok(ch) => match ch {
248                    Some(c) => {
249                        event!(
250                            Level::TRACE,
251                            "writing chunk of {} bytes to file buffer",
252                            c.len()
253                        );
254                        file_writer
255                            .write_all(&c)
256                            .await
257                            .context("error writing to file")?;
258                    }
259                    None => break,
260                },
261                Err(e) => {
262                    return Err(UploadError::Cancelled(e));
263                }
264            }
265        }
266        Ok(())
267    };
268
269    let mut shutdown_rx = state.shutdown_tx.subscribe();
270    tokio::select! {
271        res = stream_file_task => {
272            if let Err(e) = res {
273                event!(Level::INFO, "upload was cancelled; it will be deleted");
274                upload
275                    .flush()
276                    .await
277                    .context("error flushing cancelled upload before deletion")?;
278                state
279                    .backend
280                    .delete_upload(id)
281                    .await
282                    .context("error deleting cancelled upload")?;
283                event!(Level::INFO, "upload was deleted successfully");
284                return Err(e);
285            }
286        },
287        _ = shutdown_rx.recv() => {
288            event!(Level::INFO, "server is shutting down; deleting lock");
289            upload.drop_lock().await.context("error deleting lock of cancelled upload")?;
290            return Err(UploadError::InternalServer(anyhow::anyhow!("server is shutting down")));
291        }
292    };
293
294    file_writer
295        .flush()
296        .await
297        .context("error flushing file buffer")?;
298
299    let detect_plaintext_span = tracing::span!(Level::INFO, "detect_plaintext");
300    async {
301        tracing::event!(Level::INFO, "detecting whether the upload is plaintext");
302        let upload = &mut upload;
303        if let Err(err) = upload.file.seek(SeekFrom::Start(0)).await {
304            tracing::event!(Level::ERROR, ?err, "error seeking to beginning of file");
305            return;
306        };
307        let mut buf = [0; 1024];
308        if let Err(err) = upload.file.read(&mut buf).await {
309            tracing::event!(Level::ERROR, ?err, "error reading first 1024 bytes of file");
310            return;
311        };
312
313        // TODO: would be nice to support other text encodings
314        if std::str::from_utf8(&buf).is_ok() {
315            tracing::event!(Level::INFO, "upload is plaintext");
316            upload.metadata.mimetype = mime::TEXT_PLAIN_UTF_8;
317        } else {
318            tracing::event!(Level::INFO, "upload is not plaintext");
319        }
320    }
321    .instrument(detect_plaintext_span)
322    .await;
323
324    let metadata = upload
325        .flush()
326        .await
327        .context("error flushing upload metadata to disk")?;
328    event!(Level::DEBUG, "flushed upload metadata to disk");
329
330    // SAFETY: this shouldn't fail because `metadata.id` should be valid in a URL
331    let url = state.base_url.join(&metadata.id).unwrap().to_string();
332    let direct_url = state.raw_url.join(&metadata.id).unwrap().to_string();
333    event!(
334        Level::INFO,
335        url,
336        filename = metadata.filename,
337        mimetype = %metadata.mimetype,
338        expiry = %metadata
339            .expiry_date
340            .map_or_else(|| String::from("never"), |e| e.to_string()),
341        "successfully created upload"
342    );
343    Ok((
344        StatusCode::CREATED,
345        [
346            (header::CONTENT_LOCATION, direct_url.clone()),
347            (header::LOCATION, url.clone()),
348        ],
349        Json(UploadResponse {
350            id,
351            url,
352            direct_url,
353            filename: metadata.filename,
354            mimetype: metadata.mimetype.to_string(),
355            expiry_date: metadata.expiry_date,
356            delete_key: metadata.delete_key,
357        }),
358    ))
359}