fix(proxy): close append-only loggers on stop
Co-Authored-By: Virgil <virgil@lethean.io>
This commit is contained in:
parent
0a195f7962
commit
c164bf2e26
5 changed files with 96 additions and 11 deletions
|
|
@ -26,7 +26,8 @@ type AccessLog struct {
|
|||
mu sync.Mutex
|
||||
// f is opened append-only on first write; nil until first event.
|
||||
// Uses core.File for I/O abstraction.
|
||||
f *os.File
|
||||
f *os.File
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewAccessLog stores the destination path and lazily opens it on first write.
|
||||
|
|
@ -71,6 +72,27 @@ func (l *AccessLog) OnClose(event proxy.Event) {
|
|||
l.writeLine(line)
|
||||
}
|
||||
|
||||
// Close releases the append-only file handle if it has been opened.
|
||||
//
|
||||
// al.Close()
|
||||
func (l *AccessLog) Close() {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
l.closed = true
|
||||
if l.f != nil {
|
||||
_ = l.f.Close()
|
||||
l.f = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (l *AccessLog) writeLine(line string) {
|
||||
if l == nil || l.path == "" {
|
||||
return
|
||||
|
|
@ -79,6 +101,10 @@ func (l *AccessLog) writeLine(line string) {
|
|||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if l.f == nil {
|
||||
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||||
if errorValue != nil {
|
||||
|
|
|
|||
28
log/share.go
28
log/share.go
|
|
@ -22,7 +22,8 @@ type ShareLog struct {
|
|||
mu sync.Mutex
|
||||
// f is opened append-only on first write; nil until first event.
|
||||
// Uses core.File for I/O abstraction.
|
||||
f *os.File
|
||||
f *os.File
|
||||
closed bool
|
||||
}
|
||||
|
||||
// NewShareLog stores the destination path and lazily opens it on first write.
|
||||
|
|
@ -65,6 +66,27 @@ func (l *ShareLog) OnReject(event proxy.Event) {
|
|||
l.writeLine(line)
|
||||
}
|
||||
|
||||
// Close releases the append-only file handle if it has been opened.
|
||||
//
|
||||
// sl.Close()
|
||||
func (l *ShareLog) Close() {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
l.closed = true
|
||||
if l.f != nil {
|
||||
_ = l.f.Close()
|
||||
l.f = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (l *ShareLog) writeLine(line string) {
|
||||
if l == nil || l.path == "" {
|
||||
return
|
||||
|
|
@ -73,6 +95,10 @@ func (l *ShareLog) writeLine(line string) {
|
|||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if l.f == nil {
|
||||
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||||
if errorValue != nil {
|
||||
|
|
|
|||
2
proxy.go
2
proxy.go
|
|
@ -35,6 +35,8 @@ type Proxy struct {
|
|||
minerMu sync.RWMutex
|
||||
servers []*Server
|
||||
httpServer *http.Server
|
||||
accessLogger *appendLineLogger
|
||||
shareLogger *appendLineLogger
|
||||
ticker *time.Ticker
|
||||
watcher *ConfigWatcher
|
||||
done chan struct{}
|
||||
|
|
|
|||
|
|
@ -8,9 +8,10 @@ import (
|
|||
)
|
||||
|
||||
type appendLineLogger struct {
|
||||
path string
|
||||
mu sync.Mutex
|
||||
file *os.File
|
||||
path string
|
||||
mu sync.Mutex
|
||||
file *os.File
|
||||
closed bool
|
||||
}
|
||||
|
||||
func newAppendLineLogger(path string) *appendLineLogger {
|
||||
|
|
@ -25,6 +26,10 @@ func (l *appendLineLogger) writeLine(line string) {
|
|||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
|
||||
if l.file == nil {
|
||||
file, errorValue := os.OpenFile(l.path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
|
||||
if errorValue != nil {
|
||||
|
|
@ -36,11 +41,29 @@ func (l *appendLineLogger) writeLine(line string) {
|
|||
_, _ = l.file.WriteString(line)
|
||||
}
|
||||
|
||||
func subscribeAccessLog(events *EventBus, path string) {
|
||||
if events == nil || path == "" {
|
||||
func (l *appendLineLogger) close() {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
if l.closed {
|
||||
return
|
||||
}
|
||||
l.closed = true
|
||||
if l.file != nil {
|
||||
_ = l.file.Close()
|
||||
l.file = nil
|
||||
}
|
||||
}
|
||||
|
||||
func subscribeAccessLog(events *EventBus, path string) *appendLineLogger {
|
||||
if events == nil || path == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := newAppendLineLogger(path)
|
||||
events.Subscribe(EventLogin, func(event Event) {
|
||||
if event.Miner == nil {
|
||||
|
|
@ -65,11 +88,12 @@ func subscribeAccessLog(events *EventBus, path string) {
|
|||
event.Miner.TX(),
|
||||
))
|
||||
})
|
||||
return logger
|
||||
}
|
||||
|
||||
func subscribeShareLog(events *EventBus, path string) {
|
||||
func subscribeShareLog(events *EventBus, path string) *appendLineLogger {
|
||||
if events == nil || path == "" {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := newAppendLineLogger(path)
|
||||
|
|
@ -94,4 +118,5 @@ func subscribeShareLog(events *EventBus, path string) {
|
|||
event.Error,
|
||||
))
|
||||
})
|
||||
return logger
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,8 +39,8 @@ func New(cfg *Config) (*Proxy, error) {
|
|||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
subscribeAccessLog(events, cfg.AccessLogFile)
|
||||
subscribeShareLog(events, cfg.ShareLogFile)
|
||||
proxyValue.accessLogger = subscribeAccessLog(events, cfg.AccessLogFile)
|
||||
proxyValue.shareLogger = subscribeShareLog(events, cfg.ShareLogFile)
|
||||
|
||||
events.Subscribe(EventLogin, func(event Event) {
|
||||
if event.Miner != nil {
|
||||
|
|
@ -179,6 +179,12 @@ func (p *Proxy) Stop() {
|
|||
server.Stop()
|
||||
}
|
||||
stopHTTPServer(p)
|
||||
if p.accessLogger != nil {
|
||||
p.accessLogger.close()
|
||||
}
|
||||
if p.shareLogger != nil {
|
||||
p.shareLogger.close()
|
||||
}
|
||||
if p.watcher != nil {
|
||||
p.watcher.Stop()
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue