1: <?php
2: namespace phpcassa\Connection;
3:
4: use phpcassa\Connection\ConnectionWrapper;
5: use phpcassa\Connection\MaxRetriesException;
6: use phpcassa\Connection\NoServerAvailable;
7:
8: use cassandra\TimedOutException;
9: use cassandra\NotFoundException;
10: use cassandra\UnavailableException;
11:
12: /**
13: * A pool of connections to a set of servers in a cluster.
14: * Each ConnectionPool is keyspace specific.
15: *
16: * @package phpcassa\Connection
17: */
18: class ConnectionPool {
19:
20: const BASE_BACKOFF = 0.1;
21: const MICROS = 1000000;
22: const MAX_RETRIES = 2147483647; // 2^31 - 1
23:
24: const DEFAULT_MAX_RETRIES = 5;
25: const DEFAULT_RECYCLE = 10000;
26:
27: protected static $default_servers = array('localhost:9160');
28:
29: public $keyspace;
30: protected $servers;
31: protected $pool_size;
32: protected $send_timeout;
33: protected $recv_timeout;
34: protected $credentials;
35: protected $framed_transport;
36: protected $queue;
37: protected $keyspace_description = NULL;
38:
39: /**
40: * int $max_retries how many times an operation should be retried before
41: * throwing a MaxRetriesException. Using 0 disables retries; using -1 causes
42: * unlimited retries. The default is 5.
43: */
44: public $max_retries = self::DEFAULT_MAX_RETRIES;
45:
46: /**
47: * int $recycle after this many operations, a connection will be automatically
48: * closed and replaced. Defaults to 10,000.
49: */
50: public $recycle = self::DEFAULT_RECYCLE;
51:
52: /**
53: * Constructs a ConnectionPool.
54: *
55: * @param string $keyspace the keyspace all connections will use
56: * @param mixed $servers an array of strings representing the servers to
57: * open connections to. Each item in the array should be a string
58: * of the form 'host' or 'host:port'. If a port is not given, 9160
59: * is assumed. If $servers is NULL, 'localhost:9160' will be used.
60: * @param int $pool_size the number of open connections to keep in the pool.
61: * If $pool_size is left as NULL, max(5, count($servers) * 2) will be
62: * used.
63: * @param int $max_retries how many times an operation should be retried before
64: * throwing a MaxRetriesException. Using 0 disables retries; using -1 causes
65: * unlimited retries. The default is 5.
66: * @param int $send_timeout the socket send timeout in milliseconds. Defaults to 5000.
67: * @param int $recv_timeout the socket receive timeout in milliseconds. Defaults to 5000.
68: * @param int $recycle after this many operations, a connection will be automatically
69: * closed and replaced. Defaults to 10,000.
70: * @param mixed $credentials if using authentication or authorization with Cassandra,
71: * a username and password need to be supplied. This should be in the form
72: * array("username" => username, "password" => password)
73: * @param bool $framed_transport whether to use framed transport or buffered transport.
74: * This must match Cassandra's configuration. In Cassandra 0.7, framed transport
75: * is the default. The default value is true.
76: */
77: public function __construct($keyspace,
78: $servers=NULL,
79: $pool_size=NULL,
80: $max_retries=self::DEFAULT_MAX_RETRIES,
81: $send_timeout=5000,
82: $recv_timeout=5000,
83: $recycle=self::DEFAULT_RECYCLE,
84: $credentials=NULL,
85: $framed_transport=true)
86: {
87: $this->keyspace = $keyspace;
88: $this->send_timeout = $send_timeout;
89: $this->recv_timeout = $recv_timeout;
90: $this->recycle = $recycle;
91: $this->max_retries = $max_retries;
92: $this->credentials = $credentials;
93: $this->framed_transport = $framed_transport;
94:
95: $this->stats = array(
96: 'created' => 0,
97: 'failed' => 0,
98: 'recycled' => 0);
99:
100: if (is_null($servers))
101: $servers = self::$default_servers;
102: $this->servers = $servers;
103:
104: if (is_null($pool_size))
105: $this->pool_size = max(count($this->servers) * 2, 5);
106: else
107: $this->pool_size = $pool_size;
108:
109: $this->queue = array();
110:
111: // Randomly permute the server list
112: shuffle($this->servers);
113: $this->list_position = 0;
114: }
115:
116: protected function make_conn() {
117: // Keep trying to make a new connection, stopping after we've
118: // tried every server twice
119: $err = "";
120: foreach (range(1, count($this->servers) * 2) as $i)
121: {
122: try {
123: $this->list_position = ($this->list_position + 1) % count($this->servers);
124: $new_conn = new ConnectionWrapper($this->keyspace, $this->servers[$this->list_position],
125: $this->credentials, $this->framed_transport, $this->send_timeout, $this->recv_timeout);
126: array_push($this->queue, $new_conn);
127: $this->stats['created'] += 1;
128: return;
129: } catch (\TException $e) {
130: $h = $this->servers[$this->list_position];
131: $err = $e;
132: $msg = $e->getMessage();
133: $class = get_class($e);
134: $this->error_log("Error connecting to $h: $class: $msg", 0);
135: $this->stats['failed'] += 1;
136: }
137: }
138: throw new NoServerAvailable("An attempt was made to connect to every server twice, but " .
139: "all attempts failed. The last error was: " . get_class($err) .":". $err->getMessage());
140: }
141:
142: /**
143: * Adds connections to the pool until $pool_size connections
144: * are in the pool.
145: */
146: public function fill() {
147: while (count($this->queue) < $this->pool_size)
148: $this->make_conn();
149: }
150:
151: /**
152: * Retrieves a connection from the pool.
153: *
154: * If the pool has fewer than $pool_size connections in
155: * it, a new connection will be created.
156: *
157: * @return ConnectionWrapper a connection
158: */
159: public function get() {
160: $num_conns = count($this->queue);
161: if ($num_conns < $this->pool_size) {
162: try {
163: $this->make_conn();
164: } catch (NoServerAvailable $e) {
165: if ($num_conns == 0)
166: throw $e;
167: }
168: }
169: return array_shift($this->queue);
170: }
171:
172: /**
173: * Returns a connection to the pool.
174: * @param ConnectionWrapper $connection
175: */
176: public function return_connection($connection) {
177: if ($connection->op_count >= $this->recycle) {
178: $this->stats['recycled'] += 1;
179: $connection->close();
180: $this->make_conn();
181: $connection = $this->get();
182: }
183: array_push($this->queue, $connection);
184: }
185:
186: /**
187: * Gets the keyspace description, caching the results for later lookups.
188: * @return mixed
189: */
190: public function describe_keyspace() {
191: if (NULL === $this->keyspace_description) {
192: $this->keyspace_description = $this->call("describe_keyspace", $this->keyspace);
193: }
194:
195: return $this->keyspace_description;
196: }
197:
198: /**
199: * Closes all connections in the pool.
200: */
201: public function dispose() {
202: foreach($this->queue as $conn)
203: $conn->close();
204: }
205:
206: /**
207: * Closes all connections in the pool.
208: */
209: public function close() {
210: $this->dispose();
211: }
212:
213: /**
214: * Returns information about the number of opened connections, failed
215: * operations, and recycled connections.
216: * @return array Stats in the form array("failed" => failure_count,
217: * "created" => creation_count, "recycled" => recycle_count)
218: */
219: public function stats() {
220: return $this->stats;
221: }
222:
223: /**
224: * Performs a Thrift operation using a connection from the pool.
225: * The first argument should be the name of the function. The following
226: * arguments should be the arguments for that Thrift function.
227: *
228: * If the connect fails with any exception other than a NotFoundException,
229: * the connection will be closed and replaced in the pool. If the
230: * Exception is suitable for retrying the operation (TimedOutException,
231: * UnavailableException, TTransportException), the operation will be
232: * retried with a new connection after an exponentially increasing
233: * backoff is performed.
234: *
235: * To avoid automatic retries, create a ConnectionPool with the
236: * $max_retries argument set to 0.
237: *
238: * In general, this method should *not* be used by users of the
239: * library. It is primarily intended for internal use, but is left
240: * exposed as an open workaround if needed.
241: *
242: * @return mixed
243: */
244: public function call() {
245: $args = func_get_args(); // Get all of the args passed to this function
246: $f = array_shift($args); // pull the function from the beginning
247:
248: $retry_count = 0;
249: if ($this->max_retries == -1)
250: $tries = self::MAX_RETRIES;
251: elseif ($this->max_retries == 0)
252: $tries = 1;
253: else
254: $tries = $this->max_retries + 1;
255:
256: foreach (range(1, $tries) as $retry_count) {
257: $conn = $this->get();
258:
259: $conn->op_count += 1;
260: try {
261: $resp = call_user_func_array(array($conn->client, $f), $args);
262: $this->return_connection($conn);
263: return $resp;
264: } catch (NotFoundException $nfe) {
265: $this->return_connection($conn);
266: throw $nfe;
267: } catch (TimedOutException $toe) {
268: $last_err = $toe;
269: $this->handle_conn_failure($conn, $f, $toe, $retry_count);
270: } catch (UnavailableException $ue) {
271: $last_err = $ue;
272: $this->handle_conn_failure($conn, $f, $ue, $retry_count);
273: } catch (\TTransportException $tte) {
274: $last_err = $tte;
275: $this->handle_conn_failure($conn, $f, $tte, $retry_count);
276: } catch (\Exception $e) {
277: $this->handle_conn_failure($conn, $f, $e, $retry_count);
278: throw $e;
279: }
280: }
281: throw new MaxRetriesException("An attempt to execute $f failed $tries times.".
282: " The last error was " . get_class($last_err) . ":" . $last_err->getMessage());
283: }
284:
285: protected function handle_conn_failure($conn, $f, $exc, $retry_count) {
286: $err = (string)$exc;
287: $this->error_log("Error performing $f on $conn->server: $err", 0);
288: $conn->close();
289: $this->stats['failed'] += 1;
290: usleep(self::BASE_BACKOFF * pow(2, $retry_count) * self::MICROS);
291: $this->make_conn();
292: }
293:
294: /**
295: * This method called every time an error is logged. By default, it will
296: * call the PHP builtin function error_log() with a messageType of 0. To
297: * change this behavior, you can create a subclass and override this
298: * method.
299: *
300: * Note that PHP has strange logging behavior. In particular, if you are
301: * running the PHP cli and you haven't set a directive for error_log in
302: * your php.ini, this will log to stdout even if you've called
303: * error_reporting(0), which is supposed to suppress all logging.
304: *
305: * @param string $errorMsg
306: * @param int $messageType
307: */
308: protected function error_log($errorMsg, $messageType=0) {
309: error_log($errorMsg, $messageType);
310: }
311: }
312: