update for memory leak
- fix memory leak in MeasurementRepository - add logging in MeasurementService - update catalog version
This commit is contained in:
@@ -11,27 +11,25 @@ plugins {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation(hlaeja.com.influxdb.client.kotlin)
|
implementation(hlaeja.influxdb.client.kotlin)
|
||||||
|
implementation(hlaeja.kotlin.logging)
|
||||||
implementation(hlaeja.kotlin.reflect)
|
implementation(hlaeja.kotlin.reflect)
|
||||||
implementation(hlaeja.kotlinx.coroutines)
|
implementation(hlaeja.kotlinx.coroutines)
|
||||||
implementation(hlaeja.ltd.hlaeja.library.common.messages)
|
implementation(hlaeja.library.hlaeja.common.messages)
|
||||||
implementation(hlaeja.org.springframework.springboot.actuator.starter)
|
implementation(hlaeja.springboot.starter.actuator)
|
||||||
implementation(hlaeja.org.springframework.springboot.webflux.starter)
|
implementation(hlaeja.springboot.starter.webflux)
|
||||||
|
|
||||||
testImplementation(hlaeja.io.mockk)
|
testImplementation(hlaeja.mockk)
|
||||||
testImplementation(hlaeja.io.projectreactor.reactor.test)
|
testImplementation(hlaeja.projectreactor.reactor.test)
|
||||||
testImplementation(hlaeja.kotlin.test.junit5)
|
testImplementation(hlaeja.kotlin.test.junit5)
|
||||||
testImplementation(hlaeja.kotlinx.coroutines.test)
|
testImplementation(hlaeja.kotlinx.coroutines.test)
|
||||||
testImplementation(hlaeja.org.springframework.springboot.test.starter)
|
testImplementation(hlaeja.springboot.starter.test)
|
||||||
|
|
||||||
testRuntimeOnly(hlaeja.org.junit.platform.launcher)
|
testRuntimeOnly(hlaeja.junit.platform.launcher)
|
||||||
}
|
}
|
||||||
|
|
||||||
group = "ltd.hlaeja"
|
group = "ltd.hlaeja"
|
||||||
|
|
||||||
tasks {
|
|
||||||
}
|
|
||||||
|
|
||||||
fun influxDbToken(): String = if (extra.has("influxdb.token")) {
|
fun influxDbToken(): String = if (extra.has("influxdb.token")) {
|
||||||
extra["influxdb.token"] as String
|
extra["influxdb.token"] as String
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
kotlin.code.style=official
|
kotlin.code.style=official
|
||||||
version=0.1.1-SNAPSHOT
|
version=0.1.1-SNAPSHOT
|
||||||
catalog=0.3.0
|
catalog=0.6.0
|
||||||
container.port.host=9020
|
container.port.host=9020
|
||||||
|
|||||||
@@ -15,6 +15,8 @@ class MeasurementRepository(
|
|||||||
private val properties: InfluxDbProperties,
|
private val properties: InfluxDbProperties,
|
||||||
) {
|
) {
|
||||||
|
|
||||||
|
private val writeApi = influxDBClient.makeWriteApi()
|
||||||
|
|
||||||
companion object {
|
companion object {
|
||||||
const val BY_NODE_QUERY: String = """
|
const val BY_NODE_QUERY: String = """
|
||||||
from(bucket: "%s")
|
from(bucket: "%s")
|
||||||
@@ -29,9 +31,7 @@ class MeasurementRepository(
|
|||||||
|
|
||||||
suspend fun save(
|
suspend fun save(
|
||||||
point: Point,
|
point: Point,
|
||||||
) = withContext(Dispatchers.IO) {
|
) = withContext(Dispatchers.IO) { writeApi.writePoint(point) }
|
||||||
influxDBClient.makeWriteApi().writePoint(point)
|
|
||||||
}
|
|
||||||
|
|
||||||
suspend fun getByNode(
|
suspend fun getByNode(
|
||||||
client: UUID,
|
client: UUID,
|
||||||
|
|||||||
@@ -4,10 +4,13 @@ import com.influxdb.client.write.Point
|
|||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
import ltd.hlaeja.library.deviceData.MeasurementData
|
import ltd.hlaeja.library.deviceData.MeasurementData
|
||||||
import ltd.hlaeja.repository.MeasurementRepository
|
import ltd.hlaeja.repository.MeasurementRepository
|
||||||
|
import mu.KotlinLogging
|
||||||
import org.springframework.http.HttpStatus.NOT_FOUND
|
import org.springframework.http.HttpStatus.NOT_FOUND
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
import org.springframework.web.server.ResponseStatusException
|
import org.springframework.web.server.ResponseStatusException
|
||||||
|
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class MeasurementService(
|
class MeasurementService(
|
||||||
private val repository: MeasurementRepository,
|
private val repository: MeasurementRepository,
|
||||||
@@ -19,7 +22,10 @@ class MeasurementService(
|
|||||||
): MeasurementData.Response {
|
): MeasurementData.Response {
|
||||||
val result = repository.getByNode(client, node)
|
val result = repository.getByNode(client, node)
|
||||||
if (result.isEmpty()) {
|
if (result.isEmpty()) {
|
||||||
throw ResponseStatusException(NOT_FOUND, "No data for client: $client, device: $node")
|
"No data for client: $client, node: $node".also {
|
||||||
|
log.warn { it }
|
||||||
|
throw ResponseStatusException(NOT_FOUND, it)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val latestData = mutableMapOf<String, Number>()
|
val latestData = mutableMapOf<String, Number>()
|
||||||
result.forEach { table ->
|
result.forEach { table ->
|
||||||
@@ -27,6 +33,7 @@ class MeasurementService(
|
|||||||
latestData[record.getValueByKey("_field") as String] = record.value as Number
|
latestData[record.getValueByKey("_field") as String] = record.value as Number
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.info { "Load data for client $client" }
|
||||||
return MeasurementData.Response(latestData)
|
return MeasurementData.Response(latestData)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,7 +45,10 @@ class MeasurementService(
|
|||||||
addTags(request.tags, point)
|
addTags(request.tags, point)
|
||||||
addFields(request.fields, point)
|
addFields(request.fields, point)
|
||||||
}
|
}
|
||||||
.let { point -> repository.save(point) }
|
.let { point ->
|
||||||
|
repository.save(point)
|
||||||
|
log.debug { "Save data for client $client" }
|
||||||
|
}
|
||||||
|
|
||||||
private suspend fun addFields(
|
private suspend fun addFields(
|
||||||
measurements: Map<String, Number>,
|
measurements: Map<String, Number>,
|
||||||
|
|||||||
@@ -19,16 +19,15 @@ import org.junit.jupiter.api.Assertions.assertEquals
|
|||||||
import org.junit.jupiter.api.BeforeEach
|
import org.junit.jupiter.api.BeforeEach
|
||||||
|
|
||||||
class MeasurementRepositoryTest {
|
class MeasurementRepositoryTest {
|
||||||
|
|
||||||
private val client: InfluxDBClient = mockk()
|
private val client: InfluxDBClient = mockk()
|
||||||
private val properties: InfluxDbProperties = mockk()
|
private val properties: InfluxDbProperties = mockk()
|
||||||
private val writeApi: WriteApi = mockk()
|
private val writeApi: WriteApi = mockk()
|
||||||
private val queryApi: QueryApi = mockk()
|
private val queryApi: QueryApi = mockk()
|
||||||
|
|
||||||
private lateinit var repository: MeasurementRepository
|
private lateinit var repository: MeasurementRepository
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
fun setUp() {
|
fun setUp() {
|
||||||
|
every { client.makeWriteApi() } returns writeApi
|
||||||
repository = MeasurementRepository(client, properties)
|
repository = MeasurementRepository(client, properties)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,8 +35,6 @@ class MeasurementRepositoryTest {
|
|||||||
fun `save point to influxdb`() = runTest {
|
fun `save point to influxdb`() = runTest {
|
||||||
// given
|
// given
|
||||||
val point = Point.measurement("test").addField("value", 12.3)
|
val point = Point.measurement("test").addField("value", 12.3)
|
||||||
|
|
||||||
every { client.makeWriteApi() } returns writeApi
|
|
||||||
every { writeApi.writePoint(any()) } just Runs
|
every { writeApi.writePoint(any()) } just Runs
|
||||||
|
|
||||||
// when
|
// when
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ class MeasurementServiceTest {
|
|||||||
// then
|
// then
|
||||||
assertEquals(NOT_FOUND, exception.statusCode)
|
assertEquals(NOT_FOUND, exception.statusCode)
|
||||||
assertEquals(
|
assertEquals(
|
||||||
"No data for client: 00000000-0000-0000-0000-000000000000, device: 00000000-0000-0000-0000-000000000000",
|
"No data for client: 00000000-0000-0000-0000-000000000000, node: 00000000-0000-0000-0000-000000000000",
|
||||||
exception.reason,
|
exception.reason,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user