NativeResponse.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Response;
  11. use Psr\Log\LoggerInterface;
  12. use Symfony\Component\HttpClient\Chunk\FirstChunk;
  13. use Symfony\Component\HttpClient\Exception\TransportException;
  14. use Symfony\Component\HttpClient\Internal\Canary;
  15. use Symfony\Component\HttpClient\Internal\ClientState;
  16. use Symfony\Component\HttpClient\Internal\NativeClientState;
  17. use Symfony\Contracts\HttpClient\ResponseInterface;
  18. /**
  19. * @author Nicolas Grekas <p@tchwork.com>
  20. *
  21. * @internal
  22. */
  23. final class NativeResponse implements ResponseInterface, StreamableInterface
  24. {
  25. use CommonResponseTrait;
  26. use TransportResponseTrait;
  27. /**
  28. * @var resource
  29. */
  30. private $context;
  31. private string $url;
  32. private $resolver;
  33. private $onProgress;
  34. private ?int $remaining = null;
  35. /**
  36. * @var resource|null
  37. */
  38. private $buffer;
  39. private $multi;
  40. private float $pauseExpiry = 0.0;
  41. /**
  42. * @internal
  43. */
  44. public function __construct(NativeClientState $multi, $context, string $url, array $options, array &$info, callable $resolver, ?callable $onProgress, ?LoggerInterface $logger)
  45. {
  46. $this->multi = $multi;
  47. $this->id = $id = (int) $context;
  48. $this->context = $context;
  49. $this->url = $url;
  50. $this->logger = $logger;
  51. $this->timeout = $options['timeout'];
  52. $this->info = &$info;
  53. $this->resolver = $resolver;
  54. $this->onProgress = $onProgress;
  55. $this->inflate = !isset($options['normalized_headers']['accept-encoding']);
  56. $this->shouldBuffer = $options['buffer'] ?? true;
  57. // Temporary resource to dechunk the response stream
  58. $this->buffer = fopen('php://temp', 'w+');
  59. $info['user_data'] = $options['user_data'];
  60. $info['max_duration'] = $options['max_duration'];
  61. ++$multi->responseCount;
  62. $this->initializer = static function (self $response) {
  63. return null === $response->remaining;
  64. };
  65. $pauseExpiry = &$this->pauseExpiry;
  66. $info['pause_handler'] = static function (float $duration) use (&$pauseExpiry) {
  67. $pauseExpiry = 0 < $duration ? microtime(true) + $duration : 0;
  68. };
  69. $this->canary = new Canary(static function () use ($multi, $id) {
  70. if (null !== ($host = $multi->openHandles[$id][6] ?? null) && 0 >= --$multi->hosts[$host]) {
  71. unset($multi->hosts[$host]);
  72. }
  73. unset($multi->openHandles[$id], $multi->handlesActivity[$id]);
  74. });
  75. }
  76. /**
  77. * {@inheritdoc}
  78. */
  79. public function getInfo(string $type = null): mixed
  80. {
  81. if (!$info = $this->finalInfo) {
  82. $info = $this->info;
  83. $info['url'] = implode('', $info['url']);
  84. unset($info['size_body'], $info['request_header']);
  85. if (null === $this->buffer) {
  86. $this->finalInfo = $info;
  87. }
  88. }
  89. return null !== $type ? $info[$type] ?? null : $info;
  90. }
  91. public function __destruct()
  92. {
  93. try {
  94. $this->doDestruct();
  95. } finally {
  96. // Clear the DNS cache when all requests completed
  97. if (0 >= --$this->multi->responseCount) {
  98. $this->multi->responseCount = 0;
  99. $this->multi->dnsCache = [];
  100. }
  101. }
  102. }
  103. private function open(): void
  104. {
  105. $url = $this->url;
  106. set_error_handler(function ($type, $msg) use (&$url) {
  107. if (\E_NOTICE !== $type || 'fopen(): Content-type not specified assuming application/x-www-form-urlencoded' !== $msg) {
  108. throw new TransportException($msg);
  109. }
  110. $this->logger && $this->logger->info(sprintf('%s for "%s".', $msg, $url ?? $this->url));
  111. });
  112. try {
  113. $this->info['start_time'] = microtime(true);
  114. [$resolver, $url] = ($this->resolver)($this->multi);
  115. while (true) {
  116. $context = stream_context_get_options($this->context);
  117. if ($proxy = $context['http']['proxy'] ?? null) {
  118. $this->info['debug'] .= "* Establish HTTP proxy tunnel to {$proxy}\n";
  119. $this->info['request_header'] = $url;
  120. } else {
  121. $this->info['debug'] .= "* Trying {$this->info['primary_ip']}...\n";
  122. $this->info['request_header'] = $this->info['url']['path'].$this->info['url']['query'];
  123. }
  124. $this->info['request_header'] = sprintf("> %s %s HTTP/%s \r\n", $context['http']['method'], $this->info['request_header'], $context['http']['protocol_version']);
  125. $this->info['request_header'] .= implode("\r\n", $context['http']['header'])."\r\n\r\n";
  126. if (\array_key_exists('peer_name', $context['ssl']) && null === $context['ssl']['peer_name']) {
  127. unset($context['ssl']['peer_name']);
  128. $this->context = stream_context_create([], ['options' => $context] + stream_context_get_params($this->context));
  129. }
  130. // Send request and follow redirects when needed
  131. $this->handle = $h = fopen($url, 'r', false, $this->context);
  132. self::addResponseHeaders(stream_get_meta_data($h)['wrapper_data'], $this->info, $this->headers, $this->info['debug']);
  133. $url = $resolver($this->multi, $this->headers['location'][0] ?? null, $this->context);
  134. if (null === $url) {
  135. break;
  136. }
  137. $this->logger && $this->logger->info(sprintf('Redirecting: "%s %s"', $this->info['http_code'], $url ?? $this->url));
  138. }
  139. } catch (\Throwable $e) {
  140. $this->close();
  141. $this->multi->handlesActivity[$this->id][] = null;
  142. $this->multi->handlesActivity[$this->id][] = $e;
  143. return;
  144. } finally {
  145. $this->info['pretransfer_time'] = $this->info['total_time'] = microtime(true) - $this->info['start_time'];
  146. restore_error_handler();
  147. }
  148. if (isset($context['ssl']['capture_peer_cert_chain']) && isset(($context = stream_context_get_options($this->context))['ssl']['peer_certificate_chain'])) {
  149. $this->info['peer_certificate_chain'] = $context['ssl']['peer_certificate_chain'];
  150. }
  151. stream_set_blocking($h, false);
  152. $this->context = $this->resolver = null;
  153. // Create dechunk buffers
  154. if (isset($this->headers['content-length'])) {
  155. $this->remaining = (int) $this->headers['content-length'][0];
  156. } elseif ('chunked' === ($this->headers['transfer-encoding'][0] ?? null)) {
  157. stream_filter_append($this->buffer, 'dechunk', \STREAM_FILTER_WRITE);
  158. $this->remaining = -1;
  159. } else {
  160. $this->remaining = -2;
  161. }
  162. $this->multi->handlesActivity[$this->id] = [new FirstChunk()];
  163. if ('HEAD' === $context['http']['method'] || \in_array($this->info['http_code'], [204, 304], true)) {
  164. $this->multi->handlesActivity[$this->id][] = null;
  165. $this->multi->handlesActivity[$this->id][] = null;
  166. return;
  167. }
  168. $host = parse_url($this->info['redirect_url'] ?? $this->url, \PHP_URL_HOST);
  169. $this->multi->lastTimeout = null;
  170. $this->multi->openHandles[$this->id] = [&$this->pauseExpiry, $h, $this->buffer, $this->onProgress, &$this->remaining, &$this->info, $host];
  171. $this->multi->hosts[$host] = 1 + ($this->multi->hosts[$host] ?? 0);
  172. }
  173. /**
  174. * {@inheritdoc}
  175. */
  176. private function close(): void
  177. {
  178. $this->canary->cancel();
  179. $this->handle = $this->buffer = $this->inflate = $this->onProgress = null;
  180. }
  181. /**
  182. * {@inheritdoc}
  183. */
  184. private static function schedule(self $response, array &$runningResponses): void
  185. {
  186. if (!isset($runningResponses[$i = $response->multi->id])) {
  187. $runningResponses[$i] = [$response->multi, []];
  188. }
  189. $runningResponses[$i][1][$response->id] = $response;
  190. if (null === $response->buffer) {
  191. // Response already completed
  192. $response->multi->handlesActivity[$response->id][] = null;
  193. $response->multi->handlesActivity[$response->id][] = null !== $response->info['error'] ? new TransportException($response->info['error']) : null;
  194. }
  195. }
  196. /**
  197. * {@inheritdoc}
  198. *
  199. * @param NativeClientState $multi
  200. */
  201. private static function perform(ClientState $multi, array &$responses = null): void
  202. {
  203. foreach ($multi->openHandles as $i => [$pauseExpiry, $h, $buffer, $onProgress]) {
  204. if ($pauseExpiry) {
  205. if (microtime(true) < $pauseExpiry) {
  206. continue;
  207. }
  208. $multi->openHandles[$i][0] = 0;
  209. }
  210. $hasActivity = false;
  211. $remaining = &$multi->openHandles[$i][4];
  212. $info = &$multi->openHandles[$i][5];
  213. $e = null;
  214. // Read incoming buffer and write it to the dechunk one
  215. try {
  216. if ($remaining && '' !== $data = (string) fread($h, 0 > $remaining ? 16372 : $remaining)) {
  217. fwrite($buffer, $data);
  218. $hasActivity = true;
  219. $multi->sleep = false;
  220. if (-1 !== $remaining) {
  221. $remaining -= \strlen($data);
  222. }
  223. }
  224. } catch (\Throwable $e) {
  225. $hasActivity = $onProgress = false;
  226. }
  227. if (!$hasActivity) {
  228. if ($onProgress) {
  229. try {
  230. // Notify the progress callback so that it can e.g. cancel
  231. // the request if the stream is inactive for too long
  232. $info['total_time'] = microtime(true) - $info['start_time'];
  233. $onProgress();
  234. } catch (\Throwable $e) {
  235. // no-op
  236. }
  237. }
  238. } elseif ('' !== $data = stream_get_contents($buffer, -1, 0)) {
  239. rewind($buffer);
  240. ftruncate($buffer, 0);
  241. if (null === $e) {
  242. $multi->handlesActivity[$i][] = $data;
  243. }
  244. }
  245. if (null !== $e || !$remaining || feof($h)) {
  246. // Stream completed
  247. $info['total_time'] = microtime(true) - $info['start_time'];
  248. $info['starttransfer_time'] = $info['starttransfer_time'] ?: $info['total_time'];
  249. if ($onProgress) {
  250. try {
  251. $onProgress(-1);
  252. } catch (\Throwable $e) {
  253. // no-op
  254. }
  255. }
  256. if (null === $e) {
  257. if (0 < $remaining) {
  258. $e = new TransportException(sprintf('Transfer closed with %s bytes remaining to read.', $remaining));
  259. } elseif (-1 === $remaining && fwrite($buffer, '-') && '' !== stream_get_contents($buffer, -1, 0)) {
  260. $e = new TransportException('Transfer closed with outstanding data remaining from chunked response.');
  261. }
  262. }
  263. $multi->handlesActivity[$i][] = null;
  264. $multi->handlesActivity[$i][] = $e;
  265. if (null !== ($host = $multi->openHandles[$i][6] ?? null) && 0 >= --$multi->hosts[$host]) {
  266. unset($multi->hosts[$host]);
  267. }
  268. unset($multi->openHandles[$i]);
  269. $multi->sleep = false;
  270. }
  271. }
  272. if (null === $responses) {
  273. return;
  274. }
  275. $maxHosts = $multi->maxHostConnections;
  276. foreach ($responses as $i => $response) {
  277. if (null !== $response->remaining || null === $response->buffer) {
  278. continue;
  279. }
  280. if ($response->pauseExpiry && microtime(true) < $response->pauseExpiry) {
  281. // Create empty open handles to tell we still have pending requests
  282. $multi->openHandles[$i] = [\INF, null, null, null];
  283. } elseif ($maxHosts && $maxHosts > ($multi->hosts[parse_url($response->url, \PHP_URL_HOST)] ?? 0)) {
  284. // Open the next pending request - this is a blocking operation so we do only one of them
  285. $response->open();
  286. $multi->sleep = false;
  287. self::perform($multi);
  288. $maxHosts = 0;
  289. }
  290. }
  291. }
  292. /**
  293. * {@inheritdoc}
  294. *
  295. * @param NativeClientState $multi
  296. */
  297. private static function select(ClientState $multi, float $timeout): int
  298. {
  299. if (!$multi->sleep = !$multi->sleep) {
  300. return -1;
  301. }
  302. $_ = $handles = [];
  303. $now = null;
  304. foreach ($multi->openHandles as [$pauseExpiry, $h]) {
  305. if (null === $h) {
  306. continue;
  307. }
  308. if ($pauseExpiry && ($now ?? $now = microtime(true)) < $pauseExpiry) {
  309. $timeout = min($timeout, $pauseExpiry - $now);
  310. continue;
  311. }
  312. $handles[] = $h;
  313. }
  314. if (!$handles) {
  315. usleep((int) (1E6 * $timeout));
  316. return 0;
  317. }
  318. return stream_select($handles, $_, $_, (int) $timeout, (int) (1E6 * ($timeout - (int) $timeout)));
  319. }
  320. }