Merge #3557 into 3.5.9 · reactor/reactor-core@a7cacc4 (original) (raw)

1

1

`/*

`

2

``

`-

`

``

2

`+

`

3

3

` *

`

4

4

` * Licensed under the Apache License, Version 2.0 (the "License");

`

5

5

` * you may not use this file except in compliance with the License.

`

`@@ -268,7 +268,7 @@ void drainLoop() {

`

268

268

``

269

269

`@Override

`

270

270

`public FluxSink onRequest(LongConsumer consumer) {

`

271

``

`-

sink.onRequest(consumer, consumer, sink.requested);

`

``

271

`+

sink.onPushPullRequest(consumer);

`

272

272

`return this;

`

273

273

` }

`

274

274

``

`@@ -409,6 +409,8 @@ static abstract class BaseSink extends AtomicBoolean

`

409

409

`static final Disposable TERMINATED = OperatorDisposables.DISPOSED;

`

410

410

`static final Disposable CANCELLED = Disposables.disposed();

`

411

411

``

``

412

`+

static final LongConsumer NOOP_CONSUMER = n -> {};

`

``

413

+

412

414

`final CoreSubscriber<? super T> actual;

`

413

415

`final Context ctx;

`

414

416

``

`@@ -434,6 +436,7 @@ static abstract class BaseSink extends AtomicBoolean

`

434

436

`BaseSink(CoreSubscriber<? super T> actual) {

`

435

437

`this.actual = actual;

`

436

438

`this.ctx = actual.currentContext();

`

``

439

`+

REQUESTED.lazySet(this, Long.MIN_VALUE);

`

437

440

` }

`

438

441

``

439

442

`@Override

`

`@@ -500,7 +503,7 @@ void disposeResource(boolean isCancel) {

`

500

503

``

501

504

`@Override

`

502

505

`public long requestedFromDownstream() {

`

503

``

`-

return requested;

`

``

506

`+

return requested & Long.MAX_VALUE;

`

504

507

` }

`

505

508

``

506

509

`void onCancel() {

`

`@@ -519,12 +522,15 @@ final boolean isTerminated() {

`

519

522

`@Override

`

520

523

`public final void request(long n) {

`

521

524

`if (Operators.validate(n)) {

`

522

``

`-

Operators.addCap(REQUESTED, this, n);

`

``

525

`+

long s = addCap(this, n);

`

523

526

``

524

``

`-

LongConsumer consumer = requestConsumer;

`

525

``

`-

if (n > 0 && consumer != null && !isCancelled()) {

`

526

``

`-

consumer.accept(n);

`

``

527

`+

if (hasRequestConsumer(s)) {

`

``

528

`+

LongConsumer consumer = requestConsumer;

`

``

529

`+

if (!isCancelled()) {

`

``

530

`+

consumer.accept(n);

`

``

531

`+

}

`

527

532

` }

`

``

533

+

528

534

`onRequestedFromDownstream();

`

529

535

` }

`

530

536

` }

`

`@@ -541,20 +547,29 @@ public CoreSubscriber<? super T> actual() {

`

541

547

`@Override

`

542

548

`public FluxSink onRequest(LongConsumer consumer) {

`

543

549

`Objects.requireNonNull(consumer, "onRequest");

`

544

``

`-

onRequest(consumer, n -> {

`

545

``

`-

}, Long.MAX_VALUE);

`

``

550

`+

onPushRequest(consumer);

`

546

551

`return this;

`

547

552

` }

`

548

553

``

549

``

`-

protected void onRequest(LongConsumer initialRequestConsumer,

`

550

``

`-

LongConsumer requestConsumer,

`

551

``

`-

long value) {

`

``

554

`+

protected void onPushRequest(LongConsumer initialRequestConsumer) {

`

``

555

`+

if (!REQUEST_CONSUMER.compareAndSet(this, null, NOOP_CONSUMER)) {

`

``

556

`+

throw new IllegalStateException(

`

``

557

`+

"A consumer has already been assigned to consume requests");

`

``

558

`+

}

`

``

559

+

``

560

`+

// do not change real flag since real consumer is technically absent

`

``

561

`+

initialRequestConsumer.accept(Long.MAX_VALUE);

`

``

562

`+

}

`

``

563

+

``

564

`+

protected void onPushPullRequest(LongConsumer requestConsumer) {

`

552

565

`if (!REQUEST_CONSUMER.compareAndSet(this, null, requestConsumer)) {

`

553

566

`throw new IllegalStateException(

`

554

567

`"A consumer has already been assigned to consume requests");

`

555

568

` }

`

556

``

`-

else if (value > 0) {

`

557

``

`-

initialRequestConsumer.accept(value);

`

``

569

+

``

570

`+

long initialRequest = markRequestConsumerSet(this);

`

``

571

`+

if (initialRequest > 0) {

`

``

572

`+

requestConsumer.accept(initialRequest);

`

558

573

` }

`

559

574

` }

`

560

575

``

`@@ -607,7 +622,7 @@ else if (c instanceof SinkDisposable) {

`

607

622

`public Object scanUnsafe(Attr key) {

`

608

623

`if (key == Attr.TERMINATED) return disposable == TERMINATED;

`

609

624

`if (key == Attr.CANCELLED) return disposable == CANCELLED;

`

610

``

`-

if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;

`

``

625

`+

if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requestedFromDownstream();

`

611

626

`if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC;

`

612

627

``

613

628

`return InnerProducer.super.scanUnsafe(key);

`

`@@ -617,6 +632,54 @@ public Object scanUnsafe(Attr key) {

`

617

632

`public String toString() {

`

618

633

`return "FluxSink";

`

619

634

` }

`

``

635

+

``

636

`+

static void produced(BaseSink instance, long toSub) {

`

``

637

`+

long s, r, u;

`

``

638

`+

do {

`

``

639

`+

s = instance.requested;

`

``

640

`+

r = s & Long.MAX_VALUE;

`

``

641

`+

if (r == 0 || r == Long.MAX_VALUE) {

`

``

642

`+

return;

`

``

643

`+

}

`

``

644

`+

u = Operators.subOrZero(r, toSub);

`

``

645

`+

} while (!REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE)));

`

``

646

`+

}

`

``

647

+

``

648

+

``

649

`+

static long addCap(BaseSink instance, long toAdd) {

`

``

650

`+

long r, u, s;

`

``

651

`+

for (;;) {

`

``

652

`+

s = instance.requested;

`

``

653

`+

r = s & Long.MAX_VALUE;

`

``

654

`+

if (r == Long.MAX_VALUE) {

`

``

655

`+

return Long.MAX_VALUE;

`

``

656

`+

}

`

``

657

`+

u = Operators.addCap(r, toAdd);

`

``

658

`+

if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) {

`

``

659

`+

return s;

`

``

660

`+

}

`

``

661

`+

}

`

``

662

`+

}

`

``

663

+

``

664

`+

static long markRequestConsumerSet(BaseSink instance) {

`

``

665

`+

long u, s;

`

``

666

`+

for (;;) {

`

``

667

`+

s = instance.requested;

`

``

668

+

``

669

`+

if (hasRequestConsumer(s)) {

`

``

670

`+

return s;

`

``

671

`+

}

`

``

672

+

``

673

`+

u = s & Long.MAX_VALUE;

`

``

674

`+

if (REQUESTED.compareAndSet(instance, s, u)) {

`

``

675

`+

return u;

`

``

676

`+

}

`

``

677

`+

}

`

``

678

`+

}

`

``

679

+

``

680

`+

static boolean hasRequestConsumer(long requestedState) {

`

``

681

`+

return (requestedState & Long.MIN_VALUE) == 0;

`

``

682

`+

}

`

620

683

` }

`

621

684

``

622

685

`static final class IgnoreSink extends BaseSink {

`

`@@ -639,8 +702,9 @@ public FluxSink next(T t) {

`

639

702

`actual.onNext(t);

`

640

703

``

641

704

`for (; ; ) {

`

642

``

`-

long r = requested;

`

643

``

`-

if (r == 0L || REQUESTED.compareAndSet(this, r, r - 1)) {

`

``

705

`+

long s = requested;

`

``

706

`+

long r = s & Long.MAX_VALUE;

`

``

707

`+

if (r == 0L || REQUESTED.compareAndSet(this, s, (r - 1) | (s & Long.MIN_VALUE))) {

`

644

708

`return this;

`

645

709

` }

`

646

710

` }

`

`@@ -665,9 +729,9 @@ public final FluxSink next(T t) {

`

665

729

`return this;

`

666

730

` }

`

667

731

``

668

``

`-

if (requested != 0) {

`

``

732

`+

if (requestedFromDownstream() != 0) {

`

669

733

`actual.onNext(t);

`

670

``

`-

Operators.produced(REQUESTED, this, 1);

`

``

734

`+

produced(this, 1);

`

671

735

` }

`

672

736

`else {

`

673

737

`onOverflow();

`

`@@ -776,7 +840,7 @@ void drain() {

`

776

840

`final Queue q = queue;

`

777

841

``

778

842

`for (; ; ) {

`

779

``

`-

long r = requested;

`

``

843

`+

long r = requestedFromDownstream();

`

780

844

`long e = 0L;

`

781

845

``

782

846

`while (e != r) {

`

`@@ -844,7 +908,7 @@ void drain() {

`

844

908

` }

`

845

909

``

846

910

`if (e != 0) {

`

847

``

`-

Operators.produced(REQUESTED, this, e);

`

``

911

`+

produced(this, e);

`

848

912

` }

`

849

913

``

850

914

`if (WIP.decrementAndGet(this) == 0) {

`

`@@ -936,7 +1000,7 @@ void drain() {

`

936

1000

`final AtomicReference q = queue;

`

937

1001

``

938

1002

`for (; ; ) {

`

939

``

`-

long r = requested;

`

``

1003

`+

long r = requestedFromDownstream();

`

940

1004

`long e = 0L;

`

941

1005

``

942

1006

`while (e != r) {

`

`@@ -1006,7 +1070,7 @@ void drain() {

`

1006

1070

` }

`

1007

1071

``

1008

1072

`if (e != 0) {

`

1009

``

`-

Operators.produced(REQUESTED, this, e);

`

``

1073

`+

produced(this, e);

`

1010

1074

` }

`

1011

1075

``

1012

1076

`if (WIP.decrementAndGet(this) == 0) {

`