My favorites | Sign in
Project Logo
                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
open Ocamerl

let prog = "Ex_node_wc"

let (|>) left right = right left


let words_of_file fn =
let buf = Scanf.Scanning.from_file fn in
let scanner () = Scanf.bscanf buf " %s " (fun s -> s) in
Stream.from (fun _ -> match scanner () with "" -> None | s -> Some s)

let send_ok node self ref ctrlPid =
Enode.send
node
ctrlPid
(Eterm.ET_tuple [|Eterm.ET_atom "ok"; ref; self;|])

let create_mapper_process node ref ctrlPid =
let mbox = Enode.create_mbox node in
let self = Enode.Mbox.pid mbox in
let recvCB = fun msg -> match msg with
| Eterm.ET_tuple [| ref; Eterm.ET_atom "from_file"; Eterm.ET_string filename; destPid; ctrlPid;|] ->
let sender w =
let item = (Eterm.ET_tuple [| Eterm.ET_string w; Eterm.ET_int 1l;|]) in
Trace.dbg prog "Pid %s sending item %s\n" (Eterm.to_string self) (Eterm.to_string item);
Enode.send node destPid item
in
words_of_file filename |> Stream.iter sender;
send_ok node self ref ctrlPid
| Eterm.ET_atom "stop" ->
Enode.destroy_mbox node mbox
| m ->
(* skip unknown message *)
Trace.inf prog "Skip unknow message: %s\n" (Eterm.to_string m)
in
Enode.Mbox.create_activity mbox recvCB;
send_ok node self ref ctrlPid

let create_main_process node name =
let mbox = Enode.create_mbox node in
let _ = Enode.register_mbox node mbox name in
let recvCB = fun msg -> match msg with
| Eterm.ET_tuple [| Eterm.ET_atom "new_mapper"; ref; ctrlPid; |] ->
create_mapper_process node ref ctrlPid
| m ->
(* skip unknown message *)
Trace.inf prog "Skip unknow message: %s\n" (Eterm.to_string m)
in
Enode.Mbox.create_activity mbox recvCB

let doit () =
let name = ref "ocaml" in
let cookie = ref "" in
Arg.parse
[
("-cookie", Arg.String ((:=) cookie), "erlang node cookie");
("-name", Arg.String ((:=) name), "erlang node name");
("-debug", Arg.Unit (fun () -> Trace.set_level Trace.lvl_debug), "print debug messages");
]
ignore
"";
Trace.inf prog "Creating node; name: %s; cookie: %s\n" !name !cookie;
let n = Enode.create !name ~cookie:!cookie in
let _ = Thread.sigmask Unix.SIG_BLOCK [Sys.sigint] in
let _ = Enode.start n in
let _ = create_main_process n "wc" in
let _ = Thread.wait_signal [Sys.sigint] in
Enode.stop n

let _ =
try doit ()
with exn -> Printf.printf "ERROR:%s\n" (Printexc.to_string exn)

Show details Hide details

Change log

r51 by lcoquelle on May 20, 2008   Diff
trunk/lib/ocamerl: added a "map-reduce
style" word-count example.
Go to: 
Project members, sign in to write a code review

Older revisions

All revisions of this file

File info

Size: 2556 bytes, 74 lines
Hosted by Google Code