Fix console and exception waiter races
This commit is contained in:
parent
f38ceb3bd6
commit
fd15f95ca9
1 changed files with 71 additions and 20 deletions
91
console.go
91
console.go
|
|
@ -22,6 +22,7 @@ type ConsoleWatcher struct {
|
|||
filters []ConsoleFilter
|
||||
limit int
|
||||
handlers []consoleHandlerRegistration
|
||||
waiters []consoleMessageWaiter
|
||||
nextHandlerID atomic.Int64
|
||||
}
|
||||
|
||||
|
|
@ -39,6 +40,11 @@ type consoleHandlerRegistration struct {
|
|||
handler ConsoleHandler
|
||||
}
|
||||
|
||||
type consoleMessageWaiter struct {
|
||||
filter ConsoleFilter
|
||||
ch chan ConsoleMessage
|
||||
}
|
||||
|
||||
// Watch console messages from a Webview while a flow is running.
|
||||
//
|
||||
// watcher := webview.NewConsoleWatcher(wv)
|
||||
|
|
@ -227,6 +233,18 @@ func (cw *ConsoleWatcher) removeHandler(id int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cw *ConsoleWatcher) removeWaiter(ch chan ConsoleMessage) {
|
||||
cw.mu.Lock()
|
||||
defer cw.mu.Unlock()
|
||||
|
||||
for i, waiter := range cw.waiters {
|
||||
if waiter.ch == ch {
|
||||
cw.waiters = slices.Delete(cw.waiters, i, i+1)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SetLimit replaces the retention limit for future appends.
|
||||
func (cw *ConsoleWatcher) SetLimit(limit int) {
|
||||
cw.mu.Lock()
|
||||
|
|
@ -328,7 +346,6 @@ func (cw *ConsoleWatcher) Clear() {
|
|||
|
||||
// WaitForMessage waits for a message matching the filter.
|
||||
func (cw *ConsoleWatcher) WaitForMessage(ctx context.Context, filter ConsoleFilter) (*ConsoleMessage, error) {
|
||||
// First check existing messages
|
||||
cw.mu.RLock()
|
||||
for _, msg := range cw.messages {
|
||||
if cw.matchesSingleFilter(msg, filter) {
|
||||
|
|
@ -338,19 +355,20 @@ func (cw *ConsoleWatcher) WaitForMessage(ctx context.Context, filter ConsoleFilt
|
|||
}
|
||||
cw.mu.RUnlock()
|
||||
|
||||
// Set up a channel for new messages
|
||||
messageCh := make(chan ConsoleMessage, 1)
|
||||
handler := func(msg ConsoleMessage) {
|
||||
cw.mu.Lock()
|
||||
for _, msg := range cw.messages {
|
||||
if cw.matchesSingleFilter(msg, filter) {
|
||||
select {
|
||||
case messageCh <- msg:
|
||||
default:
|
||||
}
|
||||
cw.mu.Unlock()
|
||||
return &msg, nil
|
||||
}
|
||||
}
|
||||
|
||||
handlerID := cw.addHandler(handler)
|
||||
defer cw.removeHandler(handlerID)
|
||||
cw.waiters = append(cw.waiters, consoleMessageWaiter{
|
||||
filter: filter,
|
||||
ch: messageCh,
|
||||
})
|
||||
cw.mu.Unlock()
|
||||
defer cw.removeWaiter(messageCh)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
@ -442,8 +460,18 @@ func (cw *ConsoleWatcher) addMessage(msg ConsoleMessage) {
|
|||
|
||||
// Copy handlers to call outside lock
|
||||
handlers := slices.Clone(cw.handlers)
|
||||
waiters := slices.Clone(cw.waiters)
|
||||
cw.mu.Unlock()
|
||||
|
||||
for _, waiter := range waiters {
|
||||
if cw.matchesSingleFilter(msg, waiter.filter) {
|
||||
select {
|
||||
case waiter.ch <- msg:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Call handlers
|
||||
for _, registration := range handlers {
|
||||
registration.handler(msg)
|
||||
|
|
@ -520,6 +548,7 @@ type ExceptionWatcher struct {
|
|||
wv *Webview
|
||||
exceptions []ExceptionInfo
|
||||
handlers []exceptionHandlerRegistration
|
||||
waiters []exceptionWaiter
|
||||
nextHandlerID atomic.Int64
|
||||
}
|
||||
|
||||
|
|
@ -528,6 +557,10 @@ type exceptionHandlerRegistration struct {
|
|||
handler func(ExceptionInfo)
|
||||
}
|
||||
|
||||
type exceptionWaiter struct {
|
||||
ch chan ExceptionInfo
|
||||
}
|
||||
|
||||
// Capture Runtime.exceptionThrown events from the active page.
|
||||
//
|
||||
// watcher := webview.NewExceptionWatcher(wv)
|
||||
|
|
@ -619,9 +652,20 @@ func (ew *ExceptionWatcher) removeHandler(id int64) {
|
|||
}
|
||||
}
|
||||
|
||||
func (ew *ExceptionWatcher) removeWaiter(ch chan ExceptionInfo) {
|
||||
ew.mu.Lock()
|
||||
defer ew.mu.Unlock()
|
||||
|
||||
for i, waiter := range ew.waiters {
|
||||
if waiter.ch == ch {
|
||||
ew.waiters = slices.Delete(ew.waiters, i, i+1)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForException waits for an exception to be thrown.
|
||||
func (ew *ExceptionWatcher) WaitForException(ctx context.Context) (*ExceptionInfo, error) {
|
||||
// Check existing exceptions first
|
||||
ew.mu.RLock()
|
||||
if len(ew.exceptions) > 0 {
|
||||
exc := ew.exceptions[len(ew.exceptions)-1]
|
||||
|
|
@ -630,17 +674,16 @@ func (ew *ExceptionWatcher) WaitForException(ctx context.Context) (*ExceptionInf
|
|||
}
|
||||
ew.mu.RUnlock()
|
||||
|
||||
// Set up a channel for new exceptions
|
||||
excCh := make(chan ExceptionInfo, 1)
|
||||
handler := func(exc ExceptionInfo) {
|
||||
select {
|
||||
case excCh <- exc:
|
||||
default:
|
||||
}
|
||||
ew.mu.Lock()
|
||||
if len(ew.exceptions) > 0 {
|
||||
exc := ew.exceptions[len(ew.exceptions)-1]
|
||||
ew.mu.Unlock()
|
||||
return &exc, nil
|
||||
}
|
||||
|
||||
handlerID := ew.addHandler(handler)
|
||||
defer ew.removeHandler(handlerID)
|
||||
ew.waiters = append(ew.waiters, exceptionWaiter{ch: excCh})
|
||||
ew.mu.Unlock()
|
||||
defer ew.removeWaiter(excCh)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
|
|
@ -693,8 +736,16 @@ func (ew *ExceptionWatcher) handleException(params map[string]any) {
|
|||
ew.mu.Lock()
|
||||
ew.exceptions = append(ew.exceptions, info)
|
||||
handlers := slices.Clone(ew.handlers)
|
||||
waiters := slices.Clone(ew.waiters)
|
||||
ew.mu.Unlock()
|
||||
|
||||
for _, waiter := range waiters {
|
||||
select {
|
||||
case waiter.ch <- info:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Call handlers
|
||||
for _, registration := range handlers {
|
||||
registration.handler(info)
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue