AmpBody.php 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. <?php
  2. /*
  3. * This file is part of the Symfony package.
  4. *
  5. * (c) Fabien Potencier <fabien@symfony.com>
  6. *
  7. * For the full copyright and license information, please view the LICENSE
  8. * file that was distributed with this source code.
  9. */
  10. namespace Symfony\Component\HttpClient\Internal;
  11. use Amp\ByteStream\InputStream;
  12. use Amp\ByteStream\ResourceInputStream;
  13. use Amp\Http\Client\RequestBody;
  14. use Amp\Promise;
  15. use Amp\Success;
  16. use Symfony\Component\HttpClient\Exception\TransportException;
  /**
   * Request body that implements both Amp's RequestBody and InputStream.
   *
   * The payload may be a string, a stream resource or a closure that produces
   * chunks. createBodyStream() returns the instance itself, so the object acts
   * as its own input stream. Every read() adds the size of the previously
   * returned chunk to $info['size_upload'] and invokes the progress callback.
   *
   * @author Nicolas Grekas <p@tchwork.com>
   *
   * @internal
   */
  class AmpBody implements RequestBody, InputStream
  {
      /**
       * The payload: a string, a chunk-producing closure, or a stream wrapping
       * the resource given to the constructor.
       *
       * @var ResourceInputStream|\Closure|string
       */
      private $body;

      /** Reference to the caller's info array ("size_upload" and "upload_content_length" are updated here). */
      private array $info;

      /** Callback invoked, without arguments, at the start of every read(). */
      private \Closure $onProgress;

      /** Start position of the payload; set to null once a string or closure body is exhausted. */
      private ?int $offset = 0;

      /** Total size in bytes, or -1 when it is not known. */
      private int $length = -1;

      /** Size of the last chunk not yet added to "size_upload"; null until the first read(). */
      private ?int $uploaded = null;

      /**
       * @param \Closure|resource|string $body       The payload to send
       * @param array                    $info       Info array, bound by reference and updated while reading
       * @param \Closure                 $onProgress Callback invoked on every read()
       */
      public function __construct($body, &$info, \Closure $onProgress)
      {
          $this->info = &$info;
          $this->onProgress = $onProgress;

          if (\is_resource($body)) {
              // ftell() and fstat() both return false on failure; never let
              // that reach the typed properties (a failed fstat() used to end
              // up as a TypeError when null was assigned to int $length).
              $position = ftell($body);
              $stat = fstat($body);

              $this->offset = false === $position ? 0 : $position;
              $this->length = \is_array($stat) ? ($stat['size'] ?? -1) : -1;
              $this->body = new ResourceInputStream($body);
          } elseif (\is_string($body)) {
              $this->length = \strlen($body);
              $this->body = $body;
          } else {
              // Closure (or an already wrapped stream, see rewind()): the
              // length stays at -1, i.e. unknown.
              $this->body = $body;
          }
      }

      /**
       * Returns this instance as the stream to read the body from.
       *
       * When the body has already been read from (the upload counter is not
       * null), the counter is reset and string/resource payloads are moved
       * back to their starting position so they can be sent again.
       */
      public function createBodyStream(): InputStream
      {
          if (null !== $this->uploaded) {
              $this->uploaded = null;

              if (\is_string($this->body)) {
                  $this->offset = 0;
              } elseif ($this->body instanceof ResourceInputStream) {
                  fseek($this->body->getResource(), $this->offset);
              }
          }

          return $this;
      }

      /**
       * This body contributes no headers of its own.
       */
      public function getHeaders(): Promise
      {
          return new Success([]);
      }

      /**
       * Resolves to the number of bytes left to send from the start offset,
       * or to -1 when the total length is unknown.
       */
      public function getBodyLength(): Promise
      {
          if ($this->length < 0) {
              // Unknown length: report exactly -1 instead of "-1 - offset".
              return new Success(-1);
          }

          return new Success($this->length - $this->offset);
      }

      /**
       * Reads the next chunk and keeps the upload statistics up to date.
       *
       * The size of the chunk returned by the previous call is accounted for
       * first, then the progress callback runs, then the next chunk is fetched.
       * When the resolved chunk is null, "upload_content_length" is set to the
       * total accumulated in "size_upload".
       */
      public function read(): Promise
      {
          // $uploaded is null before the first read, which adds nothing.
          $this->info['size_upload'] += $this->uploaded;
          $this->uploaded = 0;
          ($this->onProgress)();

          $chunk = $this->doRead();
          $chunk->onResolve(function ($e, $data) {
              if (null !== $data) {
                  $this->uploaded = \strlen($data);
              } else {
                  $this->info['upload_content_length'] = $this->info['size_upload'];
              }
          });

          return $chunk;
      }

      /**
       * Prepares a body for being sent again.
       *
       * Bodies that are not instances of this class are returned untouched.
       * Resource-backed bodies are moved back to their start offset and wrapped
       * in a new instance; string bodies are reset in place.
       */
      public static function rewind(RequestBody $body): RequestBody
      {
          if (!$body instanceof self) {
              return $body;
          }

          $body->uploaded = null;

          if ($body->body instanceof ResourceInputStream) {
              fseek($body->body->getResource(), $body->offset);

              // NOTE(review): the constructor receives the ResourceInputStream
              // wrapper, not the raw resource, so the new instance keeps the
              // defaults offset = 0 and length = -1 - confirm this is intended.
              return new $body($body->body, $body->info, $body->onProgress);
          }

          if (\is_string($body->body)) {
              $body->offset = 0;
          }

          return $body;
      }

      /**
       * Fetches the next chunk from the underlying payload.
       *
       * Resolves to null when a string or closure payload is exhausted or empty.
       *
       * @throws TransportException When the body callback returns a non-string value
       */
      private function doRead(): Promise
      {
          if ($this->body instanceof ResourceInputStream) {
              return $this->body->read();
          }

          // A null offset marks an exhausted payload; a zero length an empty one.
          if (null === $this->offset || !$this->length) {
              return new Success();
          }

          if (\is_string($this->body)) {
              // Strings are delivered as a single chunk.
              $this->offset = null;

              return new Success($this->body);
          }

          // The callback is called with 16372 - presumably the requested chunk
          // size in bytes; an empty string signals the end of the payload.
          if ('' === $data = ($this->body)(16372)) {
              $this->offset = null;

              return new Success();
          }

          if (!\is_string($data)) {
              throw new TransportException(sprintf('Return value of the "body" option callback must be string, "%s" returned.', get_debug_type($data)));
          }

          return new Success($data);
      }
  }