aboutsummaryrefslogtreecommitdiffstats
path: root/public/system/storage/vendor/guzzlehttp/guzzle/src/Pool.php
blob: 7b9d83a4eaa9479f6b8d22ca2b09b8d22935eff7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
<?php
namespace GuzzleHttp;

use GuzzleHttp\Event\BeforeEvent;
use GuzzleHttp\Event\RequestEvents;
use GuzzleHttp\Message\RequestInterface;
use GuzzleHttp\Message\ResponseInterface;
use GuzzleHttp\Ring\Core;
use GuzzleHttp\Ring\Future\FutureInterface;
use GuzzleHttp\Event\ListenerAttacherTrait;
use GuzzleHttp\Event\EndEvent;
use React\Promise\Deferred;
use React\Promise\FulfilledPromise;
use React\Promise\PromiseInterface;
use React\Promise\RejectedPromise;

/**
 * Sends an iterator of requests concurrently using a capped pool size.
 *
 * The Pool object implements FutureInterface, meaning it can be used later
 * when necessary, the requests provided to the pool can be cancelled, and
 * you can check the state of the pool to know if it has been dereferenced
 * (sent) or has been cancelled.
 *
 * When sending the pool, keep in mind that no results are returned: callers
 * are expected to handle results asynchronously using Guzzle's event system.
 * When requests complete, more are added to the pool to ensure that the
 * requested pool size is always filled as much as possible.
 *
 * IMPORTANT: Do not provide a pool size greater than what the utilized
 * underlying RingPHP handler can support. This will result in extremely poor
 * performance.
 */
class Pool implements FutureInterface
{
    use ListenerAttacherTrait;

    /** @var \GuzzleHttp\ClientInterface Client used to send each request */
    private $client;

    /** @var \Iterator Yields requests */
    private $iter;

    /** @var Deferred Resolved with true once wait() has drained the pool */
    private $deferred;

    /** @var PromiseInterface Promise of $deferred, exposed via then()/promise() */
    private $promise;

    /** @var array Outstanding responses, keyed by spl_object_hash() of their request */
    private $waitQueue = [];

    /** @var array Listeners prepared from the constructor options, attached to every request */
    private $eventListeners = [];

    /** @var callable|int Pool size, or a callable that computes it from the wait queue size */
    private $poolSize;

    /** @var bool Set to true once the pool has been waited on or cancelled */
    private $isRealized = false;

    /**
     * The option values for 'before', 'complete', 'error' and 'end' can be a
     * callable, an associative array containing event data, or an array of
     * event data arrays. Event data arrays contain the following keys:
     *
     * - fn: callable to invoke that receives the event
     * - priority: Optional event priority (defaults to 0)
     * - once: Set to true so that the event is removed after it is triggered
     *
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Associative array of options
     *     - pool_size: (callable|int)   Maximum number of requests to send
     *                                   concurrently (defaults to 25), or a
     *                                   callback that receives the current wait
     *                                   queue size and returns the pool size to
     *                                   use; the number of requests added is that
     *                                   value minus the queue size, floored at 0
     *     - before:    (callable|array) Receives a BeforeEvent
     *     - complete:  (callable|array) Receives a CompleteEvent
     *     - error:     (callable|array) Receives a ErrorEvent
     *     - end:       (callable|array) Receives an EndEvent
     *
     * @throws \InvalidArgumentException if $requests is neither an array nor
     *                                   an Iterator.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $this->client = $client;
        $this->iter = $this->coerceIterable($requests);
        $this->deferred = new Deferred();
        $this->promise = $this->deferred->promise();
        $this->poolSize = isset($options['pool_size'])
            ? $options['pool_size'] : 25;
        $this->eventListeners = $this->prepareListeners(
            $options,
            ['before', 'complete', 'error', 'end']
        );
    }

    /**
     * Sends multiple requests in parallel and returns an array of responses
     * and exceptions that uses the same ordering as the provided requests.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     *
     * @return BatchResults Returns a container for the results.
     * @throws \InvalidArgumentException if the event format is incorrect.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $hash = new \SplObjectStorage();
        $buffered = [];
        foreach ($requests as $request) {
            $hash->attach($request);
            $buffered[] = $request;
        }

        // Traversing an Iterator above leaves it positioned at its end, and
        // the pool never rewinds the iterator it is given, so handing it the
        // same object would send nothing. Give the pool the buffered copy
        // instead. Arrays (and invalid input, which the constructor rejects)
        // are passed through untouched.
        if ($requests instanceof \Iterator) {
            $requests = $buffered;
        }

        // In addition to the normally run events when requests complete, add
        // an event to continuously track the results of transfers in the hash.
        (new self($client, $requests, RequestEvents::convertEventArray(
            $options,
            ['end'],
            [
                'priority' => RequestEvents::LATE,
                'fn'       => function (EndEvent $e) use ($hash) {
                    $hash[$e->getRequest()] = $e->getException()
                        ? $e->getException()
                        : $e->getResponse();
                }
            ]
        )))->wait();

        return new BatchResults($hash);
    }

    /**
     * Creates a Pool and immediately sends the requests.
     *
     * Nothing is returned; results are only observable through the event
     * listeners supplied in $options.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send in parallel
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     */
    public static function send(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $pool = new self($client, $requests, $options);
        $pool->wait();
    }

    /**
     * Returns the configured pool size, invoking the pool_size callback with
     * the current wait queue size when a callable was provided.
     *
     * @return mixed Whatever was configured, or whatever the callback returns.
     */
    private function getPoolSize()
    {
        return is_callable($this->poolSize)
            ? call_user_func($this->poolSize, count($this->waitQueue))
            : $this->poolSize;
    }

    /**
     * Add as many requests as possible up to the current pool limit.
     */
    private function addNextRequests()
    {
        // Free slots = pool size minus what is already queued, never negative.
        $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
        while ($limit--) {
            if (!$this->addNextRequest()) {
                break;
            }
        }
    }

    /**
     * Sends the requests and blocks until the wait queue has been drained.
     *
     * @return bool False if the pool was already waited on or cancelled
     *              (including a cancel that happens while seeding the pool),
     *              true once every queued response has been waited on.
     */
    public function wait()
    {
        if ($this->isRealized) {
            return false;
        }

        // Seed the pool with N number of requests.
        $this->addNextRequests();

        // Stop if the pool was cancelled while transferring requests.
        if ($this->isRealized) {
            return false;
        }

        // Wait on any outstanding FutureResponse objects. array_pop() takes
        // the most recently queued response first.
        while ($response = array_pop($this->waitQueue)) {
            try {
                $response->wait();
            } catch (\Exception $e) {
                // Eat exceptions because they should be handled asynchronously
            }
            $this->addNextRequests();
        }

        // Clean up no longer needed state.
        $this->isRealized = true;
        $this->waitQueue = $this->eventListeners = [];
        $this->client = $this->iter = null;
        $this->deferred->resolve(true);

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Attempt to cancel all outstanding requests (requests that are queued for
     * dereferencing). Returns true if all outstanding requests can be
     * cancelled.
     *
     * @return bool
     */
    public function cancel()
    {
        if ($this->isRealized) {
            return false;
        }

        // Mark the pool realized first so no further requests are added.
        $success = $this->isRealized = true;
        foreach ($this->waitQueue as $response) {
            if (!$response->cancel()) {
                $success = false;
            }
        }

        return $success;
    }

    /**
     * Returns a promise that is invoked when the pool completed. There will be
     * no passed value.
     *
     * {@inheritdoc}
     */
    public function then(
        callable $onFulfilled = null,
        callable $onRejected = null,
        callable $onProgress = null
    ) {
        return $this->promise->then($onFulfilled, $onRejected, $onProgress);
    }

    /**
     * Returns the promise backing this pool.
     *
     * @return PromiseInterface
     */
    public function promise()
    {
        return $this->promise;
    }

    /**
     * Normalizes the provided requests into an Iterator.
     *
     * @param mixed $requests Array or Iterator of requests
     *
     * @return \Iterator The given Iterator, or an ArrayIterator over the array.
     * @throws \InvalidArgumentException if $requests is neither.
     */
    private function coerceIterable($requests)
    {
        if ($requests instanceof \Iterator) {
            return $requests;
        } elseif (is_array($requests)) {
            return new \ArrayIterator($requests);
        }

        throw new \InvalidArgumentException('Expected Iterator or array. '
            . 'Found ' . Core::describeType($requests));
    }

    /**
     * Adds the next request to pool and tracks what requests need to be
     * dereferenced when completing the pool.
     *
     * Requests whose promise is already fulfilled or rejected right after
     * being sent are finished immediately and the next request is tried, so
     * a true return always means one pending response was queued.
     *
     * @return bool True if a pending request was queued, false if the pool is
     *              realized or the iterator has no more requests.
     * @throws \InvalidArgumentException if the iterator yields a value that
     *                                   does not implement RequestInterface.
     */
    private function addNextRequest()
    {
        do {
            if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
                return false;
            }

            $request = $this->iter->current();
            $this->iter->next();

            if (!($request instanceof RequestInterface)) {
                throw new \InvalidArgumentException(sprintf(
                    'All requests in the provided iterator must implement '
                    . 'RequestInterface. Found %s',
                    Core::describeType($request)
                ));
            }

            // Be sure to use "lazy" futures, meaning they do not send right away.
            $request->getConfig()->set('future', 'lazy');
            $hash = spl_object_hash($request);
            $this->attachListeners($request, $this->eventListeners);
            $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
            $response = $this->client->send($request);
            $this->waitQueue[$hash] = $response;
            $promise = $response->promise();

            // Loop (rather than recurse) over completed or rejected responses.
            $settled = $promise instanceof FulfilledPromise
                || $promise instanceof RejectedPromise;

            if ($settled) {
                try {
                    $this->finishResponse($request, $response->wait(), $hash);
                } catch (\Exception $e) {
                    $this->finishResponse($request, $e, $hash);
                }
            }
        } while ($settled);

        // Use this function for both resolution and rejection.
        $thenFn = function ($value) use ($request, $hash) {
            $this->finishResponse($request, $value, $hash);
            // Only refill the pool when no retry count was recorded for the
            // request. NOTE(review): presumably this avoids refilling more
            // than once for a retried request; confirm against the retry
            // subscriber.
            if (!$request->getConfig()->get('_pool_retries')) {
                $this->addNextRequests();
            }
        };

        $promise->then($thenFn, $thenFn);

        return true;
    }

    /**
     * 'before' listener that records the event's retry count on the request
     * config under '_pool_retries'. Public only so it can be registered as a
     * listener callback.
     *
     * @param BeforeEvent $e Event emitted before the request is sent
     */
    public function _trackRetries(BeforeEvent $e)
    {
        $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
    }

    /**
     * Removes a request from the wait queue and notifies progress listeners
     * of the pool's promise with the outcome.
     *
     * @param RequestInterface $request Request that finished
     * @param mixed            $value   A ResponseInterface on success; any
     *                                  other value is reported as the error
     * @param string           $hash    Wait queue key of the request
     */
    private function finishResponse($request, $value, $hash)
    {
        unset($this->waitQueue[$hash]);
        $result = $value instanceof ResponseInterface
            ? ['request' => $request, 'response' => $value, 'error' => null]
            : ['request' => $request, 'response' => null, 'error' => $value];
        $this->deferred->notify($result);
    }
}