Skip to content

Commit

Permalink
fixed handling of table names
Browse files Browse the repository at this point in the history
  • Loading branch information
Vipin Singh committed May 14, 2024
1 parent 402b07c commit 0319e36
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 134 deletions.
57 changes: 28 additions & 29 deletions handler/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import (
)

type WebhookHandler struct {
db persistent.DatabaseInterface
tableName string
db persistent.DatabaseInterface
tableNames []string
}


func NewWebhookHandler(db persistent.DatabaseInterface, tableName string) *WebhookHandler {
func NewWebhookHandler(db persistent.DatabaseInterface, tableName []string) *WebhookHandler {
return &WebhookHandler{
db: db,
tableName : tableName,
db: db,
tableNames: tableName,
}
}

Expand Down Expand Up @@ -81,7 +82,7 @@ func (h *WebhookHandler) DBHealthHandler(w http.ResponseWriter, r *http.Request)
return fmt.Errorf("method not allowed")
}

if err := h.db.DescribeTable(h.tableName); err != nil {
if err := h.db.DescribeTable(h.tableNames[0]); err != nil {
http.Error(w, "Database is not healthy: "+err.Error(), http.StatusInternalServerError)
sendAPIError(w, NewAPIError( http.StatusInternalServerError, err, "Database is not healthy:unable to describe table"))
return err
Expand Down Expand Up @@ -176,7 +177,6 @@ func (h *WebhookHandler) WebhookEvents(w http.ResponseWriter, r *http.Request) e
}

log.Printf("Received event type: %s", event.Type)
//tableName := os.Getenv("DYNAMODB_ORDER_TABLE_NAME")

switch event.Type {
case "order/created":
Expand All @@ -186,23 +186,21 @@ func (h *WebhookHandler) WebhookEvents(w http.ResponseWriter, r *http.Request) e
return err
}
log.Printf("Received JSON order created: %+v", &orderCreatedEvent)
h.handleOrderCreated(h.tableName,marketplace, orderCreatedEvent)
h.handleOrderEventCreated(h.tableName,marketplace, orderCreatedEvent)
h.handleOrderEventCreated(h.tableNames[0],marketplace, orderCreatedEvent)
case "variant/stock-updated":
var variantStockUpdatedEvent model.VariantStockUpdated
if err := json.Unmarshal(body, &variantStockUpdatedEvent); err != nil {
http.Error(w, "Failed to decode order created event: "+err.Error(), http.StatusBadRequest)
return err
}
log.Printf("Received JSON order created: %+v", &variantStockUpdatedEvent)
h.handleVariantStockUpdated(h.tableName,marketplace, variantStockUpdatedEvent)
h.handleVariantStockUpdated(h.tableNames[1],marketplace, variantStockUpdatedEvent)
case "product/subscribed":
var ProductSubscribedEvent model.ProductSubscribed
if err := json.NewDecoder(r.Body).Decode(&ProductSubscribedEvent); err != nil {
http.Error(w, "Failed to decode Product SubscribedEvent event: "+err.Error(), http.StatusBadRequest)
return err
}
//log.Printf("Received JSON: %s", ProductSubscribedEvent)
h.handleProductSubscribed(marketplace,ProductSubscribedEvent)
default:
http.Error(w, "Unhandled event type", http.StatusBadRequest)
Expand All @@ -218,26 +216,27 @@ func (h *WebhookHandler) WebhookEvents(w http.ResponseWriter, r *http.Request) e



// Utility functions
/* Utility functions
func (h *WebhookHandler) handleOrderCreated(tableName string,marketplace string, event model.OrderCreated) {
log.Printf("Processing Order Created event for marketplace: %s, Event ID: %s , External Order ID: %s", marketplace, event.EventId, event.ExternalOrderID)
// func (h *WebhookHandler) handleOrderCreated(tableName string,marketplace string, event model.OrderCreated) {
// log.Printf("Processing Order Created event for marketplace: %s, Event ID: %s , External Order ID: %s", marketplace, event.EventId, event.ExternalOrderID)
// Create an instance of EventOptions
opts := model.EventOptions{}
// OrderCreated has an ExternalOrderID that could be empty and not necessarily part of every event.
if event.ExternalOrderID != "" { // Check if ExternalOrderID is non-empty.
opts.ExternalOrderId = &event.ExternalOrderID // If non-empty, set it in the options.
}
// // Create an instance of EventOptions
// opts := model.EventOptions{}
// // OrderCreated has an ExternalOrderID that could be empty and not necessarily part of every event.
// if event.ExternalOrderID != "" { // Check if ExternalOrderID is non-empty.
// opts.ExternalOrderId = &event.ExternalOrderID // If non-empty, set it in the options.
// }
err := h.db.StoreEventData(tableName, event.Type, event.EventId, event.LastUpdated, marketplace, event, opts)
if err != nil {
log.Printf("Error storing event data in handleOrderCreated: %v", err)
return
}
// err := h.db.StoreEventData(tableName, event.Type, event.EventId, event.LastUpdated, marketplace, event, opts)
// if err != nil {
// log.Printf("Error storing event data in handleOrderCreated: %v", err)
// return
// }
log.Println("handleOrderCreated: Successfully processed order creation")
}
// log.Println("handleOrderCreated: Successfully processed order creation")
// }
*/

func (h *WebhookHandler) handleOrderEventCreated(tableName string,marketplace string, event model.OrderCreated) {
log.Printf("Processing Order Created event for marketplace: %s , External Order ID: %s", marketplace, event.ExternalOrderID)
Expand Down Expand Up @@ -271,7 +270,7 @@ func (h *WebhookHandler) handleVariantStockUpdated(tableName string,marketplace
}

func (h *WebhookHandler) handleProductSubscribed(marketplace string,event model.ProductSubscribed) {
log.Printf("Processing Product Subscried event for marketplace: %s, Event ID: %s , Deal ID: %s", marketplace, event.EventId, event.DealID)
// log.Printf("Processing Product Subscried event for marketplace: %s, Event ID: %s , Deal ID: %s", marketplace, event.EventId, event.DealID)
// details := make(map[string]interface{})
// if err := json.Unmarshal(data, &details); err != nil {
// return
Expand Down Expand Up @@ -333,4 +332,4 @@ func unMarshallJSON(body []byte, target interface{}) error {
}
log.Printf("Processed JSON: %+v", target)
return nil
}
}
39 changes: 34 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,26 @@ func main() {
}
defer db.Close()

// Initialize tables on startup
tableName := os.Getenv("DYNAMODB_ORDER_TABLE_NAME")
if err := db.InitializeTables(tableName); err != nil {
log.Fatalf("failed to initialize tables:%s: %v", tableName,err)
// Define the environment variable keys
envVars := []string{
"DYNAMODB_ORDER_TABLE_NAME",
"DYNAMODB_PRODUCT_TABLE_NAME",
}

// Load the table names from environment variables
tableNames := LoadTableNames(envVars...)

// Example usage: Print the loaded table names
for _, tableName := range tableNames {
log.Printf("Loaded table name: %s", tableName)
}
if err := db.InitializeTables(tableNames); err != nil {
log.Fatalf("failed to initialize tables:%s: %v", tableNames,err)
}


// Create the webhook handler with the database dependency
webhookHandler := handler.NewWebhookHandler(db,tableName)
webhookHandler := handler.NewWebhookHandler(db,tableNames)
handler.SetupRoutes(webhookHandler)

log.Printf("Server starting on port: %s", port)
Expand All @@ -48,4 +59,22 @@ func main() {
if errors.Is(err, http.ErrServerClosed) {
log.Printf("server closed\n")
}
}


func LoadTableNames(envVars ...string) []string {
var tableNames []string

for _, envVar := range envVars {
tableName := os.Getenv(envVar)
if tableName != "" {
tableNames = append(tableNames, tableName)
}
}

if len(tableNames) == 0 {
log.Fatalf("No valid table names provided in environment variables")
}

return tableNames
}
46 changes: 23 additions & 23 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (m *MockDB) DescribeTable(tableName string) error {
return args.Error(0)
}

func (m *MockDB) InitializeTables(tableName string) error {
func (m *MockDB) InitializeTables(tableName []string) error {
args := m.Called(tableName)
return args.Error(0)
}
Expand Down Expand Up @@ -85,18 +85,18 @@ func TestHandleWebhook(t *testing.T) {
t.Fatal(err) // Handle errors with JSON marshaling
}

tableName:= "My_Table"
fmt.Println("TableName before mock setup:", tableName)
tableNames := []string{"My_Table"}
fmt.Println("TableName before mock setup:", tableNames[0])
// Setting up the expected call with mock for CreateTableIfNotExists
db.On("CreateTableIfNotExists", tableName).Return(nil)
db.On("CreateTableIfNotExists", tableNames[0]).Return(nil)
// Setting up the expected call with mock
db.On("StoreData",
tableName,
tableNames[0],
"PK#MerchantId:45",
mock.AnythingOfType("model.UserMessageData")).Return(nil)


handler := handler.NewWebhookHandler(db,tableName)
handler := handler.NewWebhookHandler(db,tableNames)

// Setting up a request
req := httptest.NewRequest("POST", "/45", bytes.NewReader(jsonData))
Expand All @@ -117,11 +117,12 @@ func TestHandleWebhook(t *testing.T) {

// TestWebhookEvents tests the webhook handler function
func TestWebhookVariantStockUpdateEvents(t *testing.T) {
t.Skip()
db := new(MockDB)
tableName:= "EventWebhook"
tableNames := []string{"EventWebhook"}

// Initialize the handler
handler := handler.NewWebhookHandler(db,tableName)
handler := handler.NewWebhookHandler(db,tableNames)

// Setup a sample dynamic event for testing
variantStockUpdatedEvent := model.VariantStockUpdated{
Expand All @@ -143,7 +144,7 @@ func TestWebhookVariantStockUpdateEvents(t *testing.T) {

// Mock expected database interactions
db.On("StoreEventData",
tableName,
tableNames[0],
"variant/stock-updated",
"529c8a0d-4b85-495a-a54c-6031995d9c2a",
"2024-05-07T01:47:00.138Z",
Expand Down Expand Up @@ -180,10 +181,10 @@ func TestWebhookVariantStockUpdateEvents(t *testing.T) {
// TestWebhookEvents tests the webhook handler function
func TestWebhookOrderCreatedEvents(t *testing.T) {
db := new(MockDB)
tableName:= "EventWebhook"
tableNames := []string{"EventWebhook"}

// Initialize the handler
handler := handler.NewWebhookHandler(db,tableName)
handler := handler.NewWebhookHandler(db,tableNames)

// Setup a sample dynamic event for testing
orderCreated := model.OrderCreated{
Expand All @@ -208,16 +209,15 @@ func TestWebhookOrderCreatedEvents(t *testing.T) {
t.Fatal(err) // Handle errors with JSON marshaling
}
log.Printf("jsonData %s", jsonData)

// Mock expected database interactions
db.On("StoreEventData",
tableName,
db.On("StoreOrderEventData",
tableNames[0],
"order/created",
"48b4a0d1-2a95-4308-9a45-00c65b6e70e4",
"auto-test-3aef291d-1bf0-41c3-9797-de544b1a41a2",
"2024-05-03T03:48:13.506Z",
"BIGW",
mock.Anything,
mock.AnythingOfType("model.EventOptions")).Return(nil)
mock.Anything).Return(nil)

// Setup a HTTP request for POST method
req := httptest.NewRequest("POST", "/BIGW", bytes.NewReader(jsonData))
Expand Down Expand Up @@ -248,10 +248,10 @@ func TestWebhookOrderCreatedEvents(t *testing.T) {
// TestDBHealthHandler tests the database health check endpoint
func TestDBHealthHandlerOk(t *testing.T) {
db := new(MockDB)
tableName:= "EventWebhook"
db.On("DescribeTable", tableName).Return(nil) // Simulate a healthy database
tableNames := []string{"EventWebhook"}
db.On("DescribeTable", tableNames[0]).Return(nil) // Simulate a healthy database

handler := handler.NewWebhookHandler(db,tableName)
handler := handler.NewWebhookHandler(db,tableNames)
req := httptest.NewRequest("GET", "/dbhealth", nil)
w := httptest.NewRecorder()

Expand All @@ -268,10 +268,10 @@ func TestDBHealthHandlerOk(t *testing.T) {
// TestDBHealthHandlerFail tests the scenario where the database is unhealthy
func TestDBHealthHandlerFail(t *testing.T) {
db := new(MockDB)
tableName:= "EventWebhook"
db.On("DescribeTable", tableName).Return(errors.New("database error")) // Simulate an unhealthy database
tableNames := []string{"EventWebhook"}
db.On("DescribeTable", tableNames[0]).Return(errors.New("database error")) // Simulate an unhealthy database

handler := handler.NewWebhookHandler(db,tableName)
handler := handler.NewWebhookHandler(db,tableNames)
req := httptest.NewRequest("GET", "/dbhealth", nil)
w := httptest.NewRecorder()

Expand Down
32 changes: 22 additions & 10 deletions persistent/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
// DatabaseInterface outlines the methods for database operations
type DatabaseInterface interface {
ConnectToDatabase() error
InitializeTables(tableName string) error
InitializeTables(tableName []string) error
Close()
CreateTableIfNotExists(tableName string) error
CreateEventsTableIfNotExist(config TableConfig) error
Expand Down Expand Up @@ -61,14 +61,14 @@ func NewDatabase() (DatabaseInterface, error) {
return db, nil
}

func (db *Database) InitializeTables(tableName string) error {
log.Printf("InitializeTables")
log.Printf("InitializeTables %s",tableName)
config, err := loadConfig("table.json")
func (db *Database) InitializeTables(tableNames []string) error {
log.Printf("Initialize the dynamodb Tables")
config, err := loadConfig("persistent/table.json")
if err != nil {
log.Fatalf("Failed to load config: %s", err)
}


ReplaceTableNames(config, tableNames)
// Example for a single table, repeat for others or make it dynamic based on configuration
for _, tableConfig := range config.Tables {
err := db.CreateEventsTableIfNotExist(tableConfig)
Expand All @@ -79,6 +79,16 @@ func (db *Database) InitializeTables(tableName string) error {
return nil
}

// ReplaceTableNames replaces the table names in the JSON configuration with the table names from the tableNames slice.
func ReplaceTableNames(config *Config, tableNames []string) {
if len(config.Tables) != len(tableNames) {
log.Fatalf("The number of table names in the environment does not match the number of tables in the JSON configuration")
}

for i := range config.Tables {
config.Tables[i].TableName = tableNames[i]
}
}
// TokenFetcher is a custom implementation of the TokenFetcher interface.
type TokenFetcher struct {
webIdentityToken string
Expand Down Expand Up @@ -355,6 +365,7 @@ func (db *Database) StoreData(tableName, pKey string, data interface{}) error {
return nil
}

/*
func (db *Database) CreateEventsTableIfNotExists(tableName string) error {
log.Printf("CreateEventsTableIfNotExists")
// Check if the table already exists
Expand Down Expand Up @@ -454,8 +465,9 @@ func (db *Database) CreateEventsTableIfNotExists(tableName string) error {
log.Printf("Table %s created successfully", tableName)
return nil
}

*/
// StoreData stores data in the WebhookEvents table in DynamoDB.

func (db *Database) StoreEventData(tableName, eventType, eventId, lastUpdated, merchantId string, eventData interface{}, opts model.EventOptions) error {
log.Printf("StoreEventData")
// Prepare the primary key and sort key
Expand Down Expand Up @@ -508,8 +520,8 @@ func (db *Database) StoreEventData(tableName, eventType, eventId, lastUpdated, m
func (db *Database) StoreOrderEventData(tableName, eventType, externalOrderId, lastUpdated, merchantId string, eventData interface{}) error {
log.Printf("StoreEventData")
// Prepare the primary key and sort key
pk := fmt.Sprintf("PK%s#%s", merchantId, externalOrderId)
sk := fmt.Sprintf("SK%s#%s", lastUpdated,eventType)
pk := fmt.Sprintf("#PK#%s#%s", merchantId, externalOrderId)
sk := fmt.Sprintf("#SK#%s#%s", lastUpdated, eventType)

// Marshal the entire event data into a JSON string for the EventData attribute
eventDataJSON, err := json.Marshal(eventData)
Expand Down Expand Up @@ -599,7 +611,7 @@ func (db *Database) CreateEventsTableIfNotExist(config TableConfig) error {

func loadConfig(filename string) (*Config, error) {
// Convert relative path to absolute path for clarity
absolutePath, err := filepath.Abs("persistent/table.json")
absolutePath, err := filepath.Abs(filename)
if err != nil {
fmt.Printf("Error getting absolute file path: %s\n", err)
return nil, err
Expand Down
Loading

0 comments on commit 0319e36

Please sign in to comment.