clickhouse.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package database
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/ClickHouse/clickhouse-go/v2"
  7. "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
  8. "sync"
  9. "time"
  10. )
  11. type _ch struct {
  12. Database[driver.Rows, driver.Batch]
  13. config *configCH
  14. conn clickhouse.Conn
  15. }
  16. //goland:noinspection GoUnusedExportedFunction
  17. func NewClickhouse(cfg *Config, debug bool) (Database[driver.Rows, driver.Batch], error) {
  18. if cfg == nil || cfg.Clickhouse == nil {
  19. return nil, fmt.Errorf("illegal state: `*config.ClickhouseConfig` is nil")
  20. }
  21. var tlcConfig *tls.Config
  22. if cfg.Clickhouse.Params.Secure {
  23. tlcConfig = &tls.Config{
  24. InsecureSkipVerify: true,
  25. }
  26. }
  27. session, err := clickhouse.Open(&clickhouse.Options{
  28. Protocol: clickhouse.Native,
  29. TLS: tlcConfig,
  30. Addr: []string{
  31. fmt.Sprintf("%s:%d", cfg.Clickhouse.Host, cfg.Clickhouse.Port),
  32. },
  33. Auth: clickhouse.Auth{
  34. Database: cfg.Clickhouse.Database,
  35. Username: cfg.Clickhouse.Username,
  36. Password: cfg.Clickhouse.Password,
  37. },
  38. Debug: debug,
  39. Compression: &clickhouse.Compression{
  40. Method: cfg.Clickhouse.Params.CompressionMethod(),
  41. },
  42. DialTimeout: time.Duration(cfg.Clickhouse.Params.Timeout) * time.Second,
  43. ReadTimeout: time.Duration(cfg.Clickhouse.Params.ReadTimeout) * time.Second,
  44. })
  45. if err != nil {
  46. return nil, err
  47. }
  48. s := &_ch{
  49. config: cfg.Clickhouse,
  50. conn: session,
  51. }
  52. if err = s.Ping(context.TODO()); err != nil {
  53. return nil, err
  54. }
  55. return s, nil
  56. }
  57. func (db *_ch) String() string {
  58. return "clickhouse"
  59. }
  60. func (db *_ch) Close() error {
  61. var wg sync.WaitGroup
  62. wg.Add(1)
  63. go func() {
  64. defer wg.Done()
  65. if db.conn != nil {
  66. _ = db.conn.Close()
  67. }
  68. }()
  69. wg.Wait()
  70. return nil
  71. }
  72. func (db *_ch) Ping(ctx context.Context) error {
  73. return db.conn.Ping(ctx)
  74. }
  75. func (db *_ch) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
  76. return db.conn.Query(ctx, query, args...)
  77. }