PK –
žUvT+R` ` .gitmodulesnu €žÙ˜ [submodule "docs/.shared"]
path = docs/.shared
url = https://github.com/amphp/amphp.github.io
PK –
žU‰u¤N N .valgrindrcnu €žÙ˜ --error-limit=no
--trace-children=yes
--track-fds=yes
--undef-value-errors=no
PK –
žU’v3µ phpdoc.dist.xmlnu €žÙ˜
Amp
build/docs
utf8
build/docs
warn
build/log/docs/{DATE}.log
src
PK –
žUeŠ´µ§ § lib/Worker/CallableTask.phpnu €žÙ˜ callable = $callable;
$this->args = $args;
}
public function run(Environment $environment)
{
if ($this->callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
}
if (\is_array($this->callable) && ($this->callable[0] ?? null) instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance method as a callable, the class must be autoloadable');
}
if (!\is_callable($this->callable)) {
$message = 'User-defined functions must be autoloadable (that is, defined in a file autoloaded by composer)';
if (\is_string($this->callable)) {
$message .= \sprintf("; unable to load function '%s'", $this->callable);
}
throw new \Error($message);
}
return ($this->callable)(...$this->args);
}
}
PK –
žUKRƒ lib/Worker/WorkerFactory.phpnu €žÙ˜
*/
function enqueue(Task $task): Promise
{
return pool()->enqueue($task);
}
/**
* Enqueues a callable to be executed by the global worker pool.
*
* @param callable $callable Callable needs to be serializable.
* @param mixed ...$args Arguments have to be serializable.
*
* @return Promise
*/
function enqueueCallable(callable $callable, ...$args)
{
return enqueue(new CallableTask($callable, $args));
}
/**
* Gets a worker from the global worker pool.
*
* @return \Amp\Parallel\Worker\Worker
*/
function worker(): Worker
{
return pool()->getWorker();
}
/**
* Creates a worker using the global worker factory.
*
* @return \Amp\Parallel\Worker\Worker
*/
function create(): Worker
{
return factory()->create();
}
/**
* Gets or sets the global worker factory.
*
* @param WorkerFactory|null $factory
*
* @return WorkerFactory
*/
function factory(WorkerFactory $factory = null): WorkerFactory
{
if ($factory === null) {
$factory = Loop::getState(LOOP_FACTORY_IDENTIFIER);
if ($factory) {
return $factory;
}
$factory = new DefaultWorkerFactory;
}
Loop::setState(LOOP_FACTORY_IDENTIFIER, $factory);
return $factory;
}
PK –
žUþÿÖÍú ú lib/Worker/BasicEnvironment.phpnu €žÙ˜ queue = $queue = new \SplPriorityQueue;
$data = &$this->data;
$this->timer = Loop::repeat(1000, static function (string $watcherId) use ($queue, &$data): void {
$time = \time();
while (!$queue->isEmpty()) {
list($key, $expiration) = $queue->top();
if (!isset($data[$key])) {
// Item removed.
$queue->extract();
continue;
}
$struct = $data[$key];
if ($struct->expire === 0) {
// Item was set again without a TTL.
$queue->extract();
continue;
}
if ($struct->expire !== $expiration) {
// Expiration changed or TTL updated.
$queue->extract();
continue;
}
if ($time < $struct->expire) {
// Item at top has not expired, break out of loop.
break;
}
unset($data[$key]);
$queue->extract();
}
if ($queue->isEmpty()) {
Loop::disable($watcherId);
}
});
Loop::disable($this->timer);
Loop::unreference($this->timer);
}
/**
* @param string $key
*
* @return bool
*/
public function exists(string $key): bool
{
return isset($this->data[$key]);
}
/**
* @param string $key
*
* @return mixed|null Returns null if the key does not exist.
*/
public function get(string $key)
{
if (!isset($this->data[$key])) {
return null;
}
$struct = $this->data[$key];
if ($struct->ttl !== null) {
$expire = \time() + $struct->ttl;
if ($struct->expire < $expire) {
$struct->expire = $expire;
$this->queue->insert([$key, $struct->expire], -$struct->expire);
}
}
return $struct->data;
}
/**
* @param string $key
* @param mixed $value Using null for the value deletes the key.
* @param int $ttl Number of seconds until data is automatically deleted. Use null for unlimited TTL.
*
* @throws \Error If the time-to-live is not a positive integer.
*/
public function set(string $key, $value, int $ttl = null): void
{
if ($value === null) {
$this->delete($key);
return;
}
if ($ttl !== null && $ttl <= 0) {
throw new \Error("The time-to-live must be a positive integer or null");
}
$struct = new class {
use Struct;
public $data;
public $expire = 0;
public $ttl;
};
$struct->data = $value;
if ($ttl !== null) {
$struct->ttl = $ttl;
$struct->expire = \time() + $ttl;
$this->queue->insert([$key, $struct->expire], -$struct->expire);
Loop::enable($this->timer);
}
$this->data[$key] = $struct;
}
/**
* @param string $key
*/
public function delete(string $key): void
{
unset($this->data[$key]);
}
/**
* Alias of exists().
*
* @param $key
*
* @return bool
*/
public function offsetExists($key): bool
{
return $this->exists($key);
}
/**
* Alias of get().
*
* @param string $key
*
* @return mixed
*/
#[\ReturnTypeWillChange]
public function offsetGet($key)
{
return $this->get($key);
}
/**
* Alias of set() with $ttl = null.
*
* @param string $key
* @param mixed $value
*/
public function offsetSet($key, $value): void
{
$this->set($key, $value);
}
/**
* Alias of delete().
*
* @param string $key
*/
public function offsetUnset($key): void
{
$this->delete($key);
}
/**
* Removes all values.
*/
public function clear(): void
{
$this->data = [];
Loop::disable($this->timer);
$this->queue = new \SplPriorityQueue;
}
}
PK –
žU¸6äþÙ Ù lib/Worker/TaskRunner.phpnu €žÙ˜ channel = $channel;
$this->environment = $environment;
}
/**
* Runs the task runner, receiving tasks from the parent and sending the result of those tasks.
*
* @return \Amp\Promise
*/
public function run(): Promise
{
return new Coroutine($this->execute());
}
/**
* @coroutine
*
* @return \Generator
*/
private function execute(): \Generator
{
$job = yield $this->channel->receive();
while ($job instanceof Internal\Job) {
try {
$result = yield call([$job->getTask(), "run"], $this->environment);
$result = new Internal\TaskSuccess($job->getId(), $result);
} catch (\Throwable $exception) {
$result = new Internal\TaskFailure($job->getId(), $exception);
}
$job = null; // Free memory from last job.
try {
yield $this->channel->send($result);
} catch (SerializationException $exception) {
// Could not serialize task result.
yield $this->channel->send(new Internal\TaskFailure($result->getId(), $exception));
}
$result = null; // Free memory from last result.
$job = yield $this->channel->receive();
}
return $job;
}
}
PK –
žUojG¦™ ™ lib/Worker/Pool.phpnu €žÙ˜ isRunning()) {
throw new \Error("The context was already running");
}
$this->context = $context;
$context = &$this->context;
$pending = &$this->pending;
\register_shutdown_function(static function () use (&$context, &$pending): void {
if ($context === null || !$context->isRunning()) {
return;
}
try {
Promise\wait(Promise\timeout(call(function () use ($context, $pending): \Generator {
if ($pending) {
yield $pending;
}
yield $context->send(0);
return yield $context->join();
}), self::SHUTDOWN_TIMEOUT));
} catch (\Throwable $exception) {
if ($context !== null) {
$context->kill();
}
}
});
}
/**
* {@inheritdoc}
*/
public function isRunning(): bool
{
// Report as running unless shutdown or crashed.
return !$this->started || ($this->exitStatus === null && $this->context !== null && $this->context->isRunning());
}
/**
* {@inheritdoc}
*/
public function isIdle(): bool
{
return $this->pending === null;
}
/**
* {@inheritdoc}
*/
public function enqueue(Task $task): Promise
{
if ($this->exitStatus) {
throw new StatusError("The worker has been shut down");
}
$promise = $this->pending = call(function () use ($task): \Generator {
if ($this->pending) {
try {
yield $this->pending;
} catch (\Throwable $exception) {
// Ignore error from prior job.
}
}
if ($this->exitStatus !== null || $this->context === null) {
throw new WorkerException("The worker was shutdown");
}
if (!$this->context->isRunning()) {
if ($this->started) {
throw new WorkerException("The worker crashed");
}
$this->started = true;
yield $this->context->start();
}
$job = new Internal\Job($task);
try {
yield $this->context->send($job);
$result = yield $this->context->receive();
} catch (ChannelException $exception) {
try {
yield Promise\timeout($this->context->join(), self::ERROR_TIMEOUT);
} catch (TimeoutException $timeout) {
$this->kill();
throw new WorkerException("The worker failed unexpectedly", 0, $exception);
}
throw new WorkerException("The worker exited unexpectedly", 0, $exception);
}
if (!$result instanceof Internal\TaskResult) {
$this->kill();
throw new WorkerException("Context did not return a task result");
}
if ($result->getId() !== $job->getId()) {
$this->kill();
throw new WorkerException("Task results returned out of order");
}
return $result->promise();
});
$promise->onResolve(function () use ($promise): void {
if ($this->pending === $promise) {
$this->pending = null;
}
});
return $promise;
}
/**
* {@inheritdoc}
*/
public function shutdown(): Promise
{
if ($this->exitStatus !== null) {
return $this->exitStatus;
}
if ($this->context === null || !$this->context->isRunning()) {
return $this->exitStatus = new Success(-1); // Context crashed?
}
return $this->exitStatus = call(function (): \Generator {
if ($this->pending) {
// If a task is currently running, wait for it to finish.
yield Promise\any([$this->pending]);
}
yield $this->context->send(0);
try {
return yield Promise\timeout($this->context->join(), self::SHUTDOWN_TIMEOUT);
} catch (\Throwable $exception) {
$this->context->kill();
throw new WorkerException("Failed to gracefully shutdown worker", 0, $exception);
} finally {
// Null properties to free memory because the shutdown function has references to these.
$this->context = null;
$this->pending = null;
}
});
}
/**
* {@inheritdoc}
*/
public function kill(): void
{
if ($this->exitStatus !== null || $this->context === null) {
return;
}
if ($this->context->isRunning()) {
$this->context->kill();
$this->exitStatus = new Failure(new WorkerException("The worker was killed"));
return;
}
$this->exitStatus = new Success(0);
// Null properties to free memory because the shutdown function has references to these.
$this->context = null;
$this->pending = null;
}
}
PK –
žU!AÙg # lib/Worker/DefaultWorkerFactory.phpnu €žÙ˜ className = $envClassName;
}
/**
* {@inheritdoc}
*
* The type of worker created depends on the extensions available. If multi-threading is enabled, a WorkerThread
* will be created. If threads are not available a WorkerProcess will be created.
*/
public function create(): Worker
{
if (Parallel::isSupported()) {
return new WorkerParallel($this->className);
}
if (Thread::isSupported()) {
return new WorkerThread($this->className);
}
return new WorkerProcess(
$this->className,
[],
\getenv("AMP_PHP_BINARY") ?: (\defined("AMP_PHP_BINARY") ? \AMP_PHP_BINARY : null)
);
}
}
PK –
žU‚Š¢¢ ¢ lib/Worker/TaskException.phpnu €žÙ˜ name = $name;
$this->trace = $trace;
}
/**
* @deprecated Use TaskFailureThrowable::getOriginalClassName() instead.
*
* Returns the class name of the exception thrown from the task.
*
* @return string
*/
public function getName(): string
{
return $this->name;
}
/**
* @deprecated Use TaskFailureThrowable::getOriginalTraceAsString() instead.
*
* Gets the stack trace at the point the exception was thrown in the task.
*
* @return string
*/
public function getWorkerTrace(): string
{
return $this->trace;
}
}
PK –
žUÇßT T lib/Worker/WorkerProcess.phpnu €žÙ˜ originalMessage = $message;
$this->originalCode = $code;
$this->originalTrace = $trace;
}
/**
* @return string Original exception class name.
*/
public function getOriginalClassName(): string
{
return $this->getName();
}
/**
* @return string Original exception message.
*/
public function getOriginalMessage(): string
{
return $this->originalMessage;
}
/**
* @return int|string Original exception code.
*/
public function getOriginalCode()
{
return $this->originalCode;
}
/**
* Returns the original exception stack trace.
*
* @return array Same as {@see Throwable::getTrace()}, except all function arguments are formatted as strings.
*/
public function getOriginalTrace(): array
{
return $this->originalTrace;
}
/**
* Original backtrace flattened to a human-readable string.
*
* @return string
*/
public function getOriginalTraceAsString(): string
{
return $this->getWorkerTrace();
}
}
PK –
žU9FÍ>Æ Æ lib/Worker/TaskFailureError.phpnu €žÙ˜ originalMessage = $message;
$this->originalCode = $code;
$this->originalTrace = $trace;
}
/**
* @return string Original exception class name.
*/
public function getOriginalClassName(): string
{
return $this->getName();
}
/**
* @return string Original exception message.
*/
public function getOriginalMessage(): string
{
return $this->originalMessage;
}
/**
* @return int|string Original exception code.
*/
public function getOriginalCode()
{
return $this->originalCode;
}
/**
* Returns the original exception stack trace.
*
* @return array Same as {@see Throwable::getTrace()}, except all function arguments are formatted as strings.
*/
public function getOriginalTrace(): array
{
return $this->originalTrace;
}
/**
* Original backtrace flattened to a human-readable string.
*
* @return string
*/
public function getOriginalTraceAsString(): string
{
return $this->getWorkerTrace();
}
}
PK –
žUÅWEÀ lib/Worker/Environment.phpnu €žÙ˜ bindTo(null, null)();
}
if (!\class_exists($className)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
}
if (!\is_subclass_of($className, Environment::class)) {
throw new \Error(\sprintf("The class '%s' does not implement '%s'", $className, Environment::class));
}
$environment = new $className;
if (!\defined("AMP_WORKER")) {
\define("AMP_WORKER", \AMP_CONTEXT);
}
$runner = new TaskRunner($channel, $environment);
return $runner->run();
}, $envClassName, $bootstrapPath));
}
}
PK –
žUM¯:BÉ É &