1: <?php
2: namespace phpcassa;
3:
4: use phpcassa\ColumnSlice;
5: use phpcassa\Schema\DataType;
6: use phpcassa\Schema\DataType\BytesType;
7: use phpcassa\Schema\DataType\CompositeType;
8: use phpcassa\Schema\DataType\Serialized;
9:
10: use phpcassa\Iterator\IndexedColumnFamilyIterator;
11: use phpcassa\Iterator\RangeColumnFamilyIterator;
12: use phpcassa\Iterator\RangeTokenColumnFamilyIterator;
13:
14: use phpcassa\Batch\CfMutator;
15:
16: use phpcassa\Util\Clock;
17:
18: use cassandra\InvalidRequestException;
19: use cassandra\NotFoundException;
20:
21: use cassandra\Mutation;
22: use cassandra\Deletion;
23: use cassandra\ConsistencyLevel;
24: use cassandra\Column;
25: use cassandra\ColumnParent;
26: use cassandra\ColumnPath;
27: use cassandra\ColumnOrSuperColumn;
28: use cassandra\CounterColumn;
29: use cassandra\IndexClause;
30: use cassandra\IndexExpression;
31: use cassandra\SlicePredicate;
32: use cassandra\SliceRange;
33:
34: 35: 36: 37: 38:
39: abstract class AbstractColumnFamily {
40:
41:
42: const DEFAULT_ROW_COUNT = 100;
43:
44: const DEFAULT_BUFFER_SIZE = 100;
45:
46: 47: 48: 49: 50: 51: 52: 53: 54: 55: 56: 57:
58: const DICTIONARY_FORMAT = 1;
59:
60: 61: 62: 63: 64: 65: 66: 67: 68: 69: 70: 71:
72: const ARRAY_FORMAT = 2;
73:
74: 75: 76: 77: 78: 79: 80: 81: 82: 83: 84: 85: 86:
87: const OBJECT_FORMAT = 3;
88:
89:
90: public $column_family;
91:
92:
93: public $is_super;
94:
95:
96: protected $cf_data_type;
97:
98:
99: protected $col_name_type;
100:
101:
102: protected $supercol_name_type;
103:
104:
105: protected $col_type_dict;
106:
107:
108: 109: 110: 111:
112: public $autopack_names;
113:
114: 115: 116: 117:
118: public $autopack_values;
119:
120: 121: 122: 123:
124: public $autopack_keys;
125:
126:
127: public $read_consistency_level = ConsistencyLevel::ONE;
128:
129: public $write_consistency_level = ConsistencyLevel::ONE;
130:
131: 132: 133: 134: 135: 136:
137: public $return_format = self::DICTIONARY_FORMAT;
138:
139: 140: 141: 142: 143: 144:
145: public $insert_format = self::DICTIONARY_FORMAT;
146:
147: 148: 149: 150: 151: 152:
153: public $buffer_size = 100;
154:
155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175:
176: public function __construct($pool,
177: $column_family,
178: $autopack_names=true,
179: $autopack_values=true,
180: $read_consistency_level=ConsistencyLevel::ONE,
181: $write_consistency_level=ConsistencyLevel::ONE,
182: $buffer_size=self::DEFAULT_BUFFER_SIZE) {
183:
184: $this->pool = $pool;
185: $this->column_family = $column_family;
186: $this->read_consistency_level = $read_consistency_level;
187: $this->write_consistency_level = $write_consistency_level;
188: $this->buffer_size = $buffer_size;
189:
190: $ks = $this->pool->describe_keyspace();
191:
192: $cf_def = null;
193: foreach($ks->cf_defs as $cfdef) {
194: if ($cfdef->name == $this->column_family) {
195: $cf_def = $cfdef;
196: break;
197: }
198: }
199: if ($cf_def == null)
200: throw new NotFoundException();
201: else
202: $this->cfdef = $cf_def;
203:
204: $this->cf_data_type = new BytesType();
205: $this->col_name_type = new BytesType();
206: $this->supercol_name_type = new BytesType();
207: $this->key_type = new BytesType();
208: $this->col_type_dict = array();
209:
210: $this->is_super = $this->cfdef->column_type === 'Super';
211: $this->has_counters = self::endswith(
212: $this->cfdef->default_validation_class,
213: "CounterColumnType");
214:
215: $this->set_autopack_names($autopack_names);
216: $this->set_autopack_values($autopack_values);
217: $this->set_autopack_keys(true);
218: }
219:
220: protected static function endswith($str, $suffix) {
221: $suffix_len = strlen($suffix);
222: return substr_compare($str, $suffix, strlen($str)-$suffix_len, $suffix_len) === 0;
223: }
224:
225: 226: 227:
228: public function set_autopack_names($pack_names) {
229: if ($pack_names) {
230: if ($this->autopack_names)
231: return;
232: $this->autopack_names = true;
233: if (!$this->is_super) {
234: $this->col_name_type = DataType::get_type_for($this->cfdef->comparator_type);
235: } else {
236: $this->col_name_type = DataType::get_type_for($this->cfdef->subcomparator_type);
237: $this->supercol_name_type = DataType::get_type_for($this->cfdef->comparator_type);
238: }
239: } else {
240: $this->autopack_names = false;
241: }
242: }
243:
244: 245: 246:
247: public function set_autopack_values($pack_values) {
248: if ($pack_values) {
249: $this->autopack_values = true;
250: $this->cf_data_type = DataType::get_type_for($this->cfdef->default_validation_class);
251: foreach($this->cfdef->column_metadata as $coldef) {
252: $this->col_type_dict[$coldef->name] =
253: DataType::get_type_for($coldef->validation_class);
254: }
255: } else {
256: $this->autopack_values = false;
257: }
258: }
259:
260: 261: 262: 263: 264:
265: public function set_autopack_keys($pack_keys) {
266: if ($pack_keys) {
267: $this->autopack_keys = true;
268: if (property_exists('\cassandra\CfDef', "key_validation_class")) {
269: $this->key_type = DataType::get_type_for($this->cfdef->key_validation_class);
270: } else {
271: $this->key_type = new BytesType();
272: }
273: } else {
274: $this->autopack_keys = false;
275: }
276: }
277:
278: 279: 280: 281: 282: 283: 284: 285: 286: 287: 288:
289: public function get($key,
290: $column_slice=null,
291: $column_names=null,
292: $consistency_level=null) {
293:
294: $column_parent = $this->create_column_parent();
295: $predicate = $this->create_slice_predicate($column_names, $column_slice);
296: return $this->_get($key, $column_parent, $predicate, $consistency_level);
297: }
298:
299: protected function _get($key, $cp, $slice, $cl) {
300: $resp = $this->pool->call("get_slice",
301: $this->pack_key($key),
302: $cp, $slice, $this->rcl($cl));
303:
304: if (count($resp) == 0)
305: throw new NotFoundException();
306:
307: return $this->unpack_coscs($resp);
308: }
309:
310: 311: 312: 313: 314: 315: 316: 317: 318: 319: 320: 321: 322: 323:
324: public function multiget($keys,
325: $column_slice=null,
326: $column_names=null,
327: $consistency_level=null,
328: $buffer_size=16) {
329:
330: $cp = $this->create_column_parent();
331: $slice = $this->create_slice_predicate($column_names, $column_slice);
332:
333: return $this->_multiget($keys, $cp, $slice, $consistency_level, $buffer_size);
334: }
335:
336: protected function _multiget($keys, $cp, $slice, $cl, $buffsz) {
337: $ret = array();
338:
339: $have_dict = ($this->return_format == self::DICTIONARY_FORMAT);
340: $should_serialize = ($this->key_type instanceof Serialized);
341: if ($have_dict) {
342: if ($should_serialize) {
343: foreach($keys as $key) {
344: $ret[serialize($key)] = null;
345: }
346: } else {
347: foreach($keys as $key) {
348: $ret[$key] = null;
349: }
350: }
351: }
352:
353: $cl = $this->rcl($cl);
354:
355: $resp = array();
356: if(count($keys) <= $buffsz) {
357: $resp = $this->pool->call("multiget_slice",
358: array_map(array($this, "pack_key"), $keys),
359: $cp, $slice, $cl);
360: } else {
361: $subset_keys = array();
362: $i = 0;
363: foreach($keys as $key) {
364: $i += 1;
365: $subset_keys[] = $key;
366: if ($i == $buffsz) {
367: $sub_resp = $this->pool->call("multiget_slice",
368: array_map(array($this, "pack_key"), $subset_keys),
369: $cp, $slice, $cl);
370: $subset_keys = array();
371: $i = 0;
372: $resp = $resp + $sub_resp;
373: }
374: }
375: if (count($subset_keys) != 0) {
376: $sub_resp = $this->pool->call("multiget_slice",
377: array_map(array($this, "pack_key"), $subset_keys),
378: $cp, $slice, $cl);
379: $resp = $resp + $sub_resp;
380: }
381: }
382:
383: $non_empty_keys = array();
384: foreach($resp as $key => $val) {
385: if (count($val) > 0) {
386: $unpacked_key = $this->unpack_key($key, $have_dict);
387:
388: if ($have_dict) {
389: $non_empty_keys[$unpacked_key] = 1;
390: $ret[$unpacked_key] = $this->unpack_coscs($val);
391: } else {
392: $ret[] = array($unpacked_key, $this->unpack_coscs($val));
393: }
394: }
395: }
396:
397: if ($have_dict) {
398: if ($should_serialize) {
399: foreach($keys as $key) {
400: $skey = serialize($key);
401: if (!isset($non_empty_keys[$skey]))
402: unset($ret[$skey]);
403: }
404: } else {
405: foreach($keys as $key) {
406: if (!isset($non_empty_keys[$key]))
407: unset($ret[$key]);
408: }
409: }
410: }
411:
412: return $ret;
413: }
414:
415: 416: 417: 418: 419: 420: 421: 422: 423: 424: 425:
426: public function get_count($key,
427: $column_slice=null,
428: $column_names=null,
429: $consistency_level=null) {
430:
431: $cp = $this->create_column_parent();
432: $slice = $this->create_slice_predicate(
433: $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);
434: return $this->_get_count($key, $cp, $slice, $consistency_level);
435: }
436:
437: protected function _get_count($key, $cp, $slice, $cl) {
438: $packed_key = $this->pack_key($key);
439: return $this->pool->call("get_count", $packed_key, $cp, $slice, $this->rcl($cl));
440: }
441:
442: 443: 444: 445: 446: 447: 448: 449: 450: 451: 452:
453: public function multiget_count($keys,
454: $column_slice=null,
455: $column_names=null,
456: $consistency_level=null) {
457:
458: $cp = $this->create_column_parent();
459: $slice = $this->create_slice_predicate(
460: $column_names, $column_slice, null, ColumnSlice::MAX_COUNT);
461:
462: return $this->_multiget_count($keys, $cp, $slice, $consistency_level);
463: }
464:
465: protected function _multiget_count($keys, $cp, $slice, $cl) {
466:
467: $ret = array();
468: $have_dict = ($this->return_format == self::DICTIONARY_FORMAT);
469: if ($have_dict) {
470: foreach($keys as $key) {
471: $ret[$key] = null;
472: }
473: }
474:
475: $packed_keys = array_map(array($this, "pack_key"), $keys);
476: $results = $this->pool->call("multiget_count", $packed_keys, $cp, $slice,
477: $this->rcl($cl));
478:
479: $non_empty_keys = array();
480: foreach ($results as $key => $count) {
481: $unpacked_key = $this->unpack_key($key, $have_dict);
482:
483: if ($have_dict) {
484: $non_empty_keys[$unpacked_key] = 1;
485: $ret[$unpacked_key] = $count;
486: } else {
487: $ret[] = array($unpacked_key, $count);
488: }
489: }
490:
491: if ($have_dict) {
492: foreach($keys as $key) {
493: if (!isset($non_empty_keys[$key]))
494: unset($ret[$key]);
495: }
496: }
497:
498: return $ret;
499: }
500:
501: 502: 503: 504: 505: 506: 507: 508: 509: 510: 511: 512: 513: 514: 515: 516: 517:
518: public function get_range($key_start="",
519: $key_finish="",
520: $row_count=self::DEFAULT_ROW_COUNT,
521: $column_slice=null,
522: $column_names=null,
523: $consistency_level=null,
524: $buffer_size=null) {
525:
526: $cp = $this->create_column_parent();
527: $slice = $this->create_slice_predicate($column_names, $column_slice);
528:
529: return $this->_get_range($key_start, $key_finish, $row_count,
530: $cp, $slice, $consistency_level, $buffer_size);
531: }
532:
533: protected function _get_range($start, $finish, $count, $cp, $slice, $cl, $buffsz) {
534:
535: if ($buffsz == null)
536: $buffsz = $this->buffer_size;
537: if ($buffsz < 2) {
538: $ire = new InvalidRequestException();
539: $ire->message = 'buffer_size cannot be less than 2';
540: throw $ire;
541: }
542:
543: return new RangeColumnFamilyIterator($this, $buffsz, $start, $finish,
544: $count, $cp, $slice, $this->rcl($cl));
545: }
546:
547:
548:
549:
550:
551: 552: 553: 554: 555: 556: 557: 558: 559: 560: 561: 562: 563: 564: 565: 566: 567: 568: 569: 570: 571: 572: 573: 574: 575: 576: 577: 578: 579: 580:
581: public function get_range_by_token($token_start="",
582: $token_finish="",
583: $row_count=self::DEFAULT_ROW_COUNT,
584: $column_slice=null,
585: $column_names=null,
586: $consistency_level=null,
587: $buffer_size=null) {
588:
589: $cp = $this->create_column_parent();
590: $slice = $this->create_slice_predicate($column_names, $column_slice);
591:
592: return $this->_get_range_by_token($token_start, $token_finish, $row_count,
593: $cp, $slice, $consistency_level, $buffer_size);
594: }
595:
596: protected function _get_range_by_token($tokenstart, $tokenfinish, $count, $cp, $slice, $cl, $buffsz) {
597:
598: if ($buffsz == null)
599: $buffsz = $this->buffer_size;
600: if ($buffsz < 2) {
601: $ire = new InvalidRequestException();
602: $ire->message = 'buffer_size cannot be less than 2';
603: throw $ire;
604: }
605:
606: return new RangeTokenColumnFamilyIterator($this, $buffsz, $tokenstart, $tokenfinish,
607: $count, $cp, $slice, $this->rcl($cl));
608: }
609:
610:
611:
612:
613:
614: 615: 616: 617: 618: 619: 620: 621: 622: 623: 624: 625:
626: public function get_indexed_slices($index_clause,
627: $column_slice=null,
628: $column_names=null,
629: $consistency_level=null,
630: $buffer_size=null) {
631:
632: if ($buffer_size == null)
633: $buffer_size = $this->buffer_size;
634: if ($buffer_size < 2) {
635: $ire = new InvalidRequestException();
636: $ire->message = 'buffer_size cannot be less than 2';
637: throw $ire;
638: }
639:
640: $new_clause = new IndexClause();
641: foreach($index_clause->expressions as $expr) {
642: $new_expr = new IndexExpression();
643: $new_expr->column_name = $this->pack_name($expr->column_name);
644: $new_expr->value = $this->pack_value($expr->value, $new_expr->column_name);
645: $new_expr->op = $expr->op;
646: $new_clause->expressions[] = $new_expr;
647: }
648: $new_clause->start_key = $index_clause->start_key;
649: $new_clause->count = $index_clause->count;
650:
651: $column_parent = $this->create_column_parent();
652: $predicate = $this->create_slice_predicate($column_names, $column_slice);
653:
654: return new IndexedColumnFamilyIterator($this, $new_clause, $buffer_size,
655: $column_parent, $predicate,
656: $this->rcl($consistency_level));
657: }
658:
659: 660: 661: 662: 663: 664: 665: 666: 667: 668: 669: 670: 671:
672: public function insert($key,
673: $columns,
674: $timestamp=null,
675: $ttl=null,
676: $consistency_level=null) {
677:
678: if ($timestamp === null)
679: $timestamp = Clock::get_time();
680:
681: $cfmap = array();
682: $packed_key = $this->pack_key($key);
683: $cfmap[$packed_key][$this->column_family] =
684: $this->make_mutation($columns, $timestamp, $ttl);
685:
686: return $this->pool->call("batch_mutate", $cfmap, $this->wcl($consistency_level));
687: }
688:
689: 690: 691: 692: 693: 694: 695: 696: 697: 698: 699: 700: 701: 702:
703: public function batch_insert($rows, $timestamp=null, $ttl=null, $consistency_level=null) {
704: if ($timestamp === null)
705: $timestamp = Clock::get_time();
706:
707: $cfmap = array();
708: if ($this->insert_format == self::DICTIONARY_FORMAT) {
709: foreach($rows as $key => $columns) {
710: $packed_key = $this->pack_key($key, $handle_serialize=true);
711: $ttlRow = $this->get_ttl($ttl, $packed_key);
712: $cfmap[$packed_key][$this->column_family] =
713: $this->make_mutation($columns, $timestamp, $ttlRow);
714: }
715: } else if ($this->insert_format == self::ARRAY_FORMAT) {
716: foreach($rows as $row) {
717: list($key, $columns) = $row;
718: $packed_key = $this->pack_key($key);
719: $ttlRow = $this->get_ttl($ttl, $packed_key);
720: $cfmap[$packed_key][$this->column_family] =
721: $this->make_mutation($columns, $timestamp, $ttlRow);
722: }
723: } else {
724: throw new UnexpectedValueException("Bad insert_format selected");
725: }
726:
727: return $this->pool->call("batch_mutate", $cfmap, $this->wcl($consistency_level));
728: }
729:
730: 731: 732: 733: 734: 735: 736: 737: 738:
739: public function batch($consistency_level=null) {
740: return new CfMutator($this, $consistency_level);
741: }
742:
743: 744: 745: 746: 747: 748: 749: 750: 751: 752: 753:
754: public function remove($key, $column_names=null, $consistency_level=null) {
755:
756: if ($column_names === null || count($column_names) == 1)
757: {
758: $cp = new ColumnPath();
759: $cp->column_family = $this->column_family;
760:
761: if ($column_names !== null) {
762: if ($this->is_super)
763: $cp->super_column = $this->pack_name($column_names[0], true);
764: else
765: $cp->column = $this->pack_name($column_names[0], false);
766: }
767: return $this->_remove_single($key, $cp, $consistency_level);
768: } else {
769: $deletion = new Deletion();
770: if ($column_names !== null)
771: $deletion->predicate = $this->create_slice_predicate($column_names, null);
772:
773: return $this->_remove_multi($key, $deletion, $consistency_level);
774: }
775: }
776:
777: protected function _remove_single($key, $cp, $cl) {
778: $timestamp = Clock::get_time();
779: $packed_key = $this->pack_key($key);
780: return $this->pool->call("remove", $packed_key, $cp, $timestamp,
781: $this->wcl($cl));
782: }
783:
784: protected function _remove_multi($key, $deletion, $cl) {
785: $timestamp = Clock::get_time();
786: $deletion->timestamp = $timestamp;
787: $mutation = new Mutation();
788: $mutation->deletion = $deletion;
789:
790: $packed_key = $this->pack_key($key);
791: $mut_map = array($packed_key => array($this->column_family => array($mutation)));
792:
793: return $this->pool->call("batch_mutate", $mut_map, $this->wcl($cl));
794: }
795:
796: 797: 798: 799: 800: 801: 802: 803: 804: 805: 806: 807: 808: 809:
810: public function remove_counter($key, $column, $consistency_level=null) {
811: $cp = new ColumnPath();
812: $packed_key = $this->pack_key($key);
813: $cp->column_family = $this->column_family;
814: $cp->column = $this->pack_name($column);
815: $this->pool->call("remove_counter", $packed_key, $cp,
816: $this->wcl($consistency_level));
817: }
818:
819: 820: 821: 822: 823: 824: 825: 826: 827: 828: 829:
830: public function truncate() {
831: return $this->pool->call("truncate", $this->column_family);
832: }
833:
834:
835:
836:
837: protected function rcl($read_consistency_level) {
838: if ($read_consistency_level === null)
839: return $this->read_consistency_level;
840: else
841: return $read_consistency_level;
842: }
843:
844: protected function wcl($write_consistency_level) {
845: if ($write_consistency_level === null)
846: return $this->write_consistency_level;
847: else
848: return $write_consistency_level;
849: }
850:
851: protected function create_slice_predicate(
852: $column_names,
853: $column_slice,
854: $is_super=NULL,
855: $default_count=ColumnSlice::DEFAULT_COLUMN_COUNT)
856: {
857: if ($is_super === null)
858: $is_super = $this->is_super;
859:
860: $predicate = new SlicePredicate();
861: if ($column_names !== null) {
862: $packed_cols = array();
863: foreach($column_names as $col)
864: $packed_cols[] = $this->pack_name($col, $is_super);
865: $predicate->column_names = $packed_cols;
866: } else {
867: if ($column_slice !== null) {
868: $slice_range = new SliceRange();
869:
870: $column_start = $column_slice->start;
871: if ($column_start !== null and $column_start != '') {
872: if ($column_slice->reversed)
873: $slice_end = self::SLICE_FINISH;
874: else
875: $slice_end = self::SLICE_START;
876:
877: $slice_range->start = $this->pack_name(
878: $column_start, $is_super, $slice_end);
879: } else {
880: $slice_range->start = '';
881: }
882:
883: $column_finish = $column_slice->finish;
884: if ($column_finish !== null and $column_finish != '') {
885: if ($column_slice->reversed)
886: $slice_end = self::SLICE_START;
887: else
888: $slice_end = self::SLICE_FINISH;
889:
890: $slice_range->finish = $this->pack_name(
891: $column_finish, $is_super, $slice_end);
892: } else {
893: $slice_range->finish = '';
894: }
895:
896: $slice_range->reversed = $column_slice->reversed;
897: $slice_range->count = $column_slice->count;
898: } else {
899: $slice_range = new ColumnSlice("", "", $default_count);
900: }
901: $predicate->slice_range = $slice_range;
902: }
903: return $predicate;
904: }
905:
906: protected function create_column_parent($super_column=null) {
907: $column_parent = new ColumnParent();
908: $column_parent->column_family = $this->column_family;
909: if ($super_column !== null) {
910: $column_parent->super_column = $this->pack_name($super_column, true);
911: } else {
912: $column_parent->super_column = null;
913: }
914: return $column_parent;
915: }
916:
917:
918: const NON_SLICE = 0;
919:
920: const SLICE_START = 1;
921:
922: const SLICE_FINISH = 2;
923:
924:
925: public function pack_name($value,
926: $is_supercol_name=false,
927: $slice_end=self::NON_SLICE,
928: $handle_serialize=false) {
929: if (!$this->autopack_names)
930: return $value;
931: if ($slice_end === self::NON_SLICE && ($value === null || $value === "")) {
932: throw new \UnexpectedValueException("Column names may not be null");
933: }
934: if ($is_supercol_name)
935: return $this->supercol_name_type->pack($value, true, $slice_end, $handle_serialize);
936: else
937: return $this->col_name_type->pack($value, true, $slice_end, $handle_serialize);
938: }
939:
940: protected function unpack_name($b, $is_supercol_name=false, $handle_serialize=true) {
941: if (!$this->autopack_names || $b === null)
942: return $b;
943:
944: if ($is_supercol_name)
945: return $this->supercol_name_type->unpack($b, $handle_serialize);
946: else
947: return $this->col_name_type->unpack($b, $handle_serialize);
948: }
949:
950:
951: public function pack_key($key, $handle_serialize=false) {
952: if (!$this->autopack_keys || $key === "")
953: return $key;
954: return $this->key_type->pack($key, true, null, $handle_serialize);
955: }
956:
957:
958: public function unpack_key($b, $handle_serialize=true) {
959: if (!$this->autopack_keys)
960: return $b;
961: return $this->key_type->unpack($b, $handle_serialize);
962: }
963:
964: protected function get_data_type_for_col($col_name) {
965: if (isset($this->col_type_dict[$col_name]))
966: return $this->col_type_dict[$col_name];
967: else
968: return $this->cf_data_type;
969: }
970:
971: protected function pack_value($value, $col_name) {
972: if (!$this->autopack_values)
973: return $value;
974:
975: if (isset($this->col_type_dict[$col_name])) {
976: $dtype = $this->col_type_dict[$col_name];
977: return $dtype->pack($value, false);
978: } else {
979: return $this->cf_data_type->pack($value, false);
980: }
981: }
982:
983: protected function unpack_value($value, $col_name) {
984: if (!$this->autopack_values)
985: return $value;
986:
987: if (isset($this->col_type_dict[$col_name])) {
988: $dtype = $this->col_type_dict[$col_name];
989: return $dtype->unpack($value, false);
990: } else {
991: return $this->cf_data_type->unpack($value, false);
992: }
993: }
994:
995:
996: public function keyslices_to_array($keyslices) {
997: $ret = array();
998: if ($this->return_format == self::DICTIONARY_FORMAT) {
999: foreach($keyslices as $keyslice) {
1000: $key = $this->unpack_key($keyslice->key);
1001: $columns = $keyslice->columns;
1002: $ret[$key] = $this->unpack_coscs($columns);
1003: }
1004: } else {
1005: foreach($keyslices as $keyslice) {
1006: $key = $this->unpack_key($keyslice->key, false);
1007: $columns = $keyslice->columns;
1008: $ret[] = array($key, $this->unpack_coscs($columns));
1009: }
1010: }
1011: return $ret;
1012: }
1013:
1014: protected function unpack_coscs($array_of_coscs) {
1015: if(count($array_of_coscs) == 0)
1016: return $array_of_coscs;
1017:
1018: $format = $this->return_format;
1019: if ($format == self::DICTIONARY_FORMAT) {
1020: return $this->coscs_to_dict($array_of_coscs);
1021: } else if ($format == self::ARRAY_FORMAT) {
1022: return $this->coscs_to_array($array_of_coscs);
1023: } else {
1024: return $this->unpack_coscs_attrs($array_of_coscs);
1025: }
1026: }
1027:
1028: protected function coscs_to_dict($array_of_coscs) {
1029: $ret = array();
1030: $first = $array_of_coscs[0];
1031: if($first->column) {
1032: foreach($array_of_coscs as $cosc) {
1033: $name = $this->unpack_name($cosc->column->name, false);
1034: $value = $this->unpack_value($cosc->column->value, $cosc->column->name);
1035: $ret[$name] = $value;
1036: }
1037: } else if ($first->counter_column) {
1038: foreach($array_of_coscs as $cosc) {
1039: $name = $this->unpack_name($cosc->counter_column->name, false);
1040: $ret[$name] = $cosc->counter_column->value;
1041: }
1042: }
1043: return $ret;
1044: }
1045:
1046: protected function coscs_to_array($array_of_coscs) {
1047: $ret = array();
1048: $first = $array_of_coscs[0];
1049: if($first->column) {
1050: foreach($array_of_coscs as $cosc) {
1051: $name = $this->unpack_name(
1052: $cosc->column->name, false, $handle_serialize=false);
1053: $value = $this->unpack_value($cosc->column->value, $cosc->column->name);
1054: $ret[] = array($name, $value);
1055: }
1056: } else if ($first->counter_column) {
1057: foreach($array_of_coscs as $cosc) {
1058: $name = $this->unpack_name(
1059: $cosc->counter_column->name, false, $handle_serialize=false);
1060: $ret[] = array($name, $cosc->counter_column->value);
1061: }
1062: }
1063: return $ret;
1064: }
1065:
1066: protected function unpack_coscs_attrs($array_of_coscs) {
1067: $ret = array();
1068: $first = $array_of_coscs[0];
1069: if($first->column) {
1070: foreach($array_of_coscs as $cosc) {
1071: $col = $cosc->column;
1072: $col->value = $this->unpack_value($col->value, $col->name);
1073: $col->name = $this->unpack_name(
1074: $col->name, false, $handle_serialize=false);
1075: $ret[] = $col;
1076: }
1077: } else {
1078: foreach($array_of_coscs as $cosc) {
1079: $col = $cosc->counter_column;
1080: $col->name = $this->unpack_name(
1081: $col->name, false, $handle_serialize=false);
1082: $ret[] = $col;
1083: }
1084: }
1085: return $ret;
1086: }
1087:
1088:
1089: public function make_mutation($array, $timestamp=null, $ttl=null) {
1090: $coscs = $this->pack_data($array, $timestamp, $ttl);
1091: $ret = array();
1092: foreach($coscs as $cosc) {
1093: $mutation = new Mutation();
1094: $mutation->column_or_supercolumn = $cosc;
1095: $ret[] = $mutation;
1096: }
1097: return $ret;
1098: }
1099:
1100: protected function pack_data($data, $timestamp=null, $ttl=null) {
1101: if($timestamp === null)
1102: $timestamp = Clock::get_time();
1103:
1104: if ($this->insert_format == self::DICTIONARY_FORMAT) {
1105: return $this->dict_to_coscs($data, $timestamp, $ttl);
1106: } else if ($this->insert_format == self::ARRAY_FORMAT) {
1107: return $this->array_to_coscs($data, $timestamp, $ttl);
1108: } else {
1109: throw new UnexpectedValueException("Bad insert_format selected");
1110: }
1111: }
1112:
1113: protected function dict_to_coscs($data, $timestamp, $ttl) {
1114: $have_counters = $this->has_counters;
1115: $ret = array();
1116: foreach ($data as $name => $value) {
1117: $c_or_sc = new ColumnOrSuperColumn();
1118: if($have_counters) {
1119: $sub = new CounterColumn();
1120: $c_or_sc->counter_column = $sub;
1121: } else {
1122: $sub = new Column();
1123: $c_or_sc->column = $sub;
1124: $sub->timestamp = $timestamp;
1125: $sub->ttl = $this->get_ttl($ttl, $name);
1126: }
1127: $sub->name = $this->pack_name(
1128: $name, false, self::NON_SLICE, true);
1129: $sub->value = $this->pack_value($value, $sub->name);
1130: $ret[] = $c_or_sc;
1131: }
1132: return $ret;
1133: }
1134:
1135: protected function array_to_coscs($data, $timestamp, $ttl) {
1136: $have_counters = $this->has_counters;
1137: $ret = array();
1138: foreach ($data as $col) {
1139: list($name, $value) = $col;
1140: $c_or_sc = new ColumnOrSuperColumn();
1141: if($have_counters) {
1142: $sub = new CounterColumn();
1143: $c_or_sc->counter_column = $sub;
1144: } else {
1145: $sub = new Column();
1146: $c_or_sc->column = $sub;
1147: $sub->timestamp = $timestamp;
1148: $sub->ttl = $this->get_ttl($ttl, $name);
1149: }
1150: $sub->name = $this->pack_name(
1151: $name, false, self::NON_SLICE, false);
1152: $sub->value = $this->pack_value($value, $sub->name);
1153: $ret[] = $c_or_sc;
1154: }
1155: return $ret;
1156: }
1157:
1158: 1159: 1160: 1161: 1162: 1163: 1164:
1165: protected function get_ttl($ttl, $packed_key) {
1166: if(is_array($ttl)){
1167: if(isset($ttl[$packed_key]))
1168: return $ttl[$packed_key];
1169: return null;
1170: } else {
1171: return $ttl;
1172: }
1173: }
1174: }
1175: