- The Go Programming Language (original) (raw)
Source file src/runtime/time.go
1
2
3
4
5
6
7 package runtime
8
9 import (
10 "internal/abi"
11 "internal/runtime/atomic"
12 "internal/runtime/sys"
13 "unsafe"
14 )
15
16
17 func time_runtimeNow() (sec int64, nsec int32, mono int64) {
18 if bubble := getg().bubble; bubble != nil {
19 sec = bubble.now / (1000 * 1000 * 1000)
20 nsec = int32(bubble.now % (1000 * 1000 * 1000))
21
22
23
24
25
26
27 return sec, nsec, 0
28 }
29 return time_now()
30 }
31
32
33 func time_runtimeNano() int64 {
34 gp := getg()
35 if gp.bubble != nil {
36 return gp.bubble.now
37 }
38 return nanotime()
39 }
40
41
42 func time_runtimeIsBubbled() bool {
43 return getg().bubble != nil
44 }
45
46
47
48
49
50
51
52
53
54
55 type timer struct {
56
57 mu mutex
58
59 astate atomic.Uint8
60 state uint8
61 isChan bool
62 isFake bool
63
64 blocked uint32
65 rand uint32
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92 when int64
93 period int64
94 f func(arg any, seq uintptr, delay int64)
95 arg any
96 seq uintptr
97
98
99 ts *timers100 101 102 103 sendLock mutex 104 105 106 107 108 109 110 111 112 113 114 115 116 117 isSending atomic.Int32 118 } 119 120 121 122 123 124 func (t *timer) init(f func(arg any, seq uintptr, delay int64), arg any) { 125 lockInit(&t.mu, lockRankTimer) 126 t.f = f 127 t.arg = arg 128 } 129 130 131 type timers struct { 132 133 134 mu mutex 135 136 137 138 heap []timerWhen 139 140 141 len atomic.Uint32 142 143 144 145 zombies atomic.Int32 146 147 148 raceCtx uintptr 149 150 151 152 153 154 minWhenHeap atomic.Int64 155 156 157 158 159 minWhenModified atomic.Int64 160 } 161 162 type timerWhen struct { 163 timer *timer 164 when int64 165 } 166 167 168 func (tw timerWhen) less(other timerWhen) bool { 169 switch { 170 case tw.when < other.when: 171 return true 172 case tw.when > other.when: 173 return false 174 default: 175 176 177 178 return tw.timer.rand < other.timer.rand 179 } 180 } 181 182 func (ts *timers) lock() { 183 lock(&ts.mu) 184 } 185 186 func (ts *timers) unlock() { 187 188 189 190 191 192 193 194 ts.len.Store(uint32(len(ts.heap))) 195 196 unlock(&ts.mu) 197 } 198 199 200 const ( 201 202 timerHeaped uint8 = 1 << iota 203 204 205 206 207 208 209 timerModified 210 211 212 213 214 215 216 217 218 219 timerZombie 220 ) 221 222 223 const timerDebug = false 224 225 func (t *timer) trace(op string) { 226 if timerDebug { 227 t.trace1(op) 228 } 229 } 230 231 func (t *timer) trace1(op string) { 232 if !timerDebug { 233 return 234 } 235 bits := [4]string{"h", "m", "z", "c"} 236 for i := range 3 { 237 if t.state&(1<<i) == 0 { 238 bits[i] = "-" 239 } 240 } 241 if !t.isChan { 242 bits[3] = "-" 243 } 244 print("T ", t, " ", bits[0], bits[1], bits[2], bits[3], " b=", t.blocked, " ", op, "\n") 245 } 246 247 func (ts *timers) trace(op string) { 248 if timerDebug { 249 println("TS", ts, op) 250 } 251 } 252 253 254 func (t *timer) lock() { 255 lock(&t.mu) 256 t.trace("lock") 257 } 258 259 260 func (t *timer) unlock() { 261 t.trace("unlock") 262 263 264 t.astate.Store(t.state) 265 unlock(&t.mu) 266 } 267 268 269 270 func (t *timer) hchan() *hchan { 271 if !t.isChan { 272 badTimer() 273 } 274 275 276 277 return (*hchan)(efaceOf(&t.arg).data) 278 } 279 280 281 282 283 284 285 286 287 func (t *timer) updateHeap() (updated bool) { 288 assertWorldStoppedOrLockHeld(&t.mu) 289 t.trace("updateHeap") 290 ts := t.ts 291 if ts == nil || t != ts.heap[0].timer { 292 badTimer() 293 } 294 assertLockHeld(&ts.mu) 295 if t.state&timerZombie != 0 { 296 297 t.state &^= timerHeaped | timerZombie | timerModified 298 ts.zombies.Add(-1) 299 ts.deleteMin() 300 return true 301 } 302 303 if t.state&timerModified != 0 { 304 305 t.state &^= timerModified 306 ts.heap[0].when = t.when 307 ts.siftDown(0) 308 ts.updateMinWhenHeap() 309 return true 310 } 311 312 return false 313 } 314 315 316 const maxWhen = 1<<63 - 1 317 318 319 320 const verifyTimers = false 321 322 323 324 325 326 327 328 329 330 func timeSleep(ns int64) { 331 if ns <= 0 { 332 return 333 } 334 335 gp := getg() 336 t := gp.timer 337 if t == nil { 338 t = new(timer) 339 t.init(goroutineReady, gp) 340 if gp.bubble != nil { 341 t.isFake = true 342 } 343 gp.timer = t 344 } 345 var now int64 346 if bubble := gp.bubble; bubble != nil { 347 now = bubble.now 348 } else { 349 now = nanotime() 350 } 351 when := now + ns 352 if when < 0 { 353 when = maxWhen 354 } 355 gp.sleepWhen = when 356 if t.isFake { 357 358 359 360 resetForSleep(gp, nil) 361 gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1) 362 } else { 363 gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1) 364 } 365 } 366 367 368 369 370 371 func resetForSleep(gp *g, _ unsafe.Pointer) bool { 372 gp.timer.reset(gp.sleepWhen, 0) 373 return true 374 } 375 376 377 378 379 type timeTimer struct { 380 c unsafe.Pointer 381 init bool 382 timer 383 } 384 385 386 387 388 389 func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, c *hchan) *timeTimer { 390 t := new(timeTimer) 391 t.timer.init(nil, nil) 392 t.trace("new") 393 if raceenabled { 394 racerelease(unsafe.Pointer(&t.timer)) 395 } 396 if c != nil { 397 lockInit(&t.sendLock, lockRankTimerSend) 398 t.isChan = true 399 c.timer = &t.timer 400 if c.dataqsiz == 0 { 401 throw("invalid timer channel: no capacity") 402 } 403 } 404 if bubble := getg().bubble; bubble != nil { 405 t.isFake = true 406 } 407 t.modify(when, period, f, arg, 0) 408 t.init = true 409 return t 410 } 411 412 413 414 415 416 func stopTimer(t *timeTimer) bool { 417 if t.isFake && getg().bubble == nil { 418 fatal("stop of synctest timer from outside bubble") 419 } 420 return t.stop() 421 } 422 423 424 425 426 427 428 func resetTimer(t *timeTimer, when, period int64) bool { 429 if raceenabled { 430 racerelease(unsafe.Pointer(&t.timer)) 431 } 432 if t.isFake && getg().bubble == nil { 433 fatal("reset of synctest timer from outside bubble") 434 } 435 return t.reset(when, period) 436 } 437 438 439 440 441 func goroutineReady(arg any, _ uintptr, _ int64) { 442 goready(arg.(*g), 0) 443 } 444 445 446 447 448 449 450 func (ts *timers) addHeap(t *timer) { 451 assertWorldStoppedOrLockHeld(&ts.mu) 452 453 454 if netpollInited.Load() == 0 { 455 netpollGenericInit() 456 } 457 458 if t.ts != nil { 459 throw("ts set in timer") 460 } 461 t.ts = ts 462 ts.heap = append(ts.heap, timerWhen{t, t.when}) 463 ts.siftUp(len(ts.heap) - 1) 464 if t == ts.heap[0].timer { 465 ts.updateMinWhenHeap() 466 } 467 } 468 469 470 471 472 473 474 func (t *timer) maybeRunAsync() { 475 assertLockHeld(&t.mu) 476 if t.state&timerHeaped == 0 && t.isChan && t.when > 0 { 477 478 479 480 481 482 483 484 if now := nanotime(); t.when <= now { 485 systemstack(func() { 486 t.unlockAndRun(now, nil) 487 }) 488 t.lock() 489 } 490 } 491 } 492 493 494 495 496 497 func (t *timer) stop() bool { 498 async := debug.asynctimerchan.Load() != 0 499 if !async && t.isChan { 500 lock(&t.sendLock) 501 } 502 503 t.lock() 504 t.trace("stop") 505 if async { 506 t.maybeRunAsync() 507 } 508 if t.state&timerHeaped != 0 { 509 t.state |= timerModified 510 if t.state&timerZombie == 0 { 511 t.state |= timerZombie 512 t.ts.zombies.Add(1) 513 } 514 } 515 pending := t.when > 0 516 t.when = 0 517 518 if !async && t.isChan { 519 520 521 t.seq++ 522 523 524 525 526 527 528 if t.period == 0 && t.isSending.Load() > 0 { 529 pending = true 530 } 531 } 532 t.unlock() 533 if !async && t.isChan { 534 unlock(&t.sendLock) 535 if timerchandrain(t.hchan()) { 536 pending = true 537 } 538 } 539 540 return pending 541 } 542 543 544 545 func (ts *timers) deleteMin() { 546 assertLockHeld(&ts.mu) 547 t := ts.heap[0].timer 548 if t.ts != ts { 549 throw("wrong timers") 550 } 551 t.ts = nil 552 last := len(ts.heap) - 1 553 if last > 0 { 554 ts.heap[0] = ts.heap[last] 555 } 556 ts.heap[last] = timerWhen{} 557 ts.heap = ts.heap[:last] 558 if last > 0 { 559 ts.siftDown(0) 560 } 561 ts.updateMinWhenHeap() 562 if last == 0 { 563 564 ts.minWhenModified.Store(0) 565 } 566 } 567 568 569 570 571 572 func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, seq uintptr) bool { 573 if when <= 0 { 574 throw("timer when must be positive") 575 } 576 if period < 0 { 577 throw("timer period must be non-negative") 578 } 579 async := debug.asynctimerchan.Load() != 0 580 581 if !async && t.isChan { 582 lock(&t.sendLock) 583 } 584 585 t.lock() 586 if async { 587 t.maybeRunAsync() 588 } 589 t.trace("modify") 590 oldPeriod := t.period 591 t.period = period 592 if f != nil { 593 t.f = f 594 t.arg = arg 595 t.seq = seq 596 } 597 598 wake := false 599 pending := t.when > 0 600 t.when = when 601 if t.state&timerHeaped != 0 { 602 t.state |= timerModified 603 if t.state&timerZombie != 0 { 604 605 606 t.ts.zombies.Add(-1) 607 t.state &^= timerZombie 608 } 609 610 611 if min := t.ts.minWhenModified.Load(); min == 0 || when < min { 612 wake = true 613 614 615 t.astate.Store(t.state) 616 t.ts.updateMinWhenModified(when) 617 } 618 } 619 620 add := t.needsAdd() 621 622 if add && t.isFake { 623 624 625 626 627 628 629 630 631 632 633 bubble := getg().bubble 634 if bubble == nil { 635 throw("fake timer executing with no bubble") 636 } 637 if t.state&timerHeaped == 0 && when <= bubble.now { 638 systemstack(func() { 639 t.unlockAndRun(bubble.now, bubble) 640 }) 641 return pending 642 } 643 } 644 645 if !async && t.isChan { 646 647 648 t.seq++ 649 650 651 652 653 654 655 if oldPeriod == 0 && t.isSending.Load() > 0 { 656 pending = true 657 } 658 } 659 t.unlock() 660 if !async && t.isChan { 661 if timerchandrain(t.hchan()) { 662 pending = true 663 } 664 unlock(&t.sendLock) 665 } 666 667 if add { 668 t.maybeAdd() 669 } 670 if wake { 671 wakeNetPoller(when) 672 } 673 674 return pending 675 } 676 677 678 679 func (t *timer) needsAdd() bool { 680 assertLockHeld(&t.mu) 681 need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) 682 if need { 683 t.trace("needsAdd+") 684 } else { 685 t.trace("needsAdd-") 686 } 687 return need 688 } 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 func (t *timer) maybeAdd() { 709 710 711 712 713 714 715 716 717 718 mp := acquirem() 719 var ts *timers 720 if t.isFake { 721 bubble := getg().bubble 722 if bubble == nil { 723 throw("invalid timer: fake time but no syncgroup") 724 } 725 ts = &bubble.timers 726 } else { 727 ts = &mp.p.ptr().timers 728 } 729 ts.lock() 730 ts.cleanHead() 731 t.lock() 732 t.trace("maybeAdd") 733 when := int64(0) 734 wake := false 735 if t.needsAdd() { 736 if t.isFake { 737 738 739 740 t.rand = cheaprand() 741 } 742 t.state |= timerHeaped 743 when = t.when 744 wakeTime := ts.wakeTime() 745 wake = wakeTime == 0 || when < wakeTime 746 ts.addHeap(t) 747 } 748 t.unlock() 749 ts.unlock() 750 releasem(mp) 751 if wake { 752 wakeNetPoller(when) 753 } 754 } 755 756 757 758 759 func (t *timer) reset(when, period int64) bool { 760 return t.modify(when, period, nil, nil, 0) 761 } 762 763 764 765 766 767 func (ts *timers) cleanHead() { 768 ts.trace("cleanHead") 769 assertLockHeld(&ts.mu) 770 gp := getg() 771 for { 772 if len(ts.heap) == 0 { 773 return 774 } 775 776 777 778 779 780 if gp.preemptStop { 781 return 782 } 783 784 785 786 787 788 n := len(ts.heap) 789 if t := ts.heap[n-1].timer; t.astate.Load()&timerZombie != 0 { 790 t.lock() 791 if t.state&timerZombie != 0 { 792 t.state &^= timerHeaped | timerZombie | timerModified 793 t.ts = nil 794 ts.zombies.Add(-1) 795 ts.heap[n-1] = timerWhen{} 796 ts.heap = ts.heap[:n-1] 797 } 798 t.unlock() 799 continue 800 } 801 802 t := ts.heap[0].timer 803 if t.ts != ts { 804 throw("bad ts") 805 } 806 807 if t.astate.Load()&(timerModified|timerZombie) == 0 { 808 809 return 810 } 811 812 t.lock() 813 updated := t.updateHeap() 814 t.unlock() 815 if !updated { 816 817 return 818 } 819 } 820 } 821 822 823 824 825 826 827 func (ts *timers) take(src *timers) { 828 ts.trace("take") 829 assertWorldStopped() 830 if len(src.heap) > 0 { 831 832 833 834 for _, tw := range src.heap { 835 t := tw.timer 836 t.ts = nil 837 if t.state&timerZombie != 0 { 838 t.state &^= timerHeaped | timerZombie | timerModified 839 } else { 840 t.state &^= timerModified 841 ts.addHeap(t) 842 } 843 } 844 src.heap = nil 845 src.zombies.Store(0) 846 src.minWhenHeap.Store(0) 847 src.minWhenModified.Store(0) 848 src.len.Store(0) 849 ts.len.Store(uint32(len(ts.heap))) 850 } 851 } 852 853 854 855 856 857 858 func (ts *timers) adjust(now int64, force bool) { 859 ts.trace("adjust") 860 assertLockHeld(&ts.mu) 861 862 863 864 865 866 if !force { 867 first := ts.minWhenModified.Load() 868 if first == 0 || first > now { 869 if verifyTimers { 870 ts.verify() 871 } 872 return 873 } 874 } 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 ts.minWhenHeap.Store(ts.wakeTime()) 927 ts.minWhenModified.Store(0) 928 929 changed := false 930 for i := 0; i < len(ts.heap); i++ { 931 tw := &ts.heap[i] 932 t := tw.timer 933 if t.ts != ts { 934 throw("bad ts") 935 } 936 937 if t.astate.Load()&(timerModified|timerZombie) == 0 { 938 939 continue 940 } 941 942 t.lock() 943 switch { 944 case t.state&timerHeaped == 0: 945 badTimer() 946 947 case t.state&timerZombie != 0: 948 ts.zombies.Add(-1) 949 t.state &^= timerHeaped | timerZombie | timerModified 950 n := len(ts.heap) 951 ts.heap[i] = ts.heap[n-1] 952 ts.heap[n-1] = timerWhen{} 953 ts.heap = ts.heap[:n-1] 954 t.ts = nil 955 i-- 956 changed = true 957 958 case t.state&timerModified != 0: 959 tw.when = t.when 960 t.state &^= timerModified 961 changed = true 962 } 963 t.unlock() 964 } 965 966 if changed { 967 ts.initHeap() 968 } 969 ts.updateMinWhenHeap() 970 971 if verifyTimers { 972 ts.verify() 973 } 974 } 975 976 977 978 979 980 981 982 func (ts *timers) wakeTime() int64 { 983 984 985 986 987 988 nextWhen := ts.minWhenModified.Load() 989 when := ts.minWhenHeap.Load() 990 if when == 0 || (nextWhen != 0 && nextWhen < when) { 991 when = nextWhen 992 } 993 return when 994 } 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) { 1007 ts.trace("check") 1008 1009 1010 next := ts.wakeTime() 1011 if next == 0 { 1012 1013 return now, 0, false 1014 } 1015 1016 if now == 0 { 1017 now = nanotime() 1018 } 1019 1020 1021 1022 1023 zombies := ts.zombies.Load() 1024 if zombies < 0 { 1025 badTimer() 1026 } 1027 force := ts == &getg().m.p.ptr().timers && int(zombies) > int(ts.len.Load())/4 1028 1029 if now < next && !force { 1030 1031 return now, next, false 1032 } 1033 1034 ts.lock() 1035 if len(ts.heap) > 0 { 1036 ts.adjust(now, false) 1037 for len(ts.heap) > 0 { 1038 1039 if tw := ts.run(now, bubble); tw != 0 { 1040 if tw > 0 { 1041 pollUntil = tw 1042 } 1043 break 1044 } 1045 ran = true 1046 } 1047 1048 1049 1050 1051 1052 1053 force = ts == &getg().m.p.ptr().timers && int(ts.zombies.Load()) > int(ts.len.Load())/4 1054 if force { 1055 ts.adjust(now, true) 1056 } 1057 } 1058 ts.unlock() 1059 1060 return now, pollUntil, ran 1061 } 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 func (ts *timers) run(now int64, bubble *synctestBubble) int64 { 1072 ts.trace("run") 1073 assertLockHeld(&ts.mu) 1074 Redo: 1075 if len(ts.heap) == 0 { 1076 return -1 1077 } 1078 tw := ts.heap[0] 1079 t := tw.timer 1080 if t.ts != ts { 1081 throw("bad ts") 1082 } 1083 1084 if t.astate.Load()&(timerModified|timerZombie) == 0 && tw.when > now { 1085 1086 return tw.when 1087 } 1088 1089 t.lock() 1090 if t.updateHeap() { 1091 t.unlock() 1092 goto Redo 1093 } 1094 1095 if t.state&timerHeaped == 0 || t.state&timerModified != 0 { 1096 badTimer() 1097 } 1098 1099 if t.when > now { 1100 1101 t.unlock() 1102 return t.when 1103 } 1104 1105 t.unlockAndRun(now, bubble) 1106 assertLockHeld(&ts.mu) 1107 return 0 1108 } 1109 1110 1111 1112 1113 1114 1115 1116 func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) { 1117 t.trace("unlockAndRun") 1118 assertLockHeld(&t.mu) 1119 if t.ts != nil { 1120 assertLockHeld(&t.ts.mu) 1121 } 1122 if raceenabled { 1123 1124 1125 1126 gp := getg() 1127 var tsLocal *timers 1128 if bubble == nil { 1129 tsLocal = &gp.m.p.ptr().timers 1130 } else { 1131 tsLocal = &bubble.timers 1132 } 1133 if tsLocal.raceCtx == 0 { 1134 tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((timers).run) + sys.PCQuantum) 1135 } 1136 raceacquirectx(tsLocal.raceCtx, unsafe.Pointer(t)) 1137 } 1138 1139 if t.state&(timerModified|timerZombie) != 0 { 1140 badTimer() 1141 } 1142 1143 f := t.f 1144 arg := t.arg 1145 seq := t.seq 1146 var next int64 1147 delay := now - t.when 1148 if t.period > 0 { 1149 1150 next = t.when + t.period(1+delay/t.period) 1151 if next < 0 { 1152 next = maxWhen 1153 } 1154 } else { 1155 next = 0 1156 } 1157 ts := t.ts 1158 t.when = next 1159 if t.state&timerHeaped != 0 { 1160 t.state |= timerModified 1161 if next == 0 { 1162 t.state |= timerZombie 1163 t.ts.zombies.Add(1) 1164 } 1165 t.updateHeap() 1166 } 1167 1168 async := debug.asynctimerchan.Load() != 0 1169 if !async && t.isChan && t.period == 0 { 1170 1171 if t.isSending.Add(1) < 0 { 1172 throw("too many concurrent timer firings") 1173 } 1174 } 1175 1176 t.unlock() 1177 1178 if raceenabled { 1179 1180 gp := getg() 1181 if gp.racectx != 0 { 1182 throw("unexpected racectx") 1183 } 1184 if bubble == nil { 1185 gp.racectx = gp.m.p.ptr().timers.raceCtx 1186 } else { 1187 gp.racectx = bubble.timers.raceCtx 1188 } 1189 } 1190 1191 if ts != nil { 1192 ts.unlock() 1193 } 1194 1195 if bubble != nil { 1196 1197 gp := getg() 1198 if gp.bubble != nil { 1199 throw("unexpected syncgroup set") 1200 } 1201 gp.bubble = bubble 1202 bubble.changegstatus(gp, _Gdead, _Grunning) 1203 } 1204 1205 if !async && t.isChan { 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 lock(&t.sendLock) 1225 1226 if t.period == 0 { 1227 1228 1229 1230 if t.isSending.Add(-1) < 0 { 1231 throw("mismatched isSending updates") 1232 } 1233 } 1234 1235 if t.seq != seq { 1236 f = func(any, uintptr, int64) {} 1237 } 1238 } 1239 1240 f(arg, seq, delay) 1241 1242 if !async && t.isChan { 1243 unlock(&t.sendLock) 1244 } 1245 1246 if bubble != nil { 1247 gp := getg() 1248 bubble.changegstatus(gp, _Grunning, _Gdead) 1249 if raceenabled { 1250 1251 1252 racereleasemergeg(gp, bubble.raceaddr()) 1253 } 1254 gp.bubble = nil 1255 } 1256 1257 if ts != nil { 1258 ts.lock() 1259 } 1260 1261 if raceenabled { 1262 gp := getg() 1263 gp.racectx = 0 1264 } 1265 } 1266 1267 1268 1269 1270 func (ts *timers) verify() { 1271 assertLockHeld(&ts.mu) 1272 for i, tw := range ts.heap { 1273 if i == 0 { 1274 1275 continue 1276 } 1277 1278 1279 p := int(uint(i-1) / timerHeapN) 1280 if tw.less(ts.heap[p]) { 1281 print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].when, ", ", i, ": ", tw.when, "\n") 1282 throw("bad timer heap") 1283 } 1284 } 1285 if n := int(ts.len.Load()); len(ts.heap) != n { 1286 println("timer heap len", len(ts.heap), "!= atomic len", n) 1287 throw("bad timer heap len") 1288 } 1289 } 1290 1291 1292 1293 func (ts *timers) updateMinWhenHeap() { 1294 assertWorldStoppedOrLockHeld(&ts.mu) 1295 if len(ts.heap) == 0 { 1296 ts.minWhenHeap.Store(0) 1297 } else { 1298 ts.minWhenHeap.Store(ts.heap[0].when) 1299 } 1300 } 1301 1302 1303 1304 func (ts *timers) updateMinWhenModified(when int64) { 1305 for { 1306 old := ts.minWhenModified.Load() 1307 if old != 0 && old < when { 1308 return 1309 } 1310 if ts.minWhenModified.CompareAndSwap(old, when) { 1311 return 1312 } 1313 } 1314 } 1315 1316 1317 1318 1319 func timeSleepUntil() int64 { 1320 next := int64(maxWhen) 1321 1322 1323 lock(&allpLock) 1324 for _, pp := range allp { 1325 if pp == nil { 1326 1327 1328 continue 1329 } 1330 1331 if w := pp.timers.wakeTime(); w != 0 { 1332 next = min(next, w) 1333 } 1334 } 1335 unlock(&allpLock) 1336 1337 return next 1338 } 1339 1340 const timerHeapN = 4 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 func (ts *timers) siftUp(i int) { 1353 heap := ts.heap 1354 if i >= len(heap) { 1355 badTimer() 1356 } 1357 tw := heap[i] 1358 if tw.when <= 0 { 1359 badTimer() 1360 } 1361 for i > 0 { 1362 p := int(uint(i-1) / timerHeapN) 1363 if !tw.less(heap[p]) { 1364 break 1365 } 1366 heap[i] = heap[p] 1367 i = p 1368 } 1369 if heap[i].timer != tw.timer { 1370 heap[i] = tw 1371 } 1372 } 1373 1374 1375 1376 func (ts timers) siftDown(i int) { 1377 heap := ts.heap 1378 n := len(heap) 1379 if i >= n { 1380 badTimer() 1381 } 1382 if itimerHeapN+1 >= n { 1383 return 1384 } 1385 tw := heap[i] 1386 if tw.when <= 0 { 1387 badTimer() 1388 } 1389 for { 1390 leftChild := i*timerHeapN + 1 1391 if leftChild >= n { 1392 break 1393 } 1394 w := tw 1395 c := -1 1396 for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { 1397 if tw.less(w) { 1398 w = tw 1399 c = leftChild + j 1400 } 1401 } 1402 if c < 0 { 1403 break 1404 } 1405 heap[i] = heap[c] 1406 i = c 1407 } 1408 if heap[i].timer != tw.timer { 1409 heap[i] = tw 1410 } 1411 } 1412 1413 1414 1415 func (ts *timers) initHeap() { 1416 1417 1418 if len(ts.heap) <= 1 { 1419 return 1420 } 1421 for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- { 1422 ts.siftDown(i) 1423 } 1424 } 1425 1426 1427 1428 1429 1430 func badTimer() { 1431 throw("timer data corruption") 1432 } 1433 1434 1435 1436 1437 1438 1439 func (t *timer) maybeRunChan(c *hchan) { 1440 if t.isFake && getg().bubble != c.bubble { 1441 1442 fatal("synctest timer accessed from outside bubble") 1443 } 1444 if t.astate.Load()&timerHeaped != 0 { 1445 1446 1447 return 1448 } 1449 1450 t.lock() 1451 now := nanotime() 1452 if t.isFake { 1453 now = getg().bubble.now 1454 } 1455 if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { 1456 t.trace("maybeRunChan-") 1457 1458 t.unlock() 1459 return 1460 } 1461 t.trace("maybeRunChan+") 1462 systemstack(func() { 1463 t.unlockAndRun(now, c.bubble) 1464 }) 1465 } 1466 1467 1468 1469 1470 1471 func blockTimerChan(c *hchan) { 1472 t := c.timer 1473 if t.isFake && c.bubble != getg().bubble { 1474 1475 fatal("synctest timer accessed from outside bubble") 1476 } 1477 1478 t.lock() 1479 t.trace("blockTimerChan") 1480 if !t.isChan { 1481 badTimer() 1482 } 1483 1484 t.blocked++ 1485 1486 1487 1488 1489 if t.state&timerHeaped != 0 && t.state&timerZombie != 0 && t.when > 0 { 1490 t.state &^= timerZombie 1491 t.ts.zombies.Add(-1) 1492 } 1493 1494 1495 1496 1497 1498 1499 1500 add := t.needsAdd() 1501 t.unlock() 1502 if add { 1503 t.maybeAdd() 1504 } 1505 } 1506 1507 1508 1509 1510 1511 1512 1513 func unblockTimerChan(c *hchan) { 1514 t := c.timer 1515 t.lock() 1516 t.trace("unblockTimerChan") 1517 if !t.isChan || t.blocked == 0 { 1518 badTimer() 1519 } 1520 t.blocked-- 1521 if t.blocked == 0 && t.state&timerHeaped != 0 && t.state&timerZombie == 0 { 1522 1523 1524 1525 t.state |= timerZombie 1526 t.ts.zombies.Add(1) 1527 } 1528 t.unlock() 1529 } 1530