My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/local/bin/python2.5

"""The point of this is to offer InfiniteSortedObjectSequence which lets
you build up an "infinitely large" list, and then sort it.

The use case is that you first call Append() any number of times
with a picklable object, and then you call Sort().

The implementation of Append() uses an in-memory list that spills over to disk
when the specified limits are exceeded.

The implementation of Sort() does an in-memory sort for the easy case, or
it uses the n-way mergesort from http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
for the case when the data set is too large to fit into memory.

More advanced options involve how the objects are stored to disk (which pickle protocol
and whether the files should be compressed).
"""

import itertools
import sys
import os
import pickle_io
import mergesort
import zlib
import gzip
import cPickle
import cStringIO
import types

from tr_persist import block

def _EstimateItemSize(item):
    """Return a very rough byte estimate for a single non-tuple value.

    Strings are charged their length and plain ints 4 bytes.  Any other
    value that has a length (list, unicode, nested tuple, ...) is charged
    4 bytes per element.  Values with no length at all (long, float,
    None, ...) are charged a flat 8 bytes instead of raising.
    """
    if isinstance(item, str):
        return len(item)
    if isinstance(item, int):
        return 4
    try:
        return 4 * len(item)  # total guess
    except (TypeError, AttributeError):
        # No len(): a scalar such as long, float or None.  Old-style
        # instances raise AttributeError rather than TypeError here.
        return 8


def _EstimateSize(obj):
    """Return a rough estimate of the memory used by obj, in bytes.

    The result is only used to decide when buffered data should be spilled
    to disk, so it is deliberately cheap and approximate:

      * a tuple costs 8 bytes of overhead plus the estimate of each element
        (elements are not recursed into; a nested container is charged
        4 bytes per element);
      * anything else costs 8 bytes of overhead plus its own estimate, so a
        string costs 8 + len(obj).

    Never raises for objects without a length; those get a flat estimate.
    """
    if isinstance(obj, tuple):
        return 8 + sum([_EstimateItemSize(inner) for inner in obj])
    return 8 + _EstimateItemSize(obj)



class InfiniteSortedObjectSequence:
    """Sort any number of objects, spilling to disk when memory limits are hit.

    Usage: call Append() repeatedly, then call Sort() once.  While the data
    fit within the configured limits they are kept in an in-memory list and
    Sort() simply sorts that list.  Once a limit is reached the buffered
    objects are sorted and written to a temporary file; Sort() then merges
    all temporary files.

    Progress diagnostics are printed to stdout whenever data are spilled or
    merged.  Temporary files are removed when the object is destroyed.

    NOTE(review): Sort() is meant to be called once.  After a multi-pass
    merge the intermediate files stay listed in self.files, so a second
    Sort() would read their contents again - confirm no caller does this.
    """

    def __init__(self,
                 limit = 10000,
                 tmp_base = '/tmp',
                 maxopen = 100,
                 maxrecords = 128,
                 maxdata = 1024 * 1024 * 10,
                 protocol = 2,
                 level = 1):
        """Configure the in-memory limits and the on-disk format.

        limit      - max number of entries to keep in memory before flushing
                     to disk; None disables the entry-count limit
        tmp_base   - directory where temporary files are written when memory
                     is exceeded; None means '/tmp'
        maxopen    - max number of temporary files merged in one pass
        maxrecords - max records to put in 1 batch when writing a temp file
        maxdata    - max bytes to buffer in memory, very rough; None disables
                     the size limit.  Also passed through to the block writer.
        protocol   - pickle protocol passed to the block writer
        level      - stored on the instance; not used by this class
        """
        # Assigned first so that __del__ always finds the attribute.
        self.files = []
        if tmp_base is None:
            tmp_base = '/tmp'
        self.limit = limit
        self.tmp_base = tmp_base
        self.ar = []
        self.maxopen = maxopen
        self.maxrecords = maxrecords
        self.maxdata = maxdata
        self.protocol = protocol
        self.level = level
        self.running_data = 0

    def Append(self, obj):
        """Add one object to the dataset, and spill to disk if either memory
        limit (estimated bytes or number of entries) has been reached.
        """
        self.ar.append(obj)
        over_data = False
        if self.maxdata is not None:
            self.running_data += _EstimateSize(obj)
            over_data = self.running_data >= self.maxdata
        if over_data:
            print("NEW FLUSH  %s %s %s" % (len(self.ar), self.running_data,
                                           self.maxdata))
            self._Flush()
        elif self.limit is not None and len(self.ar) >= self.limit:
            self._Flush()

    def _NextFileName(self):
        """Return the path for the next temporary file of this instance.

        The process id and the instance id are part of the name so that
        several sequences sharing one tmp_base do not overwrite each other's
        files.  The trailing counter is unique within the instance because
        every generated name is appended to self.files before the next one
        is requested.
        """
        name = "mr.%d.%d.%d" % (os.getpid(), id(self), len(self.files))
        return os.path.join(self.tmp_base, name)

    def _WriteRun(self, fn, items):
        """Write the already-sorted iterable items to the file fn."""
        w = block.ObjectBlockWriter(fn, maxrecords = self.maxrecords,
                                    maxdata = self.maxdata,
                                    protocol = self.protocol)
        for ent in items:
            w.write(ent)
        # Presumably ObjectBlockWriter flushes and closes its file when it is
        # destroyed (no explicit close is visible here) - confirm against
        # tr_persist.block.  Drop the reference before anyone reads fn.
        del w

    def _Flush(self):
        """Write the buffered objects, sorted, to a new temporary file."""
        if not self.ar:
            return
        fn = self._NextFileName()
        # Registered before writing so a partial file is still cleaned up.
        self.files.append(fn)
        self._WriteRun(fn, sorted(self.ar))
        print("INFINITE FILE  %s %s" % (fn, os.stat(fn).st_size))
        self.ar = []
        self.running_data = 0

    def Sort(self):
        """Sort the data set.

        If the data fit in memory this returns a sorted list.  If data were
        spilled to disk it returns a generator that yields all items in
        sorted order, reading them back from the temporary files.
        """
        if not self.files:  # special case, small data set
            return sorted(self.ar)
        self._Flush()
        # A pass must merge at least two files; with fewer, every pass would
        # replace one file by one new file and the loop would never finish.
        fan_in = max(2, self.maxopen)
        pending = list(self.files)
        while True:
            batch = pending[:fan_in]
            pending = pending[fan_in:]
            print("INFINITE TODO  %s %s" % (len(batch), batch))
            print("INFINITE REST  %s %s" % (len(pending), pending))
            # pairs is a generator of (obj, file); only the object is used.
            pairs = mergesort.mergesort(
                [block.ObjectBlockReader(fn) for fn in batch])
            if not pending:
                # Everything left fits in one merge: stream it to the caller.
                print("INFINITE A")
                return (p[0] for p in pairs)
            # Too many files for one pass: merge this batch into a new file
            # and queue that file behind the ones still waiting.
            fn = self._NextFileName()
            print("INFINITE B  %s" % (fn,))
            self.files.append(fn)
            pending.append(fn)
            self._WriteRun(fn, (p[0] for p in pairs))
            print("FILE  %s %s" % (fn, os.stat(fn).st_size))

    def __del__(self):
        """Remove all temporary files on destruction (best effort)."""
        for fn in getattr(self, 'files', ()):
            try:
                os.unlink(fn)
            except OSError:
                # Already gone or not removable; a destructor must not raise
                # and the remaining files should still be removed.
                pass
        self.files = []

    def stats(self):
        """Return a dict with the number of temporary files ('nfiles') and
        their combined size in bytes ('size').
        """
        size = sum([os.stat(f).st_size for f in self.files])
        return {'nfiles': len(self.files),
                'size': size}



def _RunSmokeTest():
    """Sort 200 short strings with limits small enough that they do not all
    fit in memory, and print every entry in sorted order.
    """
    ins = InfiniteSortedObjectSequence(limit = 97, maxdata = 1024)
    for x in xrange(200):
        ins.Append("%04d" % x)
    for ents in ins.Sort():
        print("ent= %s" % (ents,))


def _RunBenchmark():
    """Append 256000 pseudo-random strings with the default limits, sort
    them, and print timings, exponentially spaced samples and file stats.
    """
    import random
    import time

    random.seed(0)  # reproducible input
    t1 = time.time()
    ins = InfiniteSortedObjectSequence()
    for x in xrange(1024000 // 4):
        s = "".join([chr(random.randrange(97, 97 + 26)) for i in xrange(10)])
        ins.Append("%06d %s" % (x % 10379, s))
    t2 = time.time()
    print("sorting..")
    ne = 0
    mod = 1
    for ents in ins.Sort():
        ne += 1
        if ne % mod == 0:
            # Print entries 1, 2, 4, 8, ... only, to keep the output short.
            print("sample: %8d: '%s'" % (ne, ents))
            mod *= 2
    stats = ins.stats()
    ins = None  # drop the sequence so its temporary files are removed
    t3 = time.time()
    print("Ne:  %s" % (ne,))
    print("Add: %.2f" % (t2-t1))
    print("Sort/Print: %.2f" % (t3-t2))
    print("Stats:  %s" % (stats,))


if __name__ == '__main__':
    # Default: quick smoke test.  Pass --benchmark for the timing run.
    if '--benchmark' in sys.argv[1:]:
        _RunBenchmark()
    else:
        _RunSmokeTest()
Show details Hide details

Change log

r122 by david.spencerian on Aug 27, 2008   Diff
misc
Go to: 
Project members, sign in to write a code review

Older revisions

r71 by david.spencerian on Jul 31, 2008   Diff
misc
r70 by david.spencerian on Jul 31, 2008   Diff
misc
r68 by david.spencerian on Jul 30, 2008   Diff
misc
All revisions of this file

File info

Size: 6371 bytes, 218 lines
Hosted by Google Code