Changeset 6649
- Timestamp:
- 06/18/09 17:43:47 (9 months ago)
- Location:
- branches/bugfix-0.9.1
- Files:
-
- 7 modified
-
CHANGES (modified) (1 diff)
-
src/sources/playlist.ml (modified) (3 diffs)
-
src/sources/req_equeue.ml (modified) (2 diffs)
-
src/sources/req_queue.ml (modified) (3 diffs)
-
src/sources/req_simple.ml (modified) (6 diffs)
-
src/sources/request_source.ml (modified) (13 diffs)
-
src/sources/request_source.mli (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
branches/bugfix-0.9.1/CHANGES
r6648 r6649 15 15 list of headers given by the connected source (#266). 16 16 - Added on_end operator, to execute a handler when a track ends. 17 - Added estimated remaining time in the queue length for request-driven 18 sources (request.{equeue,queue}, playlist). This allows these sources 19 to prepare less files in advance. In particular, primary queue may 20 only contain the file currently played. Default behaviour has been 21 set to the old behaviour, and a conservative option has 22 been added to switch to the new behaviour. New behavivour 23 will be the default for the next release. 24 fixes #169, references #146 17 25 18 26 0.9.0 (01-03-2009) -
branches/bugfix-0.9.1/src/sources/playlist.ml
r6344 r6649 367 367 ~random ~length 368 368 ~default_duration 369 ~timeout ~prefix 369 ~timeout ~prefix 370 ~conservative 370 371 uri = 371 372 object … … 376 377 inherit Request_source.queued 377 378 ~length ~default_duration 378 ~timeout () as super379 ~timeout ~conservative () as super 379 380 380 381 method reload_playlist_internal a b c = … … 514 515 (Lang.to_string (e "prefix")) 515 516 in 516 let length,default_duration,timeout =517 let length,default_duration,timeout,conservative = 517 518 Request_source.extract_queued_params params 518 519 in 519 520 ((new playlist ~mime ~reload ~prefix 520 521 ~length ~default_duration 521 ~timeout ~random uri):>source)) ;522 ~timeout ~random ~conservative uri):>source)) ; 522 523 523 524 Lang.add_operator "playlist.safe" -
branches/bugfix-0.9.1/src/sources/req_equeue.ml
r6344 r6649 30 30 (** [telnet_request] is an [external_request] which queue is feed by 31 31 * requests made over the network using the Server interface. *) 32 class equeue length default_duration timeout =32 class equeue length default_duration timeout conservative = 33 33 object (self) 34 inherit Request_source.queued ~length ~default_duration ~timeout () as super 34 inherit Request_source.queued ~length ~default_duration 35 ~timeout ~conservative () as super 35 36 36 37 val queue = Rqueue.create () … … 154 155 "Insertion and deletion possible at any position.") 155 156 Request_source.queued_proto 156 (fun p -> let l,d,t = Request_source.extract_queued_params p in 157 ((new equeue l d t) :> source)) 157 (fun p -> 158 let l,d,t,c = Request_source.extract_queued_params p in 159 ((new equeue l d t c) :> source)) -
branches/bugfix-0.9.1/src/sources/req_queue.ml
r6344 r6649 35 35 * user emits a request. *) 36 36 class queue ?(requests=Queue.create()) ?(interactive=true) 37 length default_duration timeout =37 length default_duration timeout conservative = 38 38 object (self) 39 inherit Request_source.queued ~length ~default_duration ~timeout () as queued 39 inherit Request_source.queued ~length ~default_duration 40 ~timeout ~conservative () as queued 40 41 41 42 val reqlock = Mutex.create () … … 179 180 Request_source.queued_proto) 180 181 (fun p -> 181 let l,d,t = Request_source.extract_queued_params p in182 let l,d,t,c = Request_source.extract_queued_params p in 182 183 let interactive = Lang.to_bool (Lang.assoc "interactive" 1 p) in 183 184 let requests = Queue.create () in … … 187 188 | None -> ()) 188 189 (Lang.to_list (List.assoc "queue" p)) ; 189 ((new queue ~requests ~interactive l d t ) :> source))190 ((new queue ~requests ~interactive l d t c) :> source)) -
branches/bugfix-0.9.1/src/sources/req_simple.ml
r6344 r6649 41 41 end 42 42 43 class queued uri length default_duration timeout =43 class queued uri length default_duration timeout conservative = 44 44 object (self) 45 inherit Request_source.queued ~length ~default_duration ~timeout () 45 inherit Request_source.queued 46 ~length ~default_duration ~conservative ~timeout () 46 47 method get_next_request = self#create_request uri 47 48 end … … 60 61 (fun p -> 61 62 let val_uri = List.assoc "" p in 62 let l,d,t = extract_queued_params p in63 let l,d,t,c = extract_queued_params p in 63 64 let uri = Lang.to_string val_uri in 64 65 try match … … 77 78 * that we ever run out of RID there... *) 78 79 log#f 2 "No available RID: %S will be queued.." uri ; 79 ((new queued uri l d t ) :> source)80 ((new queued uri l d t c) :> source) 80 81 | Some r -> 81 82 log#f 3 "%S is static, resolving once for all..." uri ; … … 86 87 | None | Some false -> 87 88 log#f 3 "%S will be queued." uri ; 88 ((new queued uri l d t ) :> source)89 ((new queued uri l d t c) :> source) 89 90 with 90 91 | Invalid_URI -> … … 106 107 ((new unqueued r):>source)) 107 108 108 class dynamic (f:Lang.value) length default_duration timeout = 109 class dynamic (f:Lang.value) length default_duration 110 timeout conservative = 109 111 object (self) 110 inherit Request_source.queued ~length ~default_duration ~timeout () 112 inherit Request_source.queued ~length ~default_duration 113 ~timeout ~conservative () 111 114 method get_next_request = 112 115 try … … 132 135 (fun p -> 133 136 let f = List.assoc "" p in 134 let l,d,t = extract_queued_params p in135 ((new dynamic f l d t ) :> source))137 let l,d,t,c = extract_queued_params p in 138 ((new dynamic f l d t c) :> source)) -
branches/bugfix-0.9.1/src/sources/request_source.ml
r6640 r6649 79 79 let file = Utils.get_some (Request.get_filename req) in 80 80 let decoder = Utils.get_some (Request.get_decoder req) in 81 self#log#f 3 "Prepared %S -- rid%d" file (Request.get_id req) ;81 self#log#f 3 "Prepared %S -- RID %d" file (Request.get_id req) ; 82 82 current <- 83 83 Some (req, … … 155 155 * - if the duration of a file is unknown we use [default_duration] seconds 156 156 * - downloading a file is required to take less than [timeout] seconds *) 157 class virtual queued ?(length=60.) ?(default_duration=30.) ?(timeout=20.) () = 157 class virtual queued 158 ?(length=60.) ?(default_duration=30.) 159 ?(conservative=true) ?(timeout=20.) () = 158 160 object (self) 159 161 inherit unqueued as super 162 163 method stype = Fallible 160 164 161 165 method virtual get_next_request : Request.audio Request.t option … … 165 169 val qlock = Mutex.create () 166 170 val retrieved = Queue.create () 167 val mutable queue_length = 0 (* Frames *)171 val mutable queue_length = 0 (* Ticks *) 168 172 val mutable resolving = None 173 174 method private available_length = 175 if conservative then 176 queue_length 177 else 178 let remaining = self#remaining in 179 if remaining < 0 then -1 else 180 queue_length + remaining 169 181 170 182 (** State should be `Sleeping on awakening, and is then turned to `Running. 171 183 * Eventually #sleep puts it to `Tired, then waits for it to be `Sleeping, 172 * meaning that the feeding task sexited. *)184 * meaning that the feeding task exited. *) 173 185 val mutable state = `Sleeping 174 186 val state_lock = Mutex.create () … … 182 194 assert (state = `Sleeping) ; 183 195 state <- `Running) () ; 184 let t = 185 Duppy.Async.add Tutils.scheduler ~priority 186 self#feed_queue 196 let t = 197 Duppy.Async.add Tutils.scheduler ~priority self#feed_queue 187 198 in 188 199 Duppy.Async.wake_up t ; … … 190 201 191 202 method private sleep = 192 (* Ask the feeding task to die. *) 203 (* We need to be sure that the feeding task stopped filling the queue 204 * before we destroy all requests from that queue. 205 * Async.stop only promises us that on the next round the task will 206 * stop but won't tell us if it's currently resolving a file or not. 207 * So we first put the queue into an harmless state: we put the state 208 * to `Tired and wait for it to acknowledge it by setting it to 209 * `Sleeping. *) 193 210 Tutils.mutexify state_lock 194 211 (fun () -> 195 212 assert (state = `Running) ; 196 213 state <- `Tired) () ; 197 (* Make sure the task is awake so it can die and let us know about it. *)214 (* Make sure the task is awake so that it can see our signal. *) 198 215 Duppy.Async.wake_up (Utils.get_some task) ; 216 self#log#f 4 "Waiting for feeding task to stop..." ; 199 217 Tutils.wait state_cond state_lock (fun () -> state = `Sleeping) ; 200 218 Duppy.Async.stop (Utils.get_some task) ; … … 202 220 (* No more feeding task, we can go to sleep. *) 203 221 super#sleep ; 222 self#log#f 4 "Cleaning up request queue..." ; 204 223 begin try 205 224 Mutex.lock qlock ; … … 213 232 * opportunity to feed the queue, in case it is sleeping. *) 214 233 method private notify_new_request = 215 (* Don't wake up the task while we're trying to shut down, 216 * it could avoid its death and run forever in the wild. *) 217 if Tutils.mutexify state_lock (fun () -> state) () = `Running then 218 match task with 219 | Some task -> Duppy.Async.wake_up task 220 | None -> () 234 (* Avoid trying to wake up the task during the shutdown process 235 * where it might have been stopped already, in which case we'd 236 * get an exception. *) 237 Tutils.mutexify state_lock 238 (fun () -> 239 if state = `Running then 240 Duppy.Async.wake_up (Utils.get_some task)) () 221 241 222 242 (** A function that returns delays for tasks, making sure that these tasks … … 239 259 240 260 (** The body of the feeding task *) 241 method private feed_queue () : float = 242 (* If the test fails, the task sleeps. *) 261 method private feed_queue () = 243 262 if 263 (* Is the source running? And does it need prefetching? 264 * If the test fails, the task sleeps. *) 244 265 Tutils.mutexify state_lock 245 (fun () -> 246 if state <> `Tired then true else begin 247 state <- `Sleeping ; 248 Condition.signal state_cond ; 249 false 250 end) () 266 (fun () -> match state with 267 | `Running -> true 268 | `Tired -> 269 state <- `Sleeping ; 270 Condition.signal state_cond ; 271 false 272 | `Sleeping -> assert false) () && 273 self#available_length < min_queue_length 251 274 then 252 if queue_length < min_queue_length then253 match self#prefetch with254 | Finished -> 0.255 | Retry -> adaptative_delay ()256 | Empty -> (-1.)257 else(-1.)258 else (-1.) 259 260 (** Try to feed the queue with a new request.261 * Ret urn false if there was no new request to try,262 * true otherwise, whether the request was fetched successfully or not. *)275 match self#prefetch with 276 | Finished -> 0. 277 | Retry -> adaptative_delay () 278 | Empty -> (-1.) 279 else 280 (-1.) 281 282 (** Try to feed the queue with a new request. Return a resolution status: 283 * Empty if there was no new request to try, 284 * Retry if there was a new one but it failed to be resolved, 285 * Finished if all went OK. *) 263 286 method private prefetch = 264 287 match self#get_next_request with … … 279 302 Mutex.lock qlock ; 280 303 Queue.add (len,req) retrieved ; 281 self#log#f 4 "queue length %d+=%d (rid %d)" 282 queue_length len (Request.get_id req) ; 304 self#log#f 4 305 "Remaining: %d, queued: %d, adding: %d (RID %d)" 306 self#remaining queue_length len (Request.get_id req) ; 283 307 queue_length <- queue_length + len ; 284 308 Mutex.unlock qlock ; … … 298 322 try 299 323 let len,f = Queue.take retrieved in 300 self#log#f 4 "queue length %d-=%d" queue_length len ; 324 self#log#f 4 325 "Remaining: %d, queued: %d, taking: %d" 326 self#remaining queue_length len ; 301 327 queue_length <- queue_length - len ; 302 328 Some f … … 307 333 in 308 334 Mutex.unlock qlock ; 309 self#notify_new_request ; 335 (* A request has been taken off the queue, there is a chance 336 * that the queue should be refilled: awaken the feeding task. 337 * However, we can wait that this file is played, and this need 338 * will be noticed in #get_frame. 339 * This is critical in non-conservative mode because at this point 340 * remaining is still 0 (we're inside #begin_track) which would 341 * lead to too early prefetching; if we wait that a frame has been 342 * produced, we'll get the first non-infinite remaining time 343 * estimations. *) 310 344 ans 345 346 method private get_frame ab = 347 super#get_frame ab ; 348 (* At an end of track, we always have unqueued#remaining=0, 349 * so there's nothing special to do. *) 350 if self#available_length < min_queue_length then 351 self#notify_new_request 311 352 312 353 method copy_queue = … … 329 370 330 371 let queued_proto = 331 [ "length", Lang.float_t, Some (Lang.float 60.),372 [ "length", Lang.float_t, Some (Lang.float 10.), 332 373 Some "How much audio (in sec.) should be downloaded in advance." ; 333 374 "default_duration", Lang.float_t, Some (Lang.float 30.), 334 375 Some "When unknown, assume this duration (in sec.) for files." ; 376 "conservative", Lang.bool_t, Some (Lang.bool true), 377 Some "If true, estimated remaining time on the current track \ 378 is not considered when computing queue length." ; 335 379 "timeout", Lang.float_t, Some (Lang.float 20.), 336 380 Some "Timeout (in sec.) for a single download." ] … … 340 384 let d = Lang.to_float (List.assoc "default_duration" p) in 341 385 let t = Lang.to_float (List.assoc "timeout" p) in 342 l,d,t 386 let c = Lang.to_bool (List.assoc "conservative" p) in 387 l,d,t,c -
branches/bugfix-0.9.1/src/sources/request_source.mli
r6344 r6649 38 38 39 39 class virtual queued : 40 ?length:float -> ?default_duration:float -> ?timeout:float -> unit -> 40 ?length:float -> ?default_duration:float -> ?conservative:bool -> 41 ?timeout:float -> unit -> 41 42 object 42 43 method copy_queue : Request.audio Request.t list 44 45 method stype : Source.source_t 43 46 44 47 (** You should only define this. *) … … 56 59 57 60 val queued_proto : Lang.proto 58 val extract_queued_params : Lang.env -> float*float*float 61 val extract_queued_params : Lang.env -> float*float*float*bool
