1: <?php
2:
3: 4: 5: 6: 7: 8: 9:
10:
11: 12: 13: 14: 15: 16:
17: class Swift_Transport_StreamBuffer extends Swift_ByteStream_AbstractFilterableInputStream implements Swift_Transport_IoBuffer
18: {
19:
20: private $_stream;
21:
22:
23: private $_in;
24:
25:
26: private $_out;
27:
28:
29: private $_params = array();
30:
31:
32: private $_replacementFactory;
33:
34:
35: private $_translations = array();
36:
37: 38: 39: 40:
41: public function __construct(Swift_ReplacementFilterFactory $replacementFactory)
42: {
43: $this->_replacementFactory = $replacementFactory;
44: }
45:
46: 47: 48: 49: 50:
51: public function initialize(array $params)
52: {
53: $this->_params = $params;
54: switch ($params['type']) {
55: case self::TYPE_PROCESS:
56: $this->_establishProcessConnection();
57: break;
58: case self::TYPE_SOCKET:
59: default:
60: $this->_establishSocketConnection();
61: break;
62: }
63: }
64:
65: 66: 67: 68: 69:
70: public function setParam($param, $value)
71: {
72: if (isset($this->_stream)) {
73: switch ($param) {
74: case 'timeout':
75: if ($this->_stream) {
76: stream_set_timeout($this->_stream, $value);
77: }
78: break;
79:
80: case 'blocking':
81: if ($this->_stream) {
82: stream_set_blocking($this->_stream, 1);
83: }
84:
85: }
86: }
87: $this->_params[$param] = $value;
88: }
89:
90: public function startTLS()
91: {
92: return stream_socket_enable_crypto($this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT);
93: }
94:
95: 96: 97:
98: public function terminate()
99: {
100: if (isset($this->_stream)) {
101: switch ($this->_params['type']) {
102: case self::TYPE_PROCESS:
103: fclose($this->_in);
104: fclose($this->_out);
105: proc_close($this->_stream);
106: break;
107: case self::TYPE_SOCKET:
108: default:
109: fclose($this->_stream);
110: break;
111: }
112: }
113: $this->_stream = null;
114: $this->_out = null;
115: $this->_in = null;
116: }
117:
118: 119: 120: 121: 122:
123: public function setWriteTranslations(array $replacements)
124: {
125: foreach ($this->_translations as $search => $replace) {
126: if (!isset($replacements[$search])) {
127: $this->removeFilter($search);
128: unset($this->_translations[$search]);
129: }
130: }
131:
132: foreach ($replacements as $search => $replace) {
133: if (!isset($this->_translations[$search])) {
134: $this->addFilter(
135: $this->_replacementFactory->createFilter($search, $replace), $search
136: );
137: $this->_translations[$search] = true;
138: }
139: }
140: }
141:
142: 143: 144: 145: 146: 147: 148:
149: public function readLine($sequence)
150: {
151: if (isset($this->_out) && !feof($this->_out)) {
152: $line = fgets($this->_out);
153: if (strlen($line)==0) {
154: $metas = stream_get_meta_data($this->_out);
155: if ($metas['timed_out']) {
156: throw new Swift_IoException(
157: 'Connection to ' .
158: $this->_getReadConnectionDescription() .
159: ' Timed Out'
160: );
161: }
162: }
163:
164: return $line;
165: }
166: }
167:
168: 169: 170: 171: 172: 173: 174: 175:
176: public function read($length)
177: {
178: if (isset($this->_out) && !feof($this->_out)) {
179: $ret = fread($this->_out, $length);
180: if (strlen($ret)==0) {
181: $metas = stream_get_meta_data($this->_out);
182: if ($metas['timed_out']) {
183: throw new Swift_IoException(
184: 'Connection to ' .
185: $this->_getReadConnectionDescription() .
186: ' Timed Out'
187: );
188: }
189: }
190:
191: return $ret;
192: }
193: }
194:
195:
196: public function setReadPointer($byteOffset)
197: {
198: }
199:
200:
201:
202:
203: protected function _flush()
204: {
205: if (isset($this->_in)) {
206: fflush($this->_in);
207: }
208: }
209:
210:
211: protected function _commit($bytes)
212: {
213: if (isset($this->_in)
214: && fwrite($this->_in, $bytes))
215: {
216: return ++$this->_sequence;
217: }
218: }
219:
220:
221:
222: 223: 224: 225:
226: private function _establishSocketConnection()
227: {
228: $host = $this->_params['host'];
229: if (!empty($this->_params['protocol'])) {
230: $host = $this->_params['protocol'] . '://' . $host;
231: }
232: $timeout = 15;
233: if (!empty($this->_params['timeout'])) {
234: $timeout = $this->_params['timeout'];
235: }
236: $options = array();
237: if (!empty($this->_params['sourceIp'])) {
238: $options['socket']['bindto']=$this->_params['sourceIp'].':0';
239: }
240: $this->_stream = @stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options));
241: if (false === $this->_stream) {
242: throw new Swift_TransportException(
243: 'Connection could not be established with host ' . $this->_params['host'] .
244: ' [' . $errstr . ' #' . $errno . ']'
245: );
246: }
247: if (!empty($this->_params['blocking'])) {
248: stream_set_blocking($this->_stream, 1);
249: } else {
250: stream_set_blocking($this->_stream, 0);
251: }
252: stream_set_timeout($this->_stream, $timeout);
253: $this->_in =& $this->_stream;
254: $this->_out =& $this->_stream;
255: }
256:
257: 258: 259: 260:
261: private function _establishProcessConnection()
262: {
263: $command = $this->_params['command'];
264: $descriptorSpec = array(
265: 0 => array('pipe', 'r'),
266: 1 => array('pipe', 'w'),
267: 2 => array('pipe', 'w')
268: );
269: $this->_stream = proc_open($command, $descriptorSpec, $pipes);
270: stream_set_blocking($pipes[2], 0);
271: if ($err = stream_get_contents($pipes[2])) {
272: throw new Swift_TransportException(
273: 'Process could not be started [' . $err . ']'
274: );
275: }
276: $this->_in =& $pipes[0];
277: $this->_out =& $pipes[1];
278: }
279:
280: private function _getReadConnectionDescription()
281: {
282: switch ($this->_params['type']) {
283: case self::TYPE_PROCESS:
284: return 'Process '.$this->_params['command'];
285: break;
286:
287: case self::TYPE_SOCKET:
288: default:
289: $host = $this->_params['host'];
290: if (!empty($this->_params['protocol'])) {
291: $host = $this->_params['protocol'] . '://' . $host;
292: }
293: $host.=':'.$this->_params['port'];
294:
295: return $host;
296: break;
297: }
298: }
299: }
300: