Changeset 7122

Show
Ignore:
Timestamp:
01/29/10 06:44:55 (6 weeks ago)
Author:
dbaelde
Message:

Introduce a variant module of Generator.From_audio_video with builtin
multi-threading support and overfull protection.
It is used when streaming (because decoding and replaying are concurrent).
This simplifies the code of input.http/harbor and the Generated source.

Location:
trunk/liquidsoap/src
Files:
11 modified

Legend:

Unmodified
Added
Removed
  • trunk/liquidsoap/src/decoder/decoder.ml

    r7114 r7122  
    7777  * of things that should be explicitly managed, not just garbage collected). 
    7878  * Hence it does not need a close function. *) 
    79 type stream_decoder = input -> Generator.From_audio_video.t decoder 
     79type stream_decoder = input -> Generator.From_audio_video_plus.t decoder 
    8080 
    8181(** A decoder is a filling function and a closing function, 
  • trunk/liquidsoap/src/decoder/decoder.mli

    r7114 r7122  
    2929 
    3030type 'a decoder = Decoder of ('a -> unit) 
    31 type stream_decoder = input -> Generator.From_audio_video.t decoder 
     31type stream_decoder = input -> Generator.From_audio_video_plus.t decoder 
    3232type file_decoder = { fill : Frame.t -> int; close : unit -> unit; } 
    3333 
  • trunk/liquidsoap/src/decoder/mp3.ml

    r7114 r7122  
    2525open Dtools 
    2626 
    27 module Generator = Generator.From_audio_video 
    28 module Buffered = Decoder.Buffered(Generator) 
     27let log = Log.make ["decoder";"mp3"] 
    2928 
    30 let log = Log.make ["decoder";"mp3"] 
     29module Make (Generator:Generator.S_Asio) = 
     30struct 
    3131 
    3232let create_decoder input = 
     
    3939        resampler ~audio_src_rate:(float sample_freq) data 
    4040      in 
     41        Generator.set_mode gen `Audio ; 
    4142        Generator.put_audio gen content 0 (Array.length content.(0))) 
    4243 
     44end 
     45 
     46module G = Generator.From_audio_video 
     47module Buffered = Decoder.Buffered(G) 
     48module D = Make(G) 
     49 
    4350let create_file_decoder filename kind = 
    44   let generator = Generator.create Generator.Audio in 
    45     Buffered.file_decoder filename kind create_decoder generator 
     51  let generator = G.create `Audio in 
     52    Buffered.file_decoder filename kind D.create_decoder generator 
    4653 
    4754let conf_mad = 
     
    100107                   None) 
    101108 
     109module D_stream = Make(Generator.From_audio_video_plus) 
     110 
    102111let () = 
    103112  Decoder.stream_decoders#register 
     
    117126             * if there was possibly another plugin for decoding 
    118127             * correctly the stream (e.g. by performing conversions). *) 
    119             Some create_decoder 
     128            Some D_stream.create_decoder 
    120129          else 
    121130            None) 
  • trunk/liquidsoap/src/decoder/ogg_decoder.ml

    r7114 r7122  
    2222 
    2323(** Decode and read ogg files. *) 
    24  
    25 module Generator = Generator.From_audio_video 
    26 module Buffered = Decoder.Buffered(Generator) 
    2724 
    2825let log = Dtools.Log.make ["decoder";"ogg"] 
     
    112109      end 
    113110 
     111module Make (Generator:Generator.S_Asio) = 
     112struct 
     113 
    114114(* TODO this mimicks the old code, but in the near future decoding should 
    115115 * take into account the target content kind, e.g. for dropping channels *) 
    116 let create_decoder input = 
     116let create_decoder mode input = 
    117117  let decoder = 
    118118    let sync = Ogg.Sync.create input in 
     
    123123  let video_resample = video_resample () in 
    124124    Decoder.Decoder (fun buffer -> 
     125      Generator.set_mode buffer mode ; 
    125126      if Ogg_demuxer.eos decoder then 
    126127        raise Ogg_demuxer.End_of_stream; 
     
    159160          Ogg_demuxer.feed decoder) 
    160161 
    161 (** Stream decoder *) 
     162end 
     163 
     164(** File decoder *) 
     165 
     166module G = Generator.From_audio_video 
     167module Buffered = Decoder.Buffered(G) 
     168module D = Make(G) 
    162169 
    163170let create_file_decoder filename content_type kind = 
    164171  let mode = 
    165172    match content_type.Frame.video, content_type.Frame.audio with 
    166       | 0, _ -> Generator.Audio 
    167       | _, 0 -> Generator.Video 
    168       | _, _ -> Generator.Both 
    169   in 
    170   let generator = Generator.create mode in 
    171     Buffered.file_decoder filename kind create_decoder generator 
     173      | 0, _ -> `Audio 
     174      | _, 0 -> `Video 
     175      | _, _ -> `Both 
     176  in 
     177  let generator = G.create mode in 
     178    Buffered.file_decoder filename kind (D.create_decoder mode) generator 
    172179 
    173180let get_type filename = 
     
    227234      This settings has been DEPRECATED." 
    228235 
     236module D_stream = Make(Generator.From_audio_video_plus) 
     237 
    229238let () = 
    230239  Decoder.stream_decoders#register 
     
    233242     (fun mime kind -> 
    234243        if List.mem mime mime_types#get then 
    235           Some create_decoder 
     244          (* TODO We must find a way here... *) 
     245          Some (D_stream.create_decoder `Audio) 
    236246        else 
    237247          None) 
  • trunk/liquidsoap/src/formats/wavformat.ml

    r7114 r7122  
    2424 
    2525let log = Dtools.Log.make ["decoder";"wav"] 
    26  
    27 module Generator = Generator.From_audio_video 
    28 module Buffered = Decoder.Buffered(Generator) 
    2926 
    3027(** {1 Generic decoder} *) 
     
    5653let read_short ic = read_int_num_bytes ic 2 
    5754 
     55module Make (Generator:Generator.S_Asio) = 
     56struct 
     57 
    5858(* TODO It might be more efficient to write our code for an input 
    5959 * channel and use directly the one we have when decoding files 
    6060 * or external processes, if we could wrap the input function used 
    6161 * for decoding stream (in http and harbor) as an in_channel. *) 
    62 let create_decoder input = 
     62let create input = 
    6363  let decoder = ref (fun gen -> assert false) in 
    6464 
     
    6969      log#f 4 "Read %d bytes of PCM" bytes ; 
    7070      let content,length = converter (String.sub data 0 bytes) in 
     71        Generator.set_mode gen `Audio ; 
    7172        Generator.put_audio gen content 0 length ; 
    7273        log#f 4 "Done (%d)" length 
     
    119120    Decoder.Decoder (fun gen -> !decoder gen) 
    120121 
     122end 
     123 
     124module Generator = Generator.From_audio_video 
     125module Buffered = Decoder.Buffered(Generator) 
     126 
    121127(* File decoding *) 
     128 
     129module D = Make(Generator) 
    122130 
    123131let get_type filename = 
     
    128136 
    129137let create_file_decoder filename kind = 
    130   let generator = Generator.create Generator.Audio in 
    131     Buffered.file_decoder filename kind create_decoder generator 
     138  let generator = Generator.create `Audio in 
     139    Buffered.file_decoder filename kind D.create generator 
    132140 
    133141let () = 
  • trunk/liquidsoap/src/sources/external_input.ml

    r7031 r7122  
    2121 *****************************************************************************) 
    2222 
    23 module Generator = Generator.From_audio_video 
    24 module Generated = Generated.From_audio_video 
     23module Generator = Generator.From_audio_video_plus 
     24module Generated = Generated.From_audio_video_plus 
    2525 
    2626(* {1 External Input handling} *) 
     
    3838                             () ~audio_src_rate:in_freq 
    3939  in 
    40   let abg = Generator.create Generator.Audio in 
     40  let abg = Generator.create `Audio in 
    4141  let priority = Tutils.Non_blocking in 
    4242object (self) 
  • trunk/liquidsoap/src/sources/generated.ml

    r7110 r7122  
    2424struct 
    2525 
    26 (* Reads data from an audio buffer generator. The generator can be feeded 
    27  * in parallel, using [lock] if not in the main thread. 
     26(* Reads data from an audio buffer generator. 
     27 * A thread safe generator should be used if it has to be fed concurrently. 
    2828 * Store [bufferize] seconds before declaring itself as ready. *) 
    2929class virtual source ~bufferize ~empty_on_abort gen = 
     
    3535 
    3636  val mutable buffering = true 
    37   val lock = Mutex.create () 
    3837 
    3938  val mutable should_fail = false 
     
    4342  method abort_track = should_fail <- true 
    4443 
    45   method private length = 
    46     Mutex.lock lock ; 
    47     let r = Generator.length generator in 
    48       Mutex.unlock lock ; 
    49       r 
     44  method private length = Generator.length generator 
    5045 
    5146  method is_ready = 
     
    6964    if should_fail then 0 else 
    7065      let r = self#length in 
    71         Mutex.lock lock ; 
    7266        let l = Generator.remaining generator in 
    73         Mutex.unlock lock ; 
    74         if buffering && r <= bufferize then 0 else l 
     67          if buffering && r <= bufferize then 0 else l 
    7568 
    7669  method private get_frame ab = 
     
    7972      self#log#f 4 "Performing skip." ; 
    8073      should_fail <- false ; 
    81       if empty_on_abort then Generator.clear generator ; (* TODO lock *) 
     74      if empty_on_abort then Generator.clear generator ; 
    8275      Frame.add_break ab (Frame.position ab) 
    8376    end else begin 
    84       Mutex.lock lock ; 
    8577      Generator.fill generator ab ; 
    8678      (* Currently, we don't enter the buffering phase between tracks 
     
    9688        self#log#f 4 "Buffer emptied, starting buffering." ; 
    9789        buffering <- true 
    98       end ; 
    99       Mutex.unlock lock 
     90      end 
    10091    end 
    10192 
     
    113104end 
    114105 
    115 module From_audio_video = 
    116   Make(struct 
    117          type t = Generator.From_audio_video.t 
    118          let length x = Generator.From_audio_video.length x 
    119          let remaining x = Generator.From_audio_video.remaining x 
    120          let clear = Generator.From_audio_video.clear 
    121          let add_metadata = Generator.From_audio_video.add_metadata 
    122          let fill = Generator.From_audio_video.fill 
    123        end) 
     106module From_audio_video_plus = Make(Generator.From_audio_video_plus) 
  • trunk/liquidsoap/src/sources/harbor_input.ml

    r7110 r7122  
    3030                        ~on_connect ~on_disconnect 
    3131                        ~login ~debug = 
    32   (* let abg_max_len = Frame.audio_of_seconds max in 
    33      TODO handle buffer overflow, cf. input.http *) 
     32  let max_ticks = Frame.master_of_seconds max in 
    3433object (self) 
    3534  inherit Source.source kind 
    3635  (* TODO: we want video also in the genrator *) 
    3736  inherit Generated.source 
    38             (Generator.create Generator.Audio) 
     37            (Generator.create ~overfull:(`Drop_old max_ticks) `Undefined) 
    3938            ~empty_on_abort:false ~bufferize 
    4039 
     
    5857    if not (Hashtbl.mem m "title") then 
    5958      (try Hashtbl.add m "title" (Hashtbl.find m "song") with _ -> ()); 
    60     self#log#f 3 "New metadata chunk \"%s -- %s\"" 
     59    self#log#f 3 "New metadata chunk %S -- %S." 
    6160      (try Hashtbl.find m "artist" with _ -> "?") 
    6261      (try Hashtbl.find m "title" with _ -> "?") ; 
     
    116115      with 
    117116        | e -> 
    118             self#log#f 2 "Feeding stopped: %s" (Printexc.to_string e) ; 
     117            self#log#f 2 "Feeding stopped: %s." (Printexc.to_string e) ; 
    119118            if debug then raise e ; 
    120119            self#disconnect ; 
     
    149148 
    150149  method register_decoder mime = 
     150    Generator.set_mode generator `Undefined ; 
    151151    match 
    152152      Decoder.get_stream_decoder mime kind 
     
    215215        Some "Function to execute when a source is connected. \ 
    216216              Its receives the list of headers, of the form: \ 
    217               (\"label\",\"value\"). All labels are lowercase."; 
     217              (<label>,<value>). All labels are lowercase."; 
    218218 
    219219        "on_disconnect",Lang.fun_t [] Lang.unit_t, 
  • trunk/liquidsoap/src/sources/http_source.ml

    r7110 r7122  
    177177      host,80,mount,auth 
    178178 
    179 module Generator = Generator.From_audio_video 
     179module Generator = Generator.From_audio_video_plus 
    180180module Generated = Generated.Make(Generator) 
    181181 
     
    188188        ~debug ?(logfile=None) 
    189189        ~user_agent url = 
    190   let _ = 
    191   (* TODO 
    192    * We used to deal with overfull generators in the [put] method, 
    193    * passed to the stream decoders in the [sink]. Now, the 
    194    * interface is simpler, but we need to put this back somewhere: 
    195    * add overfull management to Generators? This can be done 
    196    * for a specific kind of generator, that also needs to be 
    197    * thread safe: 
    198    * 
    199    * The old "sink" system was used to: 
    200    *  - control concurrency on the generator 
    201    *  - log 
    202    *  - conversions (samplerate) 
    203    *  - control (fail to stop feeding on source/output stop) 
    204    * Most of it was adapted by passing directly a generator to the 
    205    * decoder, but we still need to handle multi-threading, 
    206    * by ensuring that read/writing in the generator don't occur 
    207    * at the same time. *) 
    208     Frame.audio_of_seconds (Pervasives.max max bufferize) 
    209   in 
     190  let max_ticks = Frame.master_of_seconds (Pervasives.max max bufferize) in 
    210191object (self) 
    211192  inherit Source.source kind 
    212193  inherit 
    213194    Generated.source 
    214       (Generator.create Generator.Audio) 
     195      (Generator.create ~overfull:(`Drop_old max_ticks) `Undefined) 
    215196      ~empty_on_abort:false ~bufferize 
    216197 
     
    234215  (* Insert metadata *) 
    235216  method insert_metadata m = 
    236     self#log#f 3 "New metadata chunk \"%s -- %s\"" 
     217    self#log#f 3 "New metadata chunk: %S -- %S." 
    237218                (try Hashtbl.find m "artist" with _ -> "?") 
    238219                (try Hashtbl.find m "title" with _ -> "?") ; 
     
    269250                  self#log#f 2 "Feeding stopped: %s." s 
    270251              | e -> 
    271                   self#log#f 2 "Feeding stopped: %s" (Printexc.to_string e) 
     252                  self#log#f 2 "Feeding stopped: %s." (Printexc.to_string e) 
    272253            end ; 
    273254            begin match logf with 
     
    398379                          | Not_found -> () 
    399380                      end else begin 
    400                         self#log#f 4 "Content-type \"%s\"." content_type ; 
     381                        self#log#f 4 "Content-type %S." content_type ; 
    401382                        if chunked then 
    402383                          self#log#f 4 "Chunked HTTP/1.1 transfer" ; 
     384                        Generator.set_mode generator `Undefined ; 
    403385                        let dec = 
    404386                          match 
     
    567549         raise (Lang.Invalid_value 
    568550                  (List.assoc "max" p, 
    569                    "Maximun buffering inferior to pre-buffered data")); 
     551                   "Maximum buffering inferior to pre-buffered data")); 
    570552       let poll_delay = Lang.to_float (List.assoc "poll_delay" p) in 
    571553         ((new http ~kind ~playlist_mode ~timeout ~autostart ~track_on_meta 
  • trunk/liquidsoap/src/stream/generator.ml

    r7110 r7122  
    2929  val fill : t -> Frame.t -> unit 
    3030  val add_metadata : t -> Frame.metadata -> unit 
     31end 
     32 
     33module type S_Asio = 
     34sig 
     35  type t 
     36  val length : t -> int (* ticks *) 
     37  val remaining : t -> int (* ticks *) 
     38  val clear : t -> unit 
     39  val fill : t -> Frame.t -> unit 
     40  val add_metadata : t -> Frame.metadata -> unit 
     41  val put_audio : t -> Frame.audio_t array -> int -> int -> unit 
     42  val put_video : t -> Frame.video_t array -> int -> int -> unit 
     43  val set_mode : t -> [ `Audio | `Video | `Both | `Undefined ] -> unit 
    3144end 
    3245 
     
    232245struct 
    233246 
    234   type mode = Audio | Video | Both 
     247  type mode = [ `Audio | `Video | `Both | `Undefined ] 
    235248  type t = { 
    236249    mutable mode : mode ; 
     
    299312      Generator.put t.audio content o l ; 
    300313      match t.mode with 
    301         | Audio -> 
     314        | `Audio -> 
    302315            Generator.put t.video [||] 0 l 
    303         | Both -> () 
    304         | Video -> assert false 
     316        | `Both -> () 
     317        | `Video | `Undefined -> assert false 
    305318 
    306319  (** Add some video content. Offset and length are given in video samples. *) 
     
    311324      Generator.put t.video content o l ; 
    312325      match t.mode with 
    313         | Video -> 
     326        | `Video -> 
    314327            Generator.put t.audio [||] 0 l 
    315         | Both -> () 
    316         | Audio -> assert false 
     328        | `Both -> () 
     329        | `Audio | `Undefined -> assert false 
    317330 
    318331  (* Advance metadata and breaks by [len] ticks. *) 
     
    404417 
    405418end 
     419 
     420module From_audio_video_plus = 
     421struct 
     422 
     423  module Super = From_audio_video 
     424 
     425  type mode = [ `Audio | `Video | `Both | `Undefined ] 
     426  type overfull = [ `Drop_old of int ] 
     427  type t = { 
     428    lock : Mutex.t ; 
     429    overfull : overfull option ; 
     430    gen : Super.t 
     431  } 
     432 
     433  let create ?(lock=Mutex.create()) ?overfull mode = 
     434    { lock = lock ; overfull = overfull ; gen = Super.create mode } 
     435 
     436  let mode t = Tutils.mutexify t.lock Super.mode t.gen 
     437  let set_mode t mode = Tutils.mutexify t.lock (Super.set_mode t.gen) mode 
     438 
     439  let audio_length t = Tutils.mutexify t.lock Super.audio_length t.gen 
     440  let video_length t = Tutils.mutexify t.lock Super.video_length t.gen 
     441  let length t = Tutils.mutexify t.lock Super.length t.gen 
     442  let remaining t = Tutils.mutexify t.lock Super.remaining t.gen 
     443 
     444  let add_metadata t m = 
     445    Tutils.mutexify t.lock (Super.add_metadata t.gen) m 
     446  let add_break t = Tutils.mutexify t.lock Super.add_break t.gen 
     447 
     448  let clear t = Tutils.mutexify t.lock Super.clear t.gen 
     449  let fill t frame = Tutils.mutexify t.lock (Super.fill t.gen) frame 
     450 
     451  let remove t len = 
     452    Tutils.mutexify t.lock (Super.remove t.gen) len 
     453 
     454  let check_overfull t extra = 
     455    assert (Tutils.seems_locked t.lock) ; 
     456    match t.overfull with 
     457      | Some (`Drop_old len) when Super.length t.gen + extra > len -> 
     458          Super.remove t.gen (Super.length t.gen + extra - len) 
     459      | _ -> () 
     460 
     461  let put_audio t buf off len = 
     462    Tutils.mutexify t.lock 
     463      (fun () -> 
     464         check_overfull t (Frame.master_of_audio len) ; 
     465         Super.put_audio t.gen buf off len) () 
     466 
     467  let put_video t buf off len = 
     468    Tutils.mutexify t.lock 
     469      (fun () -> 
     470         check_overfull t (Frame.master_of_video len) ; 
     471         Super.put_video t.gen buf off len) () 
     472 
     473end 
  • trunk/liquidsoap/src/stream/generator.mli

    r7110 r7122  
    6464sig 
    6565  type t 
    66   type mode = Audio | Video | Both 
     66 
     67  (** In [Audio] mode, only audio can be put in the buffer, and similarly 
     68    * for the [Video] mode. In [Both] mode, both types of content can 
     69    * be fed into the generator, asynchronously, and they exit the 
     70    * buffer synchronously. 
     71    * The [Undefined] forbids any feeding, it's useful to make sure 
     72    * a meaningful mode is assigned before any use. *) 
     73  type mode = [ `Audio | `Video | `Both | `Undefined ] 
     74 
    6775  val create : mode -> t 
    6876 
     
    8997  val clear : t -> unit 
    9098end 
     99 
     100(** Generator not only with Output but also with ASynchronous Input. *) 
     101module type S_Asio = 
     102sig 
     103  type t 
     104  val length : t -> int (* ticks *) 
     105  val remaining : t -> int (* ticks *) 
     106  val clear : t -> unit 
     107  val fill : t -> Frame.t -> unit 
     108  val add_metadata : t -> Frame.metadata -> unit 
     109  val put_audio : t -> Frame.audio_t array -> int -> int -> unit 
     110  val put_video : t -> Frame.video_t array -> int -> int -> unit 
     111  val set_mode : t -> [ `Audio | `Video | `Both | `Undefined ] -> unit 
     112end 
     113 
     114(** Same as From_audio_video but with two extra features useful for 
     115  * streaming decoders: it is thread safe and supports overfull 
     116  * buffer management. *) 
     117module From_audio_video_plus : 
     118sig 
     119  type t 
     120 
     121  (** Same as [From_audio_video]. *) 
     122  type mode = [ `Audio | `Video | `Both | `Undefined ] 
     123 
     124  (** How to handle overfull buffers: 
     125    * drop old data, keeping at most [len] ticks. *) 
     126  type overfull = [ `Drop_old of int ] 
     127 
     128  val create : ?lock:Mutex.t -> ?overfull:overfull -> mode -> t 
     129 
     130  val mode : t -> From_audio_video.mode 
     131  val set_mode : t -> From_audio_video.mode -> unit 
     132 
     133  val audio_length : t -> int 
     134  val video_length : t -> int 
     135  val length : t -> int 
     136  val remaining : t -> int 
     137 
     138  val add_metadata : t -> Frame.metadata -> unit 
     139  val add_break : t -> unit 
     140 
     141  (* [put_audio buffer data offset length]: 
     142   * offset and length are in audio samples! *) 
     143  val put_audio : t -> Frame.audio_t array -> int -> int -> unit 
     144  (* [put_video buffer data offset length]: 
     145   * offset and length are in video samples! *) 
     146  val put_video : t -> Frame.video_t array -> int -> int -> unit 
     147  val fill : t -> Frame.t -> unit 
     148 
     149  val remove : t -> int -> unit 
     150  val clear : t -> unit 
     151end