Skip to content
This repository was archived by the owner on May 17, 2025. It is now read-only.

Commit 6c0b2fd

Browse files
authored
chore: lots (#24)
- rename internal server closure representation from c to server - test the subscribe handler a bunch - switch from ts-eager to esbuild-require - always throw an error (using aggregate Error) instead of arrays of errors - run tests on nonstandard sandbox ports - remove unecessary awaits - cleanup array checking
1 parent 9260aa6 commit 6c0b2fd

22 files changed

Lines changed: 563 additions & 198 deletions

lib/gateway.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { connection_init } from './messages/connection_init'
1313
import { pong } from './messages/pong'
1414

1515
export const handleGatewayEvent =
16-
(c: ServerClosure): Handler<APIGatewayWebSocketEvent, WebsocketResponse> =>
16+
(server: ServerClosure): Handler<APIGatewayWebSocketEvent, WebsocketResponse> =>
1717
async (event) => {
1818
if (!event.requestContext) {
1919
return {
@@ -23,7 +23,7 @@ export const handleGatewayEvent =
2323
}
2424

2525
if (event.requestContext.eventType === 'CONNECT') {
26-
await c.onConnect?.({ event })
26+
await server.onConnect?.({ event })
2727
return {
2828
statusCode: 200,
2929
headers: {
@@ -37,39 +37,39 @@ export const handleGatewayEvent =
3737
const message = JSON.parse(event.body!)
3838

3939
if (message.type === MessageType.ConnectionInit) {
40-
await connection_init({ c, event, message })
40+
await connection_init({ server, event, message })
4141
return {
4242
statusCode: 200,
4343
body: '',
4444
}
4545
}
4646

4747
if (message.type === MessageType.Subscribe) {
48-
await subscribe({ c, event, message })
48+
await subscribe({ server, event, message })
4949
return {
5050
statusCode: 200,
5151
body: '',
5252
}
5353
}
5454

5555
if (message.type === MessageType.Complete) {
56-
await complete({ c, event, message })
56+
await complete({ server, event, message })
5757
return {
5858
statusCode: 200,
5959
body: '',
6060
}
6161
}
6262

6363
if (message.type === MessageType.Ping) {
64-
await ping({ c, event, message })
64+
await ping({ server, event, message })
6565
return {
6666
statusCode: 200,
6767
body: '',
6868
}
6969
}
7070

7171
if (message.type === MessageType.Pong) {
72-
await pong({ c, event, message })
72+
await pong({ server, event, message })
7373
return {
7474
statusCode: 200,
7575
body: '',
@@ -78,7 +78,7 @@ export const handleGatewayEvent =
7878
}
7979

8080
if (event.requestContext.eventType === 'DISCONNECT') {
81-
await disconnect({ c, event, message: null })
81+
await disconnect({ server, event, message: null })
8282
return {
8383
statusCode: 200,
8484
body: '',

lib/index.ts

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,12 @@
1-
import { DataMapper } from '@aws/dynamodb-data-mapper'
2-
import { ServerArgs } from './types'
1+
import { ServerArgs, ServerClosure } from './types'
32
import { publish } from './pubsub/publish'
43
import { complete } from './pubsub/complete'
5-
import { createModel } from './model/createModel'
6-
import { Subscription } from './model/Subscription'
74
import { handleGatewayEvent } from './gateway'
85
import { handleStateMachineEvent } from './stepFunctionHandler'
9-
import { Connection } from './model/Connection'
6+
import { makeServerClosure } from './makeServerClosure'
107

118
export const createInstance = (opts: ServerArgs) => {
12-
const closure = {
13-
...opts,
14-
model: {
15-
Subscription: createModel({
16-
model: Subscription,
17-
table:
18-
opts.tableNames?.subscriptions || 'subscriptionless_subscriptions',
19-
}),
20-
Connection: createModel({
21-
model: Connection,
22-
table: opts.tableNames?.connections || 'subscriptionless_connections',
23-
}),
24-
},
25-
mapper: new DataMapper({ client: opts.dynamodb }),
26-
} as const
9+
const closure: ServerClosure = makeServerClosure(opts)
2710

2811
return {
2912
gatewayHandler: handleGatewayEvent(closure),

lib/makeServerClosure.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import { DataMapper } from '@aws/dynamodb-data-mapper'
2+
import { ServerArgs, ServerClosure } from './types'
3+
import { createModel } from './model/createModel'
4+
import { Subscription } from './model/Subscription'
5+
import { Connection } from './model/Connection'
6+
7+
export function makeServerClosure(opts: ServerArgs): ServerClosure {
8+
return {
9+
...opts,
10+
model: {
11+
Subscription: createModel({
12+
model: Subscription,
13+
table: opts.tableNames?.subscriptions || 'subscriptionless_subscriptions',
14+
}),
15+
Connection: createModel({
16+
model: Connection,
17+
table: opts.tableNames?.connections || 'subscriptionless_connections',
18+
}),
19+
},
20+
mapper: new DataMapper({ client: opts.dynamodb }),
21+
}
22+
}

lib/messages/complete.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1+
import AggregateError from 'aggregate-error'
12
import { parse } from 'graphql'
23
import { CompleteMessage } from 'graphql-ws'
34
import { buildExecutionContext } from 'graphql/execution/execute'
4-
import { SubscribePsuedoIterable } from '../types'
5+
import { SubscribePsuedoIterable, MessageHandler } from '../types'
56
import { deleteConnection } from '../utils/aws'
67
import { constructContext, getResolverAndArgs } from '../utils/graphql'
7-
import { MessageHandler } from './types'
8+
import { isArray } from '../utils/isArray'
89

910
/** Handler function for 'complete' message. */
1011
export const complete: MessageHandler<CompleteMessage> =
11-
async ({ c, event, message }) => {
12+
async ({ server, event, message }) => {
1213
try {
13-
const topicSubscriptions = await c.mapper.query(c.model.Subscription, {
14+
const topicSubscriptions = server.mapper.query(server.model.Subscription, {
1415
id: `${event.requestContext.connectionId!}|${message.id}`,
1516
})
1617
let deletions = [] as Promise<any>[]
@@ -21,35 +22,35 @@ export const complete: MessageHandler<CompleteMessage> =
2122
// only call onComplete per subscription
2223
if (deletions.length === 0) {
2324
const execContext = buildExecutionContext(
24-
c.schema,
25+
server.schema,
2526
parse(entity.subscription.query),
2627
undefined,
27-
await constructContext(c)(entity),
28+
await constructContext(server)(entity),
2829
entity.subscription.variables,
2930
entity.subscription.operationName,
3031
undefined,
3132
)
3233

33-
if (!('operation' in execContext)) {
34-
throw execContext
34+
if (isArray(execContext)) {
35+
throw new AggregateError(execContext)
3536
}
3637

37-
const [field, root, args, context, info] = getResolverAndArgs(c)(execContext)
38+
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
3839

3940
const onComplete = (field?.subscribe as SubscribePsuedoIterable)?.onComplete
4041
if (onComplete) {
4142
await onComplete(root, args, context, info)
4243
}
4344
}
4445

45-
await c.mapper.delete(entity)
46+
await server.mapper.delete(entity)
4647
})(),
4748
]
4849
}
4950

5051
await Promise.all(deletions)
5152
} catch (err) {
52-
await c.onError?.(err, { event, message })
53-
await deleteConnection(c)(event.requestContext)
53+
await server.onError?.(err, { event, message })
54+
await deleteConnection(server)(event.requestContext)
5455
}
5556
}

lib/messages/connection_init.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,46 @@
11
import { StepFunctions } from 'aws-sdk'
22
import { ConnectionInitMessage, MessageType } from 'graphql-ws'
3-
import { StateFunctionInput } from '../types'
3+
import { StateFunctionInput, MessageHandler } from '../types'
44
import { deleteConnection, sendMessage } from '../utils/aws'
5-
import { MessageHandler } from './types'
65

76
/** Handler function for 'connection_init' message. */
87
export const connection_init: MessageHandler<ConnectionInitMessage> =
9-
async ({ c, event, message }) => {
8+
async ({ server, event, message }) => {
109
try {
11-
const res = c.onConnectionInit
12-
? await c.onConnectionInit({ event, message })
10+
const res = server.onConnectionInit
11+
? await server.onConnectionInit({ event, message })
1312
: message.payload
1413

15-
if (c.pingpong) {
14+
if (server.pingpong) {
1615
await new StepFunctions()
1716
.startExecution({
18-
stateMachineArn: c.pingpong.machine,
17+
stateMachineArn: server.pingpong.machine,
1918
name: event.requestContext.connectionId!,
2019
input: JSON.stringify({
2120
connectionId: event.requestContext.connectionId!,
2221
domainName: event.requestContext.domainName!,
2322
stage: event.requestContext.stage,
2423
state: 'PING',
2524
choice: 'WAIT',
26-
seconds: c.pingpong.delay,
25+
seconds: server.pingpong.delay,
2726
} as StateFunctionInput),
2827
})
2928
.promise()
3029
}
3130

3231
// Write to persistence
33-
const connection = Object.assign(new c.model.Connection(), {
32+
const connection = Object.assign(new server.model.Connection(), {
3433
id: event.requestContext.connectionId!,
3534
requestContext: event.requestContext,
3635
payload: res,
3736
})
38-
await c.mapper.put(connection)
39-
return sendMessage(c)({
37+
await server.mapper.put(connection)
38+
return sendMessage(server)({
4039
...event.requestContext,
4140
message: { type: MessageType.ConnectionAck },
4241
})
4342
} catch (err) {
44-
await c.onError?.(err, { event, message })
45-
await deleteConnection(c)(event.requestContext)
43+
await server.onError?.(err, { event, message })
44+
await deleteConnection(server)(event.requestContext)
4645
}
4746
}

lib/messages/disconnect.ts

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,73 @@
1+
import AggregateError from 'aggregate-error'
12
import { parse } from 'graphql'
23
import { equals } from '@aws/dynamodb-expressions'
34
import { buildExecutionContext } from 'graphql/execution/execute'
4-
import { MessageHandler } from './types'
55
import { constructContext, getResolverAndArgs } from '../utils/graphql'
6-
import { SubscribePsuedoIterable } from '../types'
6+
import { SubscribePsuedoIterable, MessageHandler } from '../types'
7+
import { isArray } from '../utils/isArray'
78

89
/** Handler function for 'disconnect' message. */
910
export const disconnect: MessageHandler<null> =
10-
async ({ c, event }) => {
11+
async ({ server, event }) => {
1112
try {
12-
await c.onDisconnect?.({ event })
13+
await server.onDisconnect?.({ event })
1314

14-
const entities = await c.mapper.query(
15-
c.model.Subscription,
15+
const entities = server.mapper.query(
16+
server.model.Subscription,
1617
{
1718
connectionId: equals(event.requestContext.connectionId),
1819
},
1920
{ indexName: 'ConnectionIndex' },
2021
)
2122

2223
const completed = {} as Record<string, boolean>
23-
let deletions = [] as Promise<any>[]
24+
const deletions = [] as Promise<any>[]
2425
for await (const entity of entities) {
25-
deletions = [
26-
...deletions,
26+
deletions.push(
2727
(async () => {
2828
// only call onComplete per subscription
2929
if (!completed[entity.subscriptionId]) {
3030
completed[entity.subscriptionId] = true
3131

3232
const execContext = buildExecutionContext(
33-
c.schema,
33+
server.schema,
3434
parse(entity.subscription.query),
3535
undefined,
36-
await constructContext(c)(entity),
36+
await constructContext(server)(entity),
3737
entity.subscription.variables,
3838
entity.subscription.operationName,
3939
undefined,
4040
)
4141

42-
if (!('operation' in execContext)) {
43-
throw execContext
42+
if (isArray(execContext)) {
43+
throw new AggregateError(execContext)
4444
}
4545

46-
const [field, root, args, context, info] =
47-
getResolverAndArgs(c)(execContext)
46+
47+
const [field, root, args, context, info] = getResolverAndArgs(server)(execContext)
4848

4949
const onComplete = (field?.subscribe as SubscribePsuedoIterable)?.onComplete
5050
if (onComplete) {
5151
await onComplete(root, args, context, info)
5252
}
5353
}
5454

55-
await c.mapper.delete(entity)
55+
await server.mapper.delete(entity)
5656
})(),
57-
]
57+
)
5858
}
5959

6060
await Promise.all([
6161
// Delete subscriptions
6262
...deletions,
6363
// Delete connection
64-
c.mapper.delete(
65-
Object.assign(new c.model.Connection(), {
64+
server.mapper.delete(
65+
Object.assign(new server.model.Connection(), {
6666
id: event.requestContext.connectionId!,
6767
}),
6868
),
6969
])
7070
} catch (err) {
71-
await c.onError?.(err, { event })
71+
await server.onError?.(err, { event })
7272
}
7373
}

lib/messages/ping.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
import { PingMessage, MessageType } from 'graphql-ws'
22
import { deleteConnection, sendMessage } from '../utils/aws'
3-
import { MessageHandler } from './types'
3+
import { MessageHandler } from '../types'
44

55
/** Handler function for 'ping' message. */
66
export const ping: MessageHandler<PingMessage> =
7-
async ({ c, event, message }) => {
7+
async ({ server, event, message }) => {
88
try {
9-
await c.onPing?.({ event, message })
10-
return sendMessage(c)({
9+
await server.onPing?.({ event, message })
10+
return sendMessage(server)({
1111
...event.requestContext,
1212
message: { type: MessageType.Pong },
1313
})
1414
} catch (err) {
15-
await c.onError?.(err, { event, message })
16-
await deleteConnection(c)(event.requestContext)
15+
await server.onError?.(err, { event, message })
16+
await deleteConnection(server)(event.requestContext)
1717
}
1818
}

0 commit comments

Comments
 (0)