My favorites | Sign in
Project Home Downloads Wiki Issues Source
Checkout   Browse   Changes  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
import unittest
import pickle
import cPickle
import cStringIO
import pickletools
import copy_reg

from test.test_support import TestFailed, have_unicode, TESTFN

# Tests that try a number of pickle protocols should have a
# for proto in protocols:
# kind of outer loop.
assert pickle.HIGHEST_PROTOCOL == cPickle.HIGHEST_PROTOCOL == 2
protocols = range(pickle.HIGHEST_PROTOCOL + 1)

# Copy of test.test_support.run_with_locale. This is needed to support Python
# 2.4, which didn't include it.
def run_with_locale(catstr, *locales):
def decorator(func):
def inner(*args, **kwds):
try:
import locale
category = getattr(locale, catstr)
orig_locale = locale.setlocale(category)
except AttributeError:
# if the test author gives us an invalid category string
raise
except:
# cannot retrieve original locale, so do nothing
locale = orig_locale = None
else:
for loc in locales:
try:
locale.setlocale(category, loc)
break
except:
pass

# now run the function, resetting the locale on exceptions
try:
return func(*args, **kwds)
finally:
if locale and orig_locale:
locale.setlocale(category, orig_locale)
inner.func_name = func.func_name
inner.__doc__ = func.__doc__
return inner
return decorator


# Return True if opcode code appears in the pickle, else False.
def opcode_in_pickle(code, pickle):
for op, dummy, dummy in pickletools.genops(pickle):
if op.code == code:
return True
return False

# Return the number of times opcode code appears in pickle.
def count_opcode(code, pickle):
n = 0
for op, dummy, dummy in pickletools.genops(pickle):
if op.code == code:
n += 1
return n

# We can't very well test the extension registry without putting known stuff
# in it, but we have to be careful to restore its original state. Code
# should do this:
#
# e = ExtensionSaver(extension_code)
# try:
# fiddle w/ the extension registry's stuff for extension_code
# finally:
# e.restore()

class ExtensionSaver:
# Remember current registration for code (if any), and remove it (if
# there is one).
def __init__(self, code):
self.code = code
if code in copy_reg._inverted_registry:
self.pair = copy_reg._inverted_registry[code]
copy_reg.remove_extension(self.pair[0], self.pair[1], code)
else:
self.pair = None

# Restore previous registration for code.
def restore(self):
code = self.code
curpair = copy_reg._inverted_registry.get(code)
if curpair is not None:
copy_reg.remove_extension(curpair[0], curpair[1], code)
pair = self.pair
if pair is not None:
copy_reg.add_extension(pair[0], pair[1], code)

class C:
def __cmp__(self, other):
return cmp(self.__dict__, other.__dict__)

import __main__
__main__.C = C
C.__module__ = "__main__"

class myint(int):
def __init__(self, x):
self.str = str(x)

class initarg(C):

def __init__(self, a, b):
self.a = a
self.b = b

def __getinitargs__(self):
return self.a, self.b

class metaclass(type):
pass

class use_metaclass(object):
__metaclass__ = metaclass

# DATA0 .. DATA2 are the pickles we expect under the various protocols, for
# the object returned by create_data().

# break into multiple strings to avoid confusing font-lock-mode
DATA0 = """(lp1
I0
aL1L
aF2
ac__builtin__
complex
p2
""" + \
"""(F3
F0
tRp3
aI1
aI-1
aI255
aI-255
aI-256
aI65535
aI-65535
aI-65536
aI2147483647
aI-2147483647
aI-2147483648
a""" + \
"""(S'abc'
p4
g4
""" + \
"""(i__main__
C
p5
""" + \
"""(dp6
S'foo'
p7
I1
sS'bar'
p8
I2
sbg5
tp9
ag9
aI5
a.
"""

# Disassembly of DATA0.
DATA0_DIS = """\
0: ( MARK
1: l LIST (MARK at 0)
2: p PUT 1
5: I INT 0
8: a APPEND
9: L LONG 1L
13: a APPEND
14: F FLOAT 2.0
17: a APPEND
18: c GLOBAL '__builtin__ complex'
39: p PUT 2
42: ( MARK
43: F FLOAT 3.0
46: F FLOAT 0.0
49: t TUPLE (MARK at 42)
50: R REDUCE
51: p PUT 3
54: a APPEND
55: I INT 1
58: a APPEND
59: I INT -1
63: a APPEND
64: I INT 255
69: a APPEND
70: I INT -255
76: a APPEND
77: I INT -256
83: a APPEND
84: I INT 65535
91: a APPEND
92: I INT -65535
100: a APPEND
101: I INT -65536
109: a APPEND
110: I INT 2147483647
122: a APPEND
123: I INT -2147483647
136: a APPEND
137: I INT -2147483648
150: a APPEND
151: ( MARK
152: S STRING 'abc'
159: p PUT 4
162: g GET 4
165: ( MARK
166: i INST '__main__ C' (MARK at 165)
178: p PUT 5
181: ( MARK
182: d DICT (MARK at 181)
183: p PUT 6
186: S STRING 'foo'
193: p PUT 7
196: I INT 1
199: s SETITEM
200: S STRING 'bar'
207: p PUT 8
210: I INT 2
213: s SETITEM
214: b BUILD
215: g GET 5
218: t TUPLE (MARK at 151)
219: p PUT 9
222: a APPEND
223: g GET 9
226: a APPEND
227: I INT 5
230: a APPEND
231: . STOP
highest protocol among opcodes = 0
"""

DATA1 = (']q\x01(K\x00L1L\nG@\x00\x00\x00\x00\x00\x00\x00'
'c__builtin__\ncomplex\nq\x02(G@\x08\x00\x00\x00\x00\x00'
'\x00G\x00\x00\x00\x00\x00\x00\x00\x00tRq\x03K\x01J\xff\xff'
'\xff\xffK\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xff'
'J\x01\x00\xff\xffJ\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00'
'\x00\x80J\x00\x00\x00\x80(U\x03abcq\x04h\x04(c__main__\n'
'C\nq\x05oq\x06}q\x07(U\x03fooq\x08K\x01U\x03barq\tK\x02ubh'
'\x06tq\nh\nK\x05e.'
)

# Disassembly of DATA1.
DATA1_DIS = """\
0: ] EMPTY_LIST
1: q BINPUT 1
3: ( MARK
4: K BININT1 0
6: L LONG 1L
10: G BINFLOAT 2.0
19: c GLOBAL '__builtin__ complex'
40: q BINPUT 2
42: ( MARK
43: G BINFLOAT 3.0
52: G BINFLOAT 0.0
61: t TUPLE (MARK at 42)
62: R REDUCE
63: q BINPUT 3
65: K BININT1 1
67: J BININT -1
72: K BININT1 255
74: J BININT -255
79: J BININT -256
84: M BININT2 65535
87: J BININT -65535
92: J BININT -65536
97: J BININT 2147483647
102: J BININT -2147483647
107: J BININT -2147483648
112: ( MARK
113: U SHORT_BINSTRING 'abc'
118: q BINPUT 4
120: h BINGET 4
122: ( MARK
123: c GLOBAL '__main__ C'
135: q BINPUT 5
137: o OBJ (MARK at 122)
138: q BINPUT 6
140: } EMPTY_DICT
141: q BINPUT 7
143: ( MARK
144: U SHORT_BINSTRING 'foo'
149: q BINPUT 8
151: K BININT1 1
153: U SHORT_BINSTRING 'bar'
158: q BINPUT 9
160: K BININT1 2
162: u SETITEMS (MARK at 143)
163: b BUILD
164: h BINGET 6
166: t TUPLE (MARK at 112)
167: q BINPUT 10
169: h BINGET 10
171: K BININT1 5
173: e APPENDS (MARK at 3)
174: . STOP
highest protocol among opcodes = 1
"""

DATA2 = ('\x80\x02]q\x01(K\x00\x8a\x01\x01G@\x00\x00\x00\x00\x00\x00\x00'
'c__builtin__\ncomplex\nq\x02G@\x08\x00\x00\x00\x00\x00\x00G\x00'
'\x00\x00\x00\x00\x00\x00\x00\x86Rq\x03K\x01J\xff\xff\xff\xffK'
'\xffJ\x01\xff\xff\xffJ\x00\xff\xff\xffM\xff\xffJ\x01\x00\xff\xff'
'J\x00\x00\xff\xffJ\xff\xff\xff\x7fJ\x01\x00\x00\x80J\x00\x00\x00'
'\x80(U\x03abcq\x04h\x04(c__main__\nC\nq\x05oq\x06}q\x07(U\x03foo'
'q\x08K\x01U\x03barq\tK\x02ubh\x06tq\nh\nK\x05e.')

# Disassembly of DATA2.
DATA2_DIS = """\
0: \x80 PROTO 2
2: ] EMPTY_LIST
3: q BINPUT 1
5: ( MARK
6: K BININT1 0
8: \x8a LONG1 1L
11: G BINFLOAT 2.0
20: c GLOBAL '__builtin__ complex'
41: q BINPUT 2
43: G BINFLOAT 3.0
52: G BINFLOAT 0.0
61: \x86 TUPLE2
62: R REDUCE
63: q BINPUT 3
65: K BININT1 1
67: J BININT -1
72: K BININT1 255
74: J BININT -255
79: J BININT -256
84: M BININT2 65535
87: J BININT -65535
92: J BININT -65536
97: J BININT 2147483647
102: J BININT -2147483647
107: J BININT -2147483648
112: ( MARK
113: U SHORT_BINSTRING 'abc'
118: q BINPUT 4
120: h BINGET 4
122: ( MARK
123: c GLOBAL '__main__ C'
135: q BINPUT 5
137: o OBJ (MARK at 122)
138: q BINPUT 6
140: } EMPTY_DICT
141: q BINPUT 7
143: ( MARK
144: U SHORT_BINSTRING 'foo'
149: q BINPUT 8
151: K BININT1 1
153: U SHORT_BINSTRING 'bar'
158: q BINPUT 9
160: K BININT1 2
162: u SETITEMS (MARK at 143)
163: b BUILD
164: h BINGET 6
166: t TUPLE (MARK at 112)
167: q BINPUT 10
169: h BINGET 10
171: K BININT1 5
173: e APPENDS (MARK at 5)
174: . STOP
highest protocol among opcodes = 2
"""

def create_data():
c = C()
c.foo = 1
c.bar = 2
x = [0, 1L, 2.0, 3.0+0j]
# Append some integer test cases at cPickle.c's internal size
# cutoffs.
uint1max = 0xff
uint2max = 0xffff
int4max = 0x7fffffff
x.extend([1, -1,
uint1max, -uint1max, -uint1max-1,
uint2max, -uint2max, -uint2max-1,
int4max, -int4max, -int4max-1])
y = ('abc', 'abc', c, c)
x.append(y)
x.append(y)
x.append(5)
return x

class AbstractPickleTests(unittest.TestCase):
# Subclass must define self.dumps, self.loads, self.error.

_testdata = create_data()

def setUp(self):
pass

def dump(self, arg, buf, proto=0):
data = self.dumps(arg, proto)
buf.write(data)

def test_misc(self):
# test various datatypes not tested by testdata
for proto in protocols:
x = myint(4)
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)

x = (1, ())
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)

x = initarg(1, x)
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)

# XXX test __reduce__ protocol?

def test_roundtrip_equality(self):
expected = self._testdata
for proto in protocols:
s = self.dumps(expected, proto)
got = self.loads(s)
self.assertEqual(expected, got)

def test_load_from_canned_string(self):
expected = self._testdata
for canned in DATA0, DATA1, DATA2:
got = self.loads(canned)
self.assertEqual(expected, got)

# There are gratuitous differences between pickles produced by
# pickle and cPickle, largely because cPickle starts PUT indices at
# 1 and pickle starts them at 0. See XXX comment in cPickle's put2() --
# there's a comment with an exclamation point there whose meaning
# is a mystery. cPickle also suppresses PUT for objects with a refcount
# of 1.
def dont_test_disassembly(self):
from pickletools import dis

for proto, expected in (0, DATA0_DIS), (1, DATA1_DIS):
s = self.dumps(self._testdata, proto)
filelike = cStringIO.StringIO()
dis(s, out=filelike)
got = filelike.getvalue()
self.assertEqual(expected, got)

def test_recursive_list(self):
l = []
l.append(l)
for proto in protocols:
s = self.dumps(l, proto)
x = self.loads(s)
self.assertEqual(len(x), 1)
self.assert_(x is x[0])

def test_recursive_dict(self):
d = {}
d[1] = d
for proto in protocols:
s = self.dumps(d, proto)
x = self.loads(s)
self.assertEqual(x.keys(), [1])
self.assert_(x[1] is x)

def test_recursive_inst(self):
i = C()
i.attr = i
for proto in protocols:
s = self.dumps(i, 2)
x = self.loads(s)
self.assertEqual(dir(x), dir(i))
self.assert_(x.attr is x)

def test_recursive_multi(self):
l = []
d = {1:l}
i = C()
i.attr = d
l.append(i)
for proto in protocols:
s = self.dumps(l, proto)
x = self.loads(s)
self.assertEqual(len(x), 1)
self.assertEqual(dir(x[0]), dir(i))
self.assertEqual(x[0].attr.keys(), [1])
self.assert_(x[0].attr[1] is x)

def test_garyp(self):
self.assertRaises(self.error, self.loads, 'garyp')

def test_insecure_strings(self):
insecure = ["abc", "2 + 2", # not quoted
#"'abc' + 'def'", # not a single quoted string
"'abc", # quote is not closed
"'abc\"", # open quote and close quote don't match
"'abc' ?", # junk after close quote
"'\\'", # trailing backslash
# some tests of the quoting rules
#"'abc\"\''",
#"'\\\\a\'\'\'\\\'\\\\\''",
]
for s in insecure:
buf = "S" + s + "\012p0\012."
self.assertRaises(ValueError, self.loads, buf)

if have_unicode:
def test_unicode(self):
endcases = [unicode(''), unicode('<\\u>'), unicode('<\\\u1234>'),
unicode('<\n>'), unicode('<\\>')]
for proto in protocols:
for u in endcases:
p = self.dumps(u, proto)
u2 = self.loads(p)
self.assertEqual(u2, u)

def test_ints(self):
import sys
for proto in protocols:
n = sys.maxint
while n:
for expected in (-n, n):
s = self.dumps(expected, proto)
n2 = self.loads(s)
self.assertEqual(expected, n2)
n = n >> 1

def test_maxint64(self):
maxint64 = (1L << 63) - 1
data = 'I' + str(maxint64) + '\n.'
got = self.loads(data)
self.assertEqual(got, maxint64)

# Try too with a bogus literal.
data = 'I' + str(maxint64) + 'JUNK\n.'
self.assertRaises(ValueError, self.loads, data)

def test_long(self):
for proto in protocols:
# 256 bytes is where LONG4 begins.
for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257:
nbase = 1L << nbits
for npos in nbase-1, nbase, nbase+1:
for n in npos, -npos:
pickle = self.dumps(n, proto)
got = self.loads(pickle)
self.assertEqual(n, got)
# Try a monster. This is quadratic-time in protos 0 & 1, so don't
# bother with those.
nbase = long("deadbeeffeedface", 16)
nbase += nbase << 1000000
for n in nbase, -nbase:
p = self.dumps(n, 2)
got = self.loads(p)
self.assertEqual(n, got)

@run_with_locale('LC_ALL', 'de_DE', 'fr_FR')
def test_float_format(self):
# make sure that floats are formatted locale independent
self.assertEqual(self.dumps(1.2)[0:3], 'F1.')

def test_reduce(self):
pass

def test_getinitargs(self):
pass

def test_metaclass(self):
a = use_metaclass()
for proto in protocols:
s = self.dumps(a, proto)
b = self.loads(s)
self.assertEqual(a.__class__, b.__class__)

def test_structseq(self):
import time
import os

t = time.localtime()
for proto in protocols:
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "stat"):
t = os.stat(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)
if hasattr(os, "statvfs"):
t = os.statvfs(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assertEqual(t, u)

# Tests for protocol 2

def test_proto(self):
build_none = pickle.NONE + pickle.STOP
for proto in protocols:
expected = build_none
if proto >= 2:
expected = pickle.PROTO + chr(proto) + expected
p = self.dumps(None, proto)
self.assertEqual(p, expected)

oob = protocols[-1] + 1 # a future protocol
badpickle = pickle.PROTO + chr(oob) + build_none
try:
self.loads(badpickle)
except ValueError, detail:
self.failUnless(str(detail).startswith(
"unsupported pickle protocol"))
else:
self.fail("expected bad protocol number to raise ValueError")

def test_long1(self):
x = 12345678910111213141516178920L
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
self.assertEqual(opcode_in_pickle(pickle.LONG1, s), proto >= 2)

def test_long4(self):
x = 12345678910111213141516178920L << (256*8)
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
self.assertEqual(opcode_in_pickle(pickle.LONG4, s), proto >= 2)

def test_short_tuples(self):
# Map (proto, len(tuple)) to expected opcode.
expected_opcode = {(0, 0): pickle.TUPLE,
(0, 1): pickle.TUPLE,
(0, 2): pickle.TUPLE,
(0, 3): pickle.TUPLE,
(0, 4): pickle.TUPLE,

(1, 0): pickle.EMPTY_TUPLE,
(1, 1): pickle.TUPLE,
(1, 2): pickle.TUPLE,
(1, 3): pickle.TUPLE,
(1, 4): pickle.TUPLE,

(2, 0): pickle.EMPTY_TUPLE,
(2, 1): pickle.TUPLE1,
(2, 2): pickle.TUPLE2,
(2, 3): pickle.TUPLE3,
(2, 4): pickle.TUPLE,
}
a = ()
b = (1,)
c = (1, 2)
d = (1, 2, 3)
e = (1, 2, 3, 4)
for proto in protocols:
for x in a, b, c, d, e:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y, (proto, x, s, y))
expected = expected_opcode[proto, len(x)]
self.assertEqual(opcode_in_pickle(expected, s), True)

def test_singletons(self):
# Map (proto, singleton) to expected opcode.
expected_opcode = {(0, None): pickle.NONE,
(1, None): pickle.NONE,
(2, None): pickle.NONE,

(0, True): pickle.INT,
(1, True): pickle.INT,
(2, True): pickle.NEWTRUE,

(0, False): pickle.INT,
(1, False): pickle.INT,
(2, False): pickle.NEWFALSE,
}
for proto in protocols:
for x in None, False, True:
s = self.dumps(x, proto)
y = self.loads(s)
self.assert_(x is y, (proto, x, s, y))
expected = expected_opcode[proto, x]
self.assertEqual(opcode_in_pickle(expected, s), True)

def test_newobj_tuple(self):
x = MyTuple([1, 2, 3])
x.foo = 42
x.bar = "hello"
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(tuple(x), tuple(y))
self.assertEqual(x.__dict__, y.__dict__)

def test_newobj_list(self):
x = MyList([1, 2, 3])
x.foo = 42
x.bar = "hello"
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(list(x), list(y))
self.assertEqual(x.__dict__, y.__dict__)

def test_newobj_generic(self):
for proto in protocols:
for C in myclasses:
B = C.__base__
x = C(C.sample)
x.foo = 42
s = self.dumps(x, proto)
y = self.loads(s)
detail = (proto, C, B, x, y, type(y))
self.assertEqual(B(x), B(y), detail)
self.assertEqual(x.__dict__, y.__dict__, detail)

# Register a type with copy_reg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
# (EXT[124]) under proto 2, and not in proto 1.

def produce_global_ext(self, extcode, opcode):
e = ExtensionSaver(extcode)
try:
copy_reg.add_extension(__name__, "MyList", extcode)
x = MyList([1, 2, 3])
x.foo = 42
x.bar = "hello"

# Dump using protocol 1 for comparison.
s1 = self.dumps(x, 1)
self.assert_(__name__ in s1)
self.assert_("MyList" in s1)
self.assertEqual(opcode_in_pickle(opcode, s1), False)

y = self.loads(s1)
self.assertEqual(list(x), list(y))
self.assertEqual(x.__dict__, y.__dict__)

# Dump using protocol 2 for test.
s2 = self.dumps(x, 2)
self.assert_(__name__ not in s2)
self.assert_("MyList" not in s2)
self.assertEqual(opcode_in_pickle(opcode, s2), True)

y = self.loads(s2)
self.assertEqual(list(x), list(y))
self.assertEqual(x.__dict__, y.__dict__)

finally:
e.restore()

def test_global_ext1(self):
self.produce_global_ext(0x00000001, pickle.EXT1) # smallest EXT1 code
self.produce_global_ext(0x000000ff, pickle.EXT1) # largest EXT1 code

def test_global_ext2(self):
self.produce_global_ext(0x00000100, pickle.EXT2) # smallest EXT2 code
self.produce_global_ext(0x0000ffff, pickle.EXT2) # largest EXT2 code
self.produce_global_ext(0x0000abcd, pickle.EXT2) # check endianness

def test_global_ext4(self):
self.produce_global_ext(0x00010000, pickle.EXT4) # smallest EXT4 code
self.produce_global_ext(0x7fffffff, pickle.EXT4) # largest EXT4 code
self.produce_global_ext(0x12abcdef, pickle.EXT4) # check endianness

def test_list_chunking(self):
n = 10 # too small to chunk
x = range(n)
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
self.assertEqual(num_appends, proto > 0)

n = 2500 # expect at least two chunks when proto > 0
x = range(n)
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_appends = count_opcode(pickle.APPENDS, s)
if proto == 0:
self.assertEqual(num_appends, 0)
else:
self.failUnless(num_appends >= 2)

def test_dict_chunking(self):
n = 10 # too small to chunk
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
self.assertEqual(num_setitems, proto > 0)

n = 2500 # expect at least two chunks when proto > 0
x = dict.fromkeys(range(n))
for proto in protocols:
s = self.dumps(x, proto)
y = self.loads(s)
self.assertEqual(x, y)
num_setitems = count_opcode(pickle.SETITEMS, s)
if proto == 0:
self.assertEqual(num_setitems, 0)
else:
self.failUnless(num_setitems >= 2)

def test_load_empty_string(self):
self.assertRaises(EOFError, self.loads, "")

def test_simple_newobj(self):
x = object.__new__(SimpleNewObj) # avoid __init__
x.abc = 666
for proto in protocols:
s = self.dumps(x, proto)
self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s), proto >= 2)
y = self.loads(s) # will raise TypeError if __init__ called
self.assertEqual(y.abc, 666)
self.assertEqual(x.__dict__, y.__dict__)

def test_newobj_list_slots(self):
x = SlotList([1, 2, 3])
x.foo = 42
x.bar = "hello"
s = self.dumps(x, 2)
y = self.loads(s)
self.assertEqual(list(x), list(y))
self.assertEqual(x.__dict__, y.__dict__)
self.assertEqual(x.foo, y.foo)
self.assertEqual(x.bar, y.bar)

def test_reduce_overrides_default_reduce_ex(self):
for proto in 0, 1, 2:
x = REX_one()
self.assertEqual(x._reduce_called, 0)
s = self.dumps(x, proto)
self.assertEqual(x._reduce_called, 1)
y = self.loads(s)
self.assertEqual(y._reduce_called, 0)

def test_reduce_ex_called(self):
for proto in 0, 1, 2:
x = REX_two()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
self.assertEqual(x._proto, proto)
y = self.loads(s)
self.assertEqual(y._proto, None)

def test_reduce_ex_overrides_reduce(self):
for proto in 0, 1, 2:
x = REX_three()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
self.assertEqual(x._proto, proto)
y = self.loads(s)
self.assertEqual(y._proto, None)

def test_reduce_ex_calls_base(self):
for proto in 0, 1, 2:
x = REX_four()
self.assertEqual(x._proto, None)
s = self.dumps(x, proto)
self.assertEqual(x._proto, proto)
y = self.loads(s)
self.assertEqual(y._proto, proto)

def test_reduce_calls_base(self):
for proto in 0, 1, 2:
x = REX_five()
self.assertEqual(x._reduce_called, 0)
s = self.dumps(x, proto)
self.assertEqual(x._reduce_called, 1)
y = self.loads(s)
self.assertEqual(y._reduce_called, 1)

def test_reduce_bad_iterator(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
# are not iterators
class C(object):
def __reduce__(self):
# 4th item is not an iterator
return list, (), None, [], None
class D(object):
def __reduce__(self):
# 5th item is not an iterator
return dict, (), None, None, []

# Protocol 0 is less strict and also accept iterables.
for proto in 0, 1, 2:
try:
self.dumps(C(), proto)
except (AttributeError, pickle.PickleError, cPickle.PickleError):
pass
try:
self.dumps(D(), proto)
except (AttributeError, pickle.PickleError, cPickle.PickleError):
pass

def test_many_puts_and_gets(self):
# Test that internal data structures correctly deal with lots of
# puts/gets. There were once bugs in cPickle related to this.
keys = ("aaa" + str(i) for i in xrange(100))
large_dict = dict((k, [4, 5, 6]) for k in keys)
obj = [dict(large_dict), dict(large_dict), dict(large_dict)]

for proto in [0, 1, 2]:
dumped = self.dumps(obj, proto)
loaded = self.loads(dumped)
self.assertEqual(loaded, obj,
"Failed protocol %d: %r != %r"
% (proto, obj, loaded))

def test_unpickle_multiple_objs_from_same_file(self):
data1 = ["abcdefg", "abcdefg", 44]
data2 = range(10)
f = cStringIO.StringIO()
self.dump(data1, f)
self.dump(data2, f)
f.seek(0)

self.assertEqual(self.load(f), data1)
self.assertEqual(self.load(f), data2)


# Test classes for reduce_ex

class REX_one(object):
_reduce_called = 0
def __reduce__(self):
self._reduce_called = 1
return REX_one, ()
# No __reduce_ex__ here, but inheriting it from object

class REX_two(object):
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return REX_two, ()
# No __reduce__ here, but inheriting it from object

class REX_three(object):
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return REX_two, ()
def __reduce__(self):
raise TestFailed, "This __reduce__ shouldn't be called"

class REX_four(object):
_proto = None
def __reduce_ex__(self, proto):
self._proto = proto
return object.__reduce_ex__(self, proto)
# Calling base class method should succeed

class REX_five(object):
_reduce_called = 0
def __reduce__(self):
self._reduce_called = 1
return object.__reduce__(self)
# This one used to fail with infinite recursion

# Test classes for newobj

class MyInt(int):
sample = 1

class MyLong(long):
sample = 1L

class MyFloat(float):
sample = 1.0

class MyComplex(complex):
sample = 1.0 + 0.0j

class MyStr(str):
sample = "hello"

class MyUnicode(unicode):
sample = u"hello \u1234"

class MyTuple(tuple):
sample = (1, 2, 3)

class MyList(list):
sample = [1, 2, 3]

class MyDict(dict):
sample = {"a": 1, "b": 2}

myclasses = [MyInt, MyLong, MyFloat,
MyComplex,
MyStr, MyUnicode,
MyTuple, MyList, MyDict]


class SlotList(MyList):
__slots__ = ["foo"]

class SimpleNewObj(object):
def __init__(self, a, b, c):
# raise an error, to make sure this isn't called
raise TypeError("SimpleNewObj.__init__() didn't expect to get called")

class AbstractPickleModuleTests(unittest.TestCase):

def test_dump_closed_file(self):
import os
f = open(TESTFN, "w")
try:
f.close()
self.assertRaises(ValueError, self.module.dump, 123, f)
finally:
os.remove(TESTFN)

def test_load_closed_file(self):
import os
f = open(TESTFN, "w")
try:
f.close()
self.assertRaises(ValueError, self.module.dump, 123, f)
finally:
os.remove(TESTFN)

def test_load_from_and_dump_to_file(self):
stream = cStringIO.StringIO()
data = [123, {}, 124]
self.module.dump(data, stream)
stream.seek(0)
unpickled = self.module.load(stream)
self.assertEqual(unpickled, data)

def test_highest_protocol(self):
# Of course this needs to be changed when HIGHEST_PROTOCOL changes.
self.assertEqual(self.module.HIGHEST_PROTOCOL, 2)

def test_callapi(self):
f = cStringIO.StringIO()
# With and without keyword arguments
self.module.dump(123, f, -1)
self.module.dump(123, file=f, protocol=-1)
self.module.dumps(123, -1)
self.module.dumps(123, protocol=-1)
self.module.Pickler(f, -1)
self.module.Pickler(f, protocol=-1)

class AbstractPersistentPicklerTests(unittest.TestCase):

# This class defines persistent_id() and persistent_load()
# functions that should be used by the pickler. All even integers
# are pickled using persistent ids.

def persistent_id(self, object):
if isinstance(object, int) and object % 2 == 0:
self.id_count += 1
return str(object)
else:
return None

def persistent_load(self, oid):
self.load_count += 1
object = int(oid)
assert object % 2 == 0
return object

def test_persistence(self):
self.id_count = 0
self.load_count = 0
L = range(10)
self.assertEqual(self.loads(self.dumps(L)), L)
self.assertEqual(self.id_count, 5)
self.assertEqual(self.load_count, 5)

def test_bin_persistence(self):
self.id_count = 0
self.load_count = 0
L = range(10)
self.assertEqual(self.loads(self.dumps(L, 1)), L)
self.assertEqual(self.id_count, 5)
self.assertEqual(self.load_count, 5)

class AbstractPicklerUnpicklerObjectTests(unittest.TestCase):

pickler_class = None
unpickler_class = None

def setUp(self):
assert self.pickler_class
assert self.unpickler_class

def dumps(self, obj, proto=0):
buf = cStringIO.StringIO()
pickler = self.pickler_class(buf, proto)
pickler.dump(obj)
return buf.getvalue()

def loads(self, input):
unpickler = self.unpickler_class(cStringIO.StringIO(input))
return unpickler.load()

def test_clear_pickler_memo(self):
# To test whether clear_memo() has any effect, we pickle an object,
# then pickle it again without clearing the memo; the two serialized
# forms should be different. If we clear_memo() and then pickle the
# object again, the third serialized form should be identical to the
# first one we obtained.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)

pickler.dump(data)
first_pickled = f.getvalue()

# Reset StringIO object.
f.seek(0)
f.truncate()

pickler.dump(data)
second_pickled = f.getvalue()

# Reset the Pickler and StringIO objects.
pickler.clear_memo()
f.seek(0)
f.truncate()

pickler.dump(data)
third_pickled = f.getvalue()

self.assertNotEqual(first_pickled, second_pickled)
self.assertEqual(first_pickled, third_pickled)

def test_priming_pickler_memo(self):
# Verify that we can set the Pickler's memo attribute.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)

pickler.dump(data)
first_pickled = f.getvalue()

f = cStringIO.StringIO()
primed = self.pickler_class(f)
primed.memo = pickler.memo

primed.dump(data)
primed_pickled = f.getvalue()

self.assertNotEqual(first_pickled, primed_pickled)

def test_priming_unpickler_memo(self):
# Verify that we can set the Unpickler's memo attribute.
data = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)

pickler.dump(data)
first_pickled = f.getvalue()

f = cStringIO.StringIO()
primed = self.pickler_class(f)
primed.memo = pickler.memo

primed.dump(data)
primed_pickled = f.getvalue()

unpickler = self.unpickler_class(cStringIO.StringIO(first_pickled))
unpickled_data1 = unpickler.load()

self.assertEqual(unpickled_data1, data)

primed = self.unpickler_class(cStringIO.StringIO(primed_pickled))
primed.memo = unpickler.memo
unpickled_data2 = primed.load()

# TODO(collinwinter): add dedicated tests for memo.clear(). This will
# require implementing stream support for unpickling.
primed.memo.clear()

self.assertEqual(unpickled_data2, data)
self.assertTrue(unpickled_data2 is unpickled_data1)

def test_reusing_unpickler_objects(self):
data1 = ["abcdefg", "abcdefg", 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data1)
pickled1 = f.getvalue()

data2 = ["abcdefg", 44, 44]
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(data2)
pickled2 = f.getvalue()

f = cStringIO.StringIO()
f.write(pickled1)
f.seek(0)
unpickler = self.unpickler_class(f)
self.assertEqual(unpickler.load(), data1)

f.seek(0)
f.truncate()
f.write(pickled2)
f.seek(0)
self.assertEqual(unpickler.load(), data2)

def test_pickle_multiple_objects_to_the_same_stream(self):
# Test that we don't optimize out necessary PUT opcodes from the first
# pickle. Even though *that individual pickle* may not need the memo
# state, subsequent pickles written with the same Pickler object are
# relying on the memo state.
val1 = "this is a string"
data = [(val1, 1), (val1, 2), (val1, 3)]

f = cStringIO.StringIO()
pickler = self.pickler_class(f)
for obj in data:
pickler.dump(obj)

f.seek(0)

unpickler = self.unpickler_class(f)
for obj in data:
got_obj = unpickler.load()
self.assertEqual(obj, got_obj)

def test_copy_pickler_memo(self):
pickler = self.pickler_class(cStringIO.StringIO())
pickler.dump("abadsfasd")
memo = pickler.memo.copy()
pickler.memo.clear()
self.assertEqual(len(memo), 1)

def test_copy_unpickler_memo(self):
string = "abasdfasdf"
sample_data = self.dumps([string, string])
unpickler = self.unpickler_class(cStringIO.StringIO(sample_data))
unpickler.load()
memo = unpickler.memo.copy()
self.assertTrue(string in memo.values(), memo)
unpickler.memo.clear()
self.assertTrue(string in memo.values(), memo)
# Make sure we can call copy() and that the unpickler accepts the
# returned object.
unpickler.memo = memo.copy()

def test_priming_picklers_unpicklers(self):
# This test is distilled from the cvs2svn tool. In order to save space
# in their pickle database, cvs2svn "primes" the pickler/unpickler memos
# with common objects so that these objects are represented by GET
# opcodes in the pickles, rather than the much longer sequences they
# would otherwise generate. In order to make use of the priming memos,
# the memos themselves are pickled to the front of the data stream.
# We simulate this process here. Consider this a large integration test,
# while the tests above are intended to be more fine-grained.

# Objects we want to share between pickles.
common1 = "asdfasdfasdfa"
common2 = "asdfadfadfshdifu"
common3 = (5, 6)
common_objects = [common1, common2, common3]

# Combinations of the above objects. These will be sent through
# the data stream.
combo1 = (common1, common2)
combo2 = (6, common3)
combo3 = (common1, common2, common3)

data_stream = cStringIO.StringIO()

# 1. Pickle the common objects, extract the Pickler's memo.
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.dump(common_objects)
pickler_memo = pickler.memo.copy()
priming_data = f.getvalue()

# 2. Unpickle the common objects, extract the Unpickler's memo.
unpickler = self.unpickler_class(cStringIO.StringIO(priming_data))
unpickler.load()
unpickler_memo = unpickler.memo.copy()

# 3. Write the pickler and unpickler memos to the data stream. These
# will be used to retrieve data from the stream.
pickler = self.pickler_class(data_stream)
pickler.dump(pickler_memo)
pickler.dump(unpickler_memo)

# 4. Pickle some more data to the data stream, using the priming memos.
# cvs2svn resets the memo before every object.
pickler = self.pickler_class(data_stream)
pickler.memo = pickler_memo.copy()
pickler.dump(combo1)
pickler.memo = pickler_memo.copy()
pickler.dump(combo2)

# 5. Now start reading the data. We read the priming memos first.
data_stream.seek(0)
unpickler = self.unpickler_class(data_stream)
recv_pickler_memo = unpickler.load()
recv_unpickler_memo = unpickler.load()
self.assertEqual(recv_pickler_memo, pickler_memo)
self.assertEqual(recv_unpickler_memo, unpickler_memo)

# 6. Using the received unpickling primer memo, unpickle the rest of
# the data stream. cvs2svn resets the memo before every object.
unpickler = self.unpickler_class(data_stream)
unpickler.memo = recv_unpickler_memo.copy()
self.assertEqual(unpickler.load(), combo1)
unpickler.memo = recv_unpickler_memo.copy()
self.assertEqual(unpickler.load(), combo2)

# 7. Pickle something using the received pickler memo, then unpickle it
# to make sure that the pickler memo works.
f = cStringIO.StringIO()
pickler = self.pickler_class(f)
pickler.memo = recv_pickler_memo.copy()
pickler.dump(combo3)
f.seek(0)
unpickler = self.unpickler_class(f)
unpickler.memo = recv_unpickler_memo.copy()
self.assertEqual(unpickler.load(), combo3)

Change log

r720 by collinw on Jul 9, 2009   Diff
Backport r686, r689, r690 and r701 from
trunk. This fixes a number of bugs in
cPickle. cvs2svn's test suite now passes
when run against 2009Q1.
Go to: 
Project members, sign in to write a code review

Older revisions

r391 by collinw on Apr 1, 2009   Diff
Backport r389. Fix test_xpickle to
work when src!=obj.
r314 by collinw on Mar 23, 2009   Diff
Backport r312. Add memo attributes
back to cPickle's Pickler and
Unpickler objects.
r265 by collinw on Mar 13, 2009   Diff
Backport r264 to release-2009Q1-maint.
Fix cPickle.load(); add tests.
All revisions of this file

File info

Size: 43108 bytes, 1339 lines
Powered by Google Project Hosting