My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package org.wyki.cassandra.pelops;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Deletion;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SuperColumn;
import org.wyki.cassandra.pelops.ThriftPool.Connection;
import static org.wyki.cassandra.pelops.StringHelper.toBytes;

/**
 * Facilitates the mutation of data within a Cassandra keyspace: the desired mutations should first be specified by
 * calling methods such as <code>writeColumn(...)</code>, and are then sent to Cassandra in a single batch by
 * calling <code>execute(...)</code>.
 * <p>
 * The accumulated batch is not cleared by <code>execute(...)</code>, so a <code>Mutator</code> object should not be
 * re-used after its batch of mutations has been executed. Every column created through <code>newColumn(...)</code>
 * and every deletion recorded by this object carries the same time stamp, fixed at construction.
 *
 * @author dominicwilliams
 */
public class Mutator extends KeyspaceOperand {

    /**
     * Execute the mutations that have been specified by sending them to Cassandra in a single batch.
     * @param cLevel The Cassandra consistency level to be used
     * @throws Exception Any exception propagated by <code>tryOperation</code> while running the batch
     */
    public void execute(final ConsistencyLevel cLevel) throws Exception {
        IOperation operation = new IOperation() {
            @Override
            public Object execute(Connection conn) throws Exception {
                // Send batch mutation job to Thrift connection
                conn.getAPI().batch_mutate(keyspace, batch, cLevel);
                // Flush connection
                conn.flush();
                // Nothing to return
                return null;
            }
        };
        tryOperation(operation);
    }

    /**
     * Write a column value.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param column The column (name, value and time stamp) to write
     */
    public void writeColumn(String rowKey, String colFamily, Column column) {
        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(column);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);
        getMutationList(rowKey, colFamily).add(mutation);
    }

    /**
     * Write a list of columns to a key. Each column is queued as its own mutation.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param columns The list of columns to write
     */
    public void writeColumns(String rowKey, String colFamily, List<Column> columns) {
        for (Column column : columns) {
            writeColumn(rowKey, colFamily, column);
        }
    }

    /**
     * Write a single sub-column value to a super column. If you wish to write multiple sub-columns for a
     * super column, then it is more efficient to use <code>writeSubColumns</code>.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the super column family to operate on
     * @param colName The name of the super column (converted with <code>StringHelper.toBytes</code>)
     * @param subColumn The sub-column
     */
    public void writeSubColumn(String rowKey, String colFamily, String colName, Column subColumn) {
        writeSubColumn(rowKey, colFamily, toBytes(colName), subColumn);
    }

    /**
     * Write a single sub-column value to a super column. If you wish to write multiple sub-columns for a
     * super column, then it is more efficient to use <code>writeSubColumns</code>.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the super column family to operate on
     * @param colName The name of the super column
     * @param subColumn The sub-column
     */
    public void writeSubColumn(String rowKey, String colFamily, byte[] colName, Column subColumn) {
        List<Column> subColumns = new ArrayList<Column>(1);
        subColumns.add(subColumn);
        writeSubColumns(rowKey, colFamily, colName, subColumns);
    }

    /**
     * Write multiple sub-column values to a super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the super column family to operate on
     * @param colName The name of the super column (converted with <code>StringHelper.toBytes</code>)
     * @param subColumns A list of the sub-columns to write
     */
    public void writeSubColumns(String rowKey, String colFamily, String colName, List<Column> subColumns) {
        writeSubColumns(rowKey, colFamily, toBytes(colName), subColumns);
    }

    /**
     * Write multiple sub-column values to a super column. The sub-columns are queued as a single mutation.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the super column family to operate on
     * @param colName The name of the super column
     * @param subColumns A list of the sub-columns to write
     */
    public void writeSubColumns(String rowKey, String colFamily, byte[] colName, List<Column> subColumns) {
        SuperColumn scol = new SuperColumn(colName, subColumns);
        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setSuper_column(scol);
        Mutation mutation = new Mutation();
        mutation.setColumn_or_supercolumn(cosc);
        getMutationList(rowKey, colFamily).add(mutation);
    }

    /**
     * Delete a column or super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the column or super column to delete
     *                (converted with <code>StringHelper.toBytes</code>)
     */
    public void deleteColumn(String rowKey, String colFamily, String colName) {
        deleteColumn(rowKey, colFamily, toBytes(colName));
    }

    /**
     * Delete a column or super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the column or super column to delete
     */
    public void deleteColumn(String rowKey, String colFamily, byte[] colName) {
        List<byte[]> colNames = new ArrayList<byte[]>(1);
        colNames.add(colName);
        deleteColumns(rowKey, colFamily, colNames);
    }

    /**
     * Delete a list of columns or super columns.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colNames The column and/or super column names to delete
     */
    public void deleteColumns(String rowKey, String colFamily, byte[]... colNames) {
        // Copy into a fresh list so the predicate does not alias the caller's array
        List<byte[]> colNameList = new ArrayList<byte[]>(Arrays.asList(colNames));
        deleteColumns(rowKey, colFamily, colNameList);
    }

    /**
     * Delete a list of columns or super columns.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colNames The column and/or super column names to delete
     *                 (each converted with <code>StringHelper.toBytes</code>)
     */
    public void deleteColumns(String rowKey, String colFamily, String... colNames) {
        List<byte[]> colNameList = new ArrayList<byte[]>(colNames.length);
        for (String colName : colNames) {
            colNameList.add(toBytes(colName));
        }
        deleteColumns(rowKey, colFamily, colNameList);
    }

    /**
     * Delete a list of columns or super columns. The deletion is queued as a single mutation stamped with
     * this <code>Mutator</code>'s time stamp.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colNames The column and/or super column names to delete
     */
    public void deleteColumns(String rowKey, String colFamily, List<byte[]> colNames) {
        SlicePredicate pred = new SlicePredicate();
        pred.setColumn_names(colNames);
        Deletion deletion = new Deletion(timestamp);
        deletion.setPredicate(pred);
        Mutation mutation = new Mutation();
        mutation.setDeletion(deletion);
        getMutationList(rowKey, colFamily).add(mutation);
    }

    /**
     * Delete a sub-column of a super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColName The name of the sub-column to delete
     */
    public void deleteSubColumn(String rowKey, String colFamily, String colName, String subColName) {
        deleteSubColumn(rowKey, colFamily, toBytes(colName), toBytes(subColName));
    }

    /**
     * Delete a sub-column of a super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColName The name of the sub-column to delete
     */
    public void deleteSubColumn(String rowKey, String colFamily, byte[] colName, String subColName) {
        deleteSubColumn(rowKey, colFamily, colName, toBytes(subColName));
    }

    /**
     * Delete a sub-column of a super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColName The name of the sub-column to delete
     */
    public void deleteSubColumn(String rowKey, String colFamily, String colName, byte[] subColName) {
        deleteSubColumn(rowKey, colFamily, toBytes(colName), subColName);
    }

    /**
     * Delete a sub-column of a super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColName The name of the sub-column to delete
     */
    public void deleteSubColumn(String rowKey, String colFamily, byte[] colName, byte[] subColName) {
        List<byte[]> subColNames = new ArrayList<byte[]>(1);
        subColNames.add(subColName);
        deleteSubColumns(rowKey, colFamily, colName, subColNames);
    }

    /**
     * Delete a list of sub-columns.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColNames The sub-column names to delete
     */
    public void deleteSubColumns(String rowKey, String colFamily, String colName, String... subColNames) {
        deleteSubColumns(rowKey, colFamily, toBytes(colName), subColNames);
    }

    /**
     * Delete a list of sub-columns.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColNames The sub-column names to delete
     */
    public void deleteSubColumns(String rowKey, String colFamily, byte[] colName, String... subColNames) {
        List<byte[]> subColNamesList = new ArrayList<byte[]>(subColNames.length);
        for (String subColName : subColNames) {
            subColNamesList.add(toBytes(subColName));
        }
        deleteSubColumns(rowKey, colFamily, colName, subColNamesList);
    }

    /**
     * Delete a list of sub-columns.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColNames The sub-column names to delete
     */
    public void deleteSubColumns(String rowKey, String colFamily, String colName, List<byte[]> subColNames) {
        deleteSubColumns(rowKey, colFamily, toBytes(colName), subColNames);
    }

    /**
     * Delete a list of sub-columns. The deletion is queued as a single mutation stamped with this
     * <code>Mutator</code>'s time stamp and scoped to the given super column.
     * @param rowKey The key of the row to modify
     * @param colFamily The name of the column family to modify
     * @param colName The name of the super column to modify
     * @param subColNames The sub-column names to delete
     */
    public void deleteSubColumns(String rowKey, String colFamily, byte[] colName, List<byte[]> subColNames) {
        SlicePredicate pred = new SlicePredicate();
        pred.setColumn_names(subColNames);
        Deletion deletion = new Deletion(timestamp);
        deletion.setSuper_column(colName);
        deletion.setPredicate(pred);
        Mutation mutation = new Mutation();
        mutation.setDeletion(deletion);
        getMutationList(rowKey, colFamily).add(mutation);
    }

    /**
     * Create new Column object with the time stamp passed to the constructor.
     * @param colName The column name
     * @param colValue The column value
     * @return An appropriate <code>Column</code> object
     */
    public Column newColumn(String colName, String colValue) {
        return newColumn(colName, toBytes(colValue));
    }

    /**
     * Create new Column object with the time stamp passed to the constructor.
     * @param colName The column name
     * @param colValue The column value
     * @return An appropriate <code>Column</code> object
     */
    public Column newColumn(byte[] colName, String colValue) {
        return newColumn(colName, toBytes(colValue));
    }

    /**
     * Create new Column object with the time stamp passed to the constructor.
     * @param colName The column name
     * @param colValue The column value
     * @return An appropriate <code>Column</code> object
     */
    public Column newColumn(String colName, byte[] colValue) {
        return newColumn(toBytes(colName), colValue);
    }

    /**
     * Create new Column object with the time stamp passed to the constructor.
     * @param colName The column name
     * @param colValue The column value
     * @return An appropriate <code>Column</code> object
     */
    public Column newColumn(byte[] colName, byte[] colValue) {
        return new Column(colName, colValue, timestamp);
    }

    /**
     * Create a list of <code>Column</code> objects.
     * @param columns The columns from which to compose the list
     * @return A new, modifiable list containing the given columns in order
     */
    public List<Column> newColumnList(Column... columns) {
        // Wrap in ArrayList: Arrays.asList alone is a fixed-size view over the caller's array
        return new ArrayList<Column>(Arrays.asList(columns));
    }

    /**
     * Get the default time stamp used by this <code>Mutator</code> instance as a byte[].
     * @param microsToMillis If the time stamp is UTC microseconds (as is a self-constructed time stamp), whether
     *                       to convert this into a standard milliseconds value (the value is divided by 1000)
     * @return The time stamp <code>long</code> value as encoded by <code>NumberHelper.toBytes</code>
     */
    public byte[] getMutationTimestamp(boolean microsToMillis) {
        long result = timestamp;
        if (microsToMillis) {
            result /= 1000;
        }
        return NumberHelper.toBytes(result);
    }

    /**
     * Get the raw time stamp value used by this <code>Mutator</code> instance.
     * @return The raw time stamp value being used
     */
    public long getMutationTimestampValue() {
        return timestamp;
    }

    // Named container types for the batch structure. They are static nested classes so that the
    // containers do not each carry a hidden reference to the enclosing Mutator.

    /** The mutations queued for one column family of one row. */
    @SuppressWarnings("serial")
    static class MutationList extends ArrayList<Mutation> {}

    /** Maps a column family name to the mutations queued for it. */
    @SuppressWarnings("serial")
    static class MutationsByCf extends HashMap<String, List<Mutation>> {}

    /** Maps a row key to its per-column-family mutations. */
    @SuppressWarnings("serial")
    static class MutationsByKey extends HashMap<String, Map<String, List<Mutation>>> {}

    /** The accumulated batch: row key -> column family -> mutations. */
    private final Map<String, Map<String, List<Mutation>>> batch;

    /** The time stamp applied to every column and deletion created by this instance. */
    private final long timestamp;

    /**
     * Create a batch mutation operation stamped with the current system time, expressed in microseconds
     * (<code>System.currentTimeMillis() * 1000</code>).
     * @param thrift The <code>ThriftPool</code> handed to <code>KeyspaceOperand</code>
     * @param keyspace The keyspace the batch mutation will modify
     */
    protected Mutator(ThriftPool thrift, String keyspace) {
        this(thrift, keyspace, System.currentTimeMillis() * 1000);
    }

    /**
     * Create a batch mutation operation.
     * @param thrift The <code>ThriftPool</code> handed to <code>KeyspaceOperand</code>
     * @param keyspace The keyspace the batch mutation will modify
     * @param timestamp The time stamp to use for the operation. This should be in microseconds.
     */
    protected Mutator(ThriftPool thrift, String keyspace, long timestamp) {
        super(thrift, keyspace);
        this.timestamp = timestamp;
        batch = new MutationsByKey();
    }

    /**
     * Look up the mutation list for a row key and column family, creating and registering the
     * intermediate map and the list themselves on first use.
     * @param key The row key
     * @param colFamily The column family name
     * @return The list held in the batch for that row and column family; additions to it are part of the batch
     */
    private List<Mutation> getMutationList(String key, String colFamily) {
        Map<String, List<Mutation>> mutsByCf = batch.get(key);
        if (mutsByCf == null) {
            mutsByCf = new MutationsByCf();
            batch.put(key, mutsByCf);
        }
        List<Mutation> mutList = mutsByCf.get(colFamily);
        if (mutList == null) {
            mutList = new MutationList();
            mutsByCf.put(colFamily, mutList);
        }
        return mutList;
    }
}

Change log

r4 by thedwilliams on Jun 8, 2010   Diff
[No log message]
Go to: 
Project members, sign in to write a code review

Older revisions

r3 by thedwilliams on Jun 8, 2010   Diff
[No log message]
r2 by thedwilliams on Jun 8, 2010   Diff
[No log message]
All revisions of this file

File info

Size: 18275 bytes, 410 lines
Powered by Google Project Hosting