Cond 类

(PECL pthreads < 3.0.0)

简介

Warning

pthreads v3 中已经将 Cond 类移除。

Cond 类中的静态方法可以用来直接访问 Posix 的条件变量。

类摘要

Cond {
/* 方法 */
final public static broadcast ( int $condition ) : bool
final public static create ( void ) : int
final public static destroy ( int $condition ) : bool
final public static signal ( int $condition ) : bool
final public static wait ( int $condition , int $mutex [, int $timeout ] ) : bool
}

Table of Contents

User Contributed Notes

312036773 at qq dot com 20-Dec-2015 06:39
public function run(){
        Mutex::lock($this->mutex);
        if($this->account->getBalance() >= drawCost){
            usleep ( 500 );
            $this->account->draw(drawCost);
            echo \Thread::getCurrentThreadId () . "__get__" . drawCost . ",The current Balance is:{$this->account->getBalance()}<br/>";
            Cond::broadcast($this->cond);
        }
        else {
            echo \Thread::getCurrentThreadId () . "__get fail__,the current Balance is:{$this->account->getBalance()}<br/>";

//Use Wait Can be not Running......Why ???

            Cond::wait($this->cond,$this->mutex);
        }
        Mutex::unlock($this->mutex);
  }
jan 25-May-2013 01:11
As Cond (nor Mutex) is good for queuing tasks, and arrays/objects stored a "serialized" (can't use it as excepted for signals outside the run()) here is a simple task queue trait:
<?php
define
('T',1000000);
trait
TaskQueue{
    private
$queue_len = 0;
    private
$queue_pointer = 0;
    private
$queue_curr = 0;
    protected function
queueInit(){$this->queue_len = 0; $this->queue_pointer = 0; $this->queue_curr = 0;}
    protected function
getNextQueued(){
        if (
$this->queue_len == 0) return null;
       
$p = $this->queue_curr;
       
$x = 0;
       
$ret = [];
        do{
           
$key = "queue_func_{$p}";
            if (
$this->$key){
               
$ret['func'] = $this->$key;
                unset(
$this->$key);
               
$key = "queue_params_{$p}";
               
$prs = $this->$key;
                unset(
$this->$key);
               
$ret['params'] = [];
                for (
$i = 0; $i < $prs; $i++){
                   
$key = "queue_param_{$p}_{$i}";
                   
$ret['params'][] =  $this->$key;
                    unset(
$this->$key);
                }
               
$this->queue_len--;
               
$this->queue_curr++;
                return
$ret;
            }
            else
$p++;
            if (
$x++ > 1000){   // the queue is corrupt
               
echo "\n\t\t task queue is corrupt for ".get_called_class()."\n";
               
$this->queue_len = 0;
               
$this->queue_pointer = 0;
                return
null;
            }
        }while(isset(
$this->$key));
        return
null;

    }
    protected function
addToQueue($func,$params = []){
        if (!
is_array($params) || !@method_exists($this,$func)) return false;
       
$p = $this->queue_pointer;
       
$key = "queue_func_{$p}";
       
$this->$key = $func;
       
$key = "queue_params_{$p}";
       
$this->$key = count($params);
       
$i = 0;
        foreach (
$params as $pa){
           
$key = "queue_param_{$p}_{$i}";
           
$this->$key = $pa;
           
$i++;
        }
       
$this->queue_pointer++;
       
$this->queue_len++;
        return
true;
    }
}

class
A extends Thread{
    use
TaskQueue;
    private
$sig_term;
    public function
__construct(){
       
$this->cond = Cond::create();
       
$this->sig_term = false;
       
$this->queueInit(); // threads don't like default parameters
       
$this->start();
    }

    public function
run(){
        while(!
$this->sig_term){
            while((
$qi = $this->getNextQueued())){
               
call_user_func_array([$this,$qi['func']],$qi['params']);
            }
            echo
"now doing MAIN -- ";
           
usleep(1*T);
            echo
"finished MAIN\n";
        }
       
Mutex::destroy($m);
    }

    private function
reallyDoA($a){
        echo
"doing A: {$a} -- ";
       
usleep(0.7*T);
        echo
"finished A\n";

    }
    public function
doA($param1){
       
$this->addToQueue('reallyDoA',[$param1]);
    }
    private function
reallyDoB($a,$b){
        echo
"doing B: {$a},{$b} -- ";
       
usleep(0.1*T);
        echo
"finished B\n";

    }
    public function
doB($param1, $param2){
       
$this->addToQueue('reallyDoB',[$param1,$param2]);
    }
    public function
term(){$this->sig_term = true;}
}

$a = new A();
usleep(1.2*T);
$a->doA(2);
$a->doA(3);
$a->doB("now b","something");
usleep(5.4*T);
$a->doA(5);
$a->doA(7);

usleep(3*T);
$a->term();
$a->join();
?>
very useful if you need syncronization from an outside event (ie: async socket read/write - your run() takes care of reading with timeout / signalig on_packet events, but writing may occur at any time (ie async heartbeeting) and fwrite($sock) while stream_select($sock) is waiting for timeout causes segfaults...)