class SizedQueue - RDoc Documentation (original) (raw)

This class represents queues with a specified maximum capacity. A push operation may block when the queue is full.

See Queue for an example of how a SizedQueue works.

Public Class Methods

new(max) click to toggle source

Creates a fixed-length queue with a maximum size of max.

/*
 * SizedQueue#initialize(max)
 *
 * Converts vmax to a C long and rejects any value that is not strictly
 * positive with ArgumentError.  On success a fresh backing buffer from
 * ary_buf_new() is stored in the queue, both wait lists are initialized,
 * and the capacity is recorded.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    long capacity = NUM2LONG(vmax);

    if (capacity < 1) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Store through RB_OBJ_WRITE rather than a plain assignment. */
    RB_OBJ_WRITE(self, &szq->q.que, ary_buf_new());

    list_head_init(szqueue_waitq(szq));
    list_head_init(szqueue_pushq(szq));

    szq->max = capacity;
    return self;
}

Public Instance Methods

<<(object)

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn't suspended, and ThreadError is raised.

clear() click to toggle source

Removes all objects from the queue.

/*
 * SizedQueue#clear
 *
 * Empties the queue's backing array (after passing it through
 * check_array()) and then calls wakeup_all() on the push wait list,
 * since clearing the queue frees up capacity.
 *
 * Returns self.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    VALUE items = check_array(self, szq->q.que);

    rb_ary_clear(items);
    wakeup_all(szqueue_pushq(szq));

    return self;
}

close click to toggle source

Similar to Queue#close.

The difference lies in how waiting enqueuing threads are handled.

If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError('queue closed').

/*
 * SizedQueue#close
 *
 * Does nothing when the queue is already closed.  Otherwise sets the
 * QUEUE_CLOSED flag on the object and calls wakeup_all() on the pop
 * wait list and then on the push wait list.
 *
 * Returns self in either case.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    struct rb_szqueue *szq;

    if (queue_closed_p(self)) {
        return self;
    }

    szq = szqueue_ptr(self);
    FL_SET(self, QUEUE_CLOSED);
    wakeup_all(szqueue_waitq(szq));
    wakeup_all(szqueue_pushq(szq));

    return self;
}

deq(non_block=false)

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.

Alias for: pop

empty? click to toggle source

Returns true if the queue is empty.

/*
 * SizedQueue#empty?
 *
 * Returns Qtrue when queue_length() reports zero elements,
 * Qfalse otherwise.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    if (queue_length(self, &szq->q) != 0) {
        return Qfalse;
    }
    return Qtrue;
}

enq(object, non_block=false)

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn't suspended, and ThreadError is raised.

length click to toggle source

Returns the length of the queue.

/*
 * SizedQueue#length
 *
 * Returns the value reported by queue_length() for this queue,
 * converted to a Ruby Integer.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    long count = queue_length(self, &szq->q);

    return LONG2NUM(count);
}

Also aliased as: size

max() click to toggle source

Returns the maximum size of the queue.

/*
 * SizedQueue#max
 *
 * Returns the queue's stored capacity as a Ruby Integer.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return LONG2NUM(szq->max);
}

max=(number) click to toggle source

Sets the maximum size of the queue to the given number.

/*
 * SizedQueue#max=(number)
 *
 * Converts vmax to a C long and raises ArgumentError unless it is
 * strictly positive.  The new capacity is stored, and sync_wakeup() is
 * called on the push wait list with the number of slots by which the
 * capacity grew (zero when it shrank or stayed the same).
 *
 * Returns vmax unchanged.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long new_max = NUM2LONG(vmax);
    struct rb_szqueue *szq = szqueue_ptr(self);
    long grown_by;

    if (new_max <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Only an increase in capacity yields a non-zero wakeup count. */
    grown_by = (new_max > szq->max) ? (new_max - szq->max) : 0;

    szq->max = new_max;
    sync_wakeup(szqueue_pushq(szq), grown_by);

    return vmax;
}

num_waiting() click to toggle source

Returns the number of threads waiting on the queue, counting both threads waiting to pop and threads waiting to push.

/*
 * SizedQueue#num_waiting
 *
 * Returns the sum of the two waiter counters kept by the queue:
 * the push-side counter (num_waiting_push) and the counter of the
 * embedded base queue (q.num_waiting).
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return INT2NUM(szq->num_waiting_push + szq->q.num_waiting);
}

pop(non_block=false) click to toggle source

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.

/*
 * SizedQueue#pop(non_block=false)
 *
 * Derives the blocking mode from the call arguments with
 * queue_pop_should_block() and delegates the actual pop to
 * szqueue_do_pop(), returning whatever it returns.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    return szqueue_do_pop(self, queue_pop_should_block(argc, argv));
}

push(object, non_block=false) click to toggle source

Pushes object to the queue.

If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn't suspended, and ThreadError is raised.

/*
 * SizedQueue#push(object, non_block=false)
 *
 * Pushes argv[0] onto the queue.  While the queue length is at or above
 * the configured maximum, the call either raises ThreadError("queue full")
 * (when szqueue_push_should_block() reports non-blocking mode), stops
 * waiting because the queue is closed, or registers the caller on the push
 * wait list and sleeps.  If the queue is closed once the loop ends, the
 * closed-queue error is raised instead of pushing.
 *
 * Returns the result of queue_do_push().
 */
static VALUE rb_szqueue_push(int argc, VALUE *argv, VALUE self) { struct rb_szqueue *sq = szqueue_ptr(self); int should_block = szqueue_push_should_block(argc, argv);

/* Re-check the length after every wakeup: being woken does not by itself
 * guarantee that a slot is free. */
while (queue_length(self, &sq->q) >= sq->max) {
    if (!should_block) {
        rb_raise(rb_eThreadError, "queue full");
    }
    else if (queue_closed_p(self)) {
        /* Leave the loop; the closed check below raises the error. */
        break;
    }
    else {
        rb_execution_context_t *ec = GET_EC();
        COROUTINE_STACK_LOCAL(struct queue_waiter, qw);
        struct list_head *pushq = szqueue_pushq(sq);

        /* Record who is waiting: the queue, the current thread and fiber. */
        qw->w.self = self;
        qw->w.th = ec->thread_ptr;
        qw->w.fiber = ec->fiber_ptr;

        /* Enqueue the waiter at the tail of the push wait list and count it. */
        qw->as.sq = sq;
        list_add_tail(pushq, &qw->w.node);
        sq->num_waiting_push++;

        /* Sleep; szqueue_sleep_done runs as the ensure handler even if the
         * sleep is interrupted.  NOTE(review): presumably it unlinks the
         * waiter and decrements num_waiting_push -- confirm in its
         * definition, which is not shown here. */
        rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw);
    }
}

/* Covers both a queue closed while waiting and one closed before the call. */
if (queue_closed_p(self)) {
    raise_closed_queue_error(self);
}

return queue_do_push(self, &sq->q, argv[0]);

}

Also aliased as: enq, <<

shift(non_block=false)

Retrieves data from the queue.

If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.

Alias for: pop

size

Returns the length of the queue.