Replace olivere/elastic with REST API client, add OpenSearch support (#37411)

Drops `github.com/olivere/elastic/v7` (unmaintained) and replaces it
with a small in-house wrapper that speaks the Elasticsearch REST API
directly via `net/http`. The subset used by Gitea (`_cluster/health`,
`_bulk`, `_doc`, `_delete_by_query`, `_refresh`, `_search`, `HEAD`/`PUT`
index) is stable across the targeted servers, so no client library is
needed.

**Targets tested**
- Elasticsearch 7, 8, 9
- OpenSearch 1, 2, 3

**Why not `go-elasticsearch`?**
The official client enforces an `X-Elastic-Product` server-identity
check that OpenSearch deliberately fails, which would force shipping a
transport shim to defeat it. Going direct over `net/http` removes that
fight along with several MB of transitive deps (`elastic-transport-go`,
`go.opentelemetry.io/otel{,/metric,/trace}`, `auto/sdk`, `easyjson`,
`intern`, `logr`, `stdr`).

Replaces: #30755
Fixes: https://github.com/go-gitea/gitea/issues/30752

---
This PR was written with the help of Claude Opus 4.7

---------

Co-authored-by: Claude (Opus 4.7) <noreply@anthropic.com>
Co-authored-by: Lunny Xiao <xiaolunwen@gmail.com>
This commit is contained in:
silverwind
2026-05-02 00:12:54 +02:00
committed by GitHub
parent 31cee60cc7
commit abcfa53040
15 changed files with 825 additions and 361 deletions

View File

@@ -102,9 +102,10 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
services: services:
elasticsearch: elasticsearch:
image: elasticsearch:7.5.0 image: docker.elastic.co/elasticsearch/elasticsearch:8.19.14
env: env:
discovery.type: single-node discovery.type: single-node
xpack.security.enabled: false
ports: ports:
- "9200:9200" - "9200:9200"
meilisearch: meilisearch:
@@ -180,9 +181,10 @@ jobs:
options: >- options: >-
--mount type=tmpfs,destination=/bitnami/mysql/data --mount type=tmpfs,destination=/bitnami/mysql/data
elasticsearch: elasticsearch:
image: elasticsearch:7.5.0 image: docker.elastic.co/elasticsearch/elasticsearch:8.19.14
env: env:
discovery.type: single-node discovery.type: single-node
xpack.security.enabled: false
ports: ports:
- "9200:9200" - "9200:9200"
smtpimap: smtpimap:

View File

@@ -1004,16 +1004,6 @@
"path": "github.com/olekukonko/tablewriter/LICENSE.md", "path": "github.com/olekukonko/tablewriter/LICENSE.md",
"licenseText": "Copyright (C) 2014 by Oleku Konko\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the \"Software\"), to deal\nin the Software without restriction, including without limitation the rights\nto use, copy, modify, merge, publish, distribute, sublicense, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice and this permission notice shall be included in\nall copies or substantial portions of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\nOUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\nTHE SOFTWARE.\n" "licenseText": "Copyright (C) 2014 by Oleku Konko\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the \"Software\"), to deal\nin the Software without restriction, including without limitation the rights\nto use, copy, modify, merge, publish, distribute, sublicense, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice and this permission notice shall be included in\nall copies or substantial portions of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,\nOUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN\nTHE SOFTWARE.\n"
}, },
{
"name": "github.com/olivere/elastic/v7",
"path": "github.com/olivere/elastic/v7/LICENSE",
"licenseText": "The MIT License (MIT)\nCopyright © 2012-2015 Oliver Eilhard\n\nPermission is hereby granted, free of charge, to any person obtaining a copy\nof this software and associated documentation files (the “Software”), to deal\nin the Software without restriction, including without limitation the rights\nto use, copy, modify, merge, publish, distribute, sublicense, and/or sell\ncopies of the Software, and to permit persons to whom the Software is\nfurnished to do so, subject to the following conditions:\n\nThe above copyright notice and this permission notice shall be included\nin all copies or substantial portions of the Software.\n\nTHE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,\nFITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE\nAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER\nLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING\nFROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS\nIN THE SOFTWARE.\n"
},
{
"name": "github.com/olivere/elastic/v7/uritemplates",
"path": "github.com/olivere/elastic/v7/uritemplates/LICENSE",
"licenseText": "Copyright (c) 2013 Joshua Tacoma\n\nPermission is hereby granted, free of charge, to any person obtaining a copy of\nthis software and associated documentation files (the \"Software\"), to deal in\nthe Software without restriction, including without limitation the rights to\nuse, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of\nthe Software, and to permit persons to whom the Software is furnished to do so,\nsubject to the following conditions:\n\nThe above copyright notice and this permission notice shall be included in all\ncopies or substantial portions of the Software.\n\nTHE SOFTWARE IS PROVIDED \"AS IS\", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR\nIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS\nFOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR\nCOPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER\nIN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN\nCONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.\n"
},
{ {
"name": "github.com/opencontainers/go-digest", "name": "github.com/opencontainers/go-digest",
"path": "github.com/opencontainers/go-digest/LICENSE", "path": "github.com/opencontainers/go-digest/LICENSE",

View File

@@ -1524,7 +1524,7 @@ LEVEL = Info
;; Issue Indexer settings ;; Issue Indexer settings
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ;;
;; Issue indexer type, currently support: bleve, db, elasticsearch or meilisearch default is bleve ;; Issue indexer type, currently support: bleve, db, elasticsearch (also compatible with OpenSearch) or meilisearch default is bleve
;ISSUE_INDEXER_TYPE = bleve ;ISSUE_INDEXER_TYPE = bleve
;; ;;
;; Issue indexer storage path, available when ISSUE_INDEXER_TYPE is bleve ;; Issue indexer storage path, available when ISSUE_INDEXER_TYPE is bleve
@@ -1551,7 +1551,7 @@ LEVEL = Info
;; If empty then it defaults to `sources` only, as if you'd like to disable fully please see REPO_INDEXER_ENABLED. ;; If empty then it defaults to `sources` only, as if you'd like to disable fully please see REPO_INDEXER_ENABLED.
;REPO_INDEXER_REPO_TYPES = sources,forks,mirrors,templates ;REPO_INDEXER_REPO_TYPES = sources,forks,mirrors,templates
;; ;;
;; Code search engine type, could be `bleve` or `elasticsearch`. ;; Code search engine type, could be `bleve` or `elasticsearch` (also compatible with OpenSearch).
;REPO_INDEXER_TYPE = bleve ;REPO_INDEXER_TYPE = bleve
;; ;;
;; Index file used for code search. available when `REPO_INDEXER_TYPE` is bleve ;; Index file used for code search. available when `REPO_INDEXER_TYPE` is bleve

3
go.mod
View File

@@ -87,7 +87,6 @@ require (
github.com/msteinert/pam/v2 v2.1.0 github.com/msteinert/pam/v2 v2.1.0
github.com/nektos/act v0.2.63 github.com/nektos/act v0.2.63
github.com/niklasfasching/go-org v1.9.1 github.com/niklasfasching/go-org v1.9.1
github.com/olivere/elastic/v7 v7.0.32
github.com/opencontainers/go-digest v1.0.0 github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/image-spec v1.1.1 github.com/opencontainers/image-spec v1.1.1
github.com/pquerna/otp v1.5.0 github.com/pquerna/otp v1.5.0
@@ -222,7 +221,7 @@ require (
github.com/klauspost/crc32 v1.3.0 // indirect github.com/klauspost/crc32 v1.3.0 // indirect
github.com/klauspost/pgzip v1.2.6 // indirect github.com/klauspost/pgzip v1.2.6 // indirect
github.com/libdns/libdns v1.1.1 // indirect github.com/libdns/libdns v1.1.1 // indirect
github.com/mailru/easyjson v0.9.2 // indirect github.com/mailru/easyjson v0.7.7 // indirect
github.com/markbates/going v1.0.3 // indirect github.com/markbates/going v1.0.3 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-runewidth v0.0.21 // indirect github.com/mattn/go-runewidth v0.0.21 // indirect

10
go.sum
View File

@@ -267,8 +267,6 @@ github.com/fatih/color v1.19.0 h1:Zp3PiM21/9Ld6FzSKyL5c/BULoe/ONr9KlbYVOfG8+w=
github.com/fatih/color v1.19.0/go.mod h1:zNk67I0ZUT1bEGsSGyCZYZNrHuTkJJB+r6Q9VuMi0LE= github.com/fatih/color v1.19.0/go.mod h1:zNk67I0ZUT1bEGsSGyCZYZNrHuTkJJB+r6Q9VuMi0LE=
github.com/felixge/fgprof v0.9.5 h1:8+vR6yu2vvSKn08urWyEuxx75NWPEvybbkBirEpsbVY= github.com/felixge/fgprof v0.9.5 h1:8+vR6yu2vvSKn08urWyEuxx75NWPEvybbkBirEpsbVY=
github.com/felixge/fgprof v0.9.5/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM= github.com/felixge/fgprof v0.9.5/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
@@ -507,9 +505,8 @@ github.com/lib/pq v1.12.3/go.mod h1:/p+8NSbOcwzAEI7wiMXFlgydTwcgTr3OSKMsD2BitpA=
github.com/libdns/libdns v1.1.1 h1:wPrHrXILoSHKWJKGd0EiAVmiJbFShguILTg9leS/P/U= github.com/libdns/libdns v1.1.1 h1:wPrHrXILoSHKWJKGd0EiAVmiJbFShguILTg9leS/P/U=
github.com/libdns/libdns v1.1.1/go.mod h1:4Bj9+5CQiNMVGf87wjX4CY3HQJypUHRuLvlsfsZqLWQ= github.com/libdns/libdns v1.1.1/go.mod h1:4Bj9+5CQiNMVGf87wjX4CY3HQJypUHRuLvlsfsZqLWQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.9.2 h1:dX8U45hQsZpxd80nLvDGihsQ/OxlvTkVUXH2r/8cb2M=
github.com/mailru/easyjson v0.9.2/go.mod h1:1+xMtQp2MRNVL/V1bOzuP3aP8VNwRW55fQUto+XFtTU=
github.com/markbates/going v1.0.3 h1:mY45T5TvW+Xz5A6jY7lf4+NLg9D8+iuStIHyR7M8qsE= github.com/markbates/going v1.0.3 h1:mY45T5TvW+Xz5A6jY7lf4+NLg9D8+iuStIHyR7M8qsE=
github.com/markbates/going v1.0.3/go.mod h1:fQiT6v6yQar9UD6bd/D4Z5Afbk9J6BBVBtLiyY4gp2o= github.com/markbates/going v1.0.3/go.mod h1:fQiT6v6yQar9UD6bd/D4Z5Afbk9J6BBVBtLiyY4gp2o=
github.com/markbates/goth v1.82.0 h1:8j/c34AjBSTNzO7zTsOyP5IYCQCMBTRBHAbBt/PI0bQ= github.com/markbates/goth v1.82.0 h1:8j/c34AjBSTNzO7zTsOyP5IYCQCMBTRBHAbBt/PI0bQ=
@@ -585,8 +582,6 @@ github.com/olekukonko/ll v0.1.8 h1:ysHCJRGHYKzmBSdz9w5AySztx7lG8SQY+naTGYUbsz8=
github.com/olekukonko/ll v0.1.8/go.mod h1:RPRC6UcscfFZgjo1nulkfMH5IM0QAYim0LfnMvUuozw= github.com/olekukonko/ll v0.1.8/go.mod h1:RPRC6UcscfFZgjo1nulkfMH5IM0QAYim0LfnMvUuozw=
github.com/olekukonko/tablewriter v1.1.4 h1:ORUMI3dXbMnRlRggJX3+q7OzQFDdvgbN9nVWj1drm6I= github.com/olekukonko/tablewriter v1.1.4 h1:ORUMI3dXbMnRlRggJX3+q7OzQFDdvgbN9nVWj1drm6I=
github.com/olekukonko/tablewriter v1.1.4/go.mod h1:+kedxuyTtgoZLwif3P1Em4hARJs+mVnzKxmsCL/C5RY= github.com/olekukonko/tablewriter v1.1.4/go.mod h1:+kedxuyTtgoZLwif3P1Em4hARJs+mVnzKxmsCL/C5RY=
github.com/olivere/elastic/v7 v7.0.32 h1:R7CXvbu8Eq+WlsLgxmKVKPox0oOwAE/2T9Si5BnvK6E=
github.com/olivere/elastic/v7 v7.0.32/go.mod h1:c7PVmLe3Fxq77PIfY/bZmxY/TAamBhCzZ8xDOE09a9k=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
@@ -667,9 +662,8 @@ github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w
github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g= github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g=
github.com/skeema/knownhosts v1.3.2 h1:EDL9mgf4NzwMXCTfaxSD/o/a5fxDw/xL9nkU28JjdBg= github.com/skeema/knownhosts v1.3.2 h1:EDL9mgf4NzwMXCTfaxSD/o/a5fxDw/xL9nkU28JjdBg=
github.com/skeema/knownhosts v1.3.2/go.mod h1:bEg3iQAuw+jyiw+484wwFJoKSLwcfd7fqRy+N0QTiow= github.com/skeema/knownhosts v1.3.2/go.mod h1:bEg3iQAuw+jyiw+484wwFJoKSLwcfd7fqRy+N0QTiow=
github.com/smartystreets/assertions v0.0.0-20190116191733-b6c0e53d7304 h1:Jpy1PXuP99tXNrhbq2BaPz9B+jNAvH1JPQQpG/9GCXY=
github.com/smartystreets/assertions v0.0.0-20190116191733-b6c0e53d7304/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v0.0.0-20190116191733-b6c0e53d7304/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.1.1 h1:T/YLemO5Yp7KPzS+lVtu+WsHn8yoSwTfItdAd1r3cck=
github.com/smartystreets/assertions v1.1.1/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8=
github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=

View File

@@ -18,8 +18,7 @@ import (
"code.gitea.io/gitea/modules/gitrepo" "code.gitea.io/gitea/modules/gitrepo"
"code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/indexer"
"code.gitea.io/gitea/modules/indexer/code/internal" "code.gitea.io/gitea/modules/indexer/code/internal"
indexer_internal "code.gitea.io/gitea/modules/indexer/internal" es "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
"code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting" "code.gitea.io/gitea/modules/setting"
@@ -28,23 +27,15 @@ import (
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/go-enry/go-enry/v2" "github.com/go-enry/go-enry/v2"
"github.com/olivere/elastic/v7"
) )
const ( const esRepoIndexerLatestVersion = 3
esRepoIndexerLatestVersion = 3
// multi-match-types, currently only 2 types are used
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
esMultiMatchTypeBestFields = "best_fields"
esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
var _ internal.Indexer = &Indexer{} var _ internal.Indexer = &Indexer{}
// Indexer implements Indexer interface // Indexer implements Indexer interface
type Indexer struct { type Indexer struct {
inner *inner_elasticsearch.Indexer *es.Indexer
indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much
} }
func (b *Indexer) SupportedSearchModes() []indexer.SearchMode { func (b *Indexer) SupportedSearchModes() []indexer.SearchMode {
@@ -53,12 +44,7 @@ func (b *Indexer) SupportedSearchModes() []indexer.SearchMode {
// NewIndexer creates a new elasticsearch indexer // NewIndexer creates a new elasticsearch indexer
func NewIndexer(url, indexerName string) *Indexer { func NewIndexer(url, indexerName string) *Indexer {
inner := inner_elasticsearch.NewIndexer(url, indexerName, esRepoIndexerLatestVersion, defaultMapping) return &Indexer{Indexer: es.NewIndexer(url, indexerName, esRepoIndexerLatestVersion, defaultMapping)}
indexer := &Indexer{
inner: inner,
Indexer: inner,
}
return indexer
} }
const ( const (
@@ -138,7 +124,7 @@ const (
}` }`
) )
func (b *Indexer) addUpdate(ctx context.Context, catFileBatch git.CatFileBatch, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]elastic.BulkableRequest, error) { func (b *Indexer) addUpdate(ctx context.Context, catFileBatch git.CatFileBatch, sha string, update internal.FileUpdate, repo *repo_model.Repository) ([]es.BulkOp, error) {
// Ignore vendored files in code search // Ignore vendored files in code search
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) { if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
return nil, nil return nil, nil
@@ -157,8 +143,9 @@ func (b *Indexer) addUpdate(ctx context.Context, catFileBatch git.CatFileBatch,
} }
} }
id := internal.FilenameIndexerID(repo.ID, update.Filename)
if size > setting.Indexer.MaxIndexerFileSize { if size > setting.Indexer.MaxIndexerFileSize {
return []elastic.BulkableRequest{b.addDelete(update.Filename, repo)}, nil return []es.BulkOp{es.DeleteOp(id)}, nil
} }
info, batchReader, err := catFileBatch.QueryContent(update.BlobSha) info, batchReader, err := catFileBatch.QueryContent(update.BlobSha)
@@ -177,33 +164,24 @@ func (b *Indexer) addUpdate(ctx context.Context, catFileBatch git.CatFileBatch,
if _, err = batchReader.Discard(1); err != nil { if _, err = batchReader.Discard(1); err != nil {
return nil, err return nil, err
} }
id := internal.FilenameIndexerID(repo.ID, update.Filename)
return []elastic.BulkableRequest{ return []es.BulkOp{es.IndexOp(id, map[string]any{
elastic.NewBulkIndexRequest(). "repo_id": repo.ID,
Index(b.inner.VersionedIndexName()). "filename": update.Filename,
Id(id). "content": string(charset.ToUTF8DropErrors(fileContents)),
Doc(map[string]any{ "commit_id": sha,
"repo_id": repo.ID, "language": analyze.GetCodeLanguage(update.Filename, fileContents),
"filename": update.Filename, "updated_at": timeutil.TimeStampNow(),
"content": string(charset.ToUTF8DropErrors(fileContents)), })}, nil
"commit_id": sha,
"language": analyze.GetCodeLanguage(update.Filename, fileContents),
"updated_at": timeutil.TimeStampNow(),
}),
}, nil
} }
func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) elastic.BulkableRequest { func (b *Indexer) addDelete(filename string, repo *repo_model.Repository) es.BulkOp {
id := internal.FilenameIndexerID(repo.ID, filename) return es.DeleteOp(internal.FilenameIndexerID(repo.ID, filename))
return elastic.NewBulkDeleteRequest().
Index(b.inner.VersionedIndexName()).
Id(id)
} }
// Index will save the index data // Index will save the index data
func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error { func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha string, changes *internal.RepoChanges) error {
reqs := make([]elastic.BulkableRequest, 0) ops := make([]es.BulkOp, 0)
if len(changes.Updates) > 0 { if len(changes.Updates) > 0 {
batch, err := gitrepo.NewBatch(ctx, repo) batch, err := gitrepo.NewBatch(ctx, repo)
if err != nil { if err != nil {
@@ -212,29 +190,25 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st
defer batch.Close() defer batch.Close()
for _, update := range changes.Updates { for _, update := range changes.Updates {
updateReqs, err := b.addUpdate(ctx, batch, sha, update, repo) updateOps, err := b.addUpdate(ctx, batch, sha, update, repo)
if err != nil { if err != nil {
return err return err
} }
if len(updateReqs) > 0 { if len(updateOps) > 0 {
reqs = append(reqs, updateReqs...) ops = append(ops, updateOps...)
} }
} }
} }
for _, filename := range changes.RemovedFilenames { for _, filename := range changes.RemovedFilenames {
reqs = append(reqs, b.addDelete(filename, repo)) ops = append(ops, b.addDelete(filename, repo))
} }
if len(reqs) > 0 { if len(ops) > 0 {
esBatchSize := 50 esBatchSize := 50
for i := 0; i < len(reqs); i += esBatchSize { for i := 0; i < len(ops); i += esBatchSize {
_, err := b.inner.Client.Bulk(). if err := b.Bulk(ctx, ops[i:min(i+esBatchSize, len(ops))]); err != nil {
Index(b.inner.VersionedIndexName()).
Add(reqs[i:min(i+esBatchSize, len(reqs))]...).
Do(ctx)
if err != nil {
return err return err
} }
} }
@@ -246,33 +220,21 @@ func (b *Indexer) Index(ctx context.Context, repo *repo_model.Repository, sha st
func (b *Indexer) Delete(ctx context.Context, repoID int64) error { func (b *Indexer) Delete(ctx context.Context, repoID int64) error {
if err := b.doDelete(ctx, repoID); err != nil { if err := b.doDelete(ctx, repoID); err != nil {
// Maybe there is a conflict during the delete operation, so we should retry after a refresh // Maybe there is a conflict during the delete operation, so we should retry after a refresh
log.Warn("Deletion of entries of repo %v within index %v was erroneous. Trying to refresh index before trying again", repoID, b.inner.VersionedIndexName(), err) log.Warn("Deletion of entries of repo %v within index %v was erroneous: %v. Trying to refresh index before trying again", repoID, b.VersionedIndexName(), err)
if err := b.refreshIndex(ctx); err != nil { if err := b.Refresh(ctx); err != nil {
return err return err
} }
if err := b.doDelete(ctx, repoID); err != nil { if err := b.doDelete(ctx, repoID); err != nil {
log.Error("Could not delete entries of repo %v within index %v", repoID, b.inner.VersionedIndexName()) log.Error("Could not delete entries of repo %v within index %v", repoID, b.VersionedIndexName())
return err return err
} }
} }
return nil return nil
} }
func (b *Indexer) refreshIndex(ctx context.Context) error {
if _, err := b.inner.Client.Refresh(b.inner.VersionedIndexName()).Do(ctx); err != nil {
log.Error("Error while trying to refresh index %v", b.inner.VersionedIndexName(), err)
return err
}
return nil
}
// Delete entries by repoId // Delete entries by repoId
func (b *Indexer) doDelete(ctx context.Context, repoID int64) error { func (b *Indexer) doDelete(ctx context.Context, repoID int64) error {
_, err := b.inner.Client.DeleteByQuery(b.inner.VersionedIndexName()). return b.DeleteByQuery(ctx, es.TermsQuery("repo_id", repoID))
Query(elastic.NewTermsQuery("repo_id", repoID)).
Do(ctx)
return err
} }
// contentMatchIndexPos find words positions for start and the following end on content. It will // contentMatchIndexPos find words positions for start and the following end on content. It will
@@ -291,10 +253,10 @@ func contentMatchIndexPos(content, start, end string) (int, int) {
return startIdx, (startIdx + len(start) + endIdx + len(end)) - 9 // remove the length <em></em> since we give Content the original data return startIdx, (startIdx + len(start) + endIdx + len(end)) - 9 // remove the length <em></em> since we give Content the original data
} }
func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { func convertResult(searchResult *es.SearchResponse, kw string, pageSize int) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) {
hits := make([]*internal.SearchResult, 0, pageSize) hits := make([]*internal.SearchResult, 0, pageSize)
for _, hit := range searchResult.Hits.Hits { for _, hit := range searchResult.Hits {
repoID, fileName := internal.ParseIndexerID(hit.Id) repoID, fileName := internal.ParseIndexerID(hit.ID)
res := make(map[string]any) res := make(map[string]any)
if err := json.Unmarshal(hit.Source, &res); err != nil { if err := json.Unmarshal(hit.Source, &res); err != nil {
return 0, nil, nil, err return 0, nil, nil, err
@@ -333,111 +295,111 @@ func convertResult(searchResult *elastic.SearchResult, kw string, pageSize int)
}) })
} }
return searchResult.TotalHits(), hits, extractAggs(searchResult), nil return searchResult.Total, hits, extractAggs(searchResult), nil
} }
func extractAggs(searchResult *elastic.SearchResult) []*internal.SearchResultLanguages { func extractAggs(searchResult *es.SearchResponse) []*internal.SearchResultLanguages {
var searchResultLanguages []*internal.SearchResultLanguages buckets, found := searchResult.Aggregations["language"]
agg, found := searchResult.Aggregations.Terms("language") if !found {
if found { return nil
searchResultLanguages = make([]*internal.SearchResultLanguages, 0, 10) }
searchResultLanguages := make([]*internal.SearchResultLanguages, 0, 10)
for _, bucket := range agg.Buckets { for _, bucket := range buckets {
searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{ // language is mapped as keyword so the key is always a string; if the
Language: bucket.Key.(string), // mapping ever changes, skip rather than emit an empty-language bucket.
Color: enry.GetColor(bucket.Key.(string)), key, ok := bucket.Key.(string)
Count: int(bucket.DocCount), if !ok {
}) continue
} }
searchResultLanguages = append(searchResultLanguages, &internal.SearchResultLanguages{
Language: key,
Color: enry.GetColor(key),
Count: int(bucket.DocCount),
})
} }
return searchResultLanguages return searchResultLanguages
} }
// Search searches for codes and language stats by given conditions. // Search searches for codes and language stats by given conditions.
func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) { func (b *Indexer) Search(ctx context.Context, opts *internal.SearchOptions) (int64, []*internal.SearchResult, []*internal.SearchResultLanguages, error) {
var contentQuery elastic.Query
searchMode := util.IfZero(opts.SearchMode, b.SupportedSearchModes()[0].ModeValue) searchMode := util.IfZero(opts.SearchMode, b.SupportedSearchModes()[0].ModeValue)
contentQuery := es.Query(es.NewMultiMatchQuery(opts.Keyword, "content").Type(es.MultiMatchTypeBestFields).Operator("and"))
if searchMode == indexer.SearchModeExact { if searchMode == indexer.SearchModeExact {
// 1.21 used NewMultiMatchQuery().Type(esMultiMatchTypePhrasePrefix), but later releases changed to NewMatchPhraseQuery contentQuery = es.MatchPhraseQuery("content", opts.Keyword)
contentQuery = elastic.NewMatchPhraseQuery("content", opts.Keyword)
} else /* words */ {
contentQuery = elastic.NewMultiMatchQuery("content", opts.Keyword).Type(esMultiMatchTypeBestFields).Operator("and")
} }
kwQuery := elastic.NewBoolQuery().Should( kwQuery := es.NewBoolQuery().Should(
contentQuery, contentQuery,
elastic.NewMultiMatchQuery(opts.Keyword, "filename^10").Type(esMultiMatchTypePhrasePrefix), es.NewMultiMatchQuery(opts.Keyword, "filename^10").Type(es.MultiMatchTypePhrasePrefix),
) )
query := elastic.NewBoolQuery() query := es.NewBoolQuery().Must(kwQuery)
query = query.Must(kwQuery)
if len(opts.RepoIDs) > 0 { if len(opts.RepoIDs) > 0 {
repoStrs := make([]any, 0, len(opts.RepoIDs)) query.Must(es.TermsQuery("repo_id", es.ToAnySlice(opts.RepoIDs)...))
for _, repoID := range opts.RepoIDs {
repoStrs = append(repoStrs, repoID)
}
repoQuery := elastic.NewTermsQuery("repo_id", repoStrs...)
query = query.Must(repoQuery)
} }
var ( start, pageSize := opts.GetSkipTake()
start, pageSize = opts.GetSkipTake() kw := "<em>" + opts.Keyword + "</em>"
kw = "<em>" + opts.Keyword + "</em>" languageAggs := map[string]any{
aggregation = elastic.NewTermsAggregation().Field("language").Size(10).OrderByCountDesc() "language": map[string]any{
) "terms": map[string]any{
"field": "language",
"size": 10,
"order": map[string]any{"_count": "desc"},
},
},
}
// number_of_fragments=0 returns the full highlighted content (no fragmentation).
highlight := map[string]any{
"fields": map[string]any{
"content": map[string]any{},
"filename": map[string]any{},
},
"number_of_fragments": 0,
"type": "fvh",
}
sort := []es.SortField{
{Field: "_score", Desc: true},
{Field: "updated_at", Desc: false},
}
if len(opts.Language) == 0 { if len(opts.Language) == 0 {
searchResult, err := b.inner.Client.Search(). resp, err := b.Indexer.Search(ctx, es.SearchRequest{
Index(b.inner.VersionedIndexName()). Query: query,
Aggregation("language", aggregation). Sort: sort,
Query(query). From: start,
Highlight( Size: pageSize,
elastic.NewHighlight(). TrackTotal: true,
Field("content"). Aggregations: languageAggs,
Field("filename"). Highlight: highlight,
NumOfFragments(0). // return all highlighting content on fragments })
HighlighterType("fvh"),
).
Sort("_score", false).
Sort("updated_at", true).
From(start).Size(pageSize).
Do(ctx)
if err != nil { if err != nil {
return 0, nil, nil, err return 0, nil, nil, err
} }
return convertResult(resp, kw, pageSize)
return convertResult(searchResult, kw, pageSize)
} }
langQuery := elastic.NewMatchQuery("language", opts.Language) countResp, err := b.Indexer.Search(ctx, es.SearchRequest{
countResult, err := b.inner.Client.Search(). Query: query,
Index(b.inner.VersionedIndexName()). Size: 0, // stats only
Aggregation("language", aggregation). TrackTotal: true,
Query(query). Aggregations: languageAggs,
Size(0). // We only need stats information })
Do(ctx)
if err != nil { if err != nil {
return 0, nil, nil, err return 0, nil, nil, err
} }
query = query.Must(langQuery) query.Must(es.MatchQuery("language", opts.Language))
searchResult, err := b.inner.Client.Search(). resp, err := b.Indexer.Search(ctx, es.SearchRequest{
Index(b.inner.VersionedIndexName()). Query: query,
Query(query). Sort: sort,
Highlight( From: start,
elastic.NewHighlight(). Size: pageSize,
Field("content"). TrackTotal: true,
Field("filename"). Highlight: highlight,
NumOfFragments(0). // return all highlighting content on fragments })
HighlighterType("fvh"),
).
Sort("_score", false).
Sort("updated_at", true).
From(start).Size(pageSize).
Do(ctx)
if err != nil { if err != nil {
return 0, nil, nil, err return 0, nil, nil, err
} }
total, hits, _, err := convertResult(searchResult, kw, pageSize) total, hits, _, err := convertResult(resp, kw, pageSize)
return total, hits, extractAggs(countResp), err
return total, hits, extractAggs(countResult), err
} }

View File

@@ -8,6 +8,7 @@ import (
"os" "os"
"slices" "slices"
"testing" "testing"
"time"
"code.gitea.io/gitea/models/db" "code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/unittest" "code.gitea.io/gitea/models/unittest"
@@ -39,6 +40,16 @@ func TestMain(m *testing.M) {
func testIndexer(name string, t *testing.T, indexer internal.Indexer) { func testIndexer(name string, t *testing.T, indexer internal.Indexer) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
assert.NoError(t, setupRepositoryIndexes(t.Context(), indexer)) assert.NoError(t, setupRepositoryIndexes(t.Context(), indexer))
// Wait for the index to catch up: ES/OpenSearch make writes visible
// only after a refresh (default interval: 1s). Bleve is synchronous
// and passes on the first iteration.
require.Eventually(t, func() bool {
total, _, _, err := indexer.Search(t.Context(), &internal.SearchOptions{
Keyword: "Description",
Paginator: &db.ListOptions{Page: 1, PageSize: 1},
})
return err == nil && total > 0
}, 10*time.Second, 100*time.Millisecond, "index did not become searchable")
keywords := []struct { keywords := []struct {
RepoIDs []int64 RepoIDs []int64

View File

@@ -4,52 +4,80 @@
package elasticsearch package elasticsearch
import ( import (
"bytes"
"context" "context"
"errors"
"fmt" "fmt"
"io"
"net"
"net/http"
"net/url"
"slices"
"strconv"
"strings"
"time"
"code.gitea.io/gitea/modules/indexer/internal" "code.gitea.io/gitea/modules/indexer/internal"
"code.gitea.io/gitea/modules/json"
"github.com/olivere/elastic/v7"
) )
var _ internal.Indexer = &Indexer{} var _ internal.Indexer = &Indexer{}
// Indexer represents a basic elasticsearch indexer implementation // Indexer is a narrow wrapper around an Elasticsearch/OpenSearch cluster.
// It targets the REST subset shared by Elasticsearch 7/8/9 and OpenSearch 3.
type Indexer struct { type Indexer struct {
Client *elastic.Client client *http.Client
base string // base URL with trailing slash, no userinfo
user string
pass string
url string
indexName string indexName string
version int version int
mapping string mapping string
} }
func NewIndexer(url, indexName string, version int, mapping string) *Indexer { // NewIndexer builds an Indexer. The connection is opened by Init.
func NewIndexer(rawURL, indexName string, version int, mapping string) *Indexer {
return &Indexer{ return &Indexer{
url: url, base: rawURL,
indexName: indexName, indexName: indexName,
version: version, version: version,
mapping: mapping, mapping: mapping,
} }
} }
// Init initializes the indexer // Init connects and creates the versioned index if missing, returning true if it already existed.
func (i *Indexer) Init(ctx context.Context) (bool, error) { func (i *Indexer) Init(ctx context.Context) (bool, error) {
if i == nil { parsed, err := url.Parse(i.base)
return false, errors.New("cannot init nil indexer")
}
if i.Client != nil {
return false, errors.New("indexer is already initialized")
}
client, err := i.initClient()
if err != nil { if err != nil {
return false, err return false, fmt.Errorf("parse elasticsearch url: %w", err)
}
if parsed.User != nil {
i.user = parsed.User.Username()
i.pass, _ = parsed.User.Password()
parsed.User = nil
}
base := parsed.String()
if !strings.HasSuffix(base, "/") {
base += "/"
}
i.base = base
// No client-level Timeout: bulk/_delete_by_query can legitimately run for
// minutes on large repos. Per-request deadlines come from the caller's ctx;
// transport-level timeouts cover stalled connects/handshakes/headers so a
// half-open server cannot wedge the indexer indefinitely.
i.client = &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: 30 * time.Second, KeepAlive: 30 * time.Second}).DialContext,
TLSHandshakeTimeout: 10 * time.Second,
ResponseHeaderTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
IdleConnTimeout: 90 * time.Second,
MaxIdleConns: 100,
},
} }
i.Client = client
exists, err := i.Client.IndexExists(i.VersionedIndexName()).Do(ctx) exists, err := i.indexExists(ctx, i.VersionedIndexName())
if err != nil { if err != nil {
return false, err return false, err
} }
@@ -61,34 +89,321 @@ func (i *Indexer) Init(ctx context.Context) (bool, error) {
return false, err return false, err
} }
return exists, nil return false, nil
} }
// Ping checks if the indexer is available // Ping returns an error when the cluster is unusable (status != green/yellow).
func (i *Indexer) Ping(ctx context.Context) error { func (i *Indexer) Ping(ctx context.Context) error {
if i == nil { var body struct {
return errors.New("cannot ping nil indexer") Status string `json:"status"`
} }
if i.Client == nil { if err := i.doJSON(ctx, http.MethodGet, "_cluster/health", nil, &body); err != nil {
return errors.New("indexer is not initialized")
}
resp, err := i.Client.ClusterHealth().Do(ctx)
if err != nil {
return err return err
} }
if resp.Status != "green" && resp.Status != "yellow" { // Healthy = green; usable = yellow. Red is unusable.
// It's healthy if the status is green, and it's available if the status is yellow, // https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html
// see https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html if body.Status != "green" && body.Status != "yellow" {
return fmt.Errorf("status of elasticsearch cluster is %s", resp.Status) return fmt.Errorf("status of elasticsearch cluster is %s", body.Status)
} }
return nil return nil
} }
// Close closes the indexer // Close releases idle HTTP connections held by the client.
func (i *Indexer) Close() { func (i *Indexer) Close() {
if i == nil { if i == nil || i.client == nil {
return return
} }
i.Client = nil i.client.CloseIdleConnections()
i.client = nil
}
// Bulk submits index/delete ops. Returns the first item-level failure, if any.
func (i *Indexer) Bulk(ctx context.Context, ops []BulkOp) error {
if len(ops) == 0 {
return nil
}
index := i.VersionedIndexName()
var buf bytes.Buffer
buf.Grow(len(ops) * 256)
for _, op := range ops {
meta := map[string]any{op.action: map[string]any{"_index": index, "_id": op.id}}
if err := writeJSONLine(&buf, meta); err != nil {
return err
}
if op.action == bulkActionIndex {
if err := writeJSONLine(&buf, op.doc); err != nil {
return err
}
}
}
res, err := i.do(ctx, http.MethodPost, urlPath(index, "_bulk"), "application/x-ndjson", bytes.NewReader(buf.Bytes()))
if err != nil {
return err
}
defer drainAndClose(res)
var body struct {
Errors bool `json:"errors"`
Items []map[string]struct {
Status int `json:"status"`
Error json.Value `json:"error"`
} `json:"items"`
}
if err := json.NewDecoder(res.Body).Decode(&body); err != nil {
return err
}
if !body.Errors {
return nil
}
return firstBulkError(body.Items)
}
// firstBulkError returns the first item-level failure in a bulk response.
// Each items entry is a single-key map ({"index": {...}} or {"delete": {...}}).
// Delete-of-missing (404) is idempotent and not reported.
func firstBulkError(items []map[string]struct {
Status int `json:"status"`
Error json.Value `json:"error"`
},
) error {
for _, item := range items {
for action, result := range item {
if action == bulkActionDelete && result.Status == http.StatusNotFound {
continue
}
if result.Status >= 300 {
return fmt.Errorf("bulk %s failed (status %d): %s", action, result.Status, string(result.Error))
}
}
}
return nil
}
// Index writes a single document.
func (i *Indexer) Index(ctx context.Context, id string, doc any) error {
body, err := json.Marshal(doc)
if err != nil {
return err
}
return i.doJSON(ctx, http.MethodPut, urlPath(i.VersionedIndexName(), "_doc", id), bytes.NewReader(body), nil)
}
// Delete removes a single document by id. Missing ids are not an error.
func (i *Indexer) Delete(ctx context.Context, id string) error {
res, err := i.do(ctx, http.MethodDelete, urlPath(i.VersionedIndexName(), "_doc", id), "", nil, http.StatusNotFound)
if err != nil {
return err
}
drainAndClose(res)
return nil
}
// DeleteByQuery removes every document matching the query.
func (i *Indexer) DeleteByQuery(ctx context.Context, query Query) error {
body, err := json.Marshal(map[string]any{"query": query.querySource()})
if err != nil {
return err
}
return i.doJSON(ctx, http.MethodPost, urlPath(i.VersionedIndexName(), "_delete_by_query"), bytes.NewReader(body), nil)
}
// Refresh forces a refresh so recent writes are searchable.
func (i *Indexer) Refresh(ctx context.Context) error {
return i.doJSON(ctx, http.MethodPost, urlPath(i.VersionedIndexName(), "_refresh"), nil, nil)
}
// Search runs a search request and decodes the reply.
func (i *Indexer) Search(ctx context.Context, req SearchRequest) (*SearchResponse, error) {
body := map[string]any{}
if req.Query != nil {
body["query"] = req.Query.querySource()
}
if len(req.Sort) > 0 {
sorts := make([]map[string]any, len(req.Sort))
for idx, s := range req.Sort {
sorts[idx] = s.source()
}
body["sort"] = sorts
}
if req.From > 0 {
body["from"] = req.From
}
body["size"] = req.Size
if len(req.Aggregations) > 0 {
body["aggs"] = req.Aggregations
}
if len(req.Highlight) > 0 {
body["highlight"] = req.Highlight
}
payload, err := json.Marshal(body)
if err != nil {
return nil, err
}
// Default track_total_hits is 10000 (capped count); send it explicitly so
// callers can choose between exact totals (true) and skipping counting (false).
path := urlPath(i.VersionedIndexName(), "_search") + "?track_total_hits=" + strconv.FormatBool(req.TrackTotal)
res, err := i.do(ctx, http.MethodPost, path, "application/json", bytes.NewReader(payload))
if err != nil {
return nil, err
}
defer drainAndClose(res)
return decodeSearchResponse(res.Body)
}
func (i *Indexer) indexExists(ctx context.Context, name string) (bool, error) {
res, err := i.do(ctx, http.MethodHead, urlPath(name), "", nil, http.StatusNotFound)
if err != nil {
return false, err
}
drainAndClose(res)
return res.StatusCode == http.StatusOK, nil
}
func (i *Indexer) createIndex(ctx context.Context) error {
var body struct {
Acknowledged bool `json:"acknowledged"`
}
if err := i.doJSON(ctx, http.MethodPut, urlPath(i.VersionedIndexName()), bytes.NewBufferString(i.mapping), &body); err != nil {
return fmt.Errorf("create index %s: %w", i.VersionedIndexName(), err)
}
if !body.Acknowledged {
return fmt.Errorf("create index %s not acknowledged", i.VersionedIndexName())
}
i.checkOldIndexes(ctx)
return nil
}
// do sends a request and returns the response. Status >= 300 is turned into
// an error unless the status appears in okStatus. The caller closes Body.
func (i *Indexer) do(ctx context.Context, method, path, contentType string, body io.Reader, okStatus ...int) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, i.base+path, body)
if err != nil {
return nil, err
}
if contentType != "" {
req.Header.Set("Content-Type", contentType)
}
if i.user != "" || i.pass != "" {
req.SetBasicAuth(i.user, i.pass)
}
res, err := i.client.Do(req)
if err != nil {
return nil, err
}
if res.StatusCode >= 300 && !slices.Contains(okStatus, res.StatusCode) {
msg := readErrBody(res)
res.Body.Close()
return nil, fmt.Errorf("%s %s: %s", method, path, msg)
}
return res, nil
}
// doJSON sends a request with a JSON body and, when out is non-nil, decodes
// the JSON response into it.
func (i *Indexer) doJSON(ctx context.Context, method, path string, body io.Reader, out any) error {
contentType := ""
if body != nil {
contentType = "application/json"
}
res, err := i.do(ctx, method, path, contentType, body)
if err != nil {
return err
}
defer drainAndClose(res)
if out == nil {
return nil
}
return json.NewDecoder(res.Body).Decode(out)
}
// drainAndClose discards any unread response body before closing so the
// underlying TCP connection can be reused for keep-alive.
func drainAndClose(res *http.Response) {
_, _ = io.Copy(io.Discard, res.Body)
res.Body.Close()
}
func writeJSONLine(buf *bytes.Buffer, v any) error {
enc, err := json.Marshal(v)
if err != nil {
return err
}
buf.Write(enc)
buf.WriteByte('\n')
return nil
}
// readErrBody reads up to 4 KiB of an error response and drains the rest so
// the underlying connection can be reused (keep-alive needs Body fully read).
func readErrBody(res *http.Response) string {
const limit = 4 << 10
b, _ := io.ReadAll(io.LimitReader(res.Body, limit))
_, _ = io.Copy(io.Discard, res.Body)
return fmt.Sprintf("status %d: %s", res.StatusCode, bytes.TrimSpace(b))
}
func decodeSearchResponse(r io.Reader) (*SearchResponse, error) {
var raw struct {
Hits struct {
Total struct {
Value int64 `json:"value"`
} `json:"total"`
Hits []struct {
ID string `json:"_id"`
Score float64 `json:"_score"`
Source json.Value `json:"_source"`
Highlight map[string][]string `json:"highlight"`
} `json:"hits"`
} `json:"hits"`
Aggregations map[string]struct {
Buckets []struct {
Key any `json:"key"`
DocCount int64 `json:"doc_count"`
} `json:"buckets"`
} `json:"aggregations"`
}
if err := json.NewDecoder(r).Decode(&raw); err != nil {
return nil, err
}
resp := &SearchResponse{
Total: raw.Hits.Total.Value,
Hits: make([]SearchHit, 0, len(raw.Hits.Hits)),
}
for _, h := range raw.Hits.Hits {
resp.Hits = append(resp.Hits, SearchHit{
ID: h.ID,
Score: h.Score,
Source: h.Source,
Highlight: h.Highlight,
})
}
if len(raw.Aggregations) > 0 {
resp.Aggregations = make(map[string][]AggBucket, len(raw.Aggregations))
for name, agg := range raw.Aggregations {
buckets := make([]AggBucket, len(agg.Buckets))
for idx, b := range agg.Buckets {
buckets[idx] = AggBucket{Key: b.Key, DocCount: b.DocCount}
}
resp.Aggregations[name] = buckets
}
}
return resp, nil
}
// urlPath joins path segments with `/` and percent-escapes each.
func urlPath(segments ...string) string {
var b bytes.Buffer
for idx, s := range segments {
if idx > 0 {
b.WriteByte('/')
}
b.WriteString(url.PathEscape(s))
}
return b.String()
} }

View File

@@ -0,0 +1,44 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package elasticsearch
import (
"os"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func newRealIndexer(t *testing.T) *Indexer {
t.Helper()
url := "http://elasticsearch:9200"
if os.Getenv("CI") == "" {
url = os.Getenv("TEST_ELASTICSEARCH_URL")
if url == "" {
t.Skip("TEST_ELASTICSEARCH_URL not set and not running in CI")
}
}
indexName := "gitea_test_" + strings.ReplaceAll(strings.ToLower(t.Name()), "/", "_")
ix := NewIndexer(url, indexName, 1, `{"mappings":{"properties":{"x":{"type":"keyword"}}}}`)
_, err := ix.Init(t.Context())
require.NoError(t, err)
t.Cleanup(ix.Close)
return ix
}
func TestPing(t *testing.T) {
ix := newRealIndexer(t)
require.NoError(t, ix.Ping(t.Context()))
}
func TestDeleteSwallows404(t *testing.T) {
ix := newRealIndexer(t)
require.NoError(t, ix.Delete(t.Context(), "missing-id"))
}
func TestBulkAcceptsDelete404(t *testing.T) {
ix := newRealIndexer(t)
require.NoError(t, ix.Bulk(t.Context(), []BulkOp{DeleteOp("missing-id")}))
}

View File

@@ -0,0 +1,132 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package elasticsearch
// MultiMatch types used by the call sites. See
// https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-multi-match-query.html#multi-match-types
const (
MultiMatchTypeBestFields = "best_fields"
MultiMatchTypePhrasePrefix = "phrase_prefix"
)
// ToAnySlice converts []T to []any for variadic query args like TermsQuery.
func ToAnySlice[T any](s []T) []any {
out := make([]any, len(s))
for idx, v := range s {
out[idx] = v
}
return out
}
// Query is an Elasticsearch query DSL node. It marshals to the JSON
// object expected by the ES query API.
type Query interface {
querySource() map[string]any
}
type rawQuery map[string]any
func (q rawQuery) querySource() map[string]any { return q }
// TermQuery matches documents whose `field` exactly equals `value`.
func TermQuery(field string, value any) Query {
return rawQuery{"term": map[string]any{field: value}}
}
// TermsQuery matches documents whose `field` equals any of `values`.
func TermsQuery(field string, values ...any) Query {
return rawQuery{"terms": map[string]any{field: values}}
}
// MatchQuery is a full-text match on a single field.
func MatchQuery(field string, value any) Query {
return rawQuery{"match": map[string]any{field: value}}
}
// MatchPhraseQuery matches the exact phrase on `field`.
func MatchPhraseQuery(field, value string) Query {
return rawQuery{"match_phrase": map[string]any{field: value}}
}
// MultiMatchQuery is the fluent builder for a multi_match query.
type MultiMatchQuery struct {
query any
fields []string
typ string
operator string
}
// NewMultiMatchQuery creates a multi_match query over the given fields.
func NewMultiMatchQuery(query any, fields ...string) *MultiMatchQuery {
return &MultiMatchQuery{query: query, fields: fields}
}
func (m *MultiMatchQuery) Type(t string) *MultiMatchQuery { m.typ = t; return m }
func (m *MultiMatchQuery) Operator(op string) *MultiMatchQuery { m.operator = op; return m }
func (m *MultiMatchQuery) querySource() map[string]any {
body := map[string]any{"query": m.query}
if len(m.fields) > 0 {
body["fields"] = m.fields
}
if m.typ != "" {
body["type"] = m.typ
}
if m.operator != "" {
body["operator"] = m.operator
}
return map[string]any{"multi_match": body}
}
// RangeQuery is the fluent builder for a range query.
type RangeQuery struct {
field string
body map[string]any
}
func NewRangeQuery(field string) *RangeQuery {
return &RangeQuery{field: field, body: map[string]any{}}
}
func (r *RangeQuery) Gte(v any) *RangeQuery { r.body["gte"] = v; return r }
func (r *RangeQuery) Lte(v any) *RangeQuery { r.body["lte"] = v; return r }
func (r *RangeQuery) querySource() map[string]any {
return map[string]any{"range": map[string]any{r.field: r.body}}
}
// BoolQuery is the fluent builder for a bool query.
type BoolQuery struct {
must []Query
should []Query
mustNot []Query
}
func NewBoolQuery() *BoolQuery { return &BoolQuery{} }
func (b *BoolQuery) Must(q ...Query) *BoolQuery { b.must = append(b.must, q...); return b }
func (b *BoolQuery) Should(q ...Query) *BoolQuery { b.should = append(b.should, q...); return b }
func (b *BoolQuery) MustNot(q ...Query) *BoolQuery { b.mustNot = append(b.mustNot, q...); return b }
func (b *BoolQuery) querySource() map[string]any {
body := map[string]any{}
if len(b.must) > 0 {
body["must"] = querySlice(b.must)
}
if len(b.should) > 0 {
body["should"] = querySlice(b.should)
}
if len(b.mustNot) > 0 {
body["must_not"] = querySlice(b.mustNot)
}
return map[string]any{"bool": body}
}
func querySlice(queries []Query) []map[string]any {
out := make([]map[string]any, len(queries))
for idx, q := range queries {
out[idx] = q.querySource()
}
return out
}

View File

@@ -0,0 +1,76 @@
// Copyright 2026 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT
package elasticsearch
import "code.gitea.io/gitea/modules/json"
const (
bulkActionIndex = "index"
bulkActionDelete = "delete"
)
// BulkOp is a single write inside a Bulk call. Construct with IndexOp or DeleteOp.
type BulkOp struct {
action string
id string
doc any
}
// IndexOp builds a bulk index operation.
func IndexOp(id string, doc any) BulkOp {
return BulkOp{action: bulkActionIndex, id: id, doc: doc}
}
// DeleteOp builds a bulk delete operation.
func DeleteOp(id string) BulkOp {
return BulkOp{action: bulkActionDelete, id: id}
}
// SortField is one entry of the search sort array.
type SortField struct {
Field string
Desc bool
}
func (s SortField) source() map[string]any {
order := "asc"
if s.Desc {
order = "desc"
}
return map[string]any{s.Field: map[string]any{"order": order}}
}
// SearchRequest captures everything Gitea sends to the _search endpoint.
// Aggregations and Highlight are raw ES JSON bodies — callers write them as
// map[string]any since each has exactly one call site with a fixed shape.
type SearchRequest struct {
Query Query
Sort []SortField
From int
Size int
TrackTotal bool
Aggregations map[string]any
Highlight map[string]any
}
// SearchHit is a single result row.
type SearchHit struct {
ID string
Score float64
Source json.Value
Highlight map[string][]string
}
// AggBucket is a terms-aggregation bucket.
type AggBucket struct {
Key any
DocCount int64
}
// SearchResponse is Gitea's decoded view of the search reply.
type SearchResponse struct {
Total int64
Hits []SearchHit
Aggregations map[string][]AggBucket
}

View File

@@ -6,14 +6,11 @@ package elasticsearch
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/log"
"github.com/olivere/elastic/v7"
) )
// VersionedIndexName returns the full index name with version // VersionedIndexName returns the full index name with version suffix.
func (i *Indexer) VersionedIndexName() string { func (i *Indexer) VersionedIndexName() string {
return versionedIndexName(i.indexName, i.version) return versionedIndexName(i.indexName, i.version)
} }
@@ -26,41 +23,10 @@ func versionedIndexName(indexName string, version int) string {
return fmt.Sprintf("%s.v%d", indexName, version) return fmt.Sprintf("%s.v%d", indexName, version)
} }
func (i *Indexer) createIndex(ctx context.Context) error {
createIndex, err := i.Client.CreateIndex(i.VersionedIndexName()).BodyString(i.mapping).Do(ctx)
if err != nil {
return err
}
if !createIndex.Acknowledged {
return fmt.Errorf("create index %s with %s failed", i.VersionedIndexName(), i.mapping)
}
i.checkOldIndexes(ctx)
return nil
}
func (i *Indexer) initClient() (*elastic.Client, error) {
opts := []elastic.ClientOptionFunc{
elastic.SetURL(i.url),
elastic.SetSniff(false),
elastic.SetHealthcheckInterval(10 * time.Second),
elastic.SetGzip(false),
}
logger := log.GetLogger(log.DEFAULT)
opts = append(opts, elastic.SetTraceLog(&log.PrintfLogger{Logf: logger.Trace}))
opts = append(opts, elastic.SetInfoLog(&log.PrintfLogger{Logf: logger.Info}))
opts = append(opts, elastic.SetErrorLog(&log.PrintfLogger{Logf: logger.Error}))
return elastic.NewClient(opts...)
}
func (i *Indexer) checkOldIndexes(ctx context.Context) { func (i *Indexer) checkOldIndexes(ctx context.Context) {
for v := 0; v < i.version; v++ { for v := range i.version {
indexName := versionedIndexName(i.indexName, v) indexName := versionedIndexName(i.indexName, v)
exists, err := i.Client.IndexExists(indexName).Do(ctx) exists, err := i.indexExists(ctx, indexName)
if err == nil && exists { if err == nil && exists {
log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName) log.Warn("Found older elasticsearch index named %q, Gitea will keep the old NOT DELETED. You can delete the old version after the upgrade succeed.", indexName)
} }

View File

@@ -11,27 +11,18 @@ import (
"code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/indexer" "code.gitea.io/gitea/modules/indexer"
indexer_internal "code.gitea.io/gitea/modules/indexer/internal" indexer_internal "code.gitea.io/gitea/modules/indexer/internal"
inner_elasticsearch "code.gitea.io/gitea/modules/indexer/internal/elasticsearch" es "code.gitea.io/gitea/modules/indexer/internal/elasticsearch"
"code.gitea.io/gitea/modules/indexer/issues/internal" "code.gitea.io/gitea/modules/indexer/issues/internal"
"code.gitea.io/gitea/modules/util" "code.gitea.io/gitea/modules/util"
"github.com/olivere/elastic/v7"
) )
const ( const issueIndexerLatestVersion = 3
issueIndexerLatestVersion = 3
// multi-match-types, currently only 2 types are used
// Reference: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/query-dsl-multi-match-query.html#multi-match-types
esMultiMatchTypeBestFields = "best_fields"
esMultiMatchTypePhrasePrefix = "phrase_prefix"
)
var _ internal.Indexer = &Indexer{} var _ internal.Indexer = &Indexer{}
// Indexer implements Indexer interface // Indexer implements Indexer interface
type Indexer struct { type Indexer struct {
inner *inner_elasticsearch.Indexer *es.Indexer
indexer_internal.Indexer // do not composite inner_elasticsearch.Indexer directly to avoid exposing too much
} }
func (b *Indexer) SupportedSearchModes() []indexer.SearchMode { func (b *Indexer) SupportedSearchModes() []indexer.SearchMode {
@@ -41,12 +32,7 @@ func (b *Indexer) SupportedSearchModes() []indexer.SearchMode {
// NewIndexer creates a new elasticsearch indexer // NewIndexer creates a new elasticsearch indexer
func NewIndexer(url, indexerName string) *Indexer { func NewIndexer(url, indexerName string) *Indexer {
inner := inner_elasticsearch.NewIndexer(url, indexerName, issueIndexerLatestVersion, defaultMapping) return &Indexer{Indexer: es.NewIndexer(url, indexerName, issueIndexerLatestVersion, defaultMapping)}
indexer := &Indexer{
inner: inner,
Indexer: inner,
}
return indexer
} }
const ( const (
@@ -93,29 +79,14 @@ func (b *Indexer) Index(ctx context.Context, issues ...*internal.IndexerData) er
return nil return nil
} else if len(issues) == 1 { } else if len(issues) == 1 {
issue := issues[0] issue := issues[0]
_, err := b.inner.Client.Index(). return b.Indexer.Index(ctx, strconv.FormatInt(issue.ID, 10), issue)
Index(b.inner.VersionedIndexName()).
Id(strconv.FormatInt(issue.ID, 10)).
BodyJson(issue).
Do(ctx)
return err
} }
reqs := make([]elastic.BulkableRequest, 0) ops := make([]es.BulkOp, 0, len(issues))
for _, issue := range issues { for _, issue := range issues {
reqs = append(reqs, ops = append(ops, es.IndexOp(strconv.FormatInt(issue.ID, 10), issue))
elastic.NewBulkIndexRequest().
Index(b.inner.VersionedIndexName()).
Id(strconv.FormatInt(issue.ID, 10)).
Doc(issue),
)
} }
return b.Bulk(graceful.GetManager().HammerContext(), ops)
_, err := b.inner.Client.Bulk().
Index(b.inner.VersionedIndexName()).
Add(reqs...).
Do(graceful.GetManager().HammerContext())
return err
} }
// Delete deletes indexes by ids // Delete deletes indexes by ids
@@ -123,129 +94,116 @@ func (b *Indexer) Delete(ctx context.Context, ids ...int64) error {
if len(ids) == 0 { if len(ids) == 0 {
return nil return nil
} else if len(ids) == 1 { } else if len(ids) == 1 {
_, err := b.inner.Client.Delete(). return b.Indexer.Delete(ctx, strconv.FormatInt(ids[0], 10))
Index(b.inner.VersionedIndexName()).
Id(strconv.FormatInt(ids[0], 10)).
Do(ctx)
return err
} }
reqs := make([]elastic.BulkableRequest, 0) ops := make([]es.BulkOp, 0, len(ids))
for _, id := range ids { for _, id := range ids {
reqs = append(reqs, ops = append(ops, es.DeleteOp(strconv.FormatInt(id, 10)))
elastic.NewBulkDeleteRequest().
Index(b.inner.VersionedIndexName()).
Id(strconv.FormatInt(id, 10)),
)
} }
return b.Bulk(graceful.GetManager().HammerContext(), ops)
_, err := b.inner.Client.Bulk().
Index(b.inner.VersionedIndexName()).
Add(reqs...).
Do(graceful.GetManager().HammerContext())
return err
} }
// Search searches for issues by given conditions. // Search searches for issues by given conditions.
// Returns the matching issue IDs // Returns the matching issue IDs
func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) { func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (*internal.SearchResult, error) {
query := elastic.NewBoolQuery() query := es.NewBoolQuery()
if options.Keyword != "" { if options.Keyword != "" {
searchMode := util.IfZero(options.SearchMode, b.SupportedSearchModes()[0].ModeValue) searchMode := util.IfZero(options.SearchMode, b.SupportedSearchModes()[0].ModeValue)
mm := es.NewMultiMatchQuery(options.Keyword, "title", "content", "comments")
if searchMode == indexer.SearchModeExact { if searchMode == indexer.SearchModeExact {
query.Must(elastic.NewMultiMatchQuery(options.Keyword, "title", "content", "comments").Type(esMultiMatchTypePhrasePrefix)) mm = mm.Type(es.MultiMatchTypePhrasePrefix)
} else /* words */ { } else {
query.Must(elastic.NewMultiMatchQuery(options.Keyword, "title", "content", "comments").Type(esMultiMatchTypeBestFields).Operator("and")) mm = mm.Type(es.MultiMatchTypeBestFields).Operator("and")
} }
query.Must(mm)
} }
if len(options.RepoIDs) > 0 { if len(options.RepoIDs) > 0 {
q := elastic.NewBoolQuery() q := es.NewBoolQuery()
q.Should(elastic.NewTermsQuery("repo_id", toAnySlice(options.RepoIDs)...)) q.Should(es.TermsQuery("repo_id", es.ToAnySlice(options.RepoIDs)...))
if options.AllPublic { if options.AllPublic {
q.Should(elastic.NewTermQuery("is_public", true)) q.Should(es.TermQuery("is_public", true))
} }
query.Must(q) query.Must(q)
} }
if options.IsPull.Has() { if options.IsPull.Has() {
query.Must(elastic.NewTermQuery("is_pull", options.IsPull.Value())) query.Must(es.TermQuery("is_pull", options.IsPull.Value()))
} }
if options.IsClosed.Has() { if options.IsClosed.Has() {
query.Must(elastic.NewTermQuery("is_closed", options.IsClosed.Value())) query.Must(es.TermQuery("is_closed", options.IsClosed.Value()))
} }
if options.IsArchived.Has() { if options.IsArchived.Has() {
query.Must(elastic.NewTermQuery("is_archived", options.IsArchived.Value())) query.Must(es.TermQuery("is_archived", options.IsArchived.Value()))
} }
if options.NoLabelOnly { if options.NoLabelOnly {
query.Must(elastic.NewTermQuery("no_label", true)) query.Must(es.TermQuery("no_label", true))
} else { } else {
if len(options.IncludedLabelIDs) > 0 { if len(options.IncludedLabelIDs) > 0 {
q := elastic.NewBoolQuery() q := es.NewBoolQuery()
for _, labelID := range options.IncludedLabelIDs { for _, labelID := range options.IncludedLabelIDs {
q.Must(elastic.NewTermQuery("label_ids", labelID)) q.Must(es.TermQuery("label_ids", labelID))
} }
query.Must(q) query.Must(q)
} else if len(options.IncludedAnyLabelIDs) > 0 { } else if len(options.IncludedAnyLabelIDs) > 0 {
query.Must(elastic.NewTermsQuery("label_ids", toAnySlice(options.IncludedAnyLabelIDs)...)) query.Must(es.TermsQuery("label_ids", es.ToAnySlice(options.IncludedAnyLabelIDs)...))
} }
if len(options.ExcludedLabelIDs) > 0 { if len(options.ExcludedLabelIDs) > 0 {
q := elastic.NewBoolQuery() q := es.NewBoolQuery()
for _, labelID := range options.ExcludedLabelIDs { for _, labelID := range options.ExcludedLabelIDs {
q.MustNot(elastic.NewTermQuery("label_ids", labelID)) q.MustNot(es.TermQuery("label_ids", labelID))
} }
query.Must(q) query.Must(q)
} }
} }
if len(options.MilestoneIDs) > 0 { if len(options.MilestoneIDs) > 0 {
query.Must(elastic.NewTermsQuery("milestone_id", toAnySlice(options.MilestoneIDs)...)) query.Must(es.TermsQuery("milestone_id", es.ToAnySlice(options.MilestoneIDs)...))
} }
if options.NoProjectOnly { if options.NoProjectOnly {
query.Must(elastic.NewTermQuery("no_project", true)) query.Must(es.TermQuery("no_project", true))
} else if len(options.ProjectIDs) > 0 { } else if len(options.ProjectIDs) > 0 {
// FIXME: ISSUE-MULTIPLE-PROJECTS-FILTER: this logic is not right, it should use "AND" but not "OR" // FIXME: ISSUE-MULTIPLE-PROJECTS-FILTER: this logic is not right, it should use "AND" but not "OR"
query.Must(elastic.NewTermsQuery("project_ids", toAnySlice(options.ProjectIDs)...)) query.Must(es.TermsQuery("project_ids", es.ToAnySlice(options.ProjectIDs)...))
} }
if options.PosterID != "" { if options.PosterID != "" {
// "(none)" becomes 0, it means no poster // "(none)" becomes 0, it means no poster
posterIDInt64, _ := strconv.ParseInt(options.PosterID, 10, 64) posterIDInt64, _ := strconv.ParseInt(options.PosterID, 10, 64)
query.Must(elastic.NewTermQuery("poster_id", posterIDInt64)) query.Must(es.TermQuery("poster_id", posterIDInt64))
} }
if options.AssigneeID != "" { if options.AssigneeID != "" {
if options.AssigneeID == "(any)" { if options.AssigneeID == "(any)" {
q := elastic.NewRangeQuery("assignee_id") query.Must(es.NewRangeQuery("assignee_id").Gte(1))
q.Gte(1)
query.Must(q)
} else { } else {
// "(none)" becomes 0, it means no assignee // "(none)" becomes 0, it means no assignee
assigneeIDInt64, _ := strconv.ParseInt(options.AssigneeID, 10, 64) assigneeIDInt64, _ := strconv.ParseInt(options.AssigneeID, 10, 64)
query.Must(elastic.NewTermQuery("assignee_id", assigneeIDInt64)) query.Must(es.TermQuery("assignee_id", assigneeIDInt64))
} }
} }
if options.MentionID.Has() { if options.MentionID.Has() {
query.Must(elastic.NewTermQuery("mention_ids", options.MentionID.Value())) query.Must(es.TermQuery("mention_ids", options.MentionID.Value()))
} }
if options.ReviewedID.Has() { if options.ReviewedID.Has() {
query.Must(elastic.NewTermQuery("reviewed_ids", options.ReviewedID.Value())) query.Must(es.TermQuery("reviewed_ids", options.ReviewedID.Value()))
} }
if options.ReviewRequestedID.Has() { if options.ReviewRequestedID.Has() {
query.Must(elastic.NewTermQuery("review_requested_ids", options.ReviewRequestedID.Value())) query.Must(es.TermQuery("review_requested_ids", options.ReviewRequestedID.Value()))
} }
if options.SubscriberID.Has() { if options.SubscriberID.Has() {
query.Must(elastic.NewTermQuery("subscriber_ids", options.SubscriberID.Value())) query.Must(es.TermQuery("subscriber_ids", options.SubscriberID.Value()))
} }
if options.UpdatedAfterUnix.Has() || options.UpdatedBeforeUnix.Has() { if options.UpdatedAfterUnix.Has() || options.UpdatedBeforeUnix.Has() {
q := elastic.NewRangeQuery("updated_unix") q := es.NewRangeQuery("updated_unix")
if options.UpdatedAfterUnix.Has() { if options.UpdatedAfterUnix.Has() {
q.Gte(options.UpdatedAfterUnix.Value()) q.Gte(options.UpdatedAfterUnix.Value())
} }
@@ -258,9 +216,9 @@ func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (
if options.SortBy == "" { if options.SortBy == "" {
options.SortBy = internal.SortByCreatedAsc options.SortBy = internal.SortByCreatedAsc
} }
sortBy := []elastic.Sorter{ sortBy := []es.SortField{
parseSortBy(options.SortBy), parseSortBy(options.SortBy),
elastic.NewFieldSort("id").Desc(), {Field: "id", Desc: true},
} }
// See https://stackoverflow.com/questions/35206409/elasticsearch-2-1-result-window-is-too-large-index-max-result-window/35221900 // See https://stackoverflow.com/questions/35206409/elasticsearch-2-1-result-window-is-too-large-index-max-result-window/35221900
@@ -268,43 +226,30 @@ func (b *Indexer) Search(ctx context.Context, options *internal.SearchOptions) (
const maxPageSize = 10000 const maxPageSize = 10000
skip, limit := indexer_internal.ParsePaginator(options.Paginator, maxPageSize) skip, limit := indexer_internal.ParsePaginator(options.Paginator, maxPageSize)
searchResult, err := b.inner.Client.Search(). resp, err := b.Indexer.Search(ctx, es.SearchRequest{
Index(b.inner.VersionedIndexName()). Query: query,
Query(query). Sort: sortBy,
SortBy(sortBy...). From: skip,
From(skip).Size(limit). Size: limit,
Do(ctx) TrackTotal: true,
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
hits := make([]internal.Match, 0, limit) hits := make([]internal.Match, 0, len(resp.Hits))
for _, hit := range searchResult.Hits.Hits { for _, hit := range resp.Hits {
id, _ := strconv.ParseInt(hit.Id, 10, 64) id, _ := strconv.ParseInt(hit.ID, 10, 64)
hits = append(hits, internal.Match{ hits = append(hits, internal.Match{ID: id})
ID: id,
})
} }
return &internal.SearchResult{ return &internal.SearchResult{
Total: searchResult.TotalHits(), Total: resp.Total,
Hits: hits, Hits: hits,
}, nil }, nil
} }
func toAnySlice[T any](s []T) []any { func parseSortBy(sortBy internal.SortBy) es.SortField {
ret := make([]any, 0, len(s)) field, desc := strings.CutPrefix(string(sortBy), "-")
for _, item := range s { return es.SortField{Field: field, Desc: desc}
ret = append(ret, item)
}
return ret
}
func parseSortBy(sortBy internal.SortBy) elastic.Sorter {
field := strings.TrimPrefix(string(sortBy), "-")
ret := elastic.NewFieldSort(field)
if strings.HasPrefix(string(sortBy), "-") {
ret.Desc()
}
return ret
} }

View File

@@ -6,6 +6,7 @@ package elasticsearch
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"net/url"
"os" "os"
"testing" "testing"
"time" "time"
@@ -17,19 +18,36 @@ import (
func TestElasticsearchIndexer(t *testing.T) { func TestElasticsearchIndexer(t *testing.T) {
// The elasticsearch instance started by pull-db-tests.yml > test-unit > services > elasticsearch // The elasticsearch instance started by pull-db-tests.yml > test-unit > services > elasticsearch
url := "http://elastic:changeme@elasticsearch:9200" rawURL := "http://elastic:changeme@elasticsearch:9200"
if os.Getenv("CI") == "" { if os.Getenv("CI") == "" {
// Make it possible to run tests against a local elasticsearch instance // Make it possible to run tests against a local elasticsearch instance
url = os.Getenv("TEST_ELASTICSEARCH_URL") rawURL = os.Getenv("TEST_ELASTICSEARCH_URL")
if url == "" { if rawURL == "" {
t.Skip("TEST_ELASTICSEARCH_URL not set and not running in CI") t.Skip("TEST_ELASTICSEARCH_URL not set and not running in CI")
return return
} }
} }
// Go's net/http does not auto-attach URL userinfo as Basic Auth, so extract
// it and set the header explicitly; otherwise auth-enforced clusters answer
// 401 and the probe never reports ready.
parsed, err := url.Parse(rawURL)
require.NoError(t, err)
user := parsed.User
parsed.User = nil
probeURL := parsed.String()
require.Eventually(t, func() bool { require.Eventually(t, func() bool {
resp, err := http.Get(url) req, err := http.NewRequest(http.MethodGet, probeURL, nil)
if err != nil {
return false
}
if user != nil {
pass, _ := user.Password()
req.SetBasicAuth(user.Username(), pass)
}
resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return false return false
} }
@@ -37,7 +55,7 @@ func TestElasticsearchIndexer(t *testing.T) {
return resp.StatusCode == http.StatusOK return resp.StatusCode == http.StatusOK
}, time.Minute, time.Second, "Expected elasticsearch to be up") }, time.Minute, time.Second, "Expected elasticsearch to be up")
indexer := NewIndexer(url, fmt.Sprintf("test_elasticsearch_indexer_%d", time.Now().Unix())) indexer := NewIndexer(rawURL, fmt.Sprintf("test_elasticsearch_indexer_%d", time.Now().Unix()))
defer indexer.Close() defer indexer.Close()
tests.TestIndexer(t, indexer) tests.TestIndexer(t, indexer)

View File

@@ -116,6 +116,16 @@ var cases = []*testIndexerCase{
assert.Equal(t, len(data), int(result.Total)) assert.Equal(t, len(data), int(result.Total))
}, },
}, },
{
// Exercises the single-doc Index/Delete fast path in backends that have one (e.g. Elasticsearch).
Name: "single-doc index",
ExtraData: []*internal.IndexerData{
{ID: 999, Title: "solo-issue-marker"},
},
SearchOptions: &internal.SearchOptions{Keyword: "solo-issue-marker"},
ExpectedIDs: []int64{999},
ExpectedTotal: 1,
},
{ {
Name: "Keyword", Name: "Keyword",
ExtraData: []*internal.IndexerData{ ExtraData: []*internal.IndexerData{