PEAR2_Net_Transmitter  1.0.0a1
A wrapper for stream functionality
PEAR2/Net/Transmitter/StreamTransmitter.php
Go to the documentation of this file.
00001 <?php
00002 
00022 namespace PEAR2\Net\Transmitter;
00023 
00038 class StreamTransmitter
00039 {
00040     const DIRECTION_BOTH = '|||';
00041     const DIRECTION_SEND = '<<<';
00042     const DIRECTION_RECEIVE = '>>>';
00043 
00047     protected $stream;
00048 
00052     protected $persist;
00053 
00061     public function __construct($stream)
00062     {
00063         if (!self::isStream($stream)) {
00064             throw $this->createException('Invalid stream supplied.', 1);
00065         }
00066         $this->stream = $stream;
00067         $this->persist = (bool) preg_match(
00068             '#\s?persistent\s?#sm', get_resource_type($stream)
00069         );
00070     }
00071 
00079     public static function isStream($var)
00080     {
00081         return is_resource($var)
00082             && (bool) preg_match('#\s?stream$#sm', get_resource_type($var));
00083     }
00084 
00094     public function isFresh()
00095     {
00096         return ftell($this->stream) === 0;
00097     }
00098     
00108     public function setTimeout($seconds, $microseconds = 0)
00109     {
00110         return stream_set_timeout($this->stream, $seconds, $microseconds);
00111     }
00112     
00122     public function setBuffer($size, $direction = self::DIRECTION_BOTH)
00123     {
00124         switch($direction) {
00125         case self::DIRECTION_SEND:
00126             return stream_set_write_buffer($this->stream, $size) === 0;
00127         case self::DIRECTION_RECEIVE:
00128             return stream_set_read_buffer($this->stream, $size) === 0;
00129         default:
00130             return stream_set_write_buffer($this->stream, $size) === 0
00131                 && stream_set_read_buffer($this->stream, $size) === 0;
00132         }
00133     }
00134 
00142     public function send($string)
00143     {
00144         $bytes = 0;
00145         $bytesToSend = (double) sprintf('%u', strlen($string));
00146         while ($bytes < $bytesToSend) {
00147             if ($this->isAcceptingData()) {
00148                 $bytesNow = @fwrite(
00149                     $this->stream, substr($string, $bytes, 0xFFFFF)
00150                 );
00151                 if (0 != $bytesNow) {
00152                     $bytes += $bytesNow;
00153                 } else {
00154                     throw $this->createException(
00155                         'Failed while sending string.', 2
00156                     );
00157                 }
00158             }
00159         }
00160         return $bytes;
00161     }
00162 
00170     public function sendStream($stream)
00171     {
00172         $bytes = 0;
00173         while (!feof($stream)) {
00174             if ($this->isAcceptingData()) {
00175                 $bytesNow = @stream_copy_to_stream(
00176                     $stream, $this->stream, 0xFFFFF
00177                 );
00178                 if (0 != $bytesNow) {
00179                     $bytes += $bytesNow;
00180                 } else {
00181                     throw $this->createException(
00182                         'Failed while sending stream.', 3
00183                     );
00184                 }
00185             }
00186         }
00187         fseek($stream, -$bytes, SEEK_CUR);
00188         return $bytes;
00189     }
00190 
00202     public function receive($length, $what = 'data')
00203     {
00204         $result = '';
00205         while ($length > 0) {
00206             if ($this->isAvailable()) {
00207                 while ($this->isDataAwaiting()) {
00208                     $fragment = fread($this->stream, min($length, 0xFFFFF));
00209                     if ('' !== $fragment) {
00210                         $length -= strlen($fragment);
00211                         $result .= $fragment;
00212                         continue 2;
00213                     }
00214                 }
00215             }
00216             throw $this->createException(
00217                 "Failed while receiving {$what}", 4
00218             );
00219         }
00220         return $result;
00221     }
00222 
00237     public function receiveStream(
00238         $length, array $filters = array(), $what = 'stream data'
00239     ) {
00240         $result = fopen('php://temp', 'r+b');
00241         $appliedFilters = array();
00242         foreach ($filters as $filtername => $params) {
00243             $appliedFilters[] = stream_filter_append(
00244                 $result, $filtername, STREAM_FILTER_WRITE, $params
00245             );
00246         }
00247         
00248         while ($length > 0) {
00249             if ($this->isAvailable()) {
00250                 while ($this->isDataAwaiting()) {
00251                     $fragment = fread($this->stream, min($length, 0xFFFFF));
00252                     if ('' !== $fragment) {
00253                         $length -= strlen($fragment);
00254                         fwrite($result, $fragment);
00255                         continue 2;
00256                     }
00257                 }
00258             }
00259             throw $this->createException(
00260                 "Failed while receiving {$what}", 5
00261             );
00262         }
00263         
00264         foreach ($appliedFilters as $filter) {
00265             stream_filter_remove($filter);
00266         }
00267         rewind($result);
00268         return $result;
00269     }
00270 
00276     public function isAvailable()
00277     {
00278         return self::isStream($this->stream) && !feof($this->stream);
00279     }
00280 
00286     public function isDataAwaiting()
00287     {
00288         return $this->isAvailable();
00289     }
00290 
00297     public function isAcceptingData()
00298     {
00299         $r = $e = null;
00300         $w = array($this->stream);
00301         return self::isStream($this->stream)
00302             && 1 === @/* due to PHP bug #54563 */stream_select($r, $w, $e, 0);
00303     }
00304 
00308     public function __destruct()
00309     {
00310         if (!$this->persist) {
00311             $this->close();
00312         }
00313     }
00314 
00320     public function close()
00321     {
00322         return self::isStream($this->stream) && fclose($this->stream);
00323     }
00324 
00336     protected function createException($message, $code = 0)
00337     {
00338         return new \Exception($message, $code);
00339     }
00340 
00341 }