Merge pull request #1584 from crazy-max/dependabot/go_modules/google.golang.org/grpc-1.78.0

chore(deps): bump google.golang.org/grpc from 1.77.0 to 1.78.0
This commit is contained in:
CrazyMax
2025-12-24 12:28:38 +01:00
committed by GitHub
19 changed files with 410 additions and 109 deletions

4
go.mod
View File

@@ -41,7 +41,7 @@ require (
go.podman.io/image/v5 v5.38.0
golang.org/x/mod v0.31.0
golang.org/x/sys v0.39.0
google.golang.org/grpc v1.77.0
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.34.1
@@ -148,7 +148,7 @@ require (
golang.org/x/term v0.38.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect

12
go.sum
View File

@@ -485,12 +485,12 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/genproto v0.0.0-20250324211829-b45e905df463 h1:qEFnJI6AnfZk0NNe8YTyXQh5i//Zxi4gBHwRgp76qpw=
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 h1:mepRgnBZa07I4TRuomDE4sTIYieg/osKmzIf4USdWS4=
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda h1:+2XxjfsAu6vqFxwGBRcHiMaDCuZiqXGDUDVWVtrFAnE=
google.golang.org/genproto/googleapis/api v0.0.0-20251029180050-ab9386a59fda/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda h1:i/Q+bfisr7gq6feoJnS/DlpdwEL4ihp41fvRiM3Ork0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc=
google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.0 h1:6Al3kEFFP9VJhRz3DID6quisgPnTeZVr4lep9kkxdPA=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.6.0/go.mod h1:QLvsjh0OIR0TYBeiu2bkWGTJBUNQ64st52iWj/yA93I=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=

View File

@@ -35,6 +35,8 @@ import (
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
expstats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
@@ -98,6 +100,41 @@ var (
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)
var (
disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
Name: "grpc.subchannel.disconnections",
Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
Unit: "{disconnection}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
Default: false,
})
connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
Name: "grpc.subchannel.connection_attempts_succeeded",
Description: "EXPERIMENTAL. Number of successful connection attempts.",
Unit: "{attempt}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
Default: false,
})
connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
Name: "grpc.subchannel.connection_attempts_failed",
Description: "EXPERIMENTAL. Number of failed connection attempts.",
Unit: "{attempt}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
Default: false,
})
openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
Name: "grpc.subchannel.open_connections",
Description: "EXPERIMENTAL. Number of open connections.",
Unit: "{attempt}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
Default: false,
})
)
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
@@ -262,9 +299,10 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}()
// This creates the name resolver, load balancer, etc.
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
return nil, err
if err := cc.exitIdleMode(); err != nil {
return nil, fmt.Errorf("failed to exit idle mode: %w", err)
}
cc.idlenessMgr.UnsafeSetNotIdle()
// Return now for non-blocking dials.
if !cc.dopts.block {
@@ -332,7 +370,7 @@ func (cc *ClientConn) addTraceEvent(msg string) {
Severity: channelz.CtInfo,
}
}
channelz.AddTraceEvent(logger, cc.channelz, 0, ted)
channelz.AddTraceEvent(logger, cc.channelz, 1, ted)
}
type idler ClientConn
@@ -341,14 +379,17 @@ func (i *idler) EnterIdleMode() {
(*ClientConn)(i).enterIdleMode()
}
func (i *idler) ExitIdleMode() error {
return (*ClientConn)(i).exitIdleMode()
func (i *idler) ExitIdleMode() {
// Ignore the error returned from this method, because from the perspective
// of the caller (idleness manager), the channel would have always moved out
// of IDLE by the time this method returns.
(*ClientConn)(i).exitIdleMode()
}
// exitIdleMode moves the channel out of idle mode by recreating the name
// resolver and load balancer. This should never be called directly; use
// cc.idlenessMgr.ExitIdleMode instead.
func (cc *ClientConn) exitIdleMode() (err error) {
func (cc *ClientConn) exitIdleMode() error {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
@@ -356,11 +397,23 @@ func (cc *ClientConn) exitIdleMode() (err error) {
}
cc.mu.Unlock()
// Set state to CONNECTING before building the name resolver
// so the channel does not remain in IDLE.
cc.csMgr.updateState(connectivity.Connecting)
// This needs to be called without cc.mu because this builds a new resolver
// which might update state or report error inline, which would then need to
// acquire cc.mu.
if err := cc.resolverWrapper.start(); err != nil {
return err
// If resolver creation fails, treat it like an error reported by the
// resolver before any valid updates. Set channel's state to
// TransientFailure, and set an erroring picker with the resolver build
// error, which will returned as part of any subsequent RPCs.
logger.Warningf("Failed to start resolver: %v", err)
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Lock()
cc.updateResolverStateAndUnlock(resolver.State{}, err)
return fmt.Errorf("failed to start resolver: %w", err)
}
cc.addTraceEvent("exiting idle mode")
@@ -681,10 +734,8 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
if err := cc.idlenessMgr.ExitIdleMode(); err != nil {
cc.addTraceEvent(err.Error())
return
}
cc.idlenessMgr.ExitIdleMode()
// If the ClientConn was not in idle mode, we need to call ExitIdle on the
// LB policy so that connections can be created.
cc.mu.Lock()
@@ -735,8 +786,8 @@ func init() {
internal.EnterIdleModeForTesting = func(cc *ClientConn) {
cc.idlenessMgr.EnterIdleModeForTesting()
}
internal.ExitIdleModeForTesting = func(cc *ClientConn) error {
return cc.idlenessMgr.ExitIdleMode()
internal.ExitIdleModeForTesting = func(cc *ClientConn) {
cc.idlenessMgr.ExitIdleMode()
}
}
@@ -861,6 +912,7 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
}
ac.updateTelemetryLabelsLocked()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Start with our address set to the first address; this may be updated if
// we connect to different addresses.
@@ -977,7 +1029,7 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
}
ac.addrs = addrs
ac.updateTelemetryLabelsLocked()
if ac.state == connectivity.Shutdown ||
ac.state == connectivity.TransientFailure ||
ac.state == connectivity.Idle {
@@ -1216,6 +1268,9 @@ type addrConn struct {
resetBackoff chan struct{}
channelz *channelz.SubChannel
localityLabel string
backendServiceLabel string
}
// Note: this requires a lock on ac.mu.
@@ -1223,6 +1278,18 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}
// If we are transitioning out of Ready, it means there is a disconnection.
// A SubConn can also transition from CONNECTING directly to IDLE when
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
// TODO: https://github.com/grpc/grpc-go/issues/7862 - Remove the second
// part of the if condition below once the issue is fixed.
if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel, "unknown")
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
}
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
@@ -1280,6 +1347,15 @@ func (ac *addrConn) resetTransportAndUnlock() {
ac.mu.Unlock()
if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
if !errors.Is(err, context.Canceled) {
connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
} else {
if logger.V(2) {
// This records cancelled connection attempts which can be later
// replaced by a metric.
logger.Infof("Context cancellation detected; not recording this as a failed connection attempt.")
}
}
// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
// to ensure one resolution request per pass instead of per subconn failure.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
@@ -1319,10 +1395,50 @@ func (ac *addrConn) resetTransportAndUnlock() {
}
// Success; reset backoff.
ac.mu.Lock()
connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.localityLabel)
openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, ac.backendServiceLabel, ac.securityLevelLocked(), ac.localityLabel)
ac.backoffIdx = 0
ac.mu.Unlock()
}
// updateTelemetryLabelsLocked calculates and caches the telemetry labels based on the
// first address in addrConn.
func (ac *addrConn) updateTelemetryLabelsLocked() {
labelsFunc, ok := internal.AddressToTelemetryLabels.(func(resolver.Address) map[string]string)
if !ok || len(ac.addrs) == 0 {
// Reset defaults
ac.localityLabel = ""
ac.backendServiceLabel = ""
return
}
labels := labelsFunc(ac.addrs[0])
ac.localityLabel = labels["grpc.lb.locality"]
ac.backendServiceLabel = labels["grpc.lb.backend_service"]
}
type securityLevelKey struct{}
func (ac *addrConn) securityLevelLocked() string {
var secLevel string
// During disconnection, ac.transport is nil. Fall back to the security level
// stored in the current address during connection.
if ac.transport == nil {
secLevel, _ = ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
return secLevel
}
authInfo := ac.transport.Peer().AuthInfo
if ci, ok := authInfo.(interface {
GetCommonAuthInfo() credentials.CommonAuthInfo
}); ok {
secLevel = ci.GetCommonAuthInfo().SecurityLevel.String()
// Store the security level in the current address' attributes so
// that it remains available for disconnection metrics after the
// transport is closed.
ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, secLevel)
}
return secLevel
}
// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.

View File

@@ -76,6 +76,7 @@ const (
MetricTypeFloatHisto
MetricTypeIntGauge
MetricTypeIntUpDownCount
MetricTypeIntAsyncGauge
)
// Int64CountHandle is a typed handle for a int count metric. This handle
@@ -172,6 +173,30 @@ func (h *Int64GaugeHandle) Record(recorder MetricsRecorder, incr int64, labels .
recorder.RecordInt64Gauge(h, incr, labels...)
}
// AsyncMetric is a marker interface for asynchronous metric types.
type AsyncMetric interface {
isAsync()
Descriptor() *MetricDescriptor
}
// Int64AsyncGaugeHandle is a typed handle for an int gauge metric. This handle is
// passed at the recording point in order to know which metric to record on.
type Int64AsyncGaugeHandle MetricDescriptor
// isAsync implements the AsyncMetric interface.
func (h *Int64AsyncGaugeHandle) isAsync() {}
// Descriptor returns the int64 gauge handle typecast to a pointer to a
// MetricDescriptor.
func (h *Int64AsyncGaugeHandle) Descriptor() *MetricDescriptor {
return (*MetricDescriptor)(h)
}
// Record records the int64 gauge value on the metrics recorder provided.
func (h *Int64AsyncGaugeHandle) Record(recorder AsyncMetricsRecorder, value int64, labels ...string) {
recorder.RecordInt64AsyncGauge(h, value, labels...)
}
// registeredMetrics are the registered metric descriptor names.
var registeredMetrics = make(map[string]bool)
@@ -282,6 +307,20 @@ func RegisterInt64UpDownCount(descriptor MetricDescriptor) *Int64UpDownCountHand
return (*Int64UpDownCountHandle)(descPtr)
}
// RegisterInt64AsyncGauge registers the metric description onto the global registry.
// It returns a typed handle to use for recording data.
//
// NOTE: this function must only be called during initialization time (i.e. in
// an init() function), and is not thread-safe. If multiple metrics are
// registered with the same name, this function will panic.
func RegisterInt64AsyncGauge(descriptor MetricDescriptor) *Int64AsyncGaugeHandle {
registerMetric(descriptor.Name, descriptor.Default)
descriptor.Type = MetricTypeIntAsyncGauge
descPtr := &descriptor
metricsRegistry[descriptor.Name] = descPtr
return (*Int64AsyncGaugeHandle)(descPtr)
}
// snapshotMetricsRegistryForTesting snapshots the global data of the metrics
// registry. Returns a cleanup function that sets the metrics registry to its
// original state.

View File

@@ -43,6 +43,13 @@ type MetricsRecorder interface {
RecordInt64UpDownCount(handle *Int64UpDownCountHandle, incr int64, labels ...string)
}
// AsyncMetricsRecorder records on asynchronous metrics derived from metric registry.
type AsyncMetricsRecorder interface {
// RecordInt64AsyncGauge records the measurement alongside labels on the int
// count associated with the provided handle asynchronously
RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandle, incr int64, labels ...string)
}
// Metrics is an experimental legacy alias of the now-stable stats.MetricSet.
// Metrics will be deleted in a future release.
type Metrics = stats.MetricSet

View File

@@ -67,6 +67,10 @@ type Balancer struct {
// balancerCurrent before the UpdateSubConnState is called on the
// balancerCurrent.
currentMu sync.Mutex
// activeGoroutines tracks all the goroutines that this balancer has started
// and that should be waited on when the balancer closes.
activeGoroutines sync.WaitGroup
}
// swap swaps out the current lb with the pending lb and updates the ClientConn.
@@ -76,7 +80,9 @@ func (gsb *Balancer) swap() {
cur := gsb.balancerCurrent
gsb.balancerCurrent = gsb.balancerPending
gsb.balancerPending = nil
gsb.activeGoroutines.Add(1)
go func() {
defer gsb.activeGoroutines.Done()
gsb.currentMu.Lock()
defer gsb.currentMu.Unlock()
cur.Close()
@@ -274,6 +280,7 @@ func (gsb *Balancer) Close() {
currentBalancerToClose.Close()
pendingBalancerToClose.Close()
gsb.activeGoroutines.Wait()
}
// balancerWrapper wraps a balancer.Balancer, and overrides some Balancer
@@ -324,7 +331,12 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
defer bw.gsb.mu.Unlock()
bw.lastState = state
// If Close() acquires the mutex before UpdateState(), the balancer
// will already have been removed from the current or pending state when
// reaching this point.
if !bw.gsb.balancerCurrentOrPending(bw) {
// Returning here ensures that (*Balancer).swap() is not invoked after
// (*Balancer).Close() and therefore prevents "use after close".
return
}

View File

@@ -77,6 +77,11 @@ var (
// - Target resolution is disabled.
// - The DNS resolver is being used.
EnableDefaultPortForProxyTarget = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_DEFAULT_PORT_FOR_PROXY_TARGET", true)
// XDSAuthorityRewrite indicates whether xDS authority rewriting is enabled.
// This feature is defined in gRFC A81 and is enabled by setting the
// environment variable GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE to "true".
XDSAuthorityRewrite = boolFromEnv("GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE", false)
)
func boolFromEnv(envVar string, def bool) bool {

View File

@@ -25,4 +25,8 @@ var (
// BufferPool is implemented by the grpc package and returns a server
// option to configure a shared buffer pool for a grpc.Server.
BufferPool any // func (grpc.SharedBufferPool) grpc.ServerOption
// AcceptCompressors is implemented by the grpc package and returns
// a call option that restricts the grpc-accept-encoding header for a call.
AcceptCompressors any // func(...string) grpc.CallOption
)

View File

@@ -21,7 +21,6 @@
package idle
import (
"fmt"
"math"
"sync"
"sync/atomic"
@@ -33,15 +32,15 @@ var timeAfterFunc = func(d time.Duration, f func()) *time.Timer {
return time.AfterFunc(d, f)
}
// Enforcer is the functionality provided by grpc.ClientConn to enter
// and exit from idle mode.
type Enforcer interface {
ExitIdleMode() error
// ClientConn is the functionality provided by grpc.ClientConn to enter and exit
// from idle mode.
type ClientConn interface {
ExitIdleMode()
EnterIdleMode()
}
// Manager implements idleness detection and calls the configured Enforcer to
// enter/exit idle mode when appropriate. Must be created by NewManager.
// Manager implements idleness detection and calls the ClientConn to enter/exit
// idle mode when appropriate. Must be created by NewManager.
type Manager struct {
// State accessed atomically.
lastCallEndTime int64 // Unix timestamp in nanos; time when the most recent RPC completed.
@@ -51,8 +50,8 @@ type Manager struct {
// Can be accessed without atomics or mutex since these are set at creation
// time and read-only after that.
enforcer Enforcer // Functionality provided by grpc.ClientConn.
timeout time.Duration
cc ClientConn // Functionality provided by grpc.ClientConn.
timeout time.Duration
// idleMu is used to guarantee mutual exclusion in two scenarios:
// - Opposing intentions:
@@ -72,9 +71,9 @@ type Manager struct {
// NewManager creates a new idleness manager implementation for the
// given idle timeout. It begins in idle mode.
func NewManager(enforcer Enforcer, timeout time.Duration) *Manager {
func NewManager(cc ClientConn, timeout time.Duration) *Manager {
return &Manager{
enforcer: enforcer,
cc: cc,
timeout: timeout,
actuallyIdle: true,
activeCallsCount: -math.MaxInt32,
@@ -127,7 +126,7 @@ func (m *Manager) handleIdleTimeout() {
// Now that we've checked that there has been no activity, attempt to enter
// idle mode, which is very likely to succeed.
if m.tryEnterIdleMode() {
if m.tryEnterIdleMode(true) {
// Successfully entered idle mode. No timer needed until we exit idle.
return
}
@@ -142,10 +141,13 @@ func (m *Manager) handleIdleTimeout() {
// that, it performs a last minute check to ensure that no new RPC has come in,
// making the channel active.
//
// checkActivity controls if a check for RPC activity, since the last time the
// idle_timeout fired, is made.
// Return value indicates whether or not the channel moved to idle mode.
//
// Holds idleMu which ensures mutual exclusion with exitIdleMode.
func (m *Manager) tryEnterIdleMode() bool {
func (m *Manager) tryEnterIdleMode(checkActivity bool) bool {
// Setting the activeCallsCount to -math.MaxInt32 indicates to OnCallBegin()
// that the channel is either in idle mode or is trying to get there.
if !atomic.CompareAndSwapInt32(&m.activeCallsCount, 0, -math.MaxInt32) {
@@ -166,7 +168,7 @@ func (m *Manager) tryEnterIdleMode() bool {
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
return false
}
if atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
if checkActivity && atomic.LoadInt32(&m.activeSinceLastTimerCheck) == 1 {
// A very short RPC could have come in (and also finished) after we
// checked for calls count and activity in handleIdleTimeout(), but
// before the CAS operation. So, we need to check for activity again.
@@ -177,44 +179,37 @@ func (m *Manager) tryEnterIdleMode() bool {
// No new RPCs have come in since we set the active calls count value to
// -math.MaxInt32. And since we have the lock, it is safe to enter idle mode
// unconditionally now.
m.enforcer.EnterIdleMode()
m.cc.EnterIdleMode()
m.actuallyIdle = true
return true
}
// EnterIdleModeForTesting instructs the channel to enter idle mode.
func (m *Manager) EnterIdleModeForTesting() {
m.tryEnterIdleMode()
m.tryEnterIdleMode(false)
}
// OnCallBegin is invoked at the start of every RPC.
func (m *Manager) OnCallBegin() error {
func (m *Manager) OnCallBegin() {
if m.isClosed() {
return nil
return
}
if atomic.AddInt32(&m.activeCallsCount, 1) > 0 {
// Channel is not idle now. Set the activity bit and allow the call.
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
return
}
// Channel is either in idle mode or is in the process of moving to idle
// mode. Attempt to exit idle mode to allow this RPC.
if err := m.ExitIdleMode(); err != nil {
// Undo the increment to calls count, and return an error causing the
// RPC to fail.
atomic.AddInt32(&m.activeCallsCount, -1)
return err
}
m.ExitIdleMode()
atomic.StoreInt32(&m.activeSinceLastTimerCheck, 1)
return nil
}
// ExitIdleMode instructs m to call the enforcer's ExitIdleMode and update m's
// ExitIdleMode instructs m to call the ClientConn's ExitIdleMode and update its
// internal state.
func (m *Manager) ExitIdleMode() error {
func (m *Manager) ExitIdleMode() {
// Holds idleMu which ensures mutual exclusion with tryEnterIdleMode.
m.idleMu.Lock()
defer m.idleMu.Unlock()
@@ -231,12 +226,10 @@ func (m *Manager) ExitIdleMode() error {
// m.ExitIdleMode.
//
// In any case, there is nothing to do here.
return nil
return
}
if err := m.enforcer.ExitIdleMode(); err != nil {
return fmt.Errorf("failed to exit idle mode: %w", err)
}
m.cc.ExitIdleMode()
// Undo the idle entry process. This also respects any new RPC attempts.
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
@@ -244,7 +237,23 @@ func (m *Manager) ExitIdleMode() error {
// Start a new timer to fire after the configured idle timeout.
m.resetIdleTimerLocked(m.timeout)
return nil
}
// UnsafeSetNotIdle instructs the Manager to update its internal state to
// reflect the reality that the channel is no longer in IDLE mode.
//
// N.B. This method is intended only for internal use by the gRPC client
// when it exits IDLE mode **manually** from `Dial`. The callsite must ensure:
// - The channel was **actually in IDLE mode** immediately prior to the call.
// - There is **no concurrent activity** that could cause the channel to exit
// IDLE mode *naturally* at the same time.
func (m *Manager) UnsafeSetNotIdle() {
m.idleMu.Lock()
defer m.idleMu.Unlock()
atomic.AddInt32(&m.activeCallsCount, math.MaxInt32)
m.actuallyIdle = false
m.resetIdleTimerLocked(m.timeout)
}
// OnCallEnd is invoked at the end of every RPC.

View File

@@ -244,6 +244,10 @@ var (
// When set, the function will be called before the stream enters
// the blocking state.
NewStreamWaitingForResolver = func() {}
// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
// labels from a resolver.Address. Callers must assert its type before calling.
AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string
)
// HealthChecker defines the signature of the client-side LB channel health

View File

@@ -370,7 +370,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
})
t.logger = prefixLoggerForClientTransport(t)
// Add peer information to the http2client context.
t.ctx = peer.NewContext(t.ctx, t.getPeer())
t.ctx = peer.NewContext(t.ctx, t.Peer())
if md, ok := addr.Metadata.(*metadata.MD); ok {
t.md = *md
@@ -510,7 +510,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
return s
}
func (t *http2Client) getPeer() *peer.Peer {
func (t *http2Client) Peer() *peer.Peer {
return &peer.Peer{
Addr: t.remoteAddr,
AuthInfo: t.authInfo, // Can be nil
@@ -551,6 +551,9 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
hfLen += len(authData) + len(callAuthData)
registeredCompressors := t.registeredCompressors
if callHdr.AcceptedCompressors != nil {
registeredCompressors = *callHdr.AcceptedCompressors
}
if callHdr.PreviousAttempts > 0 {
hfLen++
}
@@ -742,7 +745,7 @@ func (e NewStreamError) Error() string {
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
ctx = peer.NewContext(ctx, t.getPeer())
ctx = peer.NewContext(ctx, t.Peer())
// ServerName field of the resolver returned address takes precedence over
// Host field of CallHdr to determine the :authority header. This is because,
@@ -1485,7 +1488,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
case "grpc-status":
code, err := strconv.ParseInt(hf.Value, 10, 32)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
se := status.New(codes.Unknown, fmt.Sprintf("transport: malformed grpc-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
@@ -1807,8 +1810,6 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
}
}
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
func (t *http2Client) incrMsgSent() {
if channelz.IsOn() {
t.channelz.SocketMetrics.MessagesSent.Add(1)

View File

@@ -411,12 +411,6 @@ var writeBufferPoolMap = make(map[int]*sync.Pool)
var writeBufferMutex sync.Mutex
func newFramer(conn io.ReadWriter, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32, memPool mem.BufferPool) *framer {
if memPool == nil {
// Note that this is only supposed to be nil in tests. Otherwise, stream
// is always initialized with a BufferPool.
memPool = mem.DefaultBufferPool()
}
if writeBufferSize < 0 {
writeBufferSize = 0
}

View File

@@ -553,6 +553,12 @@ type CallHdr struct {
// outbound message.
SendCompress string
// AcceptedCompressors overrides the grpc-accept-encoding header for this
// call. When nil, the transport advertises the default set of registered
// compressors. A non-nil pointer overrides that value (including the empty
// string to advertise none).
AcceptedCompressors *string
// Creds specifies credentials.PerRPCCredentials for a call.
Creds credentials.PerRPCCredentials
@@ -608,8 +614,9 @@ type ClientTransport interface {
// with a human readable string with debug info.
GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
// Peer returns information about the peer associated with the Transport.
// The returned information includes authentication and network address details.
Peer() *peer.Peer
}
// ServerTransport is the common interface for all gRPC server-side transport

View File

@@ -38,9 +38,11 @@ type BufferPool interface {
Put(*[]byte)
}
const goPageSize = 4 << 10 // 4KiB. N.B. this must be a power of 2.
var defaultBufferPoolSizes = []int{
256,
4 << 10, // 4KB (go page size)
goPageSize,
16 << 10, // 16KB (max HTTP/2 frame size used by gRPC)
32 << 10, // 32KB (default buffer size for io.Copy)
1 << 20, // 1MB
@@ -172,7 +174,14 @@ func (p *simpleBufferPool) Get(size int) *[]byte {
p.pool.Put(bs)
}
b := make([]byte, size)
// If we're going to allocate, round up to the nearest page. This way if
// requests frequently arrive with small variation we don't allocate
// repeatedly if we get unlucky and they increase over time. By default we
// only allocate here if size > 1MiB. Because goPageSize is a power of 2, we
// can round up efficiently.
allocSize := (size + goPageSize - 1) & ^(goPageSize - 1)
b := make([]byte, size, allocSize)
return &b
}

View File

@@ -69,6 +69,7 @@ func (ccr *ccResolverWrapper) start() error {
errCh := make(chan error)
ccr.serializer.TrySchedule(func(ctx context.Context) {
if ctx.Err() != nil {
errCh <- ctx.Err()
return
}
opts := resolver.BuildOptions{

View File

@@ -33,6 +33,8 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/mem"
"google.golang.org/grpc/metadata"
@@ -41,6 +43,10 @@ import (
"google.golang.org/grpc/status"
)
func init() {
internal.AcceptCompressors = acceptCompressors
}
// Compressor defines the interface gRPC uses to compress a message.
//
// Deprecated: use package encoding.
@@ -151,16 +157,32 @@ func (d *gzipDecompressor) Type() string {
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
compressorName string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
onFinish []func(err error)
authority string
compressorName string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
contentSubtype string
codec baseCodec
maxRetryRPCBufferSize int
onFinish []func(err error)
authority string
acceptedResponseCompressors []string
}
func acceptedCompressorAllows(allowed []string, name string) bool {
if allowed == nil {
return true
}
if name == "" || name == encoding.Identity {
return true
}
for _, a := range allowed {
if a == name {
return true
}
}
return false
}
func defaultCallInfo() *callInfo {
@@ -170,6 +192,29 @@ func defaultCallInfo() *callInfo {
}
}
func newAcceptedCompressionConfig(names []string) ([]string, error) {
if len(names) == 0 {
return nil, nil
}
var allowed []string
seen := make(map[string]struct{}, len(names))
for _, name := range names {
name = strings.TrimSpace(name)
if name == "" || name == encoding.Identity {
continue
}
if !grpcutil.IsCompressorNameRegistered(name) {
return nil, status.Errorf(codes.InvalidArgument, "grpc: compressor %q is not registered", name)
}
if _, dup := seen[name]; dup {
continue
}
seen[name] = struct{}{}
allowed = append(allowed, name)
}
return allowed, nil
}
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
type CallOption interface {
@@ -471,6 +516,31 @@ func (o CompressorCallOption) before(c *callInfo) error {
}
func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
// acceptCompressors returns a CallOption that limits the compression algorithms
// advertised in the grpc-accept-encoding header for response messages.
// Compression algorithms not in the provided list will not be advertised, and
// responses compressed with non-listed algorithms will be rejected.
func acceptCompressors(names ...string) CallOption {
cp := append([]string(nil), names...)
return acceptCompressorsCallOption{names: cp}
}
// acceptCompressorsCallOption is a CallOption that limits response compression.
type acceptCompressorsCallOption struct {
names []string
}
func (o acceptCompressorsCallOption) before(c *callInfo) error {
allowed, err := newAcceptedCompressionConfig(o.names)
if err != nil {
return err
}
c.acceptedResponseCompressors = allowed
return nil
}
func (acceptCompressorsCallOption) after(*callInfo, *csAttempt) {}
// CallContentSubtype returns a CallOption that will set the content-subtype
// for a call. For example, if content-subtype is "json", the Content-Type over
// the wire will be "application/grpc+json". The content-subtype is converted
@@ -857,8 +927,7 @@ func (p *payloadInfo) free() {
// the buffer is no longer needed.
// TODO: Refactor this function to reduce the number of arguments.
// See: https://google.github.io/styleguide/go/best-practices.html#function-argument-lists
func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool,
) (out mem.BufferSlice, err error) {
func recvAndDecompress(p *parser, s recvCompressor, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor, isServer bool) (out mem.BufferSlice, err error) {
pf, compressed, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
return nil, err

View File

@@ -25,6 +25,7 @@ import (
"math"
rand "math/rand/v2"
"strconv"
"strings"
"sync"
"time"
@@ -179,13 +180,41 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
var emptyMethodConfig = serviceconfig.MethodConfig{}
// endOfClientStream performs cleanup actions required for both successful and
// failed streams. This includes incrementing channelz stats and invoking all
// registered OnFinish call options.
func endOfClientStream(cc *ClientConn, err error, opts ...CallOption) {
if channelz.IsOn() {
if err != nil {
cc.incrCallsFailed()
} else {
cc.incrCallsSucceeded()
}
}
for _, o := range opts {
if o, ok := o.(OnFinishCallOption); ok {
o.OnFinish(err)
}
}
}
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if channelz.IsOn() {
cc.incrCallsStarted()
}
defer func() {
if err != nil {
// Ensure cleanup when stream creation fails.
endOfClientStream(cc, err, opts...)
}
}()
// Start tracking the RPC for idleness purposes. This is where a stream is
// created for both streaming and unary RPCs, and hence is a good place to
// track active RPC count.
if err := cc.idlenessMgr.OnCallBegin(); err != nil {
return nil, err
}
cc.idlenessMgr.OnCallBegin()
// Add a calloption, to decrement the active call count, that gets executed
// when the RPC completes.
opts = append([]CallOption{OnFinish(func(error) { cc.idlenessMgr.OnCallEnd() })}, opts...)
@@ -204,14 +233,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
}
}
}
if channelz.IsOn() {
cc.incrCallsStarted()
defer func() {
if err != nil {
cc.incrCallsFailed()
}
}()
}
// Provide an opportunity for the first RPC to see the first service config
// provided by the resolver.
nameResolutionDelayed, err := cc.waitForResolvedAddrs(ctx)
@@ -301,6 +322,10 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
DoneFunc: doneFunc,
Authority: callInfo.authority,
}
if allowed := callInfo.acceptedResponseCompressors; len(allowed) > 0 {
headerValue := strings.Join(allowed, ",")
callHdr.AcceptedCompressors = &headerValue
}
// Set our outgoing compression according to the UseCompressor CallOption, if
// set. In that case, also find the compressor from the encoding package.
@@ -484,7 +509,7 @@ func (a *csAttempt) getTransport() error {
return err
}
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
a.trInfo.firstLine.SetRemoteAddr(a.transport.Peer().Addr)
}
if pick.blocked && a.statsHandler != nil {
a.statsHandler.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
@@ -1042,9 +1067,6 @@ func (cs *clientStream) finish(err error) {
return
}
cs.finished = true
for _, onFinish := range cs.callInfo.onFinish {
onFinish(err)
}
cs.commitAttemptLocked()
if cs.attempt != nil {
cs.attempt.finish(err)
@@ -1084,13 +1106,7 @@ func (cs *clientStream) finish(err error) {
if err == nil {
cs.retryThrottler.successfulRPC()
}
if channelz.IsOn() {
if err != nil {
cs.cc.incrCallsFailed()
} else {
cs.cc.incrCallsSucceeded()
}
}
endOfClientStream(cs.cc, err, cs.opts...)
cs.cancel()
}
@@ -1134,6 +1150,10 @@ func (a *csAttempt) recvMsg(m any, payInfo *payloadInfo) (err error) {
a.decompressorV0 = nil
a.decompressorV1 = encoding.GetCompressor(ct)
}
// Validate that the compression method is acceptable for this call.
if !acceptedCompressorAllows(cs.callInfo.acceptedResponseCompressors, ct) {
return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
}
} else {
// No compression is used; disable our decompressor.
a.decompressorV0 = nil
@@ -1479,6 +1499,10 @@ func (as *addrConnStream) RecvMsg(m any) (err error) {
as.decompressorV0 = nil
as.decompressorV1 = encoding.GetCompressor(ct)
}
// Validate that the compression method is acceptable for this call.
if !acceptedCompressorAllows(as.callInfo.acceptedResponseCompressors, ct) {
return status.Errorf(codes.Internal, "grpc: peer compressed the response with %q which is not allowed by AcceptCompressors", ct)
}
} else {
// No compression is used; disable our decompressor.
as.decompressorV0 = nil

View File

@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.77.0"
const Version = "1.78.0"

4
vendor/modules.txt vendored
View File

@@ -591,10 +591,10 @@ golang.org/x/text/width
# golang.org/x/time v0.14.0
## explicit; go 1.24.0
golang.org/x/time/rate
# google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8
# google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda
## explicit; go 1.24.0
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.77.0
# google.golang.org/grpc v1.78.0
## explicit; go 1.24.0
google.golang.org/grpc
google.golang.org/grpc/attributes