Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for Azure Blob Storage connector #147

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Supported Connectors:
* [Apache Kafka](https://kafka.apache.org/)
* [Apache Pulsar](https://pulsar.apache.org/)
* AWS ([S3](https://aws.amazon.com/s3/))
* Azure ([Blob Storage](https://azure.microsoft.com/en-us/products/storage/blobs/))
* GCP ([Storage](https://cloud.google.com/storage/))
* [NATS](https://nats.io/)
* [Redis](https://redis.io/)
Expand Down
243 changes: 243 additions & 0 deletions azure/blob_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package azure

import (
"context"
"fmt"
"io"
"log/slog"
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

// BlobStorageSourceConfig represents the configuration options for the Azure
// Blob storage source connector.
type BlobStorageSourceConfig struct {
// Container is the name of the Azure Blob storage container to read from.
Container string
// Prefix is the path within the container to use. It is passed as the prefix
// of the blob listing. If empty, the root of the container will be used.
Prefix string
// Flat indicates whether to ignore blob prefixes (virtual directories) in blob
// segments. When false, every virtual directory returned by the listing is
// traversed recursively. When true, only the blobs returned directly for
// Prefix are read.
Flat bool
}

// BlobStorageSource represents the Azure Blob storage source connector.
type BlobStorageSource struct {
// client is used to download the content of the listed blobs.
client *azblob.Client
// containerClient is used to list the blobs of the configured container.
containerClient *container.Client
// config holds the source configuration.
config *BlobStorageSourceConfig
// out is the channel on which the blobs are sent downstream. It is closed
// once the blob iteration has finished.
out chan any
// logger is the structured logger used by the connector.
logger *slog.Logger
}

// Compile-time check that *BlobStorageSource implements streams.Source.
var _ streams.Source = (*BlobStorageSource)(nil)

// NewBlobStorageSource creates a [BlobStorageSource] for the container named
// in config and immediately starts a goroutine that reads the blobs under the
// configured prefix. Every blob is emitted on the output channel as a
// [BlobStorageObject].
//
// If logger is nil, [slog.Default] is used. client and config must not be nil.
func NewBlobStorageSource(ctx context.Context, client *azblob.Client,
	config *BlobStorageSourceConfig, logger *slog.Logger) *BlobStorageSource {

	base := logger
	if base == nil {
		base = slog.Default()
	}

	// attributes attached to every record logged by this connector
	connectorAttrs := slog.Group("connector",
		slog.String("name", "azure.blob"),
		slog.String("type", "source"))
	scopedLogger := base.With(connectorAttrs)

	source := &BlobStorageSource{
		client:          client,
		containerClient: client.ServiceClient().NewContainerClient(config.Container),
		config:          config,
		out:             make(chan any),
		logger:          scopedLogger,
	}

	// list and download the blobs in the background
	go source.listBlobs(ctx)

	return source
}

// listBlobs walks the blobs starting at the configured prefix and closes the
// output channel once the walk has returned.
func (s *BlobStorageSource) listBlobs(ctx context.Context) {
	var (
		rootPrefix = &s.config.Prefix
		noMarker   *string // start from the first page
	)
	s.listBlobsHierarchy(ctx, rootPrefix, noMarker)

	s.logger.Info("Blob iteration completed")

	// no more elements will be sent downstream
	close(s.out)
}

// listBlobsHierarchy lists the blobs under prefix using "/" as the virtual
// directory delimiter, downloads every listed blob and sends it downstream
// as a [BlobStorageObject]. Unless the source is configured as flat, each
// virtual directory returned by the listing is traversed recursively.
//
// marker is an optional continuation token to start the listing from. The
// pager follows the continuation tokens of subsequent pages on its own, so
// the loop below visits every remaining page exactly once.
//
// Errors are logged rather than returned: a listing error ends the traversal
// of the current prefix and a download error skips the affected blob. The
// method returns early once ctx is done.
func (s *BlobStorageSource) listBlobsHierarchy(ctx context.Context, prefix, marker *string) {
	pager := s.containerClient.NewListBlobsHierarchyPager("/",
		&container.ListBlobsHierarchyOptions{
			Prefix: prefix,
			Marker: marker,
		})

	for pager.More() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			s.logger.Error("Error reading next page", slog.Any("error", err))
			return
		}

		if !s.config.Flat {
			for _, dir := range page.Segment.BlobPrefixes {
				s.logger.Debug("Virtual directory", slog.String("prefix", *dir.Name))

				// recursively list blobs in the prefix
				s.listBlobsHierarchy(ctx, dir.Name, nil)

				// the nested call returns early on cancellation; do not
				// keep listing the remaining directories and blobs
				if ctx.Err() != nil {
					return
				}
			}
		}

		// download and emit the blobs of the current page
		for _, blob := range page.Segment.BlobItems {
			download, err := s.client.DownloadStream(ctx, s.config.Container, *blob.Name, nil)
			if err != nil {
				s.logger.Error("Error reading blob", slog.Any("error", err))
				// every further download would fail the same way once the
				// context is done
				if ctx.Err() != nil {
					return
				}
				continue
			}

			select {
			// send the blob downstream
			case s.out <- &BlobStorageObject{
				Key:  *blob.Name,
				Data: download.Body,
			}:
			case <-ctx.Done():
				// the object was never handed over to a consumer, so its
				// body has to be released here; the close error carries no
				// useful information at this point
				_ = download.Body.Close()
				s.logger.Info("Blob reading terminated", slog.Any("error", ctx.Err()))
				return
			}
		}
	}
}

// Via passes the source and the given operator to flow.DoStream and hands
// the operator back, so that further stages can be chained onto it.
func (s *BlobStorageSource) Via(operator streams.Flow) streams.Flow {
	next := operator
	flow.DoStream(s, next)
	return next
}

// Out exposes the receive-only side of the channel on which the
// BlobStorageSource connector emits the blobs it reads.
func (s *BlobStorageSource) Out() <-chan any {
	var outlet <-chan any = s.out
	return outlet
}

// BlobStorageObject contains details of the Azure Blob storage object.
// It is the element type emitted by the source connector and accepted by the
// sink connector.
type BlobStorageObject struct {
// Key is the object name including any subdirectories.
// For example, "directory/file.json".
Key string
// Data is an [io.ReadCloser] representing the binary content of the blob object.
// The source connector sets it to the body of the blob download response and
// does not close it. The sink connector closes it after the upload attempt.
Data io.ReadCloser
}

// BlobStorageSinkConfig represents the configuration options for the Azure Blob
// storage sink connector.
type BlobStorageSinkConfig struct {
// Container is the name of the Azure Blob storage container to write to.
Container string
// Parallelism is the number of concurrent workers to use when writing data to
// Azure Blob storage. It is also used as the buffer size of the sink's input
// channel. Values less than 1 are treated as 1.
// The default is 1.
Parallelism int
// UploadOptions specifies the set of options for the blob upload operation.
// It is passed unchanged to every upload call.
UploadOptions *blockblob.UploadStreamOptions
}

// BlobStorageSink represents the Azure Blob storage sink connector.
type BlobStorageSink struct {
// client is used to upload the incoming blobs.
client *azblob.Client
// config holds the sink configuration.
config *BlobStorageSinkConfig
// in is the channel on which the sink receives the elements to upload.
// The upload workers run until it is closed.
in chan any
// logger is the structured logger used by the connector.
logger *slog.Logger
}

// Compile-time check that *BlobStorageSink implements streams.Sink.
var _ streams.Sink = (*BlobStorageSink)(nil)

// NewBlobStorageSink returns a new [BlobStorageSink].
// Incoming elements are expected to be of the [BlobStorageObject] type. These will
// be uploaded to the configured container using their key field as the path.
//
// If logger is nil, [slog.Default] is used. config must not be nil; it is
// copied, so the caller's value is never modified. A Parallelism lower than 1
// is treated as 1. The upload workers are started in the background and run
// until the input channel is closed.
func NewBlobStorageSink(ctx context.Context, client *azblob.Client,
	config *BlobStorageSinkConfig, logger *slog.Logger) *BlobStorageSink {

	if logger == nil {
		logger = slog.Default()
	}
	logger = logger.With(slog.Group("connector",
		slog.String("name", "azure.blob"),
		slog.String("type", "sink")))

	// normalize a private copy so that the caller's configuration, which may
	// be shared between several sinks, is left untouched
	sinkConfig := *config
	if sinkConfig.Parallelism < 1 {
		sinkConfig.Parallelism = 1
	}

	blobSink := &BlobStorageSink{
		client: client,
		config: &sinkConfig,
		in:     make(chan any, sinkConfig.Parallelism),
		logger: logger,
	}

	// start writing incoming data
	go blobSink.uploadBlobs(ctx)

	return blobSink
}

// uploadBlobs starts the configured number of writer goroutines, each of
// which drains the input channel and uploads the received elements to Azure
// Blob storage. It blocks until the input channel has been closed and every
// writer has returned.
func (s *BlobStorageSink) uploadBlobs(ctx context.Context) {
	// handle uploads a single incoming element and logs the outcome
	handle := func(element any) {
		var uploadErr error
		switch object := element.(type) {
		case *BlobStorageObject:
			uploadErr = s.uploadBlob(ctx, object)
		case BlobStorageObject:
			uploadErr = s.uploadBlob(ctx, &object)
		default:
			s.logger.Error("Unsupported data type",
				slog.String("type", fmt.Sprintf("%T", object)))
		}

		if uploadErr != nil {
			s.logger.Error("Error uploading blob",
				slog.Any("error", uploadErr))
		}
	}

	var writers sync.WaitGroup
	for started := 0; started < s.config.Parallelism; started++ {
		writers.Add(1)
		go func() {
			defer writers.Done()
			for element := range s.in {
				handle(element)
			}
		}()
	}

	// block until every writer has drained the closed input channel
	writers.Wait()
	s.logger.Info("All blob writers exited")
}

// uploadBlob uploads a single blob to the configured container of Azure Blob
// storage, using the object key as the blob name. The object data is closed
// once the upload attempt has finished.
//
// It returns an error if object is nil, if it carries no data, or if the
// upload fails; the upload error is wrapped together with the blob key.
func (s *BlobStorageSink) uploadBlob(ctx context.Context, object *BlobStorageObject) error {
	// a typed nil pointer or an object without data can arrive through the
	// input channel; report it instead of panicking in the writer goroutine
	if object == nil {
		return fmt.Errorf("blob object is nil")
	}
	if object.Data == nil {
		return fmt.Errorf("blob %q has no data", object.Key)
	}
	defer object.Data.Close()

	_, err := s.client.UploadStream(ctx, s.config.Container, object.Key, object.Data,
		s.config.UploadOptions)
	if err != nil {
		return fmt.Errorf("uploading blob %q: %w", object.Key, err)
	}
	return nil
}

// In exposes the send-only side of the channel from which the
// BlobStorageSink connector receives the elements to upload.
func (s *BlobStorageSink) In() chan<- any {
	var inlet chan<- any = s.in
	return inlet
}
2 changes: 2 additions & 0 deletions azure/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package azure implements streaming connectors for Microsoft Azure services.
package azure
15 changes: 15 additions & 0 deletions azure/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module github.com/reugn/go-streams/azure

go 1.21.0

require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1
github.com/reugn/go-streams v0.10.0
)

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/text v0.16.0 // indirect
)
38 changes: 38 additions & 0 deletions azure/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0 h1:nyQWyZvwGTvunIMxi1Y9uXkcyr+I7TeNrr/foo4Kpk8=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0/go.mod h1:l38EPgmsp71HHLq9j7De57JcKOWPyhrsW1Awm1JS6K0=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0 h1:tfLQ34V6F7tVSwoTf/4lH5sE0o6eCJuNDTmH09nDpbc=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.7.0/go.mod h1:9kIvujWAA58nmPmWB1m23fyWic1kYZMxD9CxaWn4Qpg=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 h1:PiSrjRPpkQNjrM8H0WwKMnZUdu1RGMtd/LdGKUrOo+c=
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1 h1:cf+OIKbkmMHBaC3u78AXomweqM0oxQSgBXRZf3WH4yM=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.1/go.mod h1:ap1dmS6vQKJxSMNiGJcq4QuUQkOynyD93gLw6MDF7ek=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/reugn/go-streams v0.10.0 h1:Y0wHNihEbHsFOFV2/xTOKvud4ZpJPaRTET01fwx2/rQ=
github.com/reugn/go-streams v0.10.0/go.mod h1:QI5XXifJkVJl2jQ6Cra8I9DvWdJTgqcFYR7amvXZ9Lg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30=
golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M=
golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
golang.org/x/sys v0.22.0 h1:RI27ohtqKCnwULzJLqkv897zojh5/DwS/ENaMzUOaWI=
golang.org/x/sys v0.22.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
1 change: 1 addition & 0 deletions examples/azure/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/blob/blob
Loading
Loading