1
1
mirror of https://github.com/kean/Nuke.git synced 2024-11-28 12:04:01 +03:00

Add ImagePipelineActor

This commit is contained in:
kean 2024-07-28 12:32:38 -04:00
parent a30faed641
commit cc2789f9f7
23 changed files with 315 additions and 293 deletions

View File

@ -12,7 +12,6 @@
0C09B1661FE9A65700E8FE3B /* fixture.jpeg in Resources */ = {isa = PBXBuildFile; fileRef = 0C09B1651FE9A65600E8FE3B /* fixture.jpeg */; };
0C09B1691FE9A65700E8FE3B /* fixture.jpeg in Resources */ = {isa = PBXBuildFile; fileRef = 0C09B1651FE9A65600E8FE3B /* fixture.jpeg */; };
0C09B16F1FE9A6D800E8FE3B /* Helpers.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C7C068D1BCA888800089D7F /* Helpers.swift */; };
0C0F7BF12287F6EE0034E656 /* TaskTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C0F7BF02287F6EE0034E656 /* TaskTests.swift */; };
0C0FD5E01CA47FE1002A78FB /* DataLoader.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C0FD5D01CA47FE1002A78FB /* DataLoader.swift */; };
0C0FD5EC1CA47FE1002A78FB /* ImagePipeline.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C0FD5D31CA47FE1002A78FB /* ImagePipeline.swift */; };
0C0FD5FC1CA47FE1002A78FB /* ImageCache.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C0FD5D71CA47FE1002A78FB /* ImageCache.swift */; };
@ -108,7 +107,6 @@
0C64F73D2438371A001983C6 /* img_751.heic in Resources */ = {isa = PBXBuildFile; fileRef = 0C64F73C243836B5001983C6 /* img_751.heic */; };
0C64F73F243838BF001983C6 /* swift.png in Resources */ = {isa = PBXBuildFile; fileRef = 0C64F73E243838BF001983C6 /* swift.png */; };
0C68F609208A1F40007DC696 /* ImageDecoderRegistryTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C68F608208A1F40007DC696 /* ImageDecoderRegistryTests.swift */; };
0C69FA4E1D4E222D00DA9982 /* ImagePrefetcherTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CD195291D4348AC00E011BB /* ImagePrefetcherTests.swift */; };
0C6B5BDB257010B400D763F2 /* image-p3.jpg in Resources */ = {isa = PBXBuildFile; fileRef = 0C6B5BDA257010B400D763F2 /* image-p3.jpg */; };
0C6B5BE1257010D300D763F2 /* ImagePipelineFormatsTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C6B5BE0257010D300D763F2 /* ImagePipelineFormatsTests.swift */; };
0C6CF0CD1DAF789C007B8C0E /* XCTestCaseExtensions.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0C7C068E1BCA888800089D7F /* XCTestCaseExtensions.swift */; };
@ -231,6 +229,7 @@
0CB644C92856807F00916267 /* fixture.jpeg in Resources */ = {isa = PBXBuildFile; fileRef = 0C09B1651FE9A65600E8FE3B /* fixture.jpeg */; };
0CB644CA2856807F00916267 /* swift.png in Resources */ = {isa = PBXBuildFile; fileRef = 0C64F73E243838BF001983C6 /* swift.png */; };
0CBA07862852DA8B00CE29F4 /* ImagePipeline+Error.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CBA07852852DA8B00CE29F4 /* ImagePipeline+Error.swift */; };
0CC04B0A2C5698D500F1164D /* ImagePipelineActor.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CC04B092C5698D500F1164D /* ImagePipelineActor.swift */; };
0CC36A1925B8BC2500811018 /* RateLimiter.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CC36A1825B8BC2500811018 /* RateLimiter.swift */; };
0CC36A2525B8BC4900811018 /* Operation.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CC36A2425B8BC4900811018 /* Operation.swift */; };
0CC36A2C25B8BC6300811018 /* LinkedList.swift in Sources */ = {isa = PBXBuildFile; fileRef = 0CC36A2B25B8BC6300811018 /* LinkedList.swift */; };
@ -492,6 +491,7 @@
0CB6449B28567E5400916267 /* ImageViewLoadingOptionsTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ImageViewLoadingOptionsTests.swift; sourceTree = "<group>"; };
0CB644AA28567EEA00916267 /* ImageViewExtensionsProgressiveDecodingTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ImageViewExtensionsProgressiveDecodingTests.swift; sourceTree = "<group>"; };
0CBA07852852DA8B00CE29F4 /* ImagePipeline+Error.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = "ImagePipeline+Error.swift"; sourceTree = "<group>"; };
0CC04B092C5698D500F1164D /* ImagePipelineActor.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ImagePipelineActor.swift; sourceTree = "<group>"; };
0CC36A1825B8BC2500811018 /* RateLimiter.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = RateLimiter.swift; sourceTree = "<group>"; };
0CC36A2425B8BC4900811018 /* Operation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Operation.swift; sourceTree = "<group>"; };
0CC36A2B25B8BC6300811018 /* LinkedList.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LinkedList.swift; sourceTree = "<group>"; };
@ -959,6 +959,7 @@
isa = PBXGroup;
children = (
0C0FD5D31CA47FE1002A78FB /* ImagePipeline.swift */,
0CC04B092C5698D500F1164D /* ImagePipelineActor.swift */,
0CF1754B22913F9800A8946E /* ImagePipeline+Configuration.swift */,
0C53C8B0263C968200E62D03 /* ImagePipeline+Delegate.swift */,
0C78A2A6263F4E680051E0FF /* ImagePipeline+Cache.swift */,
@ -1619,7 +1620,6 @@
4480674C2A448C9F00DE7CF8 /* DataPublisherTests.swift in Sources */,
0CD37C9A25BA36D5006C2C36 /* ImagePipelineLoadDataTests.swift in Sources */,
0C75279F1D473AEF00EC6222 /* MockImageProcessor.swift in Sources */,
0C69FA4E1D4E222D00DA9982 /* ImagePrefetcherTests.swift in Sources */,
0CB26807208F25C2004C83F4 /* DataCacheTests.swift in Sources */,
0C880532242E7B1500F8C5B3 /* ImagePipelineDecodingTests.swift in Sources */,
0C91B0F02438E352007F9100 /* RoundedCornersTests.swift in Sources */,
@ -1668,7 +1668,6 @@
0C68F609208A1F40007DC696 /* ImageDecoderRegistryTests.swift in Sources */,
0CE6202526543EC700AAB8C3 /* ImagePipelinePublisherTests.swift in Sources */,
0C91B0EE2438E307007F9100 /* CircleTests.swift in Sources */,
0C0F7BF12287F6EE0034E656 /* TaskTests.swift in Sources */,
0CC6271525BDF7A100466F04 /* ImagePipelineImageCacheTests.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
@ -1725,6 +1724,7 @@
0C78A2A7263F4E680051E0FF /* ImagePipeline+Cache.swift in Sources */,
0CA4ECD026E68FC000BAC8E5 /* DataCaching.swift in Sources */,
0CA4ECCA26E6868300BAC8E5 /* ImageProcessingOptions.swift in Sources */,
0CC04B0A2C5698D500F1164D /* ImagePipelineActor.swift in Sources */,
0C53C8B1263C968200E62D03 /* ImagePipeline+Delegate.swift in Sources */,
0CA4ECBC26E6856300BAC8E5 /* ImageDecompression.swift in Sources */,
0CA4ECD326E68FDC00BAC8E5 /* ImageCaching.swift in Sources */,

View File

@ -18,7 +18,8 @@ import AppKit
/// The pipeline maintains a strong reference to the task until the request
/// finishes or fails; you do not need to maintain a reference to the task unless
/// it is useful for your app.
public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Sendable {
@ImagePipelineActor
public final class ImageTask: Hashable {
/// An identifier that uniquely identifies the task within a given pipeline.
/// Unique only within that pipeline.
public let taskId: Int64
@ -28,15 +29,15 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
/// The priority of the task. The priority can be updated dynamically even
/// for a task that is already running.
public var priority: ImageRequest.Priority {
get { withLock { $0.priority } }
public nonisolated var priority: ImageRequest.Priority {
get { nonisolatedState.withLock { $0.priority } }
set { setPriority(newValue) }
}
/// Returns the current download progress. Returns zeros before the download
/// is started and the expected size of the resource is known.
public var currentProgress: Progress {
withLock { $0.progress }
public nonisolated var currentProgress: Progress {
nonisolatedState.withLock { $0.progress }
}
/// The download progress.
@ -59,8 +60,8 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
}
/// The current state of the task.
public var state: State {
withLock { $0.state }
public nonisolated var state: State {
nonisolatedState.withLock { $0.state }
}
/// The state of the image task.
@ -94,7 +95,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
}
/// The stream of progress updates.
public var progress: AsyncStream<Progress> {
public nonisolated var progress: AsyncStream<Progress> {
makeStream {
if case .progress(let value) = $0 { return value }
return nil
@ -105,7 +106,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
/// progressive decoding.
///
/// - seealso: ``ImagePipeline/Configuration-swift.struct/isProgressiveDecodingEnabled``
public var previews: AsyncStream<ImageResponse> {
public nonisolated var previews: AsyncStream<ImageResponse> {
makeStream {
if case .preview(let value) = $0 { return value }
return nil
@ -115,7 +116,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
// MARK: - Events
/// The events sent by the pipeline during the task execution.
public var events: AsyncStream<Event> { makeStream { $0 } }
public nonisolated var events: AsyncStream<Event> { makeStream { $0 } }
/// An event produced during the runetime of the task.
public enum Event: Sendable {
@ -132,59 +133,57 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
case finished(Result<ImageResponse, ImagePipeline.Error>)
}
private var publicState: PublicState
private nonisolated let nonisolatedState: ImageTaskNonisolatedState
private let isDataTask: Bool
private let onEvent: ((Event, ImageTask) -> Void)?
private let lock: os_unfair_lock_t
private let queue: DispatchQueue
private weak var pipeline: ImagePipeline?
// State synchronized on `pipeline.queue`.
var _task: Task<ImageResponse, Error>!
private var _task: Task<ImageResponse, Error>!
var _continuation: UnsafeContinuation<ImageResponse, Error>?
var _state: State = .running
private var _events: PassthroughSubject<Event, Never>?
deinit {
lock.deinitialize(count: 1)
lock.deallocate()
}
init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) {
nonisolated init(taskId: Int64, request: ImageRequest, isDataTask: Bool, pipeline: ImagePipeline, onEvent: ((Event, ImageTask) -> Void)?) {
self.taskId = taskId
self.request = request
self.publicState = PublicState(priority: request.priority)
self.nonisolatedState = ImageTaskNonisolatedState(priority: request.priority)
self.isDataTask = isDataTask
self.pipeline = pipeline
self.queue = pipeline.queue
self.onEvent = onEvent
lock = .allocate(capacity: 1)
lock.initialize(to: os_unfair_lock())
self._task = Task { @ImagePipelineActor in
try await withUnsafeThrowingContinuation { continuation in
self._continuation = continuation
pipeline.startImageTask(self, isDataTask: isDataTask)
}
}
}
/// Marks task as being cancelled.
///
/// The pipeline will immediately cancel any work associated with a task
/// unless there is an equivalent outstanding task running.
public func cancel() {
let didChange: Bool = withLock {
public nonisolated func cancel() {
let didChange: Bool = nonisolatedState.withLock {
guard $0.state == .running else { return false }
$0.state = .cancelled
return true
}
guard didChange else { return } // Make sure it gets called once (expensive)
pipeline?.imageTaskCancelCalled(self)
Task {
await pipeline?.imageTaskCancelCalled(self)
}
}
private func setPriority(_ newValue: ImageRequest.Priority) {
let didChange: Bool = withLock {
private nonisolated func setPriority(_ newValue: ImageRequest.Priority) {
let didChange: Bool = nonisolatedState.withLock {
guard $0.priority != newValue else { return false }
$0.priority = newValue
return $0.state == .running
}
guard didChange else { return }
pipeline?.imageTaskUpdatePriorityCalled(self, priority: newValue)
Task {
await pipeline?.imageTaskUpdatePriorityCalled(self, priority: newValue)
}
}
// MARK: Internals
@ -210,7 +209,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
_dispatch(.preview(response))
}
case let .progress(value):
withLock { $0.progress = value }
nonisolatedState.withLock { $0.progress = value }
_dispatch(.progress(value))
case let .error(error):
_finish(.failure(error))
@ -228,7 +227,7 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
guard _state == .running else { return false }
_state = state
if onEvent == nil {
withLock { $0.state = state }
nonisolatedState.withLock { $0.state = state }
}
return true
}
@ -262,19 +261,13 @@ public final class ImageTask: Hashable, CustomStringConvertible, @unchecked Send
// MARK: Hashable
public func hash(into hasher: inout Hasher) {
public nonisolated func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self).hashValue)
}
public static func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
public static nonisolated func == (lhs: ImageTask, rhs: ImageTask) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}
// MARK: CustomStringConvertible
public var description: String {
"ImageTask(id: \(taskId), priority: \(priority), progress: \(currentProgress.completed) / \(currentProgress.total), state: \(state))"
}
}
@available(*, deprecated, renamed: "ImageTask", message: "Async/Await support was added directly to the existing `ImageTask` type")
@ -283,9 +276,9 @@ public typealias AsyncImageTask = ImageTask
// MARK: - ImageTask (Private)
extension ImageTask {
private func makeStream<T>(of closure: @Sendable @escaping (Event) -> T?) -> AsyncStream<T> {
private nonisolated func makeStream<T>(of closure: @Sendable @escaping (Event) -> T?) -> AsyncStream<T> {
AsyncStream { continuation in
self.queue.async {
Task { @ImagePipelineActor in
guard let events = self._makeEventsSubject() else {
return continuation.finish()
}
@ -309,7 +302,6 @@ extension ImageTask {
}
}
// Synchronized on `pipeline.queue`
private func _makeEventsSubject() -> PassthroughSubject<Event, Never>? {
guard _state == .running else {
return nil
@ -319,19 +311,33 @@ extension ImageTask {
}
return _events!
}
}
private func withLock<T>(_ closure: (inout PublicState) -> T) -> T {
/// Contains the state synchronized using the internal lock.
///
/// - warning: Must be accessed using `withLock`.
private final class ImageTaskNonisolatedState: @unchecked(Sendable) {
var state: ImageTask.State = .running
var priority: ImageRequest.Priority
var progress = ImageTask.Progress(completed: 0, total: 0)
private let lock: os_unfair_lock_t
deinit {
lock.deinitialize(count: 1)
lock.deallocate()
}
init(priority: ImageRequest.Priority) {
self.priority = priority
lock = .allocate(capacity: 1)
lock.initialize(to: os_unfair_lock())
}
func withLock<T>(_ closure: (ImageTaskNonisolatedState) -> T) -> T {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
return closure(&publicState)
}
/// Contains the state synchronized using the internal lock.
///
/// - warning: Must be accessed using `withLock`.
private struct PublicState {
var state: ImageTask.State = .running
var priority: ImageRequest.Priority
var progress = Progress(completed: 0, total: 0)
return closure(self)
}
}

View File

@ -38,3 +38,13 @@ final class Atomic<T>: @unchecked Sendable {
return closure(&_value)
}
}
extension Atomic where T: BinaryInteger {
func incremented() -> T {
withLock {
let value = $0
$0 += 1
return value
}
}
}

View File

@ -5,81 +5,83 @@
import Foundation
import Combine
/// A publisher that starts a new `ImageTask` when a subscriber is added.
///
/// If the requested image is available in the memory cache, the value is
/// delivered immediately. When the subscription is cancelled, the task also
/// gets cancelled.
///
/// - note: In case the pipeline has `isProgressiveDecodingEnabled` option enabled
/// and the image being downloaded supports progressive decoding, the publisher
/// might emit more than a single value.
struct ImagePublisher: Publisher, Sendable {
typealias Output = ImageResponse
typealias Failure = ImagePipeline.Error
#warning("TODO: uncomment/move to NukeLegacy?")
let request: ImageRequest
let pipeline: ImagePipeline
func receive<S>(subscriber: S) where S: Subscriber, S: Sendable, Failure == S.Failure, Output == S.Input {
let subscription = ImageSubscription(
request: self.request,
pipeline: self.pipeline,
subscriber: subscriber
)
subscriber.receive(subscription: subscription)
}
}
private final class ImageSubscription<S>: Subscription where S: Subscriber, S: Sendable, S.Input == ImageResponse, S.Failure == ImagePipeline.Error {
private var task: ImageTask?
private let subscriber: S?
private let request: ImageRequest
private let pipeline: ImagePipeline
private var isStarted = false
init(request: ImageRequest, pipeline: ImagePipeline, subscriber: S) {
self.pipeline = pipeline
self.request = request
self.subscriber = subscriber
}
func request(_ demand: Subscribers.Demand) {
guard demand > 0 else { return }
guard let subscriber else { return }
if let image = pipeline.cache[request] {
_ = subscriber.receive(ImageResponse(container: image, request: request, cacheType: .memory))
if !image.isPreview {
subscriber.receive(completion: .finished)
return
}
}
task = pipeline.loadImage(
with: request,
progress: { response, _, _ in
if let response {
// Send progressively decoded image (if enabled and if any)
_ = subscriber.receive(response)
}
},
completion: { result in
switch result {
case let .success(response):
_ = subscriber.receive(response)
subscriber.receive(completion: .finished)
case let .failure(error):
subscriber.receive(completion: .failure(error))
}
}
)
}
func cancel() {
task?.cancel()
task = nil
}
}
///// A publisher that starts a new `ImageTask` when a subscriber is added.
/////
///// If the requested image is available in the memory cache, the value is
///// delivered immediately. When the subscription is cancelled, the task also
///// gets cancelled.
/////
///// - note: In case the pipeline has `isProgressiveDecodingEnabled` option enabled
///// and the image being downloaded supports progressive decoding, the publisher
///// might emit more than a single value.
//struct ImagePublisher: Publisher, Sendable {
// typealias Output = ImageResponse
// typealias Failure = ImagePipeline.Error
//
// let request: ImageRequest
// let pipeline: ImagePipeline
//
// func receive<S>(subscriber: S) where S: Subscriber, S: Sendable, Failure == S.Failure, Output == S.Input {
// let subscription = ImageSubscription(
// request: self.request,
// pipeline: self.pipeline,
// subscriber: subscriber
// )
// subscriber.receive(subscription: subscription)
// }
//}
//
//private final class ImageSubscription<S>: Subscription where S: Subscriber, S: Sendable, S.Input == ImageResponse, S.Failure == ImagePipeline.Error {
// private var task: ImageTask?
// private let subscriber: S?
// private let request: ImageRequest
// private let pipeline: ImagePipeline
// private var isStarted = false
//
// init(request: ImageRequest, pipeline: ImagePipeline, subscriber: S) {
// self.pipeline = pipeline
// self.request = request
// self.subscriber = subscriber
//
// }
//
// func request(_ demand: Subscribers.Demand) {
// guard demand > 0 else { return }
// guard let subscriber else { return }
//
// if let image = pipeline.cache[request] {
// _ = subscriber.receive(ImageResponse(container: image, request: request, cacheType: .memory))
//
// if !image.isPreview {
// subscriber.receive(completion: .finished)
// return
// }
// }
//
// task = pipeline.loadImage(
// with: request,
// progress: { response, _, _ in
// if let response {
// // Send progressively decoded image (if enabled and if any)
// _ = subscriber.receive(response)
// }
// },
// completion: { result in
// switch result {
// case let .success(response):
// _ = subscriber.receive(response)
// subscriber.receive(completion: .finished)
// case let .failure(error):
// subscriber.receive(completion: .failure(error))
// }
// }
// )
// }
//
// func cancel() {
// task?.cancel()
// task = nil
// }
//}

View File

@ -13,12 +13,12 @@ import Foundation
/// The implementation supports quick bursts of requests which can be executed
/// without any delays when "the bucket is full". This is important to prevent
/// rate limiter from affecting "normal" requests flow.
final class RateLimiter: @unchecked Sendable {
@ImagePipelineActor
final class RateLimiter {
// This type isn't really Sendable and requires the caller to use the same
// queue as it does for synchronization.
private let bucket: TokenBucket
private let queue: DispatchQueue
private var pending = LinkedList<Work>() // fast append, fast remove first
private var isExecutingPendingTasks = false
@ -30,8 +30,7 @@ final class RateLimiter: @unchecked Sendable {
/// - rate: Maximum number of requests per second. 80 by default.
/// - burst: Maximum number of requests which can be executed without any
/// delays when "bucket is full". 25 by default.
init(queue: DispatchQueue, rate: Int = 80, burst: Int = 25) {
self.queue = queue
nonisolated init(rate: Int = 80, burst: Int = 25) {
self.bucket = TokenBucket(rate: Double(rate), burst: Double(burst))
}
@ -56,7 +55,11 @@ final class RateLimiter: @unchecked Sendable {
let bucketRate = 1000.0 / bucket.rate
let delay = Int(2.1 * bucketRate) // 14 ms for rate 80 (default)
let bounds = min(100, max(15, delay))
queue.asyncAfter(deadline: .now() + .milliseconds(bounds)) { self.executePendingTasks() }
#warning("correct?")
Task {
try? await Task.sleep(nanoseconds: UInt64(bounds) * 1_000_000)
self.executePendingTasks()
}
}
private func executePendingTasks() {

View File

@ -6,7 +6,7 @@ import Foundation
/// Resumable data support. For more info see:
/// - https://developer.apple.com/library/content/qa/qa1761/_index.html
struct ResumableData: @unchecked Sendable {
struct ResumableData: Sendable {
let data: Data
let validator: String // Either Last-Modified or ETag

View File

@ -62,10 +62,6 @@ public protocol ImagePipelineDelegate: AnyObject, Sendable {
// MARK: ImageTask
/// Gets called when the task is created. Unlike other methods, it is called
/// immediately on the caller's queue.
func imageTaskCreated(_ task: ImageTask, pipeline: ImagePipeline)
/// Gets called when the task receives an event.
func imageTask(_ task: ImageTask, didReceiveEvent event: ImageTask.Event, pipeline: ImagePipeline)
@ -124,8 +120,6 @@ extension ImagePipelineDelegate {
return response
}
public func imageTaskCreated(_ task: ImageTask, pipeline: ImagePipeline) {}
public func imageTask(_ task: ImageTask, didReceiveEvent event: ImageTask.Event, pipeline: ImagePipeline) {}
public func imageTaskDidStart(_ task: ImageTask, pipeline: ImagePipeline) {}

View File

@ -13,21 +13,22 @@ import UIKit
import AppKit
#endif
/// The pipeline downloads and caches images, and prepares them for display.
public final class ImagePipeline: @unchecked Sendable {
/// The pipeline downloads and caches images, and prepares them for display.
@ImagePipelineActor
public final class ImagePipeline {
/// Returns the shared image pipeline.
public static var shared: ImagePipeline {
public nonisolated static var shared: ImagePipeline {
get { _shared.value }
set { _shared.value = newValue }
}
private static let _shared = Atomic(value: ImagePipeline(configuration: .withURLCache))
private nonisolated static let _shared = Atomic(value: ImagePipeline(configuration: .withURLCache))
/// The pipeline configuration.
public let configuration: Configuration
public nonisolated let configuration: Configuration
/// Provides access to the underlying caching subsystems.
public var cache: ImagePipeline.Cache { .init(pipeline: self) }
public nonisolated var cache: ImagePipeline.Cache { .init(pipeline: self) }
let delegate: any ImagePipelineDelegate
@ -38,27 +39,15 @@ public final class ImagePipeline: @unchecked Sendable {
private let tasksFetchOriginalImage: TaskPool<TaskFetchOriginalImageKey, ImageResponse, Error>
private let tasksFetchOriginalData: TaskPool<TaskFetchOriginalDataKey, (Data, URLResponse?), Error>
// The queue on which the entire subsystem is synchronized.
let queue = DispatchQueue(label: "com.github.kean.Nuke.ImagePipeline", qos: .userInitiated)
private var isInvalidated = false
private var nextTaskId: Int64 {
os_unfair_lock_lock(lock)
defer { os_unfair_lock_unlock(lock) }
_nextTaskId += 1
return _nextTaskId
}
private var _nextTaskId: Int64 = 0
private let lock: os_unfair_lock_t
private nonisolated let nextTaskId = Atomic<Int64>(value: 0)
let rateLimiter: RateLimiter?
let id = UUID()
var onTaskStarted: ((ImageTask) -> Void)? // Debug purposes
deinit {
lock.deinitialize(count: 1)
lock.deallocate()
ResumableDataStorage.shared.unregister(self)
}
@ -67,9 +56,9 @@ public final class ImagePipeline: @unchecked Sendable {
/// - parameters:
/// - configuration: The pipeline configuration.
/// - delegate: Provides more ways to customize the pipeline behavior on per-request basis.
public init(configuration: Configuration = Configuration(), delegate: (any ImagePipelineDelegate)? = nil) {
public nonisolated init(configuration: Configuration = Configuration(), delegate: (any ImagePipelineDelegate)? = nil) {
self.configuration = configuration
self.rateLimiter = configuration.isRateLimiterEnabled ? RateLimiter(queue: queue) : nil
self.rateLimiter = configuration.isRateLimiterEnabled ? RateLimiter() : nil
self.delegate = delegate ?? ImagePipelineDefaultDelegate()
(configuration.dataLoader as? DataLoader)?.prefersIncrementalDelivery = configuration.isProgressiveDecodingEnabled
@ -79,9 +68,6 @@ public final class ImagePipeline: @unchecked Sendable {
self.tasksFetchOriginalImage = TaskPool(isCoalescingEnabled)
self.tasksFetchOriginalData = TaskPool(isCoalescingEnabled)
self.lock = .allocate(capacity: 1)
self.lock.initialize(to: os_unfair_lock())
ResumableDataStorage.shared.register(self)
}
@ -99,7 +85,7 @@ public final class ImagePipeline: @unchecked Sendable {
/// - parameters:
/// - configuration: The pipeline configuration.
/// - delegate: Provides more ways to customize the pipeline behavior on per-request basis.
public convenience init(delegate: (any ImagePipelineDelegate)? = nil, _ configure: (inout ImagePipeline.Configuration) -> Void) {
public nonisolated convenience init(delegate: (any ImagePipelineDelegate)? = nil, _ configure: (inout ImagePipeline.Configuration) -> Void) {
var configuration = ImagePipeline.Configuration()
configure(&configuration)
self.init(configuration: configuration, delegate: delegate)
@ -107,8 +93,8 @@ public final class ImagePipeline: @unchecked Sendable {
/// Invalidates the pipeline and cancels all outstanding tasks. Any new
/// requests will immediately fail with ``ImagePipeline/Error/pipelineInvalidated`` error.
public func invalidate() {
queue.async {
public nonisolated func invalidate() {
Task { @ImagePipelineActor in
guard !self.isInvalidated else { return }
self.isInvalidated = true
self.tasks.keys.forEach(self.cancelImageTask)
@ -120,14 +106,14 @@ public final class ImagePipeline: @unchecked Sendable {
/// Creates a task with the given URL.
///
/// The task starts executing the moment it is created.
public func imageTask(with url: URL) -> ImageTask {
public nonisolated func imageTask(with url: URL) -> ImageTask {
imageTask(with: ImageRequest(url: url))
}
/// Creates a task with the given request.
///
/// The task starts executing the moment it is created.
public func imageTask(with request: ImageRequest) -> ImageTask {
public nonisolated func imageTask(with request: ImageRequest) -> ImageTask {
makeStartedImageTask(with: request)
}
@ -160,7 +146,7 @@ public final class ImagePipeline: @unchecked Sendable {
/// - request: An image request.
/// - completion: A closure to be called on the main thread when the request
/// is finished.
@discardableResult public func loadImage(
@discardableResult public nonisolated func loadImage(
with url: URL,
completion: @escaping (_ result: Result<ImageResponse, Error>) -> Void
) -> ImageTask {
@ -173,7 +159,7 @@ public final class ImagePipeline: @unchecked Sendable {
/// - request: An image request.
/// - completion: A closure to be called on the main thread when the request
/// is finished.
@discardableResult public func loadImage(
@discardableResult public nonisolated func loadImage(
with request: ImageRequest,
completion: @escaping (_ result: Result<ImageResponse, Error>) -> Void
) -> ImageTask {
@ -188,7 +174,7 @@ public final class ImagePipeline: @unchecked Sendable {
/// the progress is updated.
/// - completion: A closure to be called on the main thread when the request
/// is finished.
@discardableResult public func loadImage(
@discardableResult public nonisolated func loadImage(
with request: ImageRequest,
queue: DispatchQueue? = nil,
progress: ((_ response: ImageResponse?, _ completed: Int64, _ total: Int64) -> Void)?,
@ -199,7 +185,7 @@ public final class ImagePipeline: @unchecked Sendable {
}, completion: completion)
}
func _loadImage(
nonisolated func _loadImage(
with request: ImageRequest,
isDataTask: Bool = false,
queue callbackQueue: DispatchQueue? = nil,
@ -216,7 +202,8 @@ public final class ImagePipeline: @unchecked Sendable {
case .preview(let response): progress?(response, task.currentProgress)
case .cancelled: break // The legacy APIs do not send cancellation events
case .finished(let result):
_ = task._setState(.completed) // Important to do it on the callback queue
#warning("it should be isolated")
// _ = task._setState(.completed) // Important to do it on the callback queue
completion(result)
}
}
@ -224,26 +211,29 @@ public final class ImagePipeline: @unchecked Sendable {
}
// nuke-13: requires callbacks to be @MainActor @Sendable or deprecate this entire API
private func dispatchCallback(to callbackQueue: DispatchQueue?, _ closure: @escaping () -> Void) {
let box = UncheckedSendableBox(value: closure)
if callbackQueue === self.queue {
closure()
} else {
(callbackQueue ?? self.configuration._callbackQueue).async {
box.value()
}
}
private nonisolated func dispatchCallback(to callbackQueue: DispatchQueue?, _ closure: @escaping () -> Void) {
#warning("remove this")
closure()
// let box = UncheckedSendableBox(value: closure)
// if callbackQueue === self.queue {
// closure()
// } else {
// (callbackQueue ?? self.configuration._callbackQueue).async {
// box.value()
// }
// }
}
// MARK: - Loading Data (Closures)
/// Loads image data for the given request. The data doesn't get decoded
/// or processed in any other way.
@discardableResult public func loadData(with request: ImageRequest, completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) -> ImageTask {
@discardableResult public nonisolated func loadData(with request: ImageRequest, completion: @escaping (Result<(data: Data, response: URLResponse?), Error>) -> Void) -> ImageTask {
_loadData(with: request, queue: nil, progress: nil, completion: completion)
}
private func _loadData(
private nonisolated func _loadData(
with request: ImageRequest,
queue: DispatchQueue?,
progress progressHandler: ((_ completed: Int64, _ total: Int64) -> Void)?,
@ -273,7 +263,7 @@ public final class ImagePipeline: @unchecked Sendable {
/// callbacks. By default, the pipeline uses `.main` queue.
/// - progress: A closure to be called periodically on the main thread when the progress is updated.
/// - completion: A closure to be called on the main thread when the request is finished.
@discardableResult public func loadData(
@discardableResult public nonisolated func loadData(
with request: ImageRequest,
queue: DispatchQueue? = nil,
progress progressHandler: ((_ completed: Int64, _ total: Int64) -> Void)?,
@ -293,36 +283,25 @@ public final class ImagePipeline: @unchecked Sendable {
// MARK: - Loading Images (Combine)
/// Returns a publisher which starts a new ``ImageTask`` when a subscriber is added.
public func imagePublisher(with url: URL) -> AnyPublisher<ImageResponse, Error> {
public nonisolated func imagePublisher(with url: URL) -> AnyPublisher<ImageResponse, Error> {
imagePublisher(with: ImageRequest(url: url))
}
/// Returns a publisher which starts a new ``ImageTask`` when a subscriber is added.
public func imagePublisher(with request: ImageRequest) -> AnyPublisher<ImageResponse, Error> {
ImagePublisher(request: request, pipeline: self).eraseToAnyPublisher()
public nonisolated func imagePublisher(with request: ImageRequest) -> AnyPublisher<ImageResponse, Error> {
#warning("TODO: reimplement")
fatalError()
// ImagePublisher(request: request, pipeline: self).eraseToAnyPublisher()
}
// MARK: - ImageTask (Internal)
private func makeStartedImageTask(with request: ImageRequest, isDataTask: Bool = false, onEvent: ((ImageTask.Event, ImageTask) -> Void)? = nil) -> ImageTask {
let task = ImageTask(taskId: nextTaskId, request: request, isDataTask: isDataTask, pipeline: self, onEvent: onEvent)
// Important to call it before `imageTaskStartCalled`
if !isDataTask {
delegate.imageTaskCreated(task, pipeline: self)
}
task._task = Task {
try await withUnsafeThrowingContinuation { continuation in
self.queue.async {
task._continuation = continuation
self.startImageTask(task, isDataTask: isDataTask)
}
}
}
return task
private nonisolated func makeStartedImageTask(with request: ImageRequest, isDataTask: Bool = false, onEvent: ((ImageTask.Event, ImageTask) -> Void)? = nil) -> ImageTask {
ImageTask(taskId: nextTaskId.incremented(), request: request, isDataTask: isDataTask, pipeline: self, onEvent: onEvent)
}
// By this time, the task has `continuation` set and is fully wired.
private func startImageTask(_ task: ImageTask, isDataTask: Bool) {
func startImageTask(_ task: ImageTask, isDataTask: Bool) {
guard task._state != .cancelled else {
// The task gets started asynchronously in a `Task` and cancellation
// can happen before the pipeline reached `startImageTask`. In that
@ -348,13 +327,11 @@ public final class ImagePipeline: @unchecked Sendable {
// MARK: - Image Task Events
func imageTaskCancelCalled(_ task: ImageTask) {
queue.async { self.cancelImageTask(task) }
self.cancelImageTask(task)
}
func imageTaskUpdatePriorityCalled(_ task: ImageTask, priority: ImageRequest.Priority) {
queue.async {
self.tasks[task]?.setPriority(priority.taskPriority)
}
self.tasks[task]?.setPriority(priority.taskPriority)
}
func imageTask(_ task: ImageTask, didProcessEvent event: ImageTask.Event, isDataTask: Bool) {

View File

@ -0,0 +1,13 @@
// The MIT License (MIT)
//
// Copyright (c) 2015-2024 Alexander Grebenyuk (github.com/kean).
import Foundation
// swiftlint:disable convenience_type
@globalActor
public struct ImagePipelineActor {
public actor ImagePipelineActor { }
public static let shared = ImagePipelineActor()
}
// swiftlint:enable convenience_type

View File

@ -11,7 +11,8 @@ import Foundation
///
/// All ``ImagePrefetcher`` methods are thread-safe and are optimized to be used
/// even from the main thread during scrolling.
public final class ImagePrefetcher: @unchecked Sendable {
@ImagePipelineActor
public final class ImagePrefetcher {
/// Pauses the prefetching.
///
/// - note: When you pause, the prefetcher will finish outstanding tasks
@ -27,7 +28,9 @@ public final class ImagePrefetcher: @unchecked Sendable {
public var priority: ImageRequest.Priority = .low {
didSet {
let newValue = priority
pipeline.queue.async { self.didUpdatePriority(to: newValue) }
Task {
self.didUpdatePriority(to: newValue)
}
}
}
@ -53,7 +56,7 @@ public final class ImagePrefetcher: @unchecked Sendable {
public var didComplete: (@MainActor @Sendable () -> Void)?
private let pipeline: ImagePipeline
private var tasks = [TaskLoadImageKey: Task]()
private var tasks = [TaskLoadImageKey: PrefetchTask]()
private let destination: Destination
private var _priority: ImageRequest.Priority = .low
let queue = OperationQueue() // internal for testing
@ -64,20 +67,21 @@ public final class ImagePrefetcher: @unchecked Sendable {
/// - pipeline: The pipeline used for loading images.
/// - destination: By default load images in all cache layers.
/// - maxConcurrentRequestCount: 2 by default.
public init(pipeline: ImagePipeline = ImagePipeline.shared,
destination: Destination = .memoryCache,
maxConcurrentRequestCount: Int = 2) {
public nonisolated init(
pipeline: ImagePipeline = ImagePipeline.shared,
destination: Destination = .memoryCache,
maxConcurrentRequestCount: Int = 2
) {
self.pipeline = pipeline
self.destination = destination
self.queue.maxConcurrentOperationCount = maxConcurrentRequestCount
self.queue.underlyingQueue = pipeline.queue
}
deinit {
let tasks = self.tasks.values // Make sure we don't retain self
self.tasks.removeAll()
pipeline.queue.async {
Task { @ImagePipelineActor in
for task in tasks {
task.cancel()
}
@ -87,7 +91,7 @@ public final class ImagePrefetcher: @unchecked Sendable {
/// Starts prefetching images for the given URL.
///
/// See also ``startPrefetching(with:)-718dg`` that works with ``ImageRequest``.
public func startPrefetching(with urls: [URL]) {
public nonisolated func startPrefetching(with urls: [URL]) {
startPrefetching(with: urls.map { ImageRequest(url: $0) })
}
@ -101,8 +105,8 @@ public final class ImagePrefetcher: @unchecked Sendable {
/// (`.low` by default).
///
/// See also ``startPrefetching(with:)-1jef2`` that works with `URL`.
public func startPrefetching(with requests: [ImageRequest]) {
pipeline.queue.async {
public nonisolated func startPrefetching(with requests: [ImageRequest]) {
Task { @ImagePipelineActor in
self._startPrefetching(with: requests)
}
}
@ -126,24 +130,27 @@ public final class ImagePrefetcher: @unchecked Sendable {
guard tasks[key] == nil else {
return
}
let task = Task(request: request, key: key)
let task = PrefetchTask(request: request, key: key)
task.operation = queue.add { [weak self] finish in
guard let self else { return finish() }
self.loadImage(task: task, finish: finish)
Task { @ImagePipelineActor in
self.loadImage(task: task, finish: finish)
}
}
tasks[key] = task
return
}
private func loadImage(task: Task, finish: @escaping () -> Void) {
task.imageTask = pipeline._loadImage(with: task.request, isDataTask: destination == .diskCache, queue: pipeline.queue, progress: nil) { [weak self] _ in
#warning("use async/await")
private func loadImage(task: PrefetchTask, finish: @escaping () -> Void) {
task.imageTask = pipeline._loadImage(with: task.request, isDataTask: destination == .diskCache, progress: nil) { [weak self] _ in
self?._remove(task)
finish()
}
task.onCancelled = finish
}
private func _remove(_ task: Task) {
private func _remove(_ task: PrefetchTask) {
guard tasks[task.key] === task else { return } // Should never happen
tasks[task.key] = nil
sendCompletionIfNeeded()
@ -172,8 +179,8 @@ public final class ImagePrefetcher: @unchecked Sendable {
/// of ``ImagePrefetcher``.
///
/// See also ``stopPrefetching(with:)-2tcyq`` that works with `URL`.
public func stopPrefetching(with requests: [ImageRequest]) {
pipeline.queue.async {
public nonisolated func stopPrefetching(with requests: [ImageRequest]) {
Task { @ImagePipelineActor in
for request in requests {
self._stopPrefetching(with: request)
}
@ -187,8 +194,8 @@ public final class ImagePrefetcher: @unchecked Sendable {
}
/// Stops all prefetching tasks.
public func stopPrefetching() {
pipeline.queue.async {
public nonisolated func stopPrefetching() {
Task { @ImagePipelineActor in
self.tasks.values.forEach { $0.cancel() }
self.tasks.removeAll()
}
@ -202,7 +209,7 @@ public final class ImagePrefetcher: @unchecked Sendable {
}
}
private final class Task: @unchecked Sendable {
private final class PrefetchTask: @unchecked Sendable {
let key: TaskLoadImageKey
let request: ImageRequest
weak var imageTask: ImageTask?

View File

@ -19,6 +19,7 @@ class AsyncPipelineTask<Value: Sendable>: AsyncTask<Value, ImagePipeline.Error>,
// Returns all image tasks subscribed to the current pipeline task.
// A suboptimal approach just to make the new DiskCachPolicy.automatic work.
@ImagePipelineActor
protocol ImageTaskSubscribers {
var imageTasks: [ImageTask] { get }
}
@ -50,12 +51,9 @@ extension AsyncPipelineTask {
guard decoder.isAsynchronous else {
return completion(decode())
}
operation = pipeline.configuration.imageDecodingQueue.add { [weak self] in
guard let self else { return }
operation = pipeline.configuration.imageDecodingQueue.add {
let response = decode()
self.pipeline.queue.async {
completion(response)
}
completion(response)
}
}
}

View File

@ -14,8 +14,8 @@ import Foundation
/// automatically cancels them, updates the priority, etc. Most steps in the
/// image pipeline are represented using Operation to take advantage of these features.
///
/// - warning: Must be thread-confined!
class AsyncTask<Value: Sendable, Error: Sendable>: AsyncTaskSubscriptionDelegate, @unchecked Sendable {
@ImagePipelineActor
class AsyncTask<Value: Sendable, Error: Sendable>: AsyncTaskSubscriptionDelegate {
private struct Subscription {
let closure: (Event) -> Void
@ -218,6 +218,7 @@ class AsyncTask<Value: Sendable, Error: Sendable>: AsyncTaskSubscriptionDelegate
extension AsyncTask {
/// Publishes the results of the task.
@ImagePipelineActor
struct Publisher {
fileprivate let task: AsyncTask
@ -281,7 +282,8 @@ extension AsyncTask.Event: Equatable where Value: Equatable, Error: Equatable {}
/// Represents a subscription to a task. The observer must retain a strong
/// reference to a subscription.
struct TaskSubscription: Sendable {
@ImagePipelineActor
struct TaskSubscription {
private let task: any AsyncTaskSubscriptionDelegate
private let key: TaskSubscriptionKey
@ -311,7 +313,8 @@ struct TaskSubscription: Sendable {
}
}
private protocol AsyncTaskSubscriptionDelegate: AnyObject, Sendable {
@ImagePipelineActor
private protocol AsyncTaskSubscriptionDelegate: AnyObject {
func unsubsribe(key: TaskSubscriptionKey)
func setPriority(_ priority: TaskPriority, for observer: TaskSubscriptionKey)
}
@ -320,12 +323,12 @@ private typealias TaskSubscriptionKey = Int
// MARK: - TaskPool
/// Contains the tasks which haven't completed yet.
@ImagePipelineActor
final class TaskPool<Key: Hashable, Value: Sendable, Error: Sendable> {
private let isCoalescingEnabled: Bool
private var map = [Key: AsyncTask<Value, Error>]()
init(_ isCoalescingEnabled: Bool) {
nonisolated init(_ isCoalescingEnabled: Bool) {
self.isCoalescingEnabled = isCoalescingEnabled
}

View File

@ -6,7 +6,7 @@ import Foundation
/// Fetches original image from the data loader (`DataLoading`) and stores it
/// in the disk cache (`DataCaching`).
final class TaskFetchOriginalData: AsyncPipelineTask<(Data, URLResponse?)>, @unchecked Sendable {
final class TaskFetchOriginalData: AsyncPipelineTask<(Data, URLResponse?)> {
private var urlResponse: URLResponse?
private var resumableData: ResumableData?
private var resumedDataCount: Int64 = 0
@ -54,7 +54,7 @@ final class TaskFetchOriginalData: AsyncPipelineTask<(Data, URLResponse?)>, @unc
guard let self else {
return finish()
}
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.loadData(urlRequest: urlRequest, finish: finish)
}
}
@ -83,14 +83,14 @@ final class TaskFetchOriginalData: AsyncPipelineTask<(Data, URLResponse?)>, @unc
let dataLoader = pipeline.delegate.dataLoader(for: request, pipeline: pipeline)
let dataTask = dataLoader.loadData(with: urlRequest, didReceiveData: { [weak self] data, response in
guard let self else { return }
self.pipeline.queue.async {
Task {
self.dataTask(didReceiveData: data, response: response)
}
}, completion: { [weak self] error in
finish() // Finish the operation!
guard let self else { return }
signpost(self, "LoadImageData", .end, "Finished with size \(Formatter.bytes(self.data.count))")
self.pipeline.queue.async {
Task {
self.dataTaskDidFinish(error: error)
}
})

View File

@ -5,7 +5,7 @@
import Foundation
/// Receives data from ``TaskLoadImageData`` and decodes it as it arrives.
final class TaskFetchOriginalImage: AsyncPipelineTask<ImageResponse>, @unchecked Sendable {
final class TaskFetchOriginalImage: AsyncPipelineTask<ImageResponse> {
private var decoder: (any ImageDecoding)?
override func start() {
@ -38,8 +38,12 @@ final class TaskFetchOriginalImage: AsyncPipelineTask<ImageResponse>, @unchecked
return
}
decode(context, decoder: decoder) { [weak self] in
self?.didFinishDecoding(context: context, result: $0)
#warning("implement using async/await")
decode(context, decoder: decoder) { [weak self] result in
guard let self else { return }
Task {
await self.didFinishDecoding(context: context, result: result)
}
}
}

View File

@ -6,7 +6,7 @@ import Foundation
/// Fetches data using the publisher provided with the request.
/// Unlike `TaskFetchOriginalImageData`, there is no resumable data involved.
final class TaskFetchWithPublisher: AsyncPipelineTask<(Data, URLResponse?)>, @unchecked Sendable {
final class TaskFetchWithPublisher: AsyncPipelineTask<(Data, URLResponse?)> {
private lazy var data = Data()
override func start() {
@ -19,7 +19,7 @@ final class TaskFetchWithPublisher: AsyncPipelineTask<(Data, URLResponse?)>, @un
guard let self else {
return finish()
}
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.loadData { finish() }
}
}
@ -40,12 +40,12 @@ final class TaskFetchWithPublisher: AsyncPipelineTask<(Data, URLResponse?)>, @un
let cancellable = publisher.sink(receiveCompletion: { [weak self] result in
finish() // Finish the operation!
guard let self else { return }
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.dataTaskDidFinish(result)
}
}, receiveValue: { [weak self] data in
guard let self else { return }
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.data.append(data)
}
})

View File

@ -30,8 +30,12 @@ final class TaskLoadImage: AsyncPipelineTask<ImageResponse>, @unchecked Sendable
guard let decoder = pipeline.delegate.imageDecoder(for: context, pipeline: pipeline) else {
return didFinishDecoding(with: nil)
}
decode(context, decoder: decoder) { [weak self] in
self?.didFinishDecoding(with: try? $0.get())
#warning("implement using async/awiat")
decode(context, decoder: decoder) { [weak self] result in
guard let self else { return }
Task {
await self.didFinishDecoding(with: try? result.get())
}
}
}
@ -82,7 +86,7 @@ final class TaskLoadImage: AsyncPipelineTask<ImageResponse>, @unchecked Sendable
ImagePipeline.Error.processingFailed(processor: processor, context: context, error: error)
}
}
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.operation = nil
self.didFinishProcessing(result: result, isCompleted: isCompleted)
}
@ -117,7 +121,7 @@ final class TaskLoadImage: AsyncPipelineTask<ImageResponse>, @unchecked Sendable
let response = signpost(isCompleted ? "DecompressImage" : "DecompressProgressiveImage") {
self.pipeline.delegate.decompress(response: response, request: self.request, pipeline: self.pipeline)
}
self.pipeline.queue.async {
Task { @ImagePipelineActor in
self.operation = nil
self.didReceiveDecompressedImage(response, isCompleted: isCompleted)
}

View File

@ -29,11 +29,6 @@ final class ImagePipelineObserver: ImagePipelineDelegate, @unchecked Sendable {
lock.unlock()
}
func imageTaskCreated(_ task: ImageTask, pipeline: ImagePipeline) {
onTaskCreated?(task)
append(.created)
}
func imageTaskDidStart(_ task: ImageTask, pipeline: ImagePipeline) {
startedTaskCount += 1
NotificationCenter.default.post(name: ImagePipelineObserver.didStartTask, object: self, userInfo: [ImagePipelineObserver.taskKey: task])

View File

@ -29,7 +29,7 @@ extension ImageResponse: Equatable {
}
extension ImagePipeline {
func reconfigured(_ configure: (inout ImagePipeline.Configuration) -> Void) -> ImagePipeline {
nonisolated func reconfigured(_ configure: (inout ImagePipeline.Configuration) -> Void) -> ImagePipeline {
var configuration = self.configuration
configure(&configuration)
return ImagePipeline(configuration: configuration)

View File

@ -174,24 +174,25 @@ class ImagePipelineAsyncAwaitTests: XCTestCase, @unchecked Sendable {
XCTAssertEqual(recordedProgress, [])
}
func testCancelAsyncImageTask() async throws {
dataLoader.queue.isSuspended = true
pipeline.queue.suspend()
let task = pipeline.imageTask(with: Test.url)
observer = NotificationCenter.default.addObserver(forName: MockDataLoader.DidStartTask, object: dataLoader, queue: OperationQueue()) { _ in
task.cancel()
}
pipeline.queue.resume()
var caughtError: Error?
do {
_ = try await task.image
} catch {
caughtError = error
}
XCTAssertTrue(caughtError is CancellationError)
}
#warning("reimplement")
// func testCancelAsyncImageTask() async throws {
// dataLoader.queue.isSuspended = true
//
// pipeline.queue.suspend()
// let task = pipeline.imageTask(with: Test.url)
// observer = NotificationCenter.default.addObserver(forName: MockDataLoader.DidStartTask, object: dataLoader, queue: OperationQueue()) { _ in
// task.cancel()
// }
// pipeline.queue.resume()
//
// var caughtError: Error?
// do {
// _ = try await task.image
// } catch {
// caughtError = error
// }
// XCTAssertTrue(caughtError is CancellationError)
// }
// MARK: - Load Data

View File

@ -425,9 +425,11 @@ extension XCTestCase {
dataLoader.isSuspended = true
let expectation = self.expectation(description: "registered")
expectation.expectedFulfillmentCount = count
pipeline.onTaskStarted = { _ in
expectation.fulfill()
}
#warning("reimplement")
// pipeline.onTaskStarted = { _ in
// expectation.fulfill()
// }
closure()
wait(for: [expectation], timeout: 5)
dataLoader.isSuspended = false

View File

@ -5,6 +5,7 @@
import XCTest
@testable import Nuke
#warning("reimplement (remove from target")
final class ImagePrefetcherTests: XCTestCase {
private var pipeline: ImagePipeline!
private var dataLoader: MockDataLoader!

View File

@ -19,7 +19,7 @@ class RateLimiterTests: XCTestCase {
queue.setSpecific(key: queueKey, value: ())
// Note: we set very short rate to avoid bucket form being refilled too quickly
rateLimiter = RateLimiter(queue: queue, rate: 10, burst: 2)
rateLimiter = RateLimiter(rate: 10, burst: 2)
}
func testThatBurstIsExecutedimmediately() {

View File

@ -5,6 +5,8 @@
import XCTest
@testable import Nuke
#warning("reimplement")
class TaskTests: XCTestCase {
// MARK: - Starter