|
1 | | -import { StructRowProxy, Utf8 } from '@apache-arrow/esnext-esm'; |
2 | | -import { pluginV3 } from '@cloudquery/plugin-pb-javascript'; |
3 | 1 | import { default as Ajv } from 'ajv'; |
4 | 2 |
|
5 | | -import { WriteRequest, WriteStream, ReadStream, ReadRequest } from '../grpc/plugin.js'; |
6 | 3 | import { Plugin, newPlugin, SyncOptions, TableOptions, NewClientFunction } from '../plugin/plugin.js'; |
7 | 4 | import { sync } from '../scheduler/scheduler.js'; |
8 | | -import { createColumn } from '../schema/column.js'; |
9 | | -import { pathResolver } from '../schema/resolvers.js'; |
10 | | -import { Table, createTable, filterTables, decodeTable, decodeRecord, getPrimaryKeys } from '../schema/table.js'; |
| 5 | +import { Table, filterTables } from '../schema/table.js'; |
| 6 | + |
| 7 | +import { createDeleteStale } from './delete-stale.js'; |
| 8 | +import { createOverwrite } from './overwrite.js'; |
| 9 | +import { createRead } from './read.js'; |
| 10 | +import { createTables } from './tables.js'; |
| 11 | +import { createWrite } from './write.js'; |
11 | 12 |
|
12 | 13 | export const createMemDBClient = () => { |
13 | 14 | //eslint-disable-next-line @typescript-eslint/no-explicit-any |
@@ -39,100 +40,12 @@ export const newMemDBPlugin = (): Plugin => { |
39 | 40 | const memoryDB = memdbClient.memoryDB; |
40 | 41 | const tables = memdbClient.tables; |
41 | 42 |
|
42 | | - const allTables: Table[] = [ |
43 | | - createTable({ |
44 | | - name: 'table1', |
45 | | - title: 'Table 1', |
46 | | - description: 'Table 1 description', |
47 | | - resolver: (clientMeta, parent, stream) => { |
48 | | - stream.write({ id: 'table1-name1' }); |
49 | | - stream.write({ id: 'table1-name2' }); |
50 | | - return Promise.resolve(); |
51 | | - }, |
52 | | - columns: [ |
53 | | - createColumn({ |
54 | | - name: 'id', |
55 | | - type: new Utf8(), |
56 | | - resolver: pathResolver('id'), |
57 | | - }), |
58 | | - ], |
59 | | - }), |
60 | | - createTable({ |
61 | | - name: 'table2', |
62 | | - title: 'Table 2', |
63 | | - description: 'Table 2 description', |
64 | | - resolver: (clientMeta, parent, stream) => { |
65 | | - stream.write({ name: 'table2-name1' }); |
66 | | - stream.write({ name: 'table2-name2' }); |
67 | | - return Promise.resolve(); |
68 | | - }, |
69 | | - columns: [ |
70 | | - createColumn({ |
71 | | - name: 'name', |
72 | | - type: new Utf8(), |
73 | | - resolver: pathResolver('name'), |
74 | | - }), |
75 | | - ], |
76 | | - }), |
77 | | - ]; |
78 | | - |
79 | | - const memdb: { inserts: unknown[]; [key: string]: unknown } = { |
80 | | - inserts: [], |
81 | | - ...memoryDB, |
82 | | - }; |
83 | | - |
84 | | - //eslint-disable-next-line @typescript-eslint/no-explicit-any |
85 | | - const overwrite = (table: Table, primaryKeys: string[], record: StructRowProxy<any>) => { |
86 | | - const tableData = memoryDB[table.name] || []; |
87 | | - |
88 | | - if (primaryKeys.length === 0) { |
89 | | - // If there are no primary keys, simply append the data |
90 | | - tableData.push(record); |
91 | | - memoryDB[table.name] = tableData; |
92 | | - return; |
93 | | - } |
94 | | - |
95 | | - // Otherwise, perform an upsert based on the primary keys |
96 | | - const recordIndex = tableData.findIndex((existingRecord) => { |
97 | | - return primaryKeys.every((key) => existingRecord[key] === record[key]); |
98 | | - }); |
99 | | - |
100 | | - if (recordIndex > -1) { |
101 | | - // If record exists, update (overwrite) it |
102 | | - tableData[recordIndex] = record; |
103 | | - } else { |
104 | | - // If record doesn't exist, insert it |
105 | | - tableData.push(record); |
106 | | - } |
107 | | - |
108 | | - memoryDB[table.name] = tableData; // Update the memoryDB with the modified table data |
109 | | - }; |
110 | | - |
111 | | - const deleteStale = (message: pluginV3.cloudquery.plugin.v3.Write.MessageDeleteStale): void => { |
112 | | - const tableName = message.table_name; |
113 | | - |
114 | | - // Filter the table based on the provided criteria |
115 | | - const filteredTable = memoryDB[tableName].filter((row) => { |
116 | | - const sc = row.Schema(); |
117 | | - |
118 | | - const sourceColIndex = sc.FieldIndices('source_name_column'); |
119 | | - const syncColIndex = sc.FieldIndices('sync_time_column'); |
120 | | - |
121 | | - // Ensure both columns are present |
122 | | - if (sourceColIndex === undefined || syncColIndex === undefined) { |
123 | | - return true; // Keep the record if either column is missing |
124 | | - } |
125 | | - |
126 | | - const rowSourceName = row.Column(sourceColIndex).Value(0); |
127 | | - const rowSyncTime = row.Column(syncColIndex).Value(0); // Assuming it returns a Date object |
| 43 | + const overwrite = createOverwrite(memoryDB); |
| 44 | + const deleteStale = createDeleteStale(memoryDB); |
| 45 | + const write = createWrite(memoryDB, tables, overwrite, deleteStale); |
| 46 | + const read = createRead(memoryDB); |
128 | 47 |
|
129 | | - // If source names match and the record's sync time is not before the given sync time, keep the record |
130 | | - return rowSourceName === message.source_name && !rowSyncTime.before(message.sync_time); |
131 | | - }); |
132 | | - |
133 | | - // Update the memory database with the filtered table |
134 | | - memoryDB[tableName] = filteredTable; |
135 | | - }; |
| 48 | + const allTables = createTables(); |
136 | 49 |
|
137 | 50 | const pluginClient = { |
138 | 51 | plugin: null as unknown as Plugin, |
@@ -160,85 +73,11 @@ export const newMemDBPlugin = (): Plugin => { |
160 | 73 | concurrency, |
161 | 74 | }); |
162 | 75 | }, |
163 | | - write(stream: WriteStream): Promise<void> { |
164 | | - return new Promise((resolve, reject) => { |
165 | | - stream.on('data', (request: WriteRequest) => { |
166 | | - switch (request.message) { |
167 | | - case 'migrate_table': { |
168 | | - // Update table schema in the `tables` map |
169 | | - const table = decodeTable(request.migrate_table.table); |
170 | | - tables[table.name] = table; |
171 | | - break; |
172 | | - } |
173 | | - |
174 | | - case 'insert': { |
175 | | - const [tableName, batches] = decodeRecord(request.insert.record); |
176 | | - |
177 | | - if (!memoryDB[tableName]) { |
178 | | - memoryDB[tableName] = []; |
179 | | - } |
180 | | - |
181 | | - const tableSchema = tables[tableName]; |
182 | | - const pks = getPrimaryKeys(tableSchema); |
183 | | - |
184 | | - for (const batch of batches) { |
185 | | - //eslint-disable-next-line unicorn/no-array-for-each |
186 | | - for (const record of batch) { |
187 | | - overwrite(tableSchema, pks, record); |
188 | | - } |
189 | | - } |
190 | | - break; |
191 | | - } |
192 | | - |
193 | | - case 'delete': { |
194 | | - deleteStale(request.delete); |
195 | | - break; |
196 | | - } |
197 | | - |
198 | | - default: { |
199 | | - throw new Error(`Unknown request message type: ${request.message}`); |
200 | | - } |
201 | | - } |
202 | | - }); |
203 | | - |
204 | | - stream.on('finish', () => { |
205 | | - resolve(); |
206 | | - }); |
207 | | - |
208 | | - stream.on('error', (error) => { |
209 | | - reject(error); |
210 | | - }); |
211 | | - }); |
212 | | - }, |
213 | | - read(stream: ReadStream): Promise<void> { |
214 | | - return new Promise((resolve, reject) => { |
215 | | - stream.on('data', (request: ReadRequest) => { |
216 | | - const table = decodeTable(request.table); |
217 | | - |
218 | | - try { |
219 | | - const rows = memoryDB[table.name] || []; |
220 | | - |
221 | | - // We iterate over records in reverse here because we don't set an expectation |
222 | | - // of ordering on plugins, and we want to make sure that the tests are not |
223 | | - // dependent on the order of insertion either. |
224 | | - for (let index = rows.length - 1; index >= 0; index--) { |
225 | | - stream.write(rows[index]); |
226 | | - } |
227 | | - stream.end(); |
228 | | - resolve(); |
229 | | - } catch (error) { |
230 | | - reject(error); |
231 | | - } |
232 | | - }); |
233 | | - |
234 | | - stream.on('error', (error) => { |
235 | | - reject(error); |
236 | | - }); |
237 | | - }); |
238 | | - }, |
| 76 | + write, |
| 77 | + read, |
239 | 78 | }; |
240 | 79 |
|
241 | | - const newClient: NewClientFunction = (logger, spec, options) => { |
| 80 | + const newClient: NewClientFunction = (logger, spec /* options */) => { |
242 | 81 | const parsedSpec = JSON.parse(spec) as Partial<Spec>; |
243 | 82 | const validSchema = validate(parsedSpec); |
244 | 83 | if (!validSchema) { |
|
0 commit comments