Changeset 6649

Show
Ignore:
Timestamp:
06/18/09 17:43:47 (9 months ago)
Author:
metamorph68
Message:

Backported changes adding remaining time in request-based sources.
Set default options to the old behaviour.

Location:
branches/bugfix-0.9.1
Files:
7 modified

Legend:

Unmodified
Added
Removed
  • branches/bugfix-0.9.1/CHANGES

    r6648 r6649  
    1515   list of headers given by the connected source (#266). 
    1616 - Added on_end operator, to execute a handler when a track ends. 
     17 - Added estimated remaining time in the queue length for request-driven 
     18   sources (request.{equeue,queue}, playlist). This allows these sources 
     19   to prepare less files in advance. In particular, primary queue may 
     20   only contain the file currently played. Default behaviour has been 
     21   set to the old behaviour, and a conservative option has 
     22   been added to switch to the new behaviour. New behavivour 
     23   will be the default for the next release. 
     24   fixes #169, references #146 
    1725 
    18260.9.0 (01-03-2009) 
  • branches/bugfix-0.9.1/src/sources/playlist.ml

    r6344 r6649  
    367367               ~random ~length  
    368368               ~default_duration 
    369                ~timeout ~prefix  
     369               ~timeout ~prefix 
     370               ~conservative  
    370371               uri = 
    371372object 
     
    376377  inherit Request_source.queued  
    377378            ~length ~default_duration  
    378             ~timeout () as super 
     379            ~timeout ~conservative () as super 
    379380 
    380381  method reload_playlist_internal a b c = 
     
    514515             (Lang.to_string (e "prefix")) 
    515516         in 
    516          let length,default_duration,timeout =  
     517         let length,default_duration,timeout,conservative =  
    517518                Request_source.extract_queued_params params  
    518519         in 
    519520           ((new playlist ~mime ~reload ~prefix  
    520521                          ~length ~default_duration 
    521                           ~timeout ~random uri):>source)) ; 
     522                          ~timeout ~random ~conservative uri):>source)) ; 
    522523 
    523524    Lang.add_operator "playlist.safe" 
  • branches/bugfix-0.9.1/src/sources/req_equeue.ml

    r6344 r6649  
    3030(** [telnet_request] is an [external_request] which queue is feed by 
    3131  * requests made over the network using the Server interface. *) 
    32 class equeue length default_duration timeout = 
     32class equeue length default_duration timeout conservative = 
    3333object (self) 
    34   inherit Request_source.queued ~length ~default_duration ~timeout () as super 
     34  inherit Request_source.queued ~length ~default_duration  
     35     ~timeout ~conservative () as super 
    3536 
    3637  val queue = Rqueue.create () 
     
    154155            "Insertion and deletion possible at any position.") 
    155156    Request_source.queued_proto 
    156     (fun p -> let l,d,t = Request_source.extract_queued_params p in 
    157                 ((new equeue l d t) :> source)) 
     157    (fun p -> 
     158       let l,d,t,c = Request_source.extract_queued_params p in 
     159         ((new equeue l d t c) :> source)) 
  • branches/bugfix-0.9.1/src/sources/req_queue.ml

    r6344 r6649  
    3535  * user emits a request. *) 
    3636class queue ?(requests=Queue.create()) ?(interactive=true) 
    37             length default_duration timeout = 
     37            length default_duration timeout conservative = 
    3838object (self) 
    39   inherit Request_source.queued ~length ~default_duration ~timeout () as queued 
     39  inherit Request_source.queued ~length ~default_duration  
     40     ~timeout ~conservative () as queued 
    4041 
    4142  val reqlock = Mutex.create () 
     
    179180     Request_source.queued_proto) 
    180181    (fun p -> 
    181        let l,d,t = Request_source.extract_queued_params p in 
     182       let l,d,t,c = Request_source.extract_queued_params p in 
    182183       let interactive = Lang.to_bool (Lang.assoc "interactive" 1 p) in 
    183184       let requests = Queue.create () in 
     
    187188              | None -> ()) 
    188189           (Lang.to_list (List.assoc "queue" p)) ; 
    189          ((new queue ~requests ~interactive l d t) :> source)) 
     190         ((new queue ~requests ~interactive l d t c) :> source)) 
  • branches/bugfix-0.9.1/src/sources/req_simple.ml

    r6344 r6649  
    4141end 
    4242 
    43 class queued uri length default_duration timeout = 
     43class queued uri length default_duration timeout conservative = 
    4444object (self) 
    45   inherit Request_source.queued ~length ~default_duration ~timeout () 
     45  inherit Request_source.queued  
     46      ~length ~default_duration ~conservative ~timeout () 
    4647  method get_next_request = self#create_request uri 
    4748end 
     
    6061    (fun p -> 
    6162       let val_uri = List.assoc "" p in 
    62        let l,d,t = extract_queued_params p in 
     63       let l,d,t,c = extract_queued_params p in 
    6364       let uri = Lang.to_string val_uri in 
    6465         try match 
     
    7778                      * that we ever run out of RID there... *) 
    7879                     log#f 2 "No available RID: %S will be queued.." uri ; 
    79                      ((new queued uri l d t) :> source) 
     80                     ((new queued uri l d t c) :> source) 
    8081                 | Some r -> 
    8182                     log#f 3 "%S is static, resolving once for all..." uri ; 
     
    8687           | None | Some false -> 
    8788               log#f 3 "%S will be queued." uri ; 
    88                ((new queued uri l d t) :> source) 
     89               ((new queued uri l d t c) :> source) 
    8990         with 
    9091           | Invalid_URI -> 
     
    106107         ((new unqueued r):>source)) 
    107108 
    108 class dynamic (f:Lang.value) length default_duration timeout = 
     109class dynamic (f:Lang.value) length default_duration  
     110               timeout conservative = 
    109111object (self) 
    110   inherit Request_source.queued ~length ~default_duration ~timeout () 
     112  inherit Request_source.queued ~length ~default_duration  
     113       ~timeout ~conservative () 
    111114  method get_next_request = 
    112115    try 
     
    132135    (fun p -> 
    133136       let f = List.assoc "" p in 
    134        let l,d,t = extract_queued_params p in 
    135          ((new dynamic f l d t) :> source)) 
     137       let l,d,t,c = extract_queued_params p in 
     138         ((new dynamic f l d t c) :> source)) 
  • branches/bugfix-0.9.1/src/sources/request_source.ml

    r6640 r6649  
    7979          let file = Utils.get_some (Request.get_filename req) in 
    8080          let decoder = Utils.get_some (Request.get_decoder req) in 
    81             self#log#f 3 "Prepared %S -- rid %d" file (Request.get_id req) ; 
     81            self#log#f 3 "Prepared %S -- RID %d" file (Request.get_id req) ; 
    8282            current <- 
    8383              Some (req, 
     
    155155  * - if the duration of a file is unknown we use [default_duration] seconds 
    156156  * - downloading a file is required to take less than [timeout] seconds *) 
    157 class virtual queued ?(length=60.) ?(default_duration=30.) ?(timeout=20.) () = 
     157class virtual queued 
     158  ?(length=60.) ?(default_duration=30.) 
     159  ?(conservative=true) ?(timeout=20.) () = 
    158160object (self) 
    159161  inherit unqueued as super 
     162 
     163  method stype = Fallible 
    160164 
    161165  method virtual get_next_request : Request.audio Request.t option 
     
    165169  val qlock = Mutex.create () 
    166170  val retrieved = Queue.create () 
    167   val mutable queue_length = 0 (* Frames *) 
     171  val mutable queue_length = 0 (* Ticks *) 
    168172  val mutable resolving = None 
     173 
     174  method private available_length = 
     175    if conservative then 
     176      queue_length 
     177    else 
     178      let remaining = self#remaining in 
     179        if remaining < 0 then -1 else 
     180          queue_length + remaining 
    169181 
    170182  (** State should be `Sleeping on awakening, and is then turned to `Running. 
    171183    * Eventually #sleep puts it to `Tired, then waits for it to be `Sleeping, 
    172     * meaning that the feeding tasks exited. *) 
     184    * meaning that the feeding task exited. *) 
    173185  val mutable state = `Sleeping 
    174186  val state_lock = Mutex.create () 
     
    182194         assert (state = `Sleeping) ; 
    183195         state <- `Running) () ; 
    184     let t =  
    185       Duppy.Async.add Tutils.scheduler ~priority 
    186         self#feed_queue 
     196    let t = 
     197      Duppy.Async.add Tutils.scheduler ~priority self#feed_queue 
    187198    in 
    188199      Duppy.Async.wake_up t ; 
     
    190201 
    191202  method private sleep = 
    192     (* Ask the feeding task to die. *) 
     203    (* We need to be sure that the feeding task stopped filling the queue 
     204     * before we destroy all requests from that queue. 
     205     * Async.stop only promises us that on the next round the task will 
     206     * stop but won't tell us if it's currently resolving a file or not. 
     207     * So we first put the queue into an harmless state: we put the state 
     208     * to `Tired and wait for it to acknowledge it by setting it to 
     209     * `Sleeping. *) 
    193210    Tutils.mutexify state_lock 
    194211      (fun () -> 
    195212         assert (state = `Running) ; 
    196213         state <- `Tired) () ; 
    197     (* Make sure the task is awake so it can die and let us know about it. *) 
     214    (* Make sure the task is awake so that it can see our signal. *) 
    198215    Duppy.Async.wake_up (Utils.get_some task) ; 
     216    self#log#f 4 "Waiting for feeding task to stop..." ; 
    199217    Tutils.wait state_cond state_lock (fun () -> state = `Sleeping) ; 
    200218    Duppy.Async.stop (Utils.get_some task) ; 
     
    202220    (* No more feeding task, we can go to sleep. *) 
    203221    super#sleep ; 
     222    self#log#f 4 "Cleaning up request queue..." ; 
    204223    begin try 
    205224      Mutex.lock qlock ; 
     
    213232    * opportunity to feed the queue, in case it is sleeping. *) 
    214233  method private notify_new_request = 
    215     (* Don't wake up the task while we're trying to shut down, 
    216      * it could avoid its death and run forever in the wild. *) 
    217     if Tutils.mutexify state_lock (fun () -> state) () = `Running then 
    218       match task with 
    219         | Some task -> Duppy.Async.wake_up task 
    220         | None -> () 
     234    (* Avoid trying to wake up the task during the shutdown process 
     235     * where it might have been stopped already, in which case we'd 
     236     * get an exception. *) 
     237    Tutils.mutexify state_lock 
     238      (fun () -> 
     239         if state = `Running then 
     240           Duppy.Async.wake_up (Utils.get_some task)) () 
    221241 
    222242  (** A function that returns delays for tasks, making sure that these tasks 
     
    239259 
    240260  (** The body of the feeding task *) 
    241   method private feed_queue () : float = 
    242     (* If the test fails, the task sleeps. *) 
     261  method private feed_queue () = 
    243262    if 
     263      (* Is the source running? And does it need prefetching? 
     264       * If the test fails, the task sleeps. *) 
    244265      Tutils.mutexify state_lock 
    245         (fun () -> 
    246            if state <> `Tired then true else begin 
    247              state <- `Sleeping ; 
    248              Condition.signal state_cond ; 
    249              false 
    250            end) () 
     266        (fun () -> match state with 
     267           | `Running -> true 
     268           | `Tired -> 
     269               state <- `Sleeping ; 
     270               Condition.signal state_cond ; 
     271               false 
     272           | `Sleeping -> assert false) () && 
     273      self#available_length < min_queue_length 
    251274    then 
    252       if queue_length < min_queue_length then 
    253         match self#prefetch with 
    254           | Finished -> 0. 
    255           | Retry -> adaptative_delay () 
    256           | Empty -> (-1.) 
    257       else (-1.) 
    258     else (-1.) 
    259  
    260   (** Try to feed the queue with a new request. 
    261     * Return false if there was no new request to try, 
    262     * true otherwise, whether the request was fetched successfully or not. *) 
     275       match self#prefetch with 
     276         | Finished -> 0. 
     277         | Retry -> adaptative_delay () 
     278         | Empty -> (-1.) 
     279    else 
     280      (-1.) 
     281 
     282  (** Try to feed the queue with a new request. Return a resolution status: 
     283    * Empty if there was no new request to try, 
     284    * Retry if there was a new one but it failed to be resolved, 
     285    * Finished if all went OK. *) 
    263286  method private prefetch = 
    264287    match self#get_next_request with 
     
    279302                  Mutex.lock qlock ; 
    280303                  Queue.add (len,req) retrieved ; 
    281                   self#log#f 4 "queue length %d+=%d (rid %d)" 
    282                     queue_length len (Request.get_id req) ; 
     304                  self#log#f 4 
     305                    "Remaining: %d, queued: %d, adding: %d (RID %d)" 
     306                    self#remaining queue_length len (Request.get_id req) ; 
    283307                  queue_length <- queue_length + len ; 
    284308                  Mutex.unlock qlock ; 
     
    298322      try 
    299323        let len,f = Queue.take retrieved in 
    300           self#log#f 4 "queue length %d-=%d" queue_length len ; 
     324          self#log#f 4 
     325            "Remaining: %d, queued: %d, taking: %d" 
     326            self#remaining queue_length len ; 
    301327          queue_length <- queue_length - len ; 
    302328          Some f 
     
    307333    in 
    308334      Mutex.unlock qlock ; 
    309       self#notify_new_request ; 
     335      (* A request has been taken off the queue, there is a chance 
     336       * that the queue should be refilled: awaken the feeding task. 
     337       * However, we can wait that this file is played, and this need 
     338       * will be noticed in #get_frame. 
     339       * This is critical in non-conservative mode because at this point 
     340       * remaining is still 0 (we're inside #begin_track) which would 
     341       * lead to too early prefetching; if we wait that a frame has been 
     342       * produced, we'll get the first non-infinite remaining time 
     343       * estimations. *) 
    310344      ans 
     345 
     346  method private get_frame ab = 
     347    super#get_frame ab ; 
     348    (* At an end of track, we always have unqueued#remaining=0, 
     349     * so there's nothing special to do. *) 
     350    if self#available_length < min_queue_length then 
     351      self#notify_new_request 
    311352 
    312353  method copy_queue = 
     
    329370 
    330371let queued_proto = 
    331   [ "length", Lang.float_t, Some (Lang.float 60.), 
     372  [ "length", Lang.float_t, Some (Lang.float 10.), 
    332373    Some "How much audio (in sec.) should be downloaded in advance." ; 
    333374    "default_duration", Lang.float_t, Some (Lang.float 30.), 
    334375    Some "When unknown, assume this duration (in sec.) for files." ; 
     376    "conservative", Lang.bool_t, Some (Lang.bool true), 
     377    Some "If true, estimated remaining time on the current track \ 
     378          is not considered when computing queue length." ; 
    335379    "timeout", Lang.float_t, Some (Lang.float 20.), 
    336380    Some "Timeout (in sec.) for a single download." ] 
     
    340384  let d = Lang.to_float (List.assoc "default_duration" p) in 
    341385  let t = Lang.to_float (List.assoc "timeout" p) in 
    342     l,d,t 
     386  let c = Lang.to_bool (List.assoc "conservative" p) in 
     387    l,d,t,c 
  • branches/bugfix-0.9.1/src/sources/request_source.mli

    r6344 r6649  
    3838 
    3939class virtual queued : 
    40   ?length:float -> ?default_duration:float -> ?timeout:float -> unit -> 
     40  ?length:float -> ?default_duration:float -> ?conservative:bool ->  
     41  ?timeout:float -> unit -> 
    4142object 
    4243  method copy_queue : Request.audio Request.t list 
     44 
     45  method stype : Source.source_t 
    4346 
    4447  (** You should only define this. *) 
     
    5659 
    5760val queued_proto : Lang.proto 
    58 val extract_queued_params : Lang.env -> float*float*float 
     61val extract_queued_params : Lang.env -> float*float*float*bool