generalize a parallelize method for timers, use for all trip viz

Dustin Carlino 2019-05-25 19:11:29 -07:00
parent bc69a6b4d4
commit 40efcc7b05
6 changed files with 52 additions and 44 deletions


@@ -7,8 +7,10 @@ edition = "2018"
 [dependencies]
 bincode = "1.1.2"
 lazy_static = "1.3.0"
+num_cpus = "1.10.0"
 rand = { version = "0.6.5", features = ["serde1"] }
 rand_xorshift = "0.1.1"
+scoped_threadpool = "0.1.9"
 serde = "1.0.89"
 serde_derive = "1.0.89"
 serde_json = "1.0.39"


@@ -283,6 +283,39 @@ impl<'a> Timer<'a> {
             }
         }
     }
 
+    pub fn parallelize<I, O, F: Fn(I) -> O>(
+        &mut self,
+        timer_name: &str,
+        requests: Vec<I>,
+        cb: F,
+    ) -> Vec<O>
+    where
+        I: Send,
+        O: Send,
+        F: Send + Clone + Copy,
+    {
+        scoped_threadpool::Pool::new(num_cpus::get() as u32).scoped(|scope| {
+            let (tx, rx) = std::sync::mpsc::channel();
+            let mut results: Vec<Option<O>> = std::iter::repeat_with(|| None)
+                .take(requests.len())
+                .collect();
+            for (idx, req) in requests.into_iter().enumerate() {
+                let tx = tx.clone();
+                scope.execute(move || {
+                    tx.send((idx, cb(req))).unwrap();
+                });
+            }
+            drop(tx);
+            self.start_iter(timer_name, results.len());
+            for (idx, result) in rx.iter() {
+                self.next();
+                results[idx] = Some(result);
+            }
+            results.into_iter().map(|x| x.unwrap()).collect()
+        })
+    }
 }
 
 impl<'a> std::ops::Drop for Timer<'a> {
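Timer::parallelize fans each request out to a scoped_threadpool worker, tags every result with its input index over an mpsc channel, and reassembles the outputs in request order on the calling thread while driving the timer's progress counter. A minimal usage sketch (hypothetical caller; assumes abstutil's Timer::new):

    let mut timer = Timer::new("demo");
    // Workers finish in arbitrary order, but the indexed channel guarantees
    // that squares[i] == i * i.
    let squares: Vec<u64> =
        timer.parallelize("square numbers", (0..100u64).collect(), |n| n * n);
    assert_eq!(squares[7], 49);

The F: Send + Clone + Copy bound exists because the callback is copied into every spawned task, so it can only capture Copy data such as shared references.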


@ -22,21 +22,22 @@ impl TripsVisualizer {
let popdat: PopDat = abstutil::read_binary("../data/shapes/popdat", &mut timer) let popdat: PopDat = abstutil::read_binary("../data/shapes/popdat", &mut timer)
.expect("Couldn't load popdat"); .expect("Couldn't load popdat");
let mut all_trips = clip_trips(&popdat, ui, 1_000, &mut timer); let mut all_trips = clip_trips(&popdat, ui, 1_000, &mut timer);
let requests = all_trips let map = &ui.primary.map;
.iter() let routes = timer.parallelize(
.map(|trip| trip.path_req(&ui.primary.map)) "calculate paths with geometry",
.collect(); all_trips.iter().map(|trip| trip.path_req(map)).collect(),
let paths = ui.primary.map.calculate_paths(requests, &mut timer); |req| {
(
req.clone(),
map.pathfind(req.clone())
.and_then(|path| path.trace(map, req.start.dist_along(), None)),
)
},
);
let mut final_trips = Vec::new(); let mut final_trips = Vec::new();
timer.start_iter("route geometry", paths.len()); for (mut trip, (req, maybe_route)) in all_trips.drain(..).zip(routes) {
for (mut trip, (req, maybe_path)) in all_trips.drain(..).zip(paths) { if let Some(route) = maybe_route {
timer.next();
// TODO path.trace is slow too and could be parallelized. Generalize
// calculate_paths to just execute a callback and do the nice timer management.
if let Some(route) = maybe_path
.and_then(|path| path.trace(&ui.primary.map, req.start.dist_along(), None))
{
trip.route = Some(route); trip.route = Some(route);
final_trips.push(trip); final_trips.push(trip);
} else { } else {
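This removes the TODO from the old code: both pathfind and the slow path.trace now run inside the worker callback. Hoisting let map = &ui.primary.map gives that callback a single shared &Map capture, which satisfies the Copy and Send bounds on parallelize. The unit of work, spelled out as a free function for clarity (a hypothetical sketch; route_geometry is not in the commit, and Trace is assumed to be map_model's alias for the traced geometry):

    fn route_geometry(map: &Map, req: PathRequest) -> (PathRequest, Option<Trace>) {
        (
            req.clone(),
            // Both pathfinding and tracing can fail; and_then collapses
            // either failure into None, so the caller sees one Option.
            map.pathfind(req.clone())
                .and_then(|path| path.trace(map, req.start.dist_along(), None)),
        )
    }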


@@ -10,10 +10,8 @@ abstutil = { path = "../abstutil" }
 geom = { path = "../geom" }
 gtfs = { path = "../gtfs" }
 nbez = "0.1.0"
-num_cpus = "1.10.0"
 ordered-float = "1.0.1"
 petgraph = { version = "0.4.13", features = ["serde-1"] }
 pretty_assertions = "0.6.1"
-scoped_threadpool = "0.1.9"
 serde = "1.0.89"
 serde_derive = "1.0.89"


@@ -697,31 +697,4 @@ impl Map {
             side2[idx]
         }
     }
-
-    // Parallelizes pathfind()
-    pub fn calculate_paths(
-        &self,
-        requests: Vec<PathRequest>,
-        timer: &mut Timer,
-    ) -> Vec<(PathRequest, Option<Path>)> {
-        scoped_threadpool::Pool::new(num_cpus::get() as u32).scoped(|scope| {
-            let (tx, rx) = std::sync::mpsc::channel();
-            let mut results: Vec<(PathRequest, Option<Path>)> = Vec::new();
-            for (idx, req) in requests.into_iter().enumerate() {
-                results.push((req.clone(), None));
-                let tx = tx.clone();
-                scope.execute(move || {
-                    tx.send((idx, self.pathfind(req))).unwrap();
-                });
-            }
-            drop(tx);
-            timer.start_iter("calculate paths", results.len());
-            for (idx, path) in rx.iter() {
-                timer.next();
-                results[idx].1 = path;
-            }
-            results
-        })
-    }
 }
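The deleted ad-hoc version pre-filled results with (req.clone(), None) and patched entries as indexed answers arrived; the generic method collects Vec<Option<O>> and unwraps at the end instead. Were it still needed, the old API could be rebuilt in a few lines on top of the new primitive (a hypothetical free function, not part of this commit):

    fn calculate_paths(
        map: &Map,
        requests: Vec<PathRequest>,
        timer: &mut Timer,
    ) -> Vec<(PathRequest, Option<Path>)> {
        // Same (request, result) pairing that the spawner code constructs inline.
        timer.parallelize("calculate paths", requests, |req| {
            (req.clone(), map.pathfind(req))
        })
    }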


@@ -155,12 +155,13 @@ impl TripSpawner {
         timer: &mut Timer,
         retry_if_no_room: bool,
     ) {
-        let paths = map.calculate_paths(
+        let paths = timer.parallelize(
+            "calculate paths",
             self.trips
                 .iter()
                 .map(|(_, _, _, spec)| spec.get_pathfinding_request(map, parking))
                 .collect(),
-            timer,
+            |req| (req.clone(), map.pathfind(req)),
         );
         for ((start_time, ped_id, car_id, spec), (req, maybe_path)) in
             self.trips.drain(..).zip(paths)
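The zip here quietly depends on parallelize returning results in input order, so that trips[i] lines up with paths[i]. A sanity check of that guarantee (a hypothetical test, again assuming Timer::new):

    #[test]
    fn parallelize_preserves_order() {
        let mut timer = Timer::new("test");
        let inputs: Vec<usize> = (0..1_000).collect();
        // The identity callback makes any reordering visible: outputs must
        // match inputs exactly, even though workers complete out of order.
        let outputs = timer.parallelize("identity", inputs.clone(), |i| i);
        assert_eq!(inputs, outputs);
    }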