The parallel\Events class

(0.9.0)

The Event Loop

The Event loop monitors the state of sets of futures and or channels (targets) in order to perform read (parallel\Future::value(), parallel\Channel::recv()) and write (parallel\Channel::send()) operations as the targets become available and the operations may be performed without blocking the event loop.

ÀàÕªÒª

final parallel\Events implements Countable , Traversable {
/* Input */
public setInput ( Input $input ) : void
/* Targets */
public addChannel ( parallel\Channel $channel ) : void
public addFuture ( string $name , parallel\Future $future ) : void
public remove ( string $target ) : void
/* Behaviour */
public setBlocking ( bool $blocking ) : void
public setTimeout ( int $timeout ) : void
/* Polling */
public poll ( void ) : ?Event
}

Table of Contents

User Contributed Notes

s dot laufer at homegear dot email 22-Dec-2019 03:02
<?php
/**
Example showing the usage of events.

The documentation still is very thin, so I'm not sure, the example is the best solution. But it works.
*/
use parallel\{Channel,Runtime,Events,Events\Event};

$myThread = function(Channel $channel) {
   
$events = new Events();
   
$events->addChannel($channel);
   
//$events->setBlocking(false); //Uncomment to don't block on Events::poll()
   
$events->setTimeout(1000000); //Comment when not blocking

   
while(true)
    {
       
/*
        ...
        Your code.
        ...
        */

        //Read all available events
       
try
        {
           
$event = NULL;
            do
            {
               
$event = $events->poll(); //Returns non-null if there is an event
               
if($event && $event->source == 'myChannel')
                {
                   
//It seems, the target gets deleted after returning an event,
                    //so add it again.
                   
$events->addChannel($channel);
                    if(
$event->type == Event\Type::Read)
                    {
                        if(
is_array($event->value) && count($event->value) > 0)
                        {
                            if(
$event->value['name'] == 'stop')
                            {
                                echo
'Stopping thread';
                                return;
//Stop
                           
}
                            else
                            {
                                echo
'Event: '.$event->value['name'].' => '.$event->value['value'].PHP_EOL;
                            }
                        }
                    }
                    else if(
$event->type == Event\Type::Close) return; //Stop
               
}
            }
            while(
$event);
        }
        catch(
Events\Error\Timeout $ex)
        {
           
//Timeout
           
echo 'Timeout'.PHP_EOL;
        }
    }
};

class
MyClass {
    private
$runtime;
    private
$future;
    private
$channel;

    public function
start() {
       
//Create runtime
       
$this->runtime = new Runtime();

       
//Create buffered channel.
        //Buffered channels don't block on Channel::send().
        //Note that target names need to be unique within the process.
       
$this->channel = Channel::make('myChannel', Channel::Infinite);

        global
$myThread;
       
$this->future = $this->runtime->run($myThread, [$this->channel]);
    }

    public function
stop() {
       
$this->channel->send(['name' => 'stop', 'value' => true]);

       
$this->future->value(); //Wait for thread to finish
       
$this->channel->close();
    }

    public function
emit(string $name, $value)
    {
       
$this->channel->send(['name' => $name, 'value' => $value]);
    }
}

$a = new MyClass();
$a->start();

for(
$i = 0; $i < 5; $i++)
{
   
$a->emit('test', $i);
   
sleep(0.5);
}

sleep(2);

for(
$i = 5; $i < 10; $i++)
{
   
$a->emit('test', $i);
   
sleep(0.5);
}

$a->stop();
?>