- The Go Programming Language (original) (raw)

Source file src/runtime/time.go

 1  
 2  
 3  
 4  
 5  
 6  
 7  package runtime
 8  
 9  import (
10  	"internal/abi"
11  	"internal/runtime/atomic"
12  	"internal/runtime/sys"
13  	"unsafe"
14  )
15  
16  
17  func time_runtimeNow() (sec int64, nsec int32, mono int64) {
18  	if bubble := getg().bubble; bubble != nil {
19  		sec = bubble.now / (1000 * 1000 * 1000)
20  		nsec = int32(bubble.now % (1000 * 1000 * 1000))
21  		
22  		
23  		
24  		
25  		
26  		
27  		return sec, nsec, 0
28  	}
29  	return time_now()
30  }
31  
32  
33  func time_runtimeNano() int64 {
34  	gp := getg()
35  	if gp.bubble != nil {
36  		return gp.bubble.now
37  	}
38  	return nanotime()
39  }
40  
41  
42  func time_runtimeIsBubbled() bool {
43  	return getg().bubble != nil
44  }
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  type timer struct {
56  	
57  	mu mutex
58  
59  	astate atomic.Uint8 
60  	state  uint8        
61  	isChan bool         
62  	isFake bool         
63  
64  	blocked uint32 
65  	rand    uint32 
66  
67  	
68  	
69  	
70  	
71  	
72  	
73  	
74  	
75  	
76  	
77  	
78  	
79  	
80  	
81  	
82  	
83  	
84  	
85  	
86  	
87  	
88  	
89  	
90  	
91  	
92  	when   int64
93  	period int64
94  	f      func(arg any, seq uintptr, delay int64)
95  	arg    any
96  	seq    uintptr
97  
98  	
99  	ts *timers

100   101   102   103   sendLock mutex 104   105   106   107   108   109   110   111   112   113   114   115   116   117   isSending atomic.Int32 118  } 119   120   121   122   123   124  func (t *timer) init(f func(arg any, seq uintptr, delay int64), arg any) { 125   lockInit(&t.mu, lockRankTimer) 126   t.f = f 127   t.arg = arg 128  } 129   130   131  type timers struct { 132   133   134   mu mutex 135   136   137   138   heap []timerWhen 139   140   141   len atomic.Uint32 142   143   144   145   zombies atomic.Int32 146   147   148   raceCtx uintptr 149   150   151   152   153   154   minWhenHeap atomic.Int64 155   156   157   158   159   minWhenModified atomic.Int64 160  } 161   162  type timerWhen struct { 163   timer *timer 164   when int64 165  } 166   167   168  func (tw timerWhen) less(other timerWhen) bool { 169   switch { 170   case tw.when < other.when: 171   return true 172   case tw.when > other.when: 173   return false 174   default: 175   176   177   178   return tw.timer.rand < other.timer.rand 179   } 180  } 181   182  func (ts *timers) lock() { 183   lock(&ts.mu) 184  } 185   186  func (ts *timers) unlock() { 187   188   189   190   191   192   193   194   ts.len.Store(uint32(len(ts.heap))) 195   196   unlock(&ts.mu) 197  } 198   199   200  const ( 201   202   timerHeaped uint8 = 1 << iota 203   204   205   206   207   208   209   timerModified 210   211   212   213   214   215   216   217   218   219   timerZombie 220  ) 221   222   223  const timerDebug = false 224   225  func (t *timer) trace(op string) { 226   if timerDebug { 227   t.trace1(op) 228   } 229  } 230   231  func (t *timer) trace1(op string) { 232   if !timerDebug { 233   return 234   } 235   bits := [4]string{"h", "m", "z", "c"} 236   for i := range 3 { 237   if t.state&(1<<i) == 0 { 238   bits[i] = "-" 239   } 240   } 241   if !t.isChan { 242   bits[3] = "-" 243   } 244   print("T ", t, " ", bits[0], bits[1], bits[2], bits[3], " b=", t.blocked, " ", op, "\n") 245  } 246   247  func (ts *timers) trace(op string) { 248   if timerDebug { 249   println("TS", ts, op) 250   } 251  } 252   253   254  func (t *timer) lock() { 255   lock(&t.mu) 256   t.trace("lock") 257  } 258   259   260  func (t *timer) unlock() { 261   t.trace("unlock") 262   263   264   t.astate.Store(t.state) 265   unlock(&t.mu) 266  } 267   268   269   270  func (t *timer) hchan() *hchan { 271   if !t.isChan { 272   badTimer() 273   } 274   275   276   277   return (*hchan)(efaceOf(&t.arg).data) 278  } 279   280   281   282   283   284   285   286   287  func (t *timer) updateHeap() (updated bool) { 288   assertWorldStoppedOrLockHeld(&t.mu) 289   t.trace("updateHeap") 290   ts := t.ts 291   if ts == nil || t != ts.heap[0].timer { 292   badTimer() 293   } 294   assertLockHeld(&ts.mu) 295   if t.state&timerZombie != 0 { 296   297   t.state &^= timerHeaped | timerZombie | timerModified 298   ts.zombies.Add(-1) 299   ts.deleteMin() 300   return true 301   } 302   303   if t.state&timerModified != 0 { 304   305   t.state &^= timerModified 306   ts.heap[0].when = t.when 307   ts.siftDown(0) 308   ts.updateMinWhenHeap() 309   return true 310   } 311   312   return false 313  } 314   315   316  const maxWhen = 1<<63 - 1 317   318   319   320  const verifyTimers = false 321   322   323   324   325   326   327   328   329   330  func timeSleep(ns int64) { 331   if ns <= 0 { 332   return 333   } 334   335   gp := getg() 336   t := gp.timer 337   if t == nil { 338   t = new(timer) 339   t.init(goroutineReady, gp) 340   if gp.bubble != nil { 341   t.isFake = true 342   } 343   gp.timer = t 344   } 345   var now int64 346   if bubble := gp.bubble; bubble != nil { 347   now = bubble.now 348   } else { 349   now = nanotime() 350   } 351   when := now + ns 352   if when < 0 { 353   when = maxWhen 354   } 355   gp.sleepWhen = when 356   if t.isFake { 357   358   359   360   resetForSleep(gp, nil) 361   gopark(nil, nil, waitReasonSleep, traceBlockSleep, 1) 362   } else { 363   gopark(resetForSleep, nil, waitReasonSleep, traceBlockSleep, 1) 364   } 365  } 366   367   368   369   370   371  func resetForSleep(gp *g, _ unsafe.Pointer) bool { 372   gp.timer.reset(gp.sleepWhen, 0) 373   return true 374  } 375   376   377   378   379  type timeTimer struct { 380   c unsafe.Pointer 381   init bool 382   timer 383  } 384   385   386   387   388   389  func newTimer(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, c *hchan) *timeTimer { 390   t := new(timeTimer) 391   t.timer.init(nil, nil) 392   t.trace("new") 393   if raceenabled { 394   racerelease(unsafe.Pointer(&t.timer)) 395   } 396   if c != nil { 397   lockInit(&t.sendLock, lockRankTimerSend) 398   t.isChan = true 399   c.timer = &t.timer 400   if c.dataqsiz == 0 { 401   throw("invalid timer channel: no capacity") 402   } 403   } 404   if bubble := getg().bubble; bubble != nil { 405   t.isFake = true 406   } 407   t.modify(when, period, f, arg, 0) 408   t.init = true 409   return t 410  } 411   412   413   414   415   416  func stopTimer(t *timeTimer) bool { 417   if t.isFake && getg().bubble == nil { 418   fatal("stop of synctest timer from outside bubble") 419   } 420   return t.stop() 421  } 422   423   424   425   426   427   428  func resetTimer(t *timeTimer, when, period int64) bool { 429   if raceenabled { 430   racerelease(unsafe.Pointer(&t.timer)) 431   } 432   if t.isFake && getg().bubble == nil { 433   fatal("reset of synctest timer from outside bubble") 434   } 435   return t.reset(when, period) 436  } 437   438   439   440   441  func goroutineReady(arg any, _ uintptr, _ int64) { 442   goready(arg.(*g), 0) 443  } 444   445   446   447   448   449   450  func (ts *timers) addHeap(t *timer) { 451   assertWorldStoppedOrLockHeld(&ts.mu) 452   453   454   if netpollInited.Load() == 0 { 455   netpollGenericInit() 456   } 457   458   if t.ts != nil { 459   throw("ts set in timer") 460   } 461   t.ts = ts 462   ts.heap = append(ts.heap, timerWhen{t, t.when}) 463   ts.siftUp(len(ts.heap) - 1) 464   if t == ts.heap[0].timer { 465   ts.updateMinWhenHeap() 466   } 467  } 468   469   470   471   472   473   474  func (t *timer) maybeRunAsync() { 475   assertLockHeld(&t.mu) 476   if t.state&timerHeaped == 0 && t.isChan && t.when > 0 { 477   478   479   480   481   482   483   484   if now := nanotime(); t.when <= now { 485   systemstack(func() { 486   t.unlockAndRun(now, nil) 487   }) 488   t.lock() 489   } 490   } 491  } 492   493   494   495   496   497  func (t *timer) stop() bool { 498   async := debug.asynctimerchan.Load() != 0 499   if !async && t.isChan { 500   lock(&t.sendLock) 501   } 502   503   t.lock() 504   t.trace("stop") 505   if async { 506   t.maybeRunAsync() 507   } 508   if t.state&timerHeaped != 0 { 509   t.state |= timerModified 510   if t.state&timerZombie == 0 { 511   t.state |= timerZombie 512   t.ts.zombies.Add(1) 513   } 514   } 515   pending := t.when > 0 516   t.when = 0 517   518   if !async && t.isChan { 519   520   521   t.seq++ 522   523   524   525   526   527   528   if t.period == 0 && t.isSending.Load() > 0 { 529   pending = true 530   } 531   } 532   t.unlock() 533   if !async && t.isChan { 534   unlock(&t.sendLock) 535   if timerchandrain(t.hchan()) { 536   pending = true 537   } 538   } 539   540   return pending 541  } 542   543   544   545  func (ts *timers) deleteMin() { 546   assertLockHeld(&ts.mu) 547   t := ts.heap[0].timer 548   if t.ts != ts { 549   throw("wrong timers") 550   } 551   t.ts = nil 552   last := len(ts.heap) - 1 553   if last > 0 { 554   ts.heap[0] = ts.heap[last] 555   } 556   ts.heap[last] = timerWhen{} 557   ts.heap = ts.heap[:last] 558   if last > 0 { 559   ts.siftDown(0) 560   } 561   ts.updateMinWhenHeap() 562   if last == 0 { 563   564   ts.minWhenModified.Store(0) 565   } 566  } 567   568   569   570   571   572  func (t *timer) modify(when, period int64, f func(arg any, seq uintptr, delay int64), arg any, seq uintptr) bool { 573   if when <= 0 { 574   throw("timer when must be positive") 575   } 576   if period < 0 { 577   throw("timer period must be non-negative") 578   } 579   async := debug.asynctimerchan.Load() != 0 580   581   if !async && t.isChan { 582   lock(&t.sendLock) 583   } 584   585   t.lock() 586   if async { 587   t.maybeRunAsync() 588   } 589   t.trace("modify") 590   oldPeriod := t.period 591   t.period = period 592   if f != nil { 593   t.f = f 594   t.arg = arg 595   t.seq = seq 596   } 597   598   wake := false 599   pending := t.when > 0 600   t.when = when 601   if t.state&timerHeaped != 0 { 602   t.state |= timerModified 603   if t.state&timerZombie != 0 { 604   605   606   t.ts.zombies.Add(-1) 607   t.state &^= timerZombie 608   } 609   610   611   if min := t.ts.minWhenModified.Load(); min == 0 || when < min { 612   wake = true 613   614   615   t.astate.Store(t.state) 616   t.ts.updateMinWhenModified(when) 617   } 618   } 619   620   add := t.needsAdd() 621   622   if add && t.isFake { 623   624   625   626   627   628   629   630   631   632   633   bubble := getg().bubble 634   if bubble == nil { 635   throw("fake timer executing with no bubble") 636   } 637   if t.state&timerHeaped == 0 && when <= bubble.now { 638   systemstack(func() { 639   t.unlockAndRun(bubble.now, bubble) 640   }) 641   return pending 642   } 643   } 644   645   if !async && t.isChan { 646   647   648   t.seq++ 649   650   651   652   653   654   655   if oldPeriod == 0 && t.isSending.Load() > 0 { 656   pending = true 657   } 658   } 659   t.unlock() 660   if !async && t.isChan { 661   if timerchandrain(t.hchan()) { 662   pending = true 663   } 664   unlock(&t.sendLock) 665   } 666   667   if add { 668   t.maybeAdd() 669   } 670   if wake { 671   wakeNetPoller(when) 672   } 673   674   return pending 675  } 676   677   678   679  func (t *timer) needsAdd() bool { 680   assertLockHeld(&t.mu) 681   need := t.state&timerHeaped == 0 && t.when > 0 && (!t.isChan || t.blocked > 0) 682   if need { 683   t.trace("needsAdd+") 684   } else { 685   t.trace("needsAdd-") 686   } 687   return need 688  } 689   690   691   692   693   694   695   696   697   698   699   700   701   702   703   704   705   706   707   708  func (t *timer) maybeAdd() { 709   710   711   712   713   714   715   716   717   718   mp := acquirem() 719   var ts *timers 720   if t.isFake { 721   bubble := getg().bubble 722   if bubble == nil { 723   throw("invalid timer: fake time but no syncgroup") 724   } 725   ts = &bubble.timers 726   } else { 727   ts = &mp.p.ptr().timers 728   } 729   ts.lock() 730   ts.cleanHead() 731   t.lock() 732   t.trace("maybeAdd") 733   when := int64(0) 734   wake := false 735   if t.needsAdd() { 736   if t.isFake { 737   738   739   740   t.rand = cheaprand() 741   } 742   t.state |= timerHeaped 743   when = t.when 744   wakeTime := ts.wakeTime() 745   wake = wakeTime == 0 || when < wakeTime 746   ts.addHeap(t) 747   } 748   t.unlock() 749   ts.unlock() 750   releasem(mp) 751   if wake { 752   wakeNetPoller(when) 753   } 754  } 755   756   757   758   759  func (t *timer) reset(when, period int64) bool { 760   return t.modify(when, period, nil, nil, 0) 761  } 762   763   764   765   766   767  func (ts *timers) cleanHead() { 768   ts.trace("cleanHead") 769   assertLockHeld(&ts.mu) 770   gp := getg() 771   for { 772   if len(ts.heap) == 0 { 773   return 774   } 775   776   777   778   779   780   if gp.preemptStop { 781   return 782   } 783   784   785   786   787   788   n := len(ts.heap) 789   if t := ts.heap[n-1].timer; t.astate.Load()&timerZombie != 0 { 790   t.lock() 791   if t.state&timerZombie != 0 { 792   t.state &^= timerHeaped | timerZombie | timerModified 793   t.ts = nil 794   ts.zombies.Add(-1) 795   ts.heap[n-1] = timerWhen{} 796   ts.heap = ts.heap[:n-1] 797   } 798   t.unlock() 799   continue 800   } 801   802   t := ts.heap[0].timer 803   if t.ts != ts { 804   throw("bad ts") 805   } 806   807   if t.astate.Load()&(timerModified|timerZombie) == 0 { 808   809   return 810   } 811   812   t.lock() 813   updated := t.updateHeap() 814   t.unlock() 815   if !updated { 816   817   return 818   } 819   } 820  } 821   822   823   824   825   826   827  func (ts *timers) take(src *timers) { 828   ts.trace("take") 829   assertWorldStopped() 830   if len(src.heap) > 0 { 831   832   833   834   for _, tw := range src.heap { 835   t := tw.timer 836   t.ts = nil 837   if t.state&timerZombie != 0 { 838   t.state &^= timerHeaped | timerZombie | timerModified 839   } else { 840   t.state &^= timerModified 841   ts.addHeap(t) 842   } 843   } 844   src.heap = nil 845   src.zombies.Store(0) 846   src.minWhenHeap.Store(0) 847   src.minWhenModified.Store(0) 848   src.len.Store(0) 849   ts.len.Store(uint32(len(ts.heap))) 850   } 851  } 852   853   854   855   856   857   858  func (ts *timers) adjust(now int64, force bool) { 859   ts.trace("adjust") 860   assertLockHeld(&ts.mu) 861   862   863   864   865   866   if !force { 867   first := ts.minWhenModified.Load() 868   if first == 0 || first > now { 869   if verifyTimers { 870   ts.verify() 871   } 872   return 873   } 874   } 875   876   877   878   879   880   881   882   883   884   885   886   887   888   889   890   891   892   893   894   895   896   897   898   899   900   901   902   903   904   905   906   907   908   909   910   911   912   913   914   915   916   917   918   919   920   921   922   923   924   925   926   ts.minWhenHeap.Store(ts.wakeTime()) 927   ts.minWhenModified.Store(0) 928   929   changed := false 930   for i := 0; i < len(ts.heap); i++ { 931   tw := &ts.heap[i] 932   t := tw.timer 933   if t.ts != ts { 934   throw("bad ts") 935   } 936   937   if t.astate.Load()&(timerModified|timerZombie) == 0 { 938   939   continue 940   } 941   942   t.lock() 943   switch { 944   case t.state&timerHeaped == 0: 945   badTimer() 946   947   case t.state&timerZombie != 0: 948   ts.zombies.Add(-1) 949   t.state &^= timerHeaped | timerZombie | timerModified 950   n := len(ts.heap) 951   ts.heap[i] = ts.heap[n-1] 952   ts.heap[n-1] = timerWhen{} 953   ts.heap = ts.heap[:n-1] 954   t.ts = nil 955   i-- 956   changed = true 957   958   case t.state&timerModified != 0: 959   tw.when = t.when 960   t.state &^= timerModified 961   changed = true 962   } 963   t.unlock() 964   } 965   966   if changed { 967   ts.initHeap() 968   } 969   ts.updateMinWhenHeap() 970   971   if verifyTimers { 972   ts.verify() 973   } 974  } 975   976   977   978   979   980   981   982  func (ts *timers) wakeTime() int64 { 983   984   985   986   987   988   nextWhen := ts.minWhenModified.Load() 989   when := ts.minWhenHeap.Load() 990   if when == 0 || (nextWhen != 0 && nextWhen < when) { 991   when = nextWhen 992   } 993   return when 994  } 995   996   997   998   999   1000   1001   1002   1003   1004   1005   1006  func (ts *timers) check(now int64, bubble *synctestBubble) (rnow, pollUntil int64, ran bool) { 1007   ts.trace("check") 1008   1009   1010   next := ts.wakeTime() 1011   if next == 0 { 1012   1013   return now, 0, false 1014   } 1015   1016   if now == 0 { 1017   now = nanotime() 1018   } 1019   1020   1021   1022   1023   zombies := ts.zombies.Load() 1024   if zombies < 0 { 1025   badTimer() 1026   } 1027   force := ts == &getg().m.p.ptr().timers && int(zombies) > int(ts.len.Load())/4 1028   1029   if now < next && !force { 1030   1031   return now, next, false 1032   } 1033   1034   ts.lock() 1035   if len(ts.heap) > 0 { 1036   ts.adjust(now, false) 1037   for len(ts.heap) > 0 { 1038   1039   if tw := ts.run(now, bubble); tw != 0 { 1040   if tw > 0 { 1041   pollUntil = tw 1042   } 1043   break 1044   } 1045   ran = true 1046   } 1047   1048   1049   1050   1051   1052   1053   force = ts == &getg().m.p.ptr().timers && int(ts.zombies.Load()) > int(ts.len.Load())/4 1054   if force { 1055   ts.adjust(now, true) 1056   } 1057   } 1058   ts.unlock() 1059   1060   return now, pollUntil, ran 1061  } 1062   1063   1064   1065   1066   1067   1068   1069   1070   1071  func (ts *timers) run(now int64, bubble *synctestBubble) int64 { 1072   ts.trace("run") 1073   assertLockHeld(&ts.mu) 1074  Redo: 1075   if len(ts.heap) == 0 { 1076   return -1 1077   } 1078   tw := ts.heap[0] 1079   t := tw.timer 1080   if t.ts != ts { 1081   throw("bad ts") 1082   } 1083   1084   if t.astate.Load()&(timerModified|timerZombie) == 0 && tw.when > now { 1085   1086   return tw.when 1087   } 1088   1089   t.lock() 1090   if t.updateHeap() { 1091   t.unlock() 1092   goto Redo 1093   } 1094   1095   if t.state&timerHeaped == 0 || t.state&timerModified != 0 { 1096   badTimer() 1097   } 1098   1099   if t.when > now { 1100   1101   t.unlock() 1102   return t.when 1103   } 1104   1105   t.unlockAndRun(now, bubble) 1106   assertLockHeld(&ts.mu) 1107   return 0 1108  } 1109   1110   1111   1112   1113   1114   1115   1116  func (t *timer) unlockAndRun(now int64, bubble *synctestBubble) { 1117   t.trace("unlockAndRun") 1118   assertLockHeld(&t.mu) 1119   if t.ts != nil { 1120   assertLockHeld(&t.ts.mu) 1121   } 1122   if raceenabled { 1123   1124   1125   1126   gp := getg() 1127   var tsLocal *timers 1128   if bubble == nil { 1129   tsLocal = &gp.m.p.ptr().timers 1130   } else { 1131   tsLocal = &bubble.timers 1132   } 1133   if tsLocal.raceCtx == 0 { 1134   tsLocal.raceCtx = racegostart(abi.FuncPCABIInternal((timers).run) + sys.PCQuantum) 1135   } 1136   raceacquirectx(tsLocal.raceCtx, unsafe.Pointer(t)) 1137   } 1138   1139   if t.state&(timerModified|timerZombie) != 0 { 1140   badTimer() 1141   } 1142   1143   f := t.f 1144   arg := t.arg 1145   seq := t.seq 1146   var next int64 1147   delay := now - t.when 1148   if t.period > 0 { 1149   1150   next = t.when + t.period(1+delay/t.period) 1151   if next < 0 { 1152   next = maxWhen 1153   } 1154   } else { 1155   next = 0 1156   } 1157   ts := t.ts 1158   t.when = next 1159   if t.state&timerHeaped != 0 { 1160   t.state |= timerModified 1161   if next == 0 { 1162   t.state |= timerZombie 1163   t.ts.zombies.Add(1) 1164   } 1165   t.updateHeap() 1166   } 1167   1168   async := debug.asynctimerchan.Load() != 0 1169   if !async && t.isChan && t.period == 0 { 1170   1171   if t.isSending.Add(1) < 0 { 1172   throw("too many concurrent timer firings") 1173   } 1174   } 1175   1176   t.unlock() 1177   1178   if raceenabled { 1179   1180   gp := getg() 1181   if gp.racectx != 0 { 1182   throw("unexpected racectx") 1183   } 1184   if bubble == nil { 1185   gp.racectx = gp.m.p.ptr().timers.raceCtx 1186   } else { 1187   gp.racectx = bubble.timers.raceCtx 1188   } 1189   } 1190   1191   if ts != nil { 1192   ts.unlock() 1193   } 1194   1195   if bubble != nil { 1196   1197   gp := getg() 1198   if gp.bubble != nil { 1199   throw("unexpected syncgroup set") 1200   } 1201   gp.bubble = bubble 1202   bubble.changegstatus(gp, _Gdead, _Grunning) 1203   } 1204   1205   if !async && t.isChan { 1206   1207   1208   1209   1210   1211   1212   1213   1214   1215   1216   1217   1218   1219   1220   1221   1222   1223   1224   lock(&t.sendLock) 1225   1226   if t.period == 0 { 1227   1228   1229   1230   if t.isSending.Add(-1) < 0 { 1231   throw("mismatched isSending updates") 1232   } 1233   } 1234   1235   if t.seq != seq { 1236   f = func(any, uintptr, int64) {} 1237   } 1238   } 1239   1240   f(arg, seq, delay) 1241   1242   if !async && t.isChan { 1243   unlock(&t.sendLock) 1244   } 1245   1246   if bubble != nil { 1247   gp := getg() 1248   bubble.changegstatus(gp, _Grunning, _Gdead) 1249   if raceenabled { 1250   1251   1252   racereleasemergeg(gp, bubble.raceaddr()) 1253   } 1254   gp.bubble = nil 1255   } 1256   1257   if ts != nil { 1258   ts.lock() 1259   } 1260   1261   if raceenabled { 1262   gp := getg() 1263   gp.racectx = 0 1264   } 1265  } 1266   1267   1268   1269   1270  func (ts *timers) verify() { 1271   assertLockHeld(&ts.mu) 1272   for i, tw := range ts.heap { 1273   if i == 0 { 1274   1275   continue 1276   } 1277   1278   1279   p := int(uint(i-1) / timerHeapN) 1280   if tw.less(ts.heap[p]) { 1281   print("bad timer heap at ", i, ": ", p, ": ", ts.heap[p].when, ", ", i, ": ", tw.when, "\n") 1282   throw("bad timer heap") 1283   } 1284   } 1285   if n := int(ts.len.Load()); len(ts.heap) != n { 1286   println("timer heap len", len(ts.heap), "!= atomic len", n) 1287   throw("bad timer heap len") 1288   } 1289  } 1290   1291   1292   1293  func (ts *timers) updateMinWhenHeap() { 1294   assertWorldStoppedOrLockHeld(&ts.mu) 1295   if len(ts.heap) == 0 { 1296   ts.minWhenHeap.Store(0) 1297   } else { 1298   ts.minWhenHeap.Store(ts.heap[0].when) 1299   } 1300  } 1301   1302   1303   1304  func (ts *timers) updateMinWhenModified(when int64) { 1305   for { 1306   old := ts.minWhenModified.Load() 1307   if old != 0 && old < when { 1308   return 1309   } 1310   if ts.minWhenModified.CompareAndSwap(old, when) { 1311   return 1312   } 1313   } 1314  } 1315   1316   1317   1318   1319  func timeSleepUntil() int64 { 1320   next := int64(maxWhen) 1321   1322   1323   lock(&allpLock) 1324   for _, pp := range allp { 1325   if pp == nil { 1326   1327   1328   continue 1329   } 1330   1331   if w := pp.timers.wakeTime(); w != 0 { 1332   next = min(next, w) 1333   } 1334   } 1335   unlock(&allpLock) 1336   1337   return next 1338  } 1339   1340  const timerHeapN = 4 1341   1342   1343   1344   1345   1346   1347   1348   1349   1350   1351   1352  func (ts *timers) siftUp(i int) { 1353   heap := ts.heap 1354   if i >= len(heap) { 1355   badTimer() 1356   } 1357   tw := heap[i] 1358   if tw.when <= 0 { 1359   badTimer() 1360   } 1361   for i > 0 { 1362   p := int(uint(i-1) / timerHeapN) 1363   if !tw.less(heap[p]) { 1364   break 1365   } 1366   heap[i] = heap[p] 1367   i = p 1368   } 1369   if heap[i].timer != tw.timer { 1370   heap[i] = tw 1371   } 1372  } 1373   1374   1375   1376  func (ts timers) siftDown(i int) { 1377   heap := ts.heap 1378   n := len(heap) 1379   if i >= n { 1380   badTimer() 1381   } 1382   if itimerHeapN+1 >= n { 1383   return 1384   } 1385   tw := heap[i] 1386   if tw.when <= 0 { 1387   badTimer() 1388   } 1389   for { 1390   leftChild := i*timerHeapN + 1 1391   if leftChild >= n { 1392   break 1393   } 1394   w := tw 1395   c := -1 1396   for j, tw := range heap[leftChild:min(leftChild+timerHeapN, n)] { 1397   if tw.less(w) { 1398   w = tw 1399   c = leftChild + j 1400   } 1401   } 1402   if c < 0 { 1403   break 1404   } 1405   heap[i] = heap[c] 1406   i = c 1407   } 1408   if heap[i].timer != tw.timer { 1409   heap[i] = tw 1410   } 1411  } 1412   1413   1414   1415  func (ts *timers) initHeap() { 1416   1417   1418   if len(ts.heap) <= 1 { 1419   return 1420   } 1421   for i := int(uint(len(ts.heap)-1-1) / timerHeapN); i >= 0; i-- { 1422   ts.siftDown(i) 1423   } 1424  } 1425   1426   1427   1428   1429   1430  func badTimer() { 1431   throw("timer data corruption") 1432  } 1433   1434   1435   1436   1437   1438   1439  func (t *timer) maybeRunChan(c *hchan) { 1440   if t.isFake && getg().bubble != c.bubble { 1441   1442   fatal("synctest timer accessed from outside bubble") 1443   } 1444   if t.astate.Load()&timerHeaped != 0 { 1445   1446   1447   return 1448   } 1449   1450   t.lock() 1451   now := nanotime() 1452   if t.isFake { 1453   now = getg().bubble.now 1454   } 1455   if t.state&timerHeaped != 0 || t.when == 0 || t.when > now { 1456   t.trace("maybeRunChan-") 1457   1458   t.unlock() 1459   return 1460   } 1461   t.trace("maybeRunChan+") 1462   systemstack(func() { 1463   t.unlockAndRun(now, c.bubble) 1464   }) 1465  } 1466   1467   1468   1469   1470   1471  func blockTimerChan(c *hchan) { 1472   t := c.timer 1473   if t.isFake && c.bubble != getg().bubble { 1474   1475   fatal("synctest timer accessed from outside bubble") 1476   } 1477   1478   t.lock() 1479   t.trace("blockTimerChan") 1480   if !t.isChan { 1481   badTimer() 1482   } 1483   1484   t.blocked++ 1485   1486   1487   1488   1489   if t.state&timerHeaped != 0 && t.state&timerZombie != 0 && t.when > 0 { 1490   t.state &^= timerZombie 1491   t.ts.zombies.Add(-1) 1492   } 1493   1494   1495   1496   1497   1498   1499   1500   add := t.needsAdd() 1501   t.unlock() 1502   if add { 1503   t.maybeAdd() 1504   } 1505  } 1506   1507   1508   1509   1510   1511   1512   1513  func unblockTimerChan(c *hchan) { 1514   t := c.timer 1515   t.lock() 1516   t.trace("unblockTimerChan") 1517   if !t.isChan || t.blocked == 0 { 1518   badTimer() 1519   } 1520   t.blocked-- 1521   if t.blocked == 0 && t.state&timerHeaped != 0 && t.state&timerZombie == 0 { 1522   1523   1524   1525   t.state |= timerZombie 1526   t.ts.zombies.Add(1) 1527   } 1528   t.unlock() 1529  } 1530  

View as plain text