Add SSO login support

- Add [sso] config section with redirect_uri
- Create mcdsl/sso client when SSO is configured
- Add /login (landing page), /sso/redirect, /sso/callback routes
- Add /logout route
- Update login template with SSO landing page variant
- Bump mcdsl to v1.6.0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
vendor/github.com/sourcegraph/conc/.golangci.yml (generated, vendored, new file)
@@ -0,0 +1,11 @@
```yaml
linters:
  disable-all: true
  enable:
    - errcheck
    - godot
    - gosimple
    - govet
    - ineffassign
    - staticcheck
    - typecheck
    - unused
```
vendor/github.com/sourcegraph/conc/LICENSE (generated, vendored, new file)
@@ -0,0 +1,21 @@
```
MIT License

Copyright (c) 2023 Sourcegraph

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```
vendor/github.com/sourcegraph/conc/Makefile (generated, vendored, new file)
@@ -0,0 +1,24 @@
```makefile
.DEFAULT_GOAL := help

GO_BIN ?= $(shell go env GOPATH)/bin

.PHONY: help
help:
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'

$(GO_BIN)/golangci-lint:
	@echo "==> Installing golangci-lint within "${GO_BIN}""
	@go install -v github.com/golangci/golangci-lint/cmd/golangci-lint@latest

.PHONY: lint
lint: $(GO_BIN)/golangci-lint ## Run linting on Go files
	@echo "==> Linting Go source files"
	@golangci-lint run -v --fix -c .golangci.yml ./...

.PHONY: test
test: ## Run tests
	go test -race -v ./... -coverprofile ./coverage.txt

.PHONY: bench
bench: ## Run benchmarks. See https://pkg.go.dev/cmd/go#hdr-Testing_flags
	go test ./... -bench . -benchtime 5s -timeout 0 -run=XXX -cpu 1 -benchmem
```
vendor/github.com/sourcegraph/conc/README.md (generated, vendored, new file)
@@ -0,0 +1,464 @@



# `conc`: better structured concurrency for go

[](https://pkg.go.dev/github.com/sourcegraph/conc)
[](https://sourcegraph.com/github.com/sourcegraph/conc)
[](https://goreportcard.com/report/github.com/sourcegraph/conc)
[](https://codecov.io/gh/sourcegraph/conc)
[](https://discord.gg/bvXQXmtRjN)

`conc` is your toolbelt for structured concurrency in go, making common tasks
easier and safer.

```sh
go get github.com/sourcegraph/conc
```

# At a glance

- Use [`conc.WaitGroup`](https://pkg.go.dev/github.com/sourcegraph/conc#WaitGroup) if you just want a safer version of `sync.WaitGroup`
- Use [`pool.Pool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool) if you want a concurrency-limited task runner
- Use [`pool.ResultPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultPool) if you want a concurrent task runner that collects task results
- Use [`pool.(Result)?ErrorPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool) if your tasks are fallible
- Use [`pool.(Result)?ContextPool`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ContextPool) if your tasks should be canceled on failure
- Use [`stream.Stream`](https://pkg.go.dev/github.com/sourcegraph/conc/stream#Stream) if you want to process an ordered stream of tasks in parallel with serial callbacks
- Use [`iter.Map`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#Map) if you want to concurrently map a slice
- Use [`iter.ForEach`](https://pkg.go.dev/github.com/sourcegraph/conc/iter#ForEach) if you want to concurrently iterate over a slice
- Use [`panics.Catcher`](https://pkg.go.dev/github.com/sourcegraph/conc/panics#Catcher) if you want to catch panics in your own goroutines

All pools are created with
[`pool.New()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#New)
or
[`pool.NewWithResults[T]()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#NewWithResults),
then configured with methods:

- [`p.WithMaxGoroutines()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.MaxGoroutines) configures the maximum number of goroutines in the pool
- [`p.WithErrors()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithErrors) configures the pool to run tasks that return errors
- [`p.WithContext(ctx)`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#Pool.WithContext) configures the pool to run tasks that should be canceled on first error
- [`p.WithFirstError()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ErrorPool.WithFirstError) configures error pools to only keep the first returned error rather than an aggregated error
- [`p.WithCollectErrored()`](https://pkg.go.dev/github.com/sourcegraph/conc/pool#ResultContextPool.WithCollectErrored) configures result pools to collect results even when the task errored
# Goals

The main goals of the package are:

1) Make it harder to leak goroutines
2) Handle panics gracefully
3) Make concurrent code easier to read

## Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's
really easy to fire off a `go` statement and fail to properly wait for it to
complete.

`conc` takes the opinionated stance that all concurrency should be scoped.
That is, goroutines should have an owner and that owner should always
ensure that its owned goroutines exit properly.

In `conc`, the owner of a goroutine is always a `conc.WaitGroup`. Goroutines
are spawned in a `WaitGroup` with `(*WaitGroup).Go()`, and
`(*WaitGroup).Wait()` should always be called before the `WaitGroup` goes out
of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the
caller. In that case, you could pass a `WaitGroup` into the spawning function.

```go
func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}
```

For some more discussion on why scoped concurrency is nice, check out [this
blog
post](https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/).

## Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling
panics. A goroutine spawned without a panic handler will crash the whole process
on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the
panic once you catch it? Some options:

1) Ignore it
2) Log it
3) Turn it into an error and return that to the goroutine spawner
4) Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually
something wrong and someone should fix it.

Just logging panics isn't great either because then there is no indication to the spawner
that something bad happened, and it might just continue on as normal even though your
program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have
an owner that can actually receive the message that something went wrong. This
is generally not true with a goroutine spawned with `go`, but in the `conc`
package, all goroutines have an owner that must collect the spawned goroutine.
In the conc package, any call to `Wait()` will panic if any of the spawned goroutines
panicked. Additionally, it decorates the panic value with a stacktrace from the child
goroutine so that you don't lose information about what caused the panic.

Doing this all correctly every time you spawn something with `go` is not
trivial and it requires a lot of boilerplate that makes the important parts of
the code more difficult to read, so `conc` does this for you.
<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf(
        "panic: %q\n%s",
        e.val,
        string(e.stack),
    )
}

func main() {
    done := make(chan error)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &caughtPanicError{
                    val:   v,
                    stack: debug.Stack(),
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
```

</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}
```

</td>
</tr>
</table>
## Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't
obfuscate what the code is actually doing is more difficult. The `conc` package
attempts to make common operations easier by abstracting as much boilerplate
complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use
`pool.New()`. Want to process an ordered stream of results concurrently, but
still maintain order? Try `stream.New()`. What about a concurrent map over
a slice? Take a peek at `iter.Map()`.

Browse some examples below for some comparisons with doing these by hand.

# Examples

Each of these examples forgoes propagating panics for simplicity. To see
what kind of complexity that would add, check out the "Goal #2" header above.

Spawn a set of goroutines and wait for them to finish:
<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // crashes on panic!
            doSomething()
        }()
    }
    wg.Wait()
}
```

</td>
<td>

```go
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}
```

</td>
</tr>
</table>
Process each element of a stream in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
```

</td>
<td>

```go
func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}
```

</td>
</tr>
</table>
Process each element of a slice in a static pool of goroutines:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}
```

</td>
<td>

```go
func process(values []int) {
    iter.ForEach(values, handle)
}
```

</td>
</tr>
</table>
Concurrently map a slice:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}
```

</td>
<td>

```go
func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}
```

</td>
</tr>
</table>
Process an ordered stream concurrently:

<table>
<tr>
<th><code>stdlib</code></th>
<th><code>conc</code></th>
</tr>
<tr>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutines
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
    for elem := range in {
        elem := elem
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input.
    // Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
```

</td>
<td>

```go
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}
```

</td>
</tr>
</table>

# Status

This package is currently pre-1.0. There are likely to be minor breaking
changes before a 1.0 release as we stabilize the APIs and tweak defaults.
Please open an issue if you have questions, concerns, or requests that you'd
like addressed before the 1.0 release. Currently, a 1.0 is targeted for
March 2023.
vendor/github.com/sourcegraph/conc/panics/panics.go (generated, vendored, new file)
@@ -0,0 +1,102 @@
```go
package panics

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync/atomic"
)

// Catcher is used to catch panics. You can execute a function with Try,
// which will catch any spawned panic. Try can be called any number of times,
// from any number of goroutines. Once all calls to Try have completed, you can
// get the value of the first panic (if any) with Recovered(), or you can just
// propagate the panic (re-panic) with Repanic().
type Catcher struct {
	recovered atomic.Pointer[Recovered]
}

// Try executes f, catching any panic it might spawn. It is safe
// to call from multiple goroutines simultaneously.
func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}

func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecovered(1, val)
		p.recovered.CompareAndSwap(nil, &rp)
	}
}

// Repanic panics if any calls to Try caught a panic. It will panic with the
// value of the first panic caught, wrapped in a panics.Recovered with caller
// information.
func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}

// Recovered returns the value of the first panic caught by Try, or nil if
// no calls to Try panicked.
func (p *Catcher) Recovered() *Recovered {
	return p.recovered.Load()
}

// NewRecovered creates a panics.Recovered from a panic value and a collected
// stacktrace. The skip parameter allows the caller to skip stack frames when
// collecting the stacktrace. Calling with a skip of 0 means include the call to
// NewRecovered in the stacktrace.
func NewRecovered(skip int, value any) Recovered {
	// 64 frames should be plenty
	var callers [64]uintptr
	n := runtime.Callers(skip+1, callers[:])
	return Recovered{
		Value:   value,
		Callers: callers[:n],
		Stack:   debug.Stack(),
	}
}

// Recovered is a panic that was caught with recover().
type Recovered struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

// String renders a human-readable formatting of the panic.
func (p *Recovered) String() string {
	return fmt.Sprintf("panic: %v\nstacktrace:\n%s\n", p.Value, p.Stack)
}

// AsError casts the panic into an error implementation. The implementation
// is unwrappable with the cause of the panic, if the panic was provided one.
func (p *Recovered) AsError() error {
	if p == nil {
		return nil
	}
	return &ErrRecovered{*p}
}

// ErrRecovered wraps a panics.Recovered in an error implementation.
type ErrRecovered struct{ Recovered }

var _ error = (*ErrRecovered)(nil)

func (p *ErrRecovered) Error() string { return p.String() }

func (p *ErrRecovered) Unwrap() error {
	if err, ok := p.Value.(error); ok {
		return err
	}
	return nil
}
```
vendor/github.com/sourcegraph/conc/panics/try.go (generated, vendored, new file)
@@ -0,0 +1,11 @@
```go
package panics

// Try executes f, catching and returning any panic it might spawn.
//
// The recovered panic can be propagated with panic(), or handled as a normal error with
// (*panics.Recovered).AsError().
func Try(f func()) *Recovered {
	var c Catcher
	c.Try(f)
	return c.Recovered()
}
```
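The first-wins capture that `Catcher` builds on `atomic.Pointer[Recovered]` can be sketched with the standard library alone. `firstPanic` below is our illustrative helper, not part of `conc`; it keeps only the panic value and drops the stacktrace decoration the real package adds:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// firstPanic runs each f in its own goroutine, recovers any panic, and
// keeps only the first recovered value via a nil-to-value CompareAndSwap,
// the same first-wins idea Catcher uses.
func firstPanic(fs ...func()) any {
	var first atomic.Pointer[any]
	var wg sync.WaitGroup
	for _, f := range fs {
		wg.Add(1)
		go func(f func()) {
			defer wg.Done()
			defer func() {
				if v := recover(); v != nil {
					first.CompareAndSwap(nil, &v) // only the first panic sticks
				}
			}()
			f()
		}(f)
	}
	wg.Wait()
	if p := first.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	v := firstPanic(
		func() {},
		func() { panic("boom") },
	)
	fmt.Println(v) // boom
}
```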
vendor/github.com/sourcegraph/conc/pool/context_pool.go (generated, vendored, new file)
@@ -0,0 +1,104 @@
```go
package pool

import (
	"context"
)

// ContextPool is a pool that runs tasks that take a context.
// A new ContextPool should be created with `New().WithContext(ctx)`.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
type ContextPool struct {
	errorPool ErrorPool

	ctx    context.Context
	cancel context.CancelFunc

	cancelOnError bool
}

// Go submits a task. If it returns an error, the error will be
// collected and returned by Wait(). If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ContextPool) Go(f func(ctx context.Context) error) {
	p.errorPool.Go(func() error {
		if p.cancelOnError {
			// If we are cancelling on error, then we also want to cancel if a
			// panic is raised. To do this, we need to recover, cancel, and then
			// re-throw the caught panic.
			defer func() {
				if r := recover(); r != nil {
					p.cancel()
					panic(r)
				}
			}()
		}

		err := f(p.ctx)
		if err != nil && p.cancelOnError {
			// Leaky abstraction warning: We add the error directly because
			// otherwise, canceling could cause another goroutine to exit and
			// return an error before this error was added, which breaks the
			// expectations of WithFirstError().
			p.errorPool.addErr(err)
			p.cancel()
			return nil
		}
		return err
	})
}

// Wait cleans up all spawned goroutines, propagates any panics, and
// returns an error if any of the tasks errored.
func (p *ContextPool) Wait() error {
	// Make sure we call cancel after pool is done to avoid memory leakage.
	defer p.cancel()
	return p.errorPool.Wait()
}

// WithFirstError configures the pool to only return the first error
// returned by a task. By default, Wait() will return a combined error.
// This is particularly useful for (*ContextPool).WithCancelOnError(),
// where all errors after the first are likely to be context.Canceled.
func (p *ContextPool) WithFirstError() *ContextPool {
	p.panicIfInitialized()
	p.errorPool.WithFirstError()
	return p
}

// WithCancelOnError configures the pool to cancel its context as soon as
// any task returns an error or panics. By default, the pool's context is not
// canceled until the parent context is canceled.
//
// In this case, all errors returned from the pool after the first will
// likely be context.Canceled - you may want to also use
// (*ContextPool).WithFirstError() to configure the pool to only return
// the first error.
func (p *ContextPool) WithCancelOnError() *ContextPool {
	p.panicIfInitialized()
	p.cancelOnError = true
	return p
}

// WithFailFast is an alias for the combination of WithFirstError and
// WithCancelOnError. By default, the errors from all tasks are returned and
// the pool's context is not canceled until the parent context is canceled.
func (p *ContextPool) WithFailFast() *ContextPool {
	p.panicIfInitialized()
	p.WithFirstError()
	p.WithCancelOnError()
	return p
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ContextPool) WithMaxGoroutines(n int) *ContextPool {
	p.panicIfInitialized()
	p.errorPool.WithMaxGoroutines(n)
	return p
}

func (p *ContextPool) panicIfInitialized() {
	p.errorPool.panicIfInitialized()
}
```
vendor/github.com/sourcegraph/conc/pool/error_pool.go (generated, vendored, new file)
@@ -0,0 +1,100 @@
```go
package pool

import (
	"context"
	"errors"
	"sync"
)

// ErrorPool is a pool that runs tasks that may return an error.
// Errors are collected and returned by Wait().
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
//
// A new ErrorPool should be created using `New().WithErrors()`.
type ErrorPool struct {
	pool Pool

	onlyFirstError bool

	mu   sync.Mutex
	errs []error
}

// Go submits a task to the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ErrorPool) Go(f func() error) {
	p.pool.Go(func() {
		p.addErr(f())
	})
}

// Wait cleans up any spawned goroutines, propagating any panics and
// returning any errors from tasks.
func (p *ErrorPool) Wait() error {
	p.pool.Wait()

	errs := p.errs
	p.errs = nil // reset errs

	if len(errs) == 0 {
		return nil
	} else if p.onlyFirstError {
		return errs[0]
	} else {
		return errors.Join(errs...)
	}
}

// WithContext converts the pool to a ContextPool for tasks that should
// run under the same context, such that they each respect shared cancellation.
// For example, WithCancelOnError can be configured on the returned pool to
// signal that all goroutines should be cancelled upon the first error.
func (p *ErrorPool) WithContext(ctx context.Context) *ContextPool {
	p.panicIfInitialized()
	ctx, cancel := context.WithCancel(ctx)
	return &ContextPool{
		errorPool: p.deref(),
		ctx:       ctx,
		cancel:    cancel,
	}
}

// WithFirstError configures the pool to only return the first error
// returned by a task. By default, Wait() will return a combined error.
func (p *ErrorPool) WithFirstError() *ErrorPool {
	p.panicIfInitialized()
	p.onlyFirstError = true
	return p
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ErrorPool) WithMaxGoroutines(n int) *ErrorPool {
	p.panicIfInitialized()
	p.pool.WithMaxGoroutines(n)
	return p
}

// deref is a helper that creates a shallow copy of the pool with the same
// settings. We don't want to just dereference the pointer because that makes
// the copylock lint angry.
func (p *ErrorPool) deref() ErrorPool {
	return ErrorPool{
		pool:           p.pool.deref(),
		onlyFirstError: p.onlyFirstError,
	}
}

func (p *ErrorPool) panicIfInitialized() {
	p.pool.panicIfInitialized()
}

func (p *ErrorPool) addErr(err error) {
	if err != nil {
		p.mu.Lock()
		p.errs = append(p.errs, err)
		p.mu.Unlock()
	}
}
```
vendor/github.com/sourcegraph/conc/pool/pool.go (generated, vendored, new file)
@@ -0,0 +1,174 @@
```go
package pool

import (
	"context"
	"sync"

	"github.com/sourcegraph/conc"
)

// New creates a new Pool.
func New() *Pool {
	return &Pool{}
}

// Pool is a pool of goroutines used to execute tasks concurrently.
//
// Tasks are submitted with Go(). Once all your tasks have been submitted, you
// must call Wait() to clean up any spawned goroutines and propagate any
// panics.
//
// Goroutines are started lazily, so creating a new pool is cheap. There will
// never be more goroutines spawned than there are tasks submitted.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
//
// Pool is efficient, but not zero cost. It should not be used for very short
// tasks. Startup and teardown come with an overhead of around 1µs, and each
// task has an overhead of around 300ns.
type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

// Go submits a task to be run in the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *Pool) Go(f func()) {
	p.init()

	if p.limiter == nil {
		// No limit on the number of goroutines.
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(func() {
				p.worker(f)
			})
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(func() {
				p.worker(f)
			})
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}
}

// Wait cleans up spawned goroutines, propagating any panics that were
// raised by a task.
func (p *Pool) Wait() {
	p.init()

	close(p.tasks)

	// After Wait() returns, reset the struct so tasks will be reinitialized on
	// next use. This better matches the behavior of sync.WaitGroup.
	defer func() { p.initOnce = sync.Once{} }()

	p.handle.Wait()
}

// MaxGoroutines returns the maximum size of the pool.
func (p *Pool) MaxGoroutines() int {
	return p.limiter.limit()
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *Pool) WithMaxGoroutines(n int) *Pool {
	p.panicIfInitialized()
	if n < 1 {
		panic("max goroutines in a pool must be greater than zero")
	}
	p.limiter = make(limiter, n)
	return p
}

// init ensures that the pool is initialized before use. This makes the
// zero value of the pool usable.
func (p *Pool) init() {
	p.initOnce.Do(func() {
		p.tasks = make(chan func())
	})
}

// panicIfInitialized will trigger a panic if a configuration method is called
// after the pool has started any goroutines for the first time. In the case that
// new settings are needed, a new pool should be created.
func (p *Pool) panicIfInitialized() {
	if p.tasks != nil {
		panic("pool can not be reconfigured after calling Go() for the first time")
	}
}

// WithErrors converts the pool to an ErrorPool so the submitted tasks can
// return errors.
func (p *Pool) WithErrors() *ErrorPool {
	p.panicIfInitialized()
	return &ErrorPool{
		pool: p.deref(),
	}
}

// deref is a helper that creates a shallow copy of the pool with the same
// settings. We don't want to just dereference the pointer because that makes
// the copylock lint angry.
func (p *Pool) deref() Pool {
	p.panicIfInitialized()
	return Pool{
		limiter: p.limiter,
	}
}

// WithContext converts the pool to a ContextPool for tasks that should
// run under the same context, such that they each respect shared cancellation.
// For example, WithCancelOnError can be configured on the returned pool to
// signal that all goroutines should be cancelled upon the first error.
func (p *Pool) WithContext(ctx context.Context) *ContextPool {
```
|
||||
p.panicIfInitialized()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &ContextPool{
|
||||
errorPool: p.WithErrors().deref(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Pool) worker(initialFunc func()) {
|
||||
// The only time this matters is if the task panics.
|
||||
// This makes it possible to spin up new workers in that case.
|
||||
defer p.limiter.release()
|
||||
|
||||
if initialFunc != nil {
|
||||
initialFunc()
|
||||
}
|
||||
|
||||
for f := range p.tasks {
|
||||
f()
|
||||
}
|
||||
}
|
||||
|
||||
type limiter chan struct{}
|
||||
|
||||
func (l limiter) limit() int {
|
||||
return cap(l)
|
||||
}
|
||||
|
||||
func (l limiter) release() {
|
||||
if l != nil {
|
||||
<-l
|
||||
}
|
||||
}
|
||||
85
vendor/github.com/sourcegraph/conc/pool/result_context_pool.go
generated
vendored
Normal file
@@ -0,0 +1,85 @@
package pool

import (
    "context"
)

// ResultContextPool is a pool that runs tasks that take a context and return a
// result. The context passed to the task will be canceled if any of the tasks
// return an error, which makes its functionality different than just capturing
// a context with the task closure.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
type ResultContextPool[T any] struct {
    contextPool    ContextPool
    agg            resultAggregator[T]
    collectErrored bool
}

// Go submits a task to the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ResultContextPool[T]) Go(f func(context.Context) (T, error)) {
    idx := p.agg.nextIndex()
    p.contextPool.Go(func(ctx context.Context) error {
        res, err := f(ctx)
        p.agg.save(idx, res, err != nil)
        return err
    })
}

// Wait cleans up all spawned goroutines, propagates any panics, and
// returns an error if any of the tasks errored.
func (p *ResultContextPool[T]) Wait() ([]T, error) {
    err := p.contextPool.Wait()
    results := p.agg.collect(p.collectErrored)
    p.agg = resultAggregator[T]{}
    return results, err
}

// WithCollectErrored configures the pool to still collect the result of a task
// even if the task returned an error. By default, the result of tasks that errored
// are ignored and only the error is collected.
func (p *ResultContextPool[T]) WithCollectErrored() *ResultContextPool[T] {
    p.panicIfInitialized()
    p.collectErrored = true
    return p
}

// WithFirstError configures the pool to only return the first error
// returned by a task. By default, Wait() will return a combined error.
func (p *ResultContextPool[T]) WithFirstError() *ResultContextPool[T] {
    p.panicIfInitialized()
    p.contextPool.WithFirstError()
    return p
}

// WithCancelOnError configures the pool to cancel its context as soon as
// any task returns an error. By default, the pool's context is not
// canceled until the parent context is canceled.
func (p *ResultContextPool[T]) WithCancelOnError() *ResultContextPool[T] {
    p.panicIfInitialized()
    p.contextPool.WithCancelOnError()
    return p
}

// WithFailFast is an alias for the combination of WithFirstError and
// WithCancelOnError. By default, the errors from all tasks are returned and
// the pool's context is not canceled until the parent context is canceled.
func (p *ResultContextPool[T]) WithFailFast() *ResultContextPool[T] {
    p.panicIfInitialized()
    p.contextPool.WithFailFast()
    return p
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ResultContextPool[T]) WithMaxGoroutines(n int) *ResultContextPool[T] {
    p.panicIfInitialized()
    p.contextPool.WithMaxGoroutines(n)
    return p
}

func (p *ResultContextPool[T]) panicIfInitialized() {
    p.contextPool.panicIfInitialized()
}
80
vendor/github.com/sourcegraph/conc/pool/result_error_pool.go
generated
vendored
Normal file
@@ -0,0 +1,80 @@
package pool

import (
    "context"
)

// ResultErrorPool is a pool that executes tasks that return a generic result
// type and an error. Tasks are executed in the pool with Go(), then the
// results of the tasks are returned by Wait().
//
// The order of the results is guaranteed to be the same as the order the
// tasks were submitted.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
type ResultErrorPool[T any] struct {
    errorPool      ErrorPool
    agg            resultAggregator[T]
    collectErrored bool
}

// Go submits a task to the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ResultErrorPool[T]) Go(f func() (T, error)) {
    idx := p.agg.nextIndex()
    p.errorPool.Go(func() error {
        res, err := f()
        p.agg.save(idx, res, err != nil)
        return err
    })
}

// Wait cleans up any spawned goroutines, propagating any panics and
// returning the results and any errors from tasks.
func (p *ResultErrorPool[T]) Wait() ([]T, error) {
    err := p.errorPool.Wait()
    results := p.agg.collect(p.collectErrored)
    p.agg = resultAggregator[T]{} // reset for reuse
    return results, err
}

// WithCollectErrored configures the pool to still collect the result of a task
// even if the task returned an error. By default, the result of tasks that errored
// are ignored and only the error is collected.
func (p *ResultErrorPool[T]) WithCollectErrored() *ResultErrorPool[T] {
    p.panicIfInitialized()
    p.collectErrored = true
    return p
}

// WithContext converts the pool to a ResultContextPool for tasks that should
// run under the same context, such that they each respect shared cancellation.
// For example, WithCancelOnError can be configured on the returned pool to
// signal that all goroutines should be cancelled upon the first error.
func (p *ResultErrorPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
    p.panicIfInitialized()
    return &ResultContextPool[T]{
        contextPool: *p.errorPool.WithContext(ctx),
    }
}

// WithFirstError configures the pool to only return the first error
// returned by a task. By default, Wait() will return a combined error.
func (p *ResultErrorPool[T]) WithFirstError() *ResultErrorPool[T] {
    p.panicIfInitialized()
    p.errorPool.WithFirstError()
    return p
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ResultErrorPool[T]) WithMaxGoroutines(n int) *ResultErrorPool[T] {
    p.panicIfInitialized()
    p.errorPool.WithMaxGoroutines(n)
    return p
}

func (p *ResultErrorPool[T]) panicIfInitialized() {
    p.errorPool.panicIfInitialized()
}
142
vendor/github.com/sourcegraph/conc/pool/result_pool.go
generated
vendored
Normal file
@@ -0,0 +1,142 @@
package pool

import (
    "context"
    "sort"
    "sync"
)

// NewWithResults creates a new ResultPool for tasks with a result of type T.
//
// The configuration methods (With*) will panic if they are used after calling
// Go() for the first time.
func NewWithResults[T any]() *ResultPool[T] {
    return &ResultPool[T]{
        pool: *New(),
    }
}

// ResultPool is a pool that executes tasks that return a generic result type.
// Tasks are executed in the pool with Go(), then the results of the tasks are
// returned by Wait().
//
// The order of the results is guaranteed to be the same as the order the
// tasks were submitted.
type ResultPool[T any] struct {
    pool Pool
    agg  resultAggregator[T]
}

// Go submits a task to the pool. If all goroutines in the pool
// are busy, a call to Go() will block until the task can be started.
func (p *ResultPool[T]) Go(f func() T) {
    idx := p.agg.nextIndex()
    p.pool.Go(func() {
        p.agg.save(idx, f(), false)
    })
}

// Wait cleans up all spawned goroutines, propagating any panics, and returning
// a slice of results from tasks that did not panic.
func (p *ResultPool[T]) Wait() []T {
    p.pool.Wait()
    results := p.agg.collect(true)
    p.agg = resultAggregator[T]{} // reset for reuse
    return results
}

// MaxGoroutines returns the maximum size of the pool.
func (p *ResultPool[T]) MaxGoroutines() int {
    return p.pool.MaxGoroutines()
}

// WithErrors converts the pool to a ResultErrorPool so the submitted tasks
// can return errors.
func (p *ResultPool[T]) WithErrors() *ResultErrorPool[T] {
    p.panicIfInitialized()
    return &ResultErrorPool[T]{
        errorPool: *p.pool.WithErrors(),
    }
}

// WithContext converts the pool to a ResultContextPool for tasks that should
// run under the same context, such that they each respect shared cancellation.
// For example, WithCancelOnError can be configured on the returned pool to
// signal that all goroutines should be cancelled upon the first error.
func (p *ResultPool[T]) WithContext(ctx context.Context) *ResultContextPool[T] {
    p.panicIfInitialized()
    return &ResultContextPool[T]{
        contextPool: *p.pool.WithContext(ctx),
    }
}

// WithMaxGoroutines limits the number of goroutines in a pool.
// Defaults to unlimited. Panics if n < 1.
func (p *ResultPool[T]) WithMaxGoroutines(n int) *ResultPool[T] {
    p.panicIfInitialized()
    p.pool.WithMaxGoroutines(n)
    return p
}

func (p *ResultPool[T]) panicIfInitialized() {
    p.pool.panicIfInitialized()
}

// resultAggregator is a utility type that lets us safely append from multiple
// goroutines. The zero value is valid and ready to use.
type resultAggregator[T any] struct {
    mu      sync.Mutex
    len     int
    results []T
    errored []int
}

// nextIndex reserves a slot for a result. The returned value should be passed
// to save() when adding a result to the aggregator.
func (r *resultAggregator[T]) nextIndex() int {
    r.mu.Lock()
    defer r.mu.Unlock()

    nextIdx := r.len
    r.len += 1
    return nextIdx
}

func (r *resultAggregator[T]) save(i int, res T, errored bool) {
    r.mu.Lock()
    defer r.mu.Unlock()

    if i >= len(r.results) {
        old := r.results
        r.results = make([]T, r.len)
        copy(r.results, old)
    }

    r.results[i] = res

    if errored {
        r.errored = append(r.errored, i)
    }
}

// collect returns the set of aggregated results.
func (r *resultAggregator[T]) collect(collectErrored bool) []T {
    if !r.mu.TryLock() {
        panic("collect should not be called until all goroutines have exited")
    }

    if collectErrored || len(r.errored) == 0 {
        return r.results
    }

    filtered := r.results[:0]
    sort.Ints(r.errored)
    for i, e := range r.errored {
        if i == 0 {
            filtered = append(filtered, r.results[:e]...)
        } else {
            filtered = append(filtered, r.results[r.errored[i-1]+1:e]...)
        }
    }
    return filtered
}
52
vendor/github.com/sourcegraph/conc/waitgroup.go
generated
vendored
Normal file
@@ -0,0 +1,52 @@
package conc

import (
    "sync"

    "github.com/sourcegraph/conc/panics"
)

// NewWaitGroup creates a new WaitGroup.
func NewWaitGroup() *WaitGroup {
    return &WaitGroup{}
}

// WaitGroup is the primary building block for scoped concurrency.
// Goroutines can be spawned in the WaitGroup with the Go method,
// and calling Wait() will ensure that each of those goroutines exits
// before continuing. Any panics in a child goroutine will be caught
// and propagated to the caller of Wait().
//
// The zero value of WaitGroup is usable, just like sync.WaitGroup.
// Also like sync.WaitGroup, it must not be copied after first use.
type WaitGroup struct {
    wg sync.WaitGroup
    pc panics.Catcher
}

// Go spawns a new goroutine in the WaitGroup.
func (h *WaitGroup) Go(f func()) {
    h.wg.Add(1)
    go func() {
        defer h.wg.Done()
        h.pc.Try(f)
    }()
}

// Wait will block until all goroutines spawned with Go exit and will
// propagate any panics spawned in a child goroutine.
func (h *WaitGroup) Wait() {
    h.wg.Wait()

    // Propagate a panic if we caught one from a child goroutine.
    h.pc.Repanic()
}

// WaitAndRecover will block until all goroutines spawned with Go exit and
// will return a *panics.Recovered if one of the child goroutines panics.
func (h *WaitGroup) WaitAndRecover() *panics.Recovered {
    h.wg.Wait()

    // Return a recovered panic if we caught one from a child goroutine.
    return h.pc.Recovered()
}