1: <?php
2:
3: namespace phpcassa\Batch;
4:
5: use phpcassa\Util\Clock;
6: use cassandra\Deletion;
7: use cassandra\Mutation;
8: use cassandra\SlicePredicate;
9:
10: 11: 12:
/**
 * Base class for batch mutators.
 *
 * Insertions and deletions are accumulated in an in-memory buffer and are
 * only handed to the connection pool, as a single 'batch_mutate' call, when
 * send() is invoked.
 */
abstract class AbstractMutator
{
    /** Object whose call() method is used by send() to dispatch the batch. */
    protected $pool;

    /** Pending entries; each one is array(packed key, column family name, mutations). */
    protected $buffer = array();

    /** Consistency level used by send() when the caller does not supply one. */
    protected $cl;

    /**
     * Dispatch every buffered mutation in one 'batch_mutate' call.
     *
     * Buffered entries are regrouped into a nested map of
     * key => column family name => list of mutations. Nothing is sent when
     * the buffer is empty. The buffer is reset once the call has returned.
     *
     * @param mixed $consistency_level level to use for this call; when null,
     *        the mutator's own $cl is used instead
     */
    public function send($consistency_level=null) {
        $wcl = ($consistency_level === null) ? $this->cl : $consistency_level;

        $grouped = array();
        foreach ($this->buffer as $entry) {
            list($row_key, $cf_name, $pending) = $entry;

            // Start an empty list the first time a (key, column family)
            // pair is seen, then append this entry's mutations to it.
            if (!isset($grouped[$row_key][$cf_name])) {
                $grouped[$row_key][$cf_name] = array();
            }
            $grouped[$row_key][$cf_name] = array_merge(
                $grouped[$row_key][$cf_name],
                $pending
            );
        }

        if (!empty($grouped)) {
            $this->pool->call('batch_mutate', $grouped, $wcl);
        }

        // Only reached when the pool call did not throw.
        $this->buffer = array();
    }

    /**
     * Append one entry to the pending buffer.
     *
     * @param mixed $key the row key (callers in this class pass it already packed)
     * @param object $cf column family object; only its column_family property is read
     * @param array $mutations the mutations to apply to that key
     */
    protected function enqueue($key, $cf, $mutations) {
        $this->buffer[] = array($key, $cf->column_family, $mutations);
    }

    /**
     * Buffer an insertion of $columns into the row $key.
     *
     * Does nothing when $columns is empty.
     *
     * @param object $column_family must provide pack_key() and make_mutation()
     * @param mixed $key the unpacked row key
     * @param mixed $columns the data to insert, passed through to make_mutation()
     * @param mixed $timestamp timestamp for the mutation; Clock::get_time() when null
     * @param mixed $ttl passed through to make_mutation()
     *
     * @return $this
     */
    protected function insert_cf($column_family, $key, $columns, $timestamp=null, $ttl=null) {
        if (empty($columns)) {
            return $this;
        }

        if ($timestamp === null) {
            $timestamp = Clock::get_time();
        }

        $packed_key = $column_family->pack_key($key);
        $mutation_list = $column_family->make_mutation($columns, $timestamp, $ttl);
        $this->enqueue($packed_key, $column_family, $mutation_list);

        return $this;
    }

    /**
     * Buffer a deletion for the row $key.
     *
     * With neither $columns nor $super_column given, the Deletion carries
     * only a timestamp. NOTE(review): presumably that removes the whole row
     * for this column family -- confirm against the Thrift API.
     *
     * @param object $column_family must provide pack_key(), pack_name() and is_super
     * @param mixed $key the unpacked row key
     * @param mixed $columns names to delete, or null to leave the predicate unset
     * @param mixed $super_column super column to scope the deletion to, or null
     * @param mixed $timestamp timestamp for the deletion; Clock::get_time() when null
     *
     * @return $this
     */
    protected function remove_cf($column_family, $key, $columns=null, $super_column=null, $timestamp=null) {
        if ($timestamp === null) {
            $timestamp = Clock::get_time();
        }

        $del = new Deletion();
        $del->timestamp = $timestamp;

        if ($super_column !== null) {
            $del->super_column = $column_family->pack_name($super_column, true);
        }

        if ($columns !== null) {
            // The flag handed to pack_name() is set only for a super column
            // family when no enclosing super column was specified.
            $as_super = $column_family->is_super && $super_column === null;

            $names = array();
            foreach ($columns as $name) {
                $names[] = $column_family->pack_name($name, $as_super);
            }

            $slice = new SlicePredicate();
            $slice->column_names = $names;
            $del->predicate = $slice;
        }

        $wrapper = new Mutation();
        $wrapper->deletion = $del;

        $this->enqueue($column_family->pack_key($key), $column_family, array($wrapper));

        return $this;
    }
}
105: