implementation tail plugins

This commit is contained in:
Negasus 2022-06-06 14:09:07 +03:00
parent 383b6132d4
commit 8cde326bbb
3 changed files with 122 additions and 17 deletions

View File

@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"sync"
"time"
@ -25,8 +26,9 @@ type Conductor struct {
Address string
RPCDialer RPCDialer
plugins []Handler
lock sync.RWMutex
plugins []Handler
tailPlugins []Handler
lock sync.RWMutex
}
// Handler contains information about a plugin's handler
@ -122,11 +124,7 @@ func (c *Conductor) Middleware(next http.Handler) http.Handler {
if reply.Break {
c.lock.RUnlock()
w.WriteHeader(reply.StatusCode)
_, errWrite := w.Write(reply.Body)
if errWrite != nil {
log.Printf("[WARN] failed to write response body: %v", errWrite)
}
sendResponse(w, reply.StatusCode, reply.Body, nil)
return
}
@ -136,12 +134,79 @@ func (c *Conductor) Middleware(next http.Handler) http.Handler {
return
}
}
c.lock.RUnlock()
next.ServeHTTP(w, r)
// fast path with no tail plugins
if len(c.tailPlugins) == 0 {
c.lock.RUnlock()
next.ServeHTTP(w, r)
return
}
ww := httptest.NewRecorder()
for k, vv := range w.Header() {
for _, v := range vv {
ww.Header().Add(k, v)
}
}
next.ServeHTTP(ww, r)
responseCode := ww.Code
responseBody := ww.Body.Bytes()
responseHeaders := ww.Header()
for _, p := range c.tailPlugins {
if !p.Alive {
continue
}
req := c.makeRequest(r)
req.ResponseCode = responseCode
req.ResponseBody = responseBody
req.ResponseHeaders = responseHeaders
var reply lib.Response
if err := p.client.Call(p.Method, req, &reply); err != nil {
c.lock.RUnlock()
log.Printf("[WARN] failed to invoke tail plugin handler %s: %v", p.Method, err)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
return
}
setHeaders(r.Header, reply.HeadersIn, reply.OverrideHeadersIn)
setHeaders(responseHeaders, reply.HeadersOut, reply.OverrideHeadersOut)
if reply.OverrideStatusCode {
responseCode = reply.StatusCode
}
if reply.OverrideBody {
responseBody = reply.Body
}
if reply.Break {
c.lock.RUnlock()
sendResponse(w, responseCode, responseBody, responseHeaders)
return
}
}
c.lock.RUnlock()
sendResponse(w, responseCode, responseBody, responseHeaders)
})
}
func sendResponse(w http.ResponseWriter, code int, body []byte, headers http.Header) {
for k, vv := range headers {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(code)
_, errWrite := w.Write(body)
if errWrite != nil {
log.Printf("[WARN] failed to write response body: %v", errWrite)
}
}
// makeRequest creates plugin request from http.Request
// uses context set by downstream (by proxyHandler)
func (c *Conductor) makeRequest(r *http.Request) lib.Request {
@ -218,6 +283,20 @@ func (c *Conductor) register(p lib.Plugin) error {
pp = append(pp, h)
}
var tpp []Handler //nolint
for _, h := range c.tailPlugins {
if strings.HasPrefix(h.Method, p.Name+".") && h.Address == p.Address { // already registered
log.Printf("[WARN] tail plugin %+v already registered", p)
return nil
}
if strings.HasPrefix(h.Method, p.Name+".") && h.Address != p.Address { // registered, but address changed
log.Printf("[WARN] tail plugin %+v already registered, but address changed to %s", h, p.Address)
continue // remove from the collected pp
}
tpp = append(tpp, h)
}
client, err := c.RPCDialer.Dial("tcp", p.Address)
if err != nil {
return fmt.Errorf("can't reach plugin %+v: %v", p, err)
@ -229,20 +308,36 @@ func (c *Conductor) register(p lib.Plugin) error {
log.Printf("[INFO] register plugin %s, ip: %s, method: %s", p.Name, p.Address, handler.Method)
}
c.plugins = pp
for _, l := range p.TailMethods {
handler := Handler{client: client, Alive: true, Address: p.Address, Method: p.Name + "." + l}
tpp = append(tpp, handler)
log.Printf("[INFO] tail register plugin %s, ip: %s, method: %s", p.Name, p.Address, handler.Method)
}
c.tailPlugins = tpp
return nil
}
// unregister plugin, not thread safe! call should be enclosed with lock
func (c *Conductor) unregister(p lib.Plugin) {
log.Printf("[INFO] unregister plugin %s, ip: %s", p.Name, p.Address)
var res []Handler //nolint
var pp []Handler //nolint
for _, h := range c.plugins {
if strings.HasPrefix(h.Method, p.Name+".") {
continue
}
res = append(res, h)
pp = append(pp, h)
}
c.plugins = res
c.plugins = pp
var tpp []Handler //nolint
for _, h := range c.tailPlugins {
if strings.HasPrefix(h.Method, p.Name+".") {
continue
}
tpp = append(tpp, h)
}
c.tailPlugins = tpp
}
func (c *Conductor) locked(fn func()) {

View File

@ -16,9 +16,10 @@ import (
// Plugin provides cancelable rpc server used to run custom plugins
type Plugin struct {
Name string `json:"name"`
Address string `json:"address"`
Methods []string `json:"methods"`
Name string `json:"name"`
Address string `json:"address"`
Methods []string `json:"methods"`
TailMethods []string `json:"tail_methods"`
}
// Do register the plugin, send info to reproxy conductor and activate RPC listener.

View File

@ -22,6 +22,11 @@ type Request struct {
AssetsLocation string
AssetsWebRoot string
}
// use for tail plugins
ResponseCode int
ResponseBody []byte
ResponseHeaders http.Header
}
// Response from plugin's handler call
@ -32,6 +37,10 @@ type Response struct {
OverrideHeadersIn bool // indicates plugin removing all the original incoming headers
OverrideHeadersOut bool // indicates plugin removing all the original outgoing headers
Break bool // indicates plugin should stop processing the request and returns StatusCode and Body
Body []byte
Break bool // indicates plugin stop processing the request and returns StatusCode and Body
Body []byte // response body if Break is true
// use for tail plugins
OverrideStatusCode bool
OverrideBody bool
}