LibWeb: Implement min option for ReadableStreamBYOBReader.read()

When the min option is given the read will only be fulfilled when there
are min or more elements available in the readable byte stream.

When the min option is not given the default value for min is 1.
This commit is contained in:
Kenneth Myhra 2024-07-08 16:39:52 +02:00 committed by Andreas Kling
parent 3850214aac
commit 907dc84c1e
Notes: sideshowbarker 2024-07-17 00:16:31 +09:00
9 changed files with 165 additions and 40 deletions

View File

@ -0,0 +1,4 @@
This -⌄
will not be processed as one chunk
This -> will be processed as one chunk
Stream closed

View File

@ -0,0 +1,68 @@
<script src="../include.js"></script>
<script>
const CHUNK1 = "This -⌄";
const CHUNK2 = " will not be processed as one chunk";
const CHUNK3 = "This ->";
const CHUNK4 = " will be processed as one chunk";
const CHUNK5 = "WHF!";
// A read is fulfilled when there are N or more elements available in the stream.
// We start off by allowing a read to be fulfilled with 1 element available.
let minElementsAvailable = 1;
let pullCount = 0;
const readableStream = new ReadableStream({
start(controller) {
},
pull(controller) {
++pullCount;
const encoder = new TextEncoder();
if (pullCount === 1) {
controller.enqueue(encoder.encode(CHUNK1));
} else if (pullCount === 2) {
controller.enqueue(encoder.encode(CHUNK2));
} else if (pullCount === 3) {
controller.enqueue(encoder.encode(CHUNK3));
} else if (pullCount === 4) {
controller.enqueue(encoder.encode(CHUNK4));
// FIXME: Move/remove this controller.close() when we resolve the FIXME right below.
controller.close();
}
// FIXME: This should have been fulfilled since we are closing the controller at this point, currently we do not.
//} else if (pullCount === 5) {
// controller.enqueue(encoder.encode(CHUNK5));
// controller.close();
//}
// Now a read will only be fulfilled when at least 8 elements is available in the stream.
if (pullCount === 2) {
minElementsAvailable = 8;
}
},
type: "bytes",
});
async function readStream(stream) {
const reader = stream.getReader({ mode: "byob" });
let buffer = new ArrayBuffer(200);
let offset = 0;
let byteLength = 40;
while (true) {
let result = await reader.read(new Uint8Array(buffer, offset, byteLength), { min: minElementsAvailable });
if (result.done) {
println("Stream closed");
break;
}
println(new TextDecoder().decode(result.value));
buffer = result.value.buffer;
offset += result.value.byteLength;
}
}
asyncTest(async done => {
await readStream(readableStream);
done();
});
</script>

View File

@ -1133,7 +1133,7 @@ WebIDL::ExceptionOr<ReadableStreamPair> readable_byte_stream_tee(JS::Realm& real
auto read_into_request = realm.heap().allocate_without_realm<ByteStreamTeeBYOBReadRequest>(realm, stream, params, cancel_promise, *byob_branch, *other_branch, for_branch2);
// 5. Perform ! ReadableStreamBYOBReaderRead(reader, view, 1, readIntoRequest).
readable_stream_byob_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamBYOBReader>>(), view, read_into_request);
readable_stream_byob_reader_read(params->reader.get<JS::NonnullGCPtr<ReadableStreamBYOBReader>>(), view, 1, read_into_request);
});
// 17. Let pull1Algorithm be the following steps:
@ -1656,34 +1656,36 @@ void readable_byte_stream_controller_fill_head_pull_into_descriptor(ReadableByte
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue
bool readable_byte_stream_controller_fill_pull_into_descriptor_from_queue(ReadableByteStreamController& controller, PullIntoDescriptor& pull_into_descriptor)
{
// 1. Let elementSize be pullIntoDescriptor.[[elementSize]].
auto element_size = pull_into_descriptor.element_size;
// 2. Let currentAlignedBytes be pullIntoDescriptors bytes filled (pullIntoDescriptors bytes filled mod elementSize).
auto current_aligned_bytes = pull_into_descriptor.bytes_filled - (pull_into_descriptor.bytes_filled % pull_into_descriptor.element_size);
// 3. Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptors byte length pullIntoDescriptors bytes filled).
// 1. Let maxBytesToCopy be min(controller.[[queueTotalSize]], pullIntoDescriptors byte length pullIntoDescriptors bytes filled).
auto max_bytes_to_copy = min(controller.queue_total_size(), pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled);
// 4. Let maxBytesFilled be pullIntoDescriptors bytes filled + maxBytesToCopy.
// 2. Let maxBytesFilled be pullIntoDescriptors bytes filled + maxBytesToCopy.
u64 max_bytes_filled = pull_into_descriptor.bytes_filled + max_bytes_to_copy;
// 5. Let maxAlignedBytes be maxBytesFilled (maxBytesFilled mod elementSize).
auto max_aligned_bytes = max_bytes_filled - (max_bytes_filled % element_size);
// 6. Let totalBytesToCopyRemaining be maxBytesToCopy.
// 3. Let totalBytesToCopyRemaining be maxBytesToCopy.
auto total_bytes_to_copy_remaining = max_bytes_to_copy;
// 7. Let ready be false.
// 4. Let ready be false.
bool ready = false;
// 8. If maxAlignedBytes > currentAlignedBytes,
if (max_aligned_bytes > current_aligned_bytes) {
// 5. Assert: pullIntoDescriptors bytes filled < pullIntoDescriptors minimum fill.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill);
// 6. Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptors element size.
auto remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size;
// 7. Let maxAlignedBytes be maxBytesFilled remainderBytes.
auto max_aligned_bytes = max_bytes_filled - remainder_bytes;
// 8. If maxAlignedBytes ≥ pullIntoDescriptors minimum fill,
if (max_aligned_bytes >= pull_into_descriptor.minimum_fill) {
// 1. Set totalBytesToCopyRemaining to maxAlignedBytes pullIntoDescriptors bytes filled.
total_bytes_to_copy_remaining = max_aligned_bytes - pull_into_descriptor.bytes_filled;
// 2. Set ready to true.
ready = true;
// NOTE: A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it.
}
// 9. Let queue be controller.[[queue]].
@ -1735,8 +1737,8 @@ bool readable_byte_stream_controller_fill_pull_into_descriptor_from_queue(Readab
// 2. Assert: pullIntoDescriptors bytes filled > 0.
VERIFY(pull_into_descriptor.bytes_filled > 0);
// 3. Assert: pullIntoDescriptors bytes filled < pullIntoDescriptors element size.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.element_size);
// 3. Assert: pullIntoDescriptors bytes filled < pullIntoDescriptors minimum fill.
VERIFY(pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill);
}
// 12. Return ready.
@ -1789,7 +1791,7 @@ JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm
// 3. Assert: bytesFilled ≤ pullIntoDescriptors byte length.
VERIFY(bytes_filled <= pull_into_descriptor.byte_length);
// 4. Assert: bytesFilled mod elementSize is 0.
// 4. Assert: the remainder after dividing bytesFilled by elementSize is 0.
VERIFY(bytes_filled % element_size == 0);
// 5. Let buffer be ! TransferArrayBuffer(pullIntoDescriptors buffer).
@ -1800,7 +1802,7 @@ JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm
}
// https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into
void readable_byte_stream_controller_pull_into(ReadableByteStreamController& controller, WebIDL::ArrayBufferView& view, ReadIntoRequest& read_into_request)
void readable_byte_stream_controller_pull_into(ReadableByteStreamController& controller, WebIDL::ArrayBufferView& view, u64 min, ReadIntoRequest& read_into_request)
{
auto& vm = controller.vm();
auto& realm = controller.realm();
@ -1832,7 +1834,16 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
}
}
// 5. Let byteOffset be view.[[ByteOffset]].
// 5. Let minimumFill be min × elementSize.
u64 minimum_fill = min * element_size;
// 6. Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
VERIFY(minimum_fill <= view.byte_length());
// 7. Assert: the remainder after dividing minimumFill by elementSize is 0.
VERIFY(minimum_fill % element_size == 0);
// 8. Let byteOffset be view.[[ByteOffset]].
auto byte_offset = view.byte_offset();
// 6. Let byteLength be view.[[ByteLength]].
@ -1862,6 +1873,7 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
.byte_offset = byte_offset,
.byte_length = byte_length,
.bytes_filled = 0,
.minimum_fill = minimum_fill,
.element_size = element_size,
.view_constructor = *ctor,
.reader_type = ReaderType::Byob,
@ -1935,7 +1947,7 @@ void readable_byte_stream_controller_pull_into(ReadableByteStreamController& con
}
// https://streams.spec.whatwg.org/#readable-stream-byob-reader-read
void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::ArrayBufferView& view, ReadIntoRequest& read_into_request)
void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::ArrayBufferView& view, u64 min, ReadIntoRequest& read_into_request)
{
// 1. Let stream be reader.[[stream]].
auto stream = reader.stream();
@ -1952,7 +1964,7 @@ void readable_stream_byob_reader_read(ReadableStreamBYOBReader& reader, WebIDL::
}
// 5. Otherwise, perform ! ReadableByteStreamControllerPullInto(stream.[[controller]], view, readIntoRequest).
else {
readable_byte_stream_controller_pull_into(*stream->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>(), view, read_into_request);
readable_byte_stream_controller_pull_into(*stream->controller()->get<JS::NonnullGCPtr<ReadableByteStreamController>>(), view, min, read_into_request);
}
}
@ -2261,8 +2273,7 @@ WebIDL::ExceptionOr<void> readable_byte_stream_controller_respond_in_readable_st
}
// 4. If pullIntoDescriptors bytes filled < pullIntoDescriptors minimum fill, return.
// FIXME: Support minimum fill.
if (pull_into_descriptor.bytes_filled < pull_into_descriptor.element_size)
if (pull_into_descriptor.bytes_filled < pull_into_descriptor.minimum_fill)
return {};
// NOTE: A descriptor for a read() request that is not yet filled up to its minimum length will stay at the head of the queue, so the underlying source can keep filling it.
@ -2722,8 +2733,8 @@ WebIDL::ExceptionOr<void> readable_byte_stream_controller_close(ReadableByteStre
// 1. Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
auto& first_pending_pull_into = controller.pending_pull_intos().first();
// 2. If firstPendingPullIntos bytes filled > 0,
if (first_pending_pull_into.bytes_filled > 0) {
// 2. If the remainder after dividing firstPendingPullIntos bytes filled by firstPendingPullIntos element size is not 0,
if (first_pending_pull_into.bytes_filled % first_pending_pull_into.element_size != 0) {
// 1. Let e be a new TypeError exception.
auto error = JS::TypeError::create(realm, "Cannot close controller in the middle of processing a write request"sv);
@ -3454,8 +3465,8 @@ void readable_byte_stream_controller_commit_pull_into_descriptor(ReadableStream&
// 4. If stream.[[state]] is "closed",
if (stream.is_closed()) {
// 1. Assert: pullIntoDescriptors bytes filled is 0.
VERIFY(pull_into_descriptor.bytes_filled == 0);
// 1. Assert: the remainder after dividing pullIntoDescriptors bytes filled by pullIntoDescriptors element size is 0.
VERIFY(pull_into_descriptor.bytes_filled % pull_into_descriptor.element_size == 0);
// 2. Set done to true.
done = true;

View File

@ -15,6 +15,7 @@
#include <LibWeb/WebIDL/CallbackType.h>
#include <LibWeb/WebIDL/ExceptionOr.h>
#include <LibWeb/WebIDL/Promise.h>
#include <LibWeb/WebIDL/Types.h>
namespace Web::Streams {
@ -61,8 +62,8 @@ void readable_stream_reader_generic_release(ReadableStreamGenericReaderMixin&);
void readable_stream_default_reader_error_read_requests(ReadableStreamDefaultReader&, JS::Value error);
void readable_stream_byob_reader_error_read_into_requests(ReadableStreamBYOBReader&, JS::Value error);
JS::Value readable_byte_stream_controller_convert_pull_into_descriptor(JS::Realm&, PullIntoDescriptor const&);
void readable_byte_stream_controller_pull_into(ReadableByteStreamController&, WebIDL::ArrayBufferView&, ReadIntoRequest&);
void readable_stream_byob_reader_read(ReadableStreamBYOBReader&, WebIDL::ArrayBufferView&, ReadIntoRequest&);
void readable_byte_stream_controller_pull_into(ReadableByteStreamController&, WebIDL::ArrayBufferView&, u64 min, ReadIntoRequest&);
void readable_stream_byob_reader_read(ReadableStreamBYOBReader&, WebIDL::ArrayBufferView&, u64 min, ReadIntoRequest&);
void readable_byte_stream_controller_fill_head_pull_into_descriptor(ReadableByteStreamController const&, u64 size, PullIntoDescriptor&);
void readable_stream_default_reader_read(ReadableStreamDefaultReader&, ReadRequest&);

View File

@ -155,6 +155,7 @@ void ReadableByteStreamController::pull_steps(JS::NonnullGCPtr<ReadRequest> read
.byte_offset = 0,
.byte_length = *m_auto_allocate_chunk_size,
.bytes_filled = 0,
.minimum_fill = 1,
.element_size = 1,
.view_constructor = *realm.intrinsics().uint8_array_constructor(),
.reader_type = ReaderType::Default,

View File

@ -41,6 +41,10 @@ struct PullIntoDescriptor {
// A nonnegative integer number of bytes that have been written into the buffer so far
u64 bytes_filled;
// https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill
// A positive integer representing the minimum number of bytes that must be written into the buffer before the associated read() request may be fulfilled. By default, this equals the element size.
u64 minimum_fill;
// https://streams.spec.whatwg.org/#pull-into-descriptor-element-size
// A positive integer representing the number of bytes that can be written into the buffer at a time, using views of the type described by the view constructor
u64 element_size;

View File

@ -107,7 +107,7 @@ private:
JS_DEFINE_ALLOCATOR(BYOBReaderReadIntoRequest);
// https://streams.spec.whatwg.org/#byob-reader-read
JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::ArrayBufferView>& view)
JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::ArrayBufferView>& view, ReadableStreamBYOBReaderReadOptions options)
{
auto& realm = this->realm();
@ -129,16 +129,42 @@ JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
// 4. If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
// 4. If options["min"] is 0, return a promise rejected with a TypeError exception.
if (options.min == 0) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::TypeError, "options[\"min\'] cannot have a value of 0."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
// 5. If view has a [[TypedArrayName]] internal slot,
if (view->is_typed_array_base()) {
auto const& typed_array = *view->bufferable_object().get<JS::NonnullGCPtr<JS::TypedArrayBase>>();
// 1. If options["min"] > view.[[ArrayLength]], return a promise rejected with a RangeError exception.
if (options.min > typed_array.array_length().length()) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::RangeError, "options[\"min\"] cannot be larger than the length of the view."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
}
// 6. Otherwise (i.e., it is a DataView),
if (view->is_data_view()) {
// 1. If options["min"] > view.[[ByteLength]], return a promise rejected with a RangeError exception.
if (options.min > view->byte_length()) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::RangeError, "options[\"min\"] cannot be larger than the length of the view."sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
}
// 7. If this.[[stream]] is undefined, return a promise rejected with a TypeError exception.
if (!m_stream) {
WebIDL::SimpleException exception { WebIDL::SimpleExceptionType::TypeError, "Cannot read from an empty stream"sv };
return WebIDL::create_rejected_promise_from_exception(realm, move(exception));
}
// 5. Let promise be a new promise.
// 8. Let promise be a new promise.
auto promise_capability = WebIDL::create_promise(realm);
// 6. Let readIntoRequest be a new read-into request with the following items:
// 9. Let readIntoRequest be a new read-into request with the following items:
// chunk steps, given chunk
// Resolve promise with «[ "value" → chunk, "done" → false ]».
// close steps, given chunk
@ -147,10 +173,10 @@ JS::NonnullGCPtr<JS::Promise> ReadableStreamBYOBReader::read(JS::Handle<WebIDL::
// Reject promise with e.
auto read_into_request = heap().allocate_without_realm<BYOBReaderReadIntoRequest>(realm, promise_capability);
// 7. Perform ! ReadableStreamBYOBReaderRead(this, view, readIntoRequest).
readable_stream_byob_reader_read(*this, *view, *read_into_request);
// 10. Perform ! ReadableStreamBYOBReaderRead(this, view, options["min"], readIntoRequest).
readable_stream_byob_reader_read(*this, *view, options.min, *read_into_request);
// 8. Return promise.
// 11. Return promise.
return JS::NonnullGCPtr { verify_cast<JS::Promise>(*promise_capability->promise()) };
}
}

View File

@ -13,9 +13,15 @@
#include <LibWeb/Bindings/PlatformObject.h>
#include <LibWeb/Forward.h>
#include <LibWeb/Streams/ReadableStreamGenericReader.h>
#include <LibWeb/WebIDL/Types.h>
namespace Web::Streams {
// https://streams.spec.whatwg.org/#dictdef-readablestreambyobreaderreadoptions
struct ReadableStreamBYOBReaderReadOptions {
WebIDL::UnsignedLongLong min = 1;
};
// https://streams.spec.whatwg.org/#read-into-request
class ReadIntoRequest : public JS::Cell {
JS_CELL(ReadIntoRequest, JS::Cell);
@ -45,7 +51,7 @@ public:
virtual ~ReadableStreamBYOBReader() override = default;
JS::NonnullGCPtr<JS::Promise> read(JS::Handle<WebIDL::ArrayBufferView>&);
JS::NonnullGCPtr<JS::Promise> read(JS::Handle<WebIDL::ArrayBufferView>&, ReadableStreamBYOBReaderReadOptions options = {});
void release_lock();

View File

@ -7,8 +7,12 @@
interface ReadableStreamBYOBReader {
constructor(ReadableStream stream);
Promise<ReadableStreamReadResult> read(ArrayBufferView view);
Promise<ReadableStreamReadResult> read(ArrayBufferView view, optional ReadableStreamBYOBReaderReadOptions options = {});
undefined releaseLock();
};
ReadableStreamBYOBReader includes ReadableStreamGenericReader;
dictionary ReadableStreamBYOBReaderReadOptions {
[EnforceRange] unsigned long long min = 1;
};