diff --git a/composer.json b/composer.json index 7f79b38..c06ec0d 100644 --- a/composer.json +++ b/composer.json @@ -15,15 +15,15 @@ "pull-request": "https://github.com/hyperf/hyperf/pulls" }, "require": { - "php": ">=8.1", - "hyperf/codec": "~3.1.0", - "hyperf/collection": "~3.1.0", - "hyperf/command": "~3.1.0", - "hyperf/context": "~3.1.0", - "hyperf/contract": "~3.1.0", - "hyperf/coroutine": "~3.1.0", - "hyperf/stringable": "~3.1.0", - "hyperf/support": "~3.1.0", + "php": ">=8.2", + "hyperf/codec": "~3.2.0", + "hyperf/collection": "~3.2.0", + "hyperf/command": "~3.2.0", + "hyperf/context": "~3.2.0", + "hyperf/contract": "~3.2.0", + "hyperf/coroutine": "~3.2.0", + "hyperf/stringable": "~3.2.0", + "hyperf/support": "~3.2.0", "psr/container": "^1.0 || ^2.0", "psr/event-dispatcher": "^1.0" }, @@ -51,7 +51,7 @@ }, "extra": { "branch-alias": { - "dev-master": "3.1-dev" + "dev-master": "3.2-dev" }, "hyperf": { "config": "Hyperf\\AsyncQueue\\ConfigProvider" diff --git a/publish/async_queue.php b/publish/async_queue.php index e449628..042e27c 100644 --- a/publish/async_queue.php +++ b/publish/async_queue.php @@ -13,6 +13,7 @@ return [ 'default' => [ + 'enable' => true, // Whether to enable auto register consumer process. 'driver' => RedisDriver::class, 'redis' => [ 'pool' => 'default', diff --git a/src/ConfigProvider.php b/src/ConfigProvider.php index 65b2182..f09b4d0 100644 --- a/src/ConfigProvider.php +++ b/src/ConfigProvider.php @@ -32,6 +32,9 @@ public function __invoke(): array ReloadFailedMessageCommand::class, DynamicReloadMessageCommand::class, ], + 'listeners' => [ + Listener\RegisterConsumerProcessesListener::class, + ], 'publish' => [ [ 'id' => 'config', diff --git a/src/ConsumerManager.php b/src/ConsumerManager.php new file mode 100644 index 0000000..d7b029c --- /dev/null +++ b/src/ConsumerManager.php @@ -0,0 +1,63 @@ +container->get(ConfigInterface::class); + $pools = $config->get('async_queue', []); + + foreach ($pools as $pool => $config) { + if (! ($config['enable'] ?? false)) { + continue; + } + + $this->createProcess($pool, $config); + } + } + + protected function createProcess(string $pool, array $config): void + { + $process = new class($this->container, $pool, $config) extends AbstractProcess { + public function __construct( + protected ContainerInterface $container, + protected string $pool, + array $config + ) { + parent::__construct($container); + $this->name = "queue.{$pool}"; + $this->nums = $config['processes'] ?? 1; + } + + public function handle(): void + { + $driver = $this->container->get(DriverFactory::class)->get($this->pool); + $driver->consume(); + } + }; + + ProcessManager::register($process); + } +} diff --git a/src/Functions.php b/src/Functions.php index 5a33578..10ef956 100644 --- a/src/Functions.php +++ b/src/Functions.php @@ -21,8 +21,10 @@ function dispatch(JobInterface $job, ?int $delay = null, ?int $maxAttempts = nul $job->setMaxAttempts($maxAttempts); } + $pool ??= $job->getPoolName(); + return ApplicationContext::getContainer() ->get(DriverFactory::class) - ->get($pool ?? 'default') + ->get($pool) ->push($job, $delay ?? 0); } diff --git a/src/Job.php b/src/Job.php index 313695a..952431c 100644 --- a/src/Job.php +++ b/src/Job.php @@ -38,7 +38,7 @@ public function getMaxAttempts(): int public function uncompress(): static { - foreach ($this as $key => $value) { + foreach (get_object_vars($this) as $key => $value) { if ($value instanceof UnCompressInterface) { $this->{$key} = $value->uncompress(); } @@ -49,7 +49,7 @@ public function uncompress(): static public function compress(): static { - foreach ($this as $key => $value) { + foreach (get_object_vars($this) as $key => $value) { if ($value instanceof CompressInterface) { $this->{$key} = $value->compress(); } @@ -57,4 +57,9 @@ public function compress(): static return $this; } + + public function getPoolName(): string + { + return 'default'; + } } diff --git a/src/JobInterface.php b/src/JobInterface.php index fbd5e07..014adb4 100644 --- a/src/JobInterface.php +++ b/src/JobInterface.php @@ -27,4 +27,6 @@ public function handle(); public function setMaxAttempts(int $maxAttempts): static; public function getMaxAttempts(): int; + + public function getPoolName(): string; } diff --git a/src/JobMessage.php b/src/JobMessage.php index 07b26d8..f389b34 100644 --- a/src/JobMessage.php +++ b/src/JobMessage.php @@ -31,8 +31,6 @@ public function __serialize(): array } return [ - $this->job, // Compatible with old version, will be removed at v3.2 - $this->attempts, // Compatible with old version, will be removed at v3.2 'job' => $this->job, 'attempts' => $this->attempts, ]; @@ -40,13 +38,6 @@ public function __serialize(): array public function __unserialize(array $data): void { - if (array_is_list($data)) { // Compatible with old version, will be removed at v3.2 - $data = [ - 'job' => $data[0], - 'attempts' => $data[1], - ]; - } - $job = $data['job']; if ($job instanceof UnCompressInterface) { diff --git a/src/Listener/RegisterConsumerProcessesListener.php b/src/Listener/RegisterConsumerProcessesListener.php new file mode 100644 index 0000000..525e8d1 --- /dev/null +++ b/src/Listener/RegisterConsumerProcessesListener.php @@ -0,0 +1,39 @@ +container->get(ConsumerManager::class)->run(); + } +} diff --git a/tests/JobMessageTest.php b/tests/JobMessageTest.php index 4816e46..31ef362 100644 --- a/tests/JobMessageTest.php +++ b/tests/JobMessageTest.php @@ -16,7 +16,6 @@ use Hyperf\AsyncQueue\JobMessage; use Hyperf\AsyncQueue\MessageInterface; use HyperfTest\AsyncQueue\Stub\DemoJob; -use HyperfTest\AsyncQueue\Stub\OldJobMessage; use PHPUnit\Framework\Attributes\CoversNothing; use PHPUnit\Framework\TestCase; @@ -36,19 +35,18 @@ public function testJobMessageSerialize() $serialized = $message->__serialize(); - $this->assertEquals($serialized[0], $serialized['job']); - $this->assertEquals($serialized[1], $serialized['attempts']); $this->assertArrayHasKey('job', $serialized); $this->assertArrayHasKey('attempts', $serialized); + $this->assertInstanceOf(JobInterface::class, $serialized['job']); + $this->assertSame(0, $serialized['attempts']); $this->assertInstanceOf(MessageInterface::class, $message); $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(JobInterface::class, $message->job()); $this->assertInstanceOf(DemoJob::class, $message->job()); $this->assertSame($id, $message->job()->id); } - public function testJobMessageSerializeCompatible() + public function testJobMessageUnserialize() { $id = rand(0, 9999); $message = new JobMessage( @@ -57,61 +55,26 @@ public function testJobMessageSerializeCompatible() $serialized = $message->__serialize(); - $this->assertEquals($serialized[0], $serialized['job']); - $this->assertEquals($serialized[1], $serialized['attempts']); + $this->assertArrayHasKey('job', $serialized); + $this->assertArrayHasKey('attempts', $serialized); $message = unserialize(serialize($message)); $this->assertInstanceOf(MessageInterface::class, $message); $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(JobInterface::class, $message->job()); $this->assertInstanceOf(DemoJob::class, $message->job()); $this->assertSame($id, $message->job()->id); $this->assertSame(0, $message->getAttempts()); $serialized = [ - 'job' => $serialized['job'] ?? $serialized[0], + 'job' => $serialized['job'], 'attempts' => 3, ]; $message->__unserialize($serialized); $this->assertInstanceOf(MessageInterface::class, $message); $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(JobInterface::class, $message->job()); $this->assertInstanceOf(DemoJob::class, $message->job()); $this->assertSame($id, $message->job()->id); $this->assertSame(3, $message->getAttempts()); - - $serialized = [new DemoJob($id), 5]; - $message->__unserialize($serialized); - - $this->assertInstanceOf(MessageInterface::class, $message); - $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(DemoJob::class, $message->job()); - $this->assertSame($id, $message->job()->id); - $this->assertSame(5, $message->getAttempts()); - } - - public function testUnserializeAsOldJobMessage() - { - $id = rand(0, 9999); - $message = new JobMessage( - new DemoJob($id) - ); - - $serialized = serialize($message); - $serialized = str_replace( - sprintf('O:%d:"%s', strlen(JobMessage::class), JobMessage::class), - sprintf('O:%d:"%s', strlen(OldJobMessage::class), OldJobMessage::class), - $serialized - ); - $message = unserialize($serialized); - - $this->assertInstanceOf(MessageInterface::class, $message); - $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(JobInterface::class, $message->job()); - $this->assertInstanceOf(DemoJob::class, $message->job()); - $this->assertSame($id, $message->job()->id); - $this->assertSame(0, $message->getAttempts()); } } diff --git a/tests/RedisDriverTest.php b/tests/RedisDriverTest.php index cccd3cc..80544b7 100644 --- a/tests/RedisDriverTest.php +++ b/tests/RedisDriverTest.php @@ -116,7 +116,7 @@ public function testAsyncQueueJobGenerate() $driver->push(new DemoJob($id, $model)); $serialized = (string) Context::get('test.async-queue.lpush.value'); - $this->assertSame(264, strlen($serialized)); + $this->assertSame(248, strlen($serialized)); /** @var JobMessage $class */ $class = $packer->unpack($serialized); diff --git a/tests/Stub/OldJobMessage.php b/tests/Stub/OldJobMessage.php deleted file mode 100644 index 53cdd76..0000000 --- a/tests/Stub/OldJobMessage.php +++ /dev/null @@ -1,31 +0,0 @@ -uncompress(); - } - - $this->job = $job; - $this->attempts = $attempts; - } -}