My favorites
▼
|
Sign in
cacherl
Erlang version of memcached with data persistence
Project Home
Wiki
Issues
Source
Checkout
Browse
Changes
Source path:
svn
/
trunk
/
memcached
/
src
/
memcached_client.erl
‹r8
r26
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
-module(memcached_client).
-export([new/1, get/2, set/4, delete/3]).
-export([get_callback/4, set_callback/4, delete_callback/4]).
-record(memcached, { sockets }).
-define(TIMEOUT, 1000000).
new(Servers) ->
Sockets = lists:map(fun({IP, Port}) ->
{ok, Socket} = gen_tcp:connect(IP, Port, [binary, {active, false}, {packet, raw}]),
Socket
end, Servers),
#memcached{sockets=array:from_list(Sockets)}.
get_key_index(#memcached{sockets=Sockets}, Key) ->
Hash = erlang:phash2(Key),
Hash rem array:size(Sockets).
get_key_socket(#memcached{sockets=Sockets}=M, Key) ->
array:get(get_key_index(M, Key), Sockets).
recv_header(Sock, Callback, Args, <<"\r\n", Rest/binary>>, Header) ->
H = binary_to_list(Header),
Tokens = string:tokens(H, " "),
apply(?MODULE, Callback, [Sock, Rest, Tokens, Args]);
recv_header(Sock, Callback, Args, <<>>, Header) ->
{ok, Bin} = gen_tcp:recv(Sock, 0, ?TIMEOUT),
recv_header(Sock, Callback, Args, Bin, Header);
recv_header(Sock, Callback, Args, <<B:8, Rest/binary>>, Header) ->
recv_header(Sock, Callback, Args, Rest, <<Header/binary, B:8>>).
recv_body(Sock, Len, Rest) when size(Rest) < Len+2 ->
{ok, Bin} = gen_tcp:recv(Sock, 0, ?TIMEOUT),
recv_body(Sock, Len, <<Rest/binary, Bin/binary>>);
recv_body(_Sock, Len, Data) ->
<<Body:Len/binary, "\r\n", Rest/binary>> = Data,
{ok, Body, Rest}.
%-------------------------------------------------
normalize_key(Key) when is_binary(Key) ->
Key;
normalize_key(Key) ->
list_to_binary(Key).
get(M, Keys) ->
% TODO parallel fetching
Results = lists:foldl(fun(K, RL) ->
Sock = get_key_socket(M, K),
KeyBin = normalize_key(K),
Get = <<"get ", KeyBin/binary, "\r\n">>,
ok = gen_tcp:send(Sock, Get),
{ok, _Rest, R} = recv_header(Sock, get_callback, [], <<>>, <<>>),
case R of
[V|_] ->
RL ++ [V];
[] ->
RL
end
end, [], Keys),
{ok, Results}.
set(M, Key, Value, Exptime) ->
Sock = get_key_socket(M, Key),
Extra = io_lib:format("~p ~p ~p", [0, Exptime, size(Value)]),
ExtraBin = list_to_binary(Extra),
Set = <<"set ", Key/binary, " ", ExtraBin/binary, "\r\n", Value/binary, "\r\n">>,
ok = gen_tcp:send(Sock, Set),
recv_header(Sock, set_callback, [], <<>>, <<>>).
delete(M, Key, Time) ->
Sock = get_key_socket(M, Key),
Extra = io_lib:format("~p", [Time]),
ExtraBin = list_to_binary(Extra),
Delete = <<"delete ", Key/binary, " ", ExtraBin/binary, "\r\n">>,
ok = gen_tcp:send(Sock, Delete),
recv_header(Sock, delete_callback, [], <<>>, <<>>).
% the header callback for GET command
get_callback(Sock, Rest, ["VALUE", Key, Flag1, Len1], Results) ->
{_Flag, []} = string:to_integer(Flag1),
{Len, []} = string:to_integer(Len1),
{ok, Body, Rest2} = recv_body(Sock, Len, Rest),
Results2 = Results ++ [{list_to_binary(Key), Body}],
recv_header(Sock, get_callback, Results2, Rest2, <<>>);
get_callback(_Sock, Rest, ["END"], Results) ->
{ok, Rest, Results};
get_callback(_Sock, Rest, [Other|_], Results) ->
{error, Other, Rest, Results}.
% the header callback for SET command
set_callback(_Sock, Rest, ["STORED"], _) ->
{ok, stored, Rest};
set_callback(_Sock, Rest, ["NOT_STORED"], _) ->
{ok, not_stored, Rest};
set_callback(_Sock, _Data, Other, _) ->
{error, {unknown, Other}}.
% the header callback for DELETE command
delete_callback(_Sock, Rest, ["DELETED"], _) ->
{ok, deleted, Rest};
delete_callback(_Sock, Rest, ["NOT_FOUND"], _) ->
{ok, not_found, Rest}.
Show details
Hide details
Change log
r11
by echou327 on Jan 30, 2008
Diff
[No log message]
Go to:
/trunk/Emakefile
/trunk/Makefile
/trunk/cacherl.config
/trunk/memcached
/trunk/memcached/src
...nk/memcached/src/init_schema.erl
/trunk/memcached/src/memcached.app
/trunk/memcached/src/memcached.erl
/trunk/memcached/src/memcached.hrl
...mcached/src/memcached_client.erl
...mcached/src/memcached_mnesia.erl
...mcached/src/memcached_reader.erl
...memcached/src/memcached_test.erl
/trunk/misc
/trunk/misc/src
/trunk/misc/src/cacherl_util.erl
/trunk/misc/src/reloader.erl
/trunk/scripts/run.sh
/trunk/src/memcached.app
/trunk/src/memcached.erl
/trunk/src/memcached.hrl
/trunk/src/memcached_client.erl
/trunk/src/memcached_mnesia.erl
/trunk/src/memcached_reader.erl
/trunk/src/memcached_test.erl
/trunk/src/tcp_acceptor.erl
/trunk/src/tcp_acceptor_sup.erl
/trunk/src/tcp_client_sup.erl
/trunk/src/tcp_listener.erl
/trunk/src/tcp_listener_sup.erl
/trunk/src/tcp_misc.erl
/trunk/src/tcp_server_sup.erl
/trunk/start.sh
Project members,
sign in
to write a code review
Older revisions
r5
by echou327 on Jan 25, 2008
Diff
[No log message]
r2
by echou327 on Jan 25, 2008
Diff
[No log message]
All revisions of this file
File info
Size: 3473 bytes, 104 lines
View raw file
Powered by
Google Project Hosting