����JFIF��x�x����'
| Server IP : 78.140.185.180 / Your IP : 216.73.216.170 Web Server : LiteSpeed System : Linux cpanel13.v.fozzy.com 4.18.0-513.11.1.lve.el8.x86_64 #1 SMP Thu Jan 18 16:21:02 UTC 2024 x86_64 User : builderbox ( 1072) PHP Version : 7.3.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /home/builderbox/./././www/vendor/laravel/horizon/src/Repositories/ |
Upload File : |
<?php
namespace Laravel\Horizon\Repositories;
use Carbon\CarbonImmutable;
use Illuminate\Contracts\Redis\Factory as RedisFactory;
use Laravel\Horizon\Contracts\MetricsRepository;
use Laravel\Horizon\Lock;
use Laravel\Horizon\LuaScripts;
use Laravel\Horizon\WaitTimeCalculator;
class RedisMetricsRepository implements MetricsRepository
{
/**
* The Redis connection instance.
*
* @var \Illuminate\Contracts\Redis\Factory
*/
public $redis;
/**
* Create a new repository instance.
*
* @param \Illuminate\Contracts\Redis\Factory $redis
* @return void
*/
public function __construct(RedisFactory $redis)
{
$this->redis = $redis;
}
/**
* Get all of the class names that have metrics measurements.
*
* @return array
*/
public function measuredJobs()
{
$classes = (array) $this->connection()->smembers('measured_jobs');
return collect($classes)->map(function ($class) {
return preg_match('/job:(.*)$/', $class, $matches) ? $matches[1] : $class;
})->all();
}
/**
* Get all of the queues that have metrics measurements.
*
* @return array
*/
public function measuredQueues()
{
$queues = (array) $this->connection()->smembers('measured_queues');
return collect($queues)->map(function ($class) {
return preg_match('/queue:(.*)$/', $class, $matches) ? $matches[1] : $class;
})->all();
}
/**
* Get the jobs processed per minute since the last snapshot.
*
* @return float
*/
public function jobsProcessedPerMinute()
{
return round($this->throughput() / $this->minutesSinceLastSnapshot());
}
/**
* Get the application's total throughput since the last snapshot.
*
* @return int
*/
public function throughput()
{
return collect($this->measuredQueues())->reduce(function ($carry, $queue) {
return $carry + $this->connection()->hget('queue:'.$queue, 'throughput');
}, 0);
}
/**
* Get the throughput for a given job.
*
* @param string $job
* @return int
*/
public function throughputForJob($job)
{
return $this->throughputFor('job:'.$job);
}
/**
* Get the throughput for a given queue.
*
* @param string $queue
* @return int
*/
public function throughputForQueue($queue)
{
return $this->throughputFor('queue:'.$queue);
}
/**
* Get the throughput for a given key.
*
* @param string $key
* @return int
*/
protected function throughputFor($key)
{
return (int) $this->connection()->hget($key, 'throughput');
}
/**
* Get the average runtime for a given job in milliseconds.
*
* @param string $job
* @return float
*/
public function runtimeForJob($job)
{
return $this->runtimeFor('job:'.$job);
}
/**
* Get the average runtime for a given queue in milliseconds.
*
* @param string $queue
* @return float
*/
public function runtimeForQueue($queue)
{
return $this->runtimeFor('queue:'.$queue);
}
/**
* Get the average runtime for a given key in milliseconds.
*
* @param string $key
* @return float
*/
protected function runtimeFor($key)
{
return (float) $this->connection()->hget($key, 'runtime');
}
/**
* Get the queue that has the longest runtime.
*
* @return int
*/
public function queueWithMaximumRuntime()
{
return collect($this->measuredQueues())->sortBy(function ($queue) {
if ($snapshots = $this->connection()->zrange('snapshot:queue:'.$queue, -1, 1)) {
return json_decode($snapshots[0])->runtime;
}
})->last();
}
/**
* Get the queue that has the most throughput.
*
* @return int
*/
public function queueWithMaximumThroughput()
{
return collect($this->measuredQueues())->sortBy(function ($queue) {
if ($snapshots = $this->connection()->zrange('snapshot:queue:'.$queue, -1, 1)) {
return json_decode($snapshots[0])->throughput;
}
})->last();
}
/**
* Increment the metrics information for a job.
*
* @param string $job
* @param float $runtime
* @return void
*/
public function incrementJob($job, $runtime)
{
$this->connection()->eval(LuaScripts::updateMetrics(), 2,
'job:'.$job, 'measured_jobs', str_replace(',', '.', $runtime)
);
}
/**
* Increment the metrics information for a queue.
*
* @param string $queue
* @param float $runtime
* @return void
*/
public function incrementQueue($queue, $runtime)
{
$this->connection()->eval(LuaScripts::updateMetrics(), 2,
'queue:'.$queue, 'measured_queues', str_replace(',', '.', $runtime)
);
}
/**
* Get all of the snapshots for the given job.
*
* @param string $job
* @return array
*/
public function snapshotsForJob($job)
{
return $this->snapshotsFor('job:'.$job);
}
/**
* Get all of the snapshots for the given queue.
*
* @param string $queue
* @return array
*/
public function snapshotsForQueue($queue)
{
return $this->snapshotsFor('queue:'.$queue);
}
/**
* Get all of the snapshots for the given key.
*
* @param string $key
* @return array
*/
protected function snapshotsFor($key)
{
return collect($this->connection()->zrange('snapshot:'.$key, 0, -1))
->map(function ($snapshot) {
return (object) json_decode($snapshot, true);
})->values()->all();
}
/**
* Store a snapshot of the metrics information.
*
* @return void
*/
public function snapshot()
{
collect($this->measuredJobs())->each(function ($job) {
$this->storeSnapshotForJob($job);
});
collect($this->measuredQueues())->each(function ($queue) {
$this->storeSnapshotForQueue($queue);
});
$this->storeSnapshotTimestamp();
}
/**
* Store a snapshot for the given job.
*
* @param string $job
* @return void
*/
protected function storeSnapshotForJob($job)
{
$data = $this->baseSnapshotData($key = 'job:'.$job);
$this->connection()->zadd(
'snapshot:'.$key, $time = CarbonImmutable::now()->getTimestamp(), json_encode([
'throughput' => $data['throughput'],
'runtime' => $data['runtime'],
'time' => $time,
])
);
$this->connection()->zremrangebyrank(
'snapshot:'.$key, 0, -abs(1 + config('horizon.metrics.trim_snapshots.job', 24))
);
}
/**
* Store a snapshot for the given queue.
*
* @param string $queue
* @return void
*/
protected function storeSnapshotForQueue($queue)
{
$data = $this->baseSnapshotData($key = 'queue:'.$queue);
$this->connection()->zadd(
'snapshot:'.$key, $time = CarbonImmutable::now()->getTimestamp(), json_encode([
'throughput' => $data['throughput'],
'runtime' => $data['runtime'],
'wait' => app(WaitTimeCalculator::class)->calculateFor($queue),
'time' => $time,
])
);
$this->connection()->zremrangebyrank(
'snapshot:'.$key, 0, -abs(1 + config('horizon.metrics.trim_snapshots.queue', 24))
);
}
/**
* Get the base snapshot data for a given key.
*
* @param string $key
* @return array
*/
protected function baseSnapshotData($key)
{
$responses = $this->connection()->transaction(function ($trans) use ($key) {
$trans->hmget($key, ['throughput', 'runtime']);
$trans->del($key);
});
$snapshot = array_values($responses[0]);
return [
'throughput' => $snapshot[0],
'runtime' => $snapshot[1],
];
}
/**
* Get the number of minutes passed since the last snapshot.
*
* @return float
*/
protected function minutesSinceLastSnapshot()
{
$lastSnapshotAt = $this->connection()->get('last_snapshot_at')
?: $this->storeSnapshotTimestamp();
return max(
(CarbonImmutable::now()->getTimestamp() - $lastSnapshotAt) / 60, 1
);
}
/**
* Store the current timestamp as the "last snapshot timestamp".
*
* @return int
*/
protected function storeSnapshotTimestamp()
{
return tap(CarbonImmutable::now()->getTimestamp(), function ($timestamp) {
$this->connection()->set('last_snapshot_at', $timestamp);
});
}
/**
* Attempt to acquire a lock to monitor the queue wait times.
*
* @return bool
*/
public function acquireWaitTimeMonitorLock()
{
return app(Lock::class)->get('monitor:time-to-clear');
}
/**
* Clear the metrics for a key.
*
* @param string $key
* @return void
*/
public function forget($key)
{
$this->connection()->del($key);
}
/**
* Get the Redis connection instance.
*
* @return \Illuminate\Redis\Connections\Connection
*/
public function connection()
{
return $this->redis->connection('horizon');
}
}