accesslog: experimental SetOutput at serve-time as requested at #1631

This commit is contained in:
Gerasimos (Makis) Maropoulos 2020-09-14 13:37:42 +03:00
parent 701713efdb
commit d00a4b33f1
No known key found for this signature in database
GPG Key ID: 5DBE766BD26A54E7

View File

@ -340,16 +340,76 @@ func (ac *AccessLog) Broker() *Broker {
ac.broker = newBroker() ac.broker = newBroker()
} }
ac.mu.Unlock() ac.mu.Unlock()
return ac.broker return ac.broker
} }
// SetOutput sets the log's output destination. Accepts one or more io.Writer values. // SetOutput sets the log's output destination. Accepts one or more io.Writer values.
// Also, if a writer is a Closer, then it is automatically appended to the Closers. // Also, if a writer is a Closer, then it is automatically appended to the Closers.
// Call it before `SetFormatter` and `Handler` methods. // It's safe to used concurrently (experimental).
func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog { func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog {
ac.mu.Lock()
ac.setOutput(true, writers...)
ac.mu.Unlock()
return ac
}
// AddOutput appends an io.Writer value to the existing writer.
// Call it before `SetFormatter` and `Handler` methods.
func (ac *AccessLog) AddOutput(writers ...io.Writer) *AccessLog {
ac.mu.Lock()
ac.setOutput(false, writers...)
ac.mu.Unlock()
return ac
}
func (ac *AccessLog) setOutput(reset bool, writers ...io.Writer) { // protected by caller.
if reset {
/*
Initial idea was to wait for remaining logs to be written
in the existing writer before resetting to the new one.
But, a faster approach would be to just write the logs
to the new writers instead. This can be done by:
1. copy all existing closers and flushers,
2. change the writer immediately
3. fire a goroutine which flushes and closes the old writers,
no locks required there because they are not used for concurrent writing
anymore. Errors there are ignored (we could collect them with sync errgroup
and wait for them before exit this Reset method, but we don't).
*/
if len(ac.Flushers) > 0 || len(ac.Closers) > 0 {
flushers := make([]Flusher, len(ac.Flushers))
copy(flushers, ac.Flushers)
closers := make([]io.Closer, len(ac.Closers))
copy(closers, ac.Closers)
ac.Flushers = ac.Flushers[0:0]
ac.Closers = ac.Closers[0:0]
go func(flushers []Flusher, closers []io.Closer) {
for _, flusher := range flushers {
flusher.Flush()
}
for _, closer := range closers {
closer.Close()
}
}(flushers, closers)
}
ac.BufferTruncaters = ac.BufferTruncaters[0:0]
ac.FileTruncaters = ac.FileTruncaters[0:0]
} else {
if ac.Writer != nil { // prepend if one exists.
writers = append([]io.Writer{ac.Writer}, writers...)
}
}
switch len(writers) { switch len(writers) {
case 0: case 0:
return ac return
case 1: case 1:
ac.Writer = writers[0] ac.Writer = writers[0]
default: default:
@ -373,132 +433,6 @@ func (ac *AccessLog) SetOutput(writers ...io.Writer) *AccessLog {
ac.FileTruncaters = append(ac.FileTruncaters, truncater) ac.FileTruncaters = append(ac.FileTruncaters, truncater)
} }
} }
// ac.bufWriter = bufio.NewWriterSize(ac.Writer, 4096)
// No ^ let the caller decide that, we just helping on automatic flushing
// through the `Flushers` field called when necessary.
return ac
}
// AddOutput appends an io.Writer value to the existing writer.
// Call it before `SetFormatter` and `Handler` methods.
func (ac *AccessLog) AddOutput(writers ...io.Writer) *AccessLog {
if ac.Writer != nil { // prepend if one exists.
writers = append([]io.Writer{ac.Writer}, writers...)
}
return ac.SetOutput(writers...)
}
// func (ac *AccessLog) isBrokerActive() bool { // see `Print` method.
// return atomic.LoadUint32(&ac.brokerActive) > 0
// }
// ^ No need, we declare that the Broker should be called
// before serve-time. Let's respect our comment
// and don't try to make it safe for write and read concurrent access.
// Write writes to the log destination.
// It completes the io.Writer interface.
// Safe for concurrent use.
func (ac *AccessLog) Write(p []byte) (n int, err error) {
if ac.Async {
if atomic.LoadUint32(&ac.isClosed) > 0 {
return 0, io.ErrClosedPipe
}
}
ac.mu.Lock()
n, err = ac.Writer.Write(p)
ac.mu.Unlock()
return
}
// Flush writes any buffered data to the underlying Fluser Writer.
// Flush is called automatically on Close.
func (ac *AccessLog) Flush() (err error) {
ac.mu.Lock()
for _, f := range ac.Flushers {
fErr := f.Flush()
if fErr != nil {
if err == nil {
err = fErr
} else {
err = fmt.Errorf("%v, %v", err, fErr)
}
}
}
ac.mu.Unlock()
return
}
// Truncate if the output is a buffer, then
// it discards all but the first n unread bytes.
// See `TruncateFile` for a file size.
//
// It panics if n is negative or greater than the length of the buffer.
func (ac *AccessLog) Truncate(n int) {
ac.mu.Lock() // Lock as we do with all write operations.
for _, truncater := range ac.BufferTruncaters {
truncater.Truncate(n)
}
ac.mu.Unlock()
}
// TruncateFile changes the size of the internal file, directly.
// It does not change the I/O offset.
// If there is an error, it will be of type *PathError.
func (ac *AccessLog) TruncateFile(size int64) (err error) {
ac.mu.Lock()
for _, truncater := range ac.FileTruncaters {
tErr := truncater.Truncate(size)
if tErr != nil {
if err == nil {
err = tErr
} else {
err = fmt.Errorf("%v, %v", err, tErr)
}
}
}
ac.mu.Unlock()
return
}
// SetFormatter sets a custom formatter to print the logs.
// Any custom output writers should be
// already registered before calling this method.
// Returns this AccessLog instance.
//
// Usage:
// ac.SetFormatter(&accesslog.JSON{Indent: " "})
func (ac *AccessLog) SetFormatter(f Formatter) *AccessLog {
if ac.Writer == nil {
panic("accesslog: SetFormatter called with nil Writer")
}
if f == nil {
return ac
}
if flusher, ok := ac.formatter.(Flusher); ok {
// PREPEND formatter flushes, they should run before destination's ones.
ac.Flushers = append([]Flusher{flusher}, ac.Flushers...)
}
// Inject the writer (AccessLog) here, the writer
// is protected with mutex.
f.SetOutput(ac)
ac.formatter = f
return ac
}
// AddFields maps one or more log entries with values extracted by the Request Context.
// You can also add fields per request handler, look the `GetFields` package-level function.
// Note that this method can override a key stored by a handler's fields.
func (ac *AccessLog) AddFields(setters ...FieldSetter) *AccessLog {
ac.FieldSetters = append(ac.FieldSetters, setters...)
return ac
} }
// Close terminates any broker listeners, // Close terminates any broker listeners,
@ -570,6 +504,126 @@ func (ac *AccessLog) waitRemaining(ctx stdContext.Context) {
} }
} }
// func (ac *AccessLog) isBrokerActive() bool { // see `Print` method.
// return atomic.LoadUint32(&ac.brokerActive) > 0
// }
// ^ No need, we declare that the Broker should be called
// before serve-time. Let's respect our comment
// and don't try to make it safe for write and read concurrent access.
// Write writes to the log destination.
// It completes the io.Writer interface.
// Safe for concurrent use.
func (ac *AccessLog) Write(p []byte) (n int, err error) {
if ac.Async {
if atomic.LoadUint32(&ac.isClosed) > 0 {
return 0, io.ErrClosedPipe
}
}
ac.mu.Lock()
n, err = ac.Writer.Write(p)
ac.mu.Unlock()
return
}
// Flush writes any buffered data to the underlying Fluser Writer.
// Flush is called automatically on Close.
func (ac *AccessLog) Flush() (err error) {
ac.mu.Lock()
for _, f := range ac.Flushers {
fErr := f.Flush()
if fErr != nil {
if err == nil {
err = fErr
} else {
err = fmt.Errorf("%v, %v", err, fErr)
}
}
}
ac.mu.Unlock()
return
}
// Truncate if the output is a buffer, then
// it discards all but the first n unread bytes.
// See `TruncateFile` for a file size.
//
// It panics if n is negative or greater than the length of the buffer.
func (ac *AccessLog) Truncate(n int) {
ac.mu.Lock() // Lock as we do with all write operations.
for _, truncater := range ac.BufferTruncaters {
truncater.Truncate(n)
}
ac.mu.Unlock()
}
// TruncateFile flushes any buffered contents
// and changes the size of the internal file destination, directly.
// It does not change the I/O offset.
//
// Note that `TruncateFile` calls the `Truncate(int(size))` automatically
// in order to clear any buffered contents (if the file was wrapped by a buffer)
// before truncating the file itself.
//
// Usage, clear a file:
// err := TruncateFile(0)
func (ac *AccessLog) TruncateFile(size int64) (err error) {
ac.Truncate(int(size))
ac.mu.Lock()
for _, truncater := range ac.FileTruncaters {
tErr := truncater.Truncate(size)
if tErr != nil {
if err == nil {
err = tErr
} else {
err = fmt.Errorf("%v, %v", err, tErr)
}
}
}
ac.mu.Unlock()
return err
}
// SetFormatter sets a custom formatter to print the logs.
// Any custom output writers should be
// already registered before calling this method.
// Returns this AccessLog instance.
//
// Usage:
// ac.SetFormatter(&accesslog.JSON{Indent: " "})
func (ac *AccessLog) SetFormatter(f Formatter) *AccessLog {
if ac.Writer == nil {
panic("accesslog: SetFormatter called with nil Writer")
}
if f == nil {
return ac
}
if flusher, ok := ac.formatter.(Flusher); ok {
// PREPEND formatter flushes, they should run before destination's ones.
ac.Flushers = append([]Flusher{flusher}, ac.Flushers...)
}
// Inject the writer (AccessLog) here, the writer
// is protected with mutex.
f.SetOutput(ac)
ac.formatter = f
return ac
}
// AddFields maps one or more log entries with values extracted by the Request Context.
// You can also add fields per request handler, look the `GetFields` package-level function.
// Note that this method can override a key stored by a handler's fields.
func (ac *AccessLog) AddFields(setters ...FieldSetter) *AccessLog {
ac.FieldSetters = append(ac.FieldSetters, setters...)
return ac
}
func (ac *AccessLog) shouldReadRequestBody() bool { func (ac *AccessLog) shouldReadRequestBody() bool {
return ac.RequestBody || ac.BytesReceived || ac.BytesReceivedBody return ac.RequestBody || ac.BytesReceived || ac.BytesReceivedBody