accesslog: SetOutput change at serve-time and wait (no lock) until prev is closed as requested at #1631

This commit is contained in:
Gerasimos (Makis) Maropoulos 2020-09-14 17:35:17 +03:00
parent d00a4b33f1
commit 076f806925
No known key found for this signature in database
GPG Key ID: 5DBE766BD26A54E7
2 changed files with 207 additions and 45 deletions

View File

@ -104,7 +104,7 @@ var (
// A new AccessLog middleware MUST // A new AccessLog middleware MUST
// be created after a `New` function call. // be created after a `New` function call.
type AccessLog struct { type AccessLog struct {
mu sync.Mutex // ensures atomic writes. mu sync.RWMutex // ensures atomic writes.
// The destination writer. // The destination writer.
// If multiple output required, then define an `io.MultiWriter`. // If multiple output required, then define an `io.MultiWriter`.
// See `SetOutput` and `AddOutput` methods too. // See `SetOutput` and `AddOutput` methods too.
@ -348,91 +348,143 @@ func (ac *AccessLog) Broker() *Broker {
// Also, if a writer is a Closer, then it is automatically appended to the Closers. // Also, if a writer is a Closer, then it is automatically appended to the Closers.
// It's safe to use concurrently (experimental). // It's safe to use concurrently (experimental).
func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog { func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog {
ac.mu.Lock()
ac.setOutput(true, writers...) ac.setOutput(true, writers...)
ac.mu.Unlock()
return ac return ac
} }
// AddOutput appends an io.Writer value to the existing writer. // AddOutput appends an io.Writer value to the existing writer.
// Call it before `SetFormatter` and `Handler` methods. // Call it before `SetFormatter` and `Handler` methods.
func (ac *AccessLog) AddOutput(writers ...io.Writer) *AccessLog { func (ac *AccessLog) AddOutput(writers ...io.Writer) *AccessLog {
ac.mu.Lock()
ac.setOutput(false, writers...) ac.setOutput(false, writers...)
ac.mu.Unlock()
return ac return ac
} }
func (ac *AccessLog) setOutput(reset bool, writers ...io.Writer) { // protected by caller. func (ac *AccessLog) setOutput(reset bool, writers ...io.Writer) {
if reset {
/*
Initial idea was to wait for remaining logs to be written
in the existing writer before resetting to the new one.
But, a faster approach would be to just write the logs
to the new writers instead. This can be done by:
1. copy all existing closers and flushers,
2. change the writer immediately
3. fire a goroutine which flushes and closes the old writers,
no locks required there because they are not used for concurrent writing
anymore. Errors there are ignored (we could collect them with sync errgroup
and wait for them before exiting this Reset method, but we don't).
*/
if len(ac.Flushers) > 0 || len(ac.Closers) > 0 {
flushers := make([]Flusher, len(ac.Flushers))
copy(flushers, ac.Flushers)
closers := make([]io.Closer, len(ac.Closers)) /*
copy(closers, ac.Closers) Initial idea was to wait for remaining logs to be written
in the existing writer before resetting to the new one.
But, a faster approach would be to just write the logs
to the new writers instead. This can be done by:
1. copy all existing closers and flushers,
2. change the writer immediately
3. fire a goroutine which flushes and closes the old writers,
no locks required there because they are not used for concurrent writing
anymore. Errors there are ignored (we could collect them with sync errgroup
and wait for them before exiting this Reset method, but we don't).
*/
ac.Flushers = ac.Flushers[0:0] if !reset {
ac.Closers = ac.Closers[0:0] // prepend if one exists.
ac.mu.Lock()
go func(flushers []Flusher, closers []io.Closer) { if ac.Writer != nil {
for _, flusher := range flushers {
flusher.Flush()
}
for _, closer := range closers {
closer.Close()
}
}(flushers, closers)
}
ac.BufferTruncaters = ac.BufferTruncaters[0:0]
ac.FileTruncaters = ac.FileTruncaters[0:0]
} else {
if ac.Writer != nil { // prepend if one exists.
writers = append([]io.Writer{ac.Writer}, writers...) writers = append([]io.Writer{ac.Writer}, writers...)
} }
ac.mu.Unlock()
} }
switch len(writers) { switch len(writers) {
case 0: case 0:
return return
case 1: case 1:
ac.mu.Lock()
ac.Writer = writers[0] ac.Writer = writers[0]
ac.mu.Unlock()
default: default:
ac.Writer = io.MultiWriter(writers...) multi := io.MultiWriter(writers...)
ac.mu.Lock()
ac.Writer = multi
ac.mu.Unlock()
} }
// NO need to check for a "hadWriter",
// because it will always have a previous writer
// on serve-time (the spot we care about performance),
// so if it is set by New, on build-time, we don't really care about some read-lock slowdown.
ac.mu.RLock()
n := len(ac.Flushers)
ac.mu.RUnlock()
flushers := make([]Flusher, n)
if n > 0 {
ac.mu.Lock()
copy(flushers, ac.Flushers)
ac.mu.Unlock()
}
ac.mu.RLock()
n = len(ac.Closers)
ac.mu.RUnlock()
closers := make([]io.Closer, n)
if n > 0 {
ac.mu.Lock()
copy(closers, ac.Closers)
ac.mu.Unlock()
}
if reset {
// Reset previous flushers and closers,
// so any in-flight request can't flush to the old ones.
// Note that, because we don't lock the whole operation,
// there is a chance of Flush while we are doing this,
// not by the middleware (unless it panics, but again, the data are written
// to the new writer, they are not lost, just not flushed immediately),
// an outsider may call it, and if it does
// then it is its responsibility to lock between manual Flush calls and
// SetOutput ones. This is done to be able
// to serve requests fast even on Async == false
// while SetOutput is called at serve-time, if we didn't care about it
// we could lock the whole operation which would make the
// log writers to wait and be done with this.
ac.mu.Lock()
ac.Flushers = ac.Flushers[0:0]
ac.Closers = ac.Closers[0:0]
ac.BufferTruncaters = ac.BufferTruncaters[0:0]
ac.FileTruncaters = ac.FileTruncaters[0:0]
ac.mu.Unlock()
}
// Store the new flushers, closers and truncaters...
for _, w := range writers { for _, w := range writers {
if flusher, ok := w.(Flusher); ok { if flusher, ok := w.(Flusher); ok {
ac.mu.Lock()
ac.Flushers = append(ac.Flushers, flusher) ac.Flushers = append(ac.Flushers, flusher)
ac.mu.Unlock()
} }
if closer, ok := w.(io.Closer); ok { if closer, ok := w.(io.Closer); ok {
ac.mu.Lock()
ac.Closers = append(ac.Closers, closer) ac.Closers = append(ac.Closers, closer)
ac.mu.Unlock()
} }
if truncater, ok := w.(BufferTruncater); ok { if truncater, ok := w.(BufferTruncater); ok {
ac.mu.Lock()
ac.BufferTruncaters = append(ac.BufferTruncaters, truncater) ac.BufferTruncaters = append(ac.BufferTruncaters, truncater)
ac.mu.Unlock()
} }
if truncater, ok := w.(FileTruncater); ok { if truncater, ok := w.(FileTruncater); ok {
ac.mu.Lock()
ac.FileTruncaters = append(ac.FileTruncaters, truncater) ac.FileTruncaters = append(ac.FileTruncaters, truncater)
ac.mu.Unlock()
} }
} }
// And finally, wait before exiting this method
// until the previous writer's closers and flushers finish.
for _, flusher := range flushers {
if flusher != nil {
flusher.Flush()
}
}
for _, closer := range closers {
if closer != nil {
closer.Close()
}
}
} }
// Close terminates any broker listeners, // Close terminates any broker listeners,

View File

@ -100,7 +100,7 @@ func TestAccessLogBroker(t *testing.T) {
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
n := 4 n := 4
wg.Add(4) wg.Add(n)
go func() { go func() {
i := 0 i := 0
ln := broker.NewListener() ln := broker.NewListener()
@ -203,6 +203,116 @@ func TestAccessLogBlank(t *testing.T) {
} }
} }
type slowClose struct{ *bytes.Buffer }
func (c *slowClose) Close() error {
time.Sleep(1 * time.Second)
return nil
}
func TestAccessLogSetOutput(t *testing.T) {
var (
w1 = &bytes.Buffer{}
w2 = &bytes.Buffer{}
w3 = &slowClose{&bytes.Buffer{}}
w4 = &bytes.Buffer{}
)
ac := New(w1)
ac.Clock = TClock(time.Time{})
n := 40
expected := strings.Repeat("0001-01-01 00:00:00|1s|200|GET|/|127.0.0.1|0 B|0 B||\n", n)
printLog := func() {
err := ac.Print(
nil,
time.Second,
defaultTimeFormat,
200,
"GET",
"/",
"127.0.0.1",
"",
"",
0,
0,
nil,
nil,
nil,
)
if err != nil {
t.Fatal(err)
}
}
testSetOutput := func(name string, w io.Writer, withSlowClose bool) {
wg := new(sync.WaitGroup)
wg.Add(n / 4)
for i := 0; i < n/4; i++ {
go func(i int) {
defer wg.Done()
if i%2 == 0 {
time.Sleep(10 * time.Millisecond)
}
switch i {
case 5:
if w == nil {
break
}
now := time.Now()
ac.SetOutput(w)
if withSlowClose {
end := time.Since(now)
if end < time.Second {
panic(fmt.Sprintf("[%s] [%d]: SetOutput should wait for previous Close. Expected to return a bit after %s but %s", name, i, time.Second, end))
}
}
}
printLog()
}(i)
}
// wait to finish.
wg.Wait()
}
go testSetOutput("w1", nil, false) // write at least one line and then
time.Sleep(100 * time.Millisecond) // concurrently
testSetOutput("w2", w2, false) // change the writer
testSetOutput("w3", w3, false)
testSetOutput("w4", w4, true)
gotAll := w1.String() + w2.String() + w3.String() + w4.String()
// test that all content was written and we have no losses.
if expected != gotAll {
t.Fatalf("expected total written result to be:\n'%s'\n\nbut got:\n'%s'", expected, gotAll)
}
// now, check if all have contents, they should because we wait between them,
// contents spread.
checkLines := func(name, s string, minimumLines int) {
if got := strings.Count(s, "\n"); got < minimumLines {
t.Logf("[%s] expected minimum lines of: %d but got %d", name, minimumLines, got)
}
}
checkLines("w1", w1.String(), 1)
checkLines("w2", w2.String(), 5)
checkLines("w3", w3.String(), 5)
checkLines("w4", w4.String(), 5)
if err := ac.Close(); err != nil {
t.Fatalf("On close: %v", err)
}
}
type noOpFormatter struct{} type noOpFormatter struct{}
func (*noOpFormatter) SetOutput(io.Writer) {} func (*noOpFormatter) SetOutput(io.Writer) {}