From b5b052d391d9553f61dca605dcbf6f753f1f9127 Mon Sep 17 00:00:00 2001 From: eyedeekay Date: Thu, 10 Apr 2025 15:18:30 -0400 Subject: [PATCH] Add basic struct and example usage --- example/main.go | 83 ++++++++++++++ go.mod | 3 + metalistener.go | 293 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 379 insertions(+) create mode 100644 example/main.go create mode 100644 go.mod create mode 100644 metalistener.go diff --git a/example/main.go b/example/main.go new file mode 100644 index 0000000..8b8049c --- /dev/null +++ b/example/main.go @@ -0,0 +1,83 @@ +package main + +import ( + "context" + "fmt" + "log" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/go-i2p/go-meta-listener" +) + +func main() { + // Create a new meta listener + metaListener := meta.NewMetaListener() + defer metaListener.Close() + + // Create and add TCP listener + tcpListener, err := net.Listen("tcp", "127.0.0.1:8082") + if err != nil { + log.Fatalf("Failed to create TCP listener: %v", err) + } + if err := metaListener.AddListener("tcp", tcpListener); err != nil { + log.Fatalf("Failed to add TCP listener: %v", err) + } + log.Println("Added TCP listener on 127.0.0.1:8080") + + // Create and add a Unix socket listener (on Unix systems) + socketPath := "/tmp/example.sock" + os.Remove(socketPath) // Clean up from previous runs + unixListener, err := net.Listen("unix", socketPath) + if err != nil { + log.Printf("Failed to create Unix socket listener: %v", err) + } else { + if err := metaListener.AddListener("unix", unixListener); err != nil { + log.Printf("Failed to add Unix socket listener: %v", err) + } else { + log.Println("Added Unix socket listener on", socketPath) + } + } + + // Create a simple HTTP server using the meta listener + server := &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Hello from MetaListener! You connected via: %s\n", r.Proto) + }), + } + + // Handle server shutdown gracefully + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + go func() { + log.Println("Server starting, listening on multiple transports") + if err := server.Serve(metaListener); err != nil && err != http.ErrServerClosed { + log.Fatalf("HTTP server error: %v", err) + } + }() + + // Wait for interrupt signal + <-stop + log.Println("Shutting down server...") + + // Create a deadline for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Shut down the HTTP server + if err := server.Shutdown(ctx); err != nil { + log.Printf("Server shutdown error: %v", err) + } + + // Wait for all listener goroutines to exit + if err := metaListener.WaitForShutdown(ctx); err != nil { + log.Printf("Timed out waiting for listener shutdown: %v", err) + } + + log.Println("Server stopped") +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..aeb7293 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module github.com/go-i2p/go-meta-listener + +go 1.23.5 diff --git a/metalistener.go b/metalistener.go new file mode 100644 index 0000000..60ee087 --- /dev/null +++ b/metalistener.go @@ -0,0 +1,293 @@ +package meta + +import ( + "context" + "errors" + "fmt" + "net" + "sync" + "time" +) + +var ( + // ErrListenerClosed is returned when attempting to accept on a closed listener + ErrListenerClosed = errors.New("listener is closed") + // ErrNoListeners is returned when the meta listener has no active listeners + ErrNoListeners = errors.New("no active listeners") +) + +// MetaListener implements the net.Listener interface and manages multiple +// underlying network listeners as a unified interface. +type MetaListener struct { + // listeners is a map of registered listeners with their unique identifiers + listeners map[string]net.Listener + // listenerWg tracks active listener goroutines for graceful shutdown + listenerWg sync.WaitGroup + // connCh is used to receive connections from all managed listeners + connCh chan ConnResult + // errCh is used to receive errors from all managed listeners + errCh chan error + // closeCh signals all goroutines to stop + closeCh chan struct{} + // isClosed indicates whether the meta listener has been closed + isClosed bool + // mu protects concurrent access to the listener's state + mu sync.RWMutex +} + +// ConnResult represents a connection received from a listener +type ConnResult struct { + net.Conn + src string // source listener ID +} + +// NewMetaListener creates a new MetaListener instance ready to manage multiple listeners. +func NewMetaListener() *MetaListener { + return &MetaListener{ + listeners: make(map[string]net.Listener), + connCh: make(chan ConnResult), + errCh: make(chan error, 1), // Buffered to prevent blocking + closeCh: make(chan struct{}), + } +} + +// AddListener adds a new listener with the specified ID. +// Returns an error if a listener with the same ID already exists or if the +// provided listener is nil. +func (ml *MetaListener) AddListener(id string, listener net.Listener) error { + if listener == nil { + return errors.New("cannot add nil listener") + } + + ml.mu.Lock() + defer ml.mu.Unlock() + + if ml.isClosed { + return ErrListenerClosed + } + + if _, exists := ml.listeners[id]; exists { + return fmt.Errorf("listener with ID '%s' already exists", id) + } + + ml.listeners[id] = listener + + // Start a goroutine to handle connections from this listener + ml.listenerWg.Add(1) + go ml.handleListener(id, listener) + + return nil +} + +// RemoveListener stops and removes the listener with the specified ID. +// Returns an error if no listener with that ID exists. +func (ml *MetaListener) RemoveListener(id string) error { + ml.mu.Lock() + defer ml.mu.Unlock() + + listener, exists := ml.listeners[id] + if !exists { + return fmt.Errorf("no listener with ID '%s' exists", id) + } + + // Close the specific listener + err := listener.Close() + delete(ml.listeners, id) + + return err +} + +// handleListener runs in a separate goroutine for each added listener +// and forwards accepted connections to the connCh channel. +func (ml *MetaListener) handleListener(id string, listener net.Listener) { + defer ml.listenerWg.Done() + + for { + conn, err := listener.Accept() + + select { + case <-ml.closeCh: + // Meta listener is being closed, exit + return + default: + // Continue processing + } + + if err != nil { + // Check if this is a temporary error + if netErr, ok := err.(net.Error); ok && netErr.Temporary() { + // For temporary errors, wait a bit and try again + time.Sleep(100 * time.Millisecond) + continue + } + + // For non-temporary errors, check if listener was closed + ml.mu.RLock() + _, stillExists := ml.listeners[id] + ml.mu.RUnlock() + + if stillExists { + // Only report error if this listener hasn't been removed + select { + case ml.errCh <- fmt.Errorf("listener %s error: %w", id, err): + default: + // Don't block if no one is reading errors + } + } + return + } + + // Send the accepted connection to the connection channel + select { + case ml.connCh <- ConnResult{Conn: conn, src: id}: + // Connection forwarded successfully + case <-ml.closeCh: + // If we're closing and got a connection, close it + conn.Close() + return + } + } +} + +// Accept implements the net.Listener Accept method. +// It waits for and returns the next connection from any of the managed listeners. +func (ml *MetaListener) Accept() (net.Conn, error) { + ml.mu.RLock() + if ml.isClosed { + ml.mu.RUnlock() + return nil, ErrListenerClosed + } + + if len(ml.listeners) == 0 { + ml.mu.RUnlock() + return nil, ErrNoListeners + } + ml.mu.RUnlock() + + // Wait for either a connection, an error, or close signal + select { + case result := <-ml.connCh: + return result.Conn, nil + case err := <-ml.errCh: + return nil, err + case <-ml.closeCh: + return nil, ErrListenerClosed + } +} + +// Close implements the net.Listener Close method. +// It closes all managed listeners and releases resources. +func (ml *MetaListener) Close() error { + ml.mu.Lock() + + if ml.isClosed { + ml.mu.Unlock() + return nil + } + + ml.isClosed = true + + // Signal all goroutines to stop + close(ml.closeCh) + + // Close all listeners + var errs []error + for id, listener := range ml.listeners { + if err := listener.Close(); err != nil { + errs = append(errs, fmt.Errorf("failed to close listener %s: %w", id, err)) + } + } + + ml.mu.Unlock() + + // Wait for all listener goroutines to exit + ml.listenerWg.Wait() + + // Return combined errors if any + if len(errs) > 0 { + return fmt.Errorf("errors closing listeners: %v", errs) + } + + return nil +} + +// Addr implements the net.Listener Addr method. +// It returns a MetaAddr representing all managed listeners. +func (ml *MetaListener) Addr() net.Addr { + ml.mu.RLock() + defer ml.mu.RUnlock() + + addresses := make([]net.Addr, 0, len(ml.listeners)) + for _, listener := range ml.listeners { + addresses = append(addresses, listener.Addr()) + } + + return &MetaAddr{addresses: addresses} +} + +// ListenerIDs returns the IDs of all active listeners. +func (ml *MetaListener) ListenerIDs() []string { + ml.mu.RLock() + defer ml.mu.RUnlock() + + ids := make([]string, 0, len(ml.listeners)) + for id := range ml.listeners { + ids = append(ids, id) + } + + return ids +} + +// Count returns the number of active listeners. +func (ml *MetaListener) Count() int { + ml.mu.RLock() + defer ml.mu.RUnlock() + + return len(ml.listeners) +} + +// WaitForShutdown blocks until all listener goroutines have exited. +// This is useful for ensuring clean shutdown in server applications. +func (ml *MetaListener) WaitForShutdown(ctx context.Context) error { + done := make(chan struct{}) + + go func() { + ml.listenerWg.Wait() + close(done) + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// MetaAddr implements the net.Addr interface for a meta listener. +type MetaAddr struct { + addresses []net.Addr +} + +// Network returns the name of the network. +func (ma *MetaAddr) Network() string { + return "meta" +} + +// String returns a string representation of all managed addresses. +func (ma *MetaAddr) String() string { + if len(ma.addresses) == 0 { + return "meta(empty)" + } + + result := "meta(" + for i, addr := range ma.addresses { + if i > 0 { + result += ", " + } + result += addr.String() + } + result += ")" + + return result +}