AmpClientState.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Internal;
  11. use Amp\CancellationToken;
  12. use Amp\Deferred;
  13. use Amp\Http\Client\Connection\ConnectionLimitingPool;
  14. use Amp\Http\Client\Connection\DefaultConnectionFactory;
  15. use Amp\Http\Client\InterceptedHttpClient;
  16. use Amp\Http\Client\Interceptor\RetryRequests;
  17. use Amp\Http\Client\PooledHttpClient;
  18. use Amp\Http\Client\Request;
  19. use Amp\Http\Client\Response;
  20. use Amp\Http\Tunnel\Http1TunnelConnector;
  21. use Amp\Http\Tunnel\Https1TunnelConnector;
  22. use Amp\Promise;
  23. use Amp\Socket\Certificate;
  24. use Amp\Socket\ClientTlsContext;
  25. use Amp\Socket\ConnectContext;
  26. use Amp\Socket\Connector;
  27. use Amp\Socket\DnsConnector;
  28. use Amp\Socket\SocketAddress;
  29. use Amp\Success;
  30. use Psr\Log\LoggerInterface;
  31. /**
  32. * Internal representation of the Amp client's state.
  33. *
  34. * @author Nicolas Grekas <p@tchwork.com>
  35. *
  36. * @internal
  37. */
  38. final class AmpClientState extends ClientState
  39. {
  40. public array $dnsCache = [];
  41. public int $responseCount = 0;
  42. public array $pushedResponses = [];
  43. private array $clients = [];
  44. private \Closure $clientConfigurator;
  45. private int $maxHostConnections;
  46. private int $maxPendingPushes;
  47. private $logger;
  48. public function __construct(?callable $clientConfigurator, int $maxHostConnections, int $maxPendingPushes, ?LoggerInterface &$logger)
  49. {
  50. $clientConfigurator ??= static fn (PooledHttpClient $client) => new InterceptedHttpClient($client, new RetryRequests(2));
  51. $this->clientConfigurator = $clientConfigurator instanceof \Closure ? $clientConfigurator : \Closure::fromCallable($clientConfigurator);
  52. $this->maxHostConnections = $maxHostConnections;
  53. $this->maxPendingPushes = $maxPendingPushes;
  54. $this->logger = &$logger;
  55. }
  56. /**
  57. * @return Promise<Response>
  58. */
  59. public function request(array $options, Request $request, CancellationToken $cancellation, array &$info, \Closure $onProgress, &$handle): Promise
  60. {
  61. if ($options['proxy']) {
  62. if ($request->hasHeader('proxy-authorization')) {
  63. $options['proxy']['auth'] = $request->getHeader('proxy-authorization');
  64. }
  65. // Matching "no_proxy" should follow the behavior of curl
  66. $host = $request->getUri()->getHost();
  67. foreach ($options['proxy']['no_proxy'] as $rule) {
  68. $dotRule = '.'.ltrim($rule, '.');
  69. if ('*' === $rule || $host === $rule || substr($host, -\strlen($dotRule)) === $dotRule) {
  70. $options['proxy'] = null;
  71. break;
  72. }
  73. }
  74. }
  75. $request = clone $request;
  76. if ($request->hasHeader('proxy-authorization')) {
  77. $request->removeHeader('proxy-authorization');
  78. }
  79. if ($options['capture_peer_cert_chain']) {
  80. $info['peer_certificate_chain'] = [];
  81. }
  82. $request->addEventListener(new AmpListener($info, $options['peer_fingerprint']['pin-sha256'] ?? [], $onProgress, $handle));
  83. $request->setPushHandler(function ($request, $response) use ($options): Promise {
  84. return $this->handlePush($request, $response, $options);
  85. });
  86. ($request->hasHeader('content-length') ? new Success((int) $request->getHeader('content-length')) : $request->getBody()->getBodyLength())
  87. ->onResolve(static function ($e, $bodySize) use (&$info) {
  88. if (null !== $bodySize && 0 <= $bodySize) {
  89. $info['upload_content_length'] = ((1 + $info['upload_content_length']) ?? 1) - 1 + $bodySize;
  90. }
  91. });
  92. [$client, $connector] = $this->getClient($options);
  93. $response = $client->request($request, $cancellation);
  94. $response->onResolve(static function ($e) use ($connector, &$handle) {
  95. if (null === $e) {
  96. $handle = $connector->handle;
  97. }
  98. });
  99. return $response;
  100. }
  101. private function getClient(array $options): array
  102. {
  103. $options = [
  104. 'bindto' => $options['bindto'] ?: '0',
  105. 'verify_peer' => $options['verify_peer'],
  106. 'capath' => $options['capath'],
  107. 'cafile' => $options['cafile'],
  108. 'local_cert' => $options['local_cert'],
  109. 'local_pk' => $options['local_pk'],
  110. 'ciphers' => $options['ciphers'],
  111. 'capture_peer_cert_chain' => $options['capture_peer_cert_chain'] || $options['peer_fingerprint'],
  112. 'proxy' => $options['proxy'],
  113. ];
  114. $key = md5(serialize($options));
  115. if (isset($this->clients[$key])) {
  116. return $this->clients[$key];
  117. }
  118. $context = new ClientTlsContext('');
  119. $options['verify_peer'] || $context = $context->withoutPeerVerification();
  120. $options['cafile'] && $context = $context->withCaFile($options['cafile']);
  121. $options['capath'] && $context = $context->withCaPath($options['capath']);
  122. $options['local_cert'] && $context = $context->withCertificate(new Certificate($options['local_cert'], $options['local_pk']));
  123. $options['ciphers'] && $context = $context->withCiphers($options['ciphers']);
  124. $options['capture_peer_cert_chain'] && $context = $context->withPeerCapturing();
  125. $connector = $handleConnector = new class() implements Connector {
  126. public $connector;
  127. public $uri;
  128. public $handle;
  129. public function connect(string $uri, ConnectContext $context = null, CancellationToken $token = null): Promise
  130. {
  131. $result = $this->connector->connect($this->uri ?? $uri, $context, $token);
  132. $result->onResolve(function ($e, $socket) {
  133. $this->handle = null !== $socket ? $socket->getResource() : false;
  134. });
  135. return $result;
  136. }
  137. };
  138. $connector->connector = new DnsConnector(new AmpResolver($this->dnsCache));
  139. $context = (new ConnectContext())
  140. ->withTcpNoDelay()
  141. ->withTlsContext($context);
  142. if ($options['bindto']) {
  143. if (file_exists($options['bindto'])) {
  144. $connector->uri = 'unix://'.$options['bindto'];
  145. } else {
  146. $context = $context->withBindTo($options['bindto']);
  147. }
  148. }
  149. if ($options['proxy']) {
  150. $proxyUrl = parse_url($options['proxy']['url']);
  151. $proxySocket = new SocketAddress($proxyUrl['host'], $proxyUrl['port']);
  152. $proxyHeaders = $options['proxy']['auth'] ? ['Proxy-Authorization' => $options['proxy']['auth']] : [];
  153. if ('ssl' === $proxyUrl['scheme']) {
  154. $connector = new Https1TunnelConnector($proxySocket, $context->getTlsContext(), $proxyHeaders, $connector);
  155. } else {
  156. $connector = new Http1TunnelConnector($proxySocket, $proxyHeaders, $connector);
  157. }
  158. }
  159. $maxHostConnections = 0 < $this->maxHostConnections ? $this->maxHostConnections : \PHP_INT_MAX;
  160. $pool = new DefaultConnectionFactory($connector, $context);
  161. $pool = ConnectionLimitingPool::byAuthority($maxHostConnections, $pool);
  162. return $this->clients[$key] = [($this->clientConfigurator)(new PooledHttpClient($pool)), $handleConnector];
  163. }
  164. private function handlePush(Request $request, Promise $response, array $options): Promise
  165. {
  166. $deferred = new Deferred();
  167. $authority = $request->getUri()->getAuthority();
  168. if ($this->maxPendingPushes <= \count($this->pushedResponses[$authority] ?? [])) {
  169. $fifoUrl = key($this->pushedResponses[$authority]);
  170. unset($this->pushedResponses[$authority][$fifoUrl]);
  171. $this->logger && $this->logger->debug(sprintf('Evicting oldest pushed response: "%s"', $fifoUrl));
  172. }
  173. $url = (string) $request->getUri();
  174. $this->logger && $this->logger->debug(sprintf('Queueing pushed response: "%s"', $url));
  175. $this->pushedResponses[$authority][] = [$url, $deferred, $request, $response, [
  176. 'proxy' => $options['proxy'],
  177. 'bindto' => $options['bindto'],
  178. 'local_cert' => $options['local_cert'],
  179. 'local_pk' => $options['local_pk'],
  180. ]];
  181. return $deferred->promise();
  182. }
  183. }