1use crate::tools::tools;
23use chrono::{Datelike, TimeZone, Timelike, Utc};
24use derive_more::{Add, Div, Mul, Neg, Sub};
25use serde::{Deserialize, Serialize};
26use std::fmt;
27use std::ops::{Add, Mul, Sub, SubAssign};
28use std::time::Duration;
29
30pub const TIME_MILLIS_BYTES: usize = size_of::<TimeMillis>();
31#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
32pub struct TimeMillisBytes(pub [u8; TIME_MILLIS_BYTES]);
33
34impl TimeMillisBytes {
35 pub(crate) fn as_ref(&self) -> &[u8] {
36 self.0.as_ref()
37 }
38
39 pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
40 if bytes.len() != TIME_MILLIS_BYTES {
41 anyhow::bail!("Invalid time millis bytes length: {}", bytes.len());
42 }
43
44 Ok(Self(bytes.try_into()?))
45 }
46}
47
48#[derive(Ord, PartialOrd, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)]
49pub struct TimeMillis(pub i64);
50
51impl TimeMillis {
52 pub fn zero() -> Self {
53 Self(0)
54 }
55 pub const MAX: Self = Self(i64::MAX);
56 pub const MIN: Self = Self(i64::MIN);
57
58 pub fn random() -> TimeMillis {
59 TimeMillis(tools::random_u32() as i64)
60 }
61
62 pub fn saturating_add(self, other: DurationMillis) -> TimeMillis {
63 TimeMillis(self.0.saturating_add(other.0))
64 }
65
66 pub fn saturating_sub_duration(self, other: DurationMillis) -> TimeMillis {
67 TimeMillis(self.0.saturating_sub(other.0))
68 }
69 pub fn saturating_sub_time(self, other: TimeMillis) -> DurationMillis {
70 DurationMillis(self.0.saturating_sub(other.0))
71 }
72
73 pub fn as_secs(&self) -> i64 {
74 self.0 / 1000
75 }
76
77 pub fn part_nanos(&self) -> i64 {
78 (self.0 % 1000) * 1_000_000
79 }
80
81 pub fn encode_be(self) -> TimeMillisBytes {
82 TimeMillisBytes(i64::to_be_bytes(self.0))
83 }
84 pub fn timestamp_decode_be(timestamp_bytes: &TimeMillisBytes) -> Self {
85 let time_millis = i64::from_be_bytes(timestamp_bytes.0);
86 Self(time_millis)
87 }
88
89 pub fn from_epoch_offset_str(duration_millis_str: &str) -> anyhow::Result<Self> {
90 Ok(Self::zero() + DurationMillis::parse(duration_millis_str)?)
91 }
92
93 pub fn parse(s: &str) -> anyhow::Result<Self> {
95 let parts: Vec<&str> = s.split('.').collect();
96 anyhow::ensure!(parts.len() == 3 && parts[0].len() >= 8 && parts[1].len() == 6 && parts[2].len() == 3,
97 "Invalid TimeMillis string (expected YYYYMMDD.HHMMSS.mmm): {:?}", s);
98
99 let year: i32 = parts[0][..parts[0].len()-4].parse()?;
100 let month: u32 = parts[0][parts[0].len()-4..parts[0].len()-2].parse()?;
101 let day: u32 = parts[0][parts[0].len()-2..].parse()?;
102 let hour: u32 = parts[1][0..2].parse()?;
103 let minute: u32 = parts[1][2..4].parse()?;
104 let second: u32 = parts[1][4..6].parse()?;
105 let millis: i64 = parts[2].parse()?;
106
107 let dt = Utc.with_ymd_and_hms(year, month, day, hour, minute, second)
108 .single()
109 .ok_or_else(|| anyhow::anyhow!("Invalid UTC date/time in TimeMillis string: {:?}", s))?;
110
111 Ok(Self(dt.timestamp() * 1000 + millis))
112 }
113}
114
115impl fmt::Display for TimeMillis {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 let millis = self.0;
118 let secs = millis / 1000;
119 let millis_part = millis % 1000;
120
121 let dt = Utc.timestamp_opt(secs, 0);
123 match dt {
124 chrono::LocalResult::Single(dt) => write!(f, "{}{:02}{:02}.{:02}{:02}{:02}.{:03}", dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), dt.second(), millis_part),
125 chrono::LocalResult::Ambiguous(_dt_earlier, dt) => write!(f, "{}{:02}{:02}.{:02}{:02}{:02}.{:03} (ambiguous)", dt.year(), dt.month(), dt.day(), dt.hour(), dt.minute(), dt.second(), millis_part),
126 _ => write!(f, "<invalid-timestamp:{}>", self.0),
127 }
128 }
129}
130
131impl fmt::Debug for TimeMillis {
132 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133 fmt::Display::fmt(self, f)
134 }
135}
136
137impl Sub<TimeMillis> for TimeMillis {
138 type Output = DurationMillis;
139
140 fn sub(self, rhs: TimeMillis) -> Self::Output {
141 DurationMillis(self.0.saturating_sub(rhs.0))
142 }
143}
144
145impl SubAssign<DurationMillis> for TimeMillis {
146 fn sub_assign(&mut self, rhs: DurationMillis) {
147 self.0 = self.0.saturating_sub(rhs.0);
148 }
149}
150
151impl Add<DurationMillis> for TimeMillis {
152 type Output = TimeMillis;
153
154 fn add(self, rhs: DurationMillis) -> Self::Output {
155 Self(self.0.saturating_add(rhs.0))
156 }
157}
158
159impl Sub<DurationMillis> for TimeMillis {
160 type Output = TimeMillis;
161
162 fn sub(self, rhs: DurationMillis) -> Self::Output {
163 Self(self.0.saturating_sub(rhs.0))
164 }
165}
166
167impl Add<Duration> for TimeMillis {
168 type Output = TimeMillis;
169
170 fn add(self, rhs: Duration) -> Self::Output {
171 Self(self.0.saturating_add(rhs.as_millis() as i64))
172 }
173}
174impl Mul<Duration> for TimeMillis {
175 type Output = TimeMillis;
176
177 fn mul(self, rhs: Duration) -> Self::Output {
178 Self(self.0.saturating_mul(rhs.as_millis() as i64))
179 }
180}
181
182impl From<std::time::SystemTime> for TimeMillis {
183 fn from(time: std::time::SystemTime) -> Self {
184 TimeMillis(time.duration_since(std::time::SystemTime::UNIX_EPOCH).unwrap_or_default().as_millis() as i64)
185 }
186}
187
188pub const DURATION_MILLIS_BYTES: usize = size_of::<DurationMillis>();
189#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)]
190pub struct DurationMillisBytes(pub [u8; DURATION_MILLIS_BYTES]);
191
192impl DurationMillisBytes {
193 pub(crate) fn as_ref(&self) -> &[u8] {
194 self.0.as_ref()
195 }
196
197 pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
198 if bytes.len() != DURATION_MILLIS_BYTES {
199 anyhow::bail!("Invalid duration millis bytes length: {}", bytes.len());
200 }
201
202 Ok(Self(bytes.try_into()?))
203 }
204}
205
206#[derive(Ord, PartialOrd, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash, Add, Sub, Neg, Mul, Div)]
207pub struct DurationMillis(pub i64);
208
209impl DurationMillis {
210 pub fn zero() -> Self {
211 Self(0)
212 }
213 pub fn encode_be(self) -> DurationMillisBytes {
214 DurationMillisBytes(i64::to_be_bytes(self.0))
215 }
216
217 pub const fn const_add(self, rhs: DurationMillis) -> DurationMillis {
219 DurationMillis(self.0 + rhs.0)
220 }
221 pub const fn const_mul(&self, rhs: i64) -> DurationMillis {
222 Self(self.0 * rhs)
223 }
224
225 pub const fn abs(self) -> DurationMillis {
226 Self(self.0.abs())
227 }
228
229 pub fn parse(input: &str) -> anyhow::Result<Self> {
234 let s = input.trim();
235 if s.is_empty() {
236 return Ok(DurationMillis::zero());
237 }
238 if s == "0" || s == "+0" || s == "-0" {
239 return Ok(DurationMillis::zero());
240 }
241
242 let mut idx = 0usize;
243 let bytes = s.as_bytes();
244
245 let mut sign: i64 = 1;
246 if bytes[0] == b'+' {
247 idx += 1;
248 } else if bytes[0] == b'-' {
249 sign = -1;
250 idx += 1;
251 }
252
253 if idx >= s.len() {
254 anyhow::bail!("Invalid duration: {:?}", input);
255 }
256
257 let mut total: i64 = 0;
258 let mut saw_any = false;
259
260 while idx < s.len() {
261 let num_start = idx;
263 while idx < s.len() {
264 let c = bytes[idx];
265
266 if c.is_ascii_digit() {
267 idx += 1;
268 } else {
269 break;
270 }
271 }
272 if idx == num_start {
273 anyhow::bail!("Invalid duration: expected number at {:?}", &s[idx..]);
274 }
275
276 let n: i64 = s[num_start..idx]
277 .parse()
278 .map_err(|e| anyhow::anyhow!("Invalid duration number {:?}: {}", &s[num_start..idx], e))?;
279
280 if idx >= s.len() {
281 anyhow::bail!("Invalid duration: missing unit after {}", n);
282 }
283
284 let rest = &s[idx..];
286 let (unit_millis, unit_len) = if rest.starts_with("μs") {
287 (MILLIS_IN_MILLISECOND.0, "μs".len())
288 } else if rest.starts_with('M') {
289 (MILLIS_IN_MONTH.0, 1)
290 } else if rest.starts_with('W') {
291 (MILLIS_IN_WEEK.0, 1)
292 } else if rest.starts_with('D') {
293 (MILLIS_IN_DAY.0, 1)
294 } else if rest.starts_with('h') {
295 (MILLIS_IN_HOUR.0, 1)
296 } else if rest.starts_with('m') {
297 (MILLIS_IN_MINUTE.0, 1)
298 } else if rest.starts_with('s') {
299 (MILLIS_IN_SECOND.0, 1)
300 } else {
301 anyhow::bail!("Invalid duration: unknown unit at {:?}", rest);
302 };
303
304 idx += unit_len;
306
307 let chunk = n
309 .checked_mul(unit_millis)
310 .ok_or_else(|| anyhow::anyhow!("Duration overflow"))?;
311 total = total
312 .checked_add(chunk)
313 .ok_or_else(|| anyhow::anyhow!("Duration overflow"))?;
314
315 saw_any = true;
316 }
317
318 if !saw_any {
319 anyhow::bail!("Invalid duration: {:?}", input);
320 }
321
322 Ok(DurationMillis(
323 total
324 .checked_mul(sign)
325 .ok_or_else(|| anyhow::anyhow!("Duration overflow"))?,
326 ))
327 }
328}
329
330pub const MILLIS_IN_MILLISECOND: DurationMillis = DurationMillis(1);
331pub const MILLIS_IN_SECOND: DurationMillis = MILLIS_IN_MILLISECOND.const_mul(1000);
332pub const MILLIS_IN_MINUTE: DurationMillis = MILLIS_IN_SECOND.const_mul(60);
333pub const MILLIS_IN_HOUR: DurationMillis = MILLIS_IN_MINUTE.const_mul(60);
334pub const MILLIS_IN_DAY: DurationMillis = MILLIS_IN_HOUR.const_mul(24);
335pub const MILLIS_IN_WEEK: DurationMillis = MILLIS_IN_DAY.const_mul(7);
336pub const MILLIS_IN_MONTH: DurationMillis = MILLIS_IN_WEEK.const_mul(4); pub const MILLIS_IN_YEAR: DurationMillis = MILLIS_IN_MONTH.const_mul(12); impl fmt::Display for DurationMillis {
340 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
341 let mut s = String::with_capacity(32);
342
343 if self.0 < 0 {
344 s.push('-');
345 }
346
347 let mut remaining = self.0.saturating_abs();
348
349 let mut do_unit = |span: DurationMillis, descr: &str| {
350 if remaining >= span.0 {
351 let units = remaining / span.0;
352 remaining -= units * span.0;
353 s.push_str(&format!("{}{}", units, descr))
354 }
355 };
356
357 do_unit(MILLIS_IN_MONTH, "M");
358 do_unit(MILLIS_IN_WEEK, "W");
359 do_unit(MILLIS_IN_DAY, "D");
360 do_unit(MILLIS_IN_HOUR, "h");
361 do_unit(MILLIS_IN_MINUTE, "m");
362 do_unit(MILLIS_IN_SECOND, "s");
363 do_unit(MILLIS_IN_MILLISECOND, "μs"); if s.is_empty() || s == "-" {
366 s = "0".to_string()
367 }
368
369 write!(f, "{}", s)
370 }
371}
372
373impl fmt::Debug for DurationMillis {
374 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
375 fmt::Display::fmt(self, f)
376 }
377}
378
379
380impl From<DurationMillis> for Duration {
381 fn from(value: DurationMillis) -> Self {
382 if value.0 <= 0 {
383 Duration::ZERO
384 } else {
385 Duration::from_millis(value.0 as u64)
386 }
387 }
388}
389
390pub fn to_bucket(timestamp: &TimeMillis, bucket_size: DurationMillis) -> TimeMillis {
391 TimeMillis((timestamp.0 / bucket_size.0) * bucket_size.0)
392}
393
#[cfg(test)]
mod tests {
    use crate::tools::time::{
        DurationMillis, TimeMillis, MILLIS_IN_DAY, MILLIS_IN_HOUR, MILLIS_IN_MINUTE,
        MILLIS_IN_MONTH, MILLIS_IN_SECOND, MILLIS_IN_WEEK,
    };

    /// `Display` for durations: largest unit first, zero-count units skipped.
    // A plain synchronous test: nothing here awaits, so no async runtime is needed.
    #[test]
    fn duration_display_test() {
        assert_eq!(MILLIS_IN_MONTH.to_string(), "1M");
        assert_eq!(MILLIS_IN_HOUR.to_string(), "1h");
        assert_eq!(
            (MILLIS_IN_HOUR.const_mul(3) + MILLIS_IN_MINUTE.const_mul(27)).to_string(),
            "3h27m"
        );
        assert_eq!(
            (MILLIS_IN_HOUR.const_mul(26) + MILLIS_IN_MINUTE.const_mul(67)).to_string(),
            "1D3h7m"
        );
        assert_eq!((-MILLIS_IN_SECOND.const_mul(5)).to_string(), "-5s");
        assert_eq!(
            (-MILLIS_IN_MINUTE.const_mul(2) + -MILLIS_IN_SECOND.const_mul(5)).to_string(),
            "-2m5s"
        );
        assert_eq!(DurationMillis::zero().to_string(), "0");
    }

    /// Durations rendered by `Display` parse back to the same value.
    #[test]
    fn duration_parse_roundtrip_examples() -> anyhow::Result<()> {
        let d1 = MILLIS_IN_WEEK.const_mul(3) + MILLIS_IN_DAY.const_mul(3);
        assert_eq!(DurationMillis::parse(&d1.to_string())?, d1);

        let d2 = MILLIS_IN_WEEK.const_mul(2) + MILLIS_IN_DAY.const_mul(5);
        assert_eq!(DurationMillis::parse(&d2.to_string())?, d2);

        assert_eq!(DurationMillis::parse("")?, DurationMillis::zero());
        assert_eq!(DurationMillis::parse("0")?, DurationMillis::zero());

        Ok(())
    }

    /// A leading `-` negates the whole parsed duration.
    #[test]
    fn duration_parse_negative() -> anyhow::Result<()> {
        let d = -(MILLIS_IN_SECOND.const_mul(5) + MILLIS_IN_MINUTE.const_mul(2));
        assert_eq!(DurationMillis::parse("-2m5s")?, d);
        Ok(())
    }

    /// `Display` for timestamps uses the UTC `YYYYMMDD.HHMMSS.mmm` layout.
    #[test]
    fn time_millis_display() {
        assert_eq!(TimeMillis(0).to_string(), "19700101.000000.000");
        assert_eq!(TimeMillis(1_700_000_000_000).to_string(), "20231114.221320.000");
        assert_eq!(TimeMillis(1_672_531_200_000).to_string(), "20230101.000000.000");
        assert_eq!(TimeMillis(1_700_000_000_123).to_string(), "20231114.221320.123");
        assert_eq!(TimeMillis(1_700_000_000_999).to_string(), "20231114.221320.999");
    }

    /// Timestamps rendered by `Display` parse back to the same value.
    #[test]
    fn time_millis_parse_roundtrip() -> anyhow::Result<()> {
        let cases = [
            TimeMillis(0),
            TimeMillis(1_700_000_000_000),
            TimeMillis(1_672_531_200_000),
            TimeMillis(1_700_000_000_123),
            TimeMillis(1_700_000_000_999),
        ];
        for t in cases {
            let s = t.to_string();
            let parsed = TimeMillis::parse(&s)?;
            assert_eq!(t, parsed, "round-trip failed for {} (string: {:?})", t.0, s);
        }
        Ok(())
    }

    /// Hand-written strings parse to the expected millisecond values.
    #[test]
    fn time_millis_parse_known_strings() -> anyhow::Result<()> {
        assert_eq!(TimeMillis::parse("19700101.000000.000")?, TimeMillis(0));
        assert_eq!(
            TimeMillis::parse("20231114.221320.000")?,
            TimeMillis(1_700_000_000_000)
        );
        assert_eq!(
            TimeMillis::parse("20231114.221320.123")?,
            TimeMillis(1_700_000_000_123)
        );
        Ok(())
    }

    /// Malformed or out-of-range timestamp strings are rejected.
    #[test]
    fn time_millis_parse_errors() {
        assert!(TimeMillis::parse("").is_err());
        assert!(TimeMillis::parse("20231114.221320").is_err());
        assert!(TimeMillis::parse("20231114.221320.1234").is_err());
        assert!(TimeMillis::parse("20231114.22132x.000").is_err());
        assert!(TimeMillis::parse("20231332.000000.000").is_err());
    }

    /// The operator impls clamp at the `i64` bounds instead of overflowing.
    #[test]
    fn time_millis_arithmetic_saturates_on_overflow() {
        assert_eq!(TimeMillis::MIN - TimeMillis::MAX, DurationMillis(i64::MIN));
        assert_eq!(TimeMillis(0) - TimeMillis(1), DurationMillis(-1));
        assert_eq!(TimeMillis::MAX - TimeMillis::MIN, DurationMillis(i64::MAX));

        assert_eq!(TimeMillis::MIN - DurationMillis(i64::MAX), TimeMillis::MIN);
        assert_eq!(TimeMillis(0) - DurationMillis(1), TimeMillis(-1));

        assert_eq!(TimeMillis::MAX + DurationMillis(1), TimeMillis::MAX);
        assert_eq!(TimeMillis::MIN + DurationMillis(-1), TimeMillis::MIN);

        let mut time = TimeMillis::MIN;
        time -= DurationMillis(1);
        assert_eq!(time, TimeMillis::MIN);
    }
}