-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommits-csv-to-db.go
113 lines (97 loc) · 2.7 KB
/
commits-csv-to-db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package cmd
import (
"context"
"encoding/csv"
"fmt"
"io"
"log"
"os"
"strconv"
"sync"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const (
workersCount = 10 // Количество воркеров для парсинга CSV
batchSize = 100 // Размер пакета для записи в MongoDB
)
// type User struct {
// Name string
// Age int
// }
func parseCSVWorker(jobs <-chan []string, results chan<- User) {
for record := range jobs {
// Предположим, что первый столбец - имя, второй - возраст
user := User{Name: record[0], Age: parseInt(record[1])}
results <- user
}
}
func batchInsertUsers(ctx context.Context, client *mongo.Client, users []User) {
// Преобразование пользователей в []interface{} для InsertMany
var documents []interface{}
for _, user := range users {
documents = append(documents, user)
}
collection := client.Database("yourDatabase").Collection("users")
_, err := collection.InsertMany(ctx, documents)
if err != nil {
log.Printf("Error inserting documents: %v", err)
}
}
func main() {
client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI("mongodb://localhost:27017"))
if err != nil {
log.Fatal(err)
}
defer client.Disconnect(context.Background())
csvFile, err := os.Open("path/to/your/file.csv")
if err != nil {
log.Fatal("Failed to open CSV file:", err)
}
defer csvFile.Close()
jobs := make(chan []string)
results := make(chan User, batchSize)
// Запуск воркеров для парсинга CSV
for i := 0; i < workersCount; i++ {
go parseCSVWorker(jobs, results)
}
// Чтение CSV и отправка строк в канал jobs
go func() {
reader := csv.NewReader(csvFile)
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
log.Fatal("Failed to read CSV line:", err)
}
jobs <- record
}
close(jobs)
}()
// Пакетная обработка и запись результатов
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var batch []User
for user := range results {
batch = append(batch, user)
if len(batch) >= batchSize {
batchInsertUsers(context.Background(), client, batch)
batch = nil // Сброс пакета после вставки
}
}
// Обработка оставшихся пользователей
if len(batch) > 0 {
batchInsertUsers(context.Background(), client, batch)
}
}()
wg.Wait()
fmt.Println("Data processing completed.")
}
func parseInt(s string) (int, error) {
return strconv.Atoi(s)
}
// parseInt и другие необходимые функции...