The parallel\Channel class

(0.9.0)

Unbuffered Channels

An unbuffered channel will block on calls to parallel\Channel::send() until there is a receiver, and block on calls to parallel\Channel::recv() until there is a sender. This means an unbuffered channel is not only a way to share data among tasks but also a simple method of synchronization.

An unbuffered channel is the fastest way to share data among tasks, requiring the least copying.
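The blocking behaviour described above can be sketched as follows. This is a minimal illustration, assuming the parallel extension is loaded on a thread-safe PHP build; the variable names and the doubling task are made up for the example.

```php
<?php

use parallel\{Channel, Runtime};

// The anonymous constructor without a capacity creates an unbuffered channel.
$channel = new Channel();

$runtime = new Runtime();
$future = $runtime->run(function (Channel $channel) {
    // Blocks until the main thread sends a value.
    $value = $channel->recv();
    return $value * 2;
}, [$channel]);

// Blocks until the task above is ready to receive.
$channel->send(21);

$result = $future->value();
echo $result, PHP_EOL; // prints 42
```

Because send() and recv() only return once both sides have met, the hand-off also acts as a synchronization point between the main thread and the task.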

Buffered Channels

A buffered channel will not block on calls to parallel\Channel::send() until its capacity is reached. Calls to parallel\Channel::recv() will block until there is data in the buffer.
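As a rough sketch of buffering, the following fills a channel of capacity 2 from a single thread and then drains it, followed by the same pattern on an infinitely buffered channel. The capacities and values are arbitrary choices for the example, and it assumes the parallel extension is available.

```php
<?php

use parallel\Channel;

// Capacity 2: the first two sends return immediately.
$channel = new Channel(2);
$channel->send('a');
$channel->send('b');
// A third send() here would block until another thread called recv().

$first  = $channel->recv();
$second = $channel->recv();

// Channel::Infinite removes the capacity limit, so send() never blocks.
$unbounded = new Channel(Channel::Infinite);
foreach (range(1, 100) as $i) {
    $unbounded->send($i);
}

$total = 0;
for ($i = 0; $i < 100; $i++) {
    $total += $unbounded->recv();
}

echo $total, PHP_EOL; // prints 5050
```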

Closures over Channels

A powerful feature of parallel channels is that they allow the exchange of closures between tasks (and runtimes).

When a closure is sent over a channel, the closure is buffered. This does not change the buffering of the channel transmitting the closure, but it does affect the static scope inside the closure: the same closure sent to different runtimes, or to the same runtime, will not share static scope.

This means that whenever a closure is executed that was transmitted by a channel, static state will be as it was when the closure was buffered.
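A minimal sketch of passing a closure through a channel might look like this. It assumes a parallel version that supports closures over channels; the names $double and $callback are illustrative only, and the example deliberately avoids static variables so that it does not depend on the static-scope behaviour described above.

```php
<?php

use parallel\{Channel, Runtime};

// Buffered with capacity 1 so the send below does not have to wait
// for the task to reach recv().
$channel = new Channel(1);

// Hypothetical closure to hand over to another runtime.
$double = function (int $x): int {
    return $x * 2;
};

$runtime = new Runtime();
$future = $runtime->run(function (Channel $channel) {
    // The task receives its own copy of the closure and calls it.
    $callback = $channel->recv();
    return $callback(21);
}, [$channel]);

$channel->send($double);

$result = $future->value();
echo $result, PHP_EOL;
```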

Anonymous Channels

The anonymous channel constructor allows the programmer to avoid assigning names to every channel: parallel will generate a unique name for anonymous channels.
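To contrast the two styles, the sketch below creates one anonymous channel and one named channel. The name 'jobs' and the payload are hypothetical; the point is that a named channel can be looked up with open() from inside a task, while an anonymous channel has to be passed around as a value.

```php
<?php

use parallel\{Channel, Runtime};

// Anonymous: no name to manage, but the object must be handed to tasks.
$anonymous = new Channel();

// Named and buffered: any thread can open it by name ('jobs' is made up).
$named = Channel::make('jobs', 1);
$named->send('payload');

$runtime = new Runtime();
$future = $runtime->run(function () {
    // No channel argument needed, the task looks it up by name.
    $channel = Channel::open('jobs');
    return $channel->recv();
});

$received = $future->value();
echo $received, PHP_EOL; // prints payload
```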

Class synopsis

final parallel\Channel {
/* Anonymous Constructor */
public __construct ( void )
public __construct ( int $capacity )
/* Access */
public static make ( string $name ) : Channel
public static make ( string $name , int $capacity ) : Channel
public static open ( string $name ) : Channel
/* Sharing */
public recv ( void ) : mixed
public send ( mixed $value ) : void
/* Closing */
public close ( void ) : void
/* Constant for Infinitely Buffered */
const Infinite ;
}
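The closing method in the synopsis can be illustrated with a short sketch. It relies on the parallel\Channel\Error\Closed exception, which the extension throws when a closed channel is used; the capacity and the message are arbitrary.

```php
<?php

use parallel\Channel;
use parallel\Channel\Error\Closed;

$channel = new Channel(1);
$channel->close();

$caught = false;
try {
    // Sending on a closed channel is an error.
    $channel->send('too late');
} catch (Closed $e) {
    $caught = true;
}

echo $caught ? 'channel is closed' : 'send succeeded', PHP_EOL; // prints channel is closed
```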

User Contributed Notes

hdvianna 16-Jan-2020 06:00
This is an example of using a channel to produce data for consumers. The producer Runtime instance sends each consumer the number of seconds it should sleep.

<?php

use parallel\{Runtime, Channel};

main($argv);

function main(array $argv)
{
    if (count($argv) !== 3) {
        echo "Type: hello-parallel.php <number-of-tasks> <maximum-time-of-sleep (in seconds)>" . PHP_EOL;
        echo "Example: hello-parallel.php 5 3" . PHP_EOL;
        die;
    } else {
        $numberOfTasks = intval($argv[1]);
        $maximumTimeOfSleep = intval($argv[2]);
        $t1 = microtime(true);
        parallelize($numberOfTasks, $maximumTimeOfSleep);
        $endTime = microtime(true) - $t1;
        echo PHP_EOL . "Finished $numberOfTasks task(s) in {$endTime}s" . PHP_EOL;
    }
}

function parallelize(int $numberOfTasks, int $maximumTimeOfSleep)
{
    $channel = new Channel();

    // One unique id per consumer task.
    $taskIds = array_map(function () {
        return uniqid("task::");
    }, range(0, $numberOfTasks - 1));

    // One random sleep time per task.
    $timesToSleep = array_map(function () use ($maximumTimeOfSleep) {
        return rand(1, $maximumTimeOfSleep);
    }, $taskIds);

    // The producer sends every sleep time over the channel.
    $producer = new Runtime();
    $producerFuture = $producer->run(function (Channel $channel, array $timesToSleep) {
        foreach ($timesToSleep as $timeToSleep) {
            $channel->send($timeToSleep);
        }
    }, [$channel, $timesToSleep]);

    // Each consumer runs in its own Runtime and receives one sleep time.
    $consumerFutures = array_map(function (string $id) use ($channel) {
        $runtime = new Runtime();
        return $runtime->run(function (string $id, Channel $channel) {
            $timeToSleep = $channel->recv();
            echo "Hello from $id. I will sleep for $timeToSleep second(s)." . PHP_EOL;
            sleep($timeToSleep);
            echo "$id slept for $timeToSleep second(s)." . PHP_EOL;
            return $timeToSleep;
        }, [$id, $channel]);
    }, $taskIds);

    wait($consumerFutures);
    wait([$producerFuture]);
}

function wait(array $futures)
{
    return array_map(function ($future) {
        return $future->value();
    }, $futures);
}

rustysun 18-Sep-2019 10:28
An example using an unbuffered channel.
<?php

use parallel\{Channel, Runtime};

$sum = function (array $a, Channel $ch) {
    $sum = 0;
    foreach ($a as $v) {
        $sum += $v;
    }
    $ch->send($sum);
};

try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    // split the array into two halves
    $num = intdiv(count($a), 2);
    // unbuffered channel
    $runtime = new Runtime;
    $ch2 = new Channel;
    $runtime->run($sum, [array_slice($a, 0, $num), $ch2]);
    $runtime->run($sum, [array_slice($a, $num), $ch2]);
    // receive from channel
    $x = $ch2->recv();
    $y = $ch2->recv();
    $ch2->close();
    echo "\nch2:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}

//output
//ch2:18  23      41

rustysun 18-Sep-2019 04:08
An example using a buffered channel.
<?php

use parallel\Channel;

function sum(array $a, Channel $ch)
{
    $sum = 0;
    foreach ($a as $v) {
        $sum += $v;
    }
    $ch->send($sum);
}

try {
    $a = [7, 2, 8, 1, 4, 0, 9, 10];
    // buffered channel with capacity 2, so both sends return without blocking
    $ch1 = Channel::make('sum', 2);
    $num = intdiv(count($a), 2);
    sum(array_slice($a, 0, $num), $ch1);
    sum(array_slice($a, $num), $ch1);

    // receive from channel
    $x = $ch1->recv();
    $y = $ch1->recv();
    $ch1->close();
    echo "\nch1:", $x, "\t", $y, "\t", $x + $y, "\n";
} catch (Error $err) {
    echo "\nError:", $err->getMessage();
} catch (Exception $e) {
    echo "\nException:", $e->getMessage();
}