PEAR2_Net_Transmitter
1.0.0a1
A wrapper for stream functionality
|
<?php

/**
 * Wrapper around a PHP stream resource that offers sending and receiving of
 * strings and streams, with failures reported as exceptions.
 */

namespace PEAR2\Net\Transmitter;

/**
 * Wraps a single stream resource and exposes helpers to configure it, write
 * to it and read from it.
 *
 * Unless the wrapped resource type is reported as persistent, the stream is
 * closed when the object is destroyed.
 */
class StreamTransmitter
{
    /**
     * Used in {@link setBuffer()} to apply the setting to both directions.
     */
    const DIRECTION_BOTH = '|||';

    /**
     * Used in {@link setBuffer()} to apply the setting to sending only.
     */
    const DIRECTION_SEND = '<<<';

    /**
     * Used in {@link setBuffer()} to apply the setting to receiving only.
     */
    const DIRECTION_RECEIVE = '>>>';

    /**
     * @var resource The wrapped stream.
     */
    protected $stream;

    /**
     * @var bool Whether the resource type of the wrapped stream contains
     *     "persistent". Such streams are not closed on destruction.
     */
    protected $persist;

    /**
     * Wraps around the supplied stream.
     *
     * @param resource $stream The stream to wrap around.
     *
     * @throws \Exception With code 1 if $stream is not a stream resource.
     */
    public function __construct($stream)
    {
        if (!self::isStream($stream)) {
            throw $this->createException('Invalid stream supplied.', 1);
        }
        $this->stream = $stream;
        $this->persist = (bool) preg_match(
            '#\s?persistent\s?#sm',
            get_resource_type($stream)
        );
    }

    /**
     * Checks whether a given variable is a stream resource.
     *
     * A value qualifies when it is a resource whose type name ends in
     * "stream".
     *
     * @param mixed $var The variable to check.
     *
     * @return bool TRUE if $var is a stream resource, FALSE otherwise.
     */
    public static function isStream($var)
    {
        return is_resource($var)
            && (bool) preg_match('#\s?stream$#sm', get_resource_type($var));
    }

    /**
     * Checks whether the wrapped stream's pointer is at position 0.
     *
     * @return bool TRUE if ftell() reports position 0, FALSE otherwise.
     */
    public function isFresh()
    {
        return ftell($this->stream) === 0;
    }

    /**
     * Sets the timeout of the wrapped stream.
     *
     * @param int $seconds      Seconds part of the timeout.
     * @param int $microseconds Microseconds part of the timeout.
     *
     * @return bool The result of stream_set_timeout().
     */
    public function setTimeout($seconds, $microseconds = 0)
    {
        return stream_set_timeout($this->stream, $seconds, $microseconds);
    }

    /**
     * Sets the size of the wrapped stream's buffer.
     *
     * @param int    $size      The desired buffer size, in bytes.
     * @param string $direction One of the DIRECTION_* constants. Any value
     *     other than DIRECTION_SEND or DIRECTION_RECEIVE is treated as
     *     DIRECTION_BOTH.
     *
     * @return bool TRUE if every requested buffer was set, FALSE otherwise.
     */
    public function setBuffer($size, $direction = self::DIRECTION_BOTH)
    {
        switch ($direction) {
            case self::DIRECTION_SEND:
                return stream_set_write_buffer($this->stream, $size) === 0;
            case self::DIRECTION_RECEIVE:
                return stream_set_read_buffer($this->stream, $size) === 0;
            default:
                // Attempt both settings unconditionally: a stream may refuse
                // the write buffer request, and that must not prevent the
                // read buffer from being applied.
                $sendSet = stream_set_write_buffer($this->stream, $size) === 0;
                $receiveSet
                    = stream_set_read_buffer($this->stream, $size) === 0;
                return $sendSet && $receiveSet;
        }
    }

    /**
     * Sends a string over the wrapped stream.
     *
     * The string is written in chunks of at most 0xFFFFF bytes, each chunk
     * being written only once the stream reports it is accepting data.
     *
     * @param string $string The string to send.
     *
     * @return int The number of bytes sent.
     *
     * @throws \Exception With code 2 if a write fails or writes nothing, or
     *     if the wrapped stream is no longer a valid stream.
     */
    public function send($string)
    {
        $bytes = 0;
        $bytesToSend = (double) sprintf('%u', strlen($string));
        while ($bytes < $bytesToSend) {
            if ($this->isAcceptingData()) {
                $bytesNow = @fwrite(
                    $this->stream,
                    substr($string, $bytes, 0xFFFFF)
                );
                if (0 != $bytesNow) {
                    $bytes += $bytesNow;
                } else {
                    throw $this->createException(
                        'Failed while sending string.',
                        2
                    );
                }
            } elseif (!self::isStream($this->stream)) {
                // A closed or otherwise invalid stream will never accept
                // data again; fail instead of polling it forever.
                throw $this->createException(
                    'Failed while sending string.',
                    2
                );
            }
        }
        return $bytes;
    }

    /**
     * Sends the remaining contents of a stream over the wrapped stream.
     *
     * Data is copied from the current position of $stream until its end, in
     * chunks of at most 0xFFFFF bytes. On success, the pointer of $stream is
     * moved back by the number of bytes sent.
     *
     * @param resource $stream The stream to read the data from.
     *
     * @return int The number of bytes sent. 0 if $stream had nothing left
     *     to read.
     *
     * @throws \Exception With code 3 if copying fails, if nothing is copied
     *     although $stream has not reached its end, or if the wrapped stream
     *     is no longer a valid stream.
     */
    public function sendStream($stream)
    {
        $bytes = 0;
        while (!feof($stream)) {
            if ($this->isAcceptingData()) {
                $bytesNow = @stream_copy_to_stream(
                    $stream,
                    $this->stream,
                    0xFFFFF
                );
                if (0 != $bytesNow) {
                    $bytes += $bytesNow;
                } elseif (false === $bytesNow || !feof($stream)) {
                    // Copying zero bytes is only acceptable when the source
                    // turned out to be exhausted (feof() does not report the
                    // end before a read has been attempted).
                    throw $this->createException(
                        'Failed while sending stream.',
                        3
                    );
                }
            } elseif (!self::isStream($this->stream)) {
                // A closed or otherwise invalid stream will never accept
                // data again; fail instead of polling it forever.
                throw $this->createException(
                    'Failed while sending stream.',
                    3
                );
            }
        }
        // Put the source pointer back where it was before sending.
        fseek($stream, -$bytes, SEEK_CUR);
        return $bytes;
    }

    /**
     * Receives a fixed number of bytes from the wrapped stream as a string.
     *
     * @param int    $length The number of bytes to receive.
     * @param string $what   Description of what is being received, used in
     *     the exception message.
     *
     * @return string The received content.
     *
     * @throws \Exception With code 4 if the stream becomes unavailable or a
     *     read fails before $length bytes were received.
     */
    public function receive($length, $what = 'data')
    {
        $result = '';
        while ($length > 0) {
            if ($this->isAvailable()) {
                while ($this->isDataAwaiting()) {
                    $fragment = fread($this->stream, min($length, 0xFFFFF));
                    if (false === $fragment) {
                        // Read error: nothing was consumed, so retrying
                        // would never terminate. Fall through to the throw.
                        break;
                    }
                    if ('' !== $fragment) {
                        $length -= strlen($fragment);
                        $result .= $fragment;
                        continue 2;
                    }
                }
            }
            throw $this->createException(
                "Failed while receiving {$what}",
                4
            );
        }
        return $result;
    }

    /**
     * Receives a fixed number of bytes from the wrapped stream into a new
     * "php://temp" stream.
     *
     * The supplied filters are appended to the resulting stream for writing
     * while the data is received, and are removed before it is returned. The
     * returned stream is rewound.
     *
     * @param int    $length  The number of bytes to receive.
     * @param array  $filters Filters to apply while receiving, as an array
     *     with filter names as keys and filter parameters as values.
     * @param string $what    Description of what is being received, used in
     *     the exception message.
     *
     * @return resource The stream holding the received content.
     *
     * @throws \Exception With code 5 if the stream becomes unavailable or a
     *     read fails before $length bytes were received.
     */
    public function receiveStream(
        $length,
        array $filters = array(),
        $what = 'stream data'
    ) {
        $result = fopen('php://temp', 'r+b');
        $appliedFilters = array();
        foreach ($filters as $filtername => $params) {
            $appliedFilters[] = stream_filter_append(
                $result,
                $filtername,
                STREAM_FILTER_WRITE,
                $params
            );
        }

        while ($length > 0) {
            if ($this->isAvailable()) {
                while ($this->isDataAwaiting()) {
                    $fragment = fread($this->stream, min($length, 0xFFFFF));
                    if (false === $fragment) {
                        // Read error: nothing was consumed, so retrying
                        // would never terminate. Fall through to the throw.
                        break;
                    }
                    if ('' !== $fragment) {
                        $length -= strlen($fragment);
                        fwrite($result, $fragment);
                        continue 2;
                    }
                }
            }
            throw $this->createException(
                "Failed while receiving {$what}",
                5
            );
        }

        foreach ($appliedFilters as $filter) {
            stream_filter_remove($filter);
        }
        rewind($result);
        return $result;
    }

    /**
     * Checks whether the wrapped stream is available for operations.
     *
     * @return bool TRUE if the wrapped value is still a stream resource and
     *     has not reached its end, FALSE otherwise.
     */
    public function isAvailable()
    {
        return self::isStream($this->stream) && !feof($this->stream);
    }

    /**
     * Checks whether there is data to be read from the wrapped stream.
     *
     * In this class, the check is the same as {@link isAvailable()}.
     *
     * @return bool TRUE if there is data to be read, FALSE otherwise.
     */
    public function isDataAwaiting()
    {
        return $this->isAvailable();
    }

    /**
     * Checks whether the wrapped stream can be written to right now.
     *
     * The check is a stream_select() call with a timeout of 0 seconds, with
     * the wrapped stream as the only watched (writable) stream.
     *
     * @return bool TRUE if the wrapped stream is a stream resource that is
     *     reported as writable, FALSE otherwise.
     */
    public function isAcceptingData()
    {
        $r = $e = null;
        $w = array($this->stream);
        return self::isStream($this->stream)
            && 1 === @/* due to PHP bug #54563 */stream_select($r, $w, $e, 0);
    }

    /**
     * Closes the wrapped stream, unless it was detected as persistent.
     */
    public function __destruct()
    {
        if (!$this->persist) {
            $this->close();
        }
    }

    /**
     * Closes the wrapped stream.
     *
     * @return bool TRUE if the wrapped value was a stream resource and was
     *     closed successfully, FALSE otherwise.
     */
    public function close()
    {
        return self::isStream($this->stream) && fclose($this->stream);
    }

    /**
     * Creates the exception object that the caller is about to throw.
     *
     * Being protected, it can be overridden to produce a different
     * exception.
     *
     * @param string $message The exception message.
     * @param int    $code    The exception code.
     *
     * @return \Exception The exception to throw.
     */
    protected function createException($message, $code = 0)
    {
        return new \Exception($message, $code);
    }
}