Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
if: steps.check-for-backend.outputs.has-backend == 'true'
uses: actions/setup-go@v3
with:
go-version: '1.21'
go-version: '1.26'

- name: Test backend
if: steps.check-for-backend.outputs.has-backend == 'true'
Expand Down
2 changes: 1 addition & 1 deletion .go-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.25.6
1.26.3
102 changes: 44 additions & 58 deletions pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,18 @@ package plugin

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/NeedleInAJayStack/haystack"
"github.com/NeedleInAJayStack/haystack/client"
"github.com/NeedleInAJayStack/haystack/io"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
Expand All @@ -38,24 +36,29 @@ func NewDatasource(ctx context.Context, settings backend.DataSourceInstanceSetti

// settings contains normal inputs in the .JSONData field in JSON byte form
var options Options
jsonErr := json.Unmarshal(settings.JSONData, &options)
if jsonErr != nil {
return nil, jsonErr
err := json.Unmarshal(settings.JSONData, &options)
if err != nil {
return nil, fmt.Errorf("datasource options: %w", err)
}
url := options.Url
username := options.Username

// settings contains secure inputs in .DecryptedSecureJSONData in a string:string map
password := settings.DecryptedSecureJSONData["password"]

client := client.NewClientFromHTTP(url, username, password, &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: options.SkipTlsVerify},
},
})
openErr := client.Open()
if openErr != nil {
return nil, openErr
httpClientOptions, err := settings.HTTPClientOptions(ctx)
if err != nil {
return nil, fmt.Errorf("http client options: %w", err)
}
httpClientOptions.TLS = &httpclient.TLSOptions{InsecureSkipVerify: options.SkipTlsVerify}
httpClient, err := httpclient.New(httpClientOptions)
if err != nil {
return nil, fmt.Errorf("new http client: %w", err)
}
client := client.NewClientFromHTTP(url, username, password, httpClient)
err = client.Open()
if err != nil {
return nil, fmt.Errorf("haystack client opening: %w", err)
}
datasource := Datasource{client: client}
return &datasource, nil
Expand Down Expand Up @@ -117,10 +120,10 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
// Unmarshal the JSON into our queryModel.
var model QueryModel

jsonErr := json.Unmarshal(query.JSON, &model)
if jsonErr != nil {
log.DefaultLogger.Error(jsonErr.Error())
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("json unmarshal failure: %v", jsonErr.Error()))
err := json.Unmarshal(query.JSON, &model)
if err != nil {
log.DefaultLogger.Error(err.Error())
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("json unmarshal failure: %v", err.Error()))
}

variables := map[string]string{
Expand Down Expand Up @@ -201,37 +204,24 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
return backend.ErrDataResponse(backend.StatusBadRequest, errMsg)
}

// Function to read a single point and send it to a channel.
readPoint := func(point haystack.Row, hisReadChannel chan haystack.Grid, wg *sync.WaitGroup) {
hisRead, err := datasource.hisRead(point, query.TimeRange)
if err != nil {
log.DefaultLogger.Error(err.Error())
}
hisReadChannel <- hisRead // hisRead is empty under error condition
wg.Done()
}

// Start a goroutine to collect all the grids into a slice.
hisReadChannel := make(chan haystack.Grid)
combinedChannel := make(chan []haystack.Grid)
go func() {
grids := []haystack.Grid{}
for grid := range hisReadChannel {
grids = append(grids, grid)
}
combinedChannel <- grids
}()

// Read all the points in parallel using goroutines.
var wg sync.WaitGroup
wg.Add(len(points))
hisReadChannel := make(chan haystack.Grid)
for _, point := range points {
go readPoint(point, hisReadChannel, &wg)
go func() {
hisRead, err := datasource.hisRead(point, query.TimeRange)
if err != nil {
log.DefaultLogger.Error(err.Error())
}
hisReadChannel <- hisRead // hisRead is empty under error condition
}()
}

grids := []haystack.Grid{}
for _ = range len(points) {
grid := <-hisReadChannel
grids = append(grids, grid)
}
wg.Wait()
close(hisReadChannel)

grids := <-combinedChannel
response := responseFromGrids(grids)
// Make the display name on the "val" fields the names of the points.
for _, frame := range response.Frames {
Expand Down Expand Up @@ -260,11 +250,7 @@ func (datasource *Datasource) query(ctx context.Context, pCtx backend.PluginCont
func responseFromGrids(grids []haystack.Grid) backend.DataResponse {
frames := data.Frames{}
for _, grid := range grids {
frame, frameErr := dataFrameFromGrid(grid)
if frameErr != nil {
log.DefaultLogger.Error(frameErr.Error())
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("Frame conversion failure: %v", frameErr.Error()))
}
frame := dataFrameFromGrid(grid)
frames = append(frames, frame)
}

Expand Down Expand Up @@ -333,13 +319,13 @@ func (datasource *Datasource) hisRead(point haystack.Row, timeRange backend.Time

// Must convert input date range to the point's timezone.
// See https://github.com/skyfoundry/haystack-java/blob/30380dbbe4b5d9be8eb3f400195b0cdcdcc67b95/src/main/java/org/projecthaystack/server/HServer.java#L328
start, startErr := haystack.NewDateTimeFromGo(timeRange.From).ToTz(tz.String())
if startErr != nil {
return haystack.EmptyGrid(), startErr
start, err := haystack.NewDateTimeFromGo(timeRange.From).ToTz(tz.String())
if err != nil {
return haystack.EmptyGrid(), fmt.Errorf("start time: %w", err)
}
end, endErr := haystack.NewDateTimeFromGo(timeRange.To).ToTz(tz.String())
if endErr != nil {
return haystack.EmptyGrid(), endErr
end, err := haystack.NewDateTimeFromGo(timeRange.To).ToTz(tz.String())
if err != nil {
return haystack.EmptyGrid(), fmt.Errorf("end time: %w", err)
}

return datasource.withRetry(
Expand Down Expand Up @@ -415,7 +401,7 @@ func (datasource *Datasource) withRetry(
}

// dataFrameFromGrid converts a haystack grid to a Grafana data frame
func dataFrameFromGrid(grid haystack.Grid) (*data.Frame, error) {
func dataFrameFromGrid(grid haystack.Grid) *data.Frame {
fields := []*data.Field{}

for _, col := range grid.Cols() {
Expand Down Expand Up @@ -527,7 +513,7 @@ func dataFrameFromGrid(grid haystack.Grid) (*data.Frame, error) {
frame := data.NewFrame("response", fields...)
frameName := disFromMeta(grid.Meta(), "")
frame.Name = frameName
return frame, nil
return frame
}

// disFromMeta returns the display name using metadata. It falls back to the provided string if no other name can be found
Expand Down
Loading