Overview

Namespaces

  • cassandra
  • None
  • PHP
  • phpcassa
    • Batch
    • Connection
    • Index
    • Iterator
    • Schema
      • DataType
    • Util
    • UUID

Classes

  • ConnectionPool

Exceptions

  • MaxRetriesException
  • NoServerAvailable
  • Overview
  • Namespace
  • Class
  • Tree
  1: <?php
  2: namespace phpcassa\Connection;
  3: 
  4: use phpcassa\Connection\ConnectionWrapper;
  5: use phpcassa\Connection\MaxRetriesException;
  6: use phpcassa\Connection\NoServerAvailable;
  7: 
  8: use cassandra\TimedOutException;
  9: use cassandra\NotFoundException;
 10: use cassandra\UnavailableException;
 11: 
 12: /**
 13:  * A pool of connections to a set of servers in a cluster.
 14:  * Each ConnectionPool is keyspace specific.
 15:  *
 16:  * @package phpcassa\Connection
 17:  */
 18: class ConnectionPool {
 19: 
 20:     const BASE_BACKOFF = 0.1;
 21:     const MICROS = 1000000;
 22:     const MAX_RETRIES = 2147483647; // 2^31 - 1
 23: 
 24:     const DEFAULT_MAX_RETRIES = 5;
 25:     const DEFAULT_RECYCLE = 10000;
 26: 
 27:     protected static $default_servers = array('localhost:9160');
 28: 
 29:     public $keyspace;
 30:     protected $servers;
 31:     protected $pool_size;
 32:     protected $send_timeout;
 33:     protected $recv_timeout;
 34:     protected $credentials;
 35:     protected $framed_transport;
 36:     protected $queue;
 37:     protected $keyspace_description = NULL;
 38: 
 39:     /**
 40:      * int $max_retries how many times an operation should be retried before
 41:      *     throwing a MaxRetriesException. Using 0 disables retries; using -1 causes
 42:      *     unlimited retries. The default is 5.
 43:      */
 44:     public $max_retries = self::DEFAULT_MAX_RETRIES;
 45: 
 46:     /**
 47:      * int $recycle after this many operations, a connection will be automatically
 48:      *     closed and replaced. Defaults to 10,000.
 49:      */
 50:     public $recycle = self::DEFAULT_RECYCLE;
 51: 
 52:     /**
 53:      * Constructs a ConnectionPool.
 54:      *
 55:      * @param string $keyspace the keyspace all connections will use
 56:      * @param mixed $servers an array of strings representing the servers to
 57:      *        open connections to.  Each item in the array should be a string
 58:      *        of the form 'host' or 'host:port'.  If a port is not given, 9160
 59:      *        is assumed.  If $servers is NULL, 'localhost:9160' will be used.
 60:      * @param int $pool_size the number of open connections to keep in the pool.
 61:      *        If $pool_size is left as NULL, max(5, count($servers) * 2) will be
 62:      *        used.
 63:      * @param int $max_retries how many times an operation should be retried before
 64:      *        throwing a MaxRetriesException. Using 0 disables retries; using -1 causes
 65:      *        unlimited retries. The default is 5.
 66:      * @param int $send_timeout the socket send timeout in milliseconds. Defaults to 5000.
 67:      * @param int $recv_timeout the socket receive timeout in milliseconds. Defaults to 5000.
 68:      * @param int $recycle after this many operations, a connection will be automatically
 69:      *        closed and replaced. Defaults to 10,000.
 70:      * @param mixed $credentials if using authentication or authorization with Cassandra,
 71:      *        a username and password need to be supplied. This should be in the form
 72:      *        array("username" => username, "password" => password)
 73:      * @param bool $framed_transport whether to use framed transport or buffered transport.
 74:      *        This must match Cassandra's configuration.  In Cassandra 0.7, framed transport
 75:      *        is the default. The default value is true.
 76:      */
 77:     public function __construct($keyspace,
 78:                                 $servers=NULL,
 79:                                 $pool_size=NULL,
 80:                                 $max_retries=self::DEFAULT_MAX_RETRIES,
 81:                                 $send_timeout=5000,
 82:                                 $recv_timeout=5000,
 83:                                 $recycle=self::DEFAULT_RECYCLE,
 84:                                 $credentials=NULL,
 85:                                 $framed_transport=true)
 86:     {
 87:         $this->keyspace = $keyspace;
 88:         $this->send_timeout = $send_timeout;
 89:         $this->recv_timeout = $recv_timeout;
 90:         $this->recycle = $recycle;
 91:         $this->max_retries = $max_retries;
 92:         $this->credentials = $credentials;
 93:         $this->framed_transport = $framed_transport;
 94: 
 95:         $this->stats = array(
 96:             'created' => 0,
 97:             'failed' => 0,
 98:             'recycled' => 0);
 99: 
100:         if (is_null($servers))
101:             $servers = self::$default_servers;
102:         $this->servers = $servers;
103: 
104:         if (is_null($pool_size))
105:             $this->pool_size = max(count($this->servers) * 2, 5);
106:         else
107:             $this->pool_size = $pool_size;
108: 
109:         $this->queue = array();
110: 
111:         // Randomly permute the server list
112:         shuffle($this->servers);
113:         $this->list_position = 0;
114:     }
115: 
116:     protected function make_conn() {
117:         // Keep trying to make a new connection, stopping after we've
118:         // tried every server twice
119:         $err = "";
120:         foreach (range(1, count($this->servers) * 2) as $i)
121:         {
122:             try {
123:                 $this->list_position = ($this->list_position + 1) % count($this->servers);
124:                 $new_conn = new ConnectionWrapper($this->keyspace, $this->servers[$this->list_position],
125:                     $this->credentials, $this->framed_transport, $this->send_timeout, $this->recv_timeout);
126:                 array_push($this->queue, $new_conn);
127:                 $this->stats['created'] += 1;
128:                 return;
129:             } catch (\TException $e) {
130:                 $h = $this->servers[$this->list_position];
131:                 $err = $e;
132:                 $msg = $e->getMessage();
133:                 $class = get_class($e);
134:                 $this->error_log("Error connecting to $h: $class: $msg", 0);
135:                 $this->stats['failed'] += 1;
136:             }
137:         }
138:         throw new NoServerAvailable("An attempt was made to connect to every server twice, but " .
139:             "all attempts failed. The last error was: " . get_class($err) .":". $err->getMessage());
140:     }
141: 
142:     /**
143:      * Adds connections to the pool until $pool_size connections
144:      * are in the pool.
145:      */
146:     public function fill() {
147:         while (count($this->queue) < $this->pool_size)
148:             $this->make_conn();
149:     }
150: 
151:     /**
152:      * Retrieves a connection from the pool.
153:      *
154:      * If the pool has fewer than $pool_size connections in
155:      * it, a new connection will be created.
156:      *
157:      * @return ConnectionWrapper a connection
158:      */
159:     public function get() {
160:         $num_conns = count($this->queue);
161:         if ($num_conns < $this->pool_size) {
162:             try {
163:                 $this->make_conn();
164:             } catch (NoServerAvailable $e) {
165:                 if ($num_conns == 0)
166:                     throw $e;
167:             }
168:         }
169:         return array_shift($this->queue);
170:     }
171: 
172:     /**
173:      * Returns a connection to the pool.
174:      * @param ConnectionWrapper $connection
175:      */
176:     public function return_connection($connection) {
177:         if ($connection->op_count >= $this->recycle) {
178:             $this->stats['recycled'] += 1;
179:             $connection->close();
180:             $this->make_conn();
181:             $connection = $this->get();
182:         }
183:         array_push($this->queue, $connection);
184:     }
185: 
186:     /**
187:      * Gets the keyspace description, caching the results for later lookups.
188:      * @return mixed
189:      */
190:     public function describe_keyspace() {
191:         if (NULL === $this->keyspace_description) {
192:             $this->keyspace_description = $this->call("describe_keyspace", $this->keyspace);
193:         }
194: 
195:         return $this->keyspace_description;
196:     }
197: 
198:     /**
199:      * Closes all connections in the pool.
200:      */
201:     public function dispose() {
202:         foreach($this->queue as $conn)
203:             $conn->close();
204:     }
205: 
206:     /**
207:      * Closes all connections in the pool.
208:      */
209:     public function close() {
210:         $this->dispose();
211:     }
212: 
213:     /**
214:      * Returns information about the number of opened connections, failed
215:      * operations, and recycled connections.
216:      * @return array Stats in the form array("failed" => failure_count,
217:      *         "created" => creation_count, "recycled" => recycle_count)
218:      */
219:     public function stats() {
220:         return $this->stats;
221:     }
222: 
223:     /**
224:      * Performs a Thrift operation using a connection from the pool.
225:      * The first argument should be the name of the function. The following
226:      * arguments should be the arguments for that Thrift function.
227:      *
228:      * If the connect fails with any exception other than a NotFoundException,
229:      * the connection will be closed and replaced in the pool. If the
230:      * Exception is suitable for retrying the operation (TimedOutException,
231:      * UnavailableException, TTransportException), the operation will be
232:      * retried with a new connection after an exponentially increasing
233:      * backoff is performed.
234:      *
235:      * To avoid automatic retries, create a ConnectionPool with the
236:      * $max_retries argument set to 0.
237:      *
238:      * In general, this method should *not* be used by users of the
239:      * library. It is primarily intended for internal use, but is left
240:      * exposed as an open workaround if needed.
241:      *
242:      * @return mixed
243:      */
244:     public function call() {
245:         $args = func_get_args(); // Get all of the args passed to this function
246:         $f = array_shift($args); // pull the function from the beginning
247: 
248:         $retry_count = 0;
249:         if ($this->max_retries == -1)
250:             $tries =  self::MAX_RETRIES;
251:         elseif ($this->max_retries == 0)
252:             $tries = 1;
253:         else
254:             $tries = $this->max_retries + 1;
255: 
256:         foreach (range(1, $tries) as $retry_count) {
257:             $conn = $this->get();
258: 
259:             $conn->op_count += 1;
260:             try {
261:                 $resp = call_user_func_array(array($conn->client, $f), $args);
262:                 $this->return_connection($conn);
263:                 return $resp;
264:             } catch (NotFoundException $nfe) {
265:                 $this->return_connection($conn);
266:                 throw $nfe;
267:             } catch (TimedOutException $toe) {
268:                 $last_err = $toe;
269:                 $this->handle_conn_failure($conn, $f, $toe, $retry_count);
270:             } catch (UnavailableException $ue) {
271:                 $last_err = $ue;
272:                 $this->handle_conn_failure($conn, $f, $ue, $retry_count);
273:             } catch (\TTransportException $tte) {
274:                 $last_err = $tte;
275:                 $this->handle_conn_failure($conn, $f, $tte, $retry_count);
276:             } catch (\Exception $e) {
277:                 $this->handle_conn_failure($conn, $f, $e, $retry_count);
278:                 throw $e;
279:             }
280:         }
281:         throw new MaxRetriesException("An attempt to execute $f failed $tries times.".
282:             " The last error was " . get_class($last_err) . ":" . $last_err->getMessage());
283:     }
284: 
285:     protected function handle_conn_failure($conn, $f, $exc, $retry_count) {
286:         $err = (string)$exc;
287:         $this->error_log("Error performing $f on $conn->server: $err", 0);
288:         $conn->close();
289:         $this->stats['failed'] += 1;
290:         usleep(self::BASE_BACKOFF * pow(2, $retry_count) * self::MICROS);
291:         $this->make_conn();
292:     }
293: 
294:     /**
295:      * This method called every time an error is logged. By default, it will
296:      * call the PHP builtin function error_log() with a messageType of 0. To
297:      * change this behavior, you can create a subclass and override this
298:      * method.
299:      *
300:      * Note that PHP has strange logging behavior. In particular, if you are
301:      * running the PHP cli and you haven't set a directive for error_log in
302:      * your php.ini, this will log to stdout even if you've called
303:      * error_reporting(0), which is supposed to suppress all logging.
304:      *
305:      * @param string $errorMsg
306:      * @param int $messageType
307:      */
308:     protected function error_log($errorMsg, $messageType=0) {
309:         error_log($errorMsg, $messageType);
310:     }
311: }
312: 
phpcassa API documentation generated by ApiGen 2.8.0