Summary:
This diff sets two Rust lints to warn in fbcode:
```
[rust]
warn_lints = bare_trait_objects, ellipsis_inclusive_range_patterns
```
and fixes occurrences of those warnings within common/rust, hg, and mononoke.
Both of these lints are set to warn by default starting with rustc 1.37. Enabling them early avoids writing even more new code that needs to be fixed when we pull in 1.37 in six weeks.
Upstream tracking issue: https://github.com/rust-lang/rust/issues/54910
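For reference, the shape of the fixes for these two lints looks like this (illustrative code only, not taken from the diff):

```rust
// bare_trait_objects: bare `&Fn(i32) -> i32` becomes `&dyn Fn(i32) -> i32`.
fn apply(f: &dyn Fn(i32) -> i32, x: i32) -> i32 {
    f(x)
}

// ellipsis_inclusive_range_patterns: `0...9` in patterns becomes `0..=9`.
fn classify(n: u8) -> &'static str {
    match n {
        0..=9 => "digit",
        _ => "other",
    }
}

fn main() {
    assert_eq!(apply(&|x| x + 1, 1), 2);
    assert_eq!(classify(5), "digit");
}
```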
Reviewed By: Imxset21
Differential Revision: D16200291
fbshipit-source-id: aca11a7a944e9fa95f94e226b52f6f053b97ec74
Summary:
Previously:
```
error[E0599]: no method named `boxify` found for type `futures::future::result_::FutureResult<_, _>` in the current scope
--> experimental/dtolnay/thttp/main.rs:23:21
|
23 | let transport = try_boxfuture!(HttpClient::new(ENDPOINT));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= help: items from traits can only be used if the trait is in scope
= note: this error originates in a macro outside of the current crate (in Nightly builds, run with -Z external-macro-backtrace for more info)
help: the following trait is implemented but not in scope, perhaps add a `use` for it:
|
1 | use futures_ext::FutureExt;
|
```
Reviewed By: Imxset21
Differential Revision: D16134586
fbshipit-source-id: 324d2c7223c5e71ffc1f4390cd8fc0b488243c00
Summary:
When the underlying stream in a `BytesStream` hits EOF, and the `BytesStream` is asked for more data, it'll busy loop until more data becomes available.
This happens because when the `BytesStream` hits EOF on the underlying stream, it records that the stream is done, and won't ever poll the underlying stream again in `poll_buffer`. However, in `poll_buffer_until`, we essentially busy-loop through calls to `poll_buffer`.
When that happens, the process goes into an infinite loop of:
- Checking that it doesn't have enough bytes.
- Observing that the stream is done.
- Calling `poll_buffer` and not making any progress.
Instead, the right behavior would be to return a successful read of length zero in `read()` to indicate EOF to the caller. This is what this patch does.
It's worth noting that even if the underlying stream returned more data after it reported it was exhausted (callers should avoid polling them again ... or use `fuse()`), the `BytesStream` won't ever poll for that data, since it skips the poll because `stream_done` is set.
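The fix relies on the standard `Read` contract, where a successful read of length zero signals EOF. A minimal `std::io::Read` sketch of that behavior (a hypothetical simplification, not the actual `BytesStream` code):

```rust
use std::io::Read;

// Sketch: once the underlying source is exhausted, `read` must return
// Ok(0) so the caller sees EOF instead of the reader looping forever.
struct BufferedSource {
    buffer: Vec<u8>,
    pos: usize,
    stream_done: bool,
}

impl Read for BufferedSource {
    fn read(&mut self, out: &mut [u8]) -> std::io::Result<usize> {
        let available = &self.buffer[self.pos..];
        if available.is_empty() && self.stream_done {
            // EOF: report a successful zero-length read, never busy-loop.
            return Ok(0);
        }
        let n = available.len().min(out.len());
        out[..n].copy_from_slice(&available[..n]);
        self.pos += n;
        Ok(n)
    }
}

fn main() {
    let mut src = BufferedSource { buffer: b"abc".to_vec(), pos: 0, stream_done: true };
    let mut out = Vec::new();
    // Terminates because read() returns Ok(0) at EOF.
    src.read_to_end(&mut out).unwrap();
    assert_eq!(out, b"abc".to_vec());
}
```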
Reviewed By: farnz
Differential Revision: D16130672
fbshipit-source-id: a61c39feb1aa1ac74bbef909e47d698051477533
Summary:
```
pub fn top_level_launch<F>(future: F) -> Result<F::Item, F::Error>
where
F: Future + Send + 'static,
F::Item: Send,
F::Error: Send;
```
Starts the Tokio runtime using the supplied future to bootstrap the execution.
### Similar APIs
This function is equivalent to [`tokio::run`](https://docs.rs/tokio/0.1.22/tokio/fn.run.html) except that it also returns the future's resolved value as a Result. Thus it requires `F::Item: Send` and `F::Error: Send`.
This function has the same signature as [`Future::wait`](https://docs.rs/futures/0.1.28/futures/future/trait.Future.html#method.wait) which also goes from `F: Future` -> `Result<F::Item, F::Error>`, but `wait` requires an ambient futures runtime to already exist.
### Details
This function does the following:
- Start the Tokio runtime using a default configuration.
- Spawn the given future onto the thread pool.
- Block the current thread until the runtime shuts down.
- Send ownership of the future's resolved value back to the caller's thread.
Note that the function will not return immediately once the original future has completed. Instead it waits for the entire runtime to become idle.
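The steps above can be sketched with plain threads and a channel in place of the Tokio runtime (`run_and_wait` is a stand-in name; this is not the actual implementation):

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of the shape of `top_level_launch` using a thread instead of a
// Tokio runtime: run the work on another thread, send the resolved value
// back to the caller's thread, and block until the worker has shut down.
fn run_and_wait<T, E, F>(work: F) -> Result<T, E>
where
    F: FnOnce() -> Result<T, E> + Send + 'static,
    T: Send + 'static,
    E: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        // Ignore send errors: the receiver outlives this thread here.
        let _ = tx.send(work());
    });
    let result = rx.recv().expect("worker dropped without sending");
    handle.join().expect("worker panicked"); // wait for full shutdown
    result
}

fn main() {
    let value: Result<i32, String> = run_and_wait(|| Ok(40 + 2));
    assert_eq!(value, Ok(42));
}
```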
### Panics
This function panics if called from the context of an executor.
Reviewed By: bolinfest
Differential Revision: D16116198
fbshipit-source-id: 52009e17c1ccc566e0c156a86f8fd5844e0e2491
Summary: Introduce a `bounded_traversal_stream` operation which is similar to `bounded_traversal` but does not do `fold`, hence it is not structure preserving, and returns a stream as its result.
Reviewed By: farnz
Differential Revision: D16108231
fbshipit-source-id: 3854ff5e18bcf2d7aa3dc75a2b66d27c59ea4f2c
Summary: After using it for some time I found that it is more convenient to introduce an additional parameter `OutCtx`, which represents all the information about a node needed by the `fold` operation, instead of using `In`. Note this implementation is strictly more general, and isomorphic to the previous one if `OutCtx == In`.
Reviewed By: farnz
Differential Revision: D16029193
fbshipit-source-id: f562baa023d737ce1db2936987f6c59bcd0c3761
Summary: Since we own the `In` elements, `unfold` can take a mutable reference instead of an immutable one. This can simplify some code; see `BlobRepo::find_entries_in_manifest` for an example.
Reviewed By: farnz
Differential Revision: D15939878
fbshipit-source-id: 9c767240bec279f24f922e0771ac919072b3a56c
Summary:
I find that this is a common pattern:
```
// create an otherwise unnecessary block to reduce the scope of the lock guard
let output = {
    let mut value = mutex_value.lock().expect("lock poisoned");
    ... // do some stuff here
};
```
This extension simplifies this pattern to
```
let output = mutex_value.with(|value| /* do some stuff here */);
```
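A minimal sketch of such an extension trait over `std::sync::Mutex` (trait and method names assumed for illustration; the actual extension lives in the internal crate):

```rust
use std::sync::Mutex;

// Sketch: lock, run the closure on the guarded value, and return the
// closure's output; the guard is dropped as soon as the closure finishes.
trait MutexExt<T> {
    fn with<F, U>(&self, f: F) -> U
    where
        F: FnOnce(&mut T) -> U;
}

impl<T> MutexExt<T> for Mutex<T> {
    fn with<F, U>(&self, f: F) -> U
    where
        F: FnOnce(&mut T) -> U,
    {
        let mut guard = self.lock().expect("lock poisoned");
        f(&mut guard)
    }
}

fn main() {
    let counter = Mutex::new(0);
    let output = counter.with(|value| {
        *value += 1;
        *value
    });
    assert_eq!(output, 1);
}
```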
Reviewed By: Imxset21, farnz
Differential Revision: D15577135
fbshipit-source-id: 6b22b20dda79e532ff5ec8ce75cda8b1c1404368
Summary: `bounded_traversal` traverses an implicit asynchronous tree specified by the `init` and `unfold` arguments, and also does a backward pass with the `fold` operation. All `unfold` and `fold` operations are executed in parallel if they do not depend on each other (are not related by an ancestor-descendant relation in the implicit tree), with the amount of concurrency constrained by `scheduled_max`.
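Ignoring the concurrency aspect, the unfold/fold shape can be sketched as a synchronous recursion (illustrative only; the real operation is asynchronous and bounded by `scheduled_max`):

```rust
// Synchronous sketch of the unfold/fold shape: `unfold` expands a node
// into its children, `fold` combines a node with its children's results
// on the way back up. The real operation runs these concurrently.
fn traverse<In, Out>(
    init: In,
    unfold: &impl Fn(&In) -> Vec<In>,
    fold: &impl Fn(&In, Vec<Out>) -> Out,
) -> Out {
    let children = unfold(&init);
    let child_outs: Vec<Out> = children
        .into_iter()
        .map(|c| traverse(c, unfold, fold))
        .collect();
    fold(&init, child_outs)
}

fn main() {
    // Toy tree: node n has a single child n-1, down to 1; fold sums nodes.
    let unfold = |n: &u64| if *n > 1 { vec![*n - 1] } else { vec![] };
    let fold = |n: &u64, outs: Vec<u64>| *n + outs.into_iter().sum::<u64>();
    assert_eq!(traverse(3, &unfold, &fold), 6); // 3 + 2 + 1
}
```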
Reviewed By: jsgf
Differential Revision: D15197796
fbshipit-source-id: 1145497f5cb1c0effee47a4d27698bcf9d88f840
Summary:
We've hit an issue of slow pushes to Mononoke when a commit modifies a lot of
files (>500 in our case). It turned out that the problem was that we have
only one master write connection open, and each blobstore write requires a
write to MySQL because of the multiplexed blobstore. Because we have only one
connection open, all our MySQL writes are serialized, and the push takes too
much time. It's especially bad in non-master regions.
To mitigate the issue, let's add batching in the blobstore sync queue. When
clients call `blobstore_sync_queue.add(...)`, we'll send the new entry via a
channel to a separate task that sends writes in batches. That allows us to
increase throughput significantly.
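The shape of the change, sketched with std threads and channels (a hypothetical simplification; the real code batches SQL writes behind the blobstore sync queue, not strings):

```rust
use std::sync::mpsc;
use std::thread;

// Sketch: callers send entries down a channel; a single writer task drains
// whatever has queued up and commits it as one batch, so many concurrent
// adds become a few serialized writes.
fn start_batch_writer(rx: mpsc::Receiver<String>) -> thread::JoinHandle<Vec<Vec<String>>> {
    thread::spawn(move || {
        let mut batches = Vec::new();
        while let Ok(first) = rx.recv() {
            // Start a batch with the blocking recv, then drain the backlog.
            let mut batch = vec![first];
            while let Ok(next) = rx.try_recv() {
                batch.push(next);
            }
            batches.push(batch); // stand-in for one multi-row SQL insert
        }
        batches
    })
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let writer = start_batch_writer(rx);
    for i in 0..5 {
        tx.send(format!("entry-{}", i)).unwrap();
    }
    drop(tx); // close the queue
    let batches = writer.join().unwrap();
    let total: usize = batches.iter().map(|b| b.len()).sum();
    assert_eq!(total, 5); // every entry landed in exactly one batch
}
```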
Reviewed By: jsgf
Differential Revision: D15248288
fbshipit-source-id: 22bab284b0cbe552b4b51bab4027813b4278fd14
Summary:
This test was failing consistently on my devserver when I was testing an update to tp2 crates.io. The update did not have any future or tokio changes. This test also fails on other devservers.
This test was added in D9561367.
The test fails on my devserver with:
```
test test::asynchronize_parallel ... FAILED
failures:
---- test::asynchronize_parallel stdout ----
thread 'test::asynchronize_parallel' panicked at 'Parallel sleep time 43.284909ms much greater than
40ms for 20 threads - each thread sleeps for 20ms', common/rust/futures-ext/src/lib.rs:741:9
note: Run with `RUST_BACKTRACE=1` for a backtrace.
failures:
test::asynchronize_parallel
```
Reviewed By: jsgf
Differential Revision: D14055989
fbshipit-source-id: 14a87a1dfe6ff7e273a08052695fd6c9c9ed37c7
Summary:
We've had this problem for almost a year now, and I've finally made some
progress.
The problem was in tests failing randomly with errors like
```
- remote: * pushrebase failed * (glob)
- remote: msg: "pushrebase failed Conflicts([PushrebaseConflict { left: MPath(\"1\"), right: MPath(\"1\") }])"
+ remote: Jan 25 08:46:24.067 ERRO Error in hgcli proxy, error: Connection reset by peer (os error 104), root_cause: Os {
+ remote: code: 104,
+ remote: kind: ConnectionReset,
+ remote: message: "Connection reset by peer"
remote: * backtrace* (glob)
```
or
```
remote: * pushrebase failed * (glob)
remote: msg: "pushrebase failed Conflicts([PushrebaseConflict { left: MPath(\"1\"), right: MPath(\"1\") }])"
+ remote: Jan 25 08:47:59.966 ERRO Error in hgcli proxy, error: Connection reset by peer (os error 104), root_cause: Os {
+ remote: code: 104,
+ remote: kind: ConnectionReset,
+ remote: message: "Connection reset by peer"
+ remote: }, backtrace:
```
Note that the problems are slightly different. In the first case the actual error message is completely lost and we get an unnecessary
ConnectionReset message. In the second case it's just `ConnectionReset`.
This diff fixes the lost error message (problem #1) and hides the `ConnectionReset` error (problem #2).
Problem #1 was due to a bug in streamfork. Before this diff, if streamfork hit
an error, it might not have sent already-received input to one of the
outputs. This diff fixes that.
This diff just hides problem #2: if we see a ConnectionReset, the error
won't be reported. That's a hack which should be fixed, but at the moment:
a) The bug is not easily debuggable
b) The problem is not urgent and shouldn't cause issues
In some cases the server actually sends a connection reset, but in that case
mercurial still gives us a self-explanatory message:
```
abort: stream ended unexpectedly (got 0 bytes, expected 4)
```
Reviewed By: lukaspiatkowski
Differential Revision: D13818558
fbshipit-source-id: 7a2cba8cd0fcef8211451df3dea558fe2d60fa60
Summary:
Previously we had a timeout per session, i.e. multiple wireproto commands would
share the same timeout. It had a few disadvantages:
1) The main disadvantage was that if a connection timed out we didn't log
stats such as number of files, response size etc., and we didn't log parameters
to scribe. The latter is an even bigger problem, because we usually want to
replay requests that were slow and timed out, not the requests that finished
quickly.
2) The less important disadvantage is that we have clients that make a small
request to the server and then keep the connection open for a long time.
Eventually we kill the connection and log it as an error. With this change
the connection stays open until the client closes it. That might potentially be
a problem, and if it is, we can reintroduce the per-connection timeout.
Initially I was planning to use tokio::util::timer to implement all the
timeouts, but it behaves differently for streams - it only allows setting a
per-item timeout, while we want a timeout for the whole stream.
(https://docs.rs/tokio/0.1/tokio/timer/struct.Timeout.html#futures-and-streams)
To overcome this I implemented a simple combinator, StreamWithTimeout, which
does exactly what I want.
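A blocking-iterator sketch of the same idea: one deadline covering the whole sequence rather than resetting per item (illustrative only; the real combinator wraps a futures 0.1 `Stream`):

```rust
use std::time::{Duration, Instant};

// Sketch: wrap an iterator with a single deadline for the entire sequence.
// Once the deadline passes, the next poll yields a timeout error instead of
// more items. A per-item timeout (as in tokio's Timeout) would restart the
// clock on every item, which is not what we want here.
struct WithDeadline<I> {
    inner: I,
    deadline: Instant,
}

impl<I: Iterator> Iterator for WithDeadline<I> {
    type Item = Result<I::Item, &'static str>;

    fn next(&mut self) -> Option<Self::Item> {
        if Instant::now() >= self.deadline {
            return Some(Err("whole-stream timeout"));
        }
        self.inner.next().map(Ok)
    }
}

fn main() {
    let items = WithDeadline {
        inner: 0..3,
        deadline: Instant::now() + Duration::from_secs(60),
    };
    // collect() on Results short-circuits at the first timeout error.
    let collected: Result<Vec<_>, _> = items.collect();
    assert_eq!(collected, Ok(vec![0, 1, 2]));
}
```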
Reviewed By: HarveyHunt
Differential Revision: D13731966
fbshipit-source-id: 211240267c7568cedd18af08155d94bf9246ecc3
Summary:
`asynchronize` does two conceptually separate things:
1. Given a closure that can do blocking I/O or is CPU heavy, create a future
that runs that closure inside a Tokio task.
2. Given a future, run it on a new Tokio task and shuffle the result back to
the caller via a channel.
Split these two things out into their own functions - one to make the future,
one to spawn it and recover the result. For now, this is no net change - but
`spawn_future` is likely to come in useful once we need more parallelism than
we get from I/O alone, and `closure_to_blocking_future` at least signals intent
when we allow a long-running function to take over a Tokio task.
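The split can be sketched with threads standing in for Tokio tasks (`closure_to_deferred` and `spawn_deferred` are hypothetical stand-ins for `closure_to_blocking_future` and `spawn_future`):

```rust
use std::sync::mpsc;
use std::thread;

// One function turns a blocking closure into deferred work (the "make the
// future" half); another spawns that work and recovers the result through
// a channel (the "spawn and shuffle the result back" half).
fn closure_to_deferred<T: Send + 'static>(
    f: impl FnOnce() -> T + Send + 'static,
) -> Box<dyn FnOnce() -> T + Send> {
    Box::new(f) // stand-in for wrapping the closure in a future
}

fn spawn_deferred<T: Send + 'static>(work: Box<dyn FnOnce() -> T + Send>) -> mpsc::Receiver<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the caller may have dropped the receiver.
        let _ = tx.send(work());
    });
    rx
}

fn main() {
    let work = closure_to_deferred(|| 21 * 2);
    let rx = spawn_deferred(work);
    assert_eq!(rx.recv().unwrap(), 42);
}
```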
Reviewed By: jsgf
Differential Revision: D9635812
fbshipit-source-id: e15aeeb305c8499219b89a542962cb7c4b740354
Summary:
`asynchronize` currently does not warn the event loop that it's
running blocking code, so we can end up starving the thread pool of threads.
We can't use `blocking` directly, because it won't spawn a synchronous task
onto a fresh Tokio task, so your "parallel" futures end up running in series.
Instead, use it inside `asynchronize` so that we can pick up extra threads in
the thread pool as and when we need them due to heavy load.
While in here, fix up `asynchronize` to only work on synchronous tasks and
push the boxing out one layer. Filenodes needs a specific change that's
worth extra eyes.
Reviewed By: jsgf
Differential Revision: D9631141
fbshipit-source-id: 06f79c4cb697288d3fadc96448a9173e38df425f
Summary:
We have suspect timings in Mononoke where `asynchronize` is used to
turn a blocking function into a future. Add a test case to ensure that
`asynchronize` itself cannot be causing accidental serialization.
Reviewed By: jsgf
Differential Revision: D9561367
fbshipit-source-id: 14f03e3f003f258450bb897498001050dee0b40d
Summary: The latest release of `tokio` updates `tokio::timer` to include a new `Timeout` type and a `.timeout()` method on `Future`s. As such, our internal implementation of `.timeout()` in `FutureExt` is no longer needed.
Reviewed By: jsgf
Differential Revision: D9617519
fbshipit-source-id: b84fd47a3ee4fc1f7c0a52e308317b93f28f04da
Summary:
It makes startup unbearably slow and doesn't add any benefit at all. Revert
it.
Reviewed By: purplefox
Differential Revision: D9358741
fbshipit-source-id: 26469941304f737c856a6ffca5e577848ad30955
Summary:
Should be functionally equivalent and semantically more appropriate.
This also makes a couple of small API changes:
- The inner function is expected to just return a Result - IntoFuture is
overkill if it's supposed to be synchronous in the first place.
- `asynchronize` itself returns `impl Future` rather than being intrinsically
boxed.
- Restructure dieselfilenodes::add_filenodes to only asynchronize the insert
itself.
Reviewed By: farnz
Differential Revision: D8959535
fbshipit-source-id: fef9164e3be0069bd0d93573642cd57bb5babb73
Summary: I was surprised these didn't exist, but they work fine.
Reviewed By: Imxset21
Differential Revision: D9016243
fbshipit-source-id: 8405009f536b2e1aa0c9c28be59bb8c1d9ab7a4f
Summary: As in the comment on the macro: if the condition is not met, return a BoxFuture with an Err inside.
Reviewed By: farnz
Differential Revision: D8877731
fbshipit-source-id: 7f31a1739155201ea2be30901b8cda2511f49b03
Summary: Iterating on the server code is a bit painful and it has grown a lot; splitting it should speed up future refactorings and make it more maintainable.
Reviewed By: jsgf, StanislavGlebik
Differential Revision: D8859811
fbshipit-source-id: 7c56f9f835f45eca322955cb3b9eadd87fbb30a1
Summary:
This is a series of patches which adds Cargo.toml files to all the crates and tries to build them. There is an individual patch for each crate which tells whether that crate currently builds successfully using cargo, and if not, the reason why.
The reasons the crates don't build:
* the failure_ext and netstring crates are internal
* errors related to tokio_io; there might be a patched version of tokio_io internally
* actix-web depends on httparse, which uses nightly features
All builds were done using rustc version `rustc 1.27.0-dev`.
Pull Request resolved: https://github.com/facebookexperimental/mononoke/pull/7
Differential Revision: D8778746
Pulled By: jsgf
fbshipit-source-id: 927a7a20b1d5c9643869b26c0eab09e90048443e
Summary:
Such methods exist in the new futures library, but since we are not using it
yet, let's add these methods to our FutureExt.
Reviewed By: jsgf
Differential Revision: D8644300
fbshipit-source-id: e35ff95ff0db3aa5f3e7fba1a77cb826b59873f4
Summary: This diff adds the methods `timeout()` and `on_timeout()` to the `FutureExt` trait. The former sets a time limit for the execution of a `Future` -- effectively shorthand for `tokio_timer::Deadline`. The latter takes a callback (which may itself return a `Future`) which is called if the wrapped `Future` times out; this is useful for things like logging the state of the program to debug why things are timing out.
Reviewed By: StanislavGlebik
Differential Revision: D8508008
fbshipit-source-id: 6d085c35b68d1cf5a24446be8af77eb30028a7db
Summary:
Unfortunately I have to remove tracing, because tracing doesn't work with
Streams. For now it should be fine, because enabling tracing in prod is still
not possible due to the memory and CPU overhead.
Reviewed By: farnz
Differential Revision: D8381855
fbshipit-source-id: e28b4396c81527bdf30fa1703c634688cf645ada
Summary:
AsyncWrite and Sink traits have very similar interfaces. This diff creates an
adapter between them.
The primary motivation for this adapter is in Mononoke. Currently Mononoke
wireproto methods that return bundles have to buffer them in memory. This has
a few drawbacks: first, it increases memory usage, and second, it increases
latency.
The proper fix would be to make Bundle2Encoder return a Stream instead of
BoxFuture<(), Error>. However, that would require changing the whole
async-compression crate, which is too difficult. We may want to do that
eventually, but for now SinkAsyncWrite seems like a good enough fix.
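The adapter's shape, sketched with `std::io::Write` feeding a channel in place of a futures `Sink` (`ChannelWriter` is a hypothetical name; the real SinkAsyncWrite bridges AsyncWrite to Sink):

```rust
use std::io::Write;
use std::sync::mpsc;

// Sketch: an io::Write implementation that forwards each written chunk
// into a channel, so an encoder can stream chunks out as they are
// produced instead of buffering the whole payload in memory.
struct ChannelWriter {
    tx: mpsc::Sender<Vec<u8>>,
}

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.tx
            .send(buf.to_vec())
            .map_err(|_| std::io::Error::new(std::io::ErrorKind::BrokenPipe, "receiver gone"))?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(()) // nothing buffered locally
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut writer = ChannelWriter { tx };
    writer.write_all(b"bundle-part-1").unwrap();
    writer.write_all(b"bundle-part-2").unwrap();
    drop(writer); // closing the writer ends the receiving stream
    let received: Vec<u8> = rx.into_iter().flatten().collect();
    assert_eq!(received, b"bundle-part-1bundle-part-2".to_vec());
}
```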
Reviewed By: farnz
Differential Revision: D8379585
fbshipit-source-id: 19af9452ba09318a7505dda44ef765e8c09b004d
Summary: Going to take a while to get to everything, but here's a start.
Reviewed By: Imxset21
Differential Revision: D8311107
fbshipit-source-id: ada1908b320a5277eda2587d7e8f26b13b952154
Summary: Replace all occurrences of the deprecated put_X::<BigEndian> methods with put_X_be in the files marked with T29077977, and remove any allow-deprecation lines in those files, along with any unused imports this might leave behind.
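For context, both spellings append the same big-endian bytes; the semantics can be shown with std alone (illustrative helper named after the new method, not the `bytes` crate itself):

```rust
// Illustrative: the byte layout behind put_u32_be. The deprecated
// put_u32::<BigEndian>(v) and the new put_u32_be(v) both append the
// value's big-endian encoding.
fn put_u32_be(buf: &mut Vec<u8>, v: u32) {
    buf.extend_from_slice(&v.to_be_bytes());
}

fn main() {
    let mut buf = Vec::new();
    put_u32_be(&mut buf, 0x0102_0304);
    assert_eq!(buf, vec![1, 2, 3, 4]); // most significant byte first
}
```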
Reviewed By: jsgf
Differential Revision: D7928695
fbshipit-source-id: 8f16915f6b08aa55521637fff58a6cc27c13321a
Summary:
Specialized revsets to make pull faster.
The previous Union + Intersect combination was extremely slow because it
fetched a lot of stuff that wasn't used.
Reviewed By: farnz
Differential Revision: D7829394
fbshipit-source-id: c038f184c305e48e18b6fcb0f83bab9e9a42b098
Summary:
This is a great suggestion from lukaspiatkowski. This method allows us to take synchronous code, schedule it on the default tokio thread pool, and convert it into a future.
A great use case is diesel connections.
Reviewed By: lukaspiatkowski
Differential Revision: D7685244
fbshipit-source-id: ba5a99a7ed977a3aa8b5115049cdd71d9b11112c
Summary:
In the previous diff an upstream select_all() was added. However, it had a bug.
In
```
match self.inner.poll() {
...
Async::Ready(_) => {
return Async::Ready(None);
}
}
```
Async::Ready(None) was returned both when the inner poll returned `Async::Ready(None)`
and when it returned `Async::Ready(Some((None, remaining)))`.
The former was correct, but the latter wasn't. `Async::Ready(Some((None, remaining)))`
happens when one of the internal streams is exhausted, even though there can be
many more internal streams that are not exhausted yet. Before this diff these
non-exhausted streams would simply be lost, so this diff fixes that.
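An iterator-based sketch of the fixed behavior (illustrative only; the real code operates on futures 0.1 streams and polls them as they become ready):

```rust
// Sketch: when one inner source is exhausted, drop just that source and
// keep draining the rest, instead of ending the whole merged stream.
fn merge_round_robin<I: Iterator>(mut sources: Vec<I>) -> Vec<I::Item> {
    let mut out = Vec::new();
    while !sources.is_empty() {
        let mut i = 0;
        while i < sources.len() {
            match sources[i].next() {
                Some(item) => {
                    out.push(item);
                    i += 1;
                }
                None => {
                    // Exhausted: remove it, but keep the remaining sources.
                    sources.remove(i);
                }
            }
        }
    }
    out
}

fn main() {
    let merged = merge_round_robin(vec![
        vec![1, 4].into_iter(),
        vec![2].into_iter(),
        vec![3, 5].into_iter(),
    ]);
    // All five items survive even though the middle source ends early.
    assert_eq!(merged.len(), 5);
}
```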
Reviewed By: farnz
Differential Revision: D7550918
fbshipit-source-id: 908af9fed17744b884aa40afdccfc4654520048b
Summary:
This is a simple but useful combinator that queries lots of streams in parallel
and merges the results.
This code is taken from futures-0.2.0, but modified so that it works
with futures-0.1.*. It was re-formatted with our linter.
The code had a bug that will be fixed in the next diffs.
Reviewed By: farnz
Differential Revision: D7550919
fbshipit-source-id: c5b394065c0184a89dfab6a9de699725bc2bd6c2
Summary: We had a fun bit of debugging because an out-of-order `oneshot::recv()` gave a `oneshot::Canceled` error. Give it an enum for errors, so that we can distinguish dropping the rx channel from calling `oneshot::recv()` before `oneshot::send()`.
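For comparison, std's mpsc channel already draws exactly this distinction in its error enum (shown as an illustration of the approach, not this crate's API):

```rust
use std::sync::mpsc::{self, TryRecvError};

// TryRecvError is an enum distinguishing "nothing sent yet" (Empty) from
// "sender dropped" (Disconnected) -- the same distinction this diff adds
// to the oneshot error type.
fn main() {
    let (tx, rx) = mpsc::channel::<i32>();

    // Receiving before anything is sent: Empty, not a dropped channel.
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    tx.send(7).unwrap();
    assert_eq!(rx.try_recv(), Ok(7));

    // Dropping the sender with nothing buffered: Disconnected.
    drop(tx);
    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
```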
Reviewed By: StanislavGlebik
Differential Revision: D7382354
fbshipit-source-id: c96f4ac40449a5864b7ba79f43f9af402de7735b
Summary: This new method is useful in my future work, where I rely on this easy-to-use and readable API to get around the fact that futures combinators take a Stream by value. With this one can chain on the end of a consuming Stream and, by using the "remainder" Future, get the consumed Stream back.
Reviewed By: StanislavGlebik
Differential Revision: D6965532
fbshipit-source-id: 3ab19851b3d48c43c8d7e3a96ae5c03a7d242960
Summary: The Framed and FramedParts pair fits the following diffs a bit better than FramedStream and ReadLeadingBuffer.
Reviewed By: jsgf
Differential Revision: D6567554
fbshipit-source-id: 88d117ad9e8227f9de278037b333da7ffc4fdf1f
Summary:
The no-compression decompressor can't provide framing like the other decompressors can, so a safer approach is not to have it at all.
I replaced occurrences of no-compression with an `Either` reader, which seems to be an established pattern in the community (e.g. `futures::future::Either` or `itertools::Either`).
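The Either-reader pattern, sketched over `std::io::Read` (a generic illustration; the actual code wraps the crate's decompressor readers):

```rust
use std::io::{Cursor, Read};

// Sketch: one concrete type that delegates to whichever of two readers
// was chosen, instead of a special "no-compression" decompressor that
// cannot provide framing.
enum EitherReader<A, B> {
    Left(A),
    Right(B),
}

impl<A: Read, B: Read> Read for EitherReader<A, B> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self {
            EitherReader::Left(a) => a.read(buf),
            EitherReader::Right(b) => b.read(buf),
        }
    }
}

fn main() {
    // Choose the plain branch; a real decompressor would sit in Left.
    let mut reader: EitherReader<Cursor<&[u8]>, Cursor<&[u8]>> =
        EitherReader::Right(Cursor::new(&b"plain"[..]));
    let mut out = String::new();
    reader.read_to_string(&mut out).unwrap();
    assert_eq!(out, "plain");
}
```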
Reviewed By: jsgf
Differential Revision: D6555922
fbshipit-source-id: 998dafab8d9b2f00d058ce2f9e0aced76cf15b4e
Summary:
This diff does a few things:
- Change the rust versions in `third-party{2,-buck}/config.py` to 1.21.
- Update the tp2 symlinks for `rust` and `rust-crates-io`
- Fix build breakages due to new errors/warnings from rustc.
Reviewed By: jsgf, Imxset21
Differential Revision: D6319954
fbshipit-source-id: cd4fe9e0d6f26c1a6c9c3f1256d84cb002bb83d6