UBER-536: Fix test stability (#3466)

Signed-off-by: Andrey Sobolev <haiodo@gmail.com>
This commit is contained in:
Andrey Sobolev 2023-06-29 12:46:28 +07:00 committed by GitHub
parent ee0754422d
commit cf2ca2ee27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 179 deletions

View File

@ -357,7 +357,12 @@ export function devTool (
.action(async (email: string, cmd) => {
const { mongodbUri } = prepareTools()
return await withDatabase(mongodbUri, async (db) => {
await confirmEmail(db, email)
const account = await getAccount(db, email)
if (account?.confirmed === true) {
console.log(`Already confirmed:${email}`)
} else {
await confirmEmail(db, email)
}
})
})

View File

@ -1 +1 @@
{ "major": 0, "minor": 6, "patch": 108 }
{ "major": 0, "minor": 6, "patch": 109 }

View File

@ -625,193 +625,194 @@ export async function restore (
model: 'upgrade'
})) as unknown as CoreClient & BackupClient
async function processDomain (c: Domain): Promise<boolean> {
try {
const changeset = await loadDigest(storage, snapshots, c, date)
// We need to load full changeset from server
const serverChangeset = new Map<Ref<Doc>, string>()
async function processDomain (c: Domain): Promise<void> {
const changeset = await loadDigest(storage, snapshots, c, date)
// We need to load full changeset from server
const serverChangeset = new Map<Ref<Doc>, string>()
let idx: number | undefined
let loaded = 0
let last = 0
let el = 0
let chunks = 0
while (true) {
const st = Date.now()
const it = await connection.loadChunk(c, idx)
chunks++
let idx: number | undefined
let loaded = 0
let last = 0
let el = 0
let chunks = 0
while (true) {
const st = Date.now()
const it = await connection.loadChunk(c, idx)
chunks++
idx = it.idx
el += Date.now() - st
idx = it.idx
el += Date.now() - st
for (const [_id, hash] of Object.entries(it.docs)) {
serverChangeset.set(_id as Ref<Doc>, hash)
loaded++
}
const mr = Math.round(loaded / 10000)
if (mr !== last) {
last = mr
console.log(' loaded from server', loaded, el, chunks)
el = 0
chunks = 0
}
if (it.finished) {
break
}
for (const [_id, hash] of Object.entries(it.docs)) {
serverChangeset.set(_id as Ref<Doc>, hash)
loaded++
}
console.log(' loaded', loaded)
console.log('\tcompare documents', changeset.size, serverChangeset.size)
// Let's find difference
const docsToAdd = new Map(
Array.from(changeset.entries()).filter(
([it]) =>
!serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it))
)
const mr = Math.round(loaded / 10000)
if (mr !== last) {
last = mr
console.log(' loaded from server', loaded, el, chunks)
el = 0
chunks = 0
}
if (it.finished) {
break
}
}
console.log(' loaded', loaded)
console.log('\tcompare documents', changeset.size, serverChangeset.size)
// Let's find difference
const docsToAdd = new Map(
Array.from(changeset.entries()).filter(
([it]) => !serverChangeset.has(it) || (serverChangeset.has(it) && serverChangeset.get(it) !== changeset.get(it))
)
const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it))
)
const docsToRemove = Array.from(serverChangeset.keys()).filter((it) => !changeset.has(it))
const docs: Doc[] = []
const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
let sendSize = 0
let totalSend = 0
async function sendChunk (doc: Doc | undefined, len: number): Promise<void> {
if (doc !== undefined) {
docsToAdd.delete(doc._id)
docs.push(doc)
}
sendSize = sendSize + len
if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
totalSend += docs.length
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
}
const docs: Doc[] = []
const blobs = new Map<string, { doc: Doc | undefined, buffer: Buffer | undefined }>()
let sendSize = 0
let totalSend = 0
async function sendChunk (doc: Doc | undefined, len: number): Promise<void> {
if (doc !== undefined) {
docsToAdd.delete(doc._id)
docs.push(doc)
}
let processed = 0
sendSize = sendSize + len
if (sendSize > dataUploadSize || (doc === undefined && docs.length > 0)) {
console.log('upload', docs.length, `send: ${totalSend} from ${docsToAdd.size + totalSend}`, 'size:', sendSize)
totalSend += docs.length
await connection.upload(c, docs)
docs.length = 0
sendSize = 0
}
}
let processed = 0
for (const s of rsnapshots) {
const d = s.domains[c]
for (const s of rsnapshots) {
const d = s.domains[c]
if (d !== undefined && docsToAdd.size > 0) {
const sDigest = await loadDigest(storage, [s], c)
const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
if (requiredDocs.size > 0) {
console.log('updating', c, requiredDocs.size)
// We have required documents here.
for (const sf of d.storage ?? []) {
if (docsToAdd.size === 0) {
break
}
console.log('processing', sf, processed)
if (d !== undefined && docsToAdd.size > 0) {
const sDigest = await loadDigest(storage, [s], c)
const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
if (requiredDocs.size > 0) {
console.log('updating', c, requiredDocs.size)
// We have required documents here.
for (const sf of d.storage ?? []) {
if (docsToAdd.size === 0) {
break
}
console.log('processing', sf, processed)
const readStream = await storage.load(sf)
const ex = extract()
const readStream = await storage.load(sf)
const ex = extract()
ex.on('entry', (headers, stream, next) => {
const name = headers.name ?? ''
processed++
// We found blob data
if (requiredDocs.has(name as Ref<Doc>)) {
const chunks: Buffer[] = []
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
ex.on('entry', (headers, stream, next) => {
const name = headers.name ?? ''
processed++
// We found blob data
if (requiredDocs.has(name as Ref<Doc>)) {
const chunks: Buffer[] = []
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const d = blobs.get(name)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })
next()
} else {
const d = blobs.get(name)
blobs.delete(name)
const doc = d?.doc as BlobData
doc.base64Data = bf.toString('base64') ?? ''
sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
})
}
})
} else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
const chunks: Buffer[] = []
const bname = name.substring(0, name.length - 5)
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.BlobData) {
const d = blobs.get(bname)
if (d === undefined) {
blobs.set(name, { doc: undefined, buffer: bf })
blobs.set(bname, { doc, buffer: undefined })
next()
} else {
const d = blobs.get(name)
blobs.delete(name)
const doc = d?.doc as BlobData
doc.base64Data = bf.toString('base64') ?? ''
sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
})
}
})
} else if (name.endsWith('.json') && requiredDocs.has(name.substring(0, name.length - 5) as Ref<Doc>)) {
const chunks: Buffer[] = []
const bname = name.substring(0, name.length - 5)
stream.on('data', (chunk) => {
chunks.push(chunk)
})
stream.on('end', () => {
const bf = Buffer.concat(chunks)
const doc = JSON.parse(bf.toString()) as Doc
if (doc._class === core.class.BlobData) {
const d = blobs.get(bname)
if (d === undefined) {
blobs.set(bname, { doc, buffer: undefined })
next()
} else {
const d = blobs.get(bname)
blobs.delete(bname)
;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? ''
sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
})
}
} else {
blobs.delete(bname)
;(doc as BlobData).base64Data = d?.buffer?.toString('base64') ?? ''
sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
})
}
})
} else {
next()
}
stream.resume() // just auto drain the stream
})
const endPromise = new Promise((resolve) => {
ex.on('finish', () => {
resolve(null)
} else {
sendChunk(doc, bf.length).finally(() => {
requiredDocs.delete(doc._id)
next()
})
}
})
} else {
next()
}
stream.resume() // just auto drain the stream
})
const endPromise = new Promise((resolve) => {
ex.on('finish', () => {
resolve(null)
})
const unzip = createGunzip()
readStream.pipe(unzip)
unzip.pipe(ex)
})
const unzip = createGunzip()
readStream.pipe(unzip)
unzip.pipe(ex)
await endPromise
}
} else {
console.log('domain had no changes', c)
await endPromise
}
} else {
console.log('domain had no changes', c)
}
}
}
await sendChunk(undefined, 0)
if (docsToRemove.length > 0 && merge !== true) {
console.log('cleanup', docsToRemove.length)
while (docsToRemove.length > 0) {
const part = docsToRemove.splice(0, 10000)
await connection.clean(c, part)
}
await sendChunk(undefined, 0)
if (docsToRemove.length > 0 && merge !== true) {
console.log('cleanup', docsToRemove.length)
while (docsToRemove.length > 0) {
const part = docsToRemove.splice(0, 10000)
await connection.clean(c, part)
}
return true
} catch (err: any) {
console.log('error', err)
return false
}
}
try {
for (const c of domains) {
console.log('loading server changeset for', c)
let retry = 3
let retry = 5
while (retry > 0) {
retry--
if (await processDomain(c)) {
try {
await processDomain(c)
break
} catch (err: any) {
if (retry === 0) {
console.log('error', err)
} else {
console.log('Wait for few seconds for elastic')
await new Promise((resolve) => setTimeout(resolve, 1000))
}
}
}
}

View File

@ -18,9 +18,6 @@ services:
- 9002:9000
elastic:
image: 'elasticsearch:7.14.2'
command: |
/bin/sh -c "./bin/elasticsearch-plugin list | grep -q ingest-attachment || yes | ./bin/elasticsearch-plugin install --silent ingest-attachment;
/usr/local/bin/docker-entrypoint.sh eswrapper"
expose:
- 9200
ports:

View File

@ -4,6 +4,8 @@ docker-compose -p sanity kill
docker-compose -p sanity down --volumes
docker-compose -p sanity up -d --force-recreate --renew-anon-volumes
./wait-elastic.sh 9201
# Creae workspace record in accounts
./tool.sh create-workspace sanity-ws -o SanityTest
# Create user record in accounts

View File

@ -1,26 +0,0 @@
res=''
port=$1
echo "Warning Elastic to up and running with attachment processor... ${port}"
while true
do
res=$(curl -s -XPUT "localhost:${port}/_ingest/pipeline/attachment?pretty" -H 'Content-Type: application/json' -d'
{
"description" : "Field for processing file attachments",
"processors" : [
{
"attachment" : {
"field" : "data"
},
"remove" : {
"field" : "data"
}
}
]
}
')
if [[ $res = *"acknowledged"* ]]; then
echo "Elastic processor is up and running..."
exit 0
fi
sleep 1
done

17
tests/wait-elastic.sh Executable file
View File

@ -0,0 +1,17 @@
res=''
port=$1
echo "Warning Elastic to up and running with attachment processor... ${port}"
for i in `seq 1 30`;
do
res=$(curl -s http://localhost:${port}/_cluster/health )
echo "$res"
if [[ $res = *"yellow"* ]]; then
echo "Elastic up and running..."
exit 0
fi
if [[ $res = *"green"* ]]; then
echo "Elastic up and running..."
exit 0
fi
sleep 1
done