1use std::io::SeekFrom;
4
5use anyhow::Context;
6use axum::{
7    body::Body,
8    extract::{Path, State},
9    response::{IntoResponse, Response},
10    Json,
11};
12use axum_extra::{extract::WithRejection, typed_header::TypedHeaderRejection, TypedHeader};
13use bobashare::{generate_randomized_id, storage::file::CreateUploadError};
14use chrono::{DateTime, TimeDelta, Utc};
15use displaydoc::Display;
16use futures_util::TryStreamExt;
17use headers::{ContentLength, ContentType};
18use hyper::{header, HeaderMap, StatusCode};
19use serde::Serialize;
20use thiserror::Error;
21use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufWriter};
22use tracing::{event, instrument, Instrument, Level};
23
24use super::ApiErrorExt;
25use crate::{clamp_expiry, str_to_duration, AppState};
26
/// JSON body returned by `put` once an upload has been created successfully.
#[derive(Debug, Clone, Serialize)]
pub struct UploadResponse {
    /// ID of the upload (randomly generated by `put`)
    pub id: String,
    /// URL of the upload: the configured `base_url` joined with the upload ID
    pub url: String,
    /// Direct URL of the upload: the configured `raw_url` joined with the
    /// upload ID
    pub direct_url: String,
    /// Filename stored in the upload's metadata
    pub filename: String,
    /// MIME type stored in the upload's metadata, rendered as a string
    pub mimetype: String,
    /// Expiry date stored in the upload's metadata; `None` when the upload
    /// never expires
    pub expiry_date: Option<DateTime<Utc>>,
    /// Delete key stored in the upload's metadata
    pub delete_key: String,
}
45
46#[derive(Debug, Error, Display)]
48pub enum UploadError {
49    AlreadyExists,
51    ParseHeader { name: String, source: anyhow::Error },
53    TooLarge { size: u64, max: u64 },
55
56    Cancelled(#[source] anyhow::Error),
58
59    InternalServer(#[from] anyhow::Error),
61}
62impl From<TypedHeaderRejection> for UploadError {
63    fn from(rej: TypedHeaderRejection) -> Self {
64        Self::ParseHeader {
65            name: rej.name().to_string(),
66            source: rej.into(),
67        }
68    }
69}
70impl IntoResponse for UploadError {
71    fn into_response(self) -> Response {
72        let code = match self {
73            Self::AlreadyExists => StatusCode::CONFLICT,
74            Self::ParseHeader { name: _, source: _ } => StatusCode::BAD_REQUEST,
75            Self::TooLarge { size: _, max: _ } => StatusCode::PAYLOAD_TOO_LARGE,
76            Self::Cancelled(_) => StatusCode::INTERNAL_SERVER_ERROR, Self::InternalServer(_) => StatusCode::INTERNAL_SERVER_ERROR,
78        };
79
80        if let Self::Cancelled(_) = self {
81            let error = anyhow::Error::new(self);
82            event!(
83                Level::INFO,
84                error = format!("{error:#}"),
85                "returning empty response to cancelled upload"
86            );
87            ().into_response()
88        } else {
89            self.into_response_with_code(code)
90        }
91    }
92}
93
94#[instrument(skip(state, filename, headers, body), fields(id))]
130pub async fn put(
131    state: State<&'static AppState>,
132    filename: Path<String>,
133    WithRejection(TypedHeader(mimetype), _): WithRejection<TypedHeader<ContentType>, UploadError>,
134    WithRejection(TypedHeader(content_length), _): WithRejection<
135        TypedHeader<ContentLength>,
136        UploadError,
137    >,
138    headers: HeaderMap,
139    body: Body,
140) -> Result<impl IntoResponse, UploadError> {
141    if content_length.0 > state.max_file_size {
147        event!(
148            Level::INFO,
149            size = content_length.0,
150            max = state.max_file_size,
151            "file is too large"
152        );
153        return Err(UploadError::TooLarge {
154            size: content_length.0,
155            max: state.max_file_size,
156        });
157    }
158    let id = generate_randomized_id(state.id_length);
159    tracing::Span::current().record("id", &id);
160    event!(Level::DEBUG, "generated random ID for upload");
161
162    let mimetype = mimetype.into();
163
164    let expiry = match headers.get("Bobashare-Expiry") {
165        None => {
167            event!(
168                Level::DEBUG,
169                "`Bobashare-Expiry` header not provided, using default"
170            );
171            Some(state.default_expiry)
172        }
173        Some(e) => {
175            let expiry = e.to_str().map_err(|e| UploadError::ParseHeader {
176                name: String::from("Bobashare-Expiry"),
177                source: anyhow::Error::new(e).context("error converting to string"),
178            })?;
179
180            event!(Level::DEBUG, "`Bobashare-Expiry` header says {}", expiry);
181
182            let expiry = if expiry == "never" {
183                None
184            } else {
185                Some(
186                    TimeDelta::from_std(str_to_duration(expiry).map_err(|e| {
187                        UploadError::ParseHeader {
188                            name: String::from("Bobashare-Expiry"),
189                            source: anyhow::Error::new(e).context("error parsing duration string"),
190                        }
191                    })?)
192                    .map_err(|e| UploadError::ParseHeader {
193                        name: String::from("Bobashare-Expiry"),
194                        source: anyhow::Error::new(e).context("error converting duration"),
195                    })?,
196                )
197            };
198
199            clamp_expiry(state.max_expiry, expiry)
201        }
202    };
203    event!(Level::DEBUG, expiry = %expiry.map_or_else(|| String::from("never"), |e| e.to_string()));
204
205    let delete_key = headers
206        .get("Bobashare-Delete-Key")
207        .map(|k| {
208            k.to_str().map_err(|e| UploadError::ParseHeader {
209                name: String::from("Bobashare-Delete-Key"),
210                source: anyhow::Error::new(e).context("error converting to string"),
211            })
212        })
213        .transpose()?
214        .map(ToString::to_string);
215    if delete_key.is_some() {
216        event!(Level::DEBUG, delete_key, "custom delete key was provided");
217    } else {
218        event!(Level::DEBUG, "delete_key will be randomly generated");
219    }
220
221    let mut upload = state
222        .backend
223        .create_upload(&id, &filename, mimetype, expiry, delete_key)
224        .await
225        .map_err(|e| {
226            if let CreateUploadError::AlreadyExists = e {
227                UploadError::AlreadyExists
228            } else {
229                UploadError::InternalServer(
230                    anyhow::Error::new(e).context("error while initializing upload"),
231                )
232            }
233        })?;
234    event!(
235        Level::TRACE,
236        upload = format!("{upload:?}"),
237        "created upload handle"
238    );
239
240    let mut file_writer = BufWriter::new(&mut upload.file);
241    event!(Level::DEBUG, "streaming file to disk");
242    let stream_file_task = async {
243        let mut body = body.into_data_stream();
244        loop {
245            let chunk = body.try_next().await.context("error reading body");
246            match chunk {
247                Ok(ch) => match ch {
248                    Some(c) => {
249                        event!(
250                            Level::TRACE,
251                            "writing chunk of {} bytes to file buffer",
252                            c.len()
253                        );
254                        file_writer
255                            .write_all(&c)
256                            .await
257                            .context("error writing to file")?;
258                    }
259                    None => break,
260                },
261                Err(e) => {
262                    return Err(UploadError::Cancelled(e));
263                }
264            }
265        }
266        Ok(())
267    };
268
269    let mut shutdown_rx = state.shutdown_tx.subscribe();
270    tokio::select! {
271        res = stream_file_task => {
272            if let Err(e) = res {
273                event!(Level::INFO, "upload was cancelled; it will be deleted");
274                upload
275                    .flush()
276                    .await
277                    .context("error flushing cancelled upload before deletion")?;
278                state
279                    .backend
280                    .delete_upload(id)
281                    .await
282                    .context("error deleting cancelled upload")?;
283                event!(Level::INFO, "upload was deleted successfully");
284                return Err(e);
285            }
286        },
287        _ = shutdown_rx.recv() => {
288            event!(Level::INFO, "server is shutting down; deleting lock");
289            upload.drop_lock().await.context("error deleting lock of cancelled upload")?;
290            return Err(UploadError::InternalServer(anyhow::anyhow!("server is shutting down")));
291        }
292    };
293
294    file_writer
295        .flush()
296        .await
297        .context("error flushing file buffer")?;
298
299    let detect_plaintext_span = tracing::span!(Level::INFO, "detect_plaintext");
300    async {
301        tracing::event!(Level::INFO, "detecting whether the upload is plaintext");
302        let upload = &mut upload;
303        if let Err(err) = upload.file.seek(SeekFrom::Start(0)).await {
304            tracing::event!(Level::ERROR, ?err, "error seeking to beginning of file");
305            return;
306        };
307        let mut buf = [0; 1024];
308        if let Err(err) = upload.file.read(&mut buf).await {
309            tracing::event!(Level::ERROR, ?err, "error reading first 1024 bytes of file");
310            return;
311        };
312
313        if std::str::from_utf8(&buf).is_ok() {
315            tracing::event!(Level::INFO, "upload is plaintext");
316            upload.metadata.mimetype = mime::TEXT_PLAIN_UTF_8;
317        } else {
318            tracing::event!(Level::INFO, "upload is not plaintext");
319        }
320    }
321    .instrument(detect_plaintext_span)
322    .await;
323
324    let metadata = upload
325        .flush()
326        .await
327        .context("error flushing upload metadata to disk")?;
328    event!(Level::DEBUG, "flushed upload metadata to disk");
329
330    let url = state.base_url.join(&metadata.id).unwrap().to_string();
332    let direct_url = state.raw_url.join(&metadata.id).unwrap().to_string();
333    event!(
334        Level::INFO,
335        url,
336        filename = metadata.filename,
337        mimetype = %metadata.mimetype,
338        expiry = %metadata
339            .expiry_date
340            .map_or_else(|| String::from("never"), |e| e.to_string()),
341        "successfully created upload"
342    );
343    Ok((
344        StatusCode::CREATED,
345        [
346            (header::CONTENT_LOCATION, direct_url.clone()),
347            (header::LOCATION, url.clone()),
348        ],
349        Json(UploadResponse {
350            id,
351            url,
352            direct_url,
353            filename: metadata.filename,
354            mimetype: metadata.mimetype.to_string(),
355            expiry_date: metadata.expiry_date,
356            delete_key: metadata.delete_key,
357        }),
358    ))
359}