diff --git a/middleware/accesslog/accesslog.go b/middleware/accesslog/accesslog.go index 592f888f..33dd3eef 100644 --- a/middleware/accesslog/accesslog.go +++ b/middleware/accesslog/accesslog.go @@ -104,7 +104,7 @@ var ( // A new AccessLog middleware MUST // be created after a `New` function call. type AccessLog struct { - mu sync.Mutex // ensures atomic writes. + mu sync.RWMutex // ensures atomic writes. // The destination writer. // If multiple output required, then define an `io.MultiWriter`. // See `SetOutput` and `AddOutput` methods too. @@ -348,91 +348,143 @@ func (ac *AccessLog) Broker() *Broker { // Also, if a writer is a Closer, then it is automatically appended to the Closers. // It's safe to used concurrently (experimental). func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog { - ac.mu.Lock() ac.setOutput(true, writers...) - ac.mu.Unlock() - return ac } // AddOutput appends an io.Writer value to the existing writer. // Call it before `SetFormatter` and `Handler` methods. func (ac *AccessLog) AddOutput(writers ...io.Writer) *AccessLog { - ac.mu.Lock() ac.setOutput(false, writers...) - ac.mu.Unlock() - return ac } -func (ac *AccessLog) setOutput(reset bool, writers ...io.Writer) { // protected by caller. - if reset { - /* - Initial idea was to wait for remaining logs to be written - in the existing writer before resetting to the new one. - But, a faster approach would be to just write the logs - to the new writers instead. This can be done by: - 1. copy all existing closers and flushers, - 2. change the writer immediately - 3. fire a goroutine which flushes and closes the old writers, - no locks required there because they are not used for concurrent writing - anymore. Errors there are ignored (we could collect them with sync errgroup - and wait for them before exit this Reset method, but we don't). - */ - if len(ac.Flushers) > 0 || len(ac.Closers) > 0 { - flushers := make([]Flusher, len(ac.Flushers)) - copy(flushers, ac.Flushers) +func (ac *AccessLog) setOutput(reset bool, writers ...io.Writer) { - closers := make([]io.Closer, len(ac.Closers)) - copy(closers, ac.Closers) + /* + Initial idea was to wait for remaining logs to be written + in the existing writer before resetting to the new one. + But, a faster approach would be to just write the logs + to the new writers instead. This can be done by: + 1. copy all existing closers and flushers, + 2. change the writer immediately + 3. fire a goroutine which flushes and closes the old writers, + no locks required there because they are not used for concurrent writing + anymore. Errors there are ignored (we could collect them with sync errgroup + and wait for them before exit this Reset method, but we don't). + */ - ac.Flushers = ac.Flushers[0:0] - ac.Closers = ac.Closers[0:0] - - go func(flushers []Flusher, closers []io.Closer) { - for _, flusher := range flushers { - flusher.Flush() - } - for _, closer := range closers { - closer.Close() - } - }(flushers, closers) - } - - ac.BufferTruncaters = ac.BufferTruncaters[0:0] - ac.FileTruncaters = ac.FileTruncaters[0:0] - } else { - if ac.Writer != nil { // prepend if one exists. + if !reset { + // prepend if one exists. + ac.mu.Lock() + if ac.Writer != nil { writers = append([]io.Writer{ac.Writer}, writers...) } + ac.mu.Unlock() } switch len(writers) { case 0: return case 1: + ac.mu.Lock() ac.Writer = writers[0] + ac.mu.Unlock() default: - ac.Writer = io.MultiWriter(writers...) + multi := io.MultiWriter(writers...) + ac.mu.Lock() + ac.Writer = multi + ac.mu.Unlock() } + // NO need to check for a "hadWriter", + // because it will always have a previous writer + // on serve-time (the spot we care about performance), + // so if it set by New, on build-time, we don't rly care about some read locks slowdown. + ac.mu.RLock() + n := len(ac.Flushers) + ac.mu.RUnlock() + + flushers := make([]Flusher, n) + if n > 0 { + ac.mu.Lock() + copy(flushers, ac.Flushers) + ac.mu.Unlock() + } + + ac.mu.RLock() + n = len(ac.Closers) + ac.mu.RUnlock() + + closers := make([]io.Closer, n) + if n > 0 { + ac.mu.Lock() + copy(closers, ac.Closers) + ac.mu.Unlock() + } + + if reset { + // Reset previous flushers and closers, + // so any middle request can't flush to the old ones. + // Note that, because we don't lock the whole operation, + // there is a chance of Flush while we are doing this, + // not by the middleware (unless panic, but again, the data are written + // to the new writer, they are not lost, just not flushed immediately), + // an outsider may call it, and if it does + // then it is its responsibility to lock between manual Flush calls and + // SetOutput ones. This is done to be able + // to serve requests fast even on Async == false + // while SetOutput is called at serve-time, if we didn't care about it + // we could lock the whole operation which would make the + // log writers to wait and be done with this. + ac.mu.Lock() + ac.Flushers = ac.Flushers[0:0] + ac.Closers = ac.Closers[0:0] + ac.BufferTruncaters = ac.BufferTruncaters[0:0] + ac.FileTruncaters = ac.FileTruncaters[0:0] + ac.mu.Unlock() + } + + // Store the new flushers, closers and truncaters... for _, w := range writers { if flusher, ok := w.(Flusher); ok { + ac.mu.Lock() ac.Flushers = append(ac.Flushers, flusher) + ac.mu.Unlock() } if closer, ok := w.(io.Closer); ok { + ac.mu.Lock() ac.Closers = append(ac.Closers, closer) + ac.mu.Unlock() } if truncater, ok := w.(BufferTruncater); ok { + ac.mu.Lock() ac.BufferTruncaters = append(ac.BufferTruncaters, truncater) + ac.mu.Unlock() } if truncater, ok := w.(FileTruncater); ok { + ac.mu.Lock() ac.FileTruncaters = append(ac.FileTruncaters, truncater) + ac.mu.Unlock() } } + + // And finally, wait before exit this method + // until previous writer's closers and flush finish. + for _, flusher := range flushers { + if flusher != nil { + flusher.Flush() + } + } + for _, closer := range closers { + if closer != nil { + closer.Close() + } + + } } // Close terminates any broker listeners, diff --git a/middleware/accesslog/accesslog_test.go b/middleware/accesslog/accesslog_test.go index c4d59fd1..d46e1e7f 100644 --- a/middleware/accesslog/accesslog_test.go +++ b/middleware/accesslog/accesslog_test.go @@ -100,7 +100,7 @@ func TestAccessLogBroker(t *testing.T) { wg := new(sync.WaitGroup) n := 4 - wg.Add(4) + wg.Add(n) go func() { i := 0 ln := broker.NewListener() @@ -203,6 +203,116 @@ func TestAccessLogBlank(t *testing.T) { } } +type slowClose struct{ *bytes.Buffer } + +func (c *slowClose) Close() error { + time.Sleep(1 * time.Second) + return nil +} + +func TestAccessLogSetOutput(t *testing.T) { + var ( + w1 = &bytes.Buffer{} + w2 = &bytes.Buffer{} + w3 = &slowClose{&bytes.Buffer{}} + w4 = &bytes.Buffer{} + ) + + ac := New(w1) + ac.Clock = TClock(time.Time{}) + + n := 40 + expected := strings.Repeat("0001-01-01 00:00:00|1s|200|GET|/|127.0.0.1|0 B|0 B||\n", n) + + printLog := func() { + err := ac.Print( + nil, + time.Second, + defaultTimeFormat, + 200, + "GET", + "/", + "127.0.0.1", + "", + "", + 0, + 0, + nil, + nil, + nil, + ) + + if err != nil { + t.Fatal(err) + } + } + + testSetOutput := func(name string, w io.Writer, withSlowClose bool) { + wg := new(sync.WaitGroup) + wg.Add(n / 4) + for i := 0; i < n/4; i++ { + go func(i int) { + defer wg.Done() + + if i%2 == 0 { + time.Sleep(10 * time.Millisecond) + } + + switch i { + case 5: + if w == nil { + break + } + + now := time.Now() + ac.SetOutput(w) + if withSlowClose { + end := time.Since(now) + if end < time.Second { + panic(fmt.Sprintf("[%s] [%d]: SetOutput should wait for previous Close. Expected to return a bit after %s but %s", name, i, time.Second, end)) + } + } + } + + printLog() + }(i) + } + + // wait to finish. + wg.Wait() + } + + go testSetOutput("w1", nil, false) // write at least one line and then + time.Sleep(100 * time.Millisecond) // concurrently + testSetOutput("w2", w2, false) // change the writer + testSetOutput("w3", w3, false) + testSetOutput("w4", w4, true) + + gotAll := w1.String() + w2.String() + w3.String() + w4.String() + + // test if all content written and we have no loses. + if expected != gotAll { + t.Fatalf("expected total written result to be:\n'%s'\n\nbut got:\n'%s'", expected, gotAll) + } + + // now, check if all have contents, they should because we wait between them, + // contents spread. + checkLines := func(name, s string, minimumLines int) { + if got := strings.Count(s, "\n"); got < minimumLines { + t.Logf("[%s] expected minimum lines of: %d but got %d", name, minimumLines, got) + } + } + + checkLines("w1", w1.String(), 1) + checkLines("w2", w2.String(), 5) + checkLines("w3", w3.String(), 5) + checkLines("w4", w4.String(), 5) + + if err := ac.Close(); err != nil { + t.Fatalf("On close: %v", err) + } +} + type noOpFormatter struct{} func (*noOpFormatter) SetOutput(io.Writer) {}