Disallow memtable flush and sst ingest while WAL is locked (#12652) · facebook/rocksdb@94a867c (original) (raw)

`@@ -666,11 +666,25 @@ TEST_P(DBWriteTest, LockWALInEffect) {

`

666

666

`// try the 1st WAL created during open

`

667

667

`ASSERT_OK(Put("key0", "value"));

`

668

668

`ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());

`

``

669

+

669

670

`ASSERT_OK(db_->LockWAL());

`

``

671

+

670

672

`ASSERT_TRUE(dbfull()->WALBufferIsEmpty());

`

``

673

`+

uint64_t wal_num = dbfull()->TEST_GetCurrentLogNumber();

`

``

674

`+

// Manual flush with wait=false should abruptly fail with TryAgain

`

``

675

`+

FlushOptions flush_opts;

`

``

676

`+

flush_opts.wait = false;

`

``

677

`+

for (bool allow_write_stall : {true, false}) {

`

``

678

`+

flush_opts.allow_write_stall = allow_write_stall;

`

``

679

`+

ASSERT_TRUE(db_->Flush(flush_opts).IsTryAgain());

`

``

680

`+

}

`

``

681

`+

ASSERT_EQ(wal_num, dbfull()->TEST_GetCurrentLogNumber());

`

``

682

+

671

683

`ASSERT_OK(db_->UnlockWAL());

`

672

``

`-

// try the 2nd wal created during SwitchWAL

`

``

684

+

``

685

`+

// try the 2nd wal created during SwitchWAL (not locked this time)

`

673

686

`ASSERT_OK(dbfull()->TEST_SwitchWAL());

`

``

687

`+

ASSERT_NE(wal_num, dbfull()->TEST_GetCurrentLogNumber());

`

674

688

`ASSERT_OK(Put("key1", "value"));

`

675

689

`ASSERT_NE(options.manual_wal_flush, dbfull()->WALBufferIsEmpty());

`

676

690

`ASSERT_OK(db_->LockWAL());

`

`@@ -709,21 +723,57 @@ TEST_P(DBWriteTest, LockWALInEffect) {

`

709

723

`}

`

710

724

``

711

725

`TEST_P(DBWriteTest, LockWALConcurrentRecursive) {

`

``

726

`+

// This is a micro-stress test of LockWAL and concurrency handling.

`

``

727

`+

// It is considered the most convenient way to balance functional

`

``

728

`+

// coverage and reproducibility (vs. the two extremes of (a) unit tests

`

``

729

`+

// tailored to specific interleavings and (b) db_stress)

`

712

730

` Options options = GetOptions();

`

713

731

`Reopen(options);

`

714

``

`-

ASSERT_OK(Put("k1", "val"));

`

``

732

`+

ASSERT_OK(Put("k1", "k1_orig"));

`

715

733

`ASSERT_OK(db_->LockWAL()); // 0 -> 1

`

716

734

`auto frozen_seqno = db_->GetLatestSequenceNumber();

`

717

``

`-

std::atomic t1_completed{false};

`

718

``

`-

port::Thread t1{& {

`

719

``

`-

// Won't finish until WAL unlocked

`

720

``

`-

ASSERT_OK(Put("k1", "val2"));

`

721

``

`-

t1_completed = true;

`

``

735

+

``

736

`+

std::string ingest_file = dbname_ + "/external.sst";

`

``

737

`+

{

`

``

738

`+

SstFileWriter sst_file_writer(EnvOptions(), options);

`

``

739

`+

ASSERT_OK(sst_file_writer.Open(ingest_file));

`

``

740

`+

ASSERT_OK(sst_file_writer.Put("k2", "k2_val"));

`

``

741

`+

ExternalSstFileInfo external_info;

`

``

742

`+

ASSERT_OK(sst_file_writer.Finish(&external_info));

`

``

743

`+

}

`

``

744

`+

AcqRelAtomic parallel_ingest_completed{false};

`

``

745

`+

port::Thread parallel_ingest{& {

`

``

746

`+

IngestExternalFileOptions ingest_opts;

`

``

747

`+

ingest_opts.move_files = true; // faster than copy

`

``

748

`+

// Shouldn't finish until WAL unlocked

`

``

749

`+

ASSERT_OK(db_->IngestExternalFile({ingest_file}, ingest_opts));

`

``

750

`+

parallel_ingest_completed.Store(true);

`

``

751

`+

}};

`

``

752

+

``

753

`+

AcqRelAtomic flush_completed{false};

`

``

754

`+

port::Thread parallel_flush{& {

`

``

755

`+

FlushOptions flush_opts;

`

``

756

`+

// NB: Flush with wait=false case is tested above in LockWALInEffect

`

``

757

`+

flush_opts.wait = true;

`

``

758

`+

// allow_write_stall = true blocks in fewer cases

`

``

759

`+

flush_opts.allow_write_stall = true;

`

``

760

`+

// Shouldn't finish until WAL unlocked

`

``

761

`+

ASSERT_OK(db_->Flush(flush_opts));

`

``

762

`+

flush_completed.Store(true);

`

``

763

`+

}};

`

``

764

+

``

765

`+

AcqRelAtomic parallel_put_completed{false};

`

``

766

`+

port::Thread parallel_put{& {

`

``

767

`+

// This can make certain failure scenarios more likely:

`

``

768

`+

// sleep(1);

`

``

769

`+

// Shouldn't finish until WAL unlocked

`

``

770

`+

ASSERT_OK(Put("k1", "k1_mod"));

`

``

771

`+

parallel_put_completed.Store(true);

`

722

772

` }};

`

723

773

``

724

774

`ASSERT_OK(db_->LockWAL()); // 1 -> 2

`

725

775

`// Read-only ops are OK

`

726

``

`-

ASSERT_EQ(Get("k1"), "val");

`

``

776

`+

ASSERT_EQ(Get("k1"), "k1_orig");

`

727

777

` {

`

728

778

` std::vector files;

`

729

779

` LiveFilesStorageInfoOptions lf_opts;

`

`@@ -732,29 +782,35 @@ TEST_P(DBWriteTest, LockWALConcurrentRecursive) {

`

732

782

`ASSERT_OK(db_->GetLiveFilesStorageInfo({lf_opts}, &files));

`

733

783

` }

`

734

784

``

735

``

`-

port::Thread t2{& {

`

``

785

`+

port::Thread parallel_lock_wal{& {

`

736

786

`ASSERT_OK(db_->LockWAL()); // 2 -> 3 or 1 -> 2

`

737

787

` }};

`

738

788

``

739

789

`ASSERT_OK(db_->UnlockWAL()); // 2 -> 1 or 3 -> 2

`

740

``

`-

// Give t1 an extra chance to jump in case of bug

`

``

790

`+

// Give parallel_put an extra chance to jump in case of bug

`

741

791

`std::this_thread::yield();

`

742

``

`-

t2.join();

`

743

``

`-

ASSERT_FALSE(t1_completed.load());

`

``

792

`+

parallel_lock_wal.join();

`

``

793

`+

ASSERT_FALSE(parallel_put_completed.Load());

`

``

794

`+

ASSERT_FALSE(parallel_ingest_completed.Load());

`

``

795

`+

ASSERT_FALSE(flush_completed.Load());

`

744

796

``

745

797

`// Should now have 2 outstanding LockWAL

`

746

``

`-

ASSERT_EQ(Get("k1"), "val");

`

``

798

`+

ASSERT_EQ(Get("k1"), "k1_orig");

`

747

799

``

748

800

`ASSERT_OK(db_->UnlockWAL()); // 2 -> 1

`

749

801

``

750

``

`-

ASSERT_FALSE(t1_completed.load());

`

751

``

`-

ASSERT_EQ(Get("k1"), "val");

`

``

802

`+

ASSERT_FALSE(parallel_put_completed.Load());

`

``

803

`+

ASSERT_FALSE(parallel_ingest_completed.Load());

`

``

804

`+

ASSERT_FALSE(flush_completed.Load());

`

``

805

+

``

806

`+

ASSERT_EQ(Get("k1"), "k1_orig");

`

``

807

`+

ASSERT_EQ(Get("k2"), "NOT_FOUND");

`

752

808

`ASSERT_EQ(frozen_seqno, db_->GetLatestSequenceNumber());

`

753

809

``

754

810

`// Ensure final Unlock is concurrency safe and extra Unlock is safe but

`

755

811

`// non-OK

`

756

812

` std::atomic unlock_ok{0};

`

757

``

`-

port::Thread t3{& {

`

``

813

`+

port::Thread parallel_stuff{& {

`

758

814

`if (db_->UnlockWAL().ok()) {

`

759

815

` unlock_ok++;

`

760

816

` }

`

`@@ -767,18 +823,23 @@ TEST_P(DBWriteTest, LockWALConcurrentRecursive) {

`

767

823

`if (db_->UnlockWAL().ok()) {

`

768

824

` unlock_ok++;

`

769

825

` }

`

770

``

`-

t3.join();

`

``

826

`+

parallel_stuff.join();

`

771

827

``

772

828

`// There was one extra unlock, so just one non-ok

`

773

829

`ASSERT_EQ(unlock_ok.load(), 2);

`

774

830

``

775

831

`// Write can proceed

`

776

``

`-

t1.join();

`

777

``

`-

ASSERT_TRUE(t1_completed.load());

`

778

``

`-

ASSERT_EQ(Get("k1"), "val2");

`

``

832

`+

parallel_put.join();

`

``

833

`+

ASSERT_TRUE(parallel_put_completed.Load());

`

``

834

`+

ASSERT_EQ(Get("k1"), "k1_mod");

`

``

835

`+

parallel_ingest.join();

`

``

836

`+

ASSERT_TRUE(parallel_ingest_completed.Load());

`

``

837

`+

ASSERT_EQ(Get("k2"), "k2_val");

`

``

838

`+

parallel_flush.join();

`

``

839

`+

ASSERT_TRUE(flush_completed.Load());

`

779

840

`// And new writes

`

780

``

`-

ASSERT_OK(Put("k2", "val"));

`

781

``

`-

ASSERT_EQ(Get("k2"), "val");

`

``

841

`+

ASSERT_OK(Put("k3", "val"));

`

``

842

`+

ASSERT_EQ(Get("k3"), "val");

`

782

843

`}

`

783

844

``

784

845

`TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {

`