Use mcdsl/terminal for all password prompts

Replace direct golang.org/x/term calls with mcdsl/terminal.ReadPassword
across mciasctl (6 sites), mciasgrpcctl (1 site), and mciasdb (1 site).
Aligns with the new CLI security standard in engineering-standards.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-28 11:40:11 -07:00
parent e4220b840e
commit 5b5e1a7ed6
142 changed files with 10241 additions and 7788 deletions

View File

@@ -67,6 +67,10 @@ type Balancer struct {
// balancerCurrent before the UpdateSubConnState is called on the
// balancerCurrent.
currentMu sync.Mutex
// activeGoroutines tracks all the goroutines that this balancer has started
// and that should be waited on when the balancer closes.
activeGoroutines sync.WaitGroup
}
// swap swaps out the current lb with the pending lb and updates the ClientConn.
@@ -76,7 +80,9 @@ func (gsb *Balancer) swap() {
cur := gsb.balancerCurrent
gsb.balancerCurrent = gsb.balancerPending
gsb.balancerPending = nil
gsb.activeGoroutines.Add(1)
go func() {
defer gsb.activeGoroutines.Done()
gsb.currentMu.Lock()
defer gsb.currentMu.Unlock()
cur.Close()
@@ -274,6 +280,7 @@ func (gsb *Balancer) Close() {
currentBalancerToClose.Close()
pendingBalancerToClose.Close()
gsb.activeGoroutines.Wait()
}
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
@@ -324,7 +331,12 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
defer bw.gsb.mu.Unlock()
bw.lastState = state
// If Close() acquires the mutex before UpdateState(), the balancer
// will already have been removed from the current or pending state when
// reaching this point.
if !bw.gsb.balancerCurrentOrPending(bw) {
// Returning here ensures that (*Balancer).swap() is not invoked after
// (*Balancer).Close() and therefore prevents "use after close".
return
}

View File

@@ -0,0 +1,66 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package weight contains utilities to manage endpoint weights. Weights are
// used by LB policies such as ringhash to distribute load across multiple
// endpoints.
package weight
import (
"fmt"
"google.golang.org/grpc/resolver"
)
// attributeKey is the type used as the key to store EndpointInfo in the
// Attributes field of resolver.Endpoint.
type attributeKey struct{}
// EndpointInfo will be stored in the Attributes field of Endpoints in order to
// use the ringhash balancer.
type EndpointInfo struct {
Weight uint32
}
// Equal allows the values to be compared by Attributes.Equal.
func (a EndpointInfo) Equal(o any) bool {
oa, ok := o.(EndpointInfo)
return ok && oa.Weight == a.Weight
}
// Set returns a copy of endpoint in which the Attributes field is updated with
// EndpointInfo.
func Set(endpoint resolver.Endpoint, epInfo EndpointInfo) resolver.Endpoint {
endpoint.Attributes = endpoint.Attributes.WithValue(attributeKey{}, epInfo)
return endpoint
}
// String returns a human-readable representation of EndpointInfo.
// This method is intended for logging, testing, and debugging purposes only.
// Do not rely on the output format, as it is not guaranteed to remain stable.
func (a EndpointInfo) String() string {
return fmt.Sprintf("Weight: %d", a.Weight)
}
// FromEndpoint returns the EndpointInfo stored in the Attributes field of an
// endpoint. It returns an empty EndpointInfo if attribute is not found.
func FromEndpoint(endpoint resolver.Endpoint) EndpointInfo {
v := endpoint.Attributes.Value(attributeKey{})
ei, _ := v.(EndpointInfo)
return ei
}

View File

@@ -83,6 +83,7 @@ func (b *Unbounded) Load() {
default:
}
} else if b.closing && !b.closed {
b.closed = true
close(b.c)
}
}

View File

@@ -194,7 +194,7 @@ func (r RefChannelType) String() string {
// If channelz is not turned ON, this will simply log the event descriptions.
func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) {
// Log only the trace description associated with the bottom most entity.
d := fmt.Sprintf("[%s]%s", e, desc.Desc)
d := fmt.Sprintf("[%s] %s", e, desc.Desc)
switch desc.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, d)

View File

@@ -26,31 +26,31 @@ import (
)
var (
// TXTErrIgnore is set if TXT errors should be ignored ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
// EnableTXTServiceConfig is set if the DNS resolver should perform TXT
// lookups for service config ("GRPC_ENABLE_TXT_SERVICE_CONFIG" is not
// "false").
EnableTXTServiceConfig = boolFromEnv("GRPC_ENABLE_TXT_SERVICE_CONFIG", true)
// TXTErrIgnore is set if TXT errors should be ignored
// ("GRPC_GO_IGNORE_TXT_ERRORS" is not "false").
TXTErrIgnore = boolFromEnv("GRPC_GO_IGNORE_TXT_ERRORS", true)
// RingHashCap indicates the maximum ring size which defaults to 4096
// entries but may be overridden by setting the environment variable
// "GRPC_RING_HASH_CAP". This does not override the default bounds
// checking which NACKs configs specifying ring sizes > 8*1024*1024 (~8M).
RingHashCap = uint64FromEnv("GRPC_RING_HASH_CAP", 4096, 1, 8*1024*1024)
// ALTSMaxConcurrentHandshakes is the maximum number of concurrent ALTS
// handshakes that can be performed.
ALTSMaxConcurrentHandshakes = uint64FromEnv("GRPC_ALTS_MAX_CONCURRENT_HANDSHAKES", 100, 1, 100)
// EnforceALPNEnabled is set if TLS connections to servers with ALPN disabled
// should be rejected. The HTTP/2 protocol requires ALPN to be enabled, this
// option is present for backward compatibility. This option may be overridden
// by setting the environment variable "GRPC_ENFORCE_ALPN_ENABLED" to "true"
// or "false".
EnforceALPNEnabled = boolFromEnv("GRPC_ENFORCE_ALPN_ENABLED", true)
// XDSFallbackSupport is the env variable that controls whether support for
// xDS fallback is turned on. If this is unset or is false, only the first
// xDS server in the list of server configs will be used.
XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", true)
// NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used
// instead of the exiting pickfirst implementation. This can be disabled by
// setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST"
// to "false".
NewPickFirstEnabled = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", true)
// XDSEndpointHashKeyBackwardCompat controls the parsing of the endpoint hash
// key from EDS LbEndpoint metadata. Endpoint hash keys can be disabled by
@@ -69,6 +69,41 @@ var (
// ALTSHandshakerKeepaliveParams is set if we should add the
// KeepaliveParams when dial the ALTS handshaker service.
ALTSHandshakerKeepaliveParams = boolFromEnv("GRPC_EXPERIMENTAL_ALTS_HANDSHAKER_KEEPALIVE_PARAMS", false)
// EnableDefaultPortForProxyTarget controls whether the resolver adds a default port 443
// to a target address that lacks one. This flag only has an effect when all of
// the following conditions are met:
// - A connect proxy is being used.
// - Target resolution is disabled.
// - The DNS resolver is being used.
EnableDefaultPortForProxyTarget = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_DEFAULT_PORT_FOR_PROXY_TARGET", true)
// XDSAuthorityRewrite indicates whether xDS authority rewriting is enabled.
// This feature is defined in gRFC A81 and is enabled by setting the
// environment variable GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE to "true".
XDSAuthorityRewrite = boolFromEnv("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)
// PickFirstWeightedShuffling indicates whether weighted endpoint shuffling
// is enabled in the pick_first LB policy, as defined in gRFC A113. This
// feature can be disabled by setting the environment variable
// GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING to "false".
PickFirstWeightedShuffling = boolFromEnv("GRPC_EXPERIMENTAL_PF_WEIGHTED_SHUFFLING", true)
// DisableStrictPathChecking indicates whether strict path checking is
// disabled. This feature can be disabled by setting the environment
// variable GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING to "true".
//
// When strict path checking is enabled, gRPC will reject requests with
// paths that do not conform to the gRPC over HTTP/2 specification found at
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md.
//
// When disabled, gRPC will allow paths that do not contain a leading slash.
// Enabling strict path checking is recommended for security reasons, as it
// prevents potential path traversal vulnerabilities.
//
// A future release will remove this environment variable, enabling strict
// path checking behavior unconditionally.
DisableStrictPathChecking = boolFromEnv("GRPC_GO_EXPERIMENTAL_DISABLE_STRICT_PATH_CHECKING", false)
)
func boolFromEnv(envVar string, def bool) bool {

View File

@@ -68,4 +68,15 @@ var (
// trust. For more details, see:
// https://github.com/grpc/proposal/blob/master/A87-mtls-spiffe-support.md
XDSSPIFFEEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_MTLS_SPIFFE", false)
// XDSHTTPConnectEnabled is true if gRPC should parse custom Metadata
// configuring use of an HTTP CONNECT proxy via xDS from cluster resources.
// For more details, see:
// https://github.com/grpc/proposal/blob/master/A86-xds-http-connect.md
XDSHTTPConnectEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_HTTP_CONNECT", false)
// XDSBootstrapCallCredsEnabled controls if call credentials can be used in
// xDS bootstrap configuration via the `call_creds` field. For more details,
// see: https://github.com/grpc/proposal/blob/master/A97-xds-jwt-call-creds.md
XDSBootstrapCallCredsEnabled = boolFromEnv("GRPC_EXPERIMENTAL_XDS_BOOTSTRAP_CALL_CREDS", false)
)

View File

@@ -25,4 +25,11 @@ var (
// BufferPool is implemented by the grpc package and returns a server
// option to configure a shared buffer pool for a grpc.Server.
BufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption
// SetDefaultBufferPool updates the default buffer pool.
SetDefaultBufferPool any // func(mem.BufferPool)
// AcceptCompressors is implemented by the grpc package and returns
// a call option that restricts the grpc-accept-encoding header for a call.
AcceptCompressors any // func(...string) grpc.CallOption
)

View File

@@ -80,25 +80,11 @@ func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure
func (cs *CallbackSerializer) run(ctx context.Context) {
defer close(cs.done)
// TODO: when Go 1.21 is the oldest supported version, this loop and Close
// can be replaced with:
//
// context.AfterFunc(ctx, cs.callbacks.Close)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case cb := <-cs.callbacks.Get():
cs.callbacks.Load()
cb.(func(context.Context))(ctx)
}
}
// Close the buffer when the context is canceled
// to prevent new callbacks from being added.
context.AfterFunc(ctx, cs.callbacks.Close)
// Close the buffer to prevent new callbacks from being added.
cs.callbacks.Close()
// Run all pending callbacks.
// Run all callbacks.
for cb := range cs.callbacks.Get() {
cs.callbacks.Load()
cb.(func(context.Context))(ctx)

View File

@@ -21,7 +21,6 @@
package idle
import (
"fmt"
"math"
"sync"
"sync/atomic"
@@ -33,15 +32,15 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}
// Enforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type Enforcer interface {
ExitIdleMode() error
// ClientConn is the functionality provided by grpc.ClientConn to enter and exit
// from idle mode.
type ClientConn interface {
ExitIdleMode()
EnterIdleMode()
}
// Manager implements idleness detection and calls the configured Enforcer to
// enter/exit idle mode when appropriate. Must be created by NewManager.
// Manager implements idleness detection and calls the ClientConn to enter/exit
// idle mode when appropriate. Must be created by NewManager.
type Manager struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
@@ -51,8 +50,8 @@ type Manager struct {
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer Enforcer // Functionality provided by grpc.ClientConn.
timeout time.Duration
cc ClientConn // Functionality provided by grpc.ClientConn.
timeout time.Duration
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
@@ -72,9 +71,9 @@ type Manager struct {
// NewManager creates a new idleness manager implementation for the
// given idle timeout. It begins in idle mode.
func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
func NewManager(cc ClientConn, timeout time.Duration) *Manager {
return &Manager{
enforcer: enforcer,
cc: cc,
timeout: timeout,
actuallyIdle: true,
activeCallsCount: -math.MaxInt32,
@@ -127,7 +126,7 @@ func (m *Manager) handleIdleTimeout() {
// Now that we've checked that there has been no activity, attempt to enter
// idle mode, which is very likely to succeed.
if m.tryEnterIdleMode() {
if m.tryEnterIdleMode(true) {
// Successfully entered idle mode. No timer needed until we exit idle.
return
}
@@ -142,10 +141,13 @@ func (m *Manager) handleIdleTimeout() {
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
// checkActivity controls if a check for RPC activity, since the last time the
// idle_timeout fired, is made.
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
func (m *Manager) tryEnterIdleMode() bool {
func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
// that the channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
@@ -166,7 +168,7 @@ func (m *Manager) tryEnterIdleMode() bool {
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
return false
}
if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
// A very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
@@ -177,44 +179,37 @@ func (m *Manager) tryEnterIdleMode() bool {
// No new RPCs have come in since we set the active calls count value to
// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
// unconditionally now.
m.enforcer.EnterIdleMode()
m.cc.EnterIdleMode()
m.actuallyIdle = true
return true
}
// EnterIdleModeForTesting instructs the channel to enter idle mode.
func (m *Manager) EnterIdleModeForTesting() {
m.tryEnterIdleMode()
m.tryEnterIdleMode(false)
}
// OnCallBegin is invoked at the start of every RPC.
func (m *Manager) OnCallBegin() error {
func (m *Manager) OnCallBegin() {
if m.isClosed() {
return nil
return
}
if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
// Channel is not idle now. Set the activity bit and allow the call.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
return
}
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
if err := m.ExitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&m.activeCallsCount, -1)
return err
}
m.ExitIdleMode()
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
}
// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
// ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
// internal state.
func (m *Manager) ExitIdleMode() error {
func (m *Manager) ExitIdleMode() {
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
m.idleMu.Lock()
defer m.idleMu.Unlock()
@@ -231,12 +226,10 @@ func (m *Manager) ExitIdleMode() error {
// m.ExitIdleMode.
//
// In any case, there is nothing to do here.
return nil
return
}
if err := m.enforcer.ExitIdleMode(); err != nil {
return fmt.Errorf("failed to exit idle mode: %w", err)
}
m.cc.ExitIdleMode()
// Undo the idle entry process. This also respects any new RPC attempts.
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
@@ -244,7 +237,23 @@ func (m *Manager) ExitIdleMode() error {
// Start a new timer to fire after the configured idle timeout.
m.resetIdleTimerLocked(m.timeout)
return nil
}
// UnsafeSetNotIdle instructs the Manager to update its internal state to
// reflect the reality that the channel is no longer in IDLE mode.
//
// N.B. This method is intended only for internal use by the gRPC client
// when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
// - The channel was **actually in IDLE mode** immediately prior to the call.
// - There is **no concurrent activity** that could cause the channel to exit
// IDLE mode *naturally* at the same time.
func (m *Manager) UnsafeSetNotIdle() {
m.idleMu.Lock()
defer m.idleMu.Unlock()
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
m.actuallyIdle = false
m.resetIdleTimerLocked(m.timeout)
}
// OnCallEnd is invoked at the end of every RPC.

View File

@@ -182,35 +182,6 @@ var (
// other features, including the CSDS service.
NewXDSResolverWithClientForTesting any // func(xdsclient.XDSClient) (resolver.Builder, error)
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
// variable.
//
// TODO: Remove this function once the RLS env var is removed.
RegisterRLSClusterSpecifierPluginForTesting func()
// UnregisterRLSClusterSpecifierPluginForTesting unregisters the RLS Cluster
// Specifier Plugin for testing purposes. This is needed because there is no way
// to unregister the RLS Cluster Specifier Plugin after registering it solely
// for testing purposes using RegisterRLSClusterSpecifierPluginForTesting().
//
// TODO: Remove this function once the RLS env var is removed.
UnregisterRLSClusterSpecifierPluginForTesting func()
// RegisterRBACHTTPFilterForTesting registers the RBAC HTTP Filter for testing
// purposes, regardless of the RBAC environment variable.
//
// TODO: Remove this function once the RBAC env var is removed.
RegisterRBACHTTPFilterForTesting func()
// UnregisterRBACHTTPFilterForTesting unregisters the RBAC HTTP Filter for
// testing purposes. This is needed because there is no way to unregister the
// HTTP Filter after registering it solely for testing purposes using
// RegisterRBACHTTPFilterForTesting().
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()
// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval any // func(so *orca.ServiceOptions)
@@ -240,22 +211,11 @@ var (
// default resolver scheme.
UserSetDefaultScheme = false
// ConnectedAddress returns the connected address for a SubConnState. The
// address is only valid if the state is READY.
ConnectedAddress any // func (scs SubConnState) resolver.Address
// SetConnectedAddress sets the connected address for a SubConnState.
SetConnectedAddress any // func(scs *SubConnState, addr resolver.Address)
// SnapshotMetricRegistryForTesting snapshots the global data of the metric
// registry. Returns a cleanup function that sets the metric registry to its
// original state. Only called in testing functions.
SnapshotMetricRegistryForTesting func() func()
// SetDefaultBufferPoolForTesting updates the default buffer pool, for
// testing purposes.
SetDefaultBufferPoolForTesting any // func(mem.BufferPool)
// SetBufferPoolingThresholdForTesting updates the buffer pooling threshold, for
// testing purposes.
SetBufferPoolingThresholdForTesting any // func(int)
@@ -273,6 +233,18 @@ var (
// When set, the function will be called before the stream enters
// the blocking state.
NewStreamWaitingForResolver = func() {}
// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
// labels from a resolver.Address. Callers must assert its type before calling.
AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string
// AsyncReporterCleanupDelegate is initialized to a pass-through function by
// default (production behavior), allowing tests to swap it with an
// implementation which tracks registration of async reporter and its
// corresponding cleanup.
AsyncReporterCleanupDelegate = func(cleanup func()) func() {
return cleanup
}
)
// HealthChecker defines the signature of the client-side LB channel health
@@ -320,3 +292,9 @@ type EnforceClientConnEmbedding interface {
type Timer interface {
Stop() bool
}
// EnforceMetricsRecorderEmbedding is used to enforce proper MetricsRecorder
// implementation embedding.
type EnforceMetricsRecorderEmbedding interface {
enforceMetricsRecorderEmbedding()
}

View File

@@ -22,11 +22,13 @@ package delegatingresolver
import (
"fmt"
"net"
"net/http"
"net/url"
"sync"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/proxyattributes"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/internal/transport/networktype"
@@ -40,6 +42,8 @@ var (
HTTPSProxyFromEnvironment = http.ProxyFromEnvironment
)
const defaultPort = "443"
// delegatingResolver manages both target URI and proxy address resolution by
// delegating these tasks to separate child resolvers. Essentially, it acts as
// an intermediary between the gRPC ClientConn and the child resolvers.
@@ -107,10 +111,18 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti
targetResolver: nopResolver{},
}
addr := target.Endpoint()
var err error
r.proxyURL, err = proxyURLForTarget(target.Endpoint())
if target.URL.Scheme == "dns" && !targetResolutionEnabled && envconfig.EnableDefaultPortForProxyTarget {
addr, err = parseTarget(addr)
if err != nil {
return nil, fmt.Errorf("delegating_resolver: invalid target address %q: %v", target.Endpoint(), err)
}
}
r.proxyURL, err = proxyURLForTarget(addr)
if err != nil {
return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %s: %v", target, err)
return nil, fmt.Errorf("delegating_resolver: failed to determine proxy URL for target %q: %v", target, err)
}
// proxy is not configured or proxy address excluded using `NO_PROXY` env
@@ -132,8 +144,8 @@ func New(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOpti
// bypass the target resolver and store the unresolved target address.
if target.URL.Scheme == "dns" && !targetResolutionEnabled {
r.targetResolverState = &resolver.State{
Addresses: []resolver.Address{{Addr: target.Endpoint()}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: target.Endpoint()}}}},
Addresses: []resolver.Address{{Addr: addr}},
Endpoints: []resolver.Endpoint{{Addresses: []resolver.Address{{Addr: addr}}}},
}
r.updateTargetResolverState(*r.targetResolverState)
return r, nil
@@ -202,6 +214,44 @@ func needsProxyResolver(state *resolver.State) bool {
return false
}
// parseTarget takes a target string and ensures it is a valid "host:port" target.
//
// It does the following:
// 1. If the target already has a port (e.g., "host:port", "[ipv6]:port"),
// it is returned as is.
// 2. If the host part is empty (e.g., ":80"), it defaults to "localhost",
// returning "localhost:80".
// 3. If the target is missing a port (e.g., "host", "ipv6"), the defaultPort
// is added.
//
// An error is returned for empty targets or targets with a trailing colon
// but no port (e.g., "host:").
func parseTarget(target string) (string, error) {
if target == "" {
return "", fmt.Errorf("missing address")
}
host, port, err := net.SplitHostPort(target)
if err != nil {
// If SplitHostPort fails, it's likely because the port is missing.
// We append the default port and return the result.
return net.JoinHostPort(target, defaultPort), nil
}
// If SplitHostPort succeeds, we check for edge cases.
if port == "" {
// A success with an empty port means the target had a trailing colon,
// e.g., "host:", which is an error.
return "", fmt.Errorf("missing port after port-separator colon")
}
if host == "" {
// A success with an empty host means the target was like ":80".
// We default the host to "localhost".
host = "localhost"
}
return net.JoinHostPort(host, port), nil
}
func skipProxy(address resolver.Address) bool {
// Avoid proxy when network is not tcp.
networkType, ok := networktype.Get(address)

View File

@@ -125,20 +125,23 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
// IP address.
if ipAddr, err := formatIP(host); err == nil {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
cc.UpdateState(resolver.State{
Addresses: addr,
Endpoints: []resolver.Endpoint{{Addresses: addr}},
})
return deadResolver{}, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
d := &dnsResolver{
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
rn: make(chan struct{}, 1),
enableServiceConfig: envconfig.EnableTXTServiceConfig && !opts.DisableServiceConfig,
}
d.resolver, err = internal.NewNetResolver(target.URL.Host)
@@ -181,8 +184,8 @@ type dnsResolver struct {
// finishes, race detector sometimes will warn lookup (READ the lookup
// function pointers) inside watcher() goroutine has data race with
// replaceNetFunc (WRITE the lookup function pointers).
wg sync.WaitGroup
disableServiceConfig bool
wg sync.WaitGroup
enableServiceConfig bool
}
// ResolveNow invoke an immediate resolution of the target that this
@@ -342,11 +345,19 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
return nil, hostErr
}
state := resolver.State{Addresses: addrs}
eps := make([]resolver.Endpoint, 0, len(addrs))
for _, addr := range addrs {
eps = append(eps, resolver.Endpoint{Addresses: []resolver.Address{addr}})
}
state := resolver.State{
Addresses: addrs,
Endpoints: eps,
}
if len(srv) > 0 {
state = grpclbstate.Set(state, &grpclbstate.State{BalancerAddresses: srv})
}
if !d.disableServiceConfig {
if d.enableServiceConfig {
state.ServiceConfig = d.lookupTXT(ctx)
}
return &state, nil

View File

@@ -20,6 +20,7 @@ import (
"fmt"
estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
)
@@ -28,6 +29,7 @@ import (
// It eats any record calls where the label values provided do not match the
// number of label keys.
type MetricsRecorderList struct {
internal.EnforceMetricsRecorderEmbedding
// metricsRecorders are the metrics recorders this list will forward to.
metricsRecorders []estats.MetricsRecorder
}
@@ -64,6 +66,16 @@ func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle,
}
}
// RecordInt64UpDownCount records the measurement alongside labels on the int
// count associated with the provided handle.
func (l *MetricsRecorderList) RecordInt64UpDownCount(handle *estats.Int64UpDownCountHandle, incr int64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
for _, metricRecorder := range l.metricsRecorders {
metricRecorder.RecordInt64UpDownCount(handle, incr, labels...)
}
}
// RecordFloat64Count records the measurement alongside labels on the float
// count associated with the provided handle.
func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
@@ -103,3 +115,61 @@ func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
metricRecorder.RecordInt64Gauge(handle, incr, labels...)
}
}
// RegisterAsyncReporter forwards the registration to all underlying metrics
// recorders.
//
// It returns a cleanup function that, when called, invokes the cleanup function
// returned by each underlying recorder, ensuring the reporter is unregistered
// from all of them.
func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricReporter, metrics ...estats.AsyncMetric) func() {
descriptorsMap := make(map[*estats.MetricDescriptor]bool, len(metrics))
for _, m := range metrics {
descriptorsMap[m.Descriptor()] = true
}
unregisterFns := make([]func(), 0, len(l.metricsRecorders))
for _, mr := range l.metricsRecorders {
// Wrap the AsyncMetricsRecorder to intercept calls to RecordInt64Gauge
// and validate the labels.
wrappedCallback := func(recorder estats.AsyncMetricsRecorder) error {
wrappedRecorder := &asyncRecorderWrapper{
delegate: recorder,
descriptors: descriptorsMap,
}
return reporter.Report(wrappedRecorder)
}
unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...))
}
// Wrap the cleanup function using the internal delegate.
// In production, this returns realCleanup as-is.
// In tests, the leak checker can swap this to track the registration lifetime.
return internal.AsyncReporterCleanupDelegate(defaultCleanUp(unregisterFns))
}
func defaultCleanUp(unregisterFns []func()) func() {
return func() {
for _, unregister := range unregisterFns {
unregister()
}
}
}
type asyncRecorderWrapper struct {
delegate estats.AsyncMetricsRecorder
descriptors map[*estats.MetricDescriptor]bool
}
// RecordIntAsync64Gauge records the measurement alongside labels on the int
// gauge associated with the provided handle.
func (w *asyncRecorderWrapper) RecordInt64AsyncGauge(handle *estats.Int64AsyncGaugeHandle, value int64, labels ...string) {
// Ensure only metrics for descriptors passed during callback registration
// are emitted.
d := handle.Descriptor()
if _, ok := w.descriptors[d]; !ok {
return
}
// Validate labels and delegate.
verifyLabels(d, labels...)
w.delegate.RecordInt64AsyncGauge(handle, value, labels...)
}

70
vendor/google.golang.org/grpc/internal/stats/stats.go generated vendored Normal file
View File

@@ -0,0 +1,70 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package stats
import (
"context"
"google.golang.org/grpc/stats"
)
type combinedHandler struct {
handlers []stats.Handler
}
// NewCombinedHandler combines multiple stats.Handlers into a single handler.
//
// It returns nil if no handlers are provided. If only one handler is
// provided, it is returned directly without wrapping.
func NewCombinedHandler(handlers ...stats.Handler) stats.Handler {
switch len(handlers) {
case 0:
return nil
case 1:
return handlers[0]
default:
return &combinedHandler{handlers: handlers}
}
}
func (ch *combinedHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
for _, h := range ch.handlers {
ctx = h.TagRPC(ctx, info)
}
return ctx
}
func (ch *combinedHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
for _, h := range ch.handlers {
h.HandleRPC(ctx, stats)
}
}
func (ch *combinedHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
for _, h := range ch.handlers {
ctx = h.TagConn(ctx, info)
}
return ctx
}
func (ch *combinedHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
for _, h := range ch.handlers {
h.HandleConn(ctx, stats)
}
}

View File

@@ -24,30 +24,34 @@ import (
"golang.org/x/net/http2"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
// ClientStream implements streaming functionality for a gRPC client.
type ClientStream struct {
*Stream // Embed for common stream functionality.
Stream // Embed for common stream functionality.
ct *http2Client
done chan struct{} // closed at the end of stream to unblock writers.
doneFunc func() // invoked at the end of stream.
headerChan chan struct{} // closed to indicate the end of header metadata.
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
headerChan chan struct{} // closed to indicate the end of header metadata.
header metadata.MD // the received header metadata
status *status.Status // the status error received from the server
// Non-pointer fields are at the end to optimize GC allocations.
// headerValid indicates whether a valid header was received. Only
// meaningful after headerChan is closed (always call waitOnHeader() before
// reading its value).
headerValid bool
header metadata.MD // the received header metadata
noHeaders bool // set if the client never received headers (set only after the stream is done).
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
status *status.Status // the status error received from the server
headerValid bool
noHeaders bool // set if the client never received headers (set only after the stream is done).
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
statsHandler stats.Handler // nil for internal streams (e.g., health check, ORCA) where telemetry is not supported.
}
// Read reads an n byte message from the input stream.
@@ -142,3 +146,11 @@ func (s *ClientStream) TrailersOnly() bool {
func (s *ClientStream) Status() *status.Status {
return s.status
}
func (s *ClientStream) requestRead(n int) {
s.ct.adjustWindow(s, uint32(n))
}
func (s *ClientStream) updateWindow(n int) {
s.ct.updateWindow(s, uint32(n))
}

View File

@@ -24,16 +24,13 @@ import (
"fmt"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/status"
)
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
@@ -147,11 +144,9 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type earlyAbortStream struct {
httpStatus uint32
streamID uint32
contentSubtype string
status *status.Status
rst bool
streamID uint32
rst bool
hf []hpack.HeaderField // Pre-built header fields
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
@@ -496,6 +491,16 @@ const (
serverSide
)
// maxWriteBufSize is the maximum length (number of elements) the cached
// writeBuf can grow to. The length depends on the number of buffers
// contained within the BufferSlice produced by the codec, which is
// generally small.
//
// If a writeBuf larger than this limit is required, it will be allocated
// and freed after use, rather than being cached. This avoids holding
// on to large amounts of memory.
const maxWriteBufSize = 64
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
@@ -530,6 +535,8 @@ type loopyWriter struct {
// Side-specific handlers
ssGoAwayHandler func(*goAway) (bool, error)
writeBuf [][]byte // cached slice to avoid heap allocations for calls to mem.Reader.Peek.
}
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator, conn net.Conn, logger *grpclog.PrefixLogger, goAwayHandler func(*goAway) (bool, error), bufferPool mem.BufferPool) *loopyWriter {
@@ -665,11 +672,10 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
l.estdStreams[h.streamID] = str
}
@@ -701,11 +707,10 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
}
// Case 2: Client wants to originate stream.
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
}
return l.originateStream(str, h)
}
@@ -833,18 +838,7 @@ func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if l.side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
// In case the caller forgets to set the http status, default to 200.
if eas.httpStatus == 0 {
eas.httpStatus = 200
}
headerFields := []hpack.HeaderField{
{Name: ":status", Value: strconv.Itoa(int(eas.httpStatus))},
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
}
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
if err := l.writeHeader(eas.streamID, true, eas.hf, nil); err != nil {
return err
}
if eas.rst {
@@ -948,11 +942,11 @@ func (l *loopyWriter) processData() (bool, error) {
if str == nil {
return true, nil
}
reader := str.reader
reader := &str.reader
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
if !dataItem.processing {
dataItem.processing = true
str.reader.Reset(dataItem.data)
reader.Reset(dataItem.data)
dataItem.data.Free()
}
// A data item is represented by a dataFrame, since it later translates into
@@ -964,11 +958,11 @@ func (l *loopyWriter) processData() (bool, error) {
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
if err := l.framer.writeData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
_ = reader.Close()
reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -1001,25 +995,20 @@ func (l *loopyWriter) processData() (bool, error) {
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
size := hSize + dSize
var buf *[]byte
if hSize != 0 && dSize == 0 {
buf = &dataItem.h
} else {
// Note: this is only necessary because the http2.Framer does not support
// partially writing a frame, so the sequence must be materialized into a buffer.
// TODO: Revisit once https://github.com/golang/go/issues/66655 is addressed.
pool := l.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
l.writeBuf = l.writeBuf[:0]
if hSize > 0 {
l.writeBuf = append(l.writeBuf, dataItem.h[:hSize])
}
if dSize > 0 {
var err error
l.writeBuf, err = reader.Peek(dSize, l.writeBuf)
if err != nil {
// This must never happen since the reader must have at least dSize
// bytes.
// Log an error to fail tests.
l.logger.Errorf("unexpected error while reading Data frame payload: %v", err)
return false, err
}
buf = pool.Get(size)
defer pool.Put(buf)
copy((*buf)[:hSize], dataItem.h)
_, _ = reader.Read((*buf)[hSize:])
}
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1032,7 +1021,14 @@ func (l *loopyWriter) processData() (bool, error) {
if dataItem.onEachWrite != nil {
dataItem.onEachWrite()
}
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, (*buf)[:size]); err != nil {
err := l.framer.writeData(dataItem.streamID, endStream, l.writeBuf)
reader.Discard(dSize)
if cap(l.writeBuf) > maxWriteBufSize {
l.writeBuf = nil
} else {
clear(l.writeBuf)
}
if err != nil {
return false, err
}
str.bytesOutStanding += size
@@ -1040,7 +1036,7 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem.h = dataItem.h[hSize:]
if remainingBytes == 0 { // All the data from that message was written out.
_ = reader.Close()
reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {

View File

@@ -28,7 +28,7 @@ import (
// writeQuota is a soft limit on the amount of data a stream can
// schedule before some of it is written out.
type writeQuota struct {
quota int32
_ noCopy
// get waits on read from when quota goes less than or equal to zero.
// replenish writes on it when quota goes positive again.
ch chan struct{}
@@ -38,16 +38,17 @@ type writeQuota struct {
// It is implemented as a field so that it can be updated
// by tests.
replenish func(n int)
quota int32
}
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
w := &writeQuota{
quota: sz,
ch: make(chan struct{}, 1),
done: done,
}
// init allows a writeQuota to be initialized in-place, which is useful for
// resetting a buffer or for avoiding a heap allocation when the buffer is
// embedded in another struct.
func (w *writeQuota) init(sz int32, done <-chan struct{}) {
w.quota = sz
w.ch = make(chan struct{}, 1)
w.done = done
w.replenish = w.realReplenish
return w
}
func (w *writeQuota) get(sz int32) error {
@@ -67,9 +68,9 @@ func (w *writeQuota) get(sz int32) error {
func (w *writeQuota) realReplenish(n int) {
sz := int32(n)
a := atomic.AddInt32(&w.quota, sz)
b := a - sz
if b <= 0 && a > 0 {
newQuota := atomic.AddInt32(&w.quota, sz)
previousQuota := newQuota - sz
if previousQuota <= 0 && newQuota > 0 {
select {
case w.ch <- struct{}{}:
default:

View File

@@ -50,7 +50,7 @@ import (
// NewServerHandlerTransport returns a ServerTransport handling gRPC from
// inside an http.Handler, or writes an HTTP error to w and returns an error.
// It requires that the http Server supports HTTP/2.
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler, bufferPool mem.BufferPool) (ServerTransport, error) {
if r.Method != http.MethodPost {
w.Header().Set("Allow", http.MethodPost)
msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
@@ -170,7 +170,7 @@ type serverHandlerTransport struct {
// TODO make sure this is consistent across handler_server and http2_server
contentSubtype string
stats []stats.Handler
stats stats.Handler
logger *grpclog.PrefixLogger
bufferPool mem.BufferPool
@@ -274,14 +274,14 @@ func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status
}
})
if err == nil { // transport has not been closed
if err == nil && ht.stats != nil { // transport has not been closed
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
for _, sh := range ht.stats {
sh.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
s.hdrMu.Lock()
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
s.hdrMu.Unlock()
}
ht.Close(errors.New("finished writing status"))
return err
@@ -372,19 +372,23 @@ func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) e
ht.rw.(http.Flusher).Flush()
})
if err == nil {
for _, sh := range ht.stats {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
sh.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
}
if err == nil && ht.stats != nil {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
ht.stats.HandleRPC(s.Context(), &stats.OutHeader{
Header: md.Copy(),
Compression: s.sendCompress,
})
}
return err
}
func (ht *serverHandlerTransport) adjustWindow(*ServerStream, uint32) {
}
func (ht *serverHandlerTransport) updateWindow(*ServerStream, uint32) {
}
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
// With this transport type there will be exactly 1 stream: this HTTP request.
var cancel context.CancelFunc
@@ -409,11 +413,9 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
req := ht.req
s := &ServerStream{
Stream: &Stream{
Stream: Stream{
id: 0, // irrelevant
ctx: ctx,
requestRead: func(int) {},
buf: newRecvBuffer(),
method: req.URL.Path,
recvCompress: req.Header.Get("grpc-encoding"),
contentSubtype: ht.contentSubtype,
@@ -422,9 +424,11 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
st: ht,
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
}
s.trReader = &transportReader{
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
windowHandler: func(int) {},
s.Stream.buf.init()
s.readRequester = s
s.trReader = transportReader{
reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
windowHandler: s,
}
// readerDone is closed when the Body.Read-ing goroutine exits.

View File

@@ -44,6 +44,7 @@ import (
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/proxyattributes"
istats "google.golang.org/grpc/internal/stats"
istatus "google.golang.org/grpc/internal/status"
isyscall "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
@@ -105,7 +106,7 @@ type http2Client struct {
kp keepalive.ClientParameters
keepaliveEnabled bool
statsHandlers []stats.Handler
statsHandler stats.Handler
initialWindowSize int32
@@ -335,14 +336,14 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
keepaliveDone: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize, opts.BufferPool),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
activeStreams: make(map[uint32]*ClientStream),
isSecure: isSecure,
perRPCCreds: perRPCCreds,
kp: kp,
statsHandlers: opts.StatsHandlers,
statsHandler: istats.NewCombinedHandler(opts.StatsHandlers...),
initialWindowSize: initialWindowSize,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
@@ -369,7 +370,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
})
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.ctx = peer.NewContext(t.ctx, t.Peer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
@@ -386,15 +387,14 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
updateFlowControl: t.updateFlowControl,
}
}
for _, sh := range t.statsHandlers {
t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
if t.statsHandler != nil {
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
t.statsHandler.HandleConn(t.ctx, &stats.ConnBegin{
Client: true,
}
sh.HandleConn(t.ctx, connBegin)
})
}
if t.keepaliveEnabled {
t.kpDormancyCond = sync.NewCond(&t.mu)
@@ -478,45 +478,40 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
return t, nil
}
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) *ClientStream {
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &ClientStream{
Stream: &Stream{
Stream: Stream{
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
contentSubtype: callHdr.ContentSubtype,
},
ct: t,
done: make(chan struct{}),
headerChan: make(chan struct{}),
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
ct: t,
done: make(chan struct{}),
headerChan: make(chan struct{}),
doneFunc: callHdr.DoneFunc,
statsHandler: handler,
}
s.Stream.buf.init()
s.Stream.wq.init(defaultWriteQuota, s.done)
s.readRequester = s
// The client side stream context should have exactly the same life cycle with the user provided context.
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
// So we use the original context here instead of creating a copy.
s.ctx = ctx
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctx.Done(),
recv: s.buf,
closeStream: func(err error) {
s.Close(err)
},
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
s.trReader = transportReader{
reader: recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctx.Done(),
recv: &s.buf,
clientStream: s,
},
windowHandler: s,
}
return s
}
func (t *http2Client) getPeer() *peer.Peer {
func (t *http2Client) Peer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
@@ -556,6 +551,22 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
// Make the slice of certain predictable size to reduce allocations made by append.
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
hfLen += len(authData) + len(callAuthData)
registeredCompressors := t.registeredCompressors
if callHdr.AcceptedCompressors != nil {
registeredCompressors = *callHdr.AcceptedCompressors
}
if callHdr.PreviousAttempts > 0 {
hfLen++
}
if callHdr.SendCompress != "" {
hfLen++
}
if registeredCompressors != "" {
hfLen++
}
if _, ok := ctx.Deadline(); ok {
hfLen++
}
headerFields := make([]hpack.HeaderField, 0, hfLen)
headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
@@ -568,7 +579,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
}
registeredCompressors := t.registeredCompressors
if callHdr.SendCompress != "" {
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
// Include the outgoing compressor name when compressor is not registered
@@ -735,8 +745,8 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error) {
ctx = peer.NewContext(ctx, t.Peer())
// ServerName field of the resolver returned address takes precedence over
// Host field of CallHdr to determine the :authority header. This is because,
@@ -772,7 +782,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
if err != nil {
return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
}
s := t.newStream(ctx, callHdr)
s := t.newStream(ctx, callHdr, handler)
cleanup := func(err error) {
if s.swapState(streamDone) == streamDone {
// If it was already done, return.
@@ -811,7 +821,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
return nil
},
onOrphaned: cleanup,
wq: s.wq,
wq: &s.wq,
}
firstTry := true
var ch chan struct{}
@@ -842,7 +852,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
transportDrainRequired = t.nextID > MaxStreamID
s.id = hdr.streamID
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
s.fc = inFlow{limit: uint32(t.initialWindowSize)}
t.activeStreams[s.id] = s
t.mu.Unlock()
@@ -893,27 +903,23 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
}
}
if len(t.statsHandlers) != 0 {
if s.statsHandler != nil {
header, ok := metadata.FromOutgoingContext(ctx)
if ok {
header.Set("user-agent", t.userAgent)
} else {
header = metadata.Pairs("user-agent", t.userAgent)
}
for _, sh := range t.statsHandlers {
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
// Note: Creating a new stats object to prevent pollution.
outHeader := &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
}
sh.HandleRPC(s.ctx, outHeader)
}
// Note: The header fields are compressed with hpack after this call returns.
// No WireLength field is set here.
s.statsHandler.HandleRPC(s.ctx, &stats.OutHeader{
Client: true,
FullMethod: callHdr.Method,
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
Compression: callHdr.SendCompress,
Header: header,
})
}
if transportDrainRequired {
if t.logger.V(logLevel) {
@@ -990,6 +996,9 @@ func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode
// accessed anymore.
func (t *http2Client) Close(err error) {
t.conn.SetWriteDeadline(time.Now().Add(time.Second * 10))
// For background on the deadline value chosen here, see
// https://github.com/grpc/grpc-go/issues/8425#issuecomment-3057938248 .
t.conn.SetReadDeadline(time.Now().Add(time.Second))
t.mu.Lock()
// Make sure we only close once.
if t.state == closing {
@@ -1051,11 +1060,10 @@ func (t *http2Client) Close(err error) {
for _, s := range streams {
t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
}
for _, sh := range t.statsHandlers {
connEnd := &stats.ConnEnd{
if t.statsHandler != nil {
t.statsHandler.HandleConn(t.ctx, &stats.ConnEnd{
Client: true,
}
sh.HandleConn(t.ctx, connEnd)
})
}
}
@@ -1166,7 +1174,7 @@ func (t *http2Client) updateFlowControl(n uint32) {
})
}
func (t *http2Client) handleData(f *http2.DataFrame) {
func (t *http2Client) handleData(f *parsedDataFrame) {
size := f.Header().Length
var sendBDPPing bool
if t.bdpEst != nil {
@@ -1210,22 +1218,15 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
return
}
dataLen := f.data.Len()
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
pool := t.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
}
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
if dataLen > 0 {
f.data.Ref()
s.write(recvMsg{buffer: f.data})
}
}
// The server has closed the stream without sending trailers. Record that
@@ -1465,17 +1466,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
recvCompress string
httpStatusCode *int
httpStatusErr string
rawStatusCode = codes.Unknown
// the code from the grpc-status header, if present
grpcStatusCode = codes.Unknown
// headerError is set if an error is encountered while parsing the headers
headerError string
httpStatus string
)
if initialHeader {
httpStatusErr = "malformed header: missing HTTP status"
}
for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
@@ -1491,35 +1489,15 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
case "grpc-status":
code, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
se := status.New(codes.Unknown, fmt.Sprintf("transport: malformed grpc-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
rawStatusCode = codes.Code(uint32(code))
grpcStatusCode = codes.Code(uint32(code))
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case ":status":
if hf.Value == "200" {
httpStatusErr = ""
statusCode := 200
httpStatusCode = &statusCode
break
}
c, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
statusCode := int(c)
httpStatusCode = &statusCode
httpStatusErr = fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",
statusCode,
http.StatusText(statusCode),
)
httpStatus = hf.Value
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
@@ -1534,25 +1512,52 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}
if !isGRPC || httpStatusErr != "" {
var code = codes.Internal // when header does not include HTTP status, return INTERNAL
if httpStatusCode != nil {
// If a non-gRPC response is received, then evaluate the HTTP status to
// process the response and close the stream.
// In case http status doesn't provide any error information (status : 200),
// then evalute response code to be Unknown.
if !isGRPC {
var grpcErrorCode = codes.Internal
if httpStatus == "" {
httpStatusErr = "malformed header: missing HTTP status"
} else {
// Parse the status codes (e.g. "200", 404").
statusCode, err := strconv.Atoi(httpStatus)
if err != nil {
se := status.New(grpcErrorCode, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
if statusCode >= 100 && statusCode < 200 {
if endStream {
se := status.New(codes.Internal, fmt.Sprintf(
"protocol error: informational header with status code %d must not have END_STREAM set", statusCode))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
}
// In case of informational headers, return.
return
}
httpStatusErr = fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",
statusCode,
http.StatusText(statusCode),
)
var ok bool
code, ok = HTTPStatusConvTab[*httpStatusCode]
grpcErrorCode, ok = HTTPStatusConvTab[statusCode]
if !ok {
code = codes.Unknown
grpcErrorCode = codes.Unknown
}
}
var errs []string
if httpStatusErr != "" {
errs = append(errs, httpStatusErr)
}
if contentTypeErr != "" {
errs = append(errs, contentTypeErr)
}
// Verify the HTTP response is a 200.
se := status.New(code, strings.Join(errs, "; "))
se := status.New(grpcErrorCode, strings.Join(errs, "; "))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
@@ -1583,22 +1588,20 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
}
}
for _, sh := range t.statsHandlers {
if s.statsHandler != nil {
if !endStream {
inHeader := &stats.InHeader{
s.statsHandler.HandleRPC(s.ctx, &stats.InHeader{
Client: true,
WireLength: int(frame.Header().Length),
Header: metadata.MD(mdata).Copy(),
Compression: s.recvCompress,
}
sh.HandleRPC(s.ctx, inHeader)
})
} else {
inTrailer := &stats.InTrailer{
s.statsHandler.HandleRPC(s.ctx, &stats.InTrailer{
Client: true,
WireLength: int(frame.Header().Length),
Trailer: metadata.MD(mdata).Copy(),
}
sh.HandleRPC(s.ctx, inTrailer)
})
}
}
@@ -1606,7 +1609,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}
status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
status := istatus.NewWithProto(grpcStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
@@ -1653,7 +1656,7 @@ func (t *http2Client) reader(errCh chan<- error) {
// loop to keep reading incoming messages on this transport.
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
frame, err := t.framer.readFrame()
if t.keepaliveEnabled {
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
}
@@ -1668,7 +1671,7 @@ func (t *http2Client) reader(errCh chan<- error) {
if s != nil {
// use error detail to provide better err message
code := http2ErrConvTab[se.Code]
errorDetail := t.framer.fr.ErrorDetail()
errorDetail := t.framer.errorDetail()
var msg string
if errorDetail != nil {
msg = errorDetail.Error()
@@ -1686,8 +1689,9 @@ func (t *http2Client) reader(errCh chan<- error) {
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
t.operateHeaders(frame)
case *http2.DataFrame:
case *parsedDataFrame:
t.handleData(frame)
frame.data.Free()
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
@@ -1807,8 +1811,6 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
}
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) incrMsgSent() {
if channelz.IsOn() {
t.channelz.SocketMetrics.MessagesSent.Add(1)

View File

@@ -35,6 +35,8 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcutil"
@@ -42,7 +44,6 @@ import (
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/mem"
"google.golang.org/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
@@ -86,7 +87,7 @@ type http2Server struct {
// updates, reset streams, and various settings) to the controller.
controlBuf *controlBuffer
fc *trInFlow
stats []stats.Handler
stats stats.Handler
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
// Keepalive enforcement policy.
@@ -168,7 +169,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
if config.MaxHeaderListSize != nil {
maxHeaderListSize = *config.MaxHeaderListSize
}
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize, config.BufferPool)
// Send initial settings as connection preface to client.
isettings := []http2.Setting{{
ID: http2.SettingMaxFrameSize,
@@ -260,7 +261,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
fc: &trInFlow{limit: uint32(icwz)},
state: reachable,
activeStreams: make(map[uint32]*ServerStream),
stats: config.StatsHandlers,
stats: config.StatsHandler,
kp: kp,
idle: time.Now(),
kep: kep,
@@ -390,16 +391,15 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
}
t.maxStreamID = streamID
buf := newRecvBuffer()
s := &ServerStream{
Stream: &Stream{
id: streamID,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
Stream: Stream{
id: streamID,
fc: inFlow{limit: uint32(t.initialWindowSize)},
},
st: t,
headerWireLength: int(frame.Header().Length),
}
s.Stream.buf.init()
var (
// if false, content-type was missing or invalid
isGRPC = false
@@ -479,13 +479,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
if t.logger.V(logLevel) {
t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(streamID, s.contentSubtype, status.New(codes.Internal, errMsg), http.StatusBadRequest, !frame.StreamEnded())
return nil
}
@@ -499,23 +493,11 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
return nil
}
if !isGRPC {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusUnsupportedMediaType,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(streamID, s.contentSubtype, status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType), http.StatusUnsupportedMediaType, !frame.StreamEnded())
return nil
}
if headerError != nil {
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusBadRequest,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: headerError,
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(streamID, s.contentSubtype, headerError, http.StatusBadRequest, !frame.StreamEnded())
return nil
}
@@ -569,13 +551,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
if t.logger.V(logLevel) {
t.logger.Infof("Aborting the stream early: %v", errMsg)
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusMethodNotAllowed,
streamID: streamID,
contentSubtype: s.contentSubtype,
status: status.New(codes.Internal, errMsg),
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(streamID, s.contentSubtype, status.New(codes.Internal, errMsg), http.StatusMethodNotAllowed, !frame.StreamEnded())
s.cancel()
return nil
}
@@ -590,27 +566,16 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
if !ok {
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusOK,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(s.id, s.contentSubtype, stat, http.StatusOK, !frame.StreamEnded())
return nil
}
}
if s.ctx.Err() != nil {
t.mu.Unlock()
st := status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
// Early abort in case the timeout was zero or so low it already fired.
t.controlBuf.put(&earlyAbortStream{
httpStatus: http.StatusOK,
streamID: s.id,
contentSubtype: s.contentSubtype,
status: status.New(codes.DeadlineExceeded, context.DeadlineExceeded.Error()),
rst: !frame.StreamEnded(),
})
t.writeEarlyAbort(s.id, s.contentSubtype, st, http.StatusOK, !frame.StreamEnded())
return nil
}
@@ -640,25 +605,21 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
t.channelz.SocketMetrics.StreamsStarted.Add(1)
t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
}
s.requestRead = func(n int) {
t.adjustWindow(s, uint32(n))
}
s.readRequester = s
s.ctxDone = s.ctx.Done()
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
s.trReader = &transportReader{
reader: &recvBufferReader{
s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
s.trReader = transportReader{
reader: recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
recv: &s.buf,
},
windowHandler: s,
}
// Register the stream with loopy.
t.controlBuf.put(&registerStream{
streamID: s.id,
wq: s.wq,
wq: &s.wq,
})
handle(s)
return nil
@@ -674,7 +635,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
}()
for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame()
frame, err := t.framer.readFrame()
atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
if err != nil {
if se, ok := err.(http2.StreamError); ok {
@@ -711,8 +672,9 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStre
})
continue
}
case *http2.DataFrame:
case *parsedDataFrame:
t.handleData(frame)
frame.data.Free()
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
@@ -792,7 +754,7 @@ func (t *http2Server) updateFlowControl(n uint32) {
}
func (t *http2Server) handleData(f *http2.DataFrame) {
func (t *http2Server) handleData(f *parsedDataFrame) {
size := f.Header().Length
var sendBDPPing bool
if t.bdpEst != nil {
@@ -837,22 +799,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
t.closeStream(s, true, http2.ErrCodeFlowControl, false)
return
}
dataLen := f.data.Len()
if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
if w := s.fc.onRead(size - uint32(dataLen)); w > 0 {
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
}
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
pool := t.bufferPool
if pool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream is
// always initialized with a BufferPool.
pool = mem.DefaultBufferPool()
}
s.write(recvMsg{buffer: mem.Copy(f.Data(), pool)})
if dataLen > 0 {
f.data.Ref()
s.write(recvMsg{buffer: f.data})
}
}
if f.StreamEnded() {
@@ -979,13 +934,12 @@ func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD)
return headerFields
}
func (t *http2Server) checkForHeaderListSize(it any) bool {
func (t *http2Server) checkForHeaderListSize(hf []hpack.HeaderField) bool {
if t.maxSendHeaderListSize == nil {
return true
}
hdrFrame := it.(*headerFrame)
var sz int64
for _, f := range hdrFrame.hf {
for _, f := range hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
if t.logger.V(logLevel) {
t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
@@ -996,6 +950,42 @@ func (t *http2Server) checkForHeaderListSize(it any) bool {
return true
}
// writeEarlyAbort sends an early abort response with the given HTTP status and
// gRPC status. If the header list size exceeds the peer's limit, it sends a
// RST_STREAM instead.
func (t *http2Server) writeEarlyAbort(streamID uint32, contentSubtype string, stat *status.Status, httpStatus uint32, rst bool) {
hf := []hpack.HeaderField{
{Name: ":status", Value: strconv.Itoa(int(httpStatus))},
{Name: "content-type", Value: grpcutil.ContentType(contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(stat.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(stat.Message())},
}
if p := istatus.RawStatusProto(stat); len(p.GetDetails()) > 0 {
stBytes, err := proto.Marshal(p)
if err != nil {
t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
}
if err == nil {
hf = append(hf, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
}
}
success, _ := t.controlBuf.executeAndPut(func() bool {
return t.checkForHeaderListSize(hf)
}, &earlyAbortStream{
streamID: streamID,
rst: rst,
hf: hf,
})
if !success {
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeInternal,
onWrite: func() {},
})
}
}
func (t *http2Server) streamContextErr(s *ServerStream) error {
select {
case <-t.done:
@@ -1051,7 +1041,7 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
endStream: false,
onWrite: t.setResetPingStrikes,
}
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf.hf) }, hf)
if !success {
if err != nil {
return err
@@ -1059,14 +1049,13 @@ func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
t.closeStream(s, true, http2.ErrCodeInternal, false)
return ErrHeaderListSizeLimitViolation
}
for _, sh := range t.stats {
if t.stats != nil {
// Note: Headers are compressed with hpack after this call returns.
// No WireLength field is set here.
outHeader := &stats.OutHeader{
t.stats.HandleRPC(s.Context(), &stats.OutHeader{
Header: s.header.Copy(),
Compression: s.sendCompress,
}
sh.HandleRPC(s.Context(), outHeader)
})
}
return nil
}
@@ -1122,7 +1111,7 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
}
success, err := t.controlBuf.executeAndPut(func() bool {
return t.checkForHeaderListSize(trailingHeader)
return t.checkForHeaderListSize(trailingHeader.hf)
}, nil)
if !success {
if err != nil {
@@ -1134,10 +1123,10 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
// Send a RST_STREAM after the trailers if the client has not already half-closed.
rst := s.getState() == streamActive
t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
for _, sh := range t.stats {
if t.stats != nil {
// Note: The trailer fields are compressed with hpack after this call returns.
// No WireLength field is set here.
sh.HandleRPC(s.Context(), &stats.OutTrailer{
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{
Trailer: s.trailer.Copy(),
})
}
@@ -1305,7 +1294,8 @@ func (t *http2Server) Close(err error) {
// deleteStream deletes the stream s from transport's active streams.
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
t.mu.Lock()
if _, ok := t.activeStreams[s.id]; ok {
_, isActive := t.activeStreams[s.id]
if isActive {
delete(t.activeStreams, s.id)
if len(t.activeStreams) == 0 {
t.idle = time.Now()
@@ -1313,7 +1303,7 @@ func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
}
t.mu.Unlock()
if channelz.IsOn() {
if isActive && channelz.IsOn() {
if eosReceived {
t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
} else {
@@ -1353,10 +1343,10 @@ func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCo
// called to interrupt the potential blocking on other goroutines.
s.cancel()
oldState := s.swapState(streamDone)
if oldState == streamDone {
return
}
// We can't return early even if the stream's state is "done" as the state
// might have been set by the `finishStream` method. Deleting the stream via
// `finishStream` can get blocked on flow control.
s.swapState(streamDone)
t.deleteStream(s, eosReceived)
t.controlBuf.put(&cleanupStream{

View File

@@ -25,7 +25,6 @@ import (
"fmt"
"io"
"math"
"net"
"net/http"
"net/url"
"strconv"
@@ -37,6 +36,7 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/mem"
)
const (
@@ -300,11 +300,11 @@ type bufWriter struct {
buf []byte
offset int
batchSize int
conn net.Conn
conn io.Writer
err error
}
func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
func newBufWriter(conn io.Writer, batchSize int, pool *sync.Pool) *bufWriter {
w := &bufWriter{
batchSize: batchSize,
conn: conn,
@@ -388,15 +388,29 @@ func toIOError(err error) error {
return ioError{error: err}
}
type parsedDataFrame struct {
http2.FrameHeader
data mem.Buffer
}
func (df *parsedDataFrame) StreamEnded() bool {
return df.FrameHeader.Flags.Has(http2.FlagDataEndStream)
}
type framer struct {
writer *bufWriter
fr *http2.Framer
writer *bufWriter
fr *http2.Framer
headerBuf []byte // cached slice for framer headers to reduce heap allocs.
reader io.Reader
dataFrame parsedDataFrame // Cached data frame to avoid heap allocations.
pool mem.BufferPool
errDetail error
}
var writeBufferPoolMap = make(map[int]*sync.Pool)
var writeBufferMutex sync.Mutex
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer {
if writeBufferSize < 0 {
writeBufferSize = 0
}
@@ -412,6 +426,8 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
f := &framer{
writer: w,
fr: http2.NewFramer(w, r),
reader: r,
pool: memPool,
}
f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
// Opt-in to Frame reuse API on framer to reduce garbage.
@@ -422,6 +438,146 @@ func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBu
return f
}
// writeData writes a DATA frame.
//
// It is the caller's responsibility not to violate the maximum frame size.
func (f *framer) writeData(streamID uint32, endStream bool, data [][]byte) error {
var flags http2.Flags
if endStream {
flags = http2.FlagDataEndStream
}
length := uint32(0)
for _, d := range data {
length += uint32(len(d))
}
// TODO: Replace the header write with the framer API being added in
// https://github.com/golang/go/issues/66655.
f.headerBuf = append(f.headerBuf[:0],
byte(length>>16),
byte(length>>8),
byte(length),
byte(http2.FrameData),
byte(flags),
byte(streamID>>24),
byte(streamID>>16),
byte(streamID>>8),
byte(streamID))
if _, err := f.writer.Write(f.headerBuf); err != nil {
return err
}
for _, d := range data {
if _, err := f.writer.Write(d); err != nil {
return err
}
}
return nil
}
// readFrame reads a single frame. The returned Frame is only valid
// until the next call to readFrame.
func (f *framer) readFrame() (any, error) {
f.errDetail = nil
fh, err := f.fr.ReadFrameHeader()
if err != nil {
f.errDetail = f.fr.ErrorDetail()
return nil, err
}
// Read the data frame directly from the underlying io.Reader to avoid
// copies.
if fh.Type == http2.FrameData {
err = f.readDataFrame(fh)
return &f.dataFrame, err
}
fr, err := f.fr.ReadFrameForHeader(fh)
if err != nil {
f.errDetail = f.fr.ErrorDetail()
return nil, err
}
return fr, err
}
// errorDetail returns a more detailed error of the last error
// returned by framer.readFrame. For instance, if readFrame
// returns a StreamError with code PROTOCOL_ERROR, errorDetail
// will say exactly what was invalid. errorDetail is not guaranteed
// to return a non-nil value.
// errorDetail is reset after the next call to readFrame.
func (f *framer) errorDetail() error {
return f.errDetail
}
func (f *framer) readDataFrame(fh http2.FrameHeader) (err error) {
if fh.StreamID == 0 {
// DATA frames MUST be associated with a stream. If a
// DATA frame is received whose stream identifier
// field is 0x0, the recipient MUST respond with a
// connection error (Section 5.4.1) of type
// PROTOCOL_ERROR.
f.errDetail = errors.New("DATA frame with stream ID 0")
return http2.ConnectionError(http2.ErrCodeProtocol)
}
// Converting a *[]byte to a mem.SliceBuffer incurs a heap allocation. This
// conversion is performed by mem.NewBuffer. To avoid the extra allocation
// a []byte is allocated directly if required and cast to a mem.SliceBuffer.
var buf []byte
// poolHandle is the pointer returned by the buffer pool (if it's used.).
var poolHandle *[]byte
useBufferPool := !mem.IsBelowBufferPoolingThreshold(int(fh.Length))
if useBufferPool {
poolHandle = f.pool.Get(int(fh.Length))
buf = *poolHandle
defer func() {
if err != nil {
f.pool.Put(poolHandle)
}
}()
} else {
buf = make([]byte, int(fh.Length))
}
if fh.Flags.Has(http2.FlagDataPadded) {
if fh.Length == 0 {
return io.ErrUnexpectedEOF
}
// This initial 1-byte read can be inefficient for unbuffered readers,
// but it allows the rest of the payload to be read directly to the
// start of the destination slice. This makes it easy to return the
// original slice back to the buffer pool.
if _, err := io.ReadFull(f.reader, buf[:1]); err != nil {
return err
}
padSize := buf[0]
buf = buf[:len(buf)-1]
if int(padSize) > len(buf) {
// If the length of the padding is greater than the
// length of the frame payload, the recipient MUST
// treat this as a connection error.
// Filed: https://github.com/http2/http2-spec/issues/610
f.errDetail = errors.New("pad size larger than data payload")
return http2.ConnectionError(http2.ErrCodeProtocol)
}
if _, err := io.ReadFull(f.reader, buf); err != nil {
return err
}
buf = buf[:len(buf)-int(padSize)]
} else if _, err := io.ReadFull(f.reader, buf); err != nil {
return err
}
f.dataFrame.FrameHeader = fh
if useBufferPool {
// Update the handle to point to the (potentially re-sliced) buf.
*poolHandle = buf
f.dataFrame.data = mem.NewBuffer(poolHandle, f.pool)
} else {
f.dataFrame.data = mem.SliceBuffer(buf)
}
return nil
}
func (df *parsedDataFrame) Header() http2.FrameHeader {
return df.FrameHeader
}
func getWriteBufferPool(size int) *sync.Pool {
writeBufferMutex.Lock()
defer writeBufferMutex.Unlock()

View File

@@ -32,7 +32,7 @@ import (
// ServerStream implements streaming functionality for a gRPC server.
type ServerStream struct {
*Stream // Embed for common stream functionality.
Stream // Embed for common stream functionality.
st internalServerTransport
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
@@ -43,12 +43,13 @@ type ServerStream struct {
// Holds compressor names passed in grpc-accept-encoding metadata from the
// client.
clientAdvertisedCompressors string
headerWireLength int
// hdrMu protects outgoing header and trailer metadata.
hdrMu sync.Mutex
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
headerSent atomic.Bool // atomically set when the headers are sent out.
headerWireLength int
}
// Read reads an n byte message from the input stream.
@@ -178,3 +179,11 @@ func (s *ServerStream) SetTrailer(md metadata.MD) error {
s.hdrMu.Unlock()
return nil
}
func (s *ServerStream) requestRead(n int) {
s.st.adjustWindow(s, uint32(n))
}
func (s *ServerStream) updateWindow(n int) {
s.st.updateWindow(s, uint32(n))
}

View File

@@ -68,11 +68,11 @@ type recvBuffer struct {
err error
}
func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
c: make(chan recvMsg, 1),
}
return b
// init allows a recvBuffer to be initialized in-place, which is useful
// for resetting a buffer or for avoiding a heap allocation when the buffer
// is embedded in another struct.
func (b *recvBuffer) init() {
b.c = make(chan recvMsg, 1)
}
func (b *recvBuffer) put(r recvMsg) {
@@ -123,12 +123,13 @@ func (b *recvBuffer) get() <-chan recvMsg {
// recvBufferReader implements io.Reader interface to read the data from
// recvBuffer.
type recvBufferReader struct {
closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
last mem.Buffer // Stores the remaining data in the previous calls.
err error
_ noCopy
clientStream *ClientStream // The client transport stream is closed with a status representing ctx.Err() and nil trailer metadata.
ctx context.Context
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
recv *recvBuffer
last mem.Buffer // Stores the remaining data in the previous calls.
err error
}
func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
@@ -139,7 +140,7 @@ func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
n, r.last = mem.ReadUnsafe(header, r.last)
return n, nil
}
if r.closeStream != nil {
if r.clientStream != nil {
n, r.err = r.readMessageHeaderClient(header)
} else {
n, r.err = r.readMessageHeader(header)
@@ -164,7 +165,7 @@ func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
}
return buf, nil
}
if r.closeStream != nil {
if r.clientStream != nil {
buf, r.err = r.readClient(n)
} else {
buf, r.err = r.read(n)
@@ -209,7 +210,7 @@ func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err er
// TODO: delaying ctx error seems like a unnecessary side effect. What
// we really want is to mark the stream as done, and return ctx error
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
r.clientStream.Close(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readMessageHeaderAdditional(m, header)
case m := <-r.recv.get():
@@ -236,7 +237,7 @@ func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
// TODO: delaying ctx error seems like a unnecessary side effect. What
// we really want is to mark the stream as done, and return ctx error
// faster.
r.closeStream(ContextErr(r.ctx.Err()))
r.clientStream.Close(ContextErr(r.ctx.Err()))
m := <-r.recv.get()
return r.readAdditional(m, n)
case m := <-r.recv.get():
@@ -285,27 +286,32 @@ const (
// Stream represents an RPC in the transport layer.
type Stream struct {
id uint32
ctx context.Context // the associated context of the stream
method string // the associated RPC method of the stream
recvCompress string
sendCompress string
buf *recvBuffer
trReader *transportReader
fc *inFlow
wq *writeQuota
// Callback to state application's intentions to read data. This
// is used to adjust flow control, if needed.
requestRead func(int)
state streamState
readRequester readRequester
// contentSubtype is the content-subtype for requests.
// this must be lowercase or the behavior is undefined.
contentSubtype string
trailer metadata.MD // the key-value map of trailer metadata.
// Non-pointer fields are at the end to optimize GC performance.
state streamState
id uint32
buf recvBuffer
trReader transportReader
fc inFlow
wq writeQuota
}
// readRequester is used to state application's intentions to read data. This
// is used to adjust flow control, if needed.
type readRequester interface {
requestRead(int)
}
func (s *Stream) swapState(st streamState) streamState {
@@ -355,7 +361,7 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) {
if er := s.trReader.er; er != nil {
return er
}
s.requestRead(len(header))
s.readRequester.requestRead(len(header))
for len(header) != 0 {
n, err := s.trReader.ReadMessageHeader(header)
header = header[n:]
@@ -372,13 +378,29 @@ func (s *Stream) ReadMessageHeader(header []byte) (err error) {
return nil
}
// ceil returns the ceil after dividing the numerator and denominator while
// avoiding integer overflows.
func ceil(numerator, denominator int) int {
if numerator == 0 {
return 0
}
return (numerator-1)/denominator + 1
}
// Read reads n bytes from the wire for this stream.
func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {
return nil, er
}
s.requestRead(n)
// gRPC Go accepts data frames with a maximum length of 16KB. Larger
// messages must be split into multiple frames. We pre-allocate the
// buffer to avoid resizing during the read loop, but cap the initial
// capacity to 128 frames (2MB) to prevent over-allocation or panics
// when reading extremely large streams.
allocCap := min(ceil(n, http2MaxFrameLen), 128)
data = make(mem.BufferSlice, 0, allocCap)
s.readRequester.requestRead(n)
for n != 0 {
buf, err := s.trReader.Read(n)
var bufLen int
@@ -401,16 +423,34 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
return data, nil
}
// noCopy may be embedded into structs which must not be copied
// after the first use.
//
// See https://golang.org/issues/8005#issuecomment-190753527
// for details.
type noCopy struct {
}
func (*noCopy) Lock() {}
func (*noCopy) Unlock() {}
// transportReader reads all the data available for this Stream from the transport and
// passes them into the decoder, which converts them into a gRPC message stream.
// The error is io.EOF when the stream is done or another non-nil error if
// the stream broke.
type transportReader struct {
reader *recvBufferReader
_ noCopy
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
windowHandler func(int)
windowHandler windowHandler
er error
reader recvBufferReader
}
// The handler to control the window update procedure for both this
// particular stream and the associated transport.
type windowHandler interface {
updateWindow(int)
}
func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
@@ -419,7 +459,7 @@ func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
t.er = err
return 0, err
}
t.windowHandler(n)
t.windowHandler.updateWindow(n)
return n, nil
}
@@ -429,7 +469,7 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {
t.er = err
return buf, err
}
t.windowHandler(buf.Len())
t.windowHandler.updateWindow(buf.Len())
return buf, nil
}
@@ -454,7 +494,7 @@ type ServerConfig struct {
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandlers []stats.Handler
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
@@ -529,6 +569,12 @@ type CallHdr struct {
// outbound message.
SendCompress string
// AcceptedCompressors overrides the grpc-accept-encoding header for this
// call. When nil, the transport advertises the default set of registered
// compressors. A non-nil pointer overrides that value (including the empty
// string to advertise none).
AcceptedCompressors *string
// Creds specifies credentials.PerRPCCredentials for a call.
Creds credentials.PerRPCCredentials
@@ -544,9 +590,14 @@ type CallHdr struct {
DoneFunc func() // called when the stream is finished
// Authority is used to explicitly override the `:authority` header. If set,
// this value takes precedence over the Host field and will be used as the
// value for the `:authority` header.
// Authority is used to explicitly override the `:authority` header.
//
// This value comes from one of two sources:
// 1. The `CallAuthority` call option, if specified by the user.
// 2. An override provided by the LB picker (e.g. xDS authority rewriting).
//
// The `CallAuthority` call option always takes precedence over the LB
// picker override.
Authority string
}
@@ -566,7 +617,7 @@ type ClientTransport interface {
GracefulClose()
// NewStream creates a Stream for an RPC.
NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)
NewStream(ctx context.Context, callHdr *CallHdr, handler stats.Handler) (*ClientStream, error)
// Error returns a channel that is closed when some I/O error
// happens. Typically the caller should have a goroutine to monitor
@@ -584,8 +635,9 @@ type ClientTransport interface {
// with a human readable string with debug info.
GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
// Peer returns information about the peer associated with the Transport.
// The returned information includes authentication and network address details.
Peer() *peer.Peer
}
// ServerTransport is the common interface for all gRPC server-side transport
@@ -615,6 +667,8 @@ type internalServerTransport interface {
write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
writeStatus(s *ServerStream, st *status.Status) error
incrMsgRecv()
adjustWindow(s *ServerStream, n uint32)
updateWindow(s *ServerStream, n uint32)
}
// connectionErrorf creates an ConnectionError with the specified error description.