tower_http/trace/
on_eos.rs1use super::{Latency, DEFAULT_MESSAGE_LEVEL};
2use crate::{classify::grpc_errors_as_failures::ParsedGrpcStatus, LatencyUnit};
3use http::header::HeaderMap;
4use std::time::Duration;
5use tracing::{Level, Span};
6
7pub trait OnEos {
14    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span);
26}
27
28impl OnEos for () {
29    #[inline]
30    fn on_eos(self, _: Option<&HeaderMap>, _: Duration, _: &Span) {}
31}
32
33impl<F> OnEos for F
34where
35    F: FnOnce(Option<&HeaderMap>, Duration, &Span),
36{
37    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, span: &Span) {
38        self(trailers, stream_duration, span)
39    }
40}
41
42#[derive(Clone, Debug)]
46pub struct DefaultOnEos {
47    level: Level,
48    latency_unit: LatencyUnit,
49}
50
51impl Default for DefaultOnEos {
52    fn default() -> Self {
53        Self {
54            level: DEFAULT_MESSAGE_LEVEL,
55            latency_unit: LatencyUnit::Millis,
56        }
57    }
58}
59
60impl DefaultOnEos {
61    pub fn new() -> Self {
63        Self::default()
64    }
65
66    pub fn level(mut self, level: Level) -> Self {
73        self.level = level;
74        self
75    }
76
77    pub fn latency_unit(mut self, latency_unit: LatencyUnit) -> Self {
81        self.latency_unit = latency_unit;
82        self
83    }
84}
85
86impl OnEos for DefaultOnEos {
87    fn on_eos(self, trailers: Option<&HeaderMap>, stream_duration: Duration, _span: &Span) {
88        let stream_duration = Latency {
89            unit: self.latency_unit,
90            duration: stream_duration,
91        };
92        let status = trailers.and_then(|trailers| {
93            match crate::classify::grpc_errors_as_failures::classify_grpc_metadata(
94                trailers,
95                crate::classify::GrpcCode::Ok.into_bitmask(),
96            ) {
97                ParsedGrpcStatus::Success
98                | ParsedGrpcStatus::HeaderNotString
99                | ParsedGrpcStatus::HeaderNotInt => Some(0),
100                ParsedGrpcStatus::NonSuccess(status) => Some(status.get()),
101                ParsedGrpcStatus::GrpcStatusHeaderMissing => None,
102            }
103        });
104
105        event_dynamic_lvl!(self.level, %stream_duration, status, "end of stream");
106    }
107}