veecle_telemetry/collector/
mod.rs1#[cfg(feature = "std")]
26mod json_exporter;
27#[cfg(feature = "std")]
28mod pretty_exporter;
29#[cfg(feature = "std")]
30mod test_exporter;
31
32use core::fmt::Debug;
33#[cfg(feature = "enable")]
34use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
35use core::{error, fmt};
36
37#[cfg(feature = "std")]
38pub use json_exporter::ConsoleJsonExporter;
39#[cfg(feature = "std")]
40pub use pretty_exporter::ConsolePrettyExporter;
41#[cfg(feature = "std")]
42#[doc(hidden)]
43pub use test_exporter::TestExporter;
44
45use crate::TraceId;
46#[cfg(feature = "enable")]
47use crate::protocol::ExecutionId;
48use crate::protocol::InstanceMessage;
49#[cfg(feature = "enable")]
50use crate::protocol::{
51 LogMessage, SpanAddEventMessage, SpanAddLinkMessage, SpanCloseMessage, SpanCreateMessage,
52 SpanEnterMessage, SpanExitMessage, SpanSetAttributeMessage, TelemetryMessage, TracingMessage,
53};
54
55pub trait Export: Debug {
77 fn export(&self, message: InstanceMessage<'_>);
82}
83
/// Entry point through which telemetry is handed to an exporter.
///
/// A reference to the active collector is obtained from [`get_collector`].
/// When the `enable` feature is switched off the struct carries no state at
/// all.
#[derive(Debug)]
pub struct Collector {
    /// The actual collector state; only present when telemetry is compiled in.
    #[cfg(feature = "enable")]
    inner: CollectorInner,
}
97
/// State of an enabled [`Collector`].
#[cfg(feature = "enable")]
#[derive(Debug)]
struct CollectorInner {
    /// Execution id attached to every message this collector builds itself.
    execution_id: ExecutionId,

    /// Destination for all messages passing through the collector.
    exporter: &'static (dyn Export + Sync),

    /// Fixed part of generated trace ids; a value of `0` makes trace id
    /// generation return `TraceId(0)`.
    trace_id_prefix: u64,
    /// Running counter supplying the varying part of generated trace ids.
    trace_id_counter: AtomicU64,
}
108
/// Exporter that silently discards every message it is given.
#[cfg(feature = "enable")]
#[derive(Debug)]
struct NopExporter;

#[cfg(feature = "enable")]
impl Export for NopExporter {
    /// Intentionally does nothing with the message.
    fn export(&self, _message: InstanceMessage<'_>) {}
}
117
118#[cfg(feature = "enable")]
121static mut GLOBAL_COLLECTOR: Collector = Collector {
122 inner: CollectorInner {
123 execution_id: ExecutionId::from_raw(0),
124 exporter: &NO_EXPORTER,
125
126 trace_id_prefix: 0,
127 trace_id_counter: AtomicU64::new(0),
128 },
129};
130static NO_COLLECTOR: Collector = Collector {
131 #[cfg(feature = "enable")]
132 inner: CollectorInner {
133 execution_id: ExecutionId::from_raw(0),
134 exporter: &NO_EXPORTER,
135
136 trace_id_prefix: 0,
137 trace_id_counter: AtomicU64::new(0),
138 },
139};
140#[cfg(feature = "enable")]
141static NO_EXPORTER: NopExporter = NopExporter;
142
/// `GLOBAL_INIT` state: no exporter has been installed yet.
#[cfg(feature = "enable")]
const UNINITIALIZED: usize = 0;
/// `GLOBAL_INIT` state: a call to `set_exporter` is currently writing the
/// global collector.
#[cfg(feature = "enable")]
const INITIALIZING: usize = 1;
/// `GLOBAL_INIT` state: the global collector is written and may be read.
#[cfg(feature = "enable")]
const INITIALIZED: usize = 2;

/// Initialization state of the global collector; holds one of
/// `UNINITIALIZED`, `INITIALIZING` or `INITIALIZED`.
#[cfg(feature = "enable")]
static GLOBAL_INIT: AtomicUsize = AtomicUsize::new(UNINITIALIZED);
156
/// Installs `exporter` as the global exporter and tags all messages built by
/// the global collector with `execution_id`.
///
/// # Errors
///
/// Returns [`SetExporterError`] if the initialization state is no longer
/// `UNINITIALIZED`, i.e. another call has already installed an exporter or is
/// in the middle of doing so.
#[cfg(feature = "enable")]
pub fn set_exporter(
    execution_id: ExecutionId,
    exporter: &'static (dyn Export + Sync),
) -> Result<(), SetExporterError> {
    // Only the caller that moves the state from `UNINITIALIZED` to
    // `INITIALIZING` may go on to write the global collector.
    let claim = GLOBAL_INIT.compare_exchange(
        UNINITIALIZED,
        INITIALIZING,
        Ordering::Acquire,
        Ordering::Relaxed,
    );
    if claim.is_err() {
        return Err(SetExporterError(()));
    }

    // SAFETY: the successful compare-exchange above admits exactly one writer,
    // and `get_collector` does not touch `GLOBAL_COLLECTOR` until it observes
    // `INITIALIZED`, which is stored only after this write.
    unsafe { GLOBAL_COLLECTOR = Collector::new(execution_id, exporter) }
    // `Release` publishes the write above to `Acquire` loads of `GLOBAL_INIT`.
    GLOBAL_INIT.store(INITIALIZED, Ordering::Release);

    Ok(())
}
184
185pub fn get_collector() -> &'static Collector {
189 #[cfg(not(feature = "enable"))]
190 {
191 &NO_COLLECTOR
192 }
193
194 #[cfg(feature = "enable")]
203 if GLOBAL_INIT.load(Ordering::Acquire) != INITIALIZED {
204 &NO_COLLECTOR
205 } else {
206 unsafe {
208 #[expect(clippy::deref_addrof, reason = "false positive")]
209 &*&raw const GLOBAL_COLLECTOR
210 }
211 }
212}
213
/// Error returned when installing a global exporter is refused because one
/// has already been set.
///
/// The private unit field prevents construction outside of this module.
#[derive(Debug)]
pub struct SetExporterError(());

impl SetExporterError {
    /// Human-readable description used by the `Display` implementation.
    const MESSAGE: &'static str = "a global exporter has already been set";
}

impl fmt::Display for SetExporterError {
    /// Writes the fixed error message verbatim; width/padding flags of the
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(Self::MESSAGE)
    }
}

impl error::Error for SetExporterError {}
231
232impl Collector {
233 #[cfg(feature = "enable")]
234 fn new(execution_id: ExecutionId, exporter: &'static (dyn Export + Sync)) -> Self {
235 let execution_id_raw = *execution_id;
236 let trace_id_prefix = (execution_id_raw >> 64) as u64;
237 let initial_counter_value = execution_id_raw as u64;
238
239 Self {
240 inner: CollectorInner {
241 execution_id,
242 exporter,
243 trace_id_prefix,
244 trace_id_counter: AtomicU64::new(initial_counter_value),
245 },
246 }
247 }
248
249 #[inline]
250 pub(crate) fn generate_trace_id(&self) -> TraceId {
251 #[cfg(not(feature = "enable"))]
252 {
253 TraceId(0)
254 }
255
256 #[cfg(feature = "enable")]
257 if self.inner.trace_id_prefix == 0 {
258 TraceId(0)
259 } else {
260 let suffix = self.inner.trace_id_counter.fetch_add(1, Ordering::Relaxed);
261
262 TraceId(((self.inner.trace_id_prefix as u128) << 32) | (suffix as u128))
263 }
264 }
265}
266
#[cfg(feature = "enable")]
impl Collector {
    /// Wraps a tracing `message` together with this collector's execution id
    /// and passes the result to the exporter.
    #[inline]
    fn tracing_message(&self, message: TracingMessage<'_>) {
        let envelope = InstanceMessage {
            execution: self.inner.execution_id,
            message: TelemetryMessage::Tracing(message),
        };
        self.inner.exporter.export(envelope);
    }

    /// Wraps a `log` record together with this collector's execution id and
    /// passes the result to the exporter.
    #[inline]
    pub(crate) fn log_message(&self, log: LogMessage<'_>) {
        let envelope = InstanceMessage {
            execution: self.inner.execution_id,
            message: TelemetryMessage::Log(log),
        };
        self.inner.exporter.export(envelope);
    }

    /// Passes an already complete `message` to the exporter unchanged; in
    /// particular its execution id is not replaced by this collector's.
    #[inline]
    pub fn collect_external(&self, message: InstanceMessage<'_>) {
        let exporter = self.inner.exporter;
        exporter.export(message);
    }

    /// Exports a span-creation message.
    #[inline]
    pub(crate) fn new_span(&self, span: SpanCreateMessage<'_>) {
        let message = TracingMessage::CreateSpan(span);
        self.tracing_message(message);
    }

    /// Exports a span-enter message.
    #[inline]
    pub(crate) fn enter_span(&self, enter: SpanEnterMessage) {
        let message = TracingMessage::EnterSpan(enter);
        self.tracing_message(message);
    }

    /// Exports a span-exit message.
    #[inline]
    pub(crate) fn exit_span(&self, exit: SpanExitMessage) {
        let message = TracingMessage::ExitSpan(exit);
        self.tracing_message(message);
    }

    /// Exports a span-close message.
    #[inline]
    pub(crate) fn close_span(&self, span: SpanCloseMessage) {
        let message = TracingMessage::CloseSpan(span);
        self.tracing_message(message);
    }

    /// Exports a message adding an event to a span.
    #[inline]
    pub(crate) fn span_event(&self, event: SpanAddEventMessage<'_>) {
        let message = TracingMessage::AddEvent(event);
        self.tracing_message(message);
    }

    /// Exports a message adding a link to a span.
    #[inline]
    pub(crate) fn span_link(&self, link: SpanAddLinkMessage) {
        let message = TracingMessage::AddLink(link);
        self.tracing_message(message);
    }

    /// Exports a message setting an attribute on a span.
    #[inline]
    pub(crate) fn span_attribute(&self, attribute: SpanSetAttributeMessage<'_>) {
        let message = TracingMessage::SetAttribute(attribute);
        self.tracing_message(message);
    }
}