我是靠谱客的博主 简单冷风,这篇文章主要介绍SDP(12): MongoDB-Engine - Streaming,现在分享给大家,希望可以做个参考。

   在akka-alpakka工具包里也提供了对MongoDB的stream-connector,能针对MongoDB数据库进行streaming操作。这个MongoDB-connector里包含了MongoSource,MongoFlow,MongoSink。我们只使用MongoSource,其它两个我们直接用mapAsyc来创造。下面是MongoSource的定义:

复制代码
1
2
3
4
5
6
object MongoSource { def apply(query: Observable[Document]): Source[Document, NotUsed] = Source.fromPublisher(ObservableToPublisher(query)) }

实际上就是把Mongo-scala的Observable[Document]转成Source[Document, NotUsed]。我们还是通过传入context来构建这个Source:

复制代码
1
2
3
4
5
6
7
8
case class MGOContext( dbName: String, collName: String, action: MGOCommands = null ) {...} case class DocumentStream(filter: Option[Bson] = None, andThen: Option[FindObservable[Document] => FindObservable[Document]] = None, ) extends MGOCommands

Source的具体实现:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def mongoStream(ctx: MGOContext)( implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = { val db = client.getDatabase(ctx.dbName) val coll = db.getCollection(ctx.collName) ctx.action match { case DocumentStream(None, None) => MongoSource(coll.find()) case DocumentStream(Some(filter), None) => MongoSource(coll.find(filter)) case DocumentStream(None, Some(next)) => MongoSource(next(coll.find())) case DocumentStream(Some(filter), Some(next)) => MongoSource(next(coll.find(filter))) } }

下面是mongoStream的使用示范:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
val clusterSettings = ClusterSettings.builder() .hosts(List(new ServerAddress("localhost:27017")).asJava).build() val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() implicit val client = MongoClient(clientSettings) implicit val system = ActorSystem() implicit val mat = ActorMaterializer() implicit val ec = system.dispatcher case class PO ( ponum: String, podate: MGODate, vendor: String, remarks: Option[String], podtl: Option[MGOArray] ) def toPO(doc: Document): PO = { PO( ponum = doc.getString("ponum"), podate = doc.getDate("podate"), vendor = doc.getString("vendor"), remarks = mgoGetStringOrNone(doc,"remarks"), podtl = mgoGetArrayOrNone(doc,"podtl") ) } case class PODTL( item: String, price: Double, qty: Int, packing: Option[String], payTerm: Option[String] ) def toPODTL(podtl: Document): PODTL = { PODTL( item = podtl.getString("item"), price = podtl.getDouble("price"), qty = podtl.getInteger("qty"), packing = mgoGetStringOrNone(podtl,"packing"), payTerm = mgoGetStringOrNone(podtl,"payterm") ) } def showPO(po: PO) = { println(s"po number: ${po.ponum}") println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") println(s"vendor: ${po.vendor}") if (po.remarks != None) println(s"remarks: ${po.remarks.get}") po.podtl match { case Some(barr) => mgoArrayToDocumentList(barr) .map { dc => toPODTL(dc)} .foreach { doc: PODTL => print(s"==>Item: ${doc.item} ") print(s"price: ${doc.price} ") print(s"qty: ${doc.qty} ") doc.packing.foreach(pk => print(s"packing: ${pk} ")) doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) println("") } case _ => } } import org.mongodb.scala.model.Projections._ import MongoActionStream._ import MGOEngine._ import akka.stream.scaladsl.{Sink, Source} val proj: MGOFilterResult = find => find.projection(exclude("handler","_id")) val ctx = MGOContext("testdb","po").setCommand( DocumentStream(filter = None, andThen = Some(proj))) val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)) println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))))

我们看到:使用了许多代码去进行类型转换。不过也没有什么太好的办法,已经是一次性的了。我们也可以通过akka的Flow[A,B]来以stream里的A元素为变量对MongoDB数据进行更新操作:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
object MongoActionStream { import MGOContext._ case class StreamingInsert[A](dbName: String, collName: String, converter: A => Document, parallelism: Int = 1 ) extends MGOCommands case class StreamingDelete[A](dbName: String, collName: String, toFilter: A => Bson, parallelism: Int = 1, justOne: Boolean = false ) extends MGOCommands case class StreamingUpdate[A](dbName: String, collName: String, toFilter: A => Bson, toUpdate: A => Bson, parallelism: Int = 1, justOne: Boolean = false ) extends MGOCommands case class InsertAction[A](ctx: StreamingInsert[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] = Flow[A].map(ctx.converter) .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc)) } case class UpdateAction[A](ctx: StreamingUpdate[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = if (ctx.justOne) { Flow[A] .mapAsync(ctx.parallelism)(a => collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) } else Flow[A] .mapAsync(ctx.parallelism)(a => collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) } case class DeleteAction[A](ctx: StreamingDelete[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = if (ctx.justOne) { Flow[A] .mapAsync(ctx.parallelism)(a => collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a)) } else Flow[A] .mapAsync(ctx.parallelism)(a => collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a)) } }

下面是insert, update及delete操作的示范。在这个示范里我们同时调用了JDBCEngine,CassandraEngine和MongoDBEngine:

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import jdbcengine._ import JDBCEngine._ import scalikejdbc._ case class DataRow ( rowid: Long, measureid: Long, state: String, county: String, year: Int, value: Int ) val toRow: WrappedResultSet => DataRow = rs => DataRow( rowid = rs.long("ROWID"), measureid = rs.long("MEASUREID"), state = rs.string("STATENAME"), county = rs.string("COUNTYNAME"), year = rs.int("REPORTYEAR"), value = rs.int("VALUE") ) //construct the context val h2ctx = JDBCQueryContext[DataRow]( dbName = 'h2, statement = "select * from AQMRPT", extractor = toRow ) //source from h2 database val jdbcSource = jdbcAkkaStream(h2ctx) //document converter def rowToDoc: DataRow => Document = row => Document ( "rowid" -> row.rowid, "measureid" -> row.measureid, "state" -> row.state, "county" -> row.county, "year" -> row.year, "value" -> row.value ) def docToRow: Document => DataRow = doc => DataRow ( rowid = doc.getLong("rowid"), measureid = doc.getLong("measureid"), state = doc.getString("state"), county = doc.getString("county"), year = doc.getInteger("year"), value = doc.getInteger("value") ) //setup context val mgoctx = StreamingInsert("testdb","members",rowToDoc) val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx) val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow) val sink = Sink.foreach[DataRow]{ r => println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") } //config jdbc drivers ConfigDBsWithEnv("dev").setup('h2) ConfigDBsWithEnv("dev").loadGlobalSettings() val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run() val mgoCtxShow = MGOContext("testdb","members").setCommand( DocumentStream(filter = None)) mongoStream(mgoCtxShow).map(docToRow).to(sink).run() import com.datastax.driver.core._ import cassandraengine._ import CQLEngine._ import org.mongodb.scala.model.Filters._ //data row converter val cqlToDataRow = (rs: Row) => DataRow( rowid = rs.getLong("ROWID"), measureid = rs.getLong("MEASUREID"), state = rs.getString("STATENAME"), county = rs.getString("COUNTYNAME"), year = rs.getInt("REPORTYEAR"), value = rs.getInt("VALUE") ) import org.bson.conversions._ import org.mongodb.scala.model.Updates._ //#init-session implicit val session = Cluster.builder .addContactPoint("127.0.0.1") .withPort(9042) .build .connect() //setup context val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow) //construct source val cqlSource = cassandraStream(cqlCtx) def toFilter: DataRow => Bson = row => { and(equal("rowid",row.rowid), lt("value",10)) } def toUpdate: DataRow => Bson = row => { set("value" , row.value * 10) } val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate) val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate) val sts = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run() import org.bson.conversions._ import org.mongodb.scala.model.Filters._ def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10)) val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter) val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel) val mgoCtxSrc = MGOContext("testdb","members").setCommand( DocumentStream(filter = None)) mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run() import org.mongodb.scala.model.Sorts._ val sortDsc: MGOFilterResult = find => find.sort(descending("rowid")) val mgoCtxShow = MGOContext("testdb","members").setCommand( DocumentStream(filter = None, andThen = Some(sortDsc))) mongoStream(mgoCtxShow).map(docToRow).to(sink).run()

下面是本次示范的全部源代码:

build.sbt

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
name := "learn-mongo" version := "0.1" scalaVersion := "2.12.4" libraryDependencies := Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0", "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1", "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17", "com.typesafe.akka" %% "akka-actor" % "2.5.4", "com.typesafe.akka" %% "akka-stream" % "2.5.4", "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16", "org.scalikejdbc" %% "scalikejdbc" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-test" % "3.2.1" % "test", "org.scalikejdbc" %% "scalikejdbc-config" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1", "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1", "com.h2database" % "h2" % "1.4.196", "mysql" % "mysql-connector-java" % "6.0.6", "org.postgresql" % "postgresql" % "42.2.0", "commons-dbcp" % "commons-dbcp" % "1.4", "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2", "com.zaxxer" % "HikariCP" % "2.7.4", "com.jolbox" % "bonecp" % "0.8.0.RELEASE", "com.typesafe.slick" %% "slick" % "3.2.1", "ch.qos.logback" % "logback-classic" % "1.2.3" )

resources/application.conf

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# JDBC settings test { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "commons-dbcp2" } } db.mysql.driver = "com.mysql.cj.jdbc.Driver" db.mysql.url = "jdbc:mysql://localhost:3306/testdb" db.mysql.user = "root" db.mysql.password = "123" db.mysql.poolInitialSize = 5 db.mysql.poolMaxSize = 7 db.mysql.poolConnectionTimeoutMillis = 1000 db.mysql.poolValidationQuery = "select 1 as one" db.mysql.poolFactoryName = "bonecp" # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 } dev { db { h2 { driver = "org.h2.Driver" url = "jdbc:h2:tcp://localhost/~/slickdemo" user = "" password = "" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } mysql { driver = "com.mysql.cj.jdbc.Driver" url = "jdbc:mysql://localhost:3306/testdb" user = "root" password = "123" poolInitialSize = 5 poolMaxSize = 7 poolConnectionTimeoutMillis = 1000 poolValidationQuery = "select 1 as one" poolFactoryName = "bonecp" } postgres { driver = "org.postgresql.Driver" url = "jdbc:postgresql://localhost:5432/testdb" user = "root" password = "123" poolFactoryName = "hikaricp" numThreads = 10 maxConnections = 12 minConnections = 4 keepAliveConnection = true } } # scallikejdbc Global settings scalikejdbc.global.loggingSQLAndTime.enabled = true scalikejdbc.global.loggingSQLAndTime.logLevel = info scalikejdbc.global.loggingSQLAndTime.warningEnabled = true scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000 scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn scalikejdbc.global.loggingSQLAndTime.singleLineMode = false scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10 }

JDBCEngine.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package jdbcengine import java.sql.PreparedStatement import scala.collection.generic.CanBuildFrom import akka.stream.scaladsl._ import scalikejdbc._ import scalikejdbc.streams._ import akka.NotUsed import akka.stream._ import scala.util._ import java.time._ import scala.concurrent.duration._ import filestreaming.FileStreaming._ import scalikejdbc.TxBoundary.Try._ import scala.concurrent.ExecutionContextExecutor import java.io.InputStream object JDBCContext { type SQLTYPE = Int val SQL_EXEDDL= 1 val SQL_UPDATE = 2 val RETURN_GENERATED_KEYVALUE = true val RETURN_UPDATED_COUNT = false } case class JDBCQueryContext[M]( dbName: Symbol, statement: String, parameters: Seq[Any] = Nil, fetchSize: Int = 100, autoCommit: Boolean = false, queryTimeout: Option[Int] = None, extractor: WrappedResultSet => M) case class JDBCContext( dbName: Symbol, statements: Seq[String] = Nil, parameters: Seq[Seq[Any]] = Nil, fetchSize: Int = 100, queryTimeout: Option[Int] = None, queryTags: Seq[String] = Nil, sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE, batch: Boolean = false, returnGeneratedKey: Seq[Option[Any]] = Nil, // no return: None, return by index: Some(1), by name: Some("id") preAction: Option[PreparedStatement => Unit] = None, postAction: Option[PreparedStatement => Unit] = None) { ctx => //helper functions def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag) def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags) def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size) def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time) def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1) ctx.copy(preAction = action) else throw new IllegalStateException("JDBCContex setting error: preAction not supported!") } def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1) ctx.copy(postAction = action) else throw new IllegalStateException("JDBCContex setting error: preAction not supported!") } def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_EXEDDL) { ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(Seq(_parameters)) ) } else throw new IllegalStateException("JDBCContex setting error: option not supported!") } def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) { ctx.copy( statements = ctx.statements ++ Seq(_statement), parameters = ctx.parameters ++ Seq(_parameters), returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None)) ) } else throw new IllegalStateException("JDBCContex setting error: option not supported!") } def appendBatchParameters(_parameters: Any*): JDBCContext = { if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!") var matchParams = true if (ctx.parameters != Nil) if (ctx.parameters.head.size != _parameters.size) matchParams = false if (matchParams) { ctx.copy( parameters = ctx.parameters ++ Seq(_parameters) ) } else throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!") } def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = { if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch) throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!") ctx.copy( returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil ) } def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = { ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), sqlType = JDBCContext.SQL_EXEDDL, batch = false ) } def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = { ctx.copy( statements = Seq(_statement), parameters = Seq(_parameters), returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None), sqlType = JDBCContext.SQL_UPDATE, batch = false ) } def setBatchCommand(_statement: String): JDBCContext = { ctx.copy ( statements = Seq(_statement), sqlType = JDBCContext.SQL_UPDATE, batch = true ) } type JDBCDate = LocalDate type JDBCDateTime = LocalDateTime def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd) def jdbcSetNow = LocalDateTime.now() type JDBCBlob = InputStream def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer) = FileToInputStream(fileName,timeOut) def jdbcBlobToFile(blob: JDBCBlob, fileName: String)( implicit mat: Materializer) = InputStreamToFile(blob,fileName) } object JDBCEngine { import JDBCContext._ private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) => throw new IllegalStateException(message) } def jdbcAkkaStream[A](ctx: JDBCQueryContext[A]) (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = { val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream { val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) sql.iterator .withDBSessionForceAdjuster(session => { session.connection.setAutoCommit(ctx.autoCommit) session.fetchSize(ctx.fetchSize) }) } Source.fromPublisher[A](publisher) } def jdbcQueryResult[C[_] <: TraversableOnce[_], A]( ctx: JDBCQueryContext[A])( implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = { val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor("")) ctx.queryTimeout.foreach(rawSql.queryTimeout(_)) rawSql.fetchSize(ctx.fetchSize) implicit val session = NamedAutoSession(ctx.dbName) val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor) sql.collection.apply[C]() } def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = { if (ctx.sqlType != SQL_EXEDDL) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")) } else { NamedDB(ctx.dbName) localTx { implicit session => Try { ctx.statements.foreach { stm => val ddl = new SQLExecution(statement = stm, parameters = Nil)( before = WrappedResultSet => {})( after = WrappedResultSet => {}) ddl.apply() } "SQL_EXEDDL executed succesfully." } } } } def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (ctx.statements == Nil) throw new IllegalStateException("JDBCContex setting error: statements empty!") if (ctx.sqlType != SQL_UPDATE) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) } else { if (ctx.batch) { if (noReturnKey(ctx)) { val usql = SQL(ctx.statements.head) .tags(ctx.queryTags: _*) .batch(ctx.parameters: _*) Try { NamedDB(ctx.dbName) localTx { implicit session => ctx.queryTimeout.foreach(session.queryTimeout(_)) usql.apply[Seq]() Seq.empty[Long].to[C] } } } else { val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None) Try { NamedDB(ctx.dbName) localTx { implicit session => ctx.queryTimeout.foreach(session.queryTimeout(_)) usql.apply[C]() } } } } else { Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !")) } } } private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { val Some(key) :: xs = ctx.returnGeneratedKey val params: Seq[Any] = ctx.parameters match { case Nil => Nil case p@_ => p.head } val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key) Try { NamedDB(ctx.dbName) localTx { implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val result = usql.apply() Seq(result).to[C] } } } private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { val params: Seq[Any] = ctx.parameters match { case Nil => Nil case p@_ => p.head } val before = ctx.preAction match { case None => pstm: PreparedStatement => {} case Some(f) => f } val after = ctx.postAction match { case None => pstm: PreparedStatement => {} case Some(f) => f } val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after) Try { NamedDB(ctx.dbName) localTx {implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val result = usql.apply() Seq(result.toLong).to[C] } } } private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (noReturnKey(ctx)) singleTxUpdateNoReturnKey(ctx) else singleTxUpdateWithReturnKey(ctx) } private def noReturnKey(ctx: JDBCContext): Boolean = { if (ctx.returnGeneratedKey != Nil) { val k :: xs = ctx.returnGeneratedKey k match { case None => true case Some(k) => false } } else true } def noActon: PreparedStatement=>Unit = pstm => {} def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { Try { NamedDB(ctx.dbName) localTx { implicit session => session.fetchSize(ctx.fetchSize) ctx.queryTimeout.foreach(session.queryTimeout(_)) val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match { case Nil => Seq.fill(ctx.statements.size)(None) case k@_ => k } val sqlcmd = ctx.statements zip ctx.parameters zip keys val results = sqlcmd.map { case ((stm, param), key) => key match { case None => new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong case Some(k) => new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong } } results.to[C] } } } def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)( implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = { if (ctx.statements == Nil) throw new IllegalStateException("JDBCContex setting error: statements empty!") if (ctx.sqlType != SQL_UPDATE) { Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")) } else { if (!ctx.batch) { if (ctx.statements.size == 1) singleTxUpdate(ctx) else multiTxUpdates(ctx) } else Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !")) } } case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true, statement: String, prepareParams: R => Seq[Any]) { jas => def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db) def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel) def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered) private def perform(r: R) = { import scala.concurrent._ val params = prepareParams(r) NamedDB(dbName) autoCommit { session => session.execute(statement,params: _*) } Future.successful(r) } def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] = if (processInOrder) Flow[R].mapAsync(parallelism)(perform) else Flow[R].mapAsyncUnordered(parallelism)(perform) } object JDBCActionStream { def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params) } }

CassandraEngine.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package cassandraengine import com.datastax.driver.core._ import scala.concurrent._ import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} import scala.collection.JavaConverters._ import scala.collection.generic.CanBuildFrom import scala.concurrent.duration.Duration import akka.NotUsed import akka.stream.alpakka.cassandra.scaladsl._ import akka.stream.scaladsl._ import filestreaming.FileStreaming._ object CQLContext { // Consistency Levels type CONSISTENCY_LEVEL = Int val ANY: CONSISTENCY_LEVEL = 0x0000 val ONE: CONSISTENCY_LEVEL = 0x0001 val TWO: CONSISTENCY_LEVEL = 0x0002 val THREE: CONSISTENCY_LEVEL = 0x0003 val QUORUM : CONSISTENCY_LEVEL = 0x0004 val ALL: CONSISTENCY_LEVEL = 0x0005 val LOCAL_QUORUM: CONSISTENCY_LEVEL = 0x0006 val EACH_QUORUM: CONSISTENCY_LEVEL = 0x0007 val LOCAL_ONE: CONSISTENCY_LEVEL = 0x000A val LOCAL_SERIAL: CONSISTENCY_LEVEL = 0x000B val SERIAL: CONSISTENCY_LEVEL = 0x000C def apply(): CQLContext = CQLContext(statements = Nil) def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => { consistency match { case ALL => ConsistencyLevel.ALL case ONE => ConsistencyLevel.ONE case TWO => ConsistencyLevel.TWO case THREE => ConsistencyLevel.THREE case ANY => ConsistencyLevel.ANY case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE case QUORUM => ConsistencyLevel.QUORUM case SERIAL => ConsistencyLevel.SERIAL case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL } } } case class CQLQueryContext[M]( statement: String, extractor: Row => M, parameter: Seq[Object] = Nil, consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None, fetchSize: Int = 100 ) { ctx => def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] = ctx.copy(consistency = Some(_consistency)) def setFetchSize(pageSize: Int): CQLQueryContext[M] = ctx.copy(fetchSize = pageSize) } object CQLQueryContext { def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] = new CQLQueryContext[M](statement = stmt, extractor = converter) } case class CQLContext( statements: Seq[String], parameters: Seq[Seq[Object]] = Nil, consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None ) { ctx => def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext = ctx.copy(consistency = Some(_consistency)) def setCommand(_statement: String, _parameters: Object*): CQLContext = ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters)) def appendCommand(_statement: String, _parameters: Object*): CQLContext = ctx.copy(statements = ctx.statements :+ _statement, parameters = ctx.parameters ++ Seq(_parameters)) } object CQLEngine { import CQLContext._ import CQLHelpers._ def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)( implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= { val prepStmt = session.prepare(ctx.statement) var boundStmt = prepStmt.bind() if (ctx.parameter != Nil) { val params = processParameters(ctx.parameter) boundStmt = prepStmt.bind(params:_*) } ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} val resultSet = session.execute(boundStmt.setFetchSize(pageSize)) (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C]) } def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)( extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = if (resultSet.isFullyFetched) { (resultSet, None) } else { try { val result = Await.result(resultSet.fetchMoreResults(), timeOut) (result, Some((result.asScala.view.map(extractor)).to[C])) } catch { case e: Throwable => (resultSet, None) } } def cqlExecute(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { if (ctx.statements.size == 1) cqlSingleUpdate(ctx) else cqlMultiUpdate(ctx) } def cqlSingleUpdate(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { val prepStmt = session.prepare(ctx.statements.head) var boundStmt = prepStmt.bind() if (ctx.parameters != Nil) { val params = processParameters(ctx.parameters.head) boundStmt = prepStmt.bind(params:_*) } ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} session.executeAsync(boundStmt).map(_.wasApplied()) } def cqlMultiUpdate(ctx: CQLContext)( implicit session: Session, ec: ExecutionContext): Future[Boolean] = { val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters var batch = new BatchStatement() commands.foreach { case (stm, params) => val prepStmt = session.prepare(stm) if (params == Nil) batch.add(prepStmt.bind()) else { val p = processParameters(params) batch.add(prepStmt.bind(p: _*)) } } ctx.consistency.foreach {consistency => batch.setConsistencyLevel(consistencyLevel(consistency))} session.executeAsync(batch).map(_.wasApplied()) } def cassandraStream[A](ctx: CQLQueryContext[A]) (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = { val prepStmt = session.prepare(ctx.statement) var boundStmt = prepStmt.bind() val params = processParameters(ctx.parameter) boundStmt = prepStmt.bind(params:_*) ctx.consistency.foreach {consistency => boundStmt.setConsistencyLevel(consistencyLevel(consistency))} CassandraSource(boundStmt.setFetchSize(ctx.fetchSize)).map(ctx.extractor) } case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true, statement: String, prepareParams: R => Seq[Object], consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas => def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel) def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered) def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] = cas.copy(consistency = Some(_consistency)) private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = { val prepStmt = session.prepare(statement) var boundStmt = prepStmt.bind() val params = processParameters(prepareParams(r)) boundStmt = prepStmt.bind(params:_*) consistency.foreach { cons => boundStmt.setConsistencyLevel(CQLContext.consistencyLevel(cons)) } session.executeAsync(boundStmt).map(_ => r) } def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = if (processInOrder) Flow[R].mapAsync(parallelism)(perform) else Flow[R].mapAsyncUnordered(parallelism)(perform) } object CassandraActionStream { def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = new CassandraActionStream[R]( statement=_statement, prepareParams = params) } } object CQLHelpers { import java.nio.ByteBuffer import com.datastax.driver.core.LocalDate import java.time.Instant import akka.stream._ import scala.concurrent.duration._ implicit def listenableFutureToFuture[T]( listenableFuture: ListenableFuture[T]): Future[T] = { val promise = Promise[T]() Futures.addCallback(listenableFuture, new FutureCallback[T] { def onFailure(error: Throwable): Unit = { promise.failure(error) () } def onSuccess(result: T): Unit = { promise.success(result) () } }) promise.future } type CQLBlob = ByteBuffer case class CQLDate(year: Int, month: Int, day: Int) case object CQLTodayDate case class CQLDateTime(year: Int, Month: Int, day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0) case object CQLDateTimeNow def processParameters(params: Seq[Object]): Seq[Object] = { import java.time.{Clock,ZoneId} params.map { obj => obj match { case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd) case CQLTodayDate => val today = java.time.LocalDate.now() LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth) case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS))) case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) => Instant.parse(f"$yy%4d-$mm%2d-$dd%2dT$hr%2d:$ms%2d:$sc%2d$mi%3d") case p@_ => p } } } def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer) = FileToByteBuffer(fileName,timeOut) def cqlBlobToFile(blob: CQLBlob, fileName: String)( implicit mat: Materializer) = ByteBufferToFile(blob,fileName) }

MongoEngine.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
import java.text.SimpleDateFormat import akka.NotUsed import akka.stream.alpakka.mongodb.scaladsl._ import akka.stream.scaladsl.{Flow, Sink, Source} import org.mongodb.scala.MongoClient import org.mongodb.scala.bson.collection.immutable.Document import org.bson.conversions.Bson import org.mongodb.scala._ import org.mongodb.scala.model._ import java.util.Calendar import scala.collection.JavaConverters._ import filestreaming.FileStreaming._ import akka.stream.Materializer import org.mongodb.scala.bson.{BsonArray, BsonBinary} import scala.concurrent._ import scala.concurrent.duration._ object MGOContext { trait MGOCommands object MGOCommands { case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands /* org.mongodb.scala.FindObservable import com.mongodb.async.client.FindIterable val resultDocType = FindIterable[Document] val resultOption = FindObservable(resultDocType) .maxScan(...) .limit(...) .sort(...) .project(...) */ case class Find[M](filter: Option[Bson] = None, andThen: Option[FindObservable[Document] => FindObservable[Document]]= None, converter: Option[Document => M] = None, firstOnly: Boolean = false) extends MGOCommands case class DocumentStream(filter: Option[Bson] = None, andThen: Option[FindObservable[Document] => FindObservable[Document]] = None, ) extends MGOCommands case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands } object MGOAdmins { case class DropCollection(collName: String) extends MGOCommands case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands case class ListCollection(dbName: String) extends MGOCommands case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands } case class MGOContext( dbName: String, collName: String, action: MGOCommands = null ) { ctx => def setDbName(name: String): MGOContext = ctx.copy(dbName = name) def setCollName(name: String): MGOContext = ctx.copy(collName = name) def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd) } object MGOContext { def apply(db: String, coll: String) = new MGOContext(db, coll) def apply(db: String, coll: String, command: MGOCommands) = new MGOContext(db, coll, command) } type MGODate = java.util.Date def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = { val ca = Calendar.getInstance() ca.set(yyyy,mm,dd) ca.getTime() } def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = { val ca = Calendar.getInstance() ca.set(yyyy,mm,dd,hr,min,sec) ca.getTime() } def mgoDateTimeNow: MGODate = { val ca = Calendar.getInstance() ca.getTime } def mgoDateToString(dt: MGODate, formatString: String): String = { val fmt= new SimpleDateFormat(formatString) fmt.format(dt) } type MGOBlob = BsonBinary type MGOArray = BsonArray def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer) = FileToByteArray(fileName,timeOut) def mgoBlobToFile(blob: MGOBlob, fileName: String)( implicit mat: Materializer) = ByteArrayToFile(blob.getData,fileName) def mgoGetStringOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getString(fieldName)) else None } def mgoGetIntOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getInteger(fieldName)) else None } def mgoGetLonggOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getLong(fieldName)) else None } def mgoGetDoubleOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getDouble(fieldName)) else None } def mgoGetBoolOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getBoolean(fieldName)) else None } def mgoGetDateOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) Some(doc.getDate(fieldName)) else None } def mgoGetBlobOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) doc.get(fieldName).asInstanceOf[Option[MGOBlob]] else None } def mgoGetArrayOrNone(doc: Document, fieldName: String) = { if (doc.keySet.contains(fieldName)) doc.get(fieldName).asInstanceOf[Option[MGOArray]] else None } def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = { (arr.getValues.asScala.toList) .asInstanceOf[scala.collection.immutable.List[org.bson.BsonDocument]] } type MGOFilterResult = FindObservable[Document] => FindObservable[Document] } object MGOEngine { import MGOContext._ import MGOCommands._ import MGOAdmins._ def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = { val db = client.getDatabase(ctx.dbName) val coll = db.getCollection(ctx.collName) ctx.action match { /* count */ case Count(Some(filter), Some(opt)) => coll.count(filter, opt.asInstanceOf[CountOptions]) .toFuture().asInstanceOf[Future[T]] case Count(Some(filter), None) => coll.count(filter).toFuture() .asInstanceOf[Future[T]] case Count(None, None) => coll.count().toFuture() .asInstanceOf[Future[T]] /* distinct */ case Distict(field, Some(filter)) => coll.distinct(field, filter).toFuture() .asInstanceOf[Future[T]] case Distict(field, None) => coll.distinct((field)).toFuture() .asInstanceOf[Future[T]] /* find */ case Find(None, None, optConv, false) => if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]] else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(None, None, optConv, true) => if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]] else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]] case Find(Some(filter), None, optConv, false) => if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]] else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(Some(filter), None, optConv, true) => if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]] else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]] case Find(None, Some(next), optConv, _) => if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]] else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]] case Find(Some(filter), Some(next), optConv, _) => if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]] else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]] /* aggregate */ case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]] /* mapReduce */ case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]] /* insert */ case Insert(docs, Some(opt)) => if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture() .asInstanceOf[Future[T]] else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture() .asInstanceOf[Future[T]] case Insert(docs, None) => if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]] else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]] /* delete */ case Delete(filter, None, onlyOne) => if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]] else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]] case Delete(filter, Some(opt), onlyOne) => if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]] /* replace */ case Replace(filter, replacement, None) => coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]] case Replace(filter, replacement, Some(opt)) => coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] /* update */ case Update(filter, update, None, onlyOne) => if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]] else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]] case Update(filter, update, Some(opt), onlyOne) => if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]] /* bulkWrite */ case BulkWrite(commands, None) => coll.bulkWrite(commands).toFuture().asInstanceOf[Future[T]] case BulkWrite(commands, Some(opt)) => coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]] /* drop collection */ case DropCollection(collName) => val coll = db.getCollection(collName) coll.drop().toFuture().asInstanceOf[Future[T]] /* create collection */ case CreateCollection(collName, None) => db.createCollection(collName).toFuture().asInstanceOf[Future[T]] case CreateCollection(collName, Some(opt)) => db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]] /* list collection */ case ListCollection(dbName) => client.getDatabase(dbName).listCollections().toFuture().asInstanceOf[Future[T]] /* create view */ case CreateView(viewName, viewOn, pline, None) => db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]] case CreateView(viewName, viewOn, pline, Some(opt)) => db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]] /* create index */ case CreateIndex(key, None) => coll.createIndex(key).toFuture().asInstanceOf[Future[T]] case CreateIndex(key, Some(opt)) => coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]] /* drop index */ case DropIndexByName(indexName, None) => coll.dropIndex(indexName).toFuture().asInstanceOf[Future[T]] case DropIndexByName(indexName, Some(opt)) => coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] case DropIndexByKey(key, None) => coll.dropIndex(key).toFuture().asInstanceOf[Future[T]] case DropIndexByKey(key, Some(opt)) => coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] case DropAllIndexes(None) => coll.dropIndexes().toFuture().asInstanceOf[Future[T]] case DropAllIndexes(Some(opt)) => coll.dropIndexes(opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]] } } def mongoStream(ctx: MGOContext)( implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = { val db = client.getDatabase(ctx.dbName) val coll = db.getCollection(ctx.collName) ctx.action match { case DocumentStream(None, None) => MongoSource(coll.find()) case DocumentStream(Some(filter), None) => MongoSource(coll.find(filter)) case DocumentStream(None, Some(next)) => MongoSource(next(coll.find())) case DocumentStream(Some(filter), Some(next)) => MongoSource(next(coll.find(filter))) } } } object MongoActionStream { import MGOContext._ case class StreamingInsert[A](dbName: String, collName: String, converter: A => Document, parallelism: Int = 1 ) extends MGOCommands case class StreamingDelete[A](dbName: String, collName: String, toFilter: A => Bson, parallelism: Int = 1, justOne: Boolean = false ) extends MGOCommands case class StreamingUpdate[A](dbName: String, collName: String, toFilter: A => Bson, toUpdate: A => Bson, parallelism: Int = 1, justOne: Boolean = false ) extends MGOCommands case class InsertAction[A](ctx: StreamingInsert[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] = Flow[A].map(ctx.converter) .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc)) } case class UpdateAction[A](ctx: StreamingUpdate[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = if (ctx.justOne) { Flow[A] .mapAsync(ctx.parallelism)(a => collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) } else Flow[A] .mapAsync(ctx.parallelism)(a => collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a)) } case class DeleteAction[A](ctx: StreamingDelete[A])( implicit mongoClient: MongoClient) { val database = mongoClient.getDatabase(ctx.dbName) val collection = database.getCollection(ctx.collName) def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] = if (ctx.justOne) { Flow[A] .mapAsync(ctx.parallelism)(a => collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a)) } else Flow[A] .mapAsync(ctx.parallelism)(a => collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a)) } }
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.mongodb.scala._ import scala.concurrent._ import scala.concurrent.duration._ object MGOHelpers { implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] { override val converter: (Document) => String = (doc) => doc.toJson } implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] { override val converter: (C) => String = (doc) => doc.toString } trait ImplicitObservable[C] { val observable: Observable[C] val converter: (C) => String def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds) def headResult() = Await.result(observable.head(), 10 seconds) def printResults(initial: String = ""): Unit = { if (initial.length > 0) print(initial) results().foreach(res => println(converter(res))) } def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}") } def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = { Await.result(fut,timeOut) } def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = { Await.result(fut,timeOut) } }

FileStreaming.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package filestreaming import java.io.{InputStream, ByteArrayInputStream} import java.nio.ByteBuffer import java.nio.file.Paths import akka.stream.{Materializer} import akka.stream.scaladsl.{FileIO, StreamConverters} import scala.concurrent.{Await} import akka.util._ import scala.concurrent.duration._ object FileStreaming { def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer):ByteBuffer = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } (Await.result(fut, timeOut)).toByteBuffer } def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer): Array[Byte] = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } (Await.result(fut, timeOut)).toArray } def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer): InputStream = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } val buf = (Await.result(fut, timeOut)).toArray new ByteArrayInputStream(buf) } def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( implicit mat: Materializer) = { val ba = new Array[Byte](byteBuf.remaining()) byteBuf.get(ba,0,ba.length) val baInput = new ByteArrayInputStream(ba) val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) source.runWith(FileIO.toPath(Paths.get(fileName))) } def ByteArrayToFile(bytes: Array[Byte], fileName: String)( implicit mat: Materializer) = { val bb = ByteBuffer.wrap(bytes) val baInput = new ByteArrayInputStream(bytes) val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) source.runWith(FileIO.toPath(Paths.get(fileName))) } def InputStreamToFile(is: InputStream, fileName: String)( implicit mat: Materializer) = { val source = StreamConverters.fromInputStream(() => is) source.runWith(FileIO.toPath(Paths.get(fileName))) } }

FileStreaming.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package filestreaming import java.io.{InputStream, ByteArrayInputStream} import java.nio.ByteBuffer import java.nio.file.Paths import akka.stream.{Materializer} import akka.stream.scaladsl.{FileIO, StreamConverters} import scala.concurrent.{Await} import akka.util._ import scala.concurrent.duration._ object FileStreaming { def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer):ByteBuffer = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } (Await.result(fut, timeOut)).toByteBuffer } def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer): Array[Byte] = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } (Await.result(fut, timeOut)).toArray } def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)( implicit mat: Materializer): InputStream = { val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) => hd ++ bs } val buf = (Await.result(fut, timeOut)).toArray new ByteArrayInputStream(buf) } def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)( implicit mat: Materializer) = { val ba = new Array[Byte](byteBuf.remaining()) byteBuf.get(ba,0,ba.length) val baInput = new ByteArrayInputStream(ba) val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) source.runWith(FileIO.toPath(Paths.get(fileName))) } def ByteArrayToFile(bytes: Array[Byte], fileName: String)( implicit mat: Materializer) = { val bb = ByteBuffer.wrap(bytes) val baInput = new ByteArrayInputStream(bytes) val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes)) source.runWith(FileIO.toPath(Paths.get(fileName))) } def InputStreamToFile(is: InputStream, fileName: String)( implicit mat: Materializer) = { val source = StreamConverters.fromInputStream(() => is) source.runWith(FileIO.toPath(Paths.get(fileName))) } }

HikariCPool.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package jdbcengine import scala.collection.mutable import scala.concurrent.duration.Duration import scala.language.implicitConversions import com.typesafe.config._ import java.util.concurrent.TimeUnit import java.util.Properties import scalikejdbc.config._ import com.typesafe.config.Config import com.zaxxer.hikari._ import scalikejdbc.ConnectionPoolFactoryRepository /** Extension methods to make Typesafe Config easier to use */ class ConfigExtensionMethods(val c: Config) extends AnyVal { import scala.collection.JavaConverters._ def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default def getDurationOr(path: String, default: => Duration = Duration.Zero) = if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default def getPropertiesOr(path: String, default: => Properties = null): Properties = if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default def toProperties: Properties = { def toProps(m: mutable.Map[String, ConfigValue]): Properties = { val props = new Properties(null) m.foreach { case (k, cv) => val v = if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala) else if(cv.unwrapped eq null) null else cv.unwrapped.toString if(v ne null) props.put(k, v) } props } toProps(c.root.asScala) } def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None def getStringOpt(path: String) = Option(getStringOr(path)) def getPropertiesOpt(path: String) = Option(getPropertiesOr(path)) } object ConfigExtensionMethods { @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c) } trait HikariConfigReader extends TypesafeConfigReader { self: TypesafeConfig => // with TypesafeConfigReader => //NoEnvPrefix => import ConfigExtensionMethods.configExtensionMethods def getFactoryName(dbName: Symbol): String = { val c: Config = config.getConfig(envPrefix + "db." + dbName.name) c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP) } def hikariCPConfig(dbName: Symbol): HikariConfig = { val hconf = new HikariConfig() val c: Config = config.getConfig(envPrefix + "db." + dbName.name) // Connection settings if (c.hasPath("dataSourceClass")) { hconf.setDataSourceClassName(c.getString("dataSourceClass")) } else { Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _) } hconf.setJdbcUrl(c.getStringOr("url", null)) c.getStringOpt("user").foreach(hconf.setUsername) c.getStringOpt("password").foreach(hconf.setPassword) c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties) // Pool configuration hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000)) hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000)) hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000)) hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000)) hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0)) hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false)) c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery) c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql) val numThreads = c.getIntOr("numThreads", 20) hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5)) hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads)) hconf.setPoolName(c.getStringOr("poolName", dbName.name)) hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false)) // Equivalent of ConnectionPreparer hconf.setReadOnly(c.getBooleanOr("readOnly", false)) c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation) hconf.setCatalog(c.getStringOr("catalog", null)) hconf } } import scalikejdbc._ trait ConfigDBs { self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader => def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { getFactoryName(dbName) match { case "hikaricp" => { val hconf = hikariCPConfig(dbName) val hikariCPSource = new HikariDataSource(hconf) if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) { Class.forName(hconf.getDriverClassName) } ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource)) } case _ => { val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName) val cpSettings = readConnectionPoolSettings(dbName) if (driver != null && driver.trim.nonEmpty) { Class.forName(driver) } ConnectionPool.add(dbName, url, user, password, cpSettings) } } } def setupAll(): Unit = { loadGlobalSettings() dbNames.foreach { dbName => setup(Symbol(dbName)) } } def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = { ConnectionPool.close(dbName) } def closeAll(): Unit = { ConnectionPool.closeAll } } object ConfigDBs extends ConfigDBs with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs with TypesafeConfigReader with StandardTypesafeConfig with HikariConfigReader with EnvPrefix { override val env = Option(envValue)

MongoStreamDemo.scala

复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import org.mongodb.scala._ import org.mongodb.scala.connection._ import scala.collection.JavaConverters._ object MongoStream extends App { import MGOContext._ import MGOEngine._ import MGOCommands._ import MGOHelpers._ val clusterSettings = ClusterSettings.builder() .hosts(List(new ServerAddress("localhost:27017")).asJava).build() val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build() implicit val client = MongoClient(clientSettings) implicit val system = ActorSystem() implicit val mat = ActorMaterializer() implicit val ec = system.dispatcher case class PO ( ponum: String, podate: MGODate, vendor: String, remarks: Option[String], podtl: Option[MGOArray] ) def toPO(doc: Document): PO = { PO( ponum = doc.getString("ponum"), podate = doc.getDate("podate"), vendor = doc.getString("vendor"), remarks = mgoGetStringOrNone(doc,"remarks"), podtl = mgoGetArrayOrNone(doc,"podtl") ) } case class PODTL( item: String, price: Double, qty: Int, packing: Option[String], payTerm: Option[String] ) def toPODTL(podtl: Document): PODTL = { PODTL( item = podtl.getString("item"), price = podtl.getDouble("price"), qty = podtl.getInteger("qty"), packing = mgoGetStringOrNone(podtl,"packing"), payTerm = mgoGetStringOrNone(podtl,"payterm") ) } def showPO(po: PO) = { println(s"po number: ${po.ponum}") println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}") println(s"vendor: ${po.vendor}") if (po.remarks != None) println(s"remarks: ${po.remarks.get}") po.podtl match { case Some(barr) => mgoArrayToDocumentList(barr) .map { dc => toPODTL(dc)} .foreach { doc: PODTL => print(s"==>Item: ${doc.item} ") print(s"price: ${doc.price} ") print(s"qty: ${doc.qty} ") doc.packing.foreach(pk => print(s"packing: ${pk} ")) doc.payTerm.foreach(pt => print(s"payTerm: ${pt} ")) println("") } case _ => } } import org.mongodb.scala.model.Projections._ import MongoActionStream._ import MGOEngine._ import akka.stream.scaladsl.{Sink, Source} val proj: MGOFilterResult = find => find.projection(exclude("handler","_id")) val ctx = MGOContext("testdb","po").setCommand( DocumentStream(filter = None, andThen = Some(proj))) val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)) println(getResult(mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO)))) import jdbcengine._ import JDBCEngine._ import scalikejdbc._ case class DataRow ( rowid: Long, measureid: Long, state: String, county: String, year: Int, value: Int ) val toRow: WrappedResultSet => DataRow = rs => DataRow( rowid = rs.long("ROWID"), measureid = rs.long("MEASUREID"), state = rs.string("STATENAME"), county = rs.string("COUNTYNAME"), year = rs.int("REPORTYEAR"), value = rs.int("VALUE") ) //construct the context val h2ctx = JDBCQueryContext[DataRow]( dbName = 'h2, statement = "select * from AQMRPT", extractor = toRow ) //source from h2 database val jdbcSource = jdbcAkkaStream(h2ctx) //document converter def rowToDoc: DataRow => Document = row => Document ( "rowid" -> row.rowid, "measureid" -> row.measureid, "state" -> row.state, "county" -> row.county, "year" -> row.year, "value" -> row.value ) def docToRow: Document => DataRow = doc => DataRow ( rowid = doc.getLong("rowid"), measureid = doc.getLong("measureid"), state = doc.getString("state"), county = doc.getString("county"), year = doc.getInteger("year"), value = doc.getInteger("value") ) //setup context val mgoctx = StreamingInsert("testdb","members",rowToDoc) val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx) val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow) val sink = Sink.foreach[DataRow]{ r => println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}") } //config jdbc drivers ConfigDBsWithEnv("dev").setup('h2) ConfigDBsWithEnv("dev").loadGlobalSettings() val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run() val mgoCtxPrint = MGOContext("testdb","members").setCommand( DocumentStream(filter = None)) mongoStream(mgoCtxPrint).map(docToRow).to(sink).run() import com.datastax.driver.core._ import cassandraengine._ import CQLEngine._ import org.mongodb.scala.model.Filters._ //data row converter val cqlToDataRow = (rs: Row) => DataRow( rowid = rs.getLong("ROWID"), measureid = rs.getLong("MEASUREID"), state = rs.getString("STATENAME"), county = rs.getString("COUNTYNAME"), year = rs.getInt("REPORTYEAR"), value = rs.getInt("VALUE") ) import org.bson.conversions._ import org.mongodb.scala.model.Updates._ //#init-session implicit val session = Cluster.builder .addContactPoint("127.0.0.1") .withPort(9042) .build .connect() //setup context val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow) //construct source val cqlSource = cassandraStream(cqlCtx) def toFilter: DataRow => Bson = row => { and(equal("rowid",row.rowid), lt("value",10)) } def toUpdate: DataRow => Bson = row => { set("value" , row.value * 10) } val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate) val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate) cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run() import org.bson.conversions._ import org.mongodb.scala.model.Filters._ def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10)) val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter) val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel) val mgoCtxSrc = MGOContext("testdb","members").setCommand( DocumentStream(filter = None)) mongoStream(mgoCtxSrc).map(docToRow).via(mgoDelFlow.performOnRow).to(Sink.ignore).run() import org.mongodb.scala.model.Sorts._ val sortDsc: MGOFilterResult = find => find.sort(descending("rowid")) val mgoCtxShow = MGOContext("testdb","members").setCommand( DocumentStream(filter = None, andThen = Some(sortDsc))) mongoStream(mgoCtxShow).map(docToRow).to(sink).run() scala.io.StdIn.readLine() system.terminate() }

 

转载于:https://www.cnblogs.com/tiger-xc/p/8581280.html

最后

以上就是简单冷风最近收集整理的关于SDP(12): MongoDB-Engine - Streaming的全部内容,更多相关SDP(12):内容请搜索靠谱客的其他文章。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(74)

评论列表共有 0 条评论

立即
投稿
返回
顶部