My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes    
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
/*
* Copyright 2010 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.appengine.tools.mapreduce;

import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityTranslator;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;

import java.util.ArrayList;
import java.util.List;

/**
 * DatastoreMutationPool allows you to pool datastore operations such that
 * they are applied in batches requiring fewer datastore API calls. Mutations
 * are accumulated until they reach a count limit on the number of unflushed
 * mutations, until they reach a size limit on the byte count of unflushed
 * mutations, or until a manual flush is requested.
 *
 * <p>Puts and deletes are pooled independently: each kind has its own pending
 * list and its own running size estimate, and each is compared against the
 * same count and size limits.
 *
 * <p>This class performs no synchronization of its own.
 */
public class DatastoreMutationPool {
  /**
   * Default number of operations to batch before automatically flushing.
   */
  public static final int DEFAULT_COUNT_LIMIT = 100;

  /**
   * Default size (in bytes) of operations to batch before automatically
   * flushing.
   *
   * <p>The current value is 256 KB.
   */
  public static final int DEFAULT_SIZE_LIMIT = 1 << 18;

  private final DatastoreService ds;

  private final int countLimit;
  private final int sizeLimit;

  /** Entities waiting to be written, and the sum of their serialized sizes. */
  private final List<Entity> puts = new ArrayList<Entity>();
  private int putsSize;

  /** Keys waiting to be deleted, and the sum of their estimated sizes. */
  private final List<Key> deletes = new ArrayList<Key>();
  private int deletesSize;

  /**
   * Initialize a datastore mutation pool with the default batch mutation count
   * limit of {@value #DEFAULT_COUNT_LIMIT} and the default batch mutation
   * size limit of {@value #DEFAULT_SIZE_LIMIT}.
   *
   * @param ds the datastore service that batches are sent to
   */
  public DatastoreMutationPool(DatastoreService ds) {
    this(ds, DEFAULT_COUNT_LIMIT, DEFAULT_SIZE_LIMIT);
  }

  /**
   * Initialize a batch datastore mutation with the given count and size limits.
   *
   * @param ds the datastore service that batches are sent to
   * @param countLimit number of pending mutations of one kind that triggers
   *     an automatic flush of that kind
   * @param sizeLimit estimated byte size of pending mutations of one kind
   *     that triggers an automatic flush of that kind
   */
  public DatastoreMutationPool(DatastoreService ds, int countLimit,
      int sizeLimit) {
    this.ds = ds;
    this.countLimit = countLimit;
    this.sizeLimit = sizeLimit;
  }

  /**
   * Adds a mutation inserting the given {@code entity}.
   *
   * <p>If adding the entity would bring the pending puts to the size limit,
   * the already-pending puts are flushed first. After the entity is added,
   * the pending puts are flushed if the count limit has been reached.
   *
   * @param entity the entity to write
   */
  public void put(Entity entity) {
    int putSize = EntityTranslator.convertToPb(entity).getSerializedSize();

    // Flush before adding so an existing batch is not pushed to the size
    // limit by this entity. Skip the flush when nothing is pending: it would
    // only hand an empty batch to the datastore service. An entity that is
    // itself at or above the limit therefore stays pending until the next
    // put or an explicit flush.
    if (!puts.isEmpty() && putsSize + putSize >= sizeLimit) {
      flushPuts();
    }

    putsSize += putSize;
    puts.add(entity);

    if (puts.size() >= countLimit) {
      flushPuts();
    }
  }

  /** Sends all pending puts in one call and resets the put accounting. */
  private void flushPuts() {
    ds.put(puts);
    puts.clear();
    putsSize = 0;
  }

  /**
   * Adds a mutation deleting the entity corresponding to {@code key}.
   *
   * <p>If adding the key would bring the pending deletes to the size limit,
   * the already-pending deletes are flushed first. After the key is added,
   * the pending deletes are flushed if the count limit has been reached.
   *
   * @param key the key of the entity to delete
   */
  public void delete(Key key) {
    // This is probably a serious overestimation, but I can't see a good
    // way to find the size in the public API.
    int deleteSize = KeyFactory.keyToString(key).length();

    // Same policy as put(): flush an existing batch before it reaches the
    // size limit, but never flush an empty one.
    if (!deletes.isEmpty() && deletesSize + deleteSize >= sizeLimit) {
      flushDeletes();
    }

    deletesSize += deleteSize;
    deletes.add(key);

    if (deletes.size() >= countLimit) {
      flushDeletes();
    }
  }

  /** Sends all pending deletes in one call and resets the delete accounting. */
  private void flushDeletes() {
    ds.delete(deletes);
    deletes.clear();
    deletesSize = 0;
  }

  /**
   * Flushes all outstanding mutations. Pending puts are sent before pending
   * deletes; a kind with nothing pending is skipped.
   */
  public void flush() {
    if (!puts.isEmpty()) {
      flushPuts();
    }

    if (!deletes.isEmpty()) {
      flushDeletes();
    }
  }
}

Change log

r207 by mike.aizatsky on Jul 15, 2011   Diff
Due to some problems this CL includes more
than one change:

- MapReduceServlet now has some protection
against the task queue executing the same
  task more than once (yes, it can happen)
- Added some code which was not exported
before
- Lots of whitespace and author tag
changes due to some internal tool changes.


Go to: 
Project members, sign in to write a code review

Older revisions

r15 by frewstore on Jun 3, 2010   Diff
Initial checkin of the Java mapper
All revisions of this file

File info

Size: 3970 bytes, 144 lines
Powered by Google Project Hosting