Changeset 6565

Show
Ignore:
Timestamp:
05/10/09 21:50:52 (10 months ago)
Author:
metamorph68
Message:

Factorized buffered I/O code, added blocking API. fixes #203

Location:
trunk/liquidsoap
Files:
2 added
9 modified

Legend:

Unmodified
Added
Removed
  • trunk/liquidsoap/CHANGES

    r6534 r6565  
    1212   to finish, allowing proper restart_on_new_track like for ogg encoded 
    1313   data. 
     14 - Factorized buffered I/O code. Also added a blocking API, which avoids 
     15   "no available frame" and "reader not ready" messages and audio glitches.  
     16   It requires that "root.sync" be desactivated for these sources, such that 
     17   synchronisation is done by the source. (#203) 
    1418New: 
    1519 - Support ogg/dirac (#TODO: close ticket). 
  • trunk/liquidsoap/src/Makefile

    r6527 r6565  
    149149        $(if $(W_RTP),tools/rtp_c.o tools/rtp.ml) \ 
    150150        tools/server.ml tools/server_builtins.ml \ 
    151         $(if $(W_LAST),tools/liqfm.ml) \ 
     151        $(if $(W_LAST),tools/liqfm.ml) alsa_settings.ml \ 
    152152        lang/lang_types.ml lang/lang_values.ml \ 
    153153        lang/lang_lexer.ml lang/lang_parser.ml lang/lang_pp.ml lang/lang.ml \ 
    154         tools/harbor.ml $(video_converters) $(audio_converters) \ 
     154        tools/harbor.ml tools/ioRing.ml \ 
     155        $(video_converters) $(audio_converters) \ 
    155156        $(ogg_utils) $(protocols) $(sources) $(operators) $(outputs) $(io) \ 
    156157        $(analyze) $(playlists) $(visualization) $(formats) \ 
     
    223224 
    224225libliq_sources= $(tools) SVN.ml audio_converter.ml $(stream) \ 
    225         shebang.ml \ 
     226        shebang.ml alsa_settings.ml \ 
    226227        $(if $(W_RTP),tools/rtp_c.o tools/rtp.ml) \ 
    227228        tools/server.ml tools/server_builtins.ml \ 
  • trunk/liquidsoap/src/io/alsa_io.ml

    r6527 r6565  
    2424 
    2525exception Error of string 
    26  
    27 (** ALSA should be quiet *) 
    28 let () = no_stderr_report () 
    2926 
    3027let handle lbl f x = 
     
    128125          (Pcm.set_channels dev params) channels ; 
    129126        handle "periods" 
    130           (Pcm.set_periods dev params Alsa_out.periods#get) Dir_eq ; 
     127          (Pcm.set_periods dev params Alsa_settings.periods#get) Dir_eq ; 
    131128        let rate = 
    132129          handle "rate" (Pcm.set_rate_near dev params samples_per_second) Dir_eq 
  • trunk/liquidsoap/src/outputs/alsa_out.ml

    r6354 r6565  
    2121 *****************************************************************************) 
    2222 
    23 (* See sources/alsa_in.ml for details. 
    24  * Basically, we thread because alsa calls are blocking and can have a bad 
    25  * interaction with the root scheduling. *) 
     23(* Buffered ALSA output *) 
    2624 
    2725open Alsa 
    2826open Dtools 
    29  
    30 let log = Log.make ["output";"alsa"] 
    31  
    32 exception Error of string 
    33  
    34 let conf = 
    35   Conf.void ~p:(Configure.conf#plug "alsa") 
    36     "ALSA configuration" 
    37 let conf_buffer_length = 
    38   Conf.float ~p:(conf#plug "buffer_length") ~d:4. 
    39     "Length of the alsa ringbuffer in seconds" 
    40     ~comments:[ 
    41       "This is only used for buffered ALSA I/O, and affects the latency." 
    42     ] 
    43 let periods = 
    44   Conf.int ~p:(conf#plug "periods") ~d:5 
    45     "Number of periods" 
    4627 
    4728(** ALSA should be quiet *) 
     
    4930 
    5031class output dev start source = 
    51   let channels = Fmt.channels () in 
     32  let buffer_length = Fmt.samples_per_frame () in 
     33  let buffer_chans = Fmt.channels () in 
     34  let blank () = Array.init buffer_chans (fun _ -> Array.make buffer_length 0.) in 
     35  let nb_blocks = Alsa_settings.conf_buffer_length#get in 
    5236  let samples_per_second = Fmt.samples_per_second () in 
    53   let seconds_per_frame = Fmt.seconds_per_frame () in 
     37  let periods = Alsa_settings.periods#get in 
    5438object (self) 
    5539  inherit Output.output ~name:"output.alsa" ~kind:"output.alsa" source start 
     40  inherit [float array array] IoRing.output ~nb_blocks ~blank 
     41                                 ~blocking:true () as ioring 
    5642 
    57   val mutable alsa_rate = 0 
    58   val mutable write = 
    59     (fun pcm buf ofs len -> Pcm.writen_float pcm buf ofs len) 
    60   val mutable ring = Ringbuffer.TS.create channels 0 
     43  val mutable device = None 
    6144 
    62   val samplerate_converter = Audio_converter.Samplerate.create channels 
     45  val mutable alsa_rate = samples_per_second 
     46  val samplerate_converter = Audio_converter.Samplerate.create buffer_chans 
     47  val mutable alsa_write =                                                                        
     48    (fun pcm buf ofs len -> Pcm.writen_float pcm buf ofs len)  
    6349 
    64   val mutable sleep = false 
    65   method output_stop = sleep <- true 
     50  method get_device = 
     51    match device with 
     52      | Some d -> d 
     53      | None ->  
     54          self#log#f 3 "Using ALSA %s." (Alsa.get_version ()) ; 
     55          let dev = Pcm.open_pcm dev [Pcm.Playback] [] in 
     56          let params = Pcm.get_params dev in 
     57          let bufsize = 
     58             ( 
     59               try 
     60                 Pcm.set_access dev params Pcm.Access_rw_noninterleaved ; 
     61                 Pcm.set_format dev params Pcm.Format_float 
     62               with 
     63                 | _ -> 
     64                    (* If we can't get floats we fallback on interleaved s16le *) 
     65                    self#log#f 2 "Falling back on interleaved S16LE"; 
     66                    Pcm.set_access dev params Pcm.Access_rw_interleaved ; 
     67                    Pcm.set_format dev params Pcm.Format_s16_le ; 
     68                    alsa_write <- 
     69                      (fun pcm buf ofs len -> 
     70                         let sbuf = String.create (2 * len * Array.length buf) in 
     71                         let _ =  Float_pcm.to_s16le buf ofs len sbuf 0 in 
     72                         Pcm.writei pcm sbuf 0 len 
     73                      )      
     74             ); 
     75             Pcm.set_channels dev params buffer_chans ; 
     76             alsa_rate <- Pcm.set_rate_near dev params samples_per_second Dir_eq ; 
     77             (* Size in frames, must be set after the samplerate. 
     78              * This setting is critical as a too small bufsize will easily result in 
     79              * underruns when the thread isn't fast enough. 
     80              * TODO make it customizable *) 
     81             Pcm.set_periods dev params periods Dir_eq; 
     82             Pcm.set_buffer_size_near dev params 65536 
     83          in 
     84          self#log#f 3 "Samplefreq=%dHz, Bufsize=%dB, Frame=%dB, Periods=%d" 
     85            alsa_rate bufsize (Pcm.get_frame_size params) periods ; 
     86          Pcm.set_params dev params ; 
     87          device <- Some dev ; 
     88          dev 
    6689 
    67   method output_start = 
    68     sleep <- false ; 
    69     self#log#f 3 "Using ALSA %s." (Alsa.get_version ()) ; 
     90  method close =  
     91    match device with 
     92      | Some d -> 
     93          Pcm.close d ; 
     94          device <- None 
     95      | None -> () 
     96 
     97  method push_block data =  
     98    let dev = self#get_device in 
    7099    try 
    71       let dev = Pcm.open_pcm dev [Pcm.Playback] [] in 
    72       let params = Pcm.get_params dev in 
    73       let bufsize = 
    74         ( 
    75           try 
    76             Pcm.set_access dev params Pcm.Access_rw_noninterleaved ; 
    77             Pcm.set_format dev params Pcm.Format_float 
    78           with 
    79             | _ -> 
    80                 (* If we can't get floats we fallback on interleaved s16le *) 
    81                 self#log#f 2 "Falling back on interleaved S16LE"; 
    82                 Pcm.set_access dev params Pcm.Access_rw_interleaved ; 
    83                 Pcm.set_format dev params Pcm.Format_s16_le ; 
    84                 write <- 
    85                 (fun pcm buf ofs len -> 
    86                    let sbuf = String.create (2 * len * Array.length buf) in 
    87                    let _ =  Float_pcm.to_s16le buf ofs len sbuf 0 in 
    88                      Pcm.writei pcm sbuf 0 len 
    89                 ) 
    90         ); 
    91         Pcm.set_channels dev params channels ; 
    92         alsa_rate <- Pcm.set_rate_near dev params samples_per_second Dir_eq ; 
    93         (* Size in frames, must be set after the samplerate. 
    94          * This setting is critical as a too small bufsize will easily result in 
    95          * underruns when the thread isn't fast enough. 
    96          * TODO make it customizable *) 
    97         Pcm.set_periods dev params periods#get Dir_eq; 
    98         Pcm.set_buffer_size_near dev params 65536 
     100      let len = Array.length data.(0) in 
     101      let rec f pos =  
     102        if pos < len then 
     103          let ret = alsa_write dev data pos (len - pos) in 
     104          f (pos+ret)  
    99105      in 
    100         self#log#f 3 "Samplefreq=%dHz, Bufsize=%dB, Frame=%dB, Periods=%d" 
    101              alsa_rate bufsize (Pcm.get_frame_size params) periods#get ; 
    102         Pcm.set_params dev params ; 
    103         let ringsize = Fmt.samples_of_seconds (conf_buffer_length#get) in 
    104           ring <- Ringbuffer.TS.create channels ringsize; 
    105           (* Now feed half of the ringbuffer with blank *) 
    106           Ringbuffer.TS.write_advance ring (ringsize/2); 
    107           ignore 
    108             (Tutils.create (fun () -> self#reader dev bufsize) () "alsa_playback") 
     106      f 0 
    109107    with 
    110       | Unknown_error n when false -> raise (Error (string_of_error n)) 
    111  
    112   method reader dev bufsize = 
    113     (* TODO: static buffers *) 
    114     let blank = Array.init channels (fun _ -> Array.create bufsize 0.) in 
    115     try 
    116       Pcm.prepare dev ; 
    117       while not sleep && not !Root.shutdown do 
    118         try 
    119           if Ringbuffer.TS.transmit ring (Pcm.writen_float dev) = 0 
    120           then ( 
    121             self#log#f 3 "Writer is late!" ; 
    122             Thread.delay (seconds_per_frame/.2.) 
    123           ) 
    124         with 
    125           | Buffer_xrun -> 
    126               log#f 2 "Underrun!" ; 
    127               Pcm.prepare dev ; 
    128               ignore (write dev blank 0 bufsize) 
    129       done ; 
    130       Pcm.close dev 
    131     with 
    132       | Unknown_error n -> raise (Error (string_of_error n)) 
     108      | Buffer_xrun -> 
     109           self#log#f 2 "Underrun!" ; 
     110           Pcm.prepare dev 
    133111 
    134112  method output_send buf = 
     
    138116                  ratio buf 0 (Array.length buf.(0))  
    139117    in 
    140       if Ringbuffer.TS.write_space ring < Array.length buf.(0) then begin 
    141         if false then self#log#f 3 "Reader is late!" ; 
    142         (* Thread.delay (Mixer.Buffer.length/.2.) *) 
    143       end else 
    144         Ringbuffer.TS.write ring buf 0 (Array.length buf.(0)) 
     118    let f data =  
     119      for c = 0 to Array.length buf - 1 do 
     120        Float_pcm.float_blit buf.(c) 0 data.(c) 0 (Array.length buf.(0)) 
     121      done 
     122    in 
     123    ioring#put_block f 
    145124 
    146125  method output_reset = () 
  • trunk/liquidsoap/src/outputs/ao_out.ml

    r6527 r6565  
    2424open Ao 
    2525 
    26 exception Error of string 
    27  
    28 let nb_blocks = 10 
    29  
    30 let bytes_per_sample = 2 
    31  
    32 class output ~driver ~options source start = 
     26class output ~nb_blocks ~driver  
     27             ~options source start = 
    3328  let channels = Fmt.channels () in 
    3429  let samples_per_frame = Fmt.samples_per_frame () in 
    3530  let samples_per_second = Fmt.samples_per_second () in 
     31  let bytes_per_sample = 2 in 
     32  let blank () = 
     33    String.make (samples_per_frame * channels * bytes_per_sample) '0' 
     34  in 
    3635object (self) 
    3736  inherit Output.output ~name:"ao" ~kind:"output.ao" source start 
     37  inherit [string] IoRing.output ~nb_blocks ~blank  
     38                                 ~blocking:true () as ioring 
    3839 
    39   (* See sources/alsa_in.ml, it's the same producer/consumer system. *) 
    40   val buffer = Array.create nb_blocks "" 
    41   initializer 
    42     for i = 0 to nb_blocks - 1 do 
    43       buffer.(i) <- 
    44         String.create (samples_per_frame * channels * bytes_per_sample) 
    45     done 
    46   val mutable read = 0 
    47   val mutable write = 0 
     40  val mutable device = None 
    4841 
    49   val mutable sleep = false 
    50   method output_stop = sleep <- true 
     42  method get_device =  
     43    match device with 
     44      | Some d -> d 
     45      | None -> 
     46          (* Wait for things to settle *)                                            
     47          Thread.delay (5. *. (Fmt.seconds_per_frame ()));  
     48          let driver =  
     49            if driver = "" then 
     50              get_default_driver () 
     51            else 
     52              find_driver driver 
     53           in 
     54           let dev = open_live ~driver ~options 
     55                        ~rate:samples_per_second 
     56                        ~bits:(bytes_per_sample * 8) 
     57                        ~channels:channels () 
     58           in 
     59           device <- Some dev ; 
     60           dev 
    5161 
    52   method output_start = 
    53     sleep <- false ; 
    54     read <- 0 ; write <- 0 ; 
    55     ignore (Tutils.create (fun () -> self#reader) () "ao_playback") 
     62  method close =  
     63    match device with  
     64      | Some d ->  
     65          Ao.close d ; 
     66          device <- None 
     67      | None -> () 
    5668 
    57   method reader = 
    58     let driver = if driver = "" then 
    59       get_default_driver () 
    60     else 
    61       find_driver driver 
    62     in 
    63     let device = open_live ~driver ~options 
    64                    ~rate:samples_per_second 
    65                    ~bits:(bytes_per_sample * 8) 
    66                    ~channels:channels () 
    67     in 
    68       (* Wait for things to settle *) 
    69       Thread.delay (5. *. (Fmt.seconds_per_frame ())); 
    70       (* The output loop *) 
    71       try 
    72         while true do 
    73           while write = read do 
    74             Thread.delay ((Fmt.seconds_per_frame ()) /. 2.) ; 
    75             if sleep then raise Exit 
    76           done ; 
    77           if sleep then raise Exit ; 
    78           play device buffer.(read mod nb_blocks); 
    79           read <- (read + 1) mod (2*nb_blocks) 
    80         done 
    81       with Exit -> close device 
     69  method push_block data = 
     70    let dev = self#get_device in 
     71    play dev data 
    8272 
    8373  method output_send wav = 
    84     if read <> write && 
    85        write mod nb_blocks = read mod nb_blocks then 
    86       self#log#f 4 "Reader not ready!" 
    87     else begin 
     74    let push data =  
    8875      ignore (Float_pcm.to_s16le (AFrame.get_float_pcm wav) 0 (AFrame.size wav) 
    89                 buffer.(write mod nb_blocks) 0) ; 
    90       write <- (write + 1) mod (2*nb_blocks) 
    91     end 
     76                data 0)  
     77    in 
     78    ioring#put_block push 
    9279 
    9380  method output_reset = () 
     
    10491      Some "libao driver to use." ; 
    10592 
     93      "buffer_size", 
     94      Lang.int_t, Some (Lang.int 2), 
     95      Some "Set buffer size, in frames."; 
     96 
    10697      "options", 
    10798      Lang.list_t (Lang.product_t Lang.string_t Lang.string_t), 
     
    116107       let start = Lang.to_bool (List.assoc "start" p) in 
    117108       let driver = Lang.to_string (List.assoc "driver" p) in 
     109       let nb_blocks = Lang.to_int (List.assoc "buffer_size" p) in 
    118110       let options = 
    119111         List.map 
     
    124116       in 
    125117       let source = List.assoc "" p in 
    126          ((new output ~driver ~options source start):>Source.source)) 
     118         ((new output ~nb_blocks ~driver  
     119                      ~options source start):>Source.source)) 
  • trunk/liquidsoap/src/outputs/bjack_out.ml

    r6527 r6565  
    2828  let samples_per_frame = Fmt.samples_per_frame () in 
    2929  let samples_per_second = Fmt.samples_per_second () in 
     30  let blank () = 
     31    String.make (samples_per_frame * channels * bytes_per_sample) '0' 
     32  in 
    3033object (self) 
    3134  inherit Output.output ~name:"output.jack" ~kind:"output.jack" source true 
     35  inherit [string] IoRing.output ~nb_blocks ~blank  
     36                                 ~blocking:true () as ioring 
    3237 
    33   (* See sources/alsa_in.ml, it's the same producer/consumer system. *) 
    34   val buffer = Array.create nb_blocks "" 
    35   initializer 
    36     for i = 0 to nb_blocks - 1 do 
    37       buffer.(i) <- String.create (samples_per_frame * channels * bytes_per_sample) 
    38     done 
    39   val mutable read = 0 
    40   val mutable write = 0 
    41  
    42   val mutable sleep = false 
    4338  val mutable device = None 
    44   method output_stop = sleep <- true 
    4539 
    4640  method get_device =  
    4741    match device with 
    4842      | None ->  
     43          (* Wait for things to settle *) 
     44          Thread.delay (5. *. (Fmt.seconds_per_frame ())); 
    4945          let server_name = 
    5046            match server with "" -> None | s -> Some s 
     
    6157      | Some d -> d 
    6258 
    63   method output_start = 
     59  method push_block data =  
    6460    let dev = self#get_device in 
    65     sleep <- false ; 
    66     read <- 0 ; write <- 0 ; 
    67     ignore (Tutils.create (fun () -> self#reader dev) () "jack_playback") 
    68  
    69   method write_block dev data =  
    7061    let len = String.length data in 
    7162    let remaining = ref (len - (Bjack.write dev data)) in 
     
    7768    done 
    7869 
    79   method reader device = 
    80       (* Wait for things to settle *) 
    81       Thread.delay (5. *. (Fmt.seconds_per_frame ())); 
    82       (* The output loop *) 
    83       while not sleep do 
    84         while write = read do 
    85           Thread.delay ((Fmt.seconds_per_frame ()) /. 2.) 
    86         done ; 
    87         let data = buffer.(read mod nb_blocks) in 
    88         self#write_block device data ; 
    89         read <- (read + 1) mod (2*nb_blocks) 
    90       done ; 
    91       Bjack.close device 
     70  method close =  
     71    match device with 
     72      | Some d ->  
     73          Bjack.close d ; 
     74          device <- None 
     75      | None -> () 
    9276 
    9377  method output_send wav = 
    94     if read <> write && 
    95       write mod nb_blocks = read mod nb_blocks then 
    96       self#log#f 4 "Reader not ready!" 
    97     else begin 
     78    let push data = 
    9879      ignore (Float_pcm.to_s16le (AFrame.get_float_pcm wav) 0 (AFrame.size wav) 
    99                 buffer.(write mod nb_blocks) 0) ; 
    100       write <- (write + 1) mod (2*nb_blocks) 
    101     end 
     80                data 0) 
     81    in 
     82    ioring#put_block push 
    10283 
    10384  method output_reset = () 
     
    10889    [ "buffer_size", 
    10990      Lang.int_t, Some (Lang.int 2), 
    110       Some "Set buffer size, in frames. 0 means unbuffered output."; 
     91      Some "Set buffer size, in frames."; 
    11192     "server", 
    11293      Lang.string_t, Some (Lang.string ""), 
  • trunk/liquidsoap/src/sources/alsa_in.ml

    r6344 r6565  
    3030open Source 
    3131 
    32 exception Error of string 
    33  
    34 let log = Dtools.Log.make ["input";"alsa"] 
    35  
    3632class mic device = 
    3733  let buffer_length = Fmt.samples_per_frame () in 
    3834  let buffer_chans = Fmt.channels () in 
    39   (* Only one block is OK, but 10 doesn't cost much and could allow 
    40    * some timing errors to be absorbed.. I can't hear any difference anyway. *) 
    41   let nb_blocks = 10 in 
    42   let blank = Array.init buffer_chans (fun _ -> Array.make buffer_length 0.) in 
     35  let alsa_device = device in 
     36  let nb_blocks = Alsa_settings.conf_buffer_length#get in 
     37  let blank () = Array.init buffer_chans (fun _ -> Array.make buffer_length 0.) in 
    4338object (self) 
    4439  inherit active_source 
     40  inherit [float array array] IoRing.input 
     41      ~nb_blocks ~blank () as ioring 
    4542 
    4643  method stype = Infallible 
     
    5249  val mutable sample_freq = Fmt.samples_per_second () 
    5350 
    54   val buffer = Array.make nb_blocks [||] 
    55   initializer 
    56     for i = 0 to nb_blocks - 1 do 
    57       buffer.(i) <- 
    58         Array.init buffer_chans (fun _ -> Array.make buffer_length 0.) 
    59     done 
    60   val mutable read = 0 
    61   val mutable write = 0 
    62   (* Read and write are stored modulo 2*nb_blocks, 
    63    * because we must be able to distinguish the case where the sched is late 
    64    * from the one where the capture is late. 
    65    * And we don't need more than modulo 2*nb_blocks. *) 
    66  
    6751  val mutable read_fun = 
    6852    (fun pcm buf ofs len -> Pcm.readn_float pcm buf ofs len) 
    6953 
    70   val mutable sleep = false 
    71   method sleep = sleep <- true 
     54  val mutable device = None 
    7255 
    73   method output_get_ready = 
    74     sleep <- false ; 
    75     read <- 0 ; write <- 0 ; 
    76     ignore (Tutils.create (fun () -> self#writer) () "alsa_capture") 
     56  method close = 
     57    match device with 
     58      | Some d -> 
     59          Pcm.close d ; 
     60          device <- None 
     61      | None -> () 
    7762 
    78   method writer = 
    79     self#log#f 3 "Using ALSA %s." (Alsa.get_version ()) ; 
    80     let dev = Pcm.open_pcm device [Pcm.Capture] [] in 
    81     let params = Pcm.get_params dev in 
    82       begin try 
    83         Pcm.set_access dev params Pcm.Access_rw_noninterleaved ; 
    84         Pcm.set_format dev params Pcm.Format_float 
    85       with 
    86         | _ -> 
    87             (* If we can't get floats we fallback on interleaved s16le *) 
    88             self#log#f 2 "Falling back on interleaved S16LE"; 
    89             Pcm.set_access dev params Pcm.Access_rw_interleaved ; 
    90             Pcm.set_format dev params Pcm.Format_s16_le ; 
    91             read_fun <- 
    92               (fun pcm buf ofs len -> 
    93                  let sbuf = String.create (2 * 2 * len) in 
    94                  let r = Pcm.readi pcm sbuf 0 len in 
    95                    Float_pcm.from_s16le buf ofs sbuf 0 r; 
    96                    r) 
    97       end ; 
    98       sample_freq <- 
    99         Pcm.set_rate_near dev params sample_freq Dir_eq; (* TODO: resample *) 
    100       Pcm.set_channels dev params buffer_chans ; 
    101       Pcm.set_params dev params ; 
    102       Pcm.prepare dev ; 
    103       let fill block = 
    104         try 
    105           let pos = ref 0 in 
    106             while !pos < buffer_length do 
    107               let len = buffer_length - !pos in 
    108               let ret = read_fun dev buffer.(block mod nb_blocks) !pos len in 
    109                 assert (ret <= len); 
    110                 pos := !pos + ret; 
    111             done; 
    112         with 
    113           | Unknown_error n -> raise (Error (string_of_error n)) 
    114           | Buffer_xrun -> 
    115               (* Restart. Try again ... *) 
    116               log#f 2 "Overrun!" ; 
    117               Pcm.prepare dev 
    118       in 
    119         (* Fill the first block *) 
    120         fill 0 ; 
    121         write <- 1 ; 
    122         (* Filling loop *) 
    123         while not sleep do 
    124           if 
    125             read <> write && 
    126             write mod nb_blocks = read mod nb_blocks 
    127           then begin 
    128             (* Wait for the reader to read the block we fancy *) 
    129             Thread.delay (Fmt.seconds_per_frame () /. 2.) 
    130           end else begin 
    131             fill write ; write <- (write + 1) mod (2*nb_blocks) 
    132           end 
    133         done 
     63  method get_device =  
     64    match device with 
     65      | Some d -> d  
     66      | None ->  
     67          self#log#f 3 "Using ALSA %s." (Alsa.get_version ()) ; 
     68          let dev = Pcm.open_pcm alsa_device [Pcm.Capture] [] in 
     69          let params = Pcm.get_params dev in 
     70          begin try 
     71            Pcm.set_access dev params Pcm.Access_rw_noninterleaved ; 
     72            Pcm.set_format dev params Pcm.Format_float 
     73          with 
     74            | _ -> 
     75                (* If we can't get floats we fallback on interleaved s16le *) 
     76                self#log#f 2 "Falling back on interleaved S16LE"; 
     77                Pcm.set_access dev params Pcm.Access_rw_interleaved ; 
     78                Pcm.set_format dev params Pcm.Format_s16_le ; 
     79                read_fun <- 
     80                  (fun pcm buf ofs len -> 
     81                     let sbuf = String.create (2 * 2 * len) in 
     82                     let r = Pcm.readi pcm sbuf 0 len in 
     83                     Float_pcm.from_s16le buf ofs sbuf 0 r; 
     84                     r) 
     85          end ; 
     86          sample_freq <- 
     87            Pcm.set_rate_near dev params sample_freq Dir_eq; (* TODO: resample *) 
     88          Pcm.set_channels dev params buffer_chans ; 
     89          Pcm.set_params dev params ; 
     90          Pcm.prepare dev ; 
     91          device <- Some dev; 
     92          dev 
     93 
     94  method pull_block block = 
     95    let dev = self#get_device in  
     96    try 
     97      let pos = ref 0 in 
     98      while !pos < buffer_length do 
     99        let len = buffer_length - !pos in 
     100        let ret = read_fun dev block !pos len in 
     101        assert (ret <= len); 
     102        pos := !pos + ret; 
     103      done; 
     104    with 
     105      | Buffer_xrun -> 
     106          (* Restart. Try again ... *) 
     107          self#log#f 2 "Overrun!" ; 
     108          Pcm.prepare dev 
    134109 
    135110  method get_frame buf = 
    136111    assert (0 = AFrame.position buf) ; 
    137     let buffer = 
    138       (* Check that the writer still has an advance. 
    139        * Otherwise play blank for waiting.. *) 
    140       if write = read then begin 
    141         log#f 2 "No available frame!" ; 
    142         blank 
    143       end else 
    144         let b = buffer.(read mod nb_blocks) in 
    145           read <- (read + 1) mod (2*nb_blocks) ; 
    146           b 
    147     in 
     112    let buffer = ioring#get_block in 
    148113    let fbuf = AFrame.get_float_pcm buf in 
    149114      for c = 0 to Array.length fbuf - 1 do 
  • trunk/liquidsoap/src/sources/bjack_in.ml

    r6527 r6565  
    3030  let samples_per_second = Fmt.samples_per_second () in 
    3131  let bytes_per_sample = 2 in 
    32   let blank = String.make (samples_per_frame * channels * bytes_per_sample) '0' in 
     32  let blank () =  
     33    String.make (samples_per_frame * channels * bytes_per_sample) '0'  
     34  in 
    3335object (self) 
    3436  inherit active_source 
     37  inherit [string] IoRing.input  
     38      ~nb_blocks ~blank () as ioring 
    3539 
    3640  method stype = Infallible 
     
    4145  val mutable sample_freq = Fmt.samples_per_second () 
    4246 
    43   (* See sources/alsa_in.ml, it's the same producer/consumer system. *) 
    44   val buffer = Array.create nb_blocks "" 
    45   initializer 
    46     for i = 0 to nb_blocks - 1 do 
    47       buffer.(i) <-blank 
    48     done 
    49   val mutable read = 0 
    50   val mutable write = 0 
    51   (* Read and write are stored modulo 2*nb_blocks, 
    52    * because we must be able to distinguish the case where the sched is late 
    53    * from the one where the capture is late. 
    54    * And we don't need more than modulo 2*nb_blocks. *) 
     47  val mutable device = None 
    5548 
    56   val mutable sleep = false 
    57   val mutable device = None 
    58   method sleep = sleep <- true 
     49  method close =  
     50    match device with 
     51      | Some d ->  
     52          Bjack.close d ; 
     53          device <- None 
     54      | None -> () 
    5955 
    6056  method private get_device =  
    6157    match device with 
    62       | None ->  
     58      | None -> 
    6359          let server_name =  
    6460            match server with "" -> None | s -> Some s 
     
    7571      | Some d -> d 
    7672 
    77   method output_get_ready = 
    78     let dev = self#get_device in 
    79     sleep <- false ; 
    80     read <- 0 ; write <- 0 ; 
    81     let m = Mutex.create () in 
    82     let c = Condition.create () in 
    83     Mutex.lock m; 
    84     ignore (Tutils.create  
    85          (fun _ -> self#writer dev m c) () "jack_capture") ; 
    86     (* Wait for the first buffer input. *) 
    87     Condition.wait c m; 
    88     Mutex.unlock m 
    89  
    90   method private get_block dev =  
    91         let length = samples_per_frame * channels * bytes_per_sample in 
     73  method private pull_block block = 
     74        let dev = self#get_device in  
     75        let length = String.length block in 
    9276        let ans = ref (Bjack.read dev length) in 
    9377          while String.length !ans < length do 
     
    9781            ans := !ans ^ tmp  
    9882         done; 
    99          !ans 
    100  
    101   method private writer dev m c = 
    102       let fill block = 
    103          buffer.(block mod nb_blocks) <- self#get_block dev  
    104       in 
    105         (* Fill the first block *) 
    106         fill 0 ; 
    107         write <- 1 ; 
    108         Mutex.lock m ; 
    109         Condition.signal c; 
    110         Mutex.unlock m; 
    111         (* Filling loop *) 
    112         while not sleep do 
    113           if read <> write && 
    114              write mod nb_blocks = read mod nb_blocks then begin 
    115                (* Wait for the reader to read the block we fancy *) 
    116                Thread.delay (Fmt.seconds_per_frame () /. 2.) 
    117           end else 
    118             ( fill write ; write <- (write + 1) mod (2*nb_blocks) ) 
    119         done 
     83         String.blit !ans 0 block 0 length 
    12084 
    12185  method private get_frame buf = 
    12286    assert (0 = AFrame.position buf) ; 
    123     let buffer = 
    124       (* Check that the writer still has an advance. 
    125        * Otherwise play blank for waiting.. *) 
    126       if write = read then begin 
    127         log#f 4 "No available frame!" ; 
    128         blank 
    129       end else 
    130         let b = buffer.(read mod nb_blocks) in 
    131           read <- (read + 1) mod (2*nb_blocks) ; 
    132           b 
    133     in 
     87    let buffer = ioring#get_block in 
    13488    let fbuf = AFrame.get_float_pcm buf in 
    13589      Float_pcm.from_s16le fbuf 0 buffer 0 samples_per_frame ; 
  • trunk/liquidsoap/src/stream/float_pcm.mli

    r6344 r6565  
    3939(** Multiply samplerate by the given ratio. *) 
    4040val native_resample : float -> float array -> int -> int -> float array 
     41 
     42(** Blit float array array *) 
     43val float_blit : float array -> int -> float array -> int -> int -> unit 
    4144 
    4245(** {2 Generators} *)