root/trunk/liquidsoap/src/sources/harbor_input.ml @ 7144

Revision 7144, 11.7 KB (checked in by dbaelde, 7 months ago)

Fix #remaining in input.harbor/http: infinity when relaying, and the amount
of buffered data only when not streaming.
Also fix the max>buffer check: the case of equality is also non-functional.

Line 
1(*****************************************************************************
2
3  Liquidsoap, a programmable audio stream generator.
4  Copyright 2003-2010 Savonet team
5
6  This program is free software; you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation; either version 2 of the License, or
9  (at your option) any later version.
10
11  This program is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  GNU General Public License for more details, fully stated in the COPYING
15  file at the root of the liquidsoap distribution.
16
17  You should have received a copy of the GNU General Public License
18  along with this program; if not, write to the Free Software
19  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20
21 *****************************************************************************)
22
23open Unix
24open Http_source
25
26(* {1 Input handling} *)
27
28class http_input_server ~kind ~dumpfile ~logfile
29                        ~bufferize ~max
30                        ~on_connect ~on_disconnect
31                        ~login ~debug =
32  let max_ticks = Frame.master_of_seconds max in
33object (self)
34  inherit Source.source kind
35  (* TODO: we want video also in the genrator *)
36  inherit Generated.source
37            (Generator.create ~overfull:(`Drop_old max_ticks) `Undefined)
38            ~empty_on_abort:false ~bufferize as generated
39
40  val mutable relaying = false
41  val mutable ns = []
42  val mutable create_decoder = fun _ -> assert false
43  val mutable mime_type = None
44
45  val mutable dump = None
46  val mutable logf = None
47
48  method login : (string option)*(string -> string -> bool) = login
49
50  method stype = Source.Fallible
51
52  (* Insert metadata *)
53  method insert_metadata m =
54    (* Metadata may contain only the "song" value
55     * or "artist" and "title". Here, we use "song"
56     * as the "title" field if "title" is not provided. *)
57    if not (Hashtbl.mem m "title") then
58      (try Hashtbl.add m "title" (Hashtbl.find m "song") with _ -> ());
59    self#log#f 3 "New metadata chunk %S -- %S."
60      (try Hashtbl.find m "artist" with _ -> "?")
61      (try Hashtbl.find m "title" with _ -> "?") ;
62    Generator.add_metadata generator m
63
64  (* TODO There must be two ways of handling overfull generator:
65   * (1) when streaming, one should just stop the decoder for a while;
66   * (2) when not streaming, one should throw some data.
67   * Doing 1 instead of 2 can lead to deconnections.
68   * Doing 2 instead of 1 leads to ugly sound.
69   * Here, we USED TO drop data since we want to remain
70   * connected to the client.
71   * TODO Restore the old behavior. *)
72
73  method get_mime_type = mime_type
74
75  method feed socket =
76    self#log#f 3 "Decoding..." ;
77    let t0 = Unix.gettimeofday () in
78    let read len =
79      let buf = String.make len ' ' in
80      let () =
81        let rec wait n =
82          let l,_,_ = Unix.select [socket] [] [] 1. in
83            if l=[] then begin
84              self#log#f 4 "No network activity for %d second(s)." n ;
85              if float n >= Harbor.conf_timeout#get then
86               begin
87                self#log#f 4 "Network activity timeout! Disconnecting source." ;
88                self#disconnect
89               end
90              else
91               wait (n+1)
92            end
93        in wait 1
94      in
95      let input = Unix.read socket buf 0 len in
96      if input<=0 then raise End_of_file ;
97      begin match dump with
98        | Some b -> output_string b (String.sub buf 0 input)
99        | None -> ()
100      end ;
101      begin match logf with
102        | Some b ->
103            let time = (Unix.gettimeofday () -. t0) /. 60. in
104              Printf.fprintf b "%f %d\n%!" time self#length
105        | None -> ()
106      end ;
107      buf,input
108    in
109    let Decoder.Decoder decoder = create_decoder read in
110      try
111        while true do
112          if not relaying then failwith "relaying stopped" ;
113          decoder generator
114        done
115      with
116        | e ->
117            self#log#f 2 "Feeding stopped: %s." (Printexc.to_string e) ;
118            if debug then raise e ;
119            self#disconnect ;
120            begin try Unix.close socket with _ -> () end
121
122  method private wake_up _ =
123    if ns = [] then
124      ns <- Server.register [self#id] "input.harbor" ;
125    self#set_id (Server.to_string ns) ;
126    let stop _ =
127      if relaying then (self#disconnect ; "Done")
128      else "No source client connected"
129    in
130    Server.add
131      ~ns "stop" ~descr:"Stop current source client, if connected." stop ;
132    Server.add
133      ~ns "kick" ~descr:"Kick current source client, if connected." stop ;
134    Server.add
135      ~ns "status" ~descr:"Display current status."
136      (fun _ ->
137         if relaying then
138           "source client connected"
139         else
140           "no source client connected") ;
141    Server.add ~ns "buffer_length" ~usage:"buffer_length"
142               ~descr:"Get the buffer's length, in seconds."
143       (fun _ -> Printf.sprintf "%.2f"
144             (Frame.seconds_of_audio self#length))
145
146  method private sleep =
147    if relaying then self#disconnect
148
149  method register_decoder mime =
150    Generator.set_mode generator `Undefined ;
151    match
152      Decoder.get_stream_decoder mime kind
153    with
154      | Some d -> create_decoder <- d ; mime_type <- Some mime
155      | None -> raise Harbor.Unknown_codec
156
157  method relay (headers:(string*string) list) socket =
158    relaying <- true ;
159    let headers = List.map (fun (x,y) -> String.lowercase x,y) headers in
160    on_connect headers ;
161    begin match dumpfile with
162      | Some f ->
163          begin try
164            dump <- Some (open_out_bin (Utils.home_unrelate f))
165          with e ->
166            self#log#f 2 "Could not open dump file: %s" (Printexc.to_string e)
167          end
168      | None -> ()
169    end ;
170    begin match logfile with
171      | Some f ->
172          begin try
173            logf <- Some (open_out_bin (Utils.home_unrelate f))
174          with e ->
175            self#log#f 2 "Could not open log file: %s" (Printexc.to_string e)
176          end
177      | None -> ()
178    end ;
179    ignore (Tutils.create
180              (fun () -> self#feed socket) ()
181              "harbor source feeding")
182
183  method disconnect =
184    if relaying then on_disconnect () ;
185    begin match dump with
186      | Some f -> close_out f ; dump <- None
187      | None -> ()
188    end ;
189    begin match logf with
190      | Some f -> close_out f ; logf <- None
191      | None -> ()
192    end ;
193    relaying <- false
194
195  method is_taken = relaying
196
197  method remaining =
198    if relaying then
199      if buffering then 0 else -1
200    else
201      generated#remaining
202
203end
204
205let () =
206  let kind = Lang.kind_type_of_kind_format ~fresh:1 Lang.audio_any in
207    Lang.add_operator "input.harbor"
208      ~kind:(Lang.Unconstrained kind)
209      ~category:Lang.Input
210      ~descr:("Retrieves the given http stream from the harbor.")
211      [
212        "buffer", Lang.float_t, Some (Lang.float 2.),
213         Some "Duration of the pre-buffered data." ;
214
215        "max", Lang.float_t, Some (Lang.float 10.),
216        Some "Maximum duration of the buffered data.";
217
218        "on_connect",
219        Lang.fun_t [false,"",Lang.metadata_t] Lang.unit_t,
220        Some (Lang.val_cst_fun ["",Lang.metadata_t,None] Lang.unit),
221        Some "Function to execute when a source is connected. \
222              Its receives the list of headers, of the form: \
223              (<label>,<value>). All labels are lowercase.";
224
225        "on_disconnect",Lang.fun_t [] Lang.unit_t,
226        Some (Lang.val_cst_fun [] Lang.unit),
227        Some "Functions to excecute when a source is disconnected";
228
229        "user",Lang.string_t,
230        Some (Lang.string ""),
231        Some "Source user. Override default if not empty.";
232
233        "password",Lang.string_t,
234        Some (Lang.string ""),
235        Some "Source password. Override default if not empty.";
236
237        "auth",
238        Lang.fun_t [false,"",Lang.string_t;false,"",Lang.string_t] Lang.bool_t,
239        Some
240          (Lang.val_cst_fun
241             ["",Lang.string_t,None;"",Lang.string_t,None]
242             (Lang.bool false)),
243        Some "Authentification function. \
244              <code>f(login,password)</code> returns <code>true</code> \
245              if the user should be granted access for this login. \
246              Override any other method if used.";
247
248        "dumpfile", Lang.string_t, Some (Lang.string ""),
249        Some "Dump stream to file, for debugging purpose. Disabled if empty.";
250
251        "logfile", Lang.string_t, Some (Lang.string ""),
252        Some "Log buffer status to file, for debugging purpose. \
253              Disabled if empty.";
254
255        "debug", Lang.bool_t, Some (Lang.bool false),
256        Some "Run in debugging mode by not catching some exceptions.";
257
258        "", Lang.string_t, None,
259        Some "Mountpoint to look for." ]
260      (fun p kind ->
261         let mount = Lang.to_string (List.assoc "" p) in
262         let mount =
263           if mount<>"" && mount.[0]='/' then mount else
264             Printf.sprintf "/%s" mount
265         in
266         let trivially_false = function
267           | { Lang.value =
268                 Lang.Fun (_,_,_,
269                           { Lang_values.term = Lang_values.Bool false }) }
270               -> true
271           | _ -> false
272         in
273         let user = Lang.to_string (List.assoc "user" p) in
274         let password = Lang.to_string (List.assoc "password" p) in
275         let debug = Lang.to_bool (List.assoc "debug" p) in
276         let auth_function = List.assoc "auth" p in
277         let login user pass =
278           let user_login test_user test_pass =
279             let user,pass =
280               let f g x = match x with "" -> g | _ -> x in
281               f Harbor.conf_harbor_user#get user,
282               f Harbor.conf_harbor_pass#get password
283             in
284             test_user = user &&
285             test_pass = pass
286           in
287             if not (trivially_false auth_function) then
288               Lang.to_bool
289                 (Lang.apply ~t:Lang.bool_t
290                    auth_function
291                    ["",Lang.string user;
292                     "",Lang.string pass])
293             else
294               user_login user pass
295         in
296         let login =
297           let f x = if x <> "" then Some x else None in
298           (f user, login)
299         in
300         let dumpfile =
301           match Lang.to_string (List.assoc "dumpfile" p) with
302             | "" -> None
303             | s -> Some s
304         in
305         let logfile =
306           match Lang.to_string (List.assoc "logfile" p) with
307             | "" -> None
308             | s -> Some s
309         in
310         let bufferize = Lang.to_float (List.assoc "buffer" p) in
311         let max = Lang.to_float (List.assoc "max" p) in
312         if bufferize >= max then
313           raise (Lang.Invalid_value
314                    (List.assoc "max" p,
315                     "Maximun buffering inferior to pre-buffered data"));
316         let on_connect l =
317           let l = 
318             List.map
319              (fun (x,y) -> Lang.product (Lang.string x) (Lang.string y))
320              l
321           in
322           let arg =
323             Lang.list ~t:(Lang.product_t Lang.string_t Lang.string_t) l
324           in
325           ignore
326             (Lang.apply ~t:Lang.unit_t (List.assoc "on_connect" p) ["",arg])
327         in
328         let on_disconnect () =
329           ignore
330             (Lang.apply ~t:Lang.unit_t (List.assoc "on_disconnect" p) [])
331         in
332           try
333             ((Harbor.find_source mount):>Source.source)
334           with
335             | Not_found ->
336                 Harbor.add_source mount
337                   ((new http_input_server ~kind
338                       ~bufferize ~max ~login
339                       ~dumpfile ~logfile
340                       ~on_connect ~on_disconnect ~debug):>Harbor.source) ;
341                 ((Harbor.find_source mount):>Source.source))
Note: See TracBrowser for help on using the browser.