blackbox: change session id to u64

Summary:
When considering what to do with scuba logging, we'd like some ID to chain
tables together, and is also somehow friendly to scuba.

This diff adds such an ID. It's a combination of pid (3 bytes) and the
timestamp (5 bytes). The timestamp would give scuba a change to delta-compress
the IDs. The 3-byte pid would make it possible to still see the pids using the
`blackbox` command.

Reviewed By: markbt

Differential Revision: D15755285

fbshipit-source-id: ad2dcc49514bfcd9ef474fbe0d389b9865fb2001
This commit is contained in:
Jun Wu 2019-07-06 02:41:45 -07:00 committed by Facebook Github Bot
parent 0467d7ad61
commit 4d2530188b
3 changed files with 49 additions and 41 deletions

View File

@ -59,6 +59,7 @@ def blackbox(ui, repo, **opts):
ui.pager("blackbox")
sidcolor = {}
debugflag = ui.debugflag
for sid, ts, msg, json in reversed(events):
if showtimestamp:
localtime = time.localtime(ts)
@ -72,9 +73,12 @@ def blackbox(ui, repo, **opts):
if color is None:
color = len(sidcolor) % 4
sidcolor[sid] = color
if not debugflag:
# The lowest 3 bytes are "pid". See blackbox.rs.
sid = sid & 0xFFFFFF
ui.write("%10d" % sid, label="blackbox.session.%d" % color)
ui.write(" ")
if ui.debugflag:
if debugflag:
ui.write(json, label="blackbox.json")
else:
ui.write(msg.strip(), label="blackbox.message")

View File

@ -78,7 +78,7 @@ fn filter(
start: f64,
end: f64,
json: String,
) -> PyResult<Vec<(u32, f64, String, String)>> {
) -> PyResult<Vec<(u64, f64, String, String)>> {
if let Ok(blackbox) = blackbox::SINGLETON.lock() {
if let Some(blackbox) = blackbox.deref() {
// Blackbox uses millisecond integers. Translate seconds to milliseconds.

View File

@ -25,7 +25,7 @@ pub struct Blackbox {
opts: BlackboxOptions,
// An ID that can be "grouped by" to figure everything about a session.
session_id: u32,
session_id: u64,
// The on-disk files are considered bad (ex. no permissions, or no disk space)
// and further write attempts will be ignored.
@ -48,7 +48,7 @@ pub struct BlackboxOptions {
#[derive(Debug)]
pub struct Entry<T> {
pub timestamp: u64,
pub session_id: u32,
pub session_id: u64,
pub data: T,
// Prevent constructing `Entry` directly.
@ -63,7 +63,7 @@ pub trait ToValue {
/// Specify how to filter entries by indexes. Input of [`Blackbox::filter`].
pub enum IndexFilter {
/// Filter by session ID.
SessionId(u32),
SessionId(u64),
/// Filter by time range.
Time(u64, u64),
@ -82,7 +82,7 @@ pub enum IndexFilter {
// renaming the directory used for logging.
const TIMESTAMP_BYTES: usize = 8;
const SESSION_ID_BYTES: usize = 4;
const SESSION_ID_BYTES: usize = 8;
const HEADER_BYTES: usize = TIMESTAMP_BYTES + SESSION_ID_BYTES;
impl BlackboxOptions {
@ -95,7 +95,7 @@ impl BlackboxOptions {
.index("timestamp", |_| {
vec![IndexOutput::Reference(0..TIMESTAMP_BYTES as u64)]
})
.index("id", |_| {
.index("session_id", |_| {
vec![IndexOutput::Reference(
TIMESTAMP_BYTES as u64..HEADER_BYTES as u64,
)]
@ -110,15 +110,14 @@ impl BlackboxOptions {
}
Ok(log) => log,
};
let mut blackbox = Blackbox {
let blackbox = Blackbox {
log,
opts: self,
// pid is used as an initial guess of "unique" session id
session_id: unsafe { libc::getpid() } as u32,
session_id: new_session_id(),
is_broken: Cell::new(false),
last_write_time: Cell::new(0),
};
blackbox.refresh_session_id();
Ok(blackbox)
}
@ -150,19 +149,16 @@ impl Blackbox {
///
/// Currently, uniqueness is not guaranteed, but perhaps "good enough".
pub fn refresh_session_id(&mut self) {
loop {
if let Ok(mut iter) = self
.log
.lookup(INDEX_SESSION_ID, &u32_to_slice(self.session_id)[..])
{
if let Some(Ok(_)) = iter.next() {
// Try a different ID.
self.session_id = rand::random();
continue;
let session_id = new_session_id();
if self.session_id >= session_id {
self.session_id += 1 << 23;
} else {
self.session_id = session_id;
}
}
break;
}
pub fn session_id(&self) -> u64 {
self.session_id
}
/// Log an event. Write it to disk immediately.
@ -262,7 +258,7 @@ impl<'a, T: Deserialize<'a>> Entry<T> {
if bytes.len() >= HEADER_BYTES {
let mut cur = Cursor::new(bytes);
let timestamp = cur.read_u64::<BigEndian>().unwrap();
let session_id = cur.read_u32::<BigEndian>().unwrap();
let session_id = cur.read_u64::<BigEndian>().unwrap();
let pos = cur.position();
let bytes = cur.into_inner();
let bytes = &bytes[pos as usize..];
@ -281,10 +277,10 @@ impl<'a, T: Deserialize<'a>> Entry<T> {
}
impl<T: Serialize> Entry<T> {
fn to_vec(data: &T, timestamp: u64, session_id: u32) -> Option<Vec<u8>> {
fn to_vec(data: &T, timestamp: u64, session_id: u64) -> Option<Vec<u8>> {
let mut buf = Vec::with_capacity(32);
buf.write_u64::<BigEndian>(timestamp).unwrap();
buf.write_u32::<BigEndian>(session_id).unwrap();
buf.write_u64::<BigEndian>(session_id).unwrap();
if serde_cbor::to_writer(&mut buf, data).is_ok() {
Some(buf)
@ -306,8 +302,8 @@ impl IndexFilter {
fn index_range(&self) -> (Box<[u8]>, Box<[u8]>) {
match self {
IndexFilter::SessionId(id) => (
u32_to_slice(*id).to_vec().into_boxed_slice(),
u32_to_slice(*id + 1).to_vec().into_boxed_slice(),
u64_to_slice(*id).to_vec().into_boxed_slice(),
u64_to_slice(*id + 1).to_vec().into_boxed_slice(),
),
IndexFilter::Time(start, end) => (
u64_to_slice(*start).to_vec().into_boxed_slice(),
@ -342,16 +338,30 @@ fn u64_to_slice(value: u64) -> [u8; 8] {
unsafe { std::mem::transmute(value.to_be()) }
}
fn u32_to_slice(value: u32) -> [u8; 4] {
unsafe { std::mem::transmute(value.to_be()) }
}
fn time_to_u64(time: &SystemTime) -> u64 {
time.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as u64
}
// The session_id is intended to be:
// 1. Somehow unique among multiple machines for at least 3 months
// (for analysis over time).
// 2. Related to timestamp. So Scuba might be able to delta-compress them.
//
// At the time of writing, millisecond percision seems already enough to
// distinguish sessions across machines. To make it more "future proof", take
// some bits from the pid.
//
// At the time of writing, /proc/sys/kernel/pid_max shows pid can fit in 3
// bytes.
fn new_session_id() -> u64 {
// 40 bits from millisecond timestamp. That's 34 years.
// 24 bits from pid.
((time_to_u64(&SystemTime::now()) & 0xffffffffff) << 24)
| ((unsafe { libc::getpid() } as u64) & 0xffffff)
}
#[cfg(test)]
mod tests {
use super::*;
@ -383,6 +393,7 @@ mod tests {
let events = vec![Event::A(0), Event::B("Foo".to_string()), Event::A(12)];
let session_count = 4;
let first_session_id = blackbox.session_id();
for _ in 0..session_count {
for event in events.iter() {
blackbox.log(event);
@ -393,11 +404,10 @@ mod tests {
}
let time_end = SystemTime::now();
// Test find by session id (pid if no conflict).
let pid = unsafe { libc::getpid() } as u32;
// Test find by session id.
assert_eq!(
blackbox
.filter::<Event>(IndexFilter::SessionId(pid), None)
.filter::<Event>(IndexFilter::SessionId(first_session_id), None)
.len(),
events.len()
);
@ -431,12 +441,6 @@ mod tests {
// Check logging with multiple blackboxes.
let blackbox = BlackboxOptions::new().open(&dir.path().join("2")).unwrap();
assert_eq!(
blackbox
.filter::<Event>(IndexFilter::SessionId(pid), None)
.len(),
1
);
assert_eq!(
blackbox.filter::<Event>(IndexFilter::Nop, None).len(),
entries.len()
@ -461,7 +465,7 @@ mod tests {
// Corrupt log
let log_path = dir.path().join("0").join("log");
backup(&log_path);
for (bytes, corrupted_count) in [(1, 1), (60, 2), (160, 4)].iter() {
for (bytes, corrupted_count) in [(1, 1), (60, 2), (160, 3)].iter() {
// Corrupt the last few bytes.
corrupt(&log_path, *bytes);