
pubsub: Ensure batcher flushes on shutdown, even if min batch size isn't met #3386

Closed

tonyhb wants to merge 2 commits into google:master from tonyhb:chore/flush-on-shutdown

Conversation


tonyhb commented Feb 27, 2024

This PR ensures that the batcher flushes on shutdown, even if the pending length is less than the min batch size specified. Sending events is preferred to dropping, even if limits are not obeyed.

Related to #3383, but doesn't necessarily replace it.
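To illustrate the behaviour this PR is after, here is a minimal, self-contained sketch of a batcher that flushes whatever is pending on shutdown, even below the minimum batch size. This is not the actual gocloud.dev batcher; the type, fields, and method names are simplified stand-ins for illustration only.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a simplified stand-in for the real gocloud.dev batcher:
// it accumulates items and normally flushes only once minBatchSize
// items are pending.
type batcher struct {
	mu           sync.Mutex
	pending      []string
	minBatchSize int
	shutdown     bool
	flushed      [][]string // batches handed off so far
}

func (b *batcher) Add(item string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.pending = append(b.pending, item)
	if len(b.pending) >= b.minBatchSize {
		b.flushLocked()
	}
}

// flushLocked hands off the pending batch; the caller must hold b.mu.
func (b *batcher) flushLocked() {
	if len(b.pending) == 0 {
		return
	}
	batch := b.pending
	b.pending = nil
	b.flushed = append(b.flushed, batch)
}

// Shutdown flushes pending items even if the minimum batch size hasn't
// been reached: sending is preferred to dropping.
func (b *batcher) Shutdown() {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.shutdown = true
	b.flushLocked()
}

func main() {
	b := &batcher{minBatchSize: 10}
	b.Add("a")
	b.Add("b") // still below minBatchSize, so nothing is flushed yet
	b.Shutdown()
	fmt.Println(len(b.flushed), len(b.flushed[0]))
}
```

The key point is that Shutdown bypasses the minBatchSize check and drains whatever is pending while holding the same lock the normal flush path uses.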

tonyhb mentioned this pull request Feb 27, 2024
vangent (Contributor) commented Mar 1, 2024

The tests are failing with a data race:

WARNING: DATA RACE
Write at 0x00c00044e3f0 by goroutine 978:
  gocloud.dev/pubsub/batcher.(*Batcher).callHandler()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:287 +0x1ca
  gocloud.dev/pubsub/batcher.(*Batcher).handleBatch.func1()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:217 +0x54

Previous read at 0x00c00044e3f0 by goroutine 979:
  gocloud.dev/pubsub/batcher.(*Batcher).Shutdown()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:303 +0x75
  gocloud.dev/pubsub.(*Subscription).Shutdown.func2()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:697 +0xb2

Goroutine 978 (running) created at:
  gocloud.dev/pubsub/batcher.(*Batcher).handleBatch()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:216 +0x11d
  gocloud.dev/pubsub/batcher.(*Batcher).AddNoWait()
      /home/runner/work/go-cloud/go-cloud/pubsub/batcher/batcher.go:203 +0x3b1
  gocloud.dev/pubsub.(*Subscription).Receive.func3()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:608 +0xcc
  gocloud.dev/pubsub.(*Message).Ack()
      /home/runner/work/go-cloud/go-cloud/pubsub/pubsub.go:161 +0xe1
  gocloud.dev/pubsub/natspubsub.TestInteropWithDirectNATS.deferwrap4()
      /home/runner/work/go-cloud/go-cloud/pubsub/natspubsub/nats_test.go:259 +0x33
  runtime.deferreturn()
      /opt/hostedtoolcache/go/1.22.0/x64/src/runtime/panic.go:602 +0x5d
  testing.tRunner()
      /opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1689 +0x21e
  testing.(*T).Run.gowrap1()
      /opt/hostedtoolcache/go/1.22.0/x64/src/testing/testing.go:1742 +0x44

tonyhb (Author) commented Mar 1, 2024

Ah, thanks. I ran without -race. Fixing!

tonyhb (Author) commented Mar 1, 2024

@vangent fixed, thanks for the notification.

vangent requested changes Mar 2, 2024

// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
vangent (Contributor) commented Mar 2, 2024

It shouldn't be necessary to use atomic to check/modify nHandlers. There's a mutex on the struct to protect it. IIUC, the problem is that you added a read on this line outside of the lock. I think if you move the b.mu.Unlock a few lines up to below this new stanza, it will be fine.
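The pattern vangent is suggesting can be sketched as follows. This is a simplified illustration, not the real batcher internals (the field names are stand-ins for nHandlers and opts.MaxHandlers): a field that the rest of the code guards with a mutex should be read under that same mutex, rather than with sync/atomic outside the lock, since mixing the two leaves some accesses unsynchronized, which is exactly what the race detector flags.

```go
package main

import (
	"fmt"
	"sync"
)

// batcher is a simplified stand-in: nHandlers is protected by mu
// everywhere, so reads must also happen under mu.
type batcher struct {
	mu          sync.Mutex
	nHandlers   int
	maxHandlers int
}

// spareHandlerCapacity reads nHandlers while holding the lock, as the
// review suggests, instead of using atomic.LoadInt32 outside it.
func (b *batcher) spareHandlerCapacity() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.nHandlers < b.maxHandlers
}

func main() {
	b := &batcher{maxHandlers: 2}
	fmt.Println(b.spareHandlerCapacity())
}
```

The practical fix in the PR, per the review, is to keep the check inside the existing critical section (moving b.mu.Unlock below the new stanza) rather than introducing atomics for a mutex-guarded field.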


// On shutdown, ensure that we attempt to flush any pending items
// if there's a minimum batch size.
if atomic.LoadInt32(&b.nHandlers) < int32(b.opts.MaxHandlers) {
vangent (Contributor) commented Mar 2, 2024

What happens if nHandlers == MaxHandlers? Won't we drop some messages then?

tonyhb (Author) commented Mar 2, 2024

At this point, nextBatch in the handler loop will return the remaining items, since shutdown is set to true, so everything will be processed as expected.
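The behaviour tonyhb is describing can be sketched like this. It is a simplified illustration of the claimed semantics, not the real nextBatch: once shutdown is set, the next call returns whatever is pending even if it is smaller than the minimum batch size, so an already-running handler drains the queue on its next iteration.

```go
package main

import "fmt"

// batcher is a simplified stand-in for the real type.
type batcher struct {
	pending      []int
	minBatchSize int
	shutdown     bool
}

// nextBatch returns nil while below the minimum batch size, unless we
// are shutting down, in which case it drains everything pending.
func (b *batcher) nextBatch() []int {
	if len(b.pending) < b.minBatchSize && !b.shutdown {
		return nil // keep waiting for more items
	}
	batch := b.pending
	b.pending = nil
	return batch
}

func main() {
	b := &batcher{pending: []int{1, 2}, minBatchSize: 5}
	fmt.Println(b.nextBatch()) // below min and not shutting down
	b.shutdown = true
	fmt.Println(b.nextBatch()) // shutdown: remaining items are returned
}
```

vangent's follow-up objection is about the other half of the story: if nHandlers == MaxHandlers and no handler is running anymore, nothing will call nextBatch again, so this drain path alone may not be sufficient.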

vangent (Contributor) commented Mar 2, 2024

If nHandlers == MaxHandlers, nextBatch won't even be called...?

vangent (Contributor) commented Jun 3, 2024

It's not obvious to me that we should be checking nHandlers here at all. Seems like during shutdown we need to choose between possibly creating more than MaxHandlers handlers, or dropping messages.

vangent (Contributor) commented Dec 7, 2024

Ping?

vangent (Contributor) commented Apr 5, 2025

I patched this with changes in #3543.

vangent closed this Apr 5, 2025