Overview

Namespaces

  • cassandra
  • None
  • PHP
  • phpcassa
    • Batch
    • Connection
    • Index
    • Iterator
    • Schema
      • DataType
    • Util
    • UUID

Classes

  • AbstractColumnFamily
  • ColumnFamily
  • ColumnSlice
  • SuperColumnFamily
  • SystemManager
  • UUID
  • Overview
  • Namespace
  • Class
  • Tree
   1: <?php
   2: namespace phpcassa;
   3: 
   4: use phpcassa\ColumnSlice;
   5: use phpcassa\Schema\DataType;
   6: use phpcassa\Schema\DataType\BytesType;
   7: use phpcassa\Schema\DataType\CompositeType;
   8: use phpcassa\Schema\DataType\Serialized;
   9: 
  10: use phpcassa\Iterator\IndexedColumnFamilyIterator;
  11: use phpcassa\Iterator\RangeColumnFamilyIterator;
  12: use phpcassa\Iterator\RangeTokenColumnFamilyIterator;
  13: 
  14: use phpcassa\Batch\CfMutator;
  15: 
  16: use phpcassa\Util\Clock;
  17: 
  18: use cassandra\InvalidRequestException;
  19: use cassandra\NotFoundException;
  20: 
  21: use cassandra\Mutation;
  22: use cassandra\Deletion;
  23: use cassandra\ConsistencyLevel;
  24: use cassandra\Column;
  25: use cassandra\ColumnParent;
  26: use cassandra\ColumnPath;
  27: use cassandra\ColumnOrSuperColumn;
  28: use cassandra\CounterColumn;
  29: use cassandra\IndexClause;
  30: use cassandra\IndexExpression;
  31: use cassandra\SlicePredicate;
  32: use cassandra\SliceRange;
  33: 
  34: /**
  35:  * Functions and constants used both in ColumnFamily and SuperColumnFamily
  36:  *
  37:  * @package phpcassa
  38:  */
  39: abstract class AbstractColumnFamily {
  40: 
  41:     /** The default limit to the number of rows retrieved in queries. */
  42:     const DEFAULT_ROW_COUNT = 100;
  43: 
  44:     const DEFAULT_BUFFER_SIZE = 100;
  45: 
  46:     /**
  47:      * Data that is returned will be stored in a "dictionary" format,
  48:      * where array keys correspond to row keys, super column names,
  49:      * or column names. Data that is inserted should match this format.
  50:      *
  51:      * When using non-scalar types or floats with this format, array keys
  52:      * must be serialized and unserialized. This is typically a good reason
  53:      * to use one of the other formats.
  54:      *
  55:      * This format is what phpcassa has historically used. This may be used
  56:      * for both ColumnFamily::insert_format and ColumnFamily::return_format.
  57:      */
  58:     const DICTIONARY_FORMAT = 1;
  59: 
  60:     /**
  61:      * Data that is returned will be stored in normal, non-mapping arrays.
  62:      * For example, a column will be represented as array($name, $value),
  63:      * and a row returned with multiget() will be represented as
  64:      * array($rowkey, array(array($colname1, $colval1), array($colname2, $colval2))).
  65:      *
  66:      * Data that is inserted should match this format. Serialization is not needed with
  67:      * this format.
  68:      *
  69:      * This may be used for both ColumnFamily::insert_format and
  70:      * ColumnFamily::return_format.
  71:      */
  72:     const ARRAY_FORMAT = 2;
  73: 
  74:     /**
  75:      * Data will be returned in an object-based format, roughly matching
  76:      * what Thrift returns directly.  This means that results will contain:
  77:      * Column objects, which have $name, $value, $timestamp, and $ttl attributes;
  78:      * CounterColumn objects, which have $name and $value attributes;
  79:      * SuperColumn objects, which have $name and $columns attributes, where
  80:      * $columns is an array of Column or CounterColumn objects.
  81:      *
  82:      * This format can currently only be used for ColumnFamily::return_format,
  83:      * not ColumnFamily::insert_format.
  84:      *
  85:      * Unserialization is not required for this format.
  86:      */
  87:     const OBJECT_FORMAT = 3;
  88: 
  89:     /** @internal */
  90:     public $column_family;
  91: 
  92:     /** @internal */
  93:     public $is_super;
  94: 
  95:     /** @internal */
  96:     protected $cf_data_type;
  97: 
  98:     /** @internal */
  99:     protected $col_name_type;
 100: 
 101:     /** @internal */
 102:     protected $supercol_name_type;
 103: 
 104:     /** @internal */
 105:     protected $col_type_dict;
 106: 
 107: 
 108:     /**
 109:      * Whether column names should be packed and unpacked automatically.
 110:      * Default is true.
 111:      */
 112:     public $autopack_names;
 113: 
 114:     /**
 115:      * Whether column values should be packed and unpacked automatically.
 116:      * Default is true.
 117:      */
 118:     public $autopack_values;
 119: 
 120:     /**
 121:      * Whether keys should be packed and unpacked automatically.
 122:      * Default is true.
 123:      */
 124:     public $autopack_keys;
 125: 
 126:     /** @var ConsistencyLevel the default read consistency level */
 127:     public $read_consistency_level = ConsistencyLevel::ONE;
 128:     /** @var ConsistencyLevel the default write consistency level */
 129:     public $write_consistency_level = ConsistencyLevel::ONE;
 130: 
 131:     /**
 132:      * The format that data will be returned in.
 133:      *
 134:      * Valid values include DICTIONARY_FORMAT, ARRAY_FORMAT,
 135:      * and OBJECT_FORMAT.
 136:      */
 137:     public $return_format = self::DICTIONARY_FORMAT;
 138: 
 139:     /**
 140:      * The format that data should be inserted in.
 141:      *
 142:      * Valid values include DICTIONARY_FORMAT and
 143:      * ARRAY_FORMAT.
 144:      */
 145:     public $insert_format = self::DICTIONARY_FORMAT;
 146: 
 147:     /**
 148:      * @var int When calling `get_range`, the intermediate results need
 149:      *       to be buffered if we are fetching many rows, otherwise the Cassandra
 150:      *       server will overallocate memory and fail.  This is the size of
 151:      *       that buffer in number of rows. The default is 100.
 152:      */
 153:     public $buffer_size = 100;
 154: 
 155:     /**
 156:      * Constructs a ColumnFamily.
 157:      *
 158:      * @param phpcassa\Connection\ConnectionPool $pool the pool to use when
 159:      *        querying Cassandra
 160:      * @param string $column_family the name of the column family in Cassandra
 161:      * @param bool $autopack_names whether or not to automatically convert column names
 162:      *        to and from their binary representation in Cassandra
 163:      *        based on their comparator type
 164:      * @param bool $autopack_values whether or not to automatically convert column values
 165:      *        to and from their binary representation in Cassandra
 166:      *        based on their validator type
 167:      * @param ConsistencyLevel $read_consistency_level the default consistency
 168:      *        level for read operations on this column family
 169:      * @param ConsistencyLevel $write_consistency_level the default consistency
 170:      *        level for write operations on this column family
 171:      * @param int $buffer_size When calling `get_range`, the intermediate results need
 172:      *        to be buffered if we are fetching many rows, otherwise the Cassandra
 173:      *        server will overallocate memory and fail.  This is the size of
 174:      *        that buffer in number of rows.
 175:      */
 176:     public function __construct($pool,
 177:         $column_family,
 178:         $autopack_names=true,
 179:         $autopack_values=true,
 180:         $read_consistency_level=ConsistencyLevel::ONE,
 181:         $write_consistency_level=ConsistencyLevel::ONE,
 182:         $buffer_size=self::DEFAULT_BUFFER_SIZE) {
 183: 
 184:         $this->pool = $pool;
 185:         $this->column_family = $column_family;
 186:         $this->read_consistency_level = $read_consistency_level;
 187:         $this->write_consistency_level = $write_consistency_level;
 188:         $this->buffer_size = $buffer_size;
 189: 
 190:         $ks = $this->pool->describe_keyspace();
 191: 
 192:         $cf_def = null;
 193:         foreach($ks->cf_defs as $cfdef) {
 194:             if ($cfdef->name == $this->column_family) {
 195:                 $cf_def = $cfdef;
 196:                 break;
 197:             }
 198:         }
 199:         if ($cf_def == null)
 200:             throw new NotFoundException();
 201:         else
 202:             $this->cfdef = $cf_def;
 203: 
 204:         $this->cf_data_type = new BytesType();
 205:         $this->col_name_type = new BytesType();
 206:         $this->supercol_name_type = new BytesType();
 207:         $this->key_type = new BytesType();
 208:         $this->col_type_dict = array();
 209: 
 210:         $this->is_super = $this->cfdef->column_type === 'Super';
 211:         $this->has_counters = self::endswith(
 212:             $this->cfdef->default_validation_class,
 213:             "CounterColumnType");
 214: 
 215:         $this->set_autopack_names($autopack_names);
 216:         $this->set_autopack_values($autopack_values);
 217:         $this->set_autopack_keys(true);
 218:     }
 219: 
 220:     protected static function endswith($str, $suffix) {
 221:         $suffix_len = strlen($suffix);
 222:         return substr_compare($str, $suffix, strlen($str)-$suffix_len, $suffix_len) === 0;
 223:     }
 224: 
 225:     /**
 226:      * @param bool $pack_names whether or not column names are automatically packed/unpacked
 227:      */
 228:     public function set_autopack_names($pack_names) {
 229:         if ($pack_names) {
 230:             if ($this->autopack_names)
 231:                 return;
 232:             $this->autopack_names = true;
 233:             if (!$this->is_super) {
 234:                 $this->col_name_type = DataType::get_type_for($this->cfdef->comparator_type);
 235:             } else {
 236:                 $this->col_name_type = DataType::get_type_for($this->cfdef->subcomparator_type);
 237:                 $this->supercol_name_type = DataType::get_type_for($this->cfdef->comparator_type);
 238:             }
 239:         } else {
 240:             $this->autopack_names = false;
 241:         }
 242:     }
 243: 
 244:     /**
 245:      * @param bool $pack_values whether or not column values are automatically packed/unpacked
 246:      */
 247:     public function set_autopack_values($pack_values) {
 248:         if ($pack_values) {
 249:             $this->autopack_values = true;
 250:             $this->cf_data_type = DataType::get_type_for($this->cfdef->default_validation_class);
 251:             foreach($this->cfdef->column_metadata as $coldef) {
 252:                 $this->col_type_dict[$coldef->name] =
 253:                     DataType::get_type_for($coldef->validation_class);
 254:             }
 255:         } else {
 256:             $this->autopack_values = false;
 257:         }
 258:     }
 259: 
 260:     /**
 261:      * @param bool $pack_keys whether or not keys are automatically packed/unpacked
 262:      *
 263:      * Available since Cassandra 0.8.0.
 264:      */
 265:     public function set_autopack_keys($pack_keys) {
 266:         if ($pack_keys) {
 267:             $this->autopack_keys = true;
 268:             if (property_exists('\cassandra\CfDef', "key_validation_class")) {
 269:                 $this->key_type = DataType::get_type_for($this->cfdef->key_validation_class);
 270:             } else {
 271:                 $this->key_type = new BytesType();
 272:             }
 273:         } else {
 274:             $this->autopack_keys = false;
 275:         }
 276:     }
 277: 
 278:     /**
 279:      * Fetch a row from this column family.
 280:      *
 281:      * @param string $key row key to fetch
 282:      * @param \phpcassa\ColumnSlice a slice of columns to fetch, or null
 283:      * @param mixed[] $column_names limit the columns or super columns fetched to this list
 284:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 285:      *        number of nodes that must respond before the operation returns
 286:      *
 287:      * @return mixed array(column_name => column_value)
 288:      */
 289:     public function get($key,
 290:         $column_slice=null,
 291:         $column_names=null,
 292:         $consistency_level=null) {
 293: 
 294:         $column_parent = $this->create_column_parent();
 295:         $predicate = $this->create_slice_predicate($column_names, $column_slice);
 296:         return $this->_get($key, $column_parent, $predicate, $consistency_level);
 297:     }
 298: 
 299:     protected function _get($key, $cp, $slice, $cl) {
 300:         $resp = $this->pool->call("get_slice",
 301:             $this->pack_key($key),
 302:             $cp, $slice, $this->rcl($cl));
 303: 
 304:         if (count($resp) == 0)
 305:             throw new NotFoundException();
 306: 
 307:         return $this->unpack_coscs($resp);
 308:     }
 309: 
 310:     /**
 311:      * Fetch a set of rows from this column family.
 312:      *
 313:      * @param string[] $keys row keys to fetch
 314:      * @param \phpcassa\ColumnSlice a slice of columns to fetch, or null
 315:      * @param mixed[] $column_names limit the columns or super columns fetched to this list
 316:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 317:      *        number of nodes that must respond before the operation returns
 318:      * @param int $buffer_size the number of keys to multiget at a single time. If your
 319:      *        rows are large, having a high buffer size gives poor performance; if your
 320:      *        rows are small, consider increasing this value.
 321:      *
 322:      * @return mixed array(key => array(column_name => column_value))
 323:      */
 324:     public function multiget($keys,
 325:         $column_slice=null,
 326:         $column_names=null,
 327:         $consistency_level=null,
 328:         $buffer_size=16)  {
 329: 
 330:         $cp = $this->create_column_parent();
 331:         $slice = $this->create_slice_predicate($column_names, $column_slice);
 332: 
 333:         return $this->_multiget($keys, $cp, $slice, $consistency_level, $buffer_size);
 334:     }
 335: 
 336:     protected function _multiget($keys, $cp, $slice, $cl, $buffsz) {
 337:         $ret = array();
 338: 
 339:         $have_dict = ($this->return_format == self::DICTIONARY_FORMAT);
 340:         $should_serialize = ($this->key_type instanceof Serialized);
 341:         if ($have_dict) {
 342:             if ($should_serialize) {
 343:                 foreach($keys as $key) {
 344:                     $ret[serialize($key)] = null;
 345:                 }
 346:             } else {
 347:                 foreach($keys as $key) {
 348:                     $ret[$key] = null;
 349:                 }
 350:             }
 351:         }
 352: 
 353:         $cl = $this->rcl($cl);
 354: 
 355:         $resp = array();
 356:         if(count($keys) <= $buffsz) {
 357:             $resp = $this->pool->call("multiget_slice",
 358:                 array_map(array($this, "pack_key"), $keys),
 359:                 $cp, $slice, $cl);
 360:         } else {
 361:             $subset_keys = array();
 362:             $i = 0;
 363:             foreach($keys as $key) {
 364:                 $i += 1;
 365:                 $subset_keys[] = $key;
 366:                 if ($i == $buffsz) {
 367:                     $sub_resp = $this->pool->call("multiget_slice",
 368:                         array_map(array($this, "pack_key"), $subset_keys),
 369:                         $cp, $slice, $cl);
 370:                     $subset_keys = array();
 371:                     $i = 0;
 372:                     $resp = $resp + $sub_resp;
 373:                 }
 374:             }
 375:             if (count($subset_keys) != 0) {
 376:                 $sub_resp = $this->pool->call("multiget_slice",
 377:                     array_map(array($this, "pack_key"), $subset_keys),
 378:                     $cp, $slice, $cl);
 379:                 $resp = $resp + $sub_resp;
 380:             }
 381:         }
 382: 
 383:         $non_empty_keys = array();
 384:         foreach($resp as $key => $val) {
 385:             if (count($val) > 0) {
 386:                 $unpacked_key = $this->unpack_key($key, $have_dict);
 387: 
 388:                 if ($have_dict) {
 389:                     $non_empty_keys[$unpacked_key] = 1;
 390:                     $ret[$unpacked_key] = $this->unpack_coscs($val);
 391:                 } else {
 392:                     $ret[] = array($unpacked_key, $this->unpack_coscs($val));
 393:                 }
 394:             }
 395:         }
 396: 
 397:         if ($have_dict) {
 398:             if ($should_serialize) {
 399:                 foreach($keys as $key) {
 400:                     $skey = serialize($key);
 401:                     if (!isset($non_empty_keys[$skey]))
 402:                         unset($ret[$skey]);
 403:                 }
 404:             } else {
 405:                 foreach($keys as $key) {
 406:                     if (!isset($non_empty_keys[$key]))
 407:                         unset($ret[$key]);
 408:                 }
 409:             }
 410:         }
 411: 
 412:         return $ret;
 413:     }
 414: 
 415:     /**
 416:      * Count the number of columns in a row.
 417:      *
 418:      * @param string $key row to be counted
 419:      * @param \phpcassa\ColumnSlice a slice of columns to count, or null
 420:      * @param mixed[] $column_names limit the possible columns or super columns counted to this list
 421:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 422:      *        number of nodes that must respond before the operation returns
 423:      *
 424:      * @return int
 425:      */
 426:     public function get_count($key,
 427:         $column_slice=null,
 428:         $column_names=null,
 429:         $consistency_level=null) {
 430: 
 431:         $cp = $this->create_column_parent();
 432:         $slice = $this->create_slice_predicate(
 433:             $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);
 434:         return $this->_get_count($key, $cp, $slice, $consistency_level);
 435:     }
 436: 
 437:     protected function _get_count($key, $cp, $slice, $cl) {
 438:         $packed_key = $this->pack_key($key);
 439:         return $this->pool->call("get_count", $packed_key, $cp, $slice, $this->rcl($cl));
 440:     }
 441: 
 442:     /**
 443:      * Count the number of columns in a set of rows.
 444:      *
 445:      * @param string[] $keys rows to be counted
 446:      * @param \phpcassa\ColumnSlice a slice of columns to count, or null
 447:      * @param mixed[] $column_names limit the possible columns or super columns counted to this list
 448:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 449:      *        number of nodes that must respond before the operation returns
 450:      *
 451:      * @return mixed array(row_key => row_count)
 452:      */
 453:     public function multiget_count($keys,
 454:         $column_slice=null,
 455:         $column_names=null,
 456:         $consistency_level=null) {
 457: 
 458:         $cp = $this->create_column_parent();
 459:         $slice = $this->create_slice_predicate(
 460:             $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);
 461: 
 462:         return $this->_multiget_count($keys, $cp, $slice, $consistency_level);
 463:     }
 464: 
 465:     protected function _multiget_count($keys, $cp, $slice, $cl) {
 466: 
 467:         $ret = array();
 468:         $have_dict = ($this->return_format == self::DICTIONARY_FORMAT);
 469:         if ($have_dict) {
 470:             foreach($keys as $key) {
 471:                 $ret[$key] = null;
 472:             }
 473:         }
 474: 
 475:         $packed_keys = array_map(array($this, "pack_key"), $keys);
 476:         $results = $this->pool->call("multiget_count", $packed_keys, $cp, $slice,
 477:             $this->rcl($cl));
 478: 
 479:         $non_empty_keys = array();
 480:         foreach ($results as $key => $count) {
 481:             $unpacked_key = $this->unpack_key($key, $have_dict);
 482: 
 483:             if ($have_dict) {
 484:                 $non_empty_keys[$unpacked_key] = 1;
 485:                 $ret[$unpacked_key] = $count;
 486:             } else {
 487:                 $ret[] = array($unpacked_key, $count);
 488:             }
 489:         }
 490: 
 491:         if ($have_dict) {
 492:             foreach($keys as $key) {
 493:                 if (!isset($non_empty_keys[$key]))
 494:                     unset($ret[$key]);
 495:             }
 496:         }
 497: 
 498:         return $ret;
 499:     }
 500: 
 501:     /**
 502:      * Get an iterator over a range of rows.
 503:      *
 504:      * @param mixed $key_start fetch rows with a key >= this
 505:      * @param mixed $key_finish fetch rows with a key <= this
 506:      * @param int $row_count limit the number of rows returned to this amount
 507:      * @param \phpcassa\ColumnSlice a slice of columns to fetch, or null
 508:      * @param mixed[] $column_names limit the columns or super columns fetched to this list
 509:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 510:      *        number of nodes that must respond before the operation returns
 511:      * @param int $buffer_size When calling `get_range`, the intermediate results need
 512:      *        to be buffered if we are fetching many rows, otherwise the Cassandra
 513:      *        server will overallocate memory and fail.  This is the size of
 514:      *        that buffer in number of rows.
 515:      *
 516:      * @return phpcassa\Iterator\RangeColumnFamilyIterator
 517:      */
 518:     public function get_range($key_start="",
 519:         $key_finish="",
 520:         $row_count=self::DEFAULT_ROW_COUNT,
 521:         $column_slice=null,
 522:         $column_names=null,
 523:         $consistency_level=null,
 524:         $buffer_size=null) {
 525: 
 526:         $cp = $this->create_column_parent();
 527:         $slice = $this->create_slice_predicate($column_names, $column_slice);
 528: 
 529:         return $this->_get_range($key_start, $key_finish, $row_count,
 530:             $cp, $slice, $consistency_level, $buffer_size);
 531:     }
 532: 
 533:     protected function _get_range($start, $finish, $count, $cp, $slice, $cl, $buffsz) {
 534: 
 535:         if ($buffsz == null)
 536:             $buffsz = $this->buffer_size;
 537:         if ($buffsz < 2) {
 538:             $ire = new InvalidRequestException();
 539:             $ire->message = 'buffer_size cannot be less than 2';
 540:             throw $ire;
 541:         }
 542: 
 543:         return new RangeColumnFamilyIterator($this, $buffsz, $start, $finish,
 544:             $count, $cp, $slice, $this->rcl($cl));
 545:     }
 546: 
 547: 
 548: 
 549: 
 550: 
 551:     /**
 552:      * Get an iterator over a token range.
 553:      * 
 554:      * Example usages of get_range_by_token() :
 555:      *   1. You can iterate part of the ring.
 556:      *      This helps to start several processes,
 557:      *      that scans the ring in parallel in fashion similar to Hadoop.
 558:      *      Then full ring scan will take only 1 / <number of processes>
 559:      * 
 560:      *   2. You can iterate "local" token range for each Cassandra node.
 561:      *      You can start one process on each cassandra node,
 562:      *      that iterates only on token range for this node.
 563:      *      In this case you minimize the network traffic between nodes.
 564:      * 
 565:      *   3. Combinations of the above.
 566:      *
 567:      * @param string $token_start fetch rows with a token >= this
 568:      * @param string $token_finish fetch rows with a token <= this
 569:      * @param int $row_count limit the number of rows returned to this amount
 570:      * @param \phpcassa\ColumnSlice a slice of columns to fetch, or null
 571:      * @param mixed[] $column_names limit the columns or super columns fetched to this list
 572:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 573:      *        number of nodes that must respond before the operation returns
 574:      * @param int $buffer_size When calling `get_range`, the intermediate results need
 575:      *        to be buffered if we are fetching many rows, otherwise the Cassandra
 576:      *        server will overallocate memory and fail.  This is the size of
 577:      *        that buffer in number of rows.
 578:      *
 579:      * @return phpcassa\Iterator\RangeColumnFamilyIterator
 580:      */
 581:     public function get_range_by_token($token_start="",
 582:                               $token_finish="",
 583:                               $row_count=self::DEFAULT_ROW_COUNT,
 584:                               $column_slice=null,
 585:                               $column_names=null,
 586:                               $consistency_level=null,
 587:                               $buffer_size=null) {
 588: 
 589:         $cp = $this->create_column_parent();
 590:         $slice = $this->create_slice_predicate($column_names, $column_slice);
 591: 
 592:         return $this->_get_range_by_token($token_start, $token_finish, $row_count,
 593:             $cp, $slice, $consistency_level, $buffer_size);
 594:     }
 595: 
 596:     protected function _get_range_by_token($tokenstart, $tokenfinish, $count, $cp, $slice, $cl, $buffsz) {
 597: 
 598:         if ($buffsz == null)
 599:             $buffsz = $this->buffer_size;
 600:         if ($buffsz < 2) {
 601:             $ire = new InvalidRequestException();
 602:             $ire->message = 'buffer_size cannot be less than 2';
 603:             throw $ire;
 604:         }
 605: 
 606:         return new RangeTokenColumnFamilyIterator($this, $buffsz, $tokenstart, $tokenfinish,
 607:                                              $count, $cp, $slice, $this->rcl($cl));
 608:     }
 609: 
 610: 
 611: 
 612: 
 613: 
 614:     /**
 615:      * Fetch a set of rows from this column family based on an index clause.
 616:      *
 617:      * @param phpcassa\Index\IndexClause $index_clause limits the keys that are returned based
 618:      *        on expressions that compare the value of a column to a given value.  At least
 619:      *        one of the expressions in the IndexClause must be on an indexed column.
 620:      * @param phpcassa\ColumnSlice a slice of columns to fetch, or null
 621:      * @param mixed[] $column_names limit the columns or super columns fetched to this list
 622:      * number of nodes that must respond before the operation returns
 623:      *
 624:      * @return phpcassa\Iterator\IndexedColumnFamilyIterator
 625:      */
 626:     public function get_indexed_slices($index_clause,
 627:         $column_slice=null,
 628:         $column_names=null,
 629:         $consistency_level=null,
 630:         $buffer_size=null) {
 631: 
 632:         if ($buffer_size == null)
 633:             $buffer_size = $this->buffer_size;
 634:         if ($buffer_size < 2) {
 635:             $ire = new InvalidRequestException();
 636:             $ire->message = 'buffer_size cannot be less than 2';
 637:             throw $ire;
 638:         }
 639: 
 640:         $new_clause = new IndexClause();
 641:         foreach($index_clause->expressions as $expr) {
 642:             $new_expr = new IndexExpression();
 643:             $new_expr->column_name = $this->pack_name($expr->column_name);
 644:             $new_expr->value = $this->pack_value($expr->value, $new_expr->column_name);
 645:             $new_expr->op = $expr->op;
 646:             $new_clause->expressions[] = $new_expr;
 647:         }
 648:         $new_clause->start_key = $index_clause->start_key;
 649:         $new_clause->count = $index_clause->count;
 650: 
 651:         $column_parent = $this->create_column_parent();
 652:         $predicate = $this->create_slice_predicate($column_names, $column_slice);
 653: 
 654:         return new IndexedColumnFamilyIterator($this, $new_clause, $buffer_size,
 655:             $column_parent, $predicate,
 656:             $this->rcl($consistency_level));
 657:     }
 658: 
 659:     /**
 660:      * Insert or update columns in a row.
 661:      *
 662:      * @param string $key the row to insert or update the columns in
 663:      * @param mixed[] $columns array(column_name => column_value) the columns to insert or update
 664:      * @param int $timestamp the timestamp to use for this insertion. Leaving this as null will
 665:      *        result in a timestamp being generated for you
 666:      * @param int $ttl time to live for the columns; after ttl seconds they will be deleted
 667:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 668:      *        number of nodes that must respond before the operation returns
 669:      *
 670:      * @return int the timestamp for the operation
 671:      */
 672:     public function insert($key,
 673:         $columns,
 674:         $timestamp=null,
 675:         $ttl=null,
 676:         $consistency_level=null) {
 677: 
 678:         if ($timestamp === null)
 679:             $timestamp = Clock::get_time();
 680: 
 681:         $cfmap = array();
 682:         $packed_key = $this->pack_key($key);
 683:         $cfmap[$packed_key][$this->column_family] =
 684:             $this->make_mutation($columns, $timestamp, $ttl);
 685: 
 686:         return $this->pool->call("batch_mutate", $cfmap, $this->wcl($consistency_level));
 687:     }
 688: 
 689:     /**
 690:      * Insert or update columns in multiple rows. Note that this operation is only atomic
 691:      * per row.
 692:      *
 693:      * @param array $rows an array of keys, each of which maps to an array of columns. This
 694:      *        looks like array(key => array(column_name => column_value))
 695:      * @param int $timestamp the timestamp to use for these insertions. Leaving this as null will
 696:      *        result in a timestamp being generated for you
 697:      * @param int $ttl time to live for the columns; after ttl seconds they will be deleted
 698:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 699:      *        number of nodes that must respond before the operation returns
 700:      *
 701:      * @return int the timestamp for the operation
 702:      */
 703:     public function batch_insert($rows, $timestamp=null, $ttl=null, $consistency_level=null) {
 704:         if ($timestamp === null)
 705:             $timestamp = Clock::get_time();
 706: 
 707:         $cfmap = array();
 708:         if ($this->insert_format == self::DICTIONARY_FORMAT) {
 709:             foreach($rows as $key => $columns) {
 710:                 $packed_key = $this->pack_key($key, $handle_serialize=true);
 711:                 $ttlRow = $this->get_ttl($ttl, $packed_key);
 712:                 $cfmap[$packed_key][$this->column_family] =
 713:                     $this->make_mutation($columns, $timestamp, $ttlRow);
 714:             }
 715:         } else if ($this->insert_format == self::ARRAY_FORMAT) {
 716:             foreach($rows as $row) {
 717:                 list($key, $columns) = $row;
 718:                 $packed_key = $this->pack_key($key);
 719:                 $ttlRow = $this->get_ttl($ttl, $packed_key);
 720:                 $cfmap[$packed_key][$this->column_family] =
 721:                     $this->make_mutation($columns, $timestamp, $ttlRow);
 722:             }
 723:         } else {
 724:             throw new UnexpectedValueException("Bad insert_format selected");
 725:         }
 726: 
 727:         return $this->pool->call("batch_mutate", $cfmap, $this->wcl($consistency_level));
 728:     }
 729: 
 730:     /**
 731:      * Create a new phpcassa\Batch\CfMutator instance targetting this column
 732:      * family.
 733:      *
 734:      * @param phpcassa\ConsistencyLevel $consistency_level the consistency
 735:      *        level the batch mutator will write at; if left as NULL, this
 736:      *        defaults to phpcassa\ColumnFamily::write_consistency_level.
 737:      * @return a phpcassa\Batch\CfMutator instance
 738:      */
 739:     public function batch($consistency_level=null) {
 740:         return new CfMutator($this, $consistency_level);
 741:     }
 742: 
 743:     /**
 744:      * Delete a row or a set of columns or supercolumns from a row.
 745:      *
 746:      * @param string $key the row to remove columns from
 747:      * @param mixed[] $column_names the columns or supercolumns to remove.
 748:      *                If null, the entire row will be removed.
 749:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 750:      *        number of nodes that must respond before the operation returns
 751:      *
 752:      * @return int the timestamp for the operation
 753:      */
 754:     public function remove($key, $column_names=null, $consistency_level=null) {
 755: 
 756:         if ($column_names === null || count($column_names) == 1)
 757:         {
 758:             $cp = new ColumnPath();
 759:             $cp->column_family = $this->column_family;
 760: 
 761:             if ($column_names !== null) {
 762:                 if ($this->is_super)
 763:                     $cp->super_column = $this->pack_name($column_names[0], true);
 764:                 else
 765:                     $cp->column = $this->pack_name($column_names[0], false);
 766:             }
 767:             return $this->_remove_single($key, $cp, $consistency_level);
 768:         } else {
 769:             $deletion = new Deletion();
 770:             if ($column_names !== null)
 771:                 $deletion->predicate = $this->create_slice_predicate($column_names, null);
 772: 
 773:             return $this->_remove_multi($key, $deletion, $consistency_level);
 774:         }
 775:     }
 776: 
 777:     protected function _remove_single($key, $cp, $cl) {
 778:         $timestamp = Clock::get_time();
 779:         $packed_key = $this->pack_key($key);
 780:         return $this->pool->call("remove", $packed_key, $cp, $timestamp,
 781:             $this->wcl($cl));
 782:     }
 783: 
 784:     protected function _remove_multi($key, $deletion, $cl) {
 785:         $timestamp = Clock::get_time();
 786:         $deletion->timestamp = $timestamp;
 787:         $mutation = new Mutation();
 788:         $mutation->deletion = $deletion;
 789: 
 790:         $packed_key = $this->pack_key($key);
 791:         $mut_map = array($packed_key => array($this->column_family => array($mutation)));
 792: 
 793:         return $this->pool->call("batch_mutate", $mut_map, $this->wcl($cl));
 794:     }
 795: 
 796:     /**
 797:      * Remove a counter at the specified location.
 798:      *
 799:      * Note that counters have limited support for deletes: if you remove a
 800:      * counter, you must wait to issue any following update until the delete
 801:      * has reached all the nodes and all of them have been fully compacted.
 802:      *
 803:      * Available in Cassandra 0.8.0 and later.
 804:      *
 805:      * @param string $key the key for the row
 806:      * @param mixed $column the column name of the counter
 807:      * @param ConsistencyLevel $consistency_level affects the guaranteed
 808:      *        number of nodes that must respond before the operation returns
 809:      */
 810:     public function remove_counter($key, $column, $consistency_level=null) {
 811:         $cp = new ColumnPath();
 812:         $packed_key = $this->pack_key($key);
 813:         $cp->column_family = $this->column_family;
 814:         $cp->column = $this->pack_name($column);
 815:         $this->pool->call("remove_counter", $packed_key, $cp,
 816:             $this->wcl($consistency_level));
 817:     }
 818: 
 819:     /**
 820:      * Mark the entire column family as deleted.
 821:      *
 822:      * From the user's perspective a successful call to truncate will result
 823:      * complete data deletion from cfname. Internally, however, disk space
 824:      * will not be immediatily released, as with all deletes in cassandra,
 825:      * this one only marks the data as deleted.
 826:      *
 827:      * The operation succeeds only if all hosts in the cluster at available
 828:      * and will throw an UnavailableException if some hosts are down.
 829:      */
 830:     public function truncate() {
 831:         return $this->pool->call("truncate", $this->column_family);
 832:     }
 833: 
 834: 
 835:     /********************* Helper functions *************************/
 836: 
 837:     protected function rcl($read_consistency_level) {
 838:         if ($read_consistency_level === null)
 839:             return $this->read_consistency_level;
 840:         else
 841:             return $read_consistency_level;
 842:     }
 843: 
 844:     protected function wcl($write_consistency_level) {
 845:         if ($write_consistency_level === null)
 846:             return $this->write_consistency_level;
 847:         else
 848:             return $write_consistency_level;
 849:     }
 850: 
 851:     protected function create_slice_predicate(
 852:         $column_names,
 853:         $column_slice,
 854:         $is_super=NULL,
 855:         $default_count=ColumnSlice::DEFAULT_COLUMN_COUNT)
 856:     {
 857:         if ($is_super === null)
 858:             $is_super = $this->is_super;
 859: 
 860:         $predicate = new SlicePredicate();
 861:         if ($column_names !== null) {
 862:             $packed_cols = array();
 863:             foreach($column_names as $col)
 864:                 $packed_cols[] = $this->pack_name($col, $is_super);
 865:             $predicate->column_names = $packed_cols;
 866:         } else {
 867:             if ($column_slice !== null) {
 868:                 $slice_range = new SliceRange();
 869: 
 870:                 $column_start = $column_slice->start;
 871:                 if ($column_start !== null and $column_start != '') {
 872:                     if ($column_slice->reversed)
 873:                         $slice_end = self::SLICE_FINISH;
 874:                     else
 875:                         $slice_end = self::SLICE_START;
 876: 
 877:                     $slice_range->start = $this->pack_name(
 878:                         $column_start, $is_super, $slice_end);
 879:                 } else {
 880:                     $slice_range->start = '';
 881:                 }
 882: 
 883:                 $column_finish = $column_slice->finish;
 884:                 if ($column_finish !== null and $column_finish != '') {
 885:                     if ($column_slice->reversed)
 886:                         $slice_end = self::SLICE_START;
 887:                     else
 888:                         $slice_end = self::SLICE_FINISH;
 889: 
 890:                     $slice_range->finish = $this->pack_name(
 891:                         $column_finish, $is_super, $slice_end);
 892:                 } else {
 893:                     $slice_range->finish = '';
 894:                 }
 895: 
 896:                 $slice_range->reversed = $column_slice->reversed;
 897:                 $slice_range->count = $column_slice->count;
 898:             } else {
 899:                 $slice_range = new ColumnSlice("", "", $default_count);
 900:             }
 901:             $predicate->slice_range = $slice_range;
 902:         }
 903:         return $predicate;
 904:     }
 905: 
 906:     protected function create_column_parent($super_column=null) {
 907:         $column_parent = new ColumnParent();
 908:         $column_parent->column_family = $this->column_family;
 909:         if ($super_column !== null) {
 910:             $column_parent->super_column = $this->pack_name($super_column, true);
 911:         } else {
 912:             $column_parent->super_column = null;
 913:         }
 914:         return $column_parent;
 915:     }
 916: 
 917:     /** @internal */
 918:     const NON_SLICE = 0;
 919:     /** @internal */
 920:     const SLICE_START = 1;
 921:     /** @internal */
 922:     const SLICE_FINISH = 2;
 923: 
 924:     /** @internal */
 925:     public function pack_name($value,
 926:         $is_supercol_name=false,
 927:         $slice_end=self::NON_SLICE,
 928:         $handle_serialize=false) {
 929:         if (!$this->autopack_names)
 930:             return $value;
 931:         if ($slice_end === self::NON_SLICE && ($value === null || $value === "")) {
 932:             throw new \UnexpectedValueException("Column names may not be null");
 933:         }
 934:         if ($is_supercol_name)
 935:             return $this->supercol_name_type->pack($value, true, $slice_end, $handle_serialize);
 936:         else
 937:             return $this->col_name_type->pack($value, true, $slice_end, $handle_serialize);
 938:     }
 939: 
 940:     protected function unpack_name($b, $is_supercol_name=false, $handle_serialize=true) {
 941:         if (!$this->autopack_names || $b === null)
 942:             return $b;
 943: 
 944:         if ($is_supercol_name)
 945:             return $this->supercol_name_type->unpack($b, $handle_serialize);
 946:         else
 947:             return $this->col_name_type->unpack($b, $handle_serialize);
 948:     }
 949: 
 950:     /** @internal */
 951:     public function pack_key($key, $handle_serialize=false) {
 952:         if (!$this->autopack_keys || $key === "")
 953:             return $key;
 954:         return $this->key_type->pack($key, true, null, $handle_serialize);
 955:     }
 956: 
 957:     /** @internal */
 958:     public function unpack_key($b, $handle_serialize=true) {
 959:         if (!$this->autopack_keys)
 960:             return $b;
 961:         return $this->key_type->unpack($b, $handle_serialize);
 962:     }
 963: 
 964:     protected function get_data_type_for_col($col_name) {
 965:         if (isset($this->col_type_dict[$col_name]))
 966:             return $this->col_type_dict[$col_name];
 967:         else
 968:             return $this->cf_data_type;
 969:     }
 970: 
 971:     protected function pack_value($value, $col_name) {
 972:         if (!$this->autopack_values)
 973:             return $value;
 974: 
 975:         if (isset($this->col_type_dict[$col_name])) {
 976:             $dtype = $this->col_type_dict[$col_name];
 977:             return $dtype->pack($value, false);
 978:         } else {
 979:             return $this->cf_data_type->pack($value, false);
 980:         }
 981:     }
 982: 
 983:     protected function unpack_value($value, $col_name) {
 984:         if (!$this->autopack_values)
 985:             return $value;
 986: 
 987:         if (isset($this->col_type_dict[$col_name])) {
 988:             $dtype = $this->col_type_dict[$col_name];
 989:             return $dtype->unpack($value, false);
 990:         } else {
 991:             return $this->cf_data_type->unpack($value, false);
 992:         }
 993:     }
 994: 
 995:     /** @internal */
 996:     public function keyslices_to_array($keyslices) {
 997:         $ret = array();
 998:         if ($this->return_format == self::DICTIONARY_FORMAT) {
 999:             foreach($keyslices as $keyslice) {
1000:                 $key = $this->unpack_key($keyslice->key);
1001:                 $columns = $keyslice->columns;
1002:                 $ret[$key] = $this->unpack_coscs($columns);
1003:             }
1004:         } else {
1005:             foreach($keyslices as $keyslice) {
1006:                 $key = $this->unpack_key($keyslice->key, false);
1007:                 $columns = $keyslice->columns;
1008:                 $ret[] = array($key, $this->unpack_coscs($columns));
1009:             }
1010:         }
1011:         return $ret;
1012:     }
1013: 
1014:     protected function unpack_coscs($array_of_coscs) {
1015:         if(count($array_of_coscs) == 0)
1016:             return $array_of_coscs;
1017: 
1018:         $format = $this->return_format;
1019:         if ($format == self::DICTIONARY_FORMAT) {
1020:             return $this->coscs_to_dict($array_of_coscs);
1021:         } else if ($format == self::ARRAY_FORMAT) {
1022:             return $this->coscs_to_array($array_of_coscs);
1023:         } else { // self::OBJECT_FORMAT
1024:             return $this->unpack_coscs_attrs($array_of_coscs);
1025:         }
1026:     }
1027: 
1028:     protected function coscs_to_dict($array_of_coscs) {
1029:         $ret = array();
1030:         $first = $array_of_coscs[0];
1031:         if($first->column) { // normal columns
1032:             foreach($array_of_coscs as $cosc) {
1033:                 $name = $this->unpack_name($cosc->column->name, false);
1034:                 $value = $this->unpack_value($cosc->column->value, $cosc->column->name);
1035:                 $ret[$name] = $value;
1036:             }
1037:         } else if ($first->counter_column) {
1038:             foreach($array_of_coscs as $cosc) {
1039:                 $name = $this->unpack_name($cosc->counter_column->name, false);
1040:                 $ret[$name] = $cosc->counter_column->value;
1041:             }
1042:         }
1043:         return $ret;
1044:     }
1045: 
1046:     protected function coscs_to_array($array_of_coscs) {
1047:         $ret = array();
1048:         $first = $array_of_coscs[0];
1049:         if($first->column) { // normal columns
1050:             foreach($array_of_coscs as $cosc) {
1051:                 $name = $this->unpack_name(
1052:                     $cosc->column->name, false, $handle_serialize=false);
1053:                 $value = $this->unpack_value($cosc->column->value, $cosc->column->name);
1054:                 $ret[] = array($name, $value);
1055:             }
1056:         } else if ($first->counter_column) {
1057:             foreach($array_of_coscs as $cosc) {
1058:                 $name = $this->unpack_name(
1059:                     $cosc->counter_column->name, false, $handle_serialize=false);
1060:                 $ret[] = array($name, $cosc->counter_column->value);
1061:             }
1062:         }
1063:         return $ret;
1064:     }
1065: 
1066:     protected function unpack_coscs_attrs($array_of_coscs) {
1067:         $ret = array();
1068:         $first = $array_of_coscs[0];
1069:         if($first->column) { // normal columns
1070:             foreach($array_of_coscs as $cosc) {
1071:                 $col = $cosc->column;
1072:                 $col->value = $this->unpack_value($col->value, $col->name);
1073:                 $col->name = $this->unpack_name(
1074:                     $col->name, false, $handle_serialize=false);
1075:                 $ret[] = $col;
1076:             }
1077:         } else { // counter columns
1078:             foreach($array_of_coscs as $cosc) {
1079:                 $col = $cosc->counter_column;
1080:                 $col->name = $this->unpack_name(
1081:                     $col->name, false, $handle_serialize=false);
1082:                 $ret[] = $col;
1083:             }
1084:         }
1085:         return $ret;
1086:     }
1087: 
1088:     /** @internal */
1089:     public function make_mutation($array, $timestamp=null, $ttl=null) {
1090:         $coscs = $this->pack_data($array, $timestamp, $ttl);
1091:         $ret = array();
1092:         foreach($coscs as $cosc) {
1093:             $mutation = new Mutation();
1094:             $mutation->column_or_supercolumn = $cosc;
1095:             $ret[] = $mutation;
1096:         }
1097:         return $ret;
1098:     }
1099: 
1100:     protected function pack_data($data, $timestamp=null, $ttl=null) {
1101:         if($timestamp === null)
1102:             $timestamp = Clock::get_time();
1103: 
1104:         if ($this->insert_format == self::DICTIONARY_FORMAT) {
1105:             return $this->dict_to_coscs($data, $timestamp, $ttl);
1106:         } else if ($this->insert_format == self::ARRAY_FORMAT) {
1107:             return $this->array_to_coscs($data, $timestamp, $ttl);
1108:         } else {
1109:             throw new UnexpectedValueException("Bad insert_format selected");
1110:         }
1111:     }
1112: 
1113:     protected function dict_to_coscs($data, $timestamp, $ttl) {
1114:         $have_counters = $this->has_counters;
1115:         $ret = array();
1116:         foreach ($data as $name => $value) {
1117:             $c_or_sc = new ColumnOrSuperColumn();
1118:             if($have_counters) {
1119:                 $sub = new CounterColumn();
1120:                 $c_or_sc->counter_column = $sub;
1121:             } else {
1122:                 $sub = new Column();
1123:                 $c_or_sc->column = $sub;
1124:                 $sub->timestamp = $timestamp;
1125:                 $sub->ttl = $this->get_ttl($ttl, $name);
1126:             }
1127:             $sub->name = $this->pack_name(
1128:                 $name, false, self::NON_SLICE, true);
1129:             $sub->value = $this->pack_value($value, $sub->name);
1130:             $ret[] = $c_or_sc;
1131:         }
1132:         return $ret;
1133:     }
1134: 
1135:     protected function array_to_coscs($data, $timestamp, $ttl) {
1136:         $have_counters = $this->has_counters;
1137:         $ret = array();
1138:         foreach ($data as $col) {
1139:             list($name, $value) = $col;
1140:             $c_or_sc = new ColumnOrSuperColumn();
1141:             if($have_counters) {
1142:                 $sub = new CounterColumn();
1143:                 $c_or_sc->counter_column = $sub;
1144:             } else {
1145:                 $sub = new Column();
1146:                 $c_or_sc->column = $sub;
1147:                 $sub->timestamp = $timestamp;
1148:                 $sub->ttl = $this->get_ttl($ttl, $name);
1149:             }
1150:             $sub->name = $this->pack_name(
1151:                 $name, false, self::NON_SLICE, false);
1152:             $sub->value = $this->pack_value($value, $sub->name);
1153:             $ret[] = $c_or_sc;
1154:         }
1155:         return $ret;
1156:     }
1157: 
1158:  /**
1159:   * 
1160:   * Return a ttl from an array of TTL or uniq int ttl and a key in this array 
1161:   * @param int/mixed[] $ttl time to live for the columns or rows; after ttl seconds they will be deleted
1162:   * @param string $packed_key the row or column to get the ttl from
1163:   * @return int or NULL in case of no ttl was found
1164:   */
1165:     protected function get_ttl($ttl, $packed_key) {
1166:         if(is_array($ttl)){
1167:             if(isset($ttl[$packed_key]))
1168:                 return $ttl[$packed_key];
1169:             return null;
1170:         } else {
1171:             return $ttl;
1172:         }
1173:     }
1174: }
1175: 
phpcassa API documentation generated by ApiGen 2.8.0