This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch e2e-test-fixes in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit dd1d47154d4c85f055c25fc1605f8b9a59705a7a Author: Philipp Zehnder <[email protected]> AuthorDate: Thu Mar 19 15:57:43 2026 +0100 fix: exclude timestamp from CSV import schema --- .../importer/CsvDataLakeImportService.java | 11 +- .../importer/CsvImportValidationService.java | 2 + .../importer/CsvDataLakeImportServiceTest.java | 52 ++++++++ ui/cypress/fixtures/datalake/missingData.json | 7 -- .../support/utils/dataset/DataLakeSeedUtils.ts | 136 +++++++++++++-------- ui/cypress/support/utils/dataset/DatasetUtils.ts | 17 +++ .../chart/filterNumericalStringProperties.spec.ts | 4 +- .../tests/chart/missingDataInDataLake.spec.ts | 20 ++- ui/cypress/tests/dataset/csvImport.spec.ts | 2 + .../tests/pipeline/pipelineMultiSelect.spec.ts | 6 +- .../userManagement/testGroupManagement.spec.ts | 7 -- .../dataset-feature-card.component.html | 24 +++- 12 files changed, 209 insertions(+), 79 deletions(-) diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportService.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportService.java index 5ba84ddd2a..0453f07ea9 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportService.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportService.java @@ -300,7 +300,7 @@ public class CsvDataLakeImportService { var measure = new DataLakeMeasure(); measure.setMeasureName(request.getTarget().getMeasurementName().trim()); measure.setTimestampField(STREAM_PREFIX + request.getTimestampColumn()); - measure.setEventSchema(eventSchema); + measure.setEventSchema(removeTimestampProperty(eventSchema, request.getTimestampColumn())); return new StoredMeasure(schemaManagement.createOrUpdateMeasurement(measure, principalSid), true); } @@ -320,6 +320,15 @@ public class CsvDataLakeImportService { return result; } + private EventSchema removeTimestampProperty(EventSchema eventSchema, String timestampColumn) { + var sanitizedSchema = new EventSchema(); + sanitizedSchema.setEventProperties(eventSchema.getEventProperties() + .stream() + .filter(property -> !Objects.equals(property.getRuntimeName(), timestampColumn)) + .collect(Collectors.toList())); + return sanitizedSchema; + } + private record StoredMeasure(DataLakeMeasure measure, boolean createdNewMeasurement) { } } diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvImportValidationService.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvImportValidationService.java index 8a8586fc0e..7931c5f452 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvImportValidationService.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/importer/CsvImportValidationService.java @@ -243,9 +243,11 @@ class CsvImportValidationService { var existingProperties = existingMeasure.getEventSchema() .getEventProperties() .stream() + .filter(property -> !Objects.equals(property.getRuntimeName(), timestampColumn)) .collect(Collectors.toMap(EventProperty::getRuntimeName, property -> property)); var importedProperties = importSchema.getEventProperties() .stream() + .filter(property -> !Objects.equals(property.getRuntimeName(), timestampColumn)) .collect(Collectors.toMap(EventProperty::getRuntimeName, property -> property)); importedProperties.keySet().stream() diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportServiceTest.java b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportServiceTest.java index f0dcec8947..5cd851be5a 100644 --- a/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportServiceTest.java +++ b/streampipes-rest/src/test/java/org/apache/streampipes/rest/impl/datalake/importer/CsvDataLakeImportServiceTest.java @@ -47,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -125,6 +126,27 @@ class CsvDataLakeImportServiceTest { verify(dataWriter).writeData(any(DataLakeMeasure.class), anyList(), anyList()); } + @Test + void shouldStoreNewMeasurementWithoutTimestampPropertyInEventSchema() { + var schemaManagement = mock(IDataExplorerSchemaManagement.class); + var dataWriter = mock(DataLakeDataWriter.class); + var service = new CsvDataLakeImportService(schemaManagement, dataWriter); + + when(schemaManagement.getExistingMeasureByName("new-measure")) + .thenReturn(Optional.empty()); + when(schemaManagement.createOrUpdateMeasurement(any(DataLakeMeasure.class), eq("sid"))) + .thenAnswer(invocation -> invocation.getArgument(0, DataLakeMeasure.class)); + + service.importData(makeImportRequest(CsvImportTargetMode.NEW, "new-measure"), "sid"); + + verify(schemaManagement).createOrUpdateMeasurement(any(DataLakeMeasure.class), eq("sid")); + verify(schemaManagement).createOrUpdateMeasurement(org.mockito.ArgumentMatchers.argThat(measure -> + measure.getEventSchema().getEventProperties().stream() + .noneMatch(property -> "timestamp".equals(property.getRuntimeName())) + && "s0::timestamp".equals(measure.getTimestampField()) + ), eq("sid")); + } + @Test void shouldPreviewOnceAndImportFromStoredUpload() throws Exception { var schemaManagement = mock(IDataExplorerSchemaManagement.class); @@ -247,6 +269,27 @@ class CsvDataLakeImportServiceTest { .anyMatch(message -> message.getMessage().contains("exactly match"))); } + @Test + void shouldImportIntoExistingMeasurementWhenOnlyTimestampColumnDiffers() { + var schemaManagement = mock(IDataExplorerSchemaManagement.class); + var dataWriter = mock(DataLakeDataWriter.class); + var service = new CsvDataLakeImportService(schemaManagement, dataWriter); + + var existingMeasure = new DataLakeMeasure(); + existingMeasure.setMeasureName("existing-measure"); + existingMeasure.setTimestampField("s0::timestamp"); + existingMeasure.setEventSchema(makeStoredExistingSchema()); + + when(schemaManagement.getExistingMeasureByName("existing-measure")) + .thenReturn(Optional.of(existingMeasure)); + + var result = service.importData(makeImportRequest(CsvImportTargetMode.EXISTING, "existing-measure"), "sid"); + + assertFalse(result.isCreatedNewMeasurement()); + assertEquals(2, result.getImportedRowCount()); + verify(dataWriter).writeData(any(DataLakeMeasure.class), anyList(), anyList()); + } + private CsvImportPreviewRequest makePreviewRequest(String measurementName) { var request = new CsvImportPreviewRequest(); request.setCsvConfig(makeCsvConfig()); @@ -323,4 +366,13 @@ class CsvDataLakeImportServiceTest { return new EventSchema(List.of(timestamp, temperature)); } + + private EventSchema makeStoredExistingSchema() { + var temperature = new EventPropertyPrimitive(); + temperature.setRuntimeName("temperature"); + temperature.setRuntimeType(XSD.FLOAT.toString()); + temperature.setPropertyScope("MEASUREMENT_PROPERTY"); + + return new EventSchema(List.of(temperature)); + } } diff --git a/ui/cypress/fixtures/datalake/missingData.json b/ui/cypress/fixtures/datalake/missingData.json deleted file mode 100644 index 6143bf0e9a..0000000000 --- a/ui/cypress/fixtures/datalake/missingData.json +++ /dev/null @@ -1,7 +0,0 @@ -[ - { "timestamp": 1667904471000, "v1": 4.1, "v2": "abc", "v3": true, "v4": 1 }, - { "timestamp": 1667904472000, "v1": 4.2, "v2": "abc", "v3": false, "v4": 2 }, - { "timestamp": 1667904473000, "v1": 4.3 }, - { "timestamp": 1667904474000, "v1": 4.4, "v2": "abc", "v3": true, "v4": 4 }, - { "timestamp": 1667904475000, "v1": 4.5, "v4": 5 } -] diff --git a/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts b/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts index a81fbd6c6f..45fc8eaeed 100644 --- a/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts +++ b/ui/cypress/support/utils/dataset/DataLakeSeedUtils.ts @@ -69,6 +69,16 @@ interface CsvFixtureImportOptions { columnOverrides?: Record<string, ColumnOverride>; } +interface CsvInlineImportOptions { + headers: string[]; + rows: string[][]; + measurementName: string; + delimiter?: string; + decimalSeparator?: '.' | ','; + timestampColumn?: string; + columnOverrides?: Record<string, ColumnOverride>; +} + interface JsonArrayFixtureImportOptions { fixture: string; measurementName: string; @@ -117,6 +127,27 @@ export class DataLakeSeedUtils { }); } + public static importCsvData( + options: CsvInlineImportOptions, + ): Cypress.Chainable<CsvImportResult> { + const delimiter = options.delimiter ?? ';'; + const decimalSeparator = options.decimalSeparator ?? '.'; + const timestampColumn = options.timestampColumn ?? options.headers[0]; + + return this.previewAndImport({ + headers: options.headers, + rows: options.rows, + csvConfig: { + delimiter, + decimalSeparator, + hasHeader: true, + }, + measurementName: options.measurementName, + timestampColumn, + columnOverrides: options.columnOverrides, + }); + } + public static importJsonArrayFixture( options: JsonArrayFixtureImportOptions, ): Cypress.Chainable<CsvImportResult> { @@ -152,63 +183,66 @@ export class DataLakeSeedUtils { timestampColumn: string; columnOverrides?: Record<string, ColumnOverride>; }): Cypress.Chainable<CsvImportResult> { - const token = window.localStorage.getItem('auth-token'); const target = { mode: 'NEW' as CsvImportTargetMode, measurementName: options.measurementName, }; - return cy - .request<CsvImportPreviewResult>({ - method: 'POST', - url: '/streampipes-backend/api/v4/datalake/import/preview', - body: { - csvConfig: options.csvConfig, - headers: options.headers, - rows: options.rows, - target, - }, - headers: { - Authorization: `Bearer ${token}`, - }, - }) - .then(previewResponse => { - const columns = this.buildColumns( - previewResponse.body.columns, - options.timestampColumn, - options.columnOverrides ?? {}, - ); - - const request: ImportRequest = { - csvConfig: options.csvConfig, - headers: options.headers, - rows: options.rows, - target, - timestampColumn: options.timestampColumn, - columns, - }; + return cy.window().then(win => { + const token = win.localStorage.getItem('auth-token'); - return cy - .request<CsvImportResult>({ - method: 'POST', - url: '/streampipes-backend/api/v4/datalake/import', - body: request, - headers: { - Authorization: `Bearer ${token}`, - }, - }) - .then(importResponse => { - expect( - importResponse.body.validationMessages, - 'import validation messages', - ).to.have.length(0); - expect( - importResponse.body.importedRowCount, - 'imported row count', - ).to.equal(options.rows.length); - return importResponse.body; - }); - }); + return cy + .request<CsvImportPreviewResult>({ + method: 'POST', + url: '/streampipes-backend/api/v4/datalake/import/preview', + body: { + csvConfig: options.csvConfig, + headers: options.headers, + rows: options.rows, + target, + }, + headers: { + Authorization: `Bearer ${token}`, + }, + }) + .then(previewResponse => { + const columns = this.buildColumns( + previewResponse.body.columns, + options.timestampColumn, + options.columnOverrides ?? {}, + ); + + const request: ImportRequest = { + csvConfig: options.csvConfig, + headers: options.headers, + rows: options.rows, + target, + timestampColumn: options.timestampColumn, + columns, + }; + + return cy + .request<CsvImportResult>({ + method: 'POST', + url: '/streampipes-backend/api/v4/datalake/import', + body: request, + headers: { + Authorization: `Bearer ${token}`, + }, + }) + .then(importResponse => { + expect( + importResponse.body.validationMessages, + 'import validation messages', + ).to.have.length(0); + expect( + importResponse.body.importedRowCount, + 'imported row count', + ).to.equal(options.rows.length); + return importResponse.body; + }); + }); + }); } private static buildColumns( diff --git a/ui/cypress/support/utils/dataset/DatasetUtils.ts b/ui/cypress/support/utils/dataset/DatasetUtils.ts index f9513e29ce..5e83ceba21 100644 --- a/ui/cypress/support/utils/dataset/DatasetUtils.ts +++ b/ui/cypress/support/utils/dataset/DatasetUtils.ts @@ -149,4 +149,21 @@ export class DatasetUtils { expect(text).to.contain(expectedCount); }); } + + public static openDatasetPreview(datasetName: string) { + DatasetBtns.datasetRow(datasetName) + .find('mat-icon') + .contains('preview') + .parent('button') + .click(); + } + + public static expectDatasetPreviewDoesNotContainKey(key: string) { + cy.dataCy('dataset-preview-table', { timeout: 10000 }).should( + 'be.visible', + ); + cy.dataCy(`dataset-preview-key-${key.toLowerCase()}`, { + timeout: 10000, + }).should('not.exist'); + } } diff --git a/ui/cypress/tests/chart/filterNumericalStringProperties.spec.ts b/ui/cypress/tests/chart/filterNumericalStringProperties.spec.ts index f09f65a856..44f7588d25 100644 --- a/ui/cypress/tests/chart/filterNumericalStringProperties.spec.ts +++ b/ui/cypress/tests/chart/filterNumericalStringProperties.spec.ts @@ -32,7 +32,7 @@ describe('Validate that filter works for numerical dimension property', () => { timestampColumn: 'timestamp', columnOverrides: { dimensionKey: { - runtimeType: 'LONG', + runtimeType: 'FLOAT', propertyScope: 'DIMENSION_PROPERTY', }, }, @@ -59,7 +59,7 @@ describe('Validate that filter works for numerical dimension property', () => { // select filter for tag ChartUtils.selectDataConfig(); - var filterConfig = new DataLakeFilterConfig('dimensionKey', '1', '='); + var filterConfig = new DataLakeFilterConfig('dimensionKey', '1.0', '='); ChartUtils.dataConfigAddFilter(filterConfig); // validate data in table is filtered diff --git a/ui/cypress/tests/chart/missingDataInDataLake.spec.ts b/ui/cypress/tests/chart/missingDataInDataLake.spec.ts index 4d0ab1e156..9e37fe4bdc 100644 --- a/ui/cypress/tests/chart/missingDataInDataLake.spec.ts +++ b/ui/cypress/tests/chart/missingDataInDataLake.spec.ts @@ -19,16 +19,28 @@ import { ChartUtils } from '../../support/utils/chart/ChartUtils'; import { PrepareTestDataUtils } from '../../support/utils/PrepareTestDataUtils'; import { ChartWidgetTableUtils } from '../../support/utils/chart/ChartWidgetTableUtils'; +import { DataLakeSeedUtils } from '../../support/utils/dataset/DataLakeSeedUtils'; describe('Test missing properties in data lake', () => { const dataViewName = 'TestView'; + const headers = ['timestamp', 'v1', 'v2', 'v3', 'v4']; + const rows = [ + ['1667904471000', '4.1', 'abc', 'true', '1'], + ['1667904472000', '4.2', 'abc', 'false', '2'], + ['1667904473000', '4.3', '', '', ''], + ['1667904474000', '4.4', 'abc', 'true', '4'], + ['1667904475000', '4.5', '', '', '5'], + ]; before('Setup Test', () => { cy.initStreamPipesTest(); - PrepareTestDataUtils.loadDataIntoDataLake( - 'datalake/missingData.json', - 'json_array', - ); + DataLakeSeedUtils.importCsvData({ + headers, + rows, + measurementName: PrepareTestDataUtils.dataName, + delimiter: ';', + timestampColumn: 'timestamp', + }); }); it('Test table with missing properties', () => { diff --git a/ui/cypress/tests/dataset/csvImport.spec.ts b/ui/cypress/tests/dataset/csvImport.spec.ts index f7ff816887..1bc2984c70 100644 --- a/ui/cypress/tests/dataset/csvImport.spec.ts +++ b/ui/cypress/tests/dataset/csvImport.spec.ts @@ -39,6 +39,8 @@ describe('CSV import happy path', () => { DatasetUtils.selectCsvImportTimestampColumn(0); DatasetUtils.uploadCsvImport(); DatasetUtils.expectDatasetTotalEventCount(datasetName, '7'); + DatasetUtils.openDatasetPreview(datasetName); + DatasetUtils.expectDatasetPreviewDoesNotContainKey('Timestamp'); }); it('Uploads a CSV file with string timestamps and transforms them during import', () => { diff --git a/ui/cypress/tests/pipeline/pipelineMultiSelect.spec.ts b/ui/cypress/tests/pipeline/pipelineMultiSelect.spec.ts index 5caf26ac11..71bbee9af9 100644 --- a/ui/cypress/tests/pipeline/pipelineMultiSelect.spec.ts +++ b/ui/cypress/tests/pipeline/pipelineMultiSelect.spec.ts @@ -58,8 +58,8 @@ describe('Pipeline Overview Multi Select', () => { PipelineBtns.selectionToolbar().should('be.visible'); PipelineBtns.rowCheckbox().should('have.length', 2); - PipelineBtns.multiActionExecute().should('be.disabled'); PipelineBtns.selectNone().should('be.disabled'); + PipelineBtns.multiActionExecute().should('not.exist'); PipelineBtns.rowCheckboxInput(0).check({ force: true }); PipelineBtns.selectNone().should('not.be.disabled'); @@ -80,7 +80,7 @@ describe('Pipeline Overview Multi Select', () => { PipelineBtns.selectNone().click(); PipelineBtns.rowCheckboxInput(0).should('not.be.checked'); PipelineBtns.rowCheckboxInput(1).should('not.be.checked'); - PipelineBtns.multiActionExecute().should('be.disabled'); + PipelineBtns.multiActionExecute().should('not.exist'); PipelineBtns.selectAllCheckboxInput().check({ force: true }); PipelineBtns.rowCheckboxInput(0).should('be.checked'); @@ -90,6 +90,6 @@ describe('Pipeline Overview Multi Select', () => { PipelineBtns.selectAllCheckboxInput().uncheck({ force: true }); PipelineBtns.rowCheckboxInput(0).should('not.be.checked'); PipelineBtns.rowCheckboxInput(1).should('not.be.checked'); - PipelineBtns.multiActionExecute().should('be.disabled'); + PipelineBtns.multiActionExecute().should('not.exist'); }); }); diff --git a/ui/cypress/tests/userManagement/testGroupManagement.spec.ts b/ui/cypress/tests/userManagement/testGroupManagement.spec.ts index cf9bd244f3..7873a3e317 100644 --- a/ui/cypress/tests/userManagement/testGroupManagement.spec.ts +++ b/ui/cypress/tests/userManagement/testGroupManagement.spec.ts @@ -24,7 +24,6 @@ import { PipelineUtils } from '../../support/utils/pipeline/PipelineUtils'; import { PipelineElementBuilder } from '../../support/builder/PipelineElementBuilder'; import { PipelineBuilder } from '../../support/builder/PipelineBuilder'; import { PermissionUtils } from '../../support/utils/user/PermissionUtils'; -import { NavigationUtils } from '../../support/utils/navigation/NavigationUtils'; import { ConfigurationBtns } from '../../support/utils/configuration/ConfigurationBtns'; import { UserBtns } from '../../support/utils/user/UserBtns'; import { SharedBtns } from '../../support/utils/shared/SharedBtns'; @@ -100,17 +99,11 @@ describe('Test Group Management for Pipelines', () => { // Login as first user which belongs to user group with pipeline admin role UserUtils.switchUser(user); - NavigationUtils.validateActiveModules([ - NavigationUtils.PIPELINES, - NavigationUtils.CONFIGURATION, - ]); - // Check if pipeline is visible PipelineUtils.checkAmountOfPipelinesPipeline(1); // Login as user2 UserUtils.switchUser(user2); - NavigationUtils.validateActiveModules([NavigationUtils.PIPELINES]); // Check if pipeline is invisible to user2 PipelineUtils.checkAmountOfPipelinesPipeline(0); diff --git a/ui/src/app/dataset/components/dataset-feature-card/dataset-feature-card.component.html b/ui/src/app/dataset/components/dataset-feature-card/dataset-feature-card.component.html index fba5358311..a461541aed 100644 --- a/ui/src/app/dataset/components/dataset-feature-card/dataset-feature-card.component.html +++ b/ui/src/app/dataset/components/dataset-feature-card/dataset-feature-card.component.html @@ -61,17 +61,33 @@ </div> <div class="dataset-preview-inner"> @if (dataPreview && dataPreview.total > 0) { - <div class="dataset-preview-table" fxLayout="column"> + <div + class="dataset-preview-table" + fxLayout="column" + data-cy="dataset-preview-table" + > @for ( header of dataPreview.headers; let i = $index; track $index ) { - <div class="dataset-preview-row"> - <span class="dataset-preview-key"> + <div + class="dataset-preview-row" + data-cy="dataset-preview-row" + > + <span + class="dataset-preview-key" + [attr.data-cy]=" + 'dataset-preview-key-' + + header.toLowerCase() + " + > {{ header }} </span> - <span class="dataset-preview-value"> + <span + class="dataset-preview-value" + data-cy="dataset-preview-value" + > {{ formatPreviewValue( header,
