| 1 | (***************************************************************************** |
|---|
| 2 | |
|---|
| 3 | Liquidsoap, a programmable audio stream generator. |
|---|
| 4 | Copyright 2003-2010 Savonet team |
|---|
| 5 | |
|---|
| 6 | This program is free software; you can redistribute it and/or modify |
|---|
| 7 | it under the terms of the GNU General Public License as published by |
|---|
| 8 | the Free Software Foundation; either version 2 of the License, or |
|---|
| 9 | (at your option) any later version. |
|---|
| 10 | |
|---|
| 11 | This program is distributed in the hope that it will be useful, |
|---|
| 12 | but WITHOUT ANY WARRANTY; without even the implied warranty of |
|---|
| 13 | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
|---|
| 14 | GNU General Public License for more details, fully stated in the COPYING |
|---|
| 15 | file at the root of the liquidsoap distribution. |
|---|
| 16 | |
|---|
| 17 | You should have received a copy of the GNU General Public License |
|---|
| 18 | along with this program; if not, write to the Free Software |
|---|
| 19 | Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
|---|
| 20 | |
|---|
| 21 | *****************************************************************************) |
|---|
| 22 | |
|---|
| 23 | open Unix |
|---|
| 24 | open Http_source |
|---|
| 25 | |
|---|
| 26 | (* {1 Input handling} *) |
|---|
| 27 | |
|---|
(* Harbor input source: a fallible source whose generator is filled by a
 * dedicated thread decoding what a connected source client sends.
 *
 * Parameters, as used in the body below:
 *  - [kind]: passed to [Source.source] and [Decoder.get_stream_decoder];
 *  - [dumpfile], [logfile]: optional paths opened when a relay starts. The
 *    first one receives the raw bytes read from the socket, the second one
 *    gets one line per read with the elapsed time and [self#length];
 *  - [bufferize]: passed to [Generated.source];
 *  - [max]: converted with [Frame.master_of_seconds] and used as the
 *    [`Drop_old] threshold of the generator;
 *  - [on_connect]: called with the lowercased headers when a relay starts;
 *  - [on_disconnect]: called by [disconnect] when a client was relaying;
 *  - [login]: exposed unchanged through the [login] method;
 *  - [debug]: when true, the exception that stopped the feeding loop is
 *    re-raised instead of being handled. *)
class http_input_server ~kind ~dumpfile ~logfile
                        ~bufferize ~max
                        ~on_connect ~on_disconnect
                        ~login ~debug =
  let max_ticks = Frame.master_of_seconds max in
object (self)
  inherit Source.source kind
  (* TODO: we want video also in the generator *)
  inherit Generated.source
            (Generator.create ~overfull:(`Drop_old max_ticks) `Undefined)
            ~empty_on_abort:false ~bufferize as generated

  (* Set by [relay], cleared by [disconnect]. *)
  val mutable relaying = false
  (* Server namespace; registered the first time [wake_up] runs. *)
  val mutable ns = []
  (* Decoder factory installed by [register_decoder]. The initial value
   * fails on purpose: [feed] must not run before a decoder is registered. *)
  val mutable create_decoder = fun _ -> assert false
  (* Mime type given to the last successful [register_decoder]. *)
  val mutable mime_type = None

  (* Output channels for the optional dump and log files; they are only
   * open while a client is being relayed. *)
  val mutable dump = None
  val mutable logf = None

  (* Optional user name and authentication function, as given at creation. *)
  method login : (string option)*(string -> string -> bool) = login

  method stype = Source.Fallible

  (* Insert metadata *)
  method insert_metadata m =
    (* Metadata may contain only the "song" value
     * or "artist" and "title". Here, we use "song"
     * as the "title" field if "title" is not provided. *)
    if not (Hashtbl.mem m "title") then
      (try Hashtbl.add m "title" (Hashtbl.find m "song") with _ -> ());
    self#log#f 3 "New metadata chunk %S -- %S."
      (try Hashtbl.find m "artist" with _ -> "?")
      (try Hashtbl.find m "title" with _ -> "?") ;
    Generator.add_metadata generator m

  (* TODO There must be two ways of handling overfull generator:
   * (1) when streaming, one should just stop the decoder for a while;
   * (2) when not streaming, one should throw some data.
   * Doing 1 instead of 2 can lead to disconnections.
   * Doing 2 instead of 1 leads to ugly sound.
   * Here, we USED TO drop data since we want to remain
   * connected to the client.
   * TODO Restore the old behavior. *)

  method get_mime_type = mime_type

  (* Decoding loop, run in its own thread by [relay]. It builds a decoder
   * on top of [read] and applies it to the generator until an exception
   * occurs or [relaying] is cleared. On exit the source is disconnected and
   * [socket] is closed, unless [debug] is set, in which case the exception
   * is re-raised first. *)
  method feed socket =
    self#log#f 3 "Decoding..." ;
    let t0 = Unix.gettimeofday () in
    (* [read len] returns a buffer and the number of bytes read into it.
     * Raises [End_of_file] when nothing more can be read, and [Failure]
     * when the harbor timeout expires without network activity. *)
    let read len =
      let buf = String.make len ' ' in
      (* Poll the socket one second at a time so that inactivity can be
       * logged and bounded by [Harbor.conf_timeout]. *)
      let rec wait n =
        let l,_,_ = Unix.select [socket] [] [] 1. in
          if l=[] then begin
            self#log#f 4 "No network activity for %d second(s)." n ;
            if float n >= Harbor.conf_timeout#get then
              begin
                self#log#f 4 "Network activity timeout! Disconnecting source." ;
                self#disconnect ;
                (* Stop here. Falling through would reach the blocking read
                 * below on a connection that was just declared dead, and
                 * this thread could resume feeding the generator later,
                 * possibly after another client has started relaying. The
                 * handler at the end of [feed] closes the socket. *)
                failwith "network activity timeout"
              end
            else
              wait (n+1)
          end
      in
      wait 1 ;
      let input = Unix.read socket buf 0 len in
        if input<=0 then raise End_of_file ;
        begin match dump with
          | Some b -> output_string b (String.sub buf 0 input)
          | None -> ()
        end ;
        begin match logf with
          | Some b ->
              (* Elapsed time since the start of the feeding, divided by 60. *)
              let time = (Unix.gettimeofday () -. t0) /. 60. in
                Printf.fprintf b "%f %d\n%!" time self#length
          | None -> ()
        end ;
        buf,input
    in
    let Decoder.Decoder decoder = create_decoder read in
      try
        while true do
          (* [disconnect] may have been called from another thread. *)
          if not relaying then failwith "relaying stopped" ;
          decoder generator
        done
      with
        | e ->
            self#log#f 2 "Feeding stopped: %s." (Printexc.to_string e) ;
            if debug then raise e ;
            self#disconnect ;
            begin try Unix.close socket with _ -> () end

  (* Registers the server namespace if needed, renames the source after it
   * and adds the server commands: stop, kick, status and buffer_length. *)
  method private wake_up _ =
    if ns = [] then
      ns <- Server.register [self#id] "input.harbor" ;
    self#set_id (Server.to_string ns) ;
    (* Shared implementation of the stop and kick commands. *)
    let stop _ =
      if relaying then (self#disconnect ; "Done")
      else "No source client connected"
    in
      Server.add
        ~ns "stop" ~descr:"Stop current source client, if connected." stop ;
      Server.add
        ~ns "kick" ~descr:"Kick current source client, if connected." stop ;
      Server.add
        ~ns "status" ~descr:"Display current status."
        (fun _ ->
           if relaying then
             "source client connected"
           else
             "no source client connected") ;
      Server.add ~ns "buffer_length" ~usage:"buffer_length"
        ~descr:"Get the buffer's length, in seconds."
        (fun _ -> Printf.sprintf "%.2f"
                    (Frame.seconds_of_audio self#length))

  (* Drops the current client, if any, when the source goes to sleep. *)
  method private sleep =
    if relaying then self#disconnect

  (* Resets the generator mode and selects the stream decoder for [mime].
   * Raises [Harbor.Unknown_codec] when no decoder is available for it. *)
  method register_decoder mime =
    Generator.set_mode generator `Undefined ;
    match
      Decoder.get_stream_decoder mime kind
    with
      | Some d -> create_decoder <- d ; mime_type <- Some mime
      | None -> raise Harbor.Unknown_codec

  (* Starts relaying the client connected on [socket]: marks the source as
   * taken, calls [on_connect] with lowercased header names, opens the
   * optional dump and log files, then spawns the feeding thread. Failures
   * to open these files are logged and otherwise ignored. *)
  method relay (headers:(string*string) list) socket =
    relaying <- true ;
    let headers = List.map (fun (x,y) -> String.lowercase x,y) headers in
      on_connect headers ;
      begin match dumpfile with
        | Some f ->
            begin try
              dump <- Some (open_out_bin (Utils.home_unrelate f))
            with e ->
              self#log#f 2 "Could not open dump file: %s" (Printexc.to_string e)
            end
        | None -> ()
      end ;
      begin match logfile with
        | Some f ->
            begin try
              logf <- Some (open_out_bin (Utils.home_unrelate f))
            with e ->
              self#log#f 2 "Could not open log file: %s" (Printexc.to_string e)
            end
        | None -> ()
      end ;
      ignore (Tutils.create
                (fun () -> self#feed socket) ()
                "harbor source feeding")

  (* Calls [on_disconnect] if a client was relaying, closes the dump and
   * log files if they are open, and clears [relaying]. Calling it again
   * afterwards does not run [on_disconnect] a second time. *)
  method disconnect =
    if relaying then on_disconnect () ;
    begin match dump with
      | Some f -> close_out f ; dump <- None
      | None -> ()
    end ;
    begin match logf with
      | Some f -> close_out f ; logf <- None
      | None -> ()
    end ;
    relaying <- false

  (* Tells whether a client is currently being relayed. *)
  method is_taken = relaying

  (* While relaying: 0 during buffering and -1 otherwise. When no client
   * is relaying, defers to the inherited [remaining]. *)
  method remaining =
    if relaying then
      if buffering then 0 else -1
    else
      generated#remaining

end
|---|
| 204 | |
|---|
(* Registers the [input.harbor] operator. The callback reads the operator
 * parameters, builds the authentication and connection callbacks, and
 * returns the harbor source attached to the requested mountpoint, creating
 * and registering a new [http_input_server] when none exists yet. When a
 * source is already registered on the mountpoint it is returned as is and
 * the other parameters are not applied to it. *)
let () =
  let kind = Lang.kind_type_of_kind_format ~fresh:1 Lang.audio_any in
  Lang.add_operator "input.harbor"
    ~kind:(Lang.Unconstrained kind)
    ~category:Lang.Input
    ~descr:("Retrieves the given http stream from the harbor.")
    [
      "buffer", Lang.float_t, Some (Lang.float 2.),
      Some "Duration of the pre-buffered data." ;

      "max", Lang.float_t, Some (Lang.float 10.),
      Some "Maximum duration of the buffered data.";

      "on_connect",
      Lang.fun_t [false,"",Lang.metadata_t] Lang.unit_t,
      Some (Lang.val_cst_fun ["",Lang.metadata_t,None] Lang.unit),
      Some "Function to execute when a source is connected. \
            Its receives the list of headers, of the form: \
            (<label>,<value>). All labels are lowercase.";

      "on_disconnect",Lang.fun_t [] Lang.unit_t,
      Some (Lang.val_cst_fun [] Lang.unit),
      Some "Functions to excecute when a source is disconnected";

      "user",Lang.string_t,
      Some (Lang.string ""),
      Some "Source user. Override default if not empty.";

      "password",Lang.string_t,
      Some (Lang.string ""),
      Some "Source password. Override default if not empty.";

      "auth",
      Lang.fun_t [false,"",Lang.string_t;false,"",Lang.string_t] Lang.bool_t,
      Some
        (Lang.val_cst_fun
           ["",Lang.string_t,None;"",Lang.string_t,None]
           (Lang.bool false)),
      Some "Authentification function. \
            <code>f(login,password)</code> returns <code>true</code> \
            if the user should be granted access for this login. \
            Override any other method if used.";

      "dumpfile", Lang.string_t, Some (Lang.string ""),
      Some "Dump stream to file, for debugging purpose. Disabled if empty.";

      "logfile", Lang.string_t, Some (Lang.string ""),
      Some "Log buffer status to file, for debugging purpose. \
            Disabled if empty.";

      "debug", Lang.bool_t, Some (Lang.bool false),
      Some "Run in debugging mode by not catching some exceptions.";

      "", Lang.string_t, None,
      Some "Mountpoint to look for." ]
    (fun p kind ->
       (* Mountpoints always start with a slash. *)
       let mount = Lang.to_string (List.assoc "" p) in
       let mount =
         if mount<>"" && mount.[0]='/' then mount else
           Printf.sprintf "/%s" mount
       in
       (* Recognizes a function value whose body is the constant false,
        * which is the shape of the default value of the auth parameter. *)
       let trivially_false = function
         | { Lang.value =
               Lang.Fun (_,_,_,
                         { Lang_values.term = Lang_values.Bool false }) }
             -> true
         | _ -> false
       in
       (* Credentials configured on the operator. They get names distinct
        * from the arguments of [login] below so that the comparison is
        * made against the configuration and not against what the client
        * sent. *)
       let default_user = Lang.to_string (List.assoc "user" p) in
       let default_password = Lang.to_string (List.assoc "password" p) in
       let debug = Lang.to_bool (List.assoc "debug" p) in
       let auth_function = List.assoc "auth" p in
       (* [login user pass] decides whether a client presenting [user] and
        * [pass] is accepted. The auth function is used when it is not
        * trivially false; otherwise the credentials are compared with the
        * operator settings, an empty setting falling back to the harbor
        * configuration. *)
       let login user pass =
         let user_login test_user test_pass =
           (* [f fallback x] is [x] unless it is empty. *)
           let f g x = match x with "" -> g | _ -> x in
           let expected_user = f Harbor.conf_harbor_user#get default_user in
           let expected_pass =
             f Harbor.conf_harbor_pass#get default_password
           in
             test_user = expected_user &&
             test_pass = expected_pass
         in
           if not (trivially_false auth_function) then
             Lang.to_bool
               (Lang.apply ~t:Lang.bool_t
                  auth_function
                  ["",Lang.string user;
                   "",Lang.string pass])
           else
             user_login user pass
       in
       (* Pair expected by the source: the configured user, if any, and
        * the authentication function. *)
       let login =
         let f x = if x <> "" then Some x else None in
           (f default_user, login)
       in
       (* An empty file name disables the corresponding output. *)
       let dumpfile =
         match Lang.to_string (List.assoc "dumpfile" p) with
           | "" -> None
           | s -> Some s
       in
       let logfile =
         match Lang.to_string (List.assoc "logfile" p) with
           | "" -> None
           | s -> Some s
       in
       let bufferize = Lang.to_float (List.assoc "buffer" p) in
       let max = Lang.to_float (List.assoc "max" p) in
         (* The pre-buffering duration must be strictly below the maximum. *)
         if bufferize >= max then
           raise (Lang.Invalid_value
                    (List.assoc "max" p,
                     "Maximun buffering inferior to pre-buffered data"));
         (* Converts the header list to a language value, a list of string
          * pairs, and applies the on_connect function to it. *)
         let on_connect l =
           let l =
             List.map
               (fun (x,y) -> Lang.product (Lang.string x) (Lang.string y))
               l
           in
           let arg =
             Lang.list ~t:(Lang.product_t Lang.string_t Lang.string_t) l
           in
             ignore
               (Lang.apply ~t:Lang.unit_t (List.assoc "on_connect" p) ["",arg])
         in
         let on_disconnect () =
           ignore
             (Lang.apply ~t:Lang.unit_t (List.assoc "on_disconnect" p) [])
         in
           (* Reuse the source already registered on this mountpoint, or
            * register a new one and look it up again. *)
           try
             ((Harbor.find_source mount):>Source.source)
           with
             | Not_found ->
                 Harbor.add_source mount
                   ((new http_input_server ~kind
                       ~bufferize ~max ~login
                       ~dumpfile ~logfile
                       ~on_connect ~on_disconnect ~debug):>Harbor.source) ;
                 ((Harbor.find_source mount):>Source.source))
|---|