jjzjj

php - Guzzle HTTP客户端多线程下载文件 : EachPromises vs Pool objects

coder 2024-04-19 原文

出于测试目的,我有一个包含 2000 个图像 URI(字符串)的数组,我使用此函数异步下载。经过一些谷歌搜索、测试和尝试后,我想出了 2 个它们都可以工作的函数(老实说 downloadFilesAsync2 抛出一个 InvalidArgumentException在最后一行)。

downloadFilesAsync2 函数基于 GuzzleHttp\Promise\EachPromise 类,downloadFilesAsync1 基于 GuzzleHttp\Pool 类。

这两个函数都很好地异步下载了 2000 个文件,同时限制了 10 个线程。

我知道它们有效,但除此之外别无其他。我想知道是否有人可以解释这两种方法,一种是否比另一种更好,含义等。

// for the purpose of this question i've reduced the array to 5 files!
$uris = array /
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-01.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-02.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-03.jpg",
  "https://cdn.enchufix.com/media/catalog/product/u/n/unix-48120-04.jpg",
);

function downloadFilesAsync2(array $uris, string $dir, $overwrite=true) {
    $client   = new \GuzzleHttp\Client();
    $requests = array();
    foreach ($uris as $i => $uri) {
        $loc = $dir . DIRECTORY_SEPARATOR . basename($uri);
        if ($overwrite && file_exists($loc)) unlink($loc);
        $requests[] = new GuzzleHttp\Psr7\Request('GET', $uri, ['sink' => $loc]);
        echo "Downloading $uri to $loc" . PHP_EOL;
    }
    $pool = new \GuzzleHttp\Pool($client, $requests, [
        'concurrency' => 10,
        'fulfilled' => function (\Psr\Http\Message\ResponseInterface $response, $index) {
            // this is delivered each successful response
            echo 'success: '.$response->getStatusCode().PHP_EOL;
        },
        'rejected' => function ($reason, $index) {
            // this is delivered each failed request
            echo 'failed: '.$reason.PHP_EOL;
        },
    ]);
    $promise = $pool->promise();  // Start transfers and create a promise
    $promise->wait();   // Force the pool of requests to complete.
}

function downloadFilesAsync1(array $uris, string $dir, $overwrite=true) {
    $client = new \GuzzleHttp\Client();
    $promises = (function () use ($client, $uris, $dir, $overwrite) {
        foreach ($uris as $uri) {
            $loc = $dir . DIRECTORY_SEPARATOR . basename($uri);
            if ($overwrite && file_exists($loc)) unlink($loc);
            yield $client->requestAsync('GET', $uri, ['sink' => $loc]);
            echo "Downloading $uri to $loc" . PHP_EOL;
        }
    })();
    (new \GuzzleHttp\Promise\EachPromise(
        $promises, [
        'concurrency' => 10,
        'fulfilled'   => function (\Psr\Http\Message\ResponseInterface $response) {
            //            echo "\t=>\tDONE! status:" . $response->getStatusCode() . PHP_EOL;
        },
        'rejected'    => function ($reason, $index) {
            echo 'ERROR => ' . strtok($reason->getMessage(), "\n") . PHP_EOL;
        },
    ])
    )->promise()->wait();
}

最佳答案

首先,我将解决 downloadFilesAsync2 中的 InvalidArgumentException方法。这种方法实际上有两个问题。两者都与此有关:

$requests[] = $client->request('GET', $uri, ['sink' => $loc]);

第一个问题是 Client::request()是一个同步实用方法,它包装了 $client->requestAsync()->wait() . $client->request()将返回 Psr\Http\Message\ResponseInterface 的实例,结果 $requests[]实际上会填充 ResponseInterface实现。这就是最终导致 InvalidArgumentException 作为 $requests 的原因不包含任何 Psr\Http\Message\RequestInterface的,异常是从 Pool::__construct() 中抛出的.

此方法的更正版本应包含看起来更像的代码:

$requests = [
    new Request('GET', 'www.google.com', [], null, 1.1),
    new Request('GET', 'www.ebay.com', [], null, 1.1),
    new Request('GET', 'www.cnn.com', [], null, 1.1),
    new Request('GET', 'www.red.com', [], null, 1.1),
];

$pool = new Pool($client, $requests, [
    'concurrency' => 10,
    'fulfilled' => function(ResponseInterface $response) {
        // do something
    },
    'rejected' => function($reason, $index) {
        // do something error handling
    },
    'options' => ['sink' => $some_location,],
]);

$promise = $pool->promise();
$promise->wait();

要回答您的第二个问题“这两种方法有什么区别”,答案很简单,没有。为了解释这一点,让我复制并粘贴 Pool::__construct() :

/**
 * @param ClientInterface $client   Client used to send the requests.
 * @param array|\Iterator $requests Requests or functions that return
 *                                  requests to send concurrently.
 * @param array           $config   Associative array of options
 *     - concurrency: (int) Maximum number of requests to send concurrently
 *     - options: Array of request options to apply to each request.
 *     - fulfilled: (callable) Function to invoke when a request completes.
 *     - rejected: (callable) Function to invoke when a request is rejected.
 */
public function __construct(
    ClientInterface $client,
    $requests,
    array $config = []
) {
    // Backwards compatibility.
    if (isset($config['pool_size'])) {
        $config['concurrency'] = $config['pool_size'];
    } elseif (!isset($config['concurrency'])) {
        $config['concurrency'] = 25;
    }

    if (isset($config['options'])) {
        $opts = $config['options'];
        unset($config['options']);
    } else {
        $opts = [];
    }

    $iterable = \GuzzleHttp\Promise\iter_for($requests);
    $requests = function () use ($iterable, $client, $opts) {
        foreach ($iterable as $key => $rfn) {
            if ($rfn instanceof RequestInterface) {
                yield $key => $client->sendAsync($rfn, $opts);
            } elseif (is_callable($rfn)) {
                yield $key => $rfn($opts);
            } else {
                throw new \InvalidArgumentException('Each value yielded by '
                    . 'the iterator must be a Psr7\Http\Message\RequestInterface '
                    . 'or a callable that returns a promise that fulfills '
                    . 'with a Psr7\Message\Http\ResponseInterface object.');
            }
        }
    };

    $this->each = new EachPromise($requests(), $config);
}

现在,如果我们将其与 downloadFilesAsync1 中代码的简化版本进行比较方法:

$promises = (function () use ($client, $uris) {
    foreach ($uris as $uri) {
        yield $client->requestAsync('GET', $uri, ['sink' => $some_location]);
    }
})();
(new \GuzzleHttp\Promise\EachPromise(
    $promises, [
    'concurrency' => 10,
    'fulfilled'   => function (\Psr\Http\Message\ResponseInterface $response) {
        // do something
    },
    'rejected'    => function ($reason, $index) {
        // do something
    },
])
)->promise()->wait();

在这两个示例中,都有一个生成器生成解析为 ResponseInterface 实例的 promise 该生成器连同配置数组(已完成的可调用、已拒绝的可调用、并发)也被送入 EachPromise 的新实例中。 .

总结:

  1. downloadFilesAsync1在功能上与使用 Pool 相同只是没有内置到 Pool::__construct() 中的错误检查.

  2. downloadFilesAsync2 中有一些错误当 Pool 时,这将导致文件在收到 InvalidArgumentException 之前以同步方式下载。被实例化。

我唯一的建议是:使用您觉得更直观的那个。

关于php - Guzzle HTTP客户端多线程下载文件 : EachPromises vs Pool objects,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51361739/

有关php - Guzzle HTTP客户端多线程下载文件 : EachPromises vs Pool objects的更多相关文章

  1. ruby - 如何使用 Ruby aws/s3 Gem 生成安全 URL 以从 s3 下载文件 - 2

    我正在编写一个小脚本来定位aws存储桶中的特定文件,并创建一个临时验证的url以发送给同事。(理想情况下,这将创建类似于在控制台上右键单击存储桶中的文件并复制链接地址的结果)。我研究过回形针,它似乎不符合这个标准,但我可能只是不知道它的全部功能。我尝试了以下方法:defauthenticated_url(file_name,bucket)AWS::S3::S3Object.url_for(file_name,bucket,:secure=>true,:expires=>20*60)end产生这种类型的结果:...-1.amazonaws.com/file_path/file.zip.A

  2. ruby - RuntimeError(自动加载常量 Apps 多线程时检测到循环依赖 - 2

    我收到这个错误:RuntimeError(自动加载常量Apps时检测到循环依赖当我使用多线程时。下面是我的代码。为什么会这样?我尝试多线程的原因是因为我正在编写一个HTML抓取应用程序。对Nokogiri::HTML(open())的调用是一个同步阻塞调用,需要1秒才能返回,我有100,000多个页面要访问,所以我试图运行多个线程来解决这个问题。有更好的方法吗?classToolsController0)app.website=array.join(',')putsapp.websiteelseapp.website="NONE"endapp.saveapps=Apps.order("

  3. ruby - Rails 开发服务器、PDFKit 和多线程 - 2

    我有一个使用PDFKit呈现网页的pdf版本的Rails应用程序。我使用Thin作为开发服务器。问题是当我处于开发模式时。当我使用“bundleexecrailss”启动我的服务器并尝试呈现任何PDF时,整个过程会陷入僵局,因为当您呈现PDF时,会向服务器请求一些额外的资源,如图像和css,看起来只有一个线程.如何配置Rails开发服务器以运行多个工作线程?非常感谢。 最佳答案 我找到的最简单的解决方案是unicorn.geminstallunicorn创建一个unicorn.conf:worker_processes3然后使用它:

  4. ruby-on-rails - HTTParty 的内存问题和下载大文件 - 2

    这会导致Ruby出现内存问题吗?我知道如果大小超过10KB,Open-URI会写入TempFile。但是HTTParty会在写入TempFile之前尝试将整个PDF保存到内存吗?src=Tempfile.new("file.pdf")src.binmodesrc.writeHTTParty.get("large_file.pdf").parsed_response 最佳答案 您可以使用Net::HTTP。参见thedocumentation(特别是标题为“流媒体响应机构”的部分)。这是文档中的示例:uri=URI('http://e

  5. ruby - 强制浏览器下载文件而不是打开文件 - 2

    我要下载http://foobar.com/song.mp3作为song.mp3,而不是让Chrome在其native中打开它浏览器中的播放器。我怎样才能做到这一点? 最佳答案 您只需要确保发送这些header:Content-Disposition:attachment;filename=song.mp3;Content-Type:application/octet-streamContent-Transfer-Encoding:binarysend_file方法为您完成:get'/:file'do|file|file=File.

  6. ruby - 检查网络文件是否存在,而不下载它? - 2

    是否可以在不实际下载文件的情况下检查文件是否存在?我有这么大的(~40mb)文件,例如:http://mirrors.sohu.com/mysql/MySQL-6.0/MySQL-6.0.11-0.glibc23.src.rpm这与ruby​​不严格相关,但如果发件人可以设置内容长度就好了。RestClient.get"http://mirrors.sohu.com/mysql/MySQL-6.0/MySQL-6.0.11-0.glibc23.src.rpm",headers:{"Content-Length"=>100} 最佳答案

  7. ruby - 在 TCPServer (Ruby) 中,我如何从客户端获取 IP/MAC? - 2

    我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!

  8. ruby-on-rails - 这个 C 和 PHP 程序员如何学习 Ruby 和 Rails? - 2

    按照目前的情况,这个问题不适合我们的问答形式。我们希望答案得到事实、引用或专业知识的支持,但这个问题可能会引发辩论、争论、投票或扩展讨论。如果您觉得这个问题可以改进并可能重新打开,visitthehelpcenter指导。关闭9年前。我来自C、php和bash背景,很容易学习,因为它们都有相同的C结构,我可以将其与我已经知道的联系起来。然后2年前我学了Python并且学得很好,Python对我来说比Ruby更容易学。然后从去年开始,我一直在尝试学习Ruby,然后是Rails,我承认,直到现在我还是学不会,讽刺的是那些打着简单易学的烙印,但是对于我这样一个老练的程序员来说,我只是无法将它

  9. Ruby 多线程/多处理读物 - 2

    任何人都可以推荐任何详细介绍Ruby多线程/多处理的复杂性的好的多线程/处理书籍/网站吗?我尝试使用ruby​​线程,基本上在1.9vm上的无死锁代码中它在jruby中遇到了死锁。是的,我意识到差异很大(jruby没有GIL),但我想知道是否有用于ruby​​中多线程编程的策略或类集,我只需要继续阅读。旁注:从java到ruby​​必须定义是否需要重新输入锁,这有点奇怪。 最佳答案 如果你使用Ruby1.9,你可以试试Fiber,它是Ruby中线程的一大改进http://ruby-doc.org/core-1.9/classes/F

  10. ruby-on-rails - 为什么我必须在使用客户验证器后重新加载 rspec 中的记录? - 2

    我有一个模型User,它在创建后的回调中创建了选项#Userhas_one:user_optionsafter_create:create_optionsprivatedefcreate_optionsUserOptions.create(user:self)end我对此有一些简单的Rspec覆盖:describe"newuser"doit"createsuser_optionsaftertheuseriscreated"douser=create(:user)user.user_options.shouldbe_kind_of(UserOptions)endend一切正常,直到我将自

随机推荐