object MongoSource {

  def apply(query: Observable[Document]): Source[Document, NotUsed] =


实际上就是把Mongo-scala的Observable[Document]转成Source[Document, NotUsed]。我们还是通过传入context来构建这个Source:

  case class MGOContext(
                         dbName: String,
                         collName: String,
                         action: MGOCommands = null
                       ) {...}
   case class DocumentStream(filter: Option[Bson] = None,
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                             ) extends MGOCommands


    def mongoStream(ctx: MGOContext)(
      implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
      val db = client.getDatabase(ctx.dbName)
      val coll = db.getCollection(ctx.collName)
      ctx.action match {
        case DocumentStream(None, None) =>
        case DocumentStream(Some(filter), None) =>
        case DocumentStream(None, Some(next)) =>
        case DocumentStream(Some(filter), Some(next)) =>


  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  implicit val client = MongoClient(clientSettings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  case class PO (
                  ponum: String,
                  podate: MGODate,
                  vendor: String,
                  remarks: Option[String],
                  podtl: Option[MGOArray]
  def toPO(doc: Document): PO = {
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = mgoGetStringOrNone(doc,"remarks"),
      podtl = mgoGetArrayOrNone(doc,"podtl")
  case class PODTL(
                    item: String,
                    price: Double,
                    qty: Int,
                    packing: Option[String],
                    payTerm: Option[String]
  def toPODTL(podtl: Document): PODTL = {
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = mgoGetStringOrNone(podtl,"packing"),
      payTerm = mgoGetStringOrNone(podtl,"payterm")

  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    if (po.remarks != None)
      println(s"remarks: ${po.remarks.get}")
    po.podtl match {
      case Some(barr) =>
          .map { dc => toPODTL(dc)}
          .foreach { doc: PODTL =>
            print(s"==>Item: ${doc.item} ")
            print(s"price: ${doc.price} ")
            print(s"qty: ${doc.qty} ")
            doc.packing.foreach(pk => print(s"packing: ${pk} "))
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
      case _ =>
  import org.mongodb.scala.model.Projections._
  import MongoActionStream._
  import MGOEngine._
  import akka.stream.scaladsl.{Sink, Source}

  val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
  val ctx = MGOContext("testdb","po").setCommand(
    DocumentStream(filter = None, andThen = Some(proj)))

  val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))



 object MongoActionStream {

    import MGOContext._

    case class StreamingInsert[A](dbName: String,
                                  collName: String,
                                  converter: A => Document,
                                  parallelism: Int = 1
                                 ) extends MGOCommands

    case class StreamingDelete[A](dbName: String,
                                  collName: String,
                                  toFilter: A => Bson,
                                  parallelism: Int = 1,
                                  justOne: Boolean = false
                                 ) extends MGOCommands

    case class StreamingUpdate[A](dbName: String,
                                  collName: String,
                                  toFilter: A => Bson,
                                  toUpdate: A => Bson,
                                  parallelism: Int = 1,
                                  justOne: Boolean = false
                                 ) extends MGOCommands

    case class InsertAction[A](ctx: StreamingInsert[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))

    case class UpdateAction[A](ctx: StreamingUpdate[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
        if (ctx.justOne) {
            .mapAsync(ctx.parallelism)(a =>
              collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
        } else
            .mapAsync(ctx.parallelism)(a =>
              collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))

    case class DeleteAction[A](ctx: StreamingDelete[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
        if (ctx.justOne) {
            .mapAsync(ctx.parallelism)(a =>
              collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
        } else
            .mapAsync(ctx.parallelism)(a =>
              collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))


下面是insert, update及delete操作的示范。在这个示范里我们同时调用了JDBCEngine,CassandraEngine和MongoDBEngine:

  import jdbcengine._
  import JDBCEngine._
  import scalikejdbc._

  case class DataRow (
                       rowid: Long,
                       measureid: Long,
                       state: String,
                       county: String,
                       year: Int,
                       value: Int
  val toRow: WrappedResultSet => DataRow = rs => DataRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    year = rs.int("REPORTYEAR"),
    value = rs.int("VALUE")

  //construct the context
  val h2ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AQMRPT",
    extractor = toRow

  //source from h2 database
  val jdbcSource = jdbcAkkaStream(h2ctx)

  //document converter
  def rowToDoc: DataRow => Document = row => Document (
    "rowid" -> row.rowid,
    "measureid" ->  row.measureid,
    "state" ->  row.state,
    "county" ->  row.county,
    "year" ->  row.year,
    "value" ->  row.value
  def docToRow: Document => DataRow = doc => DataRow (
    rowid = doc.getLong("rowid"),
    measureid = doc.getLong("measureid"),
    state = doc.getString("state"),
    county = doc.getString("county"),
    year = doc.getInteger("year"),
    value = doc.getInteger("value")
  //setup context
  val mgoctx = StreamingInsert("testdb","members",rowToDoc)
  val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
  val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
  val sink = Sink.foreach[DataRow]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")

  //config jdbc drivers

  val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()

  val mgoCtxShow = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))


  import com.datastax.driver.core._
  import cassandraengine._
  import CQLEngine._
  import org.mongodb.scala.model.Filters._

  //data row converter
  val cqlToDataRow = (rs: Row) => DataRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE")

 import org.bson.conversions._
  import org.mongodb.scala.model.Updates._

  implicit val session = Cluster.builder

  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
  //construct source
  val cqlSource = cassandraStream(cqlCtx)

  def toFilter: DataRow => Bson = row => {
    and(equal("rowid",row.rowid), lt("value",10))
  def toUpdate: DataRow => Bson = row => {
    set("value" , row.value * 10)
  val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
  val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)
  val sts = cqlSource.via(mgoUpdateFlow.performOnRow).to(sink).run()

  import org.bson.conversions._
  import org.mongodb.scala.model.Filters._
  def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10))

  val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
  val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
  val mgoCtxSrc = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))

  import org.mongodb.scala.model.Sorts._
  val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
  val mgoCtxShow = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None, andThen = Some(sortDsc)))




name := "learn-mongo"

version := "0.1"

scalaVersion := "2.12.4"

libraryDependencies := Seq(
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
  "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
  "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
  "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17",
  "com.typesafe.akka" %% "akka-actor" % "2.5.4",
  "com.typesafe.akka" %% "akka-stream" % "2.5.4",
  "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
  "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
  "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
  "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
  "com.h2database"  %  "h2"                % "1.4.196",
  "mysql" % "mysql-connector-java" % "6.0.6",
  "org.postgresql" % "postgresql" % "42.2.0",
  "commons-dbcp" % "commons-dbcp" % "1.4",
  "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
  "com.zaxxer" % "HikariCP" % "2.7.4",
  "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
  "com.typesafe.slick" %% "slick" % "3.2.1",
  "ch.qos.logback"  %  "logback-classic"   % "1.2.3"


# JDBC settings
test {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "commons-dbcp2"

  db.mysql.driver = "com.mysql.cj.jdbc.Driver"
  db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
  db.mysql.user = "root"
  db.mysql.password = "123"
  db.mysql.poolInitialSize = 5
  db.mysql.poolMaxSize = 7
  db.mysql.poolConnectionTimeoutMillis = 1000
  db.mysql.poolValidationQuery = "select 1 as one"
  db.mysql.poolFactoryName = "bonecp"

  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
dev {
  db {
    h2 {
      driver = "org.h2.Driver"
      url = "jdbc:h2:tcp://localhost/~/slickdemo"
      user = ""
      password = ""
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
    mysql {
      driver = "com.mysql.cj.jdbc.Driver"
      url = "jdbc:mysql://localhost:3306/testdb"
      user = "root"
      password = "123"
      poolInitialSize = 5
      poolMaxSize = 7
      poolConnectionTimeoutMillis = 1000
      poolValidationQuery = "select 1 as one"
      poolFactoryName = "bonecp"

    postgres {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost:5432/testdb"
      user = "root"
      password = "123"
      poolFactoryName = "hikaricp"
      numThreads = 10
      maxConnections = 12
      minConnections = 4
      keepAliveConnection = true
  # scallikejdbc Global settings
  scalikejdbc.global.loggingSQLAndTime.enabled = true
  scalikejdbc.global.loggingSQLAndTime.logLevel = info
  scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
  scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
  scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
  scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
  scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
  scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10


package jdbcengine
import java.sql.PreparedStatement

import scala.collection.generic.CanBuildFrom
import akka.stream.scaladsl._
import scalikejdbc._
import scalikejdbc.streams._
import akka.NotUsed
import akka.stream._
import scala.util._
import java.time._
import scala.concurrent.duration._
import filestreaming.FileStreaming._

import scalikejdbc.TxBoundary.Try._

import scala.concurrent.ExecutionContextExecutor
import java.io.InputStream

object JDBCContext {
  type SQLTYPE = Int
  val SQL_EXEDDL= 1
  val SQL_UPDATE = 2


case class JDBCQueryContext[M](
                                dbName: Symbol,
                                statement: String,
                                parameters: Seq[Any] = Nil,
                                fetchSize: Int = 100,
                                autoCommit: Boolean = false,
                                queryTimeout: Option[Int] = None,
                                extractor: WrappedResultSet => M)

case class JDBCContext(
                        dbName: Symbol,
                        statements: Seq[String] = Nil,
                        parameters: Seq[Seq[Any]] = Nil,
                        fetchSize: Int = 100,
                        queryTimeout: Option[Int] = None,
                        queryTags: Seq[String] = Nil,
                        sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                        batch: Boolean = false,
                        returnGeneratedKey: Seq[Option[Any]] = Nil,
                        // no return: None, return by index: Some(1), by name: Some("id")
                        preAction: Option[PreparedStatement => Unit] = None,
                        postAction: Option[PreparedStatement => Unit] = None) {

  ctx =>

  //helper functions

  def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

  def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

  def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

  def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

  def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(preAction = action)
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")

  def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE &&
      !ctx.batch && ctx.statements.size == 1)
      ctx.copy(postAction = action)
      throw new IllegalStateException("JDBCContex setting error: preAction not supported!")

  def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(Seq(_parameters))
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")

  def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
    if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
        statements = ctx.statements ++ Seq(_statement),
        parameters = ctx.parameters ++ Seq(_parameters),
        returnGeneratedKey = ctx.returnGeneratedKey ++ (if (_returnGeneratedKey) Seq(Some(1)) else Seq(None))
    } else
      throw new IllegalStateException("JDBCContex setting error: option not supported!")

  def appendBatchParameters(_parameters: Any*): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")

    var matchParams = true
    if (ctx.parameters != Nil)
      if (ctx.parameters.head.size != _parameters.size)
        matchParams = false
    if (matchParams) {
        parameters = ctx.parameters ++ Seq(_parameters)
    } else
      throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")

  def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
    if (ctx.sqlType != JDBCContext.SQL_UPDATE || !ctx.batch)
      throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
      returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil

  def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      sqlType = JDBCContext.SQL_EXEDDL,
      batch = false

  def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String, _parameters: Any*): JDBCContext = {
      statements = Seq(_statement),
      parameters = Seq(_parameters),
      returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = false
  def setBatchCommand(_statement: String): JDBCContext = {
    ctx.copy (
      statements = Seq(_statement),
      sqlType = JDBCContext.SQL_UPDATE,
      batch = true

  type JDBCDate = LocalDate
  type JDBCDateTime = LocalDateTime

  def jdbcSetDate(yyyy: Int, mm: Int, dd: Int) = LocalDate.of(yyyy,mm,dd)
  def jdbcSetNow = LocalDateTime.now()

  type JDBCBlob = InputStream

  def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToInputStream(fileName,timeOut)

  def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
    implicit mat: Materializer) =  InputStreamToFile(blob,fileName)


object JDBCEngine {

  import JDBCContext._

  private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
    throw new IllegalStateException(message)

  def jdbcAkkaStream[A](ctx: JDBCQueryContext[A])
                       (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val publisher: DatabasePublisher[A] = NamedDB('h2) readOnlyStream {
      val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
      val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)

        .withDBSessionForceAdjuster(session => {

  def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
                                                      ctx: JDBCQueryContext[A])(
                                                      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {

    val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
    implicit val session = NamedAutoSession(ctx.dbName)
    val sql: SQL[A, HasExtractor] = rawSql.map(ctx.extractor)


  def jdbcExcuteDDL(ctx: JDBCContext): Try[String] = {
    if (ctx.sqlType != SQL_EXEDDL) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
    else {
      NamedDB(ctx.dbName) localTx { implicit session =>
        Try {
          ctx.statements.foreach { stm =>
            val ddl = new SQLExecution(statement = stm, parameters = Nil)(
              before = WrappedResultSet => {})(
              after = WrappedResultSet => {})

          "SQL_EXEDDL executed succesfully."

  def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    else {
      if (ctx.batch) {
        if (noReturnKey(ctx)) {
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>
        } else {
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Try {
            NamedDB(ctx.dbName) localTx { implicit session =>

      } else {
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
  private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val Some(key) :: xs = ctx.returnGeneratedKey
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p@_ => p.head
    val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        val result = usql.apply()

  private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    val params: Seq[Any] = ctx.parameters match {
      case Nil => Nil
      case p@_ => p.head
    val before = ctx.preAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    val after = ctx.postAction match {
      case None => pstm: PreparedStatement => {}
      case Some(f) => f
    val usql = new SQLUpdate(ctx.statements.head,params,ctx.queryTags)(before)(after)
    Try {
      NamedDB(ctx.dbName) localTx {implicit session =>
        val result = usql.apply()


  private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (noReturnKey(ctx))

  private def noReturnKey(ctx: JDBCContext): Boolean = {
    if (ctx.returnGeneratedKey != Nil) {
      val k :: xs = ctx.returnGeneratedKey
      k match {
        case None => true
        case Some(k) => false
    } else true

  def noActon: PreparedStatement=>Unit = pstm => {}

  def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    Try {
      NamedDB(ctx.dbName) localTx { implicit session =>
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey match {
          case Nil => Seq.fill(ctx.statements.size)(None)
          case k@_ => k
        val sqlcmd = ctx.statements zip ctx.parameters zip keys
        val results = sqlcmd.map { case ((stm, param), key) =>
          key match {
            case None =>
              new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
            case Some(k) =>
              new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong

  def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
    implicit cbf: CanBuildFrom[Nothing, Long, C[Long]]): Try[C[Long]] = {
    if (ctx.statements == Nil)
      throw new IllegalStateException("JDBCContex setting error: statements empty!")
    if (ctx.sqlType != SQL_UPDATE) {
      Failure(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
    else {
      if (!ctx.batch) {
        if (ctx.statements.size == 1)
      } else
        Failure(new IllegalStateException("JDBCContex setting error: must set batch = false !"))


  case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                 statement: String, prepareParams: R => Seq[Any]) {
    jas =>
    def setDBName(db: Symbol): JDBCActionStream[R] = jas.copy(dbName = db)
    def setParallelism(parLevel: Int): JDBCActionStream[R] = jas.copy(parallelism = parLevel)
    def setProcessOrder(ordered: Boolean): JDBCActionStream[R] = jas.copy(processInOrder = ordered)

    private def perform(r: R) = {
      import scala.concurrent._
      val params = prepareParams(r)
      NamedDB(dbName) autoCommit { session =>
        session.execute(statement,params: _*)
    def performOnRow(implicit session: DBSession): Flow[R, R, NotUsed] =
      if (processInOrder)

  object JDBCActionStream {
    def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] =
      new JDBCActionStream[R](dbName = _dbName, statement=_statement, prepareParams = params)



package cassandraengine
import com.datastax.driver.core._

import scala.concurrent._
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}

import scala.collection.JavaConverters._
import scala.collection.generic.CanBuildFrom
import scala.concurrent.duration.Duration
import akka.NotUsed
import akka.stream.alpakka.cassandra.scaladsl._
import akka.stream.scaladsl._
import filestreaming.FileStreaming._

object CQLContext {
  // Consistency Levels
  val ANY: CONSISTENCY_LEVEL          =                                        0x0000
  val ONE: CONSISTENCY_LEVEL          =                                        0x0001
  val TWO: CONSISTENCY_LEVEL          =                                        0x0002
  val THREE: CONSISTENCY_LEVEL        =                                        0x0003
  val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
  val ALL: CONSISTENCY_LEVEL          =                                        0x0005
  val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
  val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
  val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
  val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
  val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C

  def apply(): CQLContext = CQLContext(statements = Nil)

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL


case class CQLQueryContext[M](
                               statement: String,
                               extractor: Row => M,
                               parameter: Seq[Object] = Nil,
                               consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                               fetchSize: Int = 100
                             ) { ctx =>
  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext[M] =
    ctx.copy(consistency = Some(_consistency))
  def setFetchSize(pageSize: Int): CQLQueryContext[M] =
    ctx.copy(fetchSize = pageSize)
object CQLQueryContext {
  def apply[M](stmt: String, converter: Row => M): CQLQueryContext[M] =
    new CQLQueryContext[M](statement = stmt, extractor = converter)

case class CQLContext(
                       statements: Seq[String],
                       parameters: Seq[Seq[Object]] = Nil,
                       consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                     ) { ctx =>

  def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
    ctx.copy(consistency = Some(_consistency))
  def setCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
  def appendCommand(_statement: String, _parameters: Object*): CQLContext =
    ctx.copy(statements = ctx.statements :+ _statement,
      parameters = ctx.parameters ++ Seq(_parameters))

object CQLEngine {
  import CQLContext._
  import CQLHelpers._

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

    val prepStmt = session.prepare(ctx.statement)

    var boundStmt =  prepStmt.bind()
    if (ctx.parameter != Nil) {
      val params = processParameters(ctx.parameter)
      boundStmt = prepStmt.bind(params:_*)

    ctx.consistency.foreach {consistency =>

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
    extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
    if (resultSet.isFullyFetched) {
      (resultSet, None)
    } else {
      try {
        val result = Await.result(resultSet.fetchMoreResults(), timeOut)
        (result, Some((result.asScala.view.map(extractor)).to[C]))
      } catch { case e: Throwable => (resultSet, None) }
  def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val prepStmt = session.prepare(ctx.statements.head)

    var boundStmt =  prepStmt.bind()
    if (ctx.parameters != Nil) {
      val params = processParameters(ctx.parameters.head)
      boundStmt = prepStmt.bind(params:_*)

    ctx.consistency.foreach {consistency =>
  def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
    var batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params == Nil)
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
    ctx.consistency.foreach {consistency =>

  def cassandraStream[A](ctx: CQLQueryContext[A])
                        (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {

    val prepStmt = session.prepare(ctx.statement)
    var boundStmt =  prepStmt.bind()
    val params = processParameters(ctx.parameter)
    boundStmt = prepStmt.bind(params:_*)
    ctx.consistency.foreach {consistency =>


  case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                      statement: String, prepareParams: R => Seq[Object],
                                      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ cas =>
    def setParallelism(parLevel: Int): CassandraActionStream[R] = cas.copy(parallelism=parLevel)
    def setProcessOrder(ordered: Boolean): CassandraActionStream[R] = cas.copy(processInOrder = ordered)
    def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
      cas.copy(consistency = Some(_consistency))

    private def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
      val prepStmt = session.prepare(statement)
      var boundStmt =  prepStmt.bind()
      val params = processParameters(prepareParams(r))
      boundStmt = prepStmt.bind(params:_*)
      consistency.foreach { cons =>
      session.executeAsync(boundStmt).map(_ => r)
    def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] =
      if (processInOrder)

  object CassandraActionStream {
    def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] =
      new CassandraActionStream[R]( statement=_statement, prepareParams = params)

object CQLHelpers {
  import java.nio.ByteBuffer
  import com.datastax.driver.core.LocalDate
  import java.time.Instant
  import akka.stream._
  import scala.concurrent.duration._

  implicit def listenableFutureToFuture[T](
                                            listenableFuture: ListenableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    Futures.addCallback(listenableFuture, new FutureCallback[T] {
      def onFailure(error: Throwable): Unit = {
      def onSuccess(result: T): Unit = {

  type CQLBlob = ByteBuffer
  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, Month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    import java.time.{Clock,ZoneId}
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now(Clock.system(ZoneId.of("EST", ZoneId.SHORT_IDS)))
        case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
        case p@_ => p

  def fileToCQLBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToByteBuffer(fileName,timeOut)

  def cqlBlobToFile(blob: CQLBlob, fileName: String)(
    implicit mat: Materializer) =  ByteBufferToFile(blob,fileName)


import java.text.SimpleDateFormat

import akka.NotUsed
import akka.stream.alpakka.mongodb.scaladsl._
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.mongodb.scala.MongoClient
import org.mongodb.scala.bson.collection.immutable.Document
import org.bson.conversions.Bson
import org.mongodb.scala._
import org.mongodb.scala.model._
import java.util.Calendar

import scala.collection.JavaConverters._
import filestreaming.FileStreaming._
import akka.stream.Materializer
import org.mongodb.scala.bson.{BsonArray, BsonBinary}

import scala.concurrent._
import scala.concurrent.duration._

object MGOContext {

  trait MGOCommands

  object MGOCommands {

    case class Count(filter: Option[Bson], options: Option[Any]) extends MGOCommands

    case class Distict(fieldName: String, filter: Option[Bson]) extends MGOCommands

  /*  org.mongodb.scala.FindObservable
  import com.mongodb.async.client.FindIterable
  val resultDocType = FindIterable[Document]
  val resultOption = FindObservable(resultDocType)
  .project(...) */
    case class Find[M](filter: Option[Bson] = None,
                    andThen: Option[FindObservable[Document] => FindObservable[Document]]= None,
                    converter: Option[Document => M] = None,
                    firstOnly: Boolean = false) extends MGOCommands

    case class DocumentStream(filter: Option[Bson] = None,
                              andThen: Option[FindObservable[Document] => FindObservable[Document]] = None,
                             ) extends MGOCommands

    case class Aggregate(pipeLine: Seq[Bson]) extends MGOCommands

    case class MapReduce(mapFunction: String, reduceFunction: String) extends MGOCommands

    case class Insert(newdocs: Seq[Document], options: Option[Any] = None) extends MGOCommands

    case class Delete(filter: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class Replace(filter: Bson, replacement: Document, options: Option[Any] = None) extends MGOCommands

    case class Update(filter: Bson, update: Bson, options: Option[Any] = None, onlyOne: Boolean = false) extends MGOCommands

    case class BulkWrite(commands: List[WriteModel[Document]], options: Option[Any] = None) extends MGOCommands


  object MGOAdmins {

    case class DropCollection(collName: String) extends MGOCommands

    case class CreateCollection(collName: String, options: Option[Any] = None) extends MGOCommands

    case class ListCollection(dbName: String) extends MGOCommands

    case class CreateView(viewName: String, viewOn: String, pipeline: Seq[Bson], options: Option[Any] = None) extends MGOCommands

    case class CreateIndex(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByName(indexName: String, options: Option[Any] = None) extends MGOCommands

    case class DropIndexByKey(key: Bson, options: Option[Any] = None) extends MGOCommands

    case class DropAllIndexes(options: Option[Any] = None) extends MGOCommands


  case class MGOContext(
                         dbName: String,
                         collName: String,
                         action: MGOCommands = null
                       ) {
    ctx =>
    def setDbName(name: String): MGOContext = ctx.copy(dbName = name)

    def setCollName(name: String): MGOContext = ctx.copy(collName = name)

    def setCommand(cmd: MGOCommands): MGOContext = ctx.copy(action = cmd)

  object MGOContext {
    def apply(db: String, coll: String) = new MGOContext(db, coll)

    def apply(db: String, coll: String, command: MGOCommands) =
      new MGOContext(db, coll, command)


  type MGODate = java.util.Date
  def mgoDate(yyyy: Int, mm: Int, dd: Int): MGODate = {
    val ca = Calendar.getInstance()
  def mgoDateTime(yyyy: Int, mm: Int, dd: Int, hr: Int, min: Int, sec: Int): MGODate = {
    val ca = Calendar.getInstance()
  def mgoDateTimeNow: MGODate = {
    val ca = Calendar.getInstance()

  def mgoDateToString(dt: MGODate, formatString: String): String = {
    val fmt= new SimpleDateFormat(formatString)

  type MGOBlob = BsonBinary
  type MGOArray = BsonArray

  def fileToMGOBlob(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer) = FileToByteArray(fileName,timeOut)

  def mgoBlobToFile(blob: MGOBlob, fileName: String)(
    implicit mat: Materializer) =  ByteArrayToFile(blob.getData,fileName)

  def mgoGetStringOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetIntOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetLonggOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetDoubleOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetBoolOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetDateOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetBlobOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None
  def mgoGetArrayOrNone(doc: Document, fieldName: String) = {
    if (doc.keySet.contains(fieldName))
    else None

  def mgoArrayToDocumentList(arr: MGOArray): scala.collection.immutable.List[org.bson.BsonDocument] = {

  type MGOFilterResult = FindObservable[Document] => FindObservable[Document]
object MGOEngine {

  import MGOContext._
  import MGOCommands._
  import MGOAdmins._

  def mgoExecute[T](ctx: MGOContext)(implicit client: MongoClient): Future[T] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
      /* count */
      case Count(Some(filter), Some(opt)) =>
        coll.count(filter, opt.asInstanceOf[CountOptions])
      case Count(Some(filter), None) =>
      case Count(None, None) =>
      /* distinct */
      case Distict(field, Some(filter)) =>
        coll.distinct(field, filter).toFuture()
      case Distict(field, None) =>
      /* find */
      case Find(None, None, optConv, false) =>
        if (optConv == None) coll.find().toFuture().asInstanceOf[Future[T]]
        else coll.find().map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(None, None, optConv, true) =>
        if (optConv == None) coll.find().first().head().asInstanceOf[Future[T]]
        else coll.find().first().map(optConv.get).head().asInstanceOf[Future[T]]
      case Find(Some(filter), None, optConv, false) =>
        if (optConv == None) coll.find(filter).toFuture().asInstanceOf[Future[T]]
        else coll.find(filter).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter), None, optConv, true) =>
        if (optConv == None) coll.find(filter).first().head().asInstanceOf[Future[T]]
        else coll.find(filter).first().map(optConv.get).head().asInstanceOf[Future[T]]
      case Find(None, Some(next), optConv, _) =>
        if (optConv == None) next(coll.find[Document]()).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document]()).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      case Find(Some(filter), Some(next), optConv, _) =>
        if (optConv == None) next(coll.find[Document](filter)).toFuture().asInstanceOf[Future[T]]
        else next(coll.find[Document](filter)).map(optConv.get).toFuture().asInstanceOf[Future[T]]
      /* aggregate */
      case Aggregate(pline) => coll.aggregate(pline).toFuture().asInstanceOf[Future[T]]
      /* mapReduce */
      case MapReduce(mf, rf) => coll.mapReduce(mf, rf).toFuture().asInstanceOf[Future[T]]
      /* insert */
      case Insert(docs, Some(opt)) =>
        if (docs.size > 1) coll.insertMany(docs, opt.asInstanceOf[InsertManyOptions]).toFuture()
        else coll.insertOne(docs.head, opt.asInstanceOf[InsertOneOptions]).toFuture()
      case Insert(docs, None) =>
        if (docs.size > 1) coll.insertMany(docs).toFuture().asInstanceOf[Future[T]]
        else coll.insertOne(docs.head).toFuture().asInstanceOf[Future[T]]
      /* delete */
      case Delete(filter, None, onlyOne) =>
        if (onlyOne) coll.deleteOne(filter).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter).toFuture().asInstanceOf[Future[T]]
      case Delete(filter, Some(opt), onlyOne) =>
        if (onlyOne) coll.deleteOne(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.deleteMany(filter, opt.asInstanceOf[DeleteOptions]).toFuture().asInstanceOf[Future[T]]
      /* replace */
      case Replace(filter, replacement, None) =>
        coll.replaceOne(filter, replacement).toFuture().asInstanceOf[Future[T]]
      case Replace(filter, replacement, Some(opt)) =>
        coll.replaceOne(filter, replacement, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
      /* update */
      case Update(filter, update, None, onlyOne) =>
        if (onlyOne) coll.updateOne(filter, update).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter, update).toFuture().asInstanceOf[Future[T]]
      case Update(filter, update, Some(opt), onlyOne) =>
        if (onlyOne) coll.updateOne(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
        else coll.updateMany(filter, update, opt.asInstanceOf[UpdateOptions]).toFuture().asInstanceOf[Future[T]]
      /* bulkWrite */
      case BulkWrite(commands, None) =>
      case BulkWrite(commands, Some(opt)) =>
        coll.bulkWrite(commands, opt.asInstanceOf[BulkWriteOptions]).toFuture().asInstanceOf[Future[T]]

      /* drop collection */
      case DropCollection(collName) =>
        val coll = db.getCollection(collName)
      /* create collection */
      case CreateCollection(collName, None) =>
      case CreateCollection(collName, Some(opt)) =>
        db.createCollection(collName, opt.asInstanceOf[CreateCollectionOptions]).toFuture().asInstanceOf[Future[T]]
      /* list collection */
      case ListCollection(dbName) =>
      /* create view */
      case CreateView(viewName, viewOn, pline, None) =>
        db.createView(viewName, viewOn, pline).toFuture().asInstanceOf[Future[T]]
      case CreateView(viewName, viewOn, pline, Some(opt)) =>
        db.createView(viewName, viewOn, pline, opt.asInstanceOf[CreateViewOptions]).toFuture().asInstanceOf[Future[T]]
      /* create index */
      case CreateIndex(key, None) =>
      case CreateIndex(key, Some(opt)) =>
        coll.createIndex(key, opt.asInstanceOf[IndexOptions]).toFuture().asInstanceOf[Future[T]]
      /* drop index */
      case DropIndexByName(indexName, None) =>
      case DropIndexByName(indexName, Some(opt)) =>
        coll.dropIndex(indexName, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropIndexByKey(key, None) =>
      case DropIndexByKey(key, Some(opt)) =>
        coll.dropIndex(key, opt.asInstanceOf[DropIndexOptions]).toFuture().asInstanceOf[Future[T]]
      case DropAllIndexes(None) =>
      case DropAllIndexes(Some(opt)) =>

  def mongoStream(ctx: MGOContext)(
    implicit client: MongoClient, ec: ExecutionContextExecutor): Source[Document, NotUsed] = {
    val db = client.getDatabase(ctx.dbName)
    val coll = db.getCollection(ctx.collName)
    ctx.action match {
      case DocumentStream(None, None) =>
      case DocumentStream(Some(filter), None) =>
      case DocumentStream(None, Some(next)) =>
      case DocumentStream(Some(filter), Some(next)) =>


  object MongoActionStream {
    import MGOContext._

    case class StreamingInsert[A](dbName: String,
                                  collName: String,
                                  converter: A => Document,
                                  parallelism: Int = 1
                                 ) extends MGOCommands

    case class StreamingDelete[A](dbName: String,
                                  collName: String,
                                  toFilter: A => Bson,
                                  parallelism: Int = 1,
                                  justOne: Boolean = false
                                 ) extends MGOCommands

    case class StreamingUpdate[A](dbName: String,
                                  collName: String,
                                  toFilter: A => Bson,
                                  toUpdate: A => Bson,
                                  parallelism: Int = 1,
                                  justOne: Boolean = false
                                 ) extends MGOCommands

    case class InsertAction[A](ctx: StreamingInsert[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, Document, NotUsed] =
          .mapAsync(ctx.parallelism)(doc => collection.insertOne(doc).toFuture().map(_ => doc))

    case class UpdateAction[A](ctx: StreamingUpdate[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
        if (ctx.justOne) {
            .mapAsync(ctx.parallelism)(a =>
              collection.updateOne(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))
        } else
            .mapAsync(ctx.parallelism)(a =>
              collection.updateMany(ctx.toFilter(a), ctx.toUpdate(a)).toFuture().map(_ => a))

    case class DeleteAction[A](ctx: StreamingDelete[A])(
      implicit mongoClient: MongoClient) {

      val database = mongoClient.getDatabase(ctx.dbName)
      val collection = database.getCollection(ctx.collName)

      def performOnRow(implicit ec: ExecutionContext): Flow[A, A, NotUsed] =
        if (ctx.justOne) {
            .mapAsync(ctx.parallelism)(a =>
              collection.deleteOne(ctx.toFilter(a)).toFuture().map(_ => a))
        } else
            .mapAsync(ctx.parallelism)(a =>
              collection.deleteMany(ctx.toFilter(a)).toFuture().map(_ => a))

import org.mongodb.scala._
import scala.concurrent._
import scala.concurrent.duration._

object MGOHelpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), 10 seconds)
    def headResult() = Await.result(observable.head(), 10 seconds)
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")

  def getResult[T](fut: Future[T], timeOut: Duration = 1 second): T = {
  def getResults[T](fut: Future[Iterable[T]], timeOut: Duration = 1 second): Iterable[T] = {



package filestreaming
import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.{Materializer}
import akka.stream.scaladsl.{FileIO, StreamConverters}

import scala.concurrent.{Await}
import akka.util._
import scala.concurrent.duration._

object FileStreaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer):ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    (Await.result(fut, timeOut)).toByteBuffer

  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    (Await.result(fut, timeOut)).toArray

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))

  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)



package filestreaming
import java.io.{InputStream, ByteArrayInputStream}
import java.nio.ByteBuffer
import java.nio.file.Paths

import akka.stream.{Materializer}
import akka.stream.scaladsl.{FileIO, StreamConverters}

import scala.concurrent.{Await}
import akka.util._
import scala.concurrent.duration._

object FileStreaming {
  def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer):ByteBuffer = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    (Await.result(fut, timeOut)).toByteBuffer

  def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): Array[Byte] = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    (Await.result(fut, timeOut)).toArray

  def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60 seconds)(
    implicit mat: Materializer): InputStream = {
    val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
      hd ++ bs
    val buf = (Await.result(fut, timeOut)).toArray
    new ByteArrayInputStream(buf)

  def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
    implicit mat: Materializer) = {
    val ba = new Array[Byte](byteBuf.remaining())
    val baInput = new ByteArrayInputStream(ba)
    val source = StreamConverters.fromInputStream(() => baInput)  //ByteBufferInputStream(bytes))

  def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
    implicit mat: Materializer) = {
    val bb = ByteBuffer.wrap(bytes)
    val baInput = new ByteArrayInputStream(bytes)
    val source = StreamConverters.fromInputStream(() => baInput) //ByteBufferInputStream(bytes))

  def InputStreamToFile(is: InputStream, fileName: String)(
    implicit mat: Materializer) = {
    val source = StreamConverters.fromInputStream(() => is)



package jdbcengine
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import com.typesafe.config._
import java.util.concurrent.TimeUnit
import java.util.Properties
import scalikejdbc.config._
import com.typesafe.config.Config
import com.zaxxer.hikari._
import scalikejdbc.ConnectionPoolFactoryRepository

/** Extension methods to make Typesafe Config easier to use */
class ConfigExtensionMethods(val c: Config) extends AnyVal {
  import scala.collection.JavaConverters._

  def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
  def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
  def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
  def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default

  def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
  def getDurationOr(path: String, default: => Duration = Duration.Zero) =
    if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default

  def getPropertiesOr(path: String, default: => Properties = null): Properties =
    if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default

  def toProperties: Properties = {
    def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
      val props = new Properties(null)
      m.foreach { case (k, cv) =>
        val v =
          if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
          else if(cv.unwrapped eq null) null
          else cv.unwrapped.toString
        if(v ne null) props.put(k, v)

  def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
  def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
  def getStringOpt(path: String) = Option(getStringOr(path))
  def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))

object ConfigExtensionMethods {
  @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)

trait HikariConfigReader extends TypesafeConfigReader {
  self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>

  import ConfigExtensionMethods.configExtensionMethods

  def getFactoryName(dbName: Symbol): String = {
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)

  def hikariCPConfig(dbName: Symbol): HikariConfig = {

    val hconf = new HikariConfig()
    val c: Config = config.getConfig(envPrefix + "db." + dbName.name)

    // Connection settings
    if (c.hasPath("dataSourceClass")) {
    } else {
      Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).map(hconf.setDriverClassName _)
    hconf.setJdbcUrl(c.getStringOr("url", null))

    // Pool configuration
    hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
    hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
    hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
    hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
    hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
    hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
    val numThreads = c.getIntOr("numThreads", 20)
    hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
    hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
    hconf.setPoolName(c.getStringOr("poolName", dbName.name))
    hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))

    // Equivalent of ConnectionPreparer
    hconf.setReadOnly(c.getBooleanOr("readOnly", false))
    c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
    hconf.setCatalog(c.getStringOr("catalog", null))



import scalikejdbc._
trait ConfigDBs {
  self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>

  def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
    getFactoryName(dbName) match {
      case "hikaricp" => {
        val hconf = hikariCPConfig(dbName)
        val hikariCPSource = new HikariDataSource(hconf)
        if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
        ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
      case _ => {
        val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
        val cpSettings = readConnectionPoolSettings(dbName)
        if (driver != null && driver.trim.nonEmpty) {
        ConnectionPool.add(dbName, url, user, password, cpSettings)

  def setupAll(): Unit = {
    dbNames.foreach { dbName => setup(Symbol(dbName)) }

  def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {

  def closeAll(): Unit = {


object ConfigDBs extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader

case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
  with TypesafeConfigReader
  with StandardTypesafeConfig
  with HikariConfigReader
  with EnvPrefix {

  override val env = Option(envValue)


import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.mongodb.scala._
import org.mongodb.scala.connection._
import scala.collection.JavaConverters._

object MongoStream extends App {
  import MGOContext._
  import MGOEngine._
  import MGOCommands._
  import MGOHelpers._

  val clusterSettings = ClusterSettings.builder()
    .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
  val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
  implicit val client = MongoClient(clientSettings)

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  implicit val ec = system.dispatcher

  case class PO (
                  ponum: String,
                  podate: MGODate,
                  vendor: String,
                  remarks: Option[String],
                  podtl: Option[MGOArray]
  def toPO(doc: Document): PO = {
      ponum = doc.getString("ponum"),
      podate = doc.getDate("podate"),
      vendor = doc.getString("vendor"),
      remarks = mgoGetStringOrNone(doc,"remarks"),
      podtl = mgoGetArrayOrNone(doc,"podtl")
  case class PODTL(
                    item: String,
                    price: Double,
                    qty: Int,
                    packing: Option[String],
                    payTerm: Option[String]
  def toPODTL(podtl: Document): PODTL = {
      item = podtl.getString("item"),
      price = podtl.getDouble("price"),
      qty = podtl.getInteger("qty"),
      packing = mgoGetStringOrNone(podtl,"packing"),
      payTerm = mgoGetStringOrNone(podtl,"payterm")

  def showPO(po: PO) = {
    println(s"po number: ${po.ponum}")
    println(s"po date: ${mgoDateToString(po.podate,"yyyy-MM-dd")}")
    println(s"vendor: ${po.vendor}")
    if (po.remarks != None)
      println(s"remarks: ${po.remarks.get}")
    po.podtl match {
      case Some(barr) =>
          .map { dc => toPODTL(dc)}
          .foreach { doc: PODTL =>
            print(s"==>Item: ${doc.item} ")
            print(s"price: ${doc.price} ")
            print(s"qty: ${doc.qty} ")
            doc.packing.foreach(pk => print(s"packing: ${pk} "))
            doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
      case _ =>
  import org.mongodb.scala.model.Projections._
  import MongoActionStream._
  import MGOEngine._
  import akka.stream.scaladsl.{Sink, Source}

  val proj: MGOFilterResult = find => find.projection(exclude("handler","_id"))
  val ctx = MGOContext("testdb","po").setCommand(
    DocumentStream(filter = None, andThen = Some(proj)))

  val stream = mongoStream(ctx).map(toPO).runWith(Sink.foreach(showPO))


  import jdbcengine._
  import JDBCEngine._
  import scalikejdbc._

  case class DataRow (
                       rowid: Long,
                       measureid: Long,
                       state: String,
                       county: String,
                       year: Int,
                       value: Int
  val toRow: WrappedResultSet => DataRow = rs => DataRow(
    rowid = rs.long("ROWID"),
    measureid = rs.long("MEASUREID"),
    state = rs.string("STATENAME"),
    county = rs.string("COUNTYNAME"),
    year = rs.int("REPORTYEAR"),
    value = rs.int("VALUE")

  //construct the context
  val h2ctx = JDBCQueryContext[DataRow](
    dbName = 'h2,
    statement = "select * from AQMRPT",
    extractor = toRow

  //source from h2 database
  val jdbcSource = jdbcAkkaStream(h2ctx)

  //document converter
  def rowToDoc: DataRow => Document = row => Document (
    "rowid" -> row.rowid,
    "measureid" ->  row.measureid,
    "state" ->  row.state,
    "county" ->  row.county,
    "year" ->  row.year,
    "value" ->  row.value
  def docToRow: Document => DataRow = doc => DataRow (
    rowid = doc.getLong("rowid"),
    measureid = doc.getLong("measureid"),
    state = doc.getString("state"),
    county = doc.getString("county"),
    year = doc.getInteger("year"),
    value = doc.getInteger("value")
  //setup context
  val mgoctx = StreamingInsert("testdb","members",rowToDoc)
  val mgoActionStream = new MongoActionStream.InsertAction[DataRow](mgoctx)
  val mgoActionFlow = mgoActionStream.performOnRow.map(docToRow)
  val sink = Sink.foreach[DataRow]{ r =>
    println(s"${r.rowid} ${r.state} ${r.county} ${r.year} ${r.value}")

  //config jdbc drivers

  val sts = jdbcSource.take(100).via(mgoActionFlow).to(sink).run()

  val mgoCtxPrint = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))


  import com.datastax.driver.core._
  import cassandraengine._
  import CQLEngine._
  import org.mongodb.scala.model.Filters._

  //data row converter
  val cqlToDataRow = (rs: Row) => DataRow(
    rowid = rs.getLong("ROWID"),
    measureid = rs.getLong("MEASUREID"),
    state = rs.getString("STATENAME"),
    county = rs.getString("COUNTYNAME"),
    year = rs.getInt("REPORTYEAR"),
    value = rs.getInt("VALUE")

 import org.bson.conversions._
  import org.mongodb.scala.model.Updates._

  implicit val session = Cluster.builder

  //setup context
  val cqlCtx = CQLQueryContext("select * from testdb.aqmrpt",cqlToDataRow)
  //construct source
  val cqlSource = cassandraStream(cqlCtx)

  def toFilter: DataRow => Bson = row => {
    and(equal("rowid",row.rowid), lt("value",10))
  def toUpdate: DataRow => Bson = row => {
    set("value" , row.value * 10)
  val mgoCtxUpdate = StreamingUpdate("testdb","members",toFilter,toUpdate)
  val mgoUpdateFlow = new MongoActionStream.UpdateAction[DataRow](mgoCtxUpdate)

  import org.bson.conversions._
  import org.mongodb.scala.model.Filters._
  def toDelFilter: DataRow => Bson = row => and(equal("rowid",row.rowid),equal("value",10))

  val mgoCtxDel = StreamingDelete[DataRow]("testdb","members",toDelFilter)
  val mgoDelFlow = new DeleteAction[DataRow](mgoCtxDel)
  val mgoCtxSrc = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None))

  import org.mongodb.scala.model.Sorts._
  val sortDsc: MGOFilterResult = find => find.sort(descending("rowid"))
  val mgoCtxShow = MGOContext("testdb","members").setCommand(
    DocumentStream(filter = None, andThen = Some(sortDsc)))








