Chapter 17: Receiving emitted values using OperationQueue

Hey guys, I’m reading Chapter 17, and I got stuck when it comes to OperationQueue.

I’m just testing in my Playground, and I found that my result isn’t the same as what the book says. What I tried is:

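import Combine
import Foundation

var subscriptions = Set<AnyCancellable>()
let queue = OperationQueue()

// Note: Thread.current.number is not a Foundation API; it is presumably
// the Thread helper extension shipped with the book's playground.
(1...10).publisher
    .receive(on: queue)
    .sink(
        receiveCompletion: { print("Completed with", $0) },
        receiveValue: { print("Received \($0) on thread \(Thread.current.number)") }
    )
    .store(in: &subscriptions)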

and the result on my local machine (Xcode 12.2 (12B45b)) is:

Received 1 on thread 4
Received 2 on thread 3
Received 3 on thread 7
Received 7 on thread 8
Received 6 on thread 9
Completed with finished

I guess it should’ve sent all values from 1 to 10, but some values seem to be missing. The issue is also reproducible when I replace OperationQueue() with DispatchQueue.global().

Is it some kind of bug in Xcode? Or am I missing something?

As far as I understand, the problem is that receiveCompletion and receiveValue are processed concurrently in this case. The example in fact uses a concurrent OperationQueue, one whose maxConcurrentOperationCount is something other than 1. As a result, the completion can be handled before all the values have been delivered for processing. Once the subscription receives the completion, no further values are processed.

The same happens with concurrent DispatchQueue.
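
For comparison, here is a minimal sketch of the same pipeline on a serial DispatchQueue. It assumes that a DispatchQueue created with only a label is serial (the default when no .concurrent attribute is passed). The names serialDispatchQueue, collected, bag and finished are made up for this illustration, and the semaphore is only there so the script waits for the completion before printing.

```swift
import Combine
import Foundation

// A queue created with just a label is serial: blocks run one at a time, in order.
let serialDispatchQueue = DispatchQueue(label: "serial-receive-queue")

var collected: [Int] = []
var bag = Set<AnyCancellable>()
let finished = DispatchSemaphore(value: 0)

(1...10).publisher
    .receive(on: serialDispatchQueue)
    .sink(
        // Signal once the completion has been handled on the serial queue.
        receiveCompletion: { _ in finished.signal() },
        receiveValue: { collected.append($0) }
    )
    .store(in: &bag)

// Block the calling thread until the completion arrives, then inspect the values.
finished.wait()
print(collected)
```

With DispatchQueue.global() in place of the serial queue, the completion may again overtake some of the values, as in the original post.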

While I did not find anything about using concurrent schedulers with .receive(on:) in the official documentation, there are discussions and recommendations against using concurrent schedulers with this operator. Now we know why :)

If you still want to use OperationQueue, set maxConcurrentOperationCount to 1 so that events are delivered sequentially.
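
A minimal sketch of that fix, assuming it runs as a script or Playground page. The names serialQueue, received, cancellables and done are made up for this illustration, and the semaphore is only there so the script waits for the completion before printing.

```swift
import Combine
import Foundation

// Limit the queue to one operation at a time so events are handled in order.
let serialQueue = OperationQueue()
serialQueue.maxConcurrentOperationCount = 1

var received: [Int] = []
var cancellables = Set<AnyCancellable>()
let done = DispatchSemaphore(value: 0)

(1...10).publisher
    .receive(on: serialQueue)
    .sink(
        receiveCompletion: { completion in
            print("Completed with", completion)
            done.signal() // the completion is the last event on the serial queue
        },
        receiveValue: { value in
            received.append(value)
            print("Received \(value)")
        }
    )
    .store(in: &cancellables)

// Wait until the completion has been handled, then check what was collected.
done.wait()
print(received)
```

With the queue limited to one operation at a time, the completion should no longer be able to overtake the values, so all ten of them are expected to arrive before "Completed with finished".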