场景:订单超时事件、礼品券过期事件, 类似这些场景的, 都可以使用延迟队列来处理, 由于 beanstalkd 是单进程, 可以考虑结合 workerman 创建多进程来处理任务
文档地址
https://www.kancloud.cn/vson/php-message-queue/891904
安装
wget https://github.com/beanstalkd/beanstalkd/archive/v1.11.tar.gz
相关工具
# beanstalk CLI 工具
DEMO
# producer
Windows 安装 Beanstalkd
下载并安装 cygwin
1. 添加镜像地址 - http://mirrors.aliyun.com
2. 下载 `gcc`、`gcc-core`、`make`、`automake`, 在 Devel 分支下下载 beanstalkd-win 包
1. 解压并进入 beanstalkd-win 目录
2. 打开CMD窗口,运行 ./beanstalkd.exe -l 127.0.0.1 -p 11300
-
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
/**
* beanstalk: A minimalistic PHP beanstalk client.
*
* Copyright (c) 2009-2013 David Persson
*
* Distributed under the terms of the MIT License.
* Redistributions of files must retain the above copyright notice.
*
* @copyright 2009-2013 David Persson <nperson@gmx.de>
* @license http://www.opensource.org/licenses/mit-license.php The MIT License
* @link http://github.com/davidpersson/beanstalk
*/
/**
* An interface to the beanstalk queue service. Implements the beanstalk
* protocol spec 1.2. Where appropriate the documentation from the protocol has
* been added to the docblocks in this class.
*
* @link https://github.com/kr/beanstalkd/blob/master/doc/protocol.txt
*/
class BeanstalkdService
{
/**
* Holds a boolean indicating whether a connection to the server is
* currently established or not.
*
* @var boolean
*/
public $connected = false;
/**
* Holds configuration values.
*
* @var array
*/
protected $_config = [];
/**
* The current connection resource handle (if any).
*
* @var resource
*/
protected $_connection;
/**
* Generated errors. Will hold a maximum of 200 error messages at any time
* to prevent piling up messages and using more and more memory. This is
* especially important if this class is used in long-running workers.
*
* @see BeanstalkdService::errors()
* @see BeanstalkdService::_error()
* @var array
*/
protected $_errors = [];
/**
 * Stores the connection settings. No socket is opened here; that happens in
 * `connect()`, which `_write()` and `_read()` also call on demand.
 *
 * @param array $config Connection settings. Missing keys fall back to:
 *        - `'persistent'` Whether to use a persistent connection, default `true`.
 *        - `'host'` Hostname or IP address of the beanstalk server, default `127.0.0.1`.
 *        - `'port'` Port of the beanstalk server, default `11300`.
 *        - `'timeout'` Seconds allowed for establishing the connection, default `1`.
 */
public function __construct(array $config = [])
{
    // Array union: caller-supplied keys win, the literal only fills the gaps.
    $this->_config = $config + [
        'persistent' => true,
        'host' => '127.0.0.1',
        'port' => 11300,
        'timeout' => 1,
    ];
}
/**
 * Releases the socket when the object goes away by delegating to
 * `disconnect()`.
 */
public function __destruct()
{
    // The result is irrelevant here; nothing can act on it during destruction.
    $this->disconnect();
}
/**
 * Opens the socket to the beanstalk server described by the configuration.
 *
 * Any handle left over from an earlier attempt is closed first. Once the
 * socket is open its stream timeout is set to `-1`, so that blocking reads
 * such as a plain `reserve` are not cut short by a read timeout.
 *
 * @return boolean `true` if the connection was established, `false` otherwise
 *         (the socket error, if any, is recorded and available via `errors()`).
 * @see BeanstalkdService::reserve()
 * @see BeanstalkdService::$_connection
 */
public function connect()
{
    if (isset($this->_connection)) {
        $this->disconnect();
    }
    $opener = $this->_config['persistent'] ? 'pfsockopen' : 'fsockopen';
    $host = $this->_config['host'];
    $port = $this->_config['port'];
    $errNum = null;
    $errStr = null;
    // The timeout argument is only passed when one is configured; otherwise
    // the opener's own default applies.
    if ($this->_config['timeout']) {
        $handle = @$opener($host, $port, $errNum, $errStr, $this->_config['timeout']);
    } else {
        $handle = @$opener($host, $port, $errNum, $errStr);
    }
    $this->_connection = $handle;
    if (!empty($errNum) || !empty($errStr)) {
        $this->_error("{$errNum}: {$errStr}");
    }
    $this->connected = is_resource($handle);
    if (!$this->connected) {
        return false;
    }
    stream_set_timeout($handle, -1);
    return true;
}
/**
 * Closes the socket to the beanstalk server, if one is open.
 *
 * @return boolean `true` when no connection remains afterwards, `false` if
 *         closing the handle failed.
 */
public function disconnect()
{
    if (is_resource($this->_connection)) {
        $closed = fclose($this->_connection);
        $this->connected = !$closed;
        if ($closed) {
            // Forget the dead handle so connect() starts from a clean state.
            $this->_connection = null;
        }
    } else {
        $this->connected = false;
    }
    return !$this->connected;
}
/**
 * Gives access to the error messages recorded so far (oldest first).
 *
 * @return array List of error messages; empty when nothing went wrong.
 */
public function errors()
{
    return $this->_errors;
}
/**
 * Records an error message, keeping only the 200 most recent ones so that
 * long-running workers do not accumulate messages without bound.
 *
 * @param string $message The error message.
 *
 * @return void
 */
protected function _error($message)
{
    $this->_errors[] = $message;
    // Appending first and trimming afterwards leaves the same list as
    // dropping the oldest entry before the append.
    if (count($this->_errors) > 200) {
        array_shift($this->_errors);
    }
}
/**
 * Sends one packet to the server, connecting first if no connection is
 * currently established. The CRLF terminator is appended here.
 *
 * @param string $data The packet without its trailing CRLF.
 *
 * @return integer|boolean Number of bytes written, or `false` on error.
 */
protected function _write($data)
{
    $ready = $this->connected || $this->connect();
    if (!$ready) {
        return false;
    }
    $packet = $data . "\r\n";
    return fwrite($this->_connection, $packet, strlen($packet));
}
/**
 * Reads one packet from the server, connecting first if no connection is
 * currently established.
 *
 * Without a length (or with a length of 0) a single line is read, up to the
 * next CRLF. With a length, that many bytes plus the two terminator bytes
 * are read and the result is right-trimmed.
 *
 * NOTE: the trim removes *every* trailing CR/LF byte, not only the packet
 * terminator, so payload data that itself ends in line breaks comes back
 * without them.
 *
 * @param int $length Number of payload bytes to read.
 *
 * @return string|boolean The packet, or `false` on error.
 */
protected function _read($length = null)
{
    $ready = $this->connected || $this->connect();
    if (!$ready) {
        return false;
    }
    if (!$length) {
        return stream_get_line($this->_connection, 16384, "\r\n");
    }
    if (feof($this->_connection)) {
        return false;
    }
    $raw = stream_get_contents($this->_connection, $length + 2);
    $info = stream_get_meta_data($this->_connection);
    if ($info['timed_out']) {
        $this->_error('Connection timed out.');
        return false;
    }
    return rtrim($raw, "\r\n");
}
/* Producer Commands */
/**
 * Inserts a job into the currently used tube (`put` command).
 *
 * @param integer $pri Jobs with smaller priority values will be scheduled
 *        before jobs with larger priorities. The most urgent priority is
 *        0; the least urgent priority is 4294967295.
 * @param integer $delay Seconds to wait before putting the job in the
 *        ready queue. The job will be in the "delayed" state during this time.
 * @param integer $ttr Time to run - number of seconds to allow a worker to
 *        run this job. The minimum ttr is 1.
 * @param string $data The job body.
 *
 * @return integer|boolean The job id when the server answers `INSERTED` or
 *         `BURIED`; `false` for any other answer, which is recorded as an error.
 */
public function put($pri, $delay, $ttr, $data)
{
    $command = sprintf("put %d %d %d %d\r\n%s", $pri, $delay, $ttr, strlen($data), $data);
    $this->_write($command);
    $status = strtok($this->_read(), ' ');
    if ($status === 'INSERTED' || $status === 'BURIED') {
        // The next token of the status line is the job id.
        return (int)strtok(' ');
    }
    $this->_error($status);
    return false;
}
/**
 * Selects the tube that subsequent `put` commands insert into (`use`
 * command). If no tube has been chosen, jobs go into the tube named
 * `default`.
 *
 * The method is not called `use` because that is a reserved keyword in PHP.
 *
 * @param string $tube A name at most 200 bytes. If the tube does not exist,
 *        it will be created.
 *
 * @return string|boolean The name of the tube now in use, or `false` on error.
 */
public function choose($tube)
{
    $command = sprintf('use %s', $tube);
    $this->_write($command);
    $status = strtok($this->_read(), ' ');
    if ($status !== 'USING') {
        $this->_error($status);
        return false;
    }
    return strtok(' ');
}
/**
 * Alias for `choose()`.
 *
 * @param string $tube Name of the tube to use.
 *
 * @return string|boolean The name of the tube now in use, or `false` on error.
 * @see BeanstalkdService::choose()
 */
public function useTube($tube)
{
    $chosen = $this->choose($tube);
    return $chosen;
}
/* Worker Commands */
/**
 * Reserves a job, optionally waiting only a limited time for one.
 *
 * The job body is returned byte-for-byte: exactly the number of bytes
 * announced by the server is kept, so bodies that end in CR/LF characters
 * are not shortened.
 *
 * @param integer $timeout If given specifies number of seconds to wait for
 *        a job. 0 returns immediately. When omitted a plain `reserve` is sent.
 *
 * @return array|false `false` on error (including `DEADLINE_SOON` and
 *         `TIMED_OUT`), otherwise an array with the keys `id` (integer job id)
 *         and `body` (string, or `false` if the body could not be read).
 */
public function reserve($timeout = null)
{
    if (isset($timeout)) {
        $this->_write(sprintf('reserve-with-timeout %d', $timeout));
    } else {
        $this->_write('reserve');
    }
    $status = strtok($this->_read(), ' ');
    if ($status !== 'RESERVED') {
        $this->_error($status);
        return false;
    }
    $id = (int)strtok(' ');
    $size = (int)strtok(' ');

    // Read the body here rather than through `_read($size)`: that helper
    // right-trims every trailing CR/LF byte, which corrupts bodies that
    // legitimately end in a line break. The two extra bytes are the CRLF
    // that terminates the body on the wire.
    $raw = feof($this->_connection)
        ? false
        : stream_get_contents($this->_connection, $size + 2);
    $meta = stream_get_meta_data($this->_connection);

    $body = false;
    if ($meta['timed_out']) {
        $this->_error('Connection timed out.');
    } elseif (is_string($raw) && strlen($raw) >= $size) {
        $body = (string)substr($raw, 0, $size);
    } elseif ($raw !== false) {
        $this->_error('Unexpected end of job body.');
    }
    return ['id' => $id, 'body' => $body];
}
/**
 * Removes a job from the server entirely.
 *
 * @param integer $id The id of the job.
 *
 * @return boolean `true` when the server answers `DELETED`; `false` for any
 *         other answer (e.g. `NOT_FOUND`), which is recorded as an error.
 */
public function delete($id)
{
    $command = sprintf('delete %d', $id);
    $this->_write($command);
    $status = $this->_read();
    if ($status === 'DELETED') {
        return true;
    }
    $this->_error($status);
    return false;
}
/**
 * Puts a reserved job back into the ready queue.
 *
 * @param integer $id The id of the job.
 * @param integer $pri Priority to assign to the job.
 * @param integer $delay Number of seconds to wait before putting the job in the ready queue.
 *
 * @return boolean `true` when the server answers `RELEASED` or `BURIED`;
 *         `false` for any other answer, which is recorded as an error.
 */
public function release($id, $pri, $delay)
{
    $command = sprintf('release %d %d %d', $id, $pri, $delay);
    $this->_write($command);
    $status = $this->_read();
    if (in_array($status, ['RELEASED', 'BURIED'], true)) {
        return true;
    }
    $this->_error($status);
    return false;
}
/**
 * Puts a job into the `buried` state. Buried jobs are put into a FIFO
 * linked list and will not be touched until a client kicks them.
 *
 * @param integer $id The id of the job.
 * @param integer $pri *New* priority to assign to the job.
 *
 * @return boolean `true` when the server answers `BURIED`; `false` for any
 *         other answer (e.g. `NOT_FOUND`), which is recorded as an error.
 */
public function bury($id, $pri)
{
    $command = sprintf('bury %d %d', $id, $pri);
    $this->_write($command);
    $status = $this->_read();
    if ($status === 'BURIED') {
        return true;
    }
    $this->_error($status);
    return false;
}
/**
 * Allows a worker to request more time to work on a job.
 *
 * @param integer $id The id of the job.
 *
 * @return boolean `true` when the server answers `TOUCHED`; `false` for any
 *         other answer, which is recorded as an error.
 */
public function touch($id)
{
    $command = sprintf('touch %d', $id);
    $this->_write($command);
    $status = $this->_read();
    if ($status === 'TOUCHED') {
        return true;
    }
    $this->_error($status);
    return false;
}
/**
 * Adds the named tube to the watch list for the current connection.
 *
 * @param string $tube Name of tube to watch.
 *
 * @return integer|boolean Number of tubes in the watch list, or `false` on error.
 */
public function watch($tube)
{
    $command = sprintf('watch %s', $tube);
    $this->_write($command);
    $status = strtok($this->_read(), ' ');
    if ($status !== 'WATCHING') {
        $this->_error($status);
        return false;
    }
    return (int)strtok(' ');
}
/**
 * Removes the named tube from the watch list.
 *
 * @param string $tube Name of tube to ignore.
 *
 * @return integer|boolean Number of tubes left in the watch list, or `false`
 *         for any answer other than `WATCHING` (e.g. `NOT_IGNORED`), which is
 *         recorded as an error.
 */
public function ignore($tube)
{
    $command = sprintf('ignore %s', $tube);
    $this->_write($command);
    $status = strtok($this->_read(), ' ');
    if ($status !== 'WATCHING') {
        $this->_error($status);
        return false;
    }
    return (int)strtok(' ');
}
/* Other Commands */
/**
 * Inspects a job by its id.
 *
 * @param integer $id The id of the job.
 *
 * @return array|boolean `false` on error, otherwise an array with the keys
 *         `id` and `body` of the job.
 */
public function peek($id)
{
    $command = sprintf('peek %d', $id);
    $this->_write($command);
    return $this->_peekRead();
}
/**
 * Inspects the next ready job.
 *
 * @return array|boolean `false` on error, otherwise an array with the keys
 *         `id` and `body` of the job.
 */
public function peekReady()
{
    $command = 'peek-ready';
    $this->_write($command);
    return $this->_peekRead();
}
/**
 * Inspects the job with the shortest delay left.
 *
 * @return array|boolean `false` on error, otherwise an array with the keys
 *         `id` and `body` of the job.
 */
public function peekDelayed()
{
    $command = 'peek-delayed';
    $this->_write($command);
    return $this->_peekRead();
}
/**
 * Inspects the next job in the list of buried jobs.
 *
 * @return array|boolean `false` on error, otherwise an array with the keys
 *         `id` and `body` of the job.
 */
public function peekBuried()
{
    $command = 'peek-buried';
    $this->_write($command);
    return $this->_peekRead();
}
/**
 * Handles the response for all peek methods.
 *
 * The job body is returned byte-for-byte: exactly the number of bytes
 * announced by the server is kept, so bodies that end in CR/LF characters
 * are not shortened.
 *
 * @return array|boolean `false` for any answer other than `FOUND` (e.g.
 *         `NOT_FOUND`, recorded as an error), otherwise an array with the
 *         keys `id` (integer job id) and `body` (string, or `false` if the
 *         body could not be read).
 */
protected function _peekRead()
{
    $status = strtok($this->_read(), ' ');
    if ($status !== 'FOUND') {
        $this->_error($status);
        return false;
    }
    $id = (int)strtok(' ');
    $size = (int)strtok(' ');

    // Read the body here rather than through `_read($size)`: that helper
    // right-trims every trailing CR/LF byte, which corrupts bodies that
    // legitimately end in a line break. The two extra bytes are the CRLF
    // that terminates the body on the wire.
    $raw = feof($this->_connection)
        ? false
        : stream_get_contents($this->_connection, $size + 2);
    $meta = stream_get_meta_data($this->_connection);

    $body = false;
    if ($meta['timed_out']) {
        $this->_error('Connection timed out.');
    } elseif (is_string($raw) && strlen($raw) >= $size) {
        $body = (string)substr($raw, 0, $size);
    } elseif ($raw !== false) {
        $this->_error('Unexpected end of job body.');
    }
    return ['id' => $id, 'body' => $body];
}
/**
 * Moves jobs into the ready queue (applies to the current tube).
 *
 * If there are buried jobs only those get kicked, otherwise delayed jobs
 * get kicked.
 *
 * @param integer $bound Upper bound on the number of jobs to kick.
 *
 * @return integer|boolean Number of jobs kicked, or `false` on error.
 */
public function kick($bound)
{
    $command = sprintf('kick %d', $bound);
    $this->_write($command);
    $status = strtok($this->_read(), ' ');
    if ($status !== 'KICKED') {
        $this->_error($status);
        return false;
    }
    return (int)strtok(' ');
}
/* Stats Commands */
/**
 * Gives statistical information about the specified job if it exists.
 *
 * @param integer $id The job id.
 *
 * @return array|boolean `false` on error, otherwise the statistics decoded
 *         into an associative array.
 */
public function statsJob($id)
{
    $command = sprintf('stats-job %d', $id);
    $this->_write($command);
    return $this->_statsRead();
}
/**
 * Gives statistical information about the specified tube if it exists.
 *
 * @param string $tube Name of the tube.
 *
 * @return array|boolean `false` on error, otherwise the statistics decoded
 *         into an associative array.
 */
public function statsTube($tube)
{
    $command = sprintf('stats-tube %s', $tube);
    $this->_write($command);
    return $this->_statsRead();
}
/**
 * Gives statistical information about the system as a whole.
 *
 * @return array|boolean `false` on error, otherwise the statistics decoded
 *         into an associative array.
 */
public function stats()
{
    $command = 'stats';
    $this->_write($command);
    return $this->_statsRead();
}
/**
 * Returns a list of all existing tubes.
 *
 * @return array|boolean `false` on error, otherwise the tube names decoded
 *         into a list.
 */
public function listTubes()
{
    $command = 'list-tubes';
    $this->_write($command);
    return $this->_statsRead();
}
/**
 * Returns the tube currently being used by the producer.
 *
 * @return string|boolean The name of the tube, or `false` on error.
 */
public function listTubeUsed()
{
    $this->_write('list-tube-used');
    $status = strtok($this->_read(), ' ');
    if ($status !== 'USING') {
        $this->_error($status);
        return false;
    }
    return strtok(' ');
}
/**
 * Alias for `listTubeUsed()`.
 *
 * @return string|boolean The name of the tube, or `false` on error.
 * @see BeanstalkdService::listTubeUsed()
 */
public function listTubeChosen()
{
    $tube = $this->listTubeUsed();
    return $tube;
}
/**
 * Returns a list of tubes currently being watched by the worker.
 *
 * @return array|boolean `false` on error, otherwise the tube names decoded
 *         into a list.
 */
public function listTubesWatched()
{
    $command = 'list-tubes-watched';
    $this->_write($command);
    return $this->_statsRead();
}
/**
 * Handles responses for all stat methods.
 *
 * Reads the status line and, when it is `OK`, the payload whose byte count
 * the status line announces. A payload that cannot be read is reported as
 * `false` instead of being handed to the decoder, where it would have turned
 * into an empty array that is indistinguishable from an empty result.
 *
 * @param boolean $decode Whether to decode data before returning it or not. Default is `true`.
 *
 * @return array|string|boolean `false` on error, otherwise the statistical
 *         data: decoded into an array, or the raw string when `$decode` is `false`.
 */
protected function _statsRead($decode = true)
{
    $status = strtok($this->_read(), ' ');
    if ($status !== 'OK') {
        $this->_error($status);
        return false;
    }
    $data = $this->_read((int)strtok(' '));
    if ($data === false) {
        $this->_error('Failed to read response data.');
        return false;
    }
    return $decode ? $this->_decode($data) : $data;
}
/**
 * Decodes YAML data. This is a deliberately naive decoder which only handles
 * the subset of YAML that beanstalk commonly returns: a document marker on
 * the first line followed by either `- item` list lines or `key: value`
 * dictionary lines.
 *
 * Dictionary lines are split on the first colon only, so values that contain
 * colons themselves are kept intact. Empty lines (such as the one produced
 * by a trailing newline) are skipped.
 *
 * @param string $data The data in YAML format, can be either a list or a dictionary.
 *
 * @return array An (associative) array of the converted data; numeric values
 *         are converted to integers or floats.
 */
protected function _decode($data)
{
    // The first line is the YAML document marker and carries no data.
    $lines = array_slice(explode("\n", $data), 1);
    $result = [];
    foreach ($lines as $key => $value) {
        if ($value === '') {
            continue;
        }
        if ($value[0] === '-') {
            $value = ltrim($value, '- ');
        } elseif (strpos($value, ':') !== false) {
            // Limit of 2: everything after the first colon belongs to the value.
            list($key, $value) = explode(':', $value, 2);
            $value = ltrim($value, ' ');
        }
        if (is_numeric($value)) {
            $value = (int)$value == $value ? (int)$value : (float)$value;
        }
        $result[$key] = $value;
    }
    return $result;
}
}
使用方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Demo worker: reserves jobs from the `test` tube and appends each one, JSON
// encoded, to a log file. A job is deleted only after it has been written
// successfully, so a failed write does not lose the job.
require 'BeanstalkdService.php';
$beanstalk = new BeanstalkdService([
    'persistent' => false, // do not use a persistent connection
    'host' => '127.0.0.1',
    'port' => 11300, // port number, 11300 by default
    'timeout' => 3, // connect timeout in seconds
]);
// Stop right away when the server cannot be reached.
if (!$beanstalk->connect()) {
    $errors = $beanstalk->errors();
    exit($errors ? (string)current($errors) : 'Unable to connect to beanstalkd');
}
// Producer example (disabled): select the `test` tube and add a job to it.
//$beanstalk->useTube('test');
//// Arguments: 10 is the job priority, 0 puts the job into the ready queue
//// without delay, 60 is the time-to-run in seconds, followed by the job body.
//$put = $beanstalk->put(10, 0, 60, 'test');
//if (!$put) {
//    exit('commit job fail');
//} else {
//    var_dump($put);
//}
set_time_limit(0);
$tube = 'test';
$logFile = 'D:/test.txt';
$beanstalk->watch($tube);
try {
    while (true) {
        $job = $beanstalk->reserve();
        if ($job === false) {
            // reserve() is sent without a timeout, so `false` means the command
            // itself failed (for example because the connection was lost).
            // Pause and reconnect instead of spinning in a tight loop.
            sleep(1);
            if ($beanstalk->connect()) {
                $beanstalk->watch($tube);
            }
            continue;
        }
        $written = file_put_contents($logFile, json_encode($job) . PHP_EOL, FILE_APPEND);
        if ($written === false) {
            // Hand the job back (same priority as the producer example, retry in
            // 5 seconds) rather than deleting work that was never recorded.
            $beanstalk->release($job['id'], 10, 5);
            continue;
        }
        $beanstalk->delete($job['id']);
    }
} catch (Exception $e) {
    $beanstalk->disconnect();
    exit($e->getMessage());
}